Skip to content

Commit 1101ac8

Browse files
authored
Merge pull request #25 from structured-world/feat/#24-adapter-updates-merge-wildcard-vector
feat: use MERGE for edges, wildcard patterns, type()/labels() functions
2 parents 082e031 + f0e1ff3 commit 1101ac8

5 files changed

Lines changed: 133 additions & 116 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ venv/
1717
env/
1818

1919
# Version files generated by hatch-vcs
20-
coordinode/coordinode/_version.py
20+
coordinode/_version.py
2121
langchain-coordinode/langchain_coordinode/_version.py
2222
llama-index-coordinode/llama_index/graph_stores/coordinode/_version.py
2323
GAPS.md
24+
CLAUDE.md

langchain-coordinode/langchain_coordinode/graph.py

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,29 @@ def refresh_schema(self) -> None:
7272
structured = _parse_schema(text)
7373
# Augment with relationship triples (start_label, type, end_label) via
7474
# Cypher — get_schema_text() only lists edge types without direction.
75-
# CoordiNode: wildcard [r] returns no results; build typed pattern from
76-
# the rel_props keys returned by _parse_schema().
77-
rel_types = list(structured.get("rel_props", {}).keys())
78-
if rel_types:
79-
rel_filter = "|".join(_cypher_ident(t) for t in rel_types)
80-
rows = self._client.cypher(
81-
f"MATCH (a)-[r:{rel_filter}]->(b) "
82-
"RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst"
83-
)
75+
# No LIMIT here intentionally: RETURN DISTINCT already collapses all edges
76+
# to unique (src_label, rel_type, dst_label) combinations, so the result
77+
# is bounded by the number of distinct relationship type triples, not by
78+
# total edge count. Adding a LIMIT would silently drop relationship types
79+
# that happen to appear beyond the limit, producing an incomplete schema.
80+
rows = self._client.cypher(
81+
"MATCH (a)-[r]->(b) RETURN DISTINCT labels(a) AS src_labels, type(r) AS rel, labels(b) AS dst_labels"
82+
)
83+
if rows:
84+
# Deduplicate after _first_label() normalization: RETURN DISTINCT operates on
85+
# raw label lists, but _first_label(min()) can collapse different multi-label
86+
# combinations to the same (start, type, end) triple (e.g. ['Employee','Person']
87+
# and ['Person','Employee'] both min-normalize to 'Employee'). Use a set to
88+
# ensure each relationship triple appears at most once.
89+
triples: set[tuple[str, str, str]] = set()
90+
for row in rows:
91+
start = _first_label(row.get("src_labels"))
92+
end = _first_label(row.get("dst_labels"))
93+
rel = row.get("rel")
94+
if start and rel and end:
95+
triples.add((start, rel, end))
8496
structured["relationships"] = [
85-
{"start": row["src"], "type": row["rel"], "end": row["dst"]}
86-
for row in rows
87-
if row.get("src") and row.get("rel") and row.get("dst")
97+
{"start": start, "type": rel, "end": end} for start, rel, end in sorted(triples)
8898
]
8999
self._structured_schema = structured
90100

@@ -95,18 +105,14 @@ def add_graph_documents(
95105
) -> None:
96106
"""Store nodes and relationships extracted from ``GraphDocument`` objects.
97107
98-
Nodes are upserted by ``id`` (used as the ``name`` property) via
99-
``MERGE``, so repeated calls are safe for nodes.
100-
101-
Relationships are created with unconditional ``CREATE`` because
102-
CoordiNode does not yet support ``MERGE`` for edge patterns. Re-ingesting
103-
the same ``GraphDocument`` will therefore produce duplicate edges.
108+
Both nodes and relationships are upserted via ``MERGE``, so repeated
109+
calls with the same data are idempotent.
104110
105111
Args:
106112
graph_documents: List of ``langchain_community.graphs.graph_document.GraphDocument``.
107113
include_source: If ``True``, also store the source ``Document`` as a
108114
``__Document__`` node linked to every extracted entity via
109-
``MENTIONS`` edges (also unconditional ``CREATE``).
115+
``MENTIONS`` edges.
110116
"""
111117
for doc in graph_documents:
112118
for node in doc.nodes:
@@ -133,12 +139,10 @@ def _upsert_node(self, node: Any) -> None:
133139
)
134140

135141
def _create_edge(self, rel: Any) -> None:
136-
"""Create a relationship via unconditional CREATE.
142+
"""Upsert a relationship via MERGE (idempotent).
137143
138-
CoordiNode does not support MERGE for edge patterns. Re-ingesting the
139-
same relationship will create a duplicate edge. SET r += $props is
140-
skipped when props is empty because SET r += {} is not supported by all
141-
server versions.
144+
SET r += $props is skipped when props is empty because
145+
SET r += {} is not supported by all server versions.
142146
"""
143147
src_label = _cypher_ident(rel.source.type or "Entity")
144148
dst_label = _cypher_ident(rel.target.type or "Entity")
@@ -148,19 +152,19 @@ def _create_edge(self, rel: Any) -> None:
148152
self._client.cypher(
149153
f"MATCH (src:{src_label} {{name: $src}}) "
150154
f"MATCH (dst:{dst_label} {{name: $dst}}) "
151-
f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props",
155+
f"MERGE (src)-[r:{rel_type}]->(dst) SET r += $props",
152156
params={"src": rel.source.id, "dst": rel.target.id, "props": props},
153157
)
154158
else:
155159
self._client.cypher(
156160
f"MATCH (src:{src_label} {{name: $src}}) "
157161
f"MATCH (dst:{dst_label} {{name: $dst}}) "
158-
f"CREATE (src)-[r:{rel_type}]->(dst)",
162+
f"MERGE (src)-[r:{rel_type}]->(dst)",
159163
params={"src": rel.source.id, "dst": rel.target.id},
160164
)
161165

162166
def _link_document_to_entities(self, doc: Any) -> None:
163-
"""Upsert a ``__Document__`` node and CREATE ``MENTIONS`` edges to all entities."""
167+
"""Upsert a ``__Document__`` node and MERGE ``MENTIONS`` edges to all entities."""
164168
src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source)
165169
self._client.cypher(
166170
"MERGE (d:__Document__ {id: $id}) SET d.page_content = $text",
@@ -169,7 +173,7 @@ def _link_document_to_entities(self, doc: Any) -> None:
169173
for node in doc.nodes:
170174
label = _cypher_ident(node.type or "Entity")
171175
self._client.cypher(
172-
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) CREATE (d)-[:MENTIONS]->(n)",
176+
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) MERGE (d)-[:MENTIONS]->(n)",
173177
params={"doc_id": src_id, "name": node.id},
174178
)
175179

@@ -211,10 +215,7 @@ def _stable_document_id(source: Any) -> str:
211215
212216
Combines ``page_content`` and sorted ``metadata`` items so the same
213217
document produces the same ``__Document__`` node ID across different
214-
Python processes. This makes document-node creation stable when
215-
``include_source=True`` is used, but does not make re-ingest fully
216-
idempotent because ``MENTIONS`` edges are not deduplicated until edge
217-
``MERGE``/dedup support is added to CoordiNode.
218+
Python processes.
218219
"""
219220
content = getattr(source, "page_content", "") or ""
220221
metadata = getattr(source, "metadata", {}) or {}
@@ -232,6 +233,20 @@ def _stable_document_id(source: Any) -> str:
232233
return hashlib.sha256(canonical.encode()).hexdigest()[:32]
233234

234235

236+
def _first_label(labels: Any) -> str | None:
237+
"""Extract a stable label from a labels() result (list of strings).
238+
239+
openCypher does not guarantee a stable ordering for labels(), so using
240+
labels[0] would produce nondeterministic schema entries across calls.
241+
We return the lexicographically smallest label as a deterministic rule.
242+
"""
243+
if isinstance(labels, list) and labels:
244+
return str(min(labels))
245+
if isinstance(labels, str):
246+
return labels
247+
return None
248+
249+
235250
def _cypher_ident(name: str) -> str:
236251
"""Escape a label/type name for use as a Cypher identifier."""
237252
# ASCII-only word characters: letter/digit/underscore, not starting with digit.

llama-index-coordinode/llama_index/graph_stores/coordinode/base.py

Lines changed: 18 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,7 @@ def get_triplets(
117117
properties: dict[str, Any] | None = None,
118118
ids: list[str] | None = None,
119119
) -> list[list[LabelledNode]]:
120-
"""Retrieve triplets (subject, predicate, object) as node triples.
121-
122-
Note:
123-
``relation_names`` is **required**. CoordiNode does not support
124-
untyped wildcard ``[r]`` relationship patterns — they silently return
125-
no rows. Omitting ``relation_names`` raises ``NotImplementedError``.
126-
"""
120+
"""Retrieve triplets (subject, predicate, object) as node triples."""
127121
conditions: list[str] = []
128122
params: dict[str, Any] = {}
129123

@@ -133,22 +127,15 @@ def get_triplets(
133127
conditions.append("(n.name IN $entity_names OR m.name IN $entity_names)")
134128
params["entity_names"] = entity_names
135129
if relation_names:
136-
# Escape each type name to prevent Cypher injection
137130
rel_filter = "|".join(_cypher_ident(t) for t in relation_names)
138131
rel_pattern = f"[r:{rel_filter}]"
139132
else:
140-
# CoordiNode: wildcard [r] pattern returns no results.
141-
# Callers must supply relation_names for the query to work.
142-
raise NotImplementedError(
143-
"CoordinodePropertyGraphStore.get_triplets() requires relation_names — "
144-
"CoordiNode does not support untyped wildcard [r] patterns"
145-
)
133+
rel_pattern = "[r]"
146134

147135
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
148-
# CoordiNode: use r.__type__ instead of type(r) — type() returns null.
149136
cypher = (
150137
f"MATCH (n)-{rel_pattern}->(m) {where} "
151-
"RETURN n, r.__type__ AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
138+
"RETURN n, type(r) AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
152139
"LIMIT 1000"
153140
)
154141
result = self._client.cypher(cypher, params=params)
@@ -189,27 +176,23 @@ def get_rel_map(
189176
if not graph_nodes:
190177
return []
191178

192-
# CoordiNode: wildcard [r] pattern returns no results. Fetch all
193-
# known edge types from the schema and build a typed pattern instead,
194-
# e.g. [r:TYPE_A|TYPE_B|...].
195-
schema_text = self._client.get_schema_text()
196-
edge_types = _parse_edge_types_from_schema(schema_text)
197-
198179
ignored = set(ignore_rels) if ignore_rels else set()
199-
active_types = [t for t in edge_types if t not in ignored]
200-
201-
if not active_types:
202-
return []
203-
204-
rel_filter = "|".join(_cypher_ident(t) for t in active_types)
205180
node_ids = [n.id for n in graph_nodes]
206-
safe_limit = int(limit) # coerce to int to prevent Cypher injection via non-integer input
181+
safe_limit = int(limit)
207182
params: dict[str, object] = {"ids": node_ids}
208183

184+
# Push ignore_rels filter into the WHERE clause so LIMIT applies only
185+
# to non-ignored edges and callers receive up to `limit` visible results.
186+
if ignored:
187+
params["ignored"] = list(ignored)
188+
ignore_clause = "AND type(r) NOT IN $ignored "
189+
else:
190+
ignore_clause = ""
191+
209192
cypher = (
210-
f"MATCH (n)-[r:{rel_filter}]->(m) "
211-
f"WHERE n.id IN $ids "
212-
f"RETURN n, r.__type__ AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
193+
"MATCH (n)-[r]->(m) "
194+
f"WHERE n.id IN $ids {ignore_clause}"
195+
f"RETURN n, type(r) AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
213196
f"LIMIT {safe_limit}"
214197
)
215198
result = self._client.cypher(cypher, params=params)
@@ -237,28 +220,21 @@ def upsert_nodes(self, nodes: list[LabelledNode]) -> None:
237220
self._client.cypher(cypher, params={"id": node.id, "props": props})
238221

239222
def upsert_relations(self, relations: list[Relation]) -> None:
240-
"""Upsert relationships into the graph."""
223+
"""Upsert relationships into the graph (idempotent via MERGE)."""
241224
for rel in relations:
242225
props = rel.properties or {}
243226
label = _cypher_ident(rel.label)
244-
# CoordiNode does not yet support MERGE for edge patterns; use CREATE.
245-
# A WHERE NOT (src)-[:TYPE]->(dst) guard was tested but returns 0
246-
# rows silently in CoordiNode, making all CREATE statements no-ops.
247-
# Until server-side MERGE or pattern predicates are supported,
248-
# repeated calls will create duplicate edges.
249-
# SET r += $props is skipped when props is empty — SET r += {} is
250-
# not supported by all server versions.
251227
if props:
252228
cypher = (
253229
f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) "
254-
f"CREATE (src)-[r:{label}]->(dst) SET r += $props"
230+
f"MERGE (src)-[r:{label}]->(dst) SET r += $props"
255231
)
256232
self._client.cypher(
257233
cypher,
258234
params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props},
259235
)
260236
else:
261-
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) CREATE (src)-[r:{label}]->(dst)"
237+
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) MERGE (src)-[r:{label}]->(dst)"
262238
self._client.cypher(
263239
cypher,
264240
params={"src_id": rel.source_id, "dst_id": rel.target_id},
@@ -376,29 +352,3 @@ def _node_label(node: LabelledNode) -> str:
376352
if isinstance(node, EntityNode):
377353
return node.label or "Entity"
378354
return "Node"
379-
380-
381-
def _parse_edge_types_from_schema(schema_text: str) -> list[str]:
382-
"""Extract edge type names from CoordiNode schema text.
383-
384-
Parses the "Edge types:" section produced by ``get_schema_text()``.
385-
"""
386-
edge_types: list[str] = []
387-
lines = iter(schema_text.splitlines())
388-
389-
# Advance to the "Edge types:" header.
390-
for line in lines:
391-
if line.strip().lower().startswith("edge types"):
392-
break
393-
394-
# Collect bullet items until the first blank line.
395-
for line in lines:
396-
stripped = line.strip()
397-
if not stripped:
398-
break
399-
if stripped.startswith(("-", "*")):
400-
name = stripped.lstrip("-* ").split("(")[0].strip()
401-
if name:
402-
edge_types.append(name)
403-
404-
return edge_types

tests/integration/adapters/test_langchain.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,15 @@ def test_add_graph_documents_creates_relationship(graph, unique_tag):
9797
graph.add_graph_documents([doc])
9898

9999
# Verify the relationship was created, not just the source node.
100-
# count(*) instead of count(r): CoordiNode returns 0 for relationship-variable counts
101100
result = graph.query(
102-
"MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(*) AS cnt",
101+
"MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(r) AS cnt",
103102
params={"src": f"Charlie-{unique_tag}", "dst": f"GraphRAG-{unique_tag}"},
104103
)
105-
assert result[0]["cnt"] >= 1, f"relationship not found: {result}"
104+
assert result[0]["cnt"] == 1, f"expected exactly 1 relationship: {result}"
106105

107106

108107
def test_add_graph_documents_idempotent(graph, unique_tag):
109-
"""Calling add_graph_documents twice must not raise.
110-
111-
Nodes are idempotent (MERGE). Edges are NOT — CoordiNode does not yet
112-
support MERGE for edges, so unconditional CREATE is used and duplicate
113-
edges are expected after two ingests.
114-
"""
108+
"""Calling add_graph_documents twice produces exactly one edge (MERGE idempotent)."""
115109
node_a = Node(id=f"Idempotent-{unique_tag}", type="LCIdempotent")
116110
node_b = Node(id=f"IdempTarget-{unique_tag}", type="LCIdempotent")
117111
rel = Relationship(source=node_a, target=node_b, type="LC_IDEMP_REL")
@@ -131,12 +125,12 @@ def test_add_graph_documents_idempotent(graph, unique_tag):
131125
)
132126
assert result[0]["cnt"] == 1
133127

134-
# Edges: unconditional CREATE → count >= 1 (may be > 1 due to CoordiNode limitation)
128+
# Edges: MERGE keeps count at 1 (idempotent)
135129
result = graph.query(
136-
"MATCH (a:LCIdempotent {name: $src})-[r:LC_IDEMP_REL]->(b:LCIdempotent {name: $dst}) RETURN count(*) AS cnt",
130+
"MATCH (a:LCIdempotent {name: $src})-[r:LC_IDEMP_REL]->(b:LCIdempotent {name: $dst}) RETURN count(r) AS cnt",
137131
params={"src": f"Idempotent-{unique_tag}", "dst": f"IdempTarget-{unique_tag}"},
138132
)
139-
assert result[0]["cnt"] >= 1
133+
assert result[0]["cnt"] == 1
140134

141135

142136
def test_schema_refreshes_after_add(graph, unique_tag):

0 commit comments

Comments
 (0)