Skip to content

Commit f6614c6

Browse files
authored
[FlatPostgresCollection] Implement bulkCreateOrReplaceReturnOlder + drop + Bugfix for delete(key) always returning true (#286)
1 parent 2fe1f90 commit f6614c6

11 files changed

Lines changed: 1743 additions & 1095 deletions

File tree

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 1192 additions & 1049 deletions
Large diffs are not rendered by default.

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/MongoPostgresWriteConsistencyTest.java

Lines changed: 215 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.IOException;
1313
import java.util.HashMap;
1414
import java.util.HashSet;
15+
import java.util.LinkedHashMap;
1516
import java.util.List;
1617
import java.util.Map;
1718
import java.util.Optional;
@@ -95,6 +96,181 @@ private void insertTestDocument(String docId) throws IOException {
9596
}
9697
}
9798

99+
@Nested
100+
class BulkUpsertConsistencyTest {
101+
@ParameterizedTest(name = "{0}: bulkUpsert multiple documents")
102+
@ArgumentsSource(AllStoresProvider.class)
103+
void testBulkUpsert(String storeName) throws Exception {
104+
String docId1 = generateDocId("bulk-1");
105+
String docId2 = generateDocId("bulk-2");
106+
107+
Collection collection = getCollection(storeName);
108+
109+
Map<Key, Document> documents = new HashMap<>();
110+
documents.put(createKey(docId1), createTestDocument(docId1));
111+
documents.put(createKey(docId2), createTestDocument(docId2));
112+
113+
boolean result = collection.bulkUpsert(documents);
114+
assertTrue(result);
115+
116+
for (String docId : List.of(docId1, docId2)) {
117+
Query query = buildQueryById(docId);
118+
try (CloseableIterator<Document> iterator = collection.find(query)) {
119+
assertTrue(iterator.hasNext());
120+
Document doc = iterator.next();
121+
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
122+
123+
assertEquals("TestItem", json.get("item").asText());
124+
assertEquals(100, json.get("price").asInt());
125+
assertEquals(50, json.get("quantity").asInt());
126+
assertTrue(json.get("in_stock").asBoolean());
127+
128+
JsonNode tagsNode = json.get("tags");
129+
assertNotNull(tagsNode);
130+
assertEquals(2, tagsNode.size());
131+
}
132+
}
133+
}
134+
135+
@ParameterizedTest(name = "{0}: bulkUpsert merges fields (does not replace entire document)")
136+
@ArgumentsSource(AllStoresProvider.class)
137+
void testBulkUpsertMergesFields(String storeName) throws Exception {
138+
String docId1 = generateDocId("bulk-merge-1");
139+
String docId2 = generateDocId("bulk-merge-2");
140+
141+
Collection collection = getCollection(storeName);
142+
143+
// Step 1: Insert initial documents with all fields
144+
Map<Key, Document> initialDocs = new HashMap<>();
145+
initialDocs.put(createKey(docId1), createTestDocument(docId1));
146+
initialDocs.put(createKey(docId2), createTestDocument(docId2));
147+
148+
boolean insertResult = collection.bulkUpsert(initialDocs);
149+
assertTrue(insertResult);
150+
151+
// Step 2: Upsert with partial documents (only some fields)
152+
Map<Key, Document> partialDocs = new HashMap<>();
153+
154+
// Partial doc for docId1 - only update item and price
155+
ObjectNode partial1 = OBJECT_MAPPER.createObjectNode();
156+
partial1.put("id", getKeyString(docId1));
157+
partial1.put("item", "UpdatedItem1");
158+
partial1.put("price", 999);
159+
partialDocs.put(createKey(docId1), new JSONDocument(partial1));
160+
161+
// Partial doc for docId2 - only update quantity and in_stock
162+
ObjectNode partial2 = OBJECT_MAPPER.createObjectNode();
163+
partial2.put("id", getKeyString(docId2));
164+
partial2.put("quantity", 999);
165+
partial2.put("in_stock", false);
166+
partialDocs.put(createKey(docId2), new JSONDocument(partial2));
167+
168+
boolean upsertResult = collection.bulkUpsert(partialDocs);
169+
assertTrue(upsertResult);
170+
171+
// Step 3: Verify that fields were merged, not replaced
172+
// Doc1: item and price should be updated, other fields should be preserved
173+
Query query1 = buildQueryById(docId1);
174+
try (CloseableIterator<Document> iter = collection.find(query1)) {
175+
assertTrue(iter.hasNext());
176+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
177+
178+
// Updated fields
179+
assertEquals("UpdatedItem1", json.get("item").asText());
180+
assertEquals(999, json.get("price").asInt());
181+
182+
// Preserved fields (should still have original values)
183+
assertEquals(50, json.get("quantity").asInt());
184+
assertTrue(json.get("in_stock").asBoolean());
185+
assertEquals(1000000000000L, json.get("big_number").asLong());
186+
187+
// Arrays and JSONB should be preserved
188+
JsonNode tagsNode = json.get("tags");
189+
assertNotNull(tagsNode);
190+
assertEquals(2, tagsNode.size());
191+
192+
JsonNode propsNode = json.get("props");
193+
assertNotNull(propsNode);
194+
assertEquals("TestBrand", propsNode.get("brand").asText());
195+
}
196+
197+
// Doc2: quantity and in_stock should be updated, other fields should be preserved
198+
Query query2 = buildQueryById(docId2);
199+
try (CloseableIterator<Document> iter = collection.find(query2)) {
200+
assertTrue(iter.hasNext());
201+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
202+
203+
// Updated fields
204+
assertEquals(999, json.get("quantity").asInt());
205+
assertFalse(json.get("in_stock").asBoolean());
206+
207+
// Preserved fields (should still have original values)
208+
assertEquals("TestItem", json.get("item").asText());
209+
assertEquals(100, json.get("price").asInt());
210+
assertEquals(1000000000000L, json.get("big_number").asLong());
211+
212+
// Arrays and JSONB should be preserved
213+
JsonNode tagsNode = json.get("tags");
214+
assertNotNull(tagsNode);
215+
assertEquals(2, tagsNode.size());
216+
217+
JsonNode propsNode = json.get("props");
218+
assertNotNull(propsNode);
219+
assertEquals("TestBrand", propsNode.get("brand").asText());
220+
}
221+
}
222+
223+
@ParameterizedTest(name = "{0}: bulkUpsert skips documents with invalid fields gracefully")
224+
@ArgumentsSource(AllStoresProvider.class)
225+
void testBulkUpsertSkipsInvalidFields(String storeName) throws Exception {
226+
String docId1 = generateDocId("bulk-skip-1");
227+
String docId2 = generateDocId("bulk-skip-2");
228+
String docId3 = generateDocId("bulk-skip-3");
229+
230+
Collection collection = getCollection(storeName);
231+
232+
Map<Key, Document> documents = new LinkedHashMap<>();
233+
234+
// First document - valid
235+
documents.put(createKey(docId1), createTestDocument(docId1));
236+
237+
ObjectNode invalidFieldDoc = OBJECT_MAPPER.createObjectNode();
238+
invalidFieldDoc.put("id", getKeyString(docId2));
239+
invalidFieldDoc.put("item", "PartialItem");
240+
invalidFieldDoc.put("price", 200);
241+
invalidFieldDoc.put("quantity", 20);
242+
invalidFieldDoc.put("in_stock", false);
243+
invalidFieldDoc.putArray("numbers").add("not-a-number").add("also-not-a-number");
244+
documents.put(createKey(docId2), new JSONDocument(invalidFieldDoc));
245+
246+
// Third document - valid
247+
documents.put(createKey(docId3), createTestDocument(docId3));
248+
249+
boolean result = collection.bulkUpsert(documents);
250+
assertTrue(result);
251+
252+
for (String docId : List.of(docId1, docId2, docId3)) {
253+
Query query = buildQueryById(docId);
254+
try (CloseableIterator<Document> iter = collection.find(query)) {
255+
assertTrue(iter.hasNext());
256+
}
257+
}
258+
259+
if (storeName.equals(POSTGRES_FLAT_STORE)) {
260+
Query query2 = buildQueryById(docId2);
261+
try (CloseableIterator<Document> iter = collection.find(query2)) {
262+
assertTrue(iter.hasNext());
263+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
264+
// The 'numbers' field should be null/missing since it was skipped
265+
assertTrue(json.get("numbers") == null || json.get("numbers").isNull());
266+
// But other fields should be present
267+
assertEquals("PartialItem", json.get("item").asText());
268+
assertEquals(200, json.get("price").asInt());
269+
}
270+
}
271+
}
272+
}
273+
98274
@Nested
99275
@DisplayName("Upsert Consistency Tests")
100276
class UpsertConsistencyTests {
@@ -212,40 +388,6 @@ void testUpsertExistingDoc(String storeName) throws Exception {
212388
}
213389
}
214390

215-
@ParameterizedTest(name = "{0}: bulkUpsert multiple documents")
216-
@ArgumentsSource(AllStoresProvider.class)
217-
void testBulkUpsert(String storeName) throws Exception {
218-
String docId1 = generateDocId("bulk-1");
219-
String docId2 = generateDocId("bulk-2");
220-
221-
Collection collection = getCollection(storeName);
222-
223-
Map<Key, Document> documents = new HashMap<>();
224-
documents.put(createKey(docId1), createTestDocument(docId1));
225-
documents.put(createKey(docId2), createTestDocument(docId2));
226-
227-
boolean result = collection.bulkUpsert(documents);
228-
assertTrue(result);
229-
230-
for (String docId : List.of(docId1, docId2)) {
231-
Query query = buildQueryById(docId);
232-
try (CloseableIterator<Document> iterator = collection.find(query)) {
233-
assertTrue(iterator.hasNext());
234-
Document doc = iterator.next();
235-
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
236-
237-
assertEquals("TestItem", json.get("item").asText());
238-
assertEquals(100, json.get("price").asInt());
239-
assertEquals(50, json.get("quantity").asInt());
240-
assertTrue(json.get("in_stock").asBoolean());
241-
242-
JsonNode tagsNode = json.get("tags");
243-
assertNotNull(tagsNode);
244-
assertEquals(2, tagsNode.size());
245-
}
246-
}
247-
}
248-
249391
@ParameterizedTest(name = "{0}: upsert with non-existing fields (schema mismatch)")
250392
@ArgumentsSource(AllStoresProvider.class)
251393
void testUpsertNonExistingFields(String storeName) throws Exception {
@@ -1087,4 +1229,43 @@ void testMultipleUpdatesOnSameFieldThrowsException(String storeName) throws IOEx
10871229
}
10881230
}
10891231
}
1232+
1233+
@Nested
1234+
@DisplayName("Delete Consistency Tests")
1235+
class DeleteConsistencyTests {
1236+
1237+
@ParameterizedTest(name = "{0}: delete existing key returns true")
1238+
@ArgumentsSource(AllStoresProvider.class)
1239+
void testDeleteExistingKey(String storeName) throws Exception {
1240+
String docId = generateDocId("delete-existing");
1241+
Key key = createKey(docId);
1242+
Collection collection = getCollection(storeName);
1243+
1244+
Document document = createTestDocument(docId);
1245+
collection.upsert(key, document);
1246+
1247+
Query query = buildQueryById(docId);
1248+
try (CloseableIterator<Document> iterator = collection.find(query)) {
1249+
assertTrue(iterator.hasNext());
1250+
}
1251+
1252+
boolean result = collection.delete(key);
1253+
assertTrue(result);
1254+
1255+
try (CloseableIterator<Document> iterator = collection.find(query)) {
1256+
assertFalse(iterator.hasNext());
1257+
}
1258+
}
1259+
1260+
@ParameterizedTest(name = "{0}: delete on non-existent key returns false")
1261+
@ArgumentsSource(AllStoresProvider.class)
1262+
void testDeleteNonExistentKey(String storeName) {
1263+
Collection collection = getCollection(storeName);
1264+
1265+
Key nonExistentKey = createKey("non-existent-key-" + System.nanoTime());
1266+
1267+
boolean result = collection.delete(nonExistentKey);
1268+
assertFalse(result);
1269+
}
1270+
}
10901271
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"id": "default:bulk-replace-return-1",
3+
"item": "Original1",
4+
"price": 100,
5+
"quantity": 50,
6+
"tags": ["electronics", "sale"],
7+
"numbers": [10, 20, 30],
8+
"props": {
9+
"brand": "OriginalBrand1",
10+
"color": "red",
11+
"nested": {
12+
"key": "value1"
13+
}
14+
},
15+
"sales": {
16+
"total": 500,
17+
"region": "US"
18+
}
19+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"id": "default:bulk-replace-return-2",
3+
"item": "Original2",
4+
"price": 200,
5+
"quantity": 75,
6+
"tags": ["clothing", "premium"],
7+
"scores": [1.5, 2.5, 3.5],
8+
"props": {
9+
"brand": "OriginalBrand2",
10+
"size": "large"
11+
}
12+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"id": "default:bulk-replace-return-1",
3+
"item": "Updated1",
4+
"price": 999,
5+
"tags": ["newTag1"],
6+
"props": {
7+
"brand": "NewBrand1"
8+
}
9+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"id": "default:bulk-replace-return-2",
3+
"item": "Updated2",
4+
"price": 888
5+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"id": "default:bulk-replace-return-1",
3+
"item": "Updated1",
4+
"price": 999,
5+
"tags": ["newTag1"],
6+
"props": {
7+
"brand": "NewBrand1"
8+
}
9+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"id": "default:bulk-replace-return-2",
3+
"item": "Updated2",
4+
"price": 888
5+
}

document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ public interface Collection {
2121
* store.
2222
*
2323
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
24-
* the existing fields are modified is implementation specific. For example, upserting <code>
25-
* { "foo2": "bar2" }
24+
* the existing fields are modified is implementation specific. For example, upserting <code> {
25+
* "foo2": "bar2" }
2626
* </code> if a document <code>
2727
* { "foo1": "bar1" }
2828
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
@@ -42,8 +42,8 @@ public interface Collection {
4242
* store.
4343
*
4444
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
45-
* the existing fields are modified is implementation specific. For example, upserting <code>
46-
* { "foo2": "bar2" }
45+
* the existing fields are modified is implementation specific. For example, upserting <code> {
46+
* "foo2": "bar2" }
4747
* </code> if a document <code>
4848
* { "foo1": "bar1" }
4949
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
@@ -285,6 +285,17 @@ default boolean bulkCreateOrReplace(Map<Key, Document> documents) {
285285
throw new UnsupportedOperationException("bulkCreateOrReplace is not supported");
286286
}
287287

288+
/**
289+
* Method to bulkCreateOrReplace the given documents and return the previous copies of those
290+
* documents. This helps the clients to see how the documents were prior to upserting them and do
291+
* that in one less round trip.
292+
*/
293+
default CloseableIterator<Document> bulkCreateOrReplaceReturnOlderDocuments(
294+
Map<Key, Document> documents) throws IOException {
295+
throw new UnsupportedOperationException(
296+
"bulkCreateOrReplaceReturnOlderDocuments is not supported!");
297+
}
298+
288299
/**
289300
* Atomically create a new document if the key does not exist in the collection or, replace the
290301
* existing document if the key exists in the collection and return the created/replaced document

0 commit comments

Comments
 (0)