Skip to content

Commit 6d009a7

Browse files
committed
feat: use MERGE for edges, wildcard patterns, type()/labels() functions
CoordiNode v0.3.1+ supports wildcard [r] in MATCH patterns, type(r), labels(n), and MERGE for relationship patterns. Update adapters to use these instead of the old workarounds. LangChain adapter: - refresh_schema(): wildcard MATCH (a)-[r]->(b) with labels()/type() - _create_edge(): CREATE → MERGE (idempotent edge upsert) - _link_document_to_entities(): CREATE → MERGE for MENTIONS edges LlamaIndex adapter: - get_triplets(): wildcard [r] pattern; type(r) instead of r.__type__ - get_rel_map(): wildcard [r]; ignore_rels pushed into Cypher WHERE so LIMIT applies only to non-ignored edges - upsert_relations(): CREATE → MERGE (idempotent) - Remove _parse_edge_types_from_schema() (no longer needed) Tests: - count(*) → count(r) for relationship counting - cnt >= 1 → cnt == 1 for idempotent edge assertions - get_triplets() test uses wildcard (no relation_names) Closes #24
1 parent 082e031 commit 6d009a7

5 files changed

Lines changed: 57 additions & 115 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ venv/
1717
env/
1818

1919
# Version files generated by hatch-vcs
20-
coordinode/coordinode/_version.py
20+
coordinode/_version.py
2121
langchain-coordinode/langchain_coordinode/_version.py
2222
llama-index-coordinode/llama_index/graph_stores/coordinode/_version.py
2323
GAPS.md
24+
CLAUDE.md

langchain-coordinode/langchain_coordinode/graph.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,18 @@ def refresh_schema(self) -> None:
7272
structured = _parse_schema(text)
7373
# Augment with relationship triples (start_label, type, end_label) via
7474
# Cypher — get_schema_text() only lists edge types without direction.
75-
# CoordiNode: wildcard [r] returns no results; build typed pattern from
76-
# the rel_props keys returned by _parse_schema().
77-
rel_types = list(structured.get("rel_props", {}).keys())
78-
if rel_types:
79-
rel_filter = "|".join(_cypher_ident(t) for t in rel_types)
80-
rows = self._client.cypher(
81-
f"MATCH (a)-[r:{rel_filter}]->(b) "
82-
"RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst"
83-
)
75+
rows = self._client.cypher(
76+
"MATCH (a)-[r]->(b) RETURN DISTINCT labels(a) AS src_labels, type(r) AS rel, labels(b) AS dst_labels"
77+
)
78+
if rows:
8479
structured["relationships"] = [
85-
{"start": row["src"], "type": row["rel"], "end": row["dst"]}
80+
{
81+
"start": _first_label(row.get("src_labels")),
82+
"type": row["rel"],
83+
"end": _first_label(row.get("dst_labels")),
84+
}
8685
for row in rows
87-
if row.get("src") and row.get("rel") and row.get("dst")
86+
if _first_label(row.get("src_labels")) and row.get("rel") and _first_label(row.get("dst_labels"))
8887
]
8988
self._structured_schema = structured
9089

@@ -95,18 +94,14 @@ def add_graph_documents(
9594
) -> None:
9695
"""Store nodes and relationships extracted from ``GraphDocument`` objects.
9796
98-
Nodes are upserted by ``id`` (used as the ``name`` property) via
99-
``MERGE``, so repeated calls are safe for nodes.
100-
101-
Relationships are created with unconditional ``CREATE`` because
102-
CoordiNode does not yet support ``MERGE`` for edge patterns. Re-ingesting
103-
the same ``GraphDocument`` will therefore produce duplicate edges.
97+
Both nodes and relationships are upserted via ``MERGE``, so repeated
98+
calls with the same data are idempotent.
10499
105100
Args:
106101
graph_documents: List of ``langchain_community.graphs.graph_document.GraphDocument``.
107102
include_source: If ``True``, also store the source ``Document`` as a
108103
``__Document__`` node linked to every extracted entity via
109-
``MENTIONS`` edges (also unconditional ``CREATE``).
104+
``MENTIONS`` edges.
110105
"""
111106
for doc in graph_documents:
112107
for node in doc.nodes:
@@ -133,12 +128,10 @@ def _upsert_node(self, node: Any) -> None:
133128
)
134129

135130
def _create_edge(self, rel: Any) -> None:
136-
"""Create a relationship via unconditional CREATE.
131+
"""Upsert a relationship via MERGE (idempotent).
137132
138-
CoordiNode does not support MERGE for edge patterns. Re-ingesting the
139-
same relationship will create a duplicate edge. SET r += $props is
140-
skipped when props is empty because SET r += {} is not supported by all
141-
server versions.
133+
SET r += $props is skipped when props is empty because
134+
SET r += {} is not supported by all server versions.
142135
"""
143136
src_label = _cypher_ident(rel.source.type or "Entity")
144137
dst_label = _cypher_ident(rel.target.type or "Entity")
@@ -148,19 +141,19 @@ def _create_edge(self, rel: Any) -> None:
148141
self._client.cypher(
149142
f"MATCH (src:{src_label} {{name: $src}}) "
150143
f"MATCH (dst:{dst_label} {{name: $dst}}) "
151-
f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props",
144+
f"MERGE (src)-[r:{rel_type}]->(dst) SET r += $props",
152145
params={"src": rel.source.id, "dst": rel.target.id, "props": props},
153146
)
154147
else:
155148
self._client.cypher(
156149
f"MATCH (src:{src_label} {{name: $src}}) "
157150
f"MATCH (dst:{dst_label} {{name: $dst}}) "
158-
f"CREATE (src)-[r:{rel_type}]->(dst)",
151+
f"MERGE (src)-[r:{rel_type}]->(dst)",
159152
params={"src": rel.source.id, "dst": rel.target.id},
160153
)
161154

162155
def _link_document_to_entities(self, doc: Any) -> None:
163-
"""Upsert a ``__Document__`` node and CREATE ``MENTIONS`` edges to all entities."""
156+
"""Upsert a ``__Document__`` node and MERGE ``MENTIONS`` edges to all entities."""
164157
src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source)
165158
self._client.cypher(
166159
"MERGE (d:__Document__ {id: $id}) SET d.page_content = $text",
@@ -169,7 +162,7 @@ def _link_document_to_entities(self, doc: Any) -> None:
169162
for node in doc.nodes:
170163
label = _cypher_ident(node.type or "Entity")
171164
self._client.cypher(
172-
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) CREATE (d)-[:MENTIONS]->(n)",
165+
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) MERGE (d)-[:MENTIONS]->(n)",
173166
params={"doc_id": src_id, "name": node.id},
174167
)
175168

@@ -211,10 +204,7 @@ def _stable_document_id(source: Any) -> str:
211204
212205
Combines ``page_content`` and sorted ``metadata`` items so the same
213206
document produces the same ``__Document__`` node ID across different
214-
Python processes. This makes document-node creation stable when
215-
``include_source=True`` is used, but does not make re-ingest fully
216-
idempotent because ``MENTIONS`` edges are not deduplicated until edge
217-
``MERGE``/dedup support is added to CoordiNode.
207+
Python processes.
218208
"""
219209
content = getattr(source, "page_content", "") or ""
220210
metadata = getattr(source, "metadata", {}) or {}
@@ -232,6 +222,15 @@ def _stable_document_id(source: Any) -> str:
232222
return hashlib.sha256(canonical.encode()).hexdigest()[:32]
233223

234224

225+
def _first_label(labels: Any) -> str | None:
226+
"""Extract the first label from a labels() result (list of strings)."""
227+
if isinstance(labels, list) and labels:
228+
return str(labels[0])
229+
if isinstance(labels, str):
230+
return labels
231+
return None
232+
233+
235234
def _cypher_ident(name: str) -> str:
236235
"""Escape a label/type name for use as a Cypher identifier."""
237236
# ASCII-only word characters: letter/digit/underscore, not starting with digit.

llama-index-coordinode/llama_index/graph_stores/coordinode/base.py

Lines changed: 18 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,7 @@ def get_triplets(
117117
properties: dict[str, Any] | None = None,
118118
ids: list[str] | None = None,
119119
) -> list[list[LabelledNode]]:
120-
"""Retrieve triplets (subject, predicate, object) as node triples.
121-
122-
Note:
123-
``relation_names`` is **required**. CoordiNode does not support
124-
untyped wildcard ``[r]`` relationship patterns — they silently return
125-
no rows. Omitting ``relation_names`` raises ``NotImplementedError``.
126-
"""
120+
"""Retrieve triplets (subject, predicate, object) as node triples."""
127121
conditions: list[str] = []
128122
params: dict[str, Any] = {}
129123

@@ -133,22 +127,15 @@ def get_triplets(
133127
conditions.append("(n.name IN $entity_names OR m.name IN $entity_names)")
134128
params["entity_names"] = entity_names
135129
if relation_names:
136-
# Escape each type name to prevent Cypher injection
137130
rel_filter = "|".join(_cypher_ident(t) for t in relation_names)
138131
rel_pattern = f"[r:{rel_filter}]"
139132
else:
140-
# CoordiNode: wildcard [r] pattern returns no results.
141-
# Callers must supply relation_names for the query to work.
142-
raise NotImplementedError(
143-
"CoordinodePropertyGraphStore.get_triplets() requires relation_names — "
144-
"CoordiNode does not support untyped wildcard [r] patterns"
145-
)
133+
rel_pattern = "[r]"
146134

147135
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
148-
# CoordiNode: use r.__type__ instead of type(r) — type() returns null.
149136
cypher = (
150137
f"MATCH (n)-{rel_pattern}->(m) {where} "
151-
"RETURN n, r.__type__ AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
138+
"RETURN n, type(r) AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
152139
"LIMIT 1000"
153140
)
154141
result = self._client.cypher(cypher, params=params)
@@ -189,27 +176,23 @@ def get_rel_map(
189176
if not graph_nodes:
190177
return []
191178

192-
# CoordiNode: wildcard [r] pattern returns no results. Fetch all
193-
# known edge types from the schema and build a typed pattern instead,
194-
# e.g. [r:TYPE_A|TYPE_B|...].
195-
schema_text = self._client.get_schema_text()
196-
edge_types = _parse_edge_types_from_schema(schema_text)
197-
198179
ignored = set(ignore_rels) if ignore_rels else set()
199-
active_types = [t for t in edge_types if t not in ignored]
200-
201-
if not active_types:
202-
return []
203-
204-
rel_filter = "|".join(_cypher_ident(t) for t in active_types)
205180
node_ids = [n.id for n in graph_nodes]
206-
safe_limit = int(limit) # coerce to int to prevent Cypher injection via non-integer input
181+
safe_limit = int(limit)
207182
params: dict[str, object] = {"ids": node_ids}
208183

184+
# Push ignore_rels filter into the WHERE clause so LIMIT applies only
185+
# to non-ignored edges and callers receive up to `limit` visible results.
186+
if ignored:
187+
params["ignored"] = list(ignored)
188+
ignore_clause = "AND type(r) NOT IN $ignored "
189+
else:
190+
ignore_clause = ""
191+
209192
cypher = (
210-
f"MATCH (n)-[r:{rel_filter}]->(m) "
211-
f"WHERE n.id IN $ids "
212-
f"RETURN n, r.__type__ AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
193+
"MATCH (n)-[r]->(m) "
194+
f"WHERE n.id IN $ids {ignore_clause}"
195+
f"RETURN n, type(r) AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
213196
f"LIMIT {safe_limit}"
214197
)
215198
result = self._client.cypher(cypher, params=params)
@@ -237,28 +220,21 @@ def upsert_nodes(self, nodes: list[LabelledNode]) -> None:
237220
self._client.cypher(cypher, params={"id": node.id, "props": props})
238221

239222
def upsert_relations(self, relations: list[Relation]) -> None:
240-
"""Upsert relationships into the graph."""
223+
"""Upsert relationships into the graph (idempotent via MERGE)."""
241224
for rel in relations:
242225
props = rel.properties or {}
243226
label = _cypher_ident(rel.label)
244-
# CoordiNode does not yet support MERGE for edge patterns; use CREATE.
245-
# A WHERE NOT (src)-[:TYPE]->(dst) guard was tested but returns 0
246-
# rows silently in CoordiNode, making all CREATE statements no-ops.
247-
# Until server-side MERGE or pattern predicates are supported,
248-
# repeated calls will create duplicate edges.
249-
# SET r += $props is skipped when props is empty — SET r += {} is
250-
# not supported by all server versions.
251227
if props:
252228
cypher = (
253229
f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) "
254-
f"CREATE (src)-[r:{label}]->(dst) SET r += $props"
230+
f"MERGE (src)-[r:{label}]->(dst) SET r += $props"
255231
)
256232
self._client.cypher(
257233
cypher,
258234
params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props},
259235
)
260236
else:
261-
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) CREATE (src)-[r:{label}]->(dst)"
237+
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) MERGE (src)-[r:{label}]->(dst)"
262238
self._client.cypher(
263239
cypher,
264240
params={"src_id": rel.source_id, "dst_id": rel.target_id},
@@ -376,29 +352,3 @@ def _node_label(node: LabelledNode) -> str:
376352
if isinstance(node, EntityNode):
377353
return node.label or "Entity"
378354
return "Node"
379-
380-
381-
def _parse_edge_types_from_schema(schema_text: str) -> list[str]:
382-
"""Extract edge type names from CoordiNode schema text.
383-
384-
Parses the "Edge types:" section produced by ``get_schema_text()``.
385-
"""
386-
edge_types: list[str] = []
387-
lines = iter(schema_text.splitlines())
388-
389-
# Advance to the "Edge types:" header.
390-
for line in lines:
391-
if line.strip().lower().startswith("edge types"):
392-
break
393-
394-
# Collect bullet items until the first blank line.
395-
for line in lines:
396-
stripped = line.strip()
397-
if not stripped:
398-
break
399-
if stripped.startswith(("-", "*")):
400-
name = stripped.lstrip("-* ").split("(")[0].strip()
401-
if name:
402-
edge_types.append(name)
403-
404-
return edge_types

tests/integration/adapters/test_langchain.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,15 @@ def test_add_graph_documents_creates_relationship(graph, unique_tag):
9797
graph.add_graph_documents([doc])
9898

9999
# Verify the relationship was created, not just the source node.
100-
# count(*) instead of count(r): CoordiNode returns 0 for relationship-variable counts
101100
result = graph.query(
102-
"MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(*) AS cnt",
101+
"MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(r) AS cnt",
103102
params={"src": f"Charlie-{unique_tag}", "dst": f"GraphRAG-{unique_tag}"},
104103
)
105-
assert result[0]["cnt"] >= 1, f"relationship not found: {result}"
104+
assert result[0]["cnt"] == 1, f"expected exactly 1 relationship: {result}"
106105

107106

108107
def test_add_graph_documents_idempotent(graph, unique_tag):
109-
"""Calling add_graph_documents twice must not raise.
110-
111-
Nodes are idempotent (MERGE). Edges are NOT — CoordiNode does not yet
112-
support MERGE for edges, so unconditional CREATE is used and duplicate
113-
edges are expected after two ingests.
114-
"""
108+
"""Calling add_graph_documents twice produces exactly one edge (MERGE idempotent)."""
115109
node_a = Node(id=f"Idempotent-{unique_tag}", type="LCIdempotent")
116110
node_b = Node(id=f"IdempTarget-{unique_tag}", type="LCIdempotent")
117111
rel = Relationship(source=node_a, target=node_b, type="LC_IDEMP_REL")
@@ -131,12 +125,12 @@ def test_add_graph_documents_idempotent(graph, unique_tag):
131125
)
132126
assert result[0]["cnt"] == 1
133127

134-
# Edges: unconditional CREATE → count >= 1 (may be > 1 due to CoordiNode limitation)
128+
# Edges: MERGE keeps count at 1 (idempotent)
135129
result = graph.query(
136-
"MATCH (a:LCIdempotent {name: $src})-[r:LC_IDEMP_REL]->(b:LCIdempotent {name: $dst}) RETURN count(*) AS cnt",
130+
"MATCH (a:LCIdempotent {name: $src})-[r:LC_IDEMP_REL]->(b:LCIdempotent {name: $dst}) RETURN count(r) AS cnt",
137131
params={"src": f"Idempotent-{unique_tag}", "dst": f"IdempTarget-{unique_tag}"},
138132
)
139-
assert result[0]["cnt"] >= 1
133+
assert result[0]["cnt"] == 1
140134

141135

142136
def test_schema_refreshes_after_add(graph, unique_tag):

tests/integration/adapters/test_llama_index.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,9 @@ def test_upsert_and_get_triplets(store, tag):
9696
)
9797
store.upsert_relations([rel])
9898

99-
# CoordiNode does not support wildcard [r] patterns yet — must pass relation_names.
100-
# See: get_triplets() implementation note.
99+
# Wildcard [r] works — no need to specify relation_names.
101100
triplets = store.get_triplets(
102101
entity_names=[f"Src-{tag}"],
103-
relation_names=["LI_RESEARCHES"],
104102
)
105103
assert isinstance(triplets, list)
106104
assert len(triplets) >= 1

0 commit comments

Comments
 (0)