Skip to content

Commit d59e27e

Browse files
authored
Merge pull request #15 from structured-world/fix/#14-e2e-cypher-compat
fix: CoordiNode Cypher compatibility — add_graph_documents, __type__/__label__, MATCH+CREATE
2 parents ce93640 + 828e8d9 commit d59e27e

9 files changed

Lines changed: 548 additions & 44 deletions

File tree

langchain-coordinode/langchain_coordinode/graph.py

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from __future__ import annotations
44

5+
import hashlib
6+
import json
57
import re
68
from typing import Any
79

@@ -70,19 +72,107 @@ def refresh_schema(self) -> None:
7072
structured = _parse_schema(text)
7173
# Augment with relationship triples (start_label, type, end_label) via
7274
# Cypher — get_schema_text() only lists edge types without direction.
73-
try:
75+
# CoordiNode: wildcard [r] returns no results; build typed pattern from
76+
# the rel_props keys returned by _parse_schema().
77+
rel_types = list(structured.get("rel_props", {}).keys())
78+
if rel_types:
79+
rel_filter = "|".join(_cypher_ident(t) for t in rel_types)
7480
rows = self._client.cypher(
75-
"MATCH (a)-[r]->(b) RETURN DISTINCT labels(a)[0] AS src, type(r) AS rel, labels(b)[0] AS dst"
81+
f"MATCH (a)-[r:{rel_filter}]->(b) "
82+
"RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst"
7683
)
7784
structured["relationships"] = [
7885
{"start": row["src"], "type": row["rel"], "end": row["dst"]}
7986
for row in rows
8087
if row.get("src") and row.get("rel") and row.get("dst")
8188
]
82-
except Exception: # noqa: BLE001
83-
pass # Graph may have no relationships yet; structured["relationships"] stays []
8489
self._structured_schema = structured
8590

91+
def add_graph_documents(
92+
self,
93+
graph_documents: list[Any],
94+
include_source: bool = False,
95+
) -> None:
96+
"""Store nodes and relationships extracted from ``GraphDocument`` objects.
97+
98+
Nodes are upserted by ``id`` (used as the ``name`` property) via
99+
``MERGE``, so repeated calls are safe for nodes.
100+
101+
Relationships are created with unconditional ``CREATE`` because
102+
CoordiNode does not yet support ``MERGE`` for edge patterns. Re-ingesting
103+
the same ``GraphDocument`` will therefore produce duplicate edges.
104+
105+
Args:
106+
graph_documents: List of ``langchain_community.graphs.graph_document.GraphDocument``.
107+
include_source: If ``True``, also store the source ``Document`` as a
108+
``__Document__`` node linked to every extracted entity via
109+
``MENTIONS`` edges (also unconditional ``CREATE``).
110+
"""
111+
for doc in graph_documents:
112+
for node in doc.nodes:
113+
self._upsert_node(node)
114+
for rel in doc.relationships:
115+
self._create_edge(rel)
116+
if include_source and doc.source:
117+
self._link_document_to_entities(doc)
118+
119+
# Invalidate cached schema so next access reflects new data
120+
self._schema = None
121+
self._structured_schema = None
122+
123+
def _upsert_node(self, node: Any) -> None:
124+
"""Upsert a single node by ``id`` via MERGE."""
125+
label = _cypher_ident(node.type or "Entity")
126+
props = dict(node.properties or {})
127+
# Always enforce node.id as the merge key; incoming
128+
# properties["name"] must not drift from the MERGE predicate.
129+
props["name"] = node.id
130+
self._client.cypher(
131+
f"MERGE (n:{label} {{name: $name}}) SET n += $props",
132+
params={"name": node.id, "props": props},
133+
)
134+
135+
def _create_edge(self, rel: Any) -> None:
136+
"""Create a relationship via unconditional CREATE.
137+
138+
CoordiNode does not support MERGE for edge patterns. Re-ingesting the
139+
same relationship will create a duplicate edge. SET r += $props is
140+
skipped when props is empty because SET r += {} is not supported by all
141+
server versions.
142+
"""
143+
src_label = _cypher_ident(rel.source.type or "Entity")
144+
dst_label = _cypher_ident(rel.target.type or "Entity")
145+
rel_type = _cypher_ident(rel.type)
146+
props = dict(rel.properties or {})
147+
if props:
148+
self._client.cypher(
149+
f"MATCH (src:{src_label} {{name: $src}}) "
150+
f"MATCH (dst:{dst_label} {{name: $dst}}) "
151+
f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props",
152+
params={"src": rel.source.id, "dst": rel.target.id, "props": props},
153+
)
154+
else:
155+
self._client.cypher(
156+
f"MATCH (src:{src_label} {{name: $src}}) "
157+
f"MATCH (dst:{dst_label} {{name: $dst}}) "
158+
f"CREATE (src)-[r:{rel_type}]->(dst)",
159+
params={"src": rel.source.id, "dst": rel.target.id},
160+
)
161+
162+
def _link_document_to_entities(self, doc: Any) -> None:
163+
"""Upsert a ``__Document__`` node and CREATE ``MENTIONS`` edges to all entities."""
164+
src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source)
165+
self._client.cypher(
166+
"MERGE (d:__Document__ {id: $id}) SET d.page_content = $text",
167+
params={"id": src_id, "text": doc.source.page_content or ""},
168+
)
169+
for node in doc.nodes:
170+
label = _cypher_ident(node.type or "Entity")
171+
self._client.cypher(
172+
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) CREATE (d)-[:MENTIONS]->(n)",
173+
params={"doc_id": src_id, "name": node.id},
174+
)
175+
86176
def query(
87177
self,
88178
query: str,
@@ -116,6 +206,40 @@ def __exit__(self, *args: Any) -> None:
116206
# ── Schema parser ─────────────────────────────────────────────────────────
117207

118208

209+
def _stable_document_id(source: Any) -> str:
210+
"""Return a deterministic ID for a LangChain Document.
211+
212+
Combines ``page_content`` and sorted ``metadata`` items so the same
213+
document produces the same ``__Document__`` node ID across different
214+
Python processes. This makes document-node creation stable when
215+
``include_source=True`` is used, but does not make re-ingest fully
216+
idempotent because ``MENTIONS`` edges are not deduplicated until edge
217+
``MERGE``/dedup support is added to CoordiNode.
218+
"""
219+
content = getattr(source, "page_content", "") or ""
220+
metadata = getattr(source, "metadata", {}) or {}
221+
# Use canonical JSON encoding to avoid delimiter ambiguity and ensure
222+
# determinism for nested/non-scalar metadata values. default=str converts
223+
# non-JSON-serializable types (datetime, UUID, Path, …) to their string
224+
# representation so the hash never raises TypeError.
225+
canonical = json.dumps(
226+
{"content": content, "metadata": metadata},
227+
sort_keys=True,
228+
separators=(",", ":"),
229+
ensure_ascii=False,
230+
default=str,
231+
)
232+
return hashlib.sha256(canonical.encode()).hexdigest()[:32]
233+
234+
235+
def _cypher_ident(name: str) -> str:
236+
"""Escape a label/type name for use as a Cypher identifier."""
237+
# ASCII-only word characters: letter/digit/underscore, not starting with digit.
238+
if re.match(r"^[A-Za-z_]\w*$", name, re.ASCII):
239+
return name
240+
return f"`{name.replace('`', '``')}`"
241+
242+
119243
def _parse_schema(schema_text: str) -> dict[str, Any]:
120244
"""Convert CoordiNode schema text into LangChain's structured format.
121245
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
# namespace package
1+
# Declare a pkgutil-style namespace package so multiple distributions can
# contribute modules under this package name.
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
# namespace package
1+
# Declare a pkgutil-style namespace package so multiple distributions can
# contribute modules under this package name.
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)

llama-index-coordinode/llama_index/graph_stores/coordinode/base.py

Lines changed: 97 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,13 @@ def get_triplets(
117117
properties: dict[str, Any] | None = None,
118118
ids: list[str] | None = None,
119119
) -> list[list[LabelledNode]]:
120-
"""Retrieve triplets (subject, predicate, object) as node triples."""
120+
"""Retrieve triplets (subject, predicate, object) as node triples.
121+
122+
Note:
123+
``relation_names`` is **required**. CoordiNode does not support
124+
untyped wildcard ``[r]`` relationship patterns — they silently return
125+
no rows. Omitting ``relation_names`` raises ``NotImplementedError``.
126+
"""
121127
conditions: list[str] = []
122128
params: dict[str, Any] = {}
123129

@@ -131,20 +137,26 @@ def get_triplets(
131137
rel_filter = "|".join(_cypher_ident(t) for t in relation_names)
132138
rel_pattern = f"[r:{rel_filter}]"
133139
else:
134-
rel_pattern = "[r]"
140+
# CoordiNode: wildcard [r] pattern returns no results.
141+
# Callers must supply relation_names for the query to work.
142+
raise NotImplementedError(
143+
"CoordinodePropertyGraphStore.get_triplets() requires relation_names — "
144+
"CoordiNode does not support untyped wildcard [r] patterns"
145+
)
135146

136147
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
148+
# CoordiNode: use r.__type__ instead of type(r) — type() returns null.
137149
cypher = (
138150
f"MATCH (n)-{rel_pattern}->(m) {where} "
139-
"RETURN n, type(r) AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
151+
"RETURN n, r.__type__ AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
140152
"LIMIT 1000"
141153
)
142154
result = self._client.cypher(cypher, params=params)
143155

144156
triplets: list[list[LabelledNode]] = []
145157
for row in result:
146158
src_data = row.get("n", {})
147-
rel_type = row.get("rel_type", "RELATED")
159+
rel_type = row.get("rel_type") or "RELATED"
148160
dst_data = row.get("m", {})
149161
src_id = str(row.get("_src_id", ""))
150162
dst_id = str(row.get("_dst_id", ""))
@@ -158,30 +170,47 @@ def get_triplets(
158170
def get_rel_map(
159171
self,
160172
graph_nodes: list[LabelledNode],
161-
depth: int = 2,
173+
depth: int = 1,
162174
limit: int = 30,
163175
ignore_rels: list[str] | None = None,
164176
) -> list[list[LabelledNode]]:
165-
"""Get relationship map for a set of nodes up to ``depth`` hops."""
177+
"""Get relationship map for a set of nodes up to ``depth`` hops.
178+
179+
Note: only ``depth=1`` (single hop) is supported. ``depth > 1`` raises
180+
``NotImplementedError`` because CoordiNode does not yet serialise
181+
variable-length path results.
182+
"""
183+
if depth != 1:
184+
raise NotImplementedError(
185+
"CoordinodePropertyGraphStore.get_rel_map() currently supports depth=1 only; "
186+
"variable-length path queries are not yet available in CoordiNode"
187+
)
188+
166189
if not graph_nodes:
167190
return []
168191

169-
node_ids = [n.id for n in graph_nodes]
170-
ignored = list(ignore_rels) if ignore_rels else []
192+
# CoordiNode: wildcard [r] pattern returns no results. Fetch all
193+
# known edge types from the schema and build a typed pattern instead,
194+
# e.g. [r:TYPE_A|TYPE_B|...].
195+
schema_text = self._client.get_schema_text()
196+
edge_types = _parse_edge_types_from_schema(schema_text)
171197

172-
# Push ignore_rels filter into Cypher so LIMIT applies after filtering;
173-
# a Python-side filter after LIMIT would silently truncate valid results.
198+
ignored = set(ignore_rels) if ignore_rels else set()
199+
active_types = [t for t in edge_types if t not in ignored]
200+
201+
if not active_types:
202+
return []
203+
204+
rel_filter = "|".join(_cypher_ident(t) for t in active_types)
205+
node_ids = [n.id for n in graph_nodes]
206+
safe_limit = int(limit) # coerce to int to prevent Cypher injection via non-integer input
174207
params: dict[str, object] = {"ids": node_ids}
175-
ignore_clause = ""
176-
if ignored:
177-
ignore_clause = " AND NONE(rel IN r WHERE type(rel) IN $ignored_rels)"
178-
params["ignored_rels"] = ignored
179208

180209
cypher = (
181-
f"MATCH (n)-[r*1..{depth}]->(m) "
182-
f"WHERE n.id IN $ids{ignore_clause} "
183-
f"RETURN n, r, m, n.id AS _src_id, m.id AS _dst_id "
184-
f"LIMIT {limit}"
210+
f"MATCH (n)-[r:{rel_filter}]->(m) "
211+
f"WHERE n.id IN $ids "
212+
f"RETURN n, r.__type__ AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
213+
f"LIMIT {safe_limit}"
185214
)
186215
result = self._client.cypher(cypher, params=params)
187216

@@ -191,13 +220,7 @@ def get_rel_map(
191220
dst_data = row.get("m", {})
192221
src_id = str(row.get("_src_id", ""))
193222
dst_id = str(row.get("_dst_id", ""))
194-
# Variable-length path [r*1..N] returns a list of relationship dicts.
195-
rels = row.get("r", [])
196-
if isinstance(rels, list) and rels:
197-
first_rel = rels[0]
198-
rel_label = first_rel.get("type", "RELATED") if isinstance(first_rel, dict) else str(first_rel)
199-
else:
200-
rel_label = "RELATED"
223+
rel_label = str(row.get("_rel_type") or "RELATED")
201224
src = _node_result_to_labelled(src_id, src_data)
202225
dst = _node_result_to_labelled(dst_id, dst_data)
203226
rel = Relation(label=rel_label, source_id=src_id, target_id=dst_id)
@@ -217,18 +240,29 @@ def upsert_relations(self, relations: list[Relation]) -> None:
217240
"""Upsert relationships into the graph."""
218241
for rel in relations:
219242
props = rel.properties or {}
220-
cypher = (
221-
f"MATCH (src {{id: $src_id}}), (dst {{id: $dst_id}}) "
222-
f"MERGE (src)-[r:{_cypher_ident(rel.label)}]->(dst) SET r += $props"
223-
)
224-
self._client.cypher(
225-
cypher,
226-
params={
227-
"src_id": rel.source_id,
228-
"dst_id": rel.target_id,
229-
"props": props,
230-
},
231-
)
243+
label = _cypher_ident(rel.label)
244+
# CoordiNode does not yet support MERGE for edge patterns; use CREATE.
245+
# A WHERE NOT (src)-[:TYPE]->(dst) guard was tested but returns 0
246+
# rows silently in CoordiNode, making all CREATE statements no-ops.
247+
# Until server-side MERGE or pattern predicates are supported,
248+
# repeated calls will create duplicate edges.
249+
# SET r += $props is skipped when props is empty — SET r += {} is
250+
# not supported by all server versions.
251+
if props:
252+
cypher = (
253+
f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) "
254+
f"CREATE (src)-[r:{label}]->(dst) SET r += $props"
255+
)
256+
self._client.cypher(
257+
cypher,
258+
params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props},
259+
)
260+
else:
261+
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) CREATE (src)-[r:{label}]->(dst)"
262+
self._client.cypher(
263+
cypher,
264+
params={"src_id": rel.source_id, "dst_id": rel.target_id},
265+
)
232266

233267
def delete(
234268
self,
@@ -342,3 +376,29 @@ def _node_label(node: LabelledNode) -> str:
342376
if isinstance(node, EntityNode):
343377
return node.label or "Entity"
344378
return "Node"
379+
380+
381+
def _parse_edge_types_from_schema(schema_text: str) -> list[str]:
382+
"""Extract edge type names from CoordiNode schema text.
383+
384+
Parses the "Edge types:" section produced by ``get_schema_text()``.
385+
"""
386+
edge_types: list[str] = []
387+
lines = iter(schema_text.splitlines())
388+
389+
# Advance to the "Edge types:" header.
390+
for line in lines:
391+
if line.strip().lower().startswith("edge types"):
392+
break
393+
394+
# Collect bullet items until the first blank line.
395+
for line in lines:
396+
stripped = line.strip()
397+
if not stripped:
398+
break
399+
if stripped.startswith(("-", "*")):
400+
name = stripped.lstrip("-* ").split("(")[0].strip()
401+
if name:
402+
edge_types.append(name)
403+
404+
return edge_types

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ coordinode = { workspace = true }
2828
dev = [
2929
"build>=1.2",
3030
"grpcio-tools>=1.60",
31+
"langchain-community>=0.3",
32+
"langchain-core>=0.3",
33+
"llama-index-core>=0.12",
3134
"pytest>=8",
3235
"pytest-asyncio>=0.23",
3336
"pytest-timeout>=2",

tests/integration/adapters/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)