1+ using System ;
12using System . Collections ;
23using System . Collections . Concurrent ;
34using System . Collections . Generic ;
@@ -18,6 +19,7 @@ public class PostgresGraphStore : IGraphStore, IAsyncDisposable
1819{
1920 private readonly string _connectionString ;
2021 private readonly string _graphName ;
22+ private readonly string _graphNameLiteral ;
2123 private readonly bool _autoCreateIndexes ;
2224 private readonly ILogger < PostgresGraphStore > _logger ;
2325 private readonly ConcurrentDictionary < string , bool > _indexedLabels = new ( StringComparer . OrdinalIgnoreCase ) ;
@@ -40,6 +42,7 @@ public PostgresGraphStore(PostgresGraphStoreOptions options, ILogger<PostgresGra
4042
4143 _connectionString = options . ConnectionString ?? throw new ArgumentNullException ( nameof ( options . ConnectionString ) ) ;
4244 _graphName = options . GraphName ?? throw new ArgumentNullException ( nameof ( options . GraphName ) ) ;
45+ _graphNameLiteral = BuildGraphNameLiteral ( _graphName ) ;
4346 _autoCreateIndexes = options . AutoCreateIndexes ;
4447 _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
4548 _vertexPropertyIndexConfig = NormalizeIndexMap ( options . VertexPropertyIndexes ) ;
@@ -54,16 +57,20 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default)
5457 await ExecuteNonQueryAsync ( connection , "CREATE EXTENSION IF NOT EXISTS age;" , cancellationToken ) ;
5558 await ApplySessionConfigurationAsync ( connection , cancellationToken ) ;
5659
57- await using var ensureGraph = connection . CreateCommand ( ) ;
58- ensureGraph . CommandText = @"
59- DO $$
60- BEGIN
61- IF NOT EXISTS (SELECT 1 FROM ag_catalog.ag_graph WHERE name = @graphName) THEN
62- PERFORM ag_catalog.create_graph(@graphName);
63- END IF;
64- END $$;" ;
65- ensureGraph . Parameters . AddWithValue ( "graphName" , _graphName ) ;
66- await ensureGraph . ExecuteNonQueryAsync ( cancellationToken ) ;
60+ await using ( var existsCommand = connection . CreateCommand ( ) )
61+ {
62+ existsCommand . CommandText = "SELECT 1 FROM ag_catalog.ag_graph WHERE name = @graphName LIMIT 1;" ;
63+ existsCommand . Parameters . AddWithValue ( "graphName" , _graphName ) ;
64+ var exists = await existsCommand . ExecuteScalarAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
65+
66+ if ( exists is null )
67+ {
68+ await using var createGraph = connection . CreateCommand ( ) ;
69+ createGraph . CommandText = "SELECT ag_catalog.create_graph(@graphName);" ;
70+ createGraph . Parameters . AddWithValue ( "graphName" , _graphName ) ;
71+ await createGraph . ExecuteNonQueryAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
72+ }
73+ }
6774
6875 _logger . LogInformation ( "Apache AGE graph {GraphName} initialised." , _graphName ) ;
6976 }
@@ -74,13 +81,28 @@ public async Task UpsertNodeAsync(string id, string label, IReadOnlyDictionary<s
7481 ArgumentException . ThrowIfNullOrWhiteSpace ( label ) ;
7582 ArgumentNullException . ThrowIfNull ( properties ) ;
7683
77- var query = $ "MERGE (n:{ EscapeLabel ( label ) } {{ id: ${ CypherParameterNames . NodeId } }}) SET n += ${ CypherParameterNames . Properties } RETURN n";
7884 var parameters = new Dictionary < string , object ? >
7985 {
80- [ CypherParameterNames . NodeId ] = id ,
81- [ CypherParameterNames . Properties ] = ConvertProperties ( properties )
86+ [ CypherParameterNames . NodeId ] = id
8287 } ;
8388
89+ var propertyAssignments = BuildPropertyAssignments ( "n" , ConvertProperties ( properties ) , parameters , "node_prop" ) ;
90+
91+ var queryBuilder = new StringBuilder ( ) ;
92+ queryBuilder . Append ( $ "MERGE (n:{ EscapeLabel ( label ) } {{ id: ${ CypherParameterNames . NodeId } }})") ;
93+
94+ if ( propertyAssignments . Count > 0 )
95+ {
96+ queryBuilder . AppendLine ( ) ;
97+ queryBuilder . Append ( "SET " ) ;
98+ queryBuilder . Append ( string . Join ( ", " , propertyAssignments ) ) ;
99+ }
100+
101+ queryBuilder . AppendLine ( ) ;
102+ queryBuilder . Append ( "RETURN n" ) ;
103+
104+ var query = queryBuilder . ToString ( ) ;
105+
84106 await EnsureLabelIndexesAsync ( label , isEdge : false , cancellationToken ) . ConfigureAwait ( false ) ;
85107 await ExecuteCypherAsync ( query , parameters , cancellationToken ) . ConfigureAwait ( false ) ;
86108 _logger . LogDebug ( "Upserted node {Id} ({Label}) into graph {GraphName}." , id , label , _graphName ) ;
@@ -93,14 +115,31 @@ public async Task UpsertRelationshipAsync(string sourceId, string targetId, stri
93115 ArgumentException . ThrowIfNullOrWhiteSpace ( type ) ;
94116 ArgumentNullException . ThrowIfNull ( properties ) ;
95117
96- var query = $ "MATCH (source {{ id: ${ CypherParameterNames . SourceId } }}), (target {{ id: ${ CypherParameterNames . TargetId } }}) MERGE (source)-[rel:{ EscapeLabel ( type ) } ]->(target) SET rel += ${ CypherParameterNames . Properties } RETURN rel";
97118 var parameters = new Dictionary < string , object ? >
98119 {
99120 [ CypherParameterNames . SourceId ] = sourceId ,
100- [ CypherParameterNames . TargetId ] = targetId ,
101- [ CypherParameterNames . Properties ] = ConvertProperties ( properties )
121+ [ CypherParameterNames . TargetId ] = targetId
102122 } ;
103123
124+ var propertyAssignments = BuildPropertyAssignments ( "rel" , ConvertProperties ( properties ) , parameters , "rel_prop" ) ;
125+
126+ var queryBuilder = new StringBuilder ( ) ;
127+ queryBuilder . Append ( $ "MATCH (source {{ id: ${ CypherParameterNames . SourceId } }}), (target {{ id: ${ CypherParameterNames . TargetId } }})") ;
128+ queryBuilder . AppendLine ( ) ;
129+ queryBuilder . Append ( $ "MERGE (source)-[rel:{ EscapeLabel ( type ) } ]->(target)") ;
130+
131+ if ( propertyAssignments . Count > 0 )
132+ {
133+ queryBuilder . AppendLine ( ) ;
134+ queryBuilder . Append ( "SET " ) ;
135+ queryBuilder . Append ( string . Join ( ", " , propertyAssignments ) ) ;
136+ }
137+
138+ queryBuilder . AppendLine ( ) ;
139+ queryBuilder . Append ( "RETURN rel" ) ;
140+
141+ var query = queryBuilder . ToString ( ) ;
142+
104143 await EnsureLabelIndexesAsync ( type , isEdge : true , cancellationToken ) . ConfigureAwait ( false ) ;
105144 await ExecuteCypherAsync ( query , parameters , cancellationToken ) . ConfigureAwait ( false ) ;
106145 _logger . LogDebug ( "Upserted relationship {Source}-[{Type}]->{Target} in graph {GraphName}." , sourceId , type , targetId , _graphName ) ;
@@ -157,25 +196,21 @@ async IAsyncEnumerable<GraphRelationship> FetchAsync(string nodeId, [EnumeratorC
157196 {
158197 await using var connection = await OpenConnectionAsync ( token ) . ConfigureAwait ( false ) ;
159198 await using var command = connection . CreateCommand ( ) ;
160- command . CommandText = @"
161- SELECT
162- source_id::text,
163- target_id::text,
164- edge_type::text,
165- edge_props::text
166- FROM cypher(@graph_name, $$
167- MATCH (source { id: $node_id })-[rel]->(target)
168- RETURN source.id AS source_id, target.id AS target_id, type(rel) AS edge_type, properties(rel) AS edge_props
169- $$, @params) AS (source_id agtype, target_id agtype, edge_type agtype, edge_props agtype);
170- " ;
171- command . Parameters . AddWithValue ( CypherParameterNames . GraphName , _graphName ) ;
172- command . Parameters . Add ( new NpgsqlParameter ( CypherParameterNames . Parameters , NpgsqlDbType . Jsonb )
199+ command . CommandText = string . Concat (
200+ "SELECT " ,
201+ "\n source_id::text," ,
202+ "\n target_id::text," ,
203+ "\n edge_type::text," ,
204+ "\n edge_props::text" ,
205+ "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", $$" ,
206+ "\n MATCH (source { id: $node_id })-[rel]->(target)" ,
207+ "\n RETURN source.id AS source_id, target.id AS target_id, type(rel) AS edge_type, properties(rel) AS edge_props" ,
208+ "\n $$, @params::ag_catalog.agtype) AS (source_id agtype, target_id agtype, edge_type agtype, edge_props agtype);" ) ;
209+ var payload = JsonSerializer . Serialize ( new Dictionary < string , object ? >
173210 {
174- Value = JsonSerializer . Serialize ( new Dictionary < string , object ? >
175- {
176- [ CypherParameterNames . NodeId ] = nodeId
177- } )
211+ [ CypherParameterNames . NodeId ] = nodeId
178212 } ) ;
213+ command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , payload ) ) ;
179214
180215 await using var reader = await command . ExecuteReaderAsync ( token ) . ConfigureAwait ( false ) ;
181216 while ( await reader . ReadAsync ( token ) . ConfigureAwait ( false ) )
@@ -195,16 +230,11 @@ protected virtual async Task ExecuteCypherAsync(string query, IReadOnlyDictionar
195230 {
196231 await using var connection = await OpenConnectionAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
197232 await using var command = connection . CreateCommand ( ) ;
198- command . CommandText = @"
199- SELECT *
200- FROM cypher(@graph_name, @query, @params) AS (result agtype);
201- " ;
202- command . Parameters . AddWithValue ( CypherParameterNames . GraphName , _graphName ) ;
203- command . Parameters . AddWithValue ( CypherParameterNames . Query , query ) ;
204- command . Parameters . Add ( new NpgsqlParameter ( CypherParameterNames . Parameters , NpgsqlDbType . Jsonb )
205- {
206- Value = SerializeParameters ( parameters )
207- } ) ;
233+ var queryLiteral = WrapInDollarQuotes ( query ) ;
234+ command . CommandText = string . Concat (
235+ "SELECT *" ,
236+ "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", " , queryLiteral , "::cstring, @params::ag_catalog.agtype) AS (result agtype);" ) ;
237+ command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , SerializeParameters ( parameters ) ) ) ;
208238 await command . ExecuteNonQueryAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
209239 }
210240
@@ -416,15 +446,11 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteExplainAsync(string e
416446 {
417447 await using var connection = await OpenConnectionAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
418448 await using var command = connection . CreateCommand ( ) ;
419- command . CommandText = @"
420- SELECT plan
421- FROM cypher(@graph_name, @query, @params) AS (plan text);" ;
422- command . Parameters . AddWithValue ( CypherParameterNames . GraphName , _graphName ) ;
423- command . Parameters . AddWithValue ( CypherParameterNames . Query , explainQuery ) ;
424- command . Parameters . Add ( new NpgsqlParameter ( CypherParameterNames . Parameters , NpgsqlDbType . Jsonb )
425- {
426- Value = parameterJson
427- } ) ;
449+ var explainLiteral = WrapInDollarQuotes ( explainQuery ) ;
450+ command . CommandText = string . Concat (
451+ "SELECT plan" ,
452+ "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", " , explainLiteral , "::cstring, @params::ag_catalog.agtype) AS (plan text);" ) ;
453+ command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , parameterJson ) ) ;
428454
429455 var plan = new List < string > ( ) ;
430456 await using var reader = await command . ExecuteReaderAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
@@ -436,6 +462,97 @@ SELECT plan
436462 return plan ;
437463 }
438464
465+ private static string BuildGraphNameLiteral ( string graphName )
466+ {
467+ if ( string . IsNullOrWhiteSpace ( graphName ) )
468+ {
469+ throw new ArgumentException ( "Graph name cannot be null or whitespace." , nameof ( graphName ) ) ;
470+ }
471+
472+ foreach ( var ch in graphName )
473+ {
474+ if ( ! char . IsLetterOrDigit ( ch ) && ch != '_' )
475+ {
476+ throw new ArgumentException ( $ "Invalid character '{ ch } ' in graph name '{ graphName } '.", nameof ( graphName ) ) ;
477+ }
478+ }
479+
480+ return $ "'{ graphName } '::name";
481+ }
482+
483+ private static NpgsqlParameter CreateAgTypeParameter ( string name , string jsonPayload )
484+ {
485+ if ( jsonPayload is null )
486+ {
487+ throw new ArgumentNullException ( nameof ( jsonPayload ) ) ;
488+ }
489+
490+ return new NpgsqlParameter ( name , NpgsqlDbType . Unknown )
491+ {
492+ DataTypeName = "ag_catalog.agtype" ,
493+ Value = jsonPayload
494+ } ;
495+ }
496+
497+ private static IReadOnlyList < string > BuildPropertyAssignments ( string alias , IDictionary < string , object ? > properties , IDictionary < string , object ? > parameters , string parameterPrefix )
498+ {
499+ if ( properties . Count == 0 )
500+ {
501+ return Array . Empty < string > ( ) ;
502+ }
503+
504+ var assignments = new List < string > ( properties . Count ) ;
505+ var usedParameterNames = new HashSet < string > ( StringComparer . OrdinalIgnoreCase ) ;
506+
507+ foreach ( var ( key , value ) in properties )
508+ {
509+ var escapedProperty = EscapePropertyName ( key ) ;
510+ var parameterName = $ "{ parameterPrefix } _{ escapedProperty } ";
511+
512+ var suffix = 0 ;
513+ while ( ! usedParameterNames . Add ( parameterName ) || parameters . ContainsKey ( parameterName ) )
514+ {
515+ parameterName = $ "{ parameterPrefix } _{ escapedProperty } _{ ++ suffix } ";
516+ }
517+
518+ parameters [ parameterName ] = value ;
519+ assignments . Add ( $ "{ alias } .{ escapedProperty } = ${ parameterName } ") ;
520+ }
521+
522+ return assignments ;
523+ }
524+
525+ private static string EscapePropertyName ( string propertyName )
526+ {
527+ if ( string . IsNullOrWhiteSpace ( propertyName ) )
528+ {
529+ throw new ArgumentException ( "Property name cannot be null or whitespace." , nameof ( propertyName ) ) ;
530+ }
531+
532+ foreach ( var ch in propertyName )
533+ {
534+ if ( ! char . IsLetterOrDigit ( ch ) && ch != '_' )
535+ {
536+ throw new ArgumentException ( $ "Invalid character '{ ch } ' in property name '{ propertyName } '.", nameof ( propertyName ) ) ;
537+ }
538+ }
539+
540+ return propertyName ;
541+ }
542+
543+ private static string WrapInDollarQuotes ( string value )
544+ {
545+ ArgumentNullException . ThrowIfNull ( value ) ;
546+
547+ var delimiter = "$graphrag$" ;
548+ while ( value . Contains ( delimiter , StringComparison . Ordinal ) )
549+ {
550+ delimiter = $ "${ Guid . NewGuid ( ) : N} $";
551+ }
552+
553+ return $ "{ delimiter } { value } { delimiter } ";
554+ }
555+
439556 private static IEnumerable < string > BuildDefaultIndexCommands ( string relation , bool isEdge )
440557 {
441558 var commands = new List < string >
0 commit comments