1212from graphdatascience .server_version .server_version import ServerVersion
1313
1414
15- class CypherAggregationApi :
15+ class CypherProjectionApi :
1616 RELATIONSHIP_TYPE = "relationshipType"
1717 SOURCE_NODE_LABEL = "sourceNodeLabels"
1818 TARGET_NODE_LABEL = "targetNodeLabels"
@@ -76,7 +76,7 @@ def run(self, node_dfs: List[DataFrame], relationship_dfs: List[DataFrame]) -> N
7676
7777 # New Cypher propjection supports concurrency since 2.3.0
7878 if self ._server_version >= ServerVersion (2 , 3 , 0 ):
79- self .CypherAggregationRunner (
79+ self .CypherProjectionRunner (
8080 self ._query_runner ,
8181 self ._graph_name ,
8282 self ._concurrency ,
@@ -98,7 +98,9 @@ def graph_construct_error_multidf(element: str) -> str:
9898 node_df = node_dfs [0 ]
9999 rel_df = relationship_dfs [0 ]
100100
101- self .CyperProjectionRunner (self ._query_runner , self ._graph_name , self ._concurrency ).run (node_df , rel_df )
101+ self .LegacyCypherProjectionRunner (self ._query_runner , self ._graph_name , self ._concurrency ).run (
102+ node_df , rel_df
103+ )
102104
103105 def _should_warn_about_arrow_missing (self ) -> bool :
104106 try :
@@ -118,7 +120,7 @@ def _should_warn_about_arrow_missing(self) -> bool:
118120
119121 return should_warn
120122
121- class CypherAggregationRunner :
123+ class CypherProjectionRunner :
122124 _BIT_COL_SUFFIX = "_is_present" + str (uuid4 ())
123125
124126 def __init__ (
@@ -149,9 +151,9 @@ def run(self, node_dfs: List[DataFrame], relationship_dfs: List[DataFrame]) -> N
149151 is_cypher_projection_v2 = self ._server_version >= ServerVersion (2 , 4 , 0 )
150152
151153 properties_key = (
152- CypherAggregationApi .REL_PROPERTIES_NEW
154+ CypherProjectionApi .REL_PROPERTIES_NEW
153155 if is_cypher_projection_v2
154- else CypherAggregationApi .REL_PROPERTIES
156+ else CypherProjectionApi .REL_PROPERTIES
155157 )
156158
157159 aligned_node_dfs = self .adjust_node_dfs (node_dfs , graph_schema , properties_key )
@@ -168,12 +170,12 @@ def run(self, node_dfs: List[DataFrame], relationship_dfs: List[DataFrame]) -> N
168170
169171 property_clauses : List [str ] = [
170172 self .check_value_clause (combined_cols , prop_col )
171- for prop_col in [CypherAggregationApi .SOURCE_NODE_PROPERTIES , properties_key ]
173+ for prop_col in [CypherProjectionApi .SOURCE_NODE_PROPERTIES , properties_key ]
172174 ]
173175
174176 source_node_labels_clause = (
175- self .check_value_clause (combined_cols , CypherAggregationApi .SOURCE_NODE_LABEL )
176- if CypherAggregationApi .SOURCE_NODE_LABEL in combined_cols
177+ self .check_value_clause (combined_cols , CypherProjectionApi .SOURCE_NODE_LABEL )
178+ if CypherProjectionApi .SOURCE_NODE_LABEL in combined_cols
177179 else ""
178180 )
179181 rel_type_clause = (
@@ -255,23 +257,23 @@ def adjust_node_dfs(
255257 f"targetNodeId{ self ._BIT_COL_SUFFIX } " : False ,
256258 }
257259
258- if CypherAggregationApi .RELATIONSHIP_TYPE in schema .all_rels .all :
259- node_dict [CypherAggregationApi .RELATIONSHIP_TYPE ] = None
260- node_dict [CypherAggregationApi .RELATIONSHIP_TYPE + self ._BIT_COL_SUFFIX ] = False
260+ if CypherProjectionApi .RELATIONSHIP_TYPE in schema .all_rels .all :
261+ node_dict [CypherProjectionApi .RELATIONSHIP_TYPE ] = None
262+ node_dict [CypherProjectionApi .RELATIONSHIP_TYPE + self ._BIT_COL_SUFFIX ] = False
261263
262264 if "labels" in schema .nodes_per_df [i ].all :
263- node_dict [CypherAggregationApi .SOURCE_NODE_LABEL + self ._BIT_COL_SUFFIX ] = True
264- node_dict [CypherAggregationApi .SOURCE_NODE_LABEL ] = df ["labels" ]
265+ node_dict [CypherProjectionApi .SOURCE_NODE_LABEL + self ._BIT_COL_SUFFIX ] = True
266+ node_dict [CypherProjectionApi .SOURCE_NODE_LABEL ] = df ["labels" ]
265267 elif "labels" in schema .all_nodes .all :
266- node_dict [CypherAggregationApi .SOURCE_NODE_LABEL + self ._BIT_COL_SUFFIX ] = False
267- node_dict [CypherAggregationApi .SOURCE_NODE_LABEL ] = ""
268+ node_dict [CypherProjectionApi .SOURCE_NODE_LABEL + self ._BIT_COL_SUFFIX ] = False
269+ node_dict [CypherProjectionApi .SOURCE_NODE_LABEL ] = ""
268270
269271 def collect_to_dict (row : Dict [str , Any ]) -> Dict [str , Any ]:
270272 return {column : row [column ] for column in schema .nodes_per_df [i ].properties }
271273
272274 node_dict_df = DataFrame (node_dict )
273- node_dict_df [CypherAggregationApi .SOURCE_NODE_PROPERTIES ] = df .apply (collect_to_dict , axis = 1 )
274- node_dict_df [CypherAggregationApi .SOURCE_NODE_PROPERTIES + self ._BIT_COL_SUFFIX ] = True
275+ node_dict_df [CypherProjectionApi .SOURCE_NODE_PROPERTIES ] = df .apply (collect_to_dict , axis = 1 )
276+ node_dict_df [CypherProjectionApi .SOURCE_NODE_PROPERTIES + self ._BIT_COL_SUFFIX ] = True
275277 node_dict_df [properties_key ] = None
276278 node_dict_df [properties_key + self ._BIT_COL_SUFFIX ] = False
277279
@@ -291,25 +293,25 @@ def adjust_rel_dfs(
291293 f"targetNodeId{ self ._BIT_COL_SUFFIX } " : True ,
292294 }
293295
294- if CypherAggregationApi .RELATIONSHIP_TYPE in schema .rels_per_df [i ].all :
295- rel_dict [CypherAggregationApi .RELATIONSHIP_TYPE + self ._BIT_COL_SUFFIX ] = True
296- rel_dict [CypherAggregationApi .RELATIONSHIP_TYPE ] = df [CypherAggregationApi .RELATIONSHIP_TYPE ]
297- elif CypherAggregationApi .RELATIONSHIP_TYPE in schema .all_rels .all :
298- rel_dict [CypherAggregationApi .RELATIONSHIP_TYPE + self ._BIT_COL_SUFFIX ] = False
299- rel_dict [CypherAggregationApi .RELATIONSHIP_TYPE ] = None
296+ if CypherProjectionApi .RELATIONSHIP_TYPE in schema .rels_per_df [i ].all :
297+ rel_dict [CypherProjectionApi .RELATIONSHIP_TYPE + self ._BIT_COL_SUFFIX ] = True
298+ rel_dict [CypherProjectionApi .RELATIONSHIP_TYPE ] = df [CypherProjectionApi .RELATIONSHIP_TYPE ]
299+ elif CypherProjectionApi .RELATIONSHIP_TYPE in schema .all_rels .all :
300+ rel_dict [CypherProjectionApi .RELATIONSHIP_TYPE + self ._BIT_COL_SUFFIX ] = False
301+ rel_dict [CypherProjectionApi .RELATIONSHIP_TYPE ] = None
300302
301303 if "labels" in schema .all_nodes .all :
302- rel_dict [CypherAggregationApi .SOURCE_NODE_LABEL ] = None
303- rel_dict [CypherAggregationApi .SOURCE_NODE_LABEL + self ._BIT_COL_SUFFIX ] = False
304+ rel_dict [CypherProjectionApi .SOURCE_NODE_LABEL ] = None
305+ rel_dict [CypherProjectionApi .SOURCE_NODE_LABEL + self ._BIT_COL_SUFFIX ] = False
304306
305307 def collect_to_dict (row : Dict [str , Any ]) -> Dict [str , Any ]:
306308 return {column : row [column ] for column in schema .rels_per_df [i ].properties }
307309
308310 rel_dict_df = DataFrame (rel_dict )
309311 rel_dict_df [properties_key ] = df .apply (collect_to_dict , axis = 1 )
310312 rel_dict_df [properties_key + self ._BIT_COL_SUFFIX ] = True
311- rel_dict_df [CypherAggregationApi .SOURCE_NODE_PROPERTIES ] = None
312- rel_dict_df [CypherAggregationApi .SOURCE_NODE_PROPERTIES + self ._BIT_COL_SUFFIX ] = False
313+ rel_dict_df [CypherProjectionApi .SOURCE_NODE_PROPERTIES ] = None
314+ rel_dict_df [CypherProjectionApi .SOURCE_NODE_PROPERTIES + self ._BIT_COL_SUFFIX ] = False
313315
314316 adjusted_dfs .append (rel_dict_df )
315317
@@ -321,21 +323,21 @@ def nodes_config_part(self, node_cols: List[EntityColumnSchema], is_cypher_proje
321323 nodes_config_fields : List [str ] = []
322324 if any (x .has_labels () for x in node_cols ):
323325 nodes_config_fields .append (
324- f"{ CypherAggregationApi .SOURCE_NODE_LABEL } : { CypherAggregationApi .SOURCE_NODE_LABEL } "
326+ f"{ CypherProjectionApi .SOURCE_NODE_LABEL } : { CypherProjectionApi .SOURCE_NODE_LABEL } "
325327 )
326328 if is_cypher_projection_v2 :
327329 nodes_config_fields .append (
328- f"{ CypherAggregationApi .TARGET_NODE_LABEL } : NULL" ,
330+ f"{ CypherProjectionApi .TARGET_NODE_LABEL } : NULL" ,
329331 )
330332
331333 # as we first list all nodes at the top of the df, we don't need to lookup properties for the target node
332334 if any (x .has_properties () for x in node_cols ):
333335 nodes_config_fields .append (
334- f"{ CypherAggregationApi .SOURCE_NODE_PROPERTIES } : { CypherAggregationApi .SOURCE_NODE_PROPERTIES } "
336+ f"{ CypherProjectionApi .SOURCE_NODE_PROPERTIES } : { CypherProjectionApi .SOURCE_NODE_PROPERTIES } "
335337 )
336338 if is_cypher_projection_v2 :
337339 nodes_config_fields .append (
338- f"{ CypherAggregationApi .TARGET_NODE_PROPERTIES } : NULL" ,
340+ f"{ CypherProjectionApi .TARGET_NODE_PROPERTIES } : NULL" ,
339341 )
340342
341343 return nodes_config_fields
@@ -345,15 +347,15 @@ def rels_config_part(self, rel_cols: List[EntityColumnSchema], properties_key: s
345347
346348 if any (x .has_rel_type () for x in rel_cols ):
347349 rels_config_fields .append (
348- f"{ CypherAggregationApi .RELATIONSHIP_TYPE } : { CypherAggregationApi .RELATIONSHIP_TYPE } "
350+ f"{ CypherProjectionApi .RELATIONSHIP_TYPE } : { CypherProjectionApi .RELATIONSHIP_TYPE } "
349351 )
350352
351353 if any (x .has_properties () for x in rel_cols ):
352354 rels_config_fields .append (f"{ properties_key } : { properties_key } " )
353355
354356 return rels_config_fields
355357
356- class CyperProjectionRunner :
358+ class LegacyCypherProjectionRunner :
357359 def __init__ (self , query_runner : QueryRunner , graph_name : str , concurrency : int ):
358360 self ._query_runner = query_runner
359361 self ._concurrency = concurrency
0 commit comments