Skip to content

Commit 60f1ff1

Browse files
committed
Add ignore param to createDocuments for silent duplicate handling
Adapters: - MariaDB/MySQL: INSERT IGNORE INTO - PostgreSQL: ON CONFLICT DO NOTHING - SQLite: INSERT OR IGNORE INTO - MongoDB: session-scoped pre-filter before insertMany Database.php: - Intra-batch dedup by ID (tenant-aware, first occurrence wins) - Pre-fetch existing IDs to skip known duplicates - Deferred relationship creation for ignore mode (no orphans) - Race-condition reconciliation via _createdAt timestamp verification upsertDocuments: - Batch-fetch existing docs with find() instead of per-row getDocument() - Tenant-aware composite keys for seenIds duplicate check All paths are tenant-per-document aware with null-safe array_filter.
1 parent 72ee161 commit 60f1ff1

8 files changed

Lines changed: 516 additions & 23 deletions

File tree

src/Database/Adapter.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
729729
*
730730
* @param Document $collection
731731
* @param array<Document> $documents
732+
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
732733
*
733734
* @return array<Document>
734735
*
735736
* @throws DatabaseException
736737
*/
737-
abstract public function createDocuments(Document $collection, array $documents): array;
738+
abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;
738739

739740
/**
740741
* Update Document

src/Database/Adapter/Mongo.php

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,11 +1460,11 @@ public function castingBefore(Document $collection, Document $document): Documen
14601460
* @throws DuplicateException
14611461
* @throws DatabaseException
14621462
*/
1463-
public function createDocuments(Document $collection, array $documents): array
1463+
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
14641464
{
14651465
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());
1466-
14671466
$options = $this->getTransactionOptions();
1467+
14681468
$records = [];
14691469
$hasSequence = null;
14701470
$documents = \array_map(fn ($doc) => clone $doc, $documents);
@@ -1487,6 +1487,95 @@ public function createDocuments(Document $collection, array $documents): array
14871487
$records[] = $record;
14881488
}
14891489

1490+
// Pre-filter duplicates within the session to avoid aborting the transaction.
1491+
if ($ignore && !empty($records)) {
1492+
$existingKeys = [];
1493+
1494+
try {
1495+
if ($this->sharedTables && $this->tenantPerDocument) {
1496+
$idsByTenant = [];
1497+
foreach ($records as $record) {
1498+
$uid = $record['_uid'] ?? '';
1499+
if ($uid === '') {
1500+
continue;
1501+
}
1502+
$tenant = $record['_tenant'] ?? $this->getTenant();
1503+
$idsByTenant[$tenant][] = $uid;
1504+
}
1505+
1506+
foreach ($idsByTenant as $tenant => $tenantUids) {
1507+
$tenantUids = \array_values(\array_unique($tenantUids));
1508+
$findOptions = $this->getTransactionOptions([
1509+
'projection' => ['_uid' => 1],
1510+
'batchSize' => \count($tenantUids),
1511+
]);
1512+
$filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant];
1513+
$response = $this->client->find($name, $filters, $findOptions);
1514+
foreach ($response->cursor->firstBatch ?? [] as $doc) {
1515+
$existingKeys[$tenant . ':' . $doc->_uid] = true;
1516+
}
1517+
$cursorId = $response->cursor->id ?? 0;
1518+
while ($cursorId != 0) {
1519+
$more = $this->client->getMore($cursorId, $name, \count($tenantUids));
1520+
foreach ($more->cursor->nextBatch ?? [] as $doc) {
1521+
$existingKeys[$tenant . ':' . $doc->_uid] = true;
1522+
}
1523+
$cursorId = $more->cursor->id ?? 0;
1524+
}
1525+
}
1526+
} else {
1527+
$uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records), fn ($v) => $v !== null);
1528+
if (!empty($uids)) {
1529+
$uidValues = \array_values(\array_unique($uids));
1530+
$findOptions = $this->getTransactionOptions([
1531+
'projection' => ['_uid' => 1],
1532+
'batchSize' => \count($uidValues),
1533+
]);
1534+
$filters = ['_uid' => ['$in' => $uidValues]];
1535+
if ($this->sharedTables) {
1536+
$filters['_tenant'] = $this->getTenantFilters($collection->getId());
1537+
}
1538+
$response = $this->client->find($name, $filters, $findOptions);
1539+
foreach ($response->cursor->firstBatch ?? [] as $doc) {
1540+
$existingKeys[$doc->_uid] = true;
1541+
}
1542+
$cursorId = $response->cursor->id ?? 0;
1543+
while ($cursorId != 0) {
1544+
$more = $this->client->getMore($cursorId, $name, \count($uidValues));
1545+
foreach ($more->cursor->nextBatch ?? [] as $doc) {
1546+
$existingKeys[$doc->_uid] = true;
1547+
}
1548+
$cursorId = $more->cursor->id ?? 0;
1549+
}
1550+
}
1551+
}
1552+
} catch (MongoException $e) {
1553+
throw $this->processException($e);
1554+
}
1555+
1556+
if (!empty($existingKeys)) {
1557+
$filteredRecords = [];
1558+
$filteredDocuments = [];
1559+
$tenantPerDoc = $this->sharedTables && $this->tenantPerDocument;
1560+
foreach ($records as $i => $record) {
1561+
$uid = $record['_uid'] ?? '';
1562+
$key = $tenantPerDoc
1563+
? ($record['_tenant'] ?? $this->getTenant()) . ':' . $uid
1564+
: $uid;
1565+
if (!isset($existingKeys[$key])) {
1566+
$filteredRecords[] = $record;
1567+
$filteredDocuments[] = $documents[$i];
1568+
}
1569+
}
1570+
$records = $filteredRecords;
1571+
$documents = $filteredDocuments;
1572+
}
1573+
1574+
if (empty($records)) {
1575+
return [];
1576+
}
1577+
}
1578+
14901579
try {
14911580
$documents = $this->client->insertMany($name, $records, $options);
14921581
} catch (MongoException $e) {

src/Database/Adapter/Pool.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
268268
return $this->delegate(__FUNCTION__, \func_get_args());
269269
}
270270

271-
public function createDocuments(Document $collection, array $documents): array
271+
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
272272
{
273273
return $this->delegate(__FUNCTION__, \func_get_args());
274274
}

src/Database/Adapter/Postgres.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
13651365
return $document;
13661366
}
13671367

1368+
protected function getInsertKeyword(bool $ignore): string
1369+
{
1370+
return 'INSERT INTO';
1371+
}
1372+
1373+
protected function getInsertSuffix(bool $ignore, string $table): string
1374+
{
1375+
if (!$ignore) {
1376+
return '';
1377+
}
1378+
1379+
$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';
1380+
1381+
return "ON CONFLICT {$conflictTarget} DO NOTHING";
1382+
}
1383+
1384+
protected function getInsertPermissionsSuffix(bool $ignore): string
1385+
{
1386+
if (!$ignore) {
1387+
return '';
1388+
}
1389+
1390+
$conflictTarget = $this->sharedTables
1391+
? '("_type", "_permission", "_document", "_tenant")'
1392+
: '("_type", "_permission", "_document")';
1393+
1394+
return "ON CONFLICT {$conflictTarget} DO NOTHING";
1395+
}
1396+
13681397
/**
13691398
* @param string $tableName
13701399
* @param string $columns

0 commit comments

Comments
 (0)