Skip to content

Commit 195848b

Browse files
committed
store: log cross-batch duplicate inserts in Layout::insert()
Restructured `Layout::insert()` from an `if let Err` pattern to a `match` so that the `affected_rows` count returned by `InsertQuery::execute()` can be captured. Tracks expected vs. actual row counts across both the main batch path and the row-by-row fallback path, and logs a warning when `affected_rows` is less than expected for immutable tables with `skip_duplicates` enabled.
1 parent 78e3dd9 commit 195848b

1 file changed

Lines changed: 46 additions & 30 deletions

File tree

store/postgres/src/relational.rs

Lines changed: 46 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -770,44 +770,60 @@ impl Layout {
770770
// We insert the entities in chunks to make sure each operation does
771771
// not exceed the maximum number of bindings allowed in queries
772772
let chunk_size = InsertQuery::chunk_size(table);
773+
let mut affected_rows: usize = 0;
774+
let mut expected_rows: usize = 0;
773775
for chunk in group.write_chunks(chunk_size) {
774776
// Empty chunks would lead to invalid SQL
775777
if !chunk.is_empty() {
776-
if let Err(e) = InsertQuery::new(table, &chunk)?.execute(conn).await {
777-
// We occasionally get these errors but it's entirely
778-
// unclear what causes them. We work around that by
779-
// switching to row-by-row inserts until we can figure
780-
// out what the underlying cause is
781-
let err_msg = e.to_string();
782-
if !err_msg.contains("value too large to transmit") {
783-
let (block, msg) = chunk_details(&chunk);
784-
return Err(StoreError::write_failure(
785-
e,
786-
table.object.as_str(),
787-
block,
788-
msg,
789-
));
778+
expected_rows += chunk.len();
779+
match InsertQuery::new(table, &chunk)?.execute(conn).await {
780+
Ok(count) => {
781+
affected_rows += count;
790782
}
791-
let (block, msg) = chunk_details(&chunk);
792-
warn!(logger, "Insert of entire chunk failed. Trying row by row insert.";
793-
"table" => table.object.as_str(),
794-
"block" => block,
795-
"error" => err_msg,
796-
"details" => msg
797-
);
798-
for single_chunk in chunk.as_single_writes() {
799-
InsertQuery::new(table, &single_chunk)?
800-
.execute(conn)
801-
.await
802-
.map_err(|e| {
803-
let (block, msg) = chunk_details(&single_chunk);
804-
let msg = format!("{}: offending row {:?}", msg, single_chunk);
805-
StoreError::write_failure(e, table.object.as_str(), block, msg)
806-
})?;
783+
Err(e) => {
784+
// We occasionally get these errors but it's entirely
785+
// unclear what causes them. We work around that by
786+
// switching to row-by-row inserts until we can figure
787+
// out what the underlying cause is
788+
let err_msg = e.to_string();
789+
if !err_msg.contains("value too large to transmit") {
790+
let (block, msg) = chunk_details(&chunk);
791+
return Err(StoreError::write_failure(
792+
e,
793+
table.object.as_str(),
794+
block,
795+
msg,
796+
));
797+
}
798+
let (block, msg) = chunk_details(&chunk);
799+
warn!(logger, "Insert of entire chunk failed. Trying row by row insert.";
800+
"table" => table.object.as_str(),
801+
"block" => block,
802+
"error" => err_msg,
803+
"details" => msg
804+
);
805+
for single_chunk in chunk.as_single_writes() {
806+
let count = InsertQuery::new(table, &single_chunk)?
807+
.execute(conn)
808+
.await
809+
.map_err(|e| {
810+
let (block, msg) = chunk_details(&single_chunk);
811+
let msg = format!("{}: offending row {:?}", msg, single_chunk);
812+
StoreError::write_failure(e, table.object.as_str(), block, msg)
813+
})?;
814+
affected_rows += count;
815+
}
807816
}
808817
}
809818
}
810819
}
820+
if affected_rows < expected_rows && table.immutable && table.skip_duplicates {
821+
warn!(logger, "Cross-batch duplicate inserts skipped by ON CONFLICT DO NOTHING";
822+
"entity_type" => table.object.as_str(),
823+
"expected_rows" => expected_rows,
824+
"affected_rows" => affected_rows,
825+
"skipped" => expected_rows - affected_rows);
826+
}
811827
Ok(())
812828
}
813829

0 commit comments

Comments
 (0)