Skip to content

Commit 8b6370e

Browse files
committed
Support longest path
1 parent e42cc6c commit 8b6370e

File tree

13 files changed

+334
-5
lines changed

13 files changed

+334
-5
lines changed

graphdatascience/plugin_v2_endpoints.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from graphdatascience.procedure_surface.api.node_embedding.hashgnn_endpoints import HashGNNEndpoints
3636
from graphdatascience.procedure_surface.api.node_embedding.node2vec_endpoints import Node2VecEndpoints
3737
from graphdatascience.procedure_surface.api.pathfinding.all_shortest_path_endpoints import AllShortestPathEndpoints
38+
from graphdatascience.procedure_surface.api.pathfinding.dag_endpoints import DagEndpoints
3839
from graphdatascience.procedure_surface.api.pathfinding.k_spanning_tree_endpoints import KSpanningTreeEndpoints
3940
from graphdatascience.procedure_surface.api.pathfinding.max_flow_endpoints import MaxFlowEndpoints
4041
from graphdatascience.procedure_surface.api.pathfinding.prize_steiner_tree_endpoints import PrizeSteinerTreeEndpoints
@@ -103,6 +104,7 @@
103104
from graphdatascience.procedure_surface.cypher.pathfinding.all_shortest_path_cypher_endpoints import (
104105
AllShortestPathCypherEndpoints,
105106
)
107+
from graphdatascience.procedure_surface.cypher.pathfinding.dag_cypher_endpoints import DagCypherEndpoints
106108
from graphdatascience.procedure_surface.cypher.pathfinding.k_spanning_tree_cypher_endpoints import (
107109
KSpanningTreeCypherEndpoints,
108110
)
@@ -217,6 +219,13 @@ def closeness_centrality(self) -> ClosenessEndpoints:
217219
"""
218220
return ClosenessCypherEndpoints(self._db_client)
219221

222+
@property
def dag(self) -> DagEndpoints:
    """
    Expose the endpoints for Directed Acyclic Graph (DAG) algorithms.

    Every access wraps this object's ``_db_client`` in a new ``DagCypherEndpoints``.
    """
    dag_endpoints = DagCypherEndpoints(self._db_client)
    return dag_endpoints
228+
220229
@property
221230
def degree_centrality(self) -> DegreeEndpoints:
222231
"""
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
5+
from graphdatascience.procedure_surface.api.pathfinding.longest_path_endpoints import LongestPathEndpoints
6+
7+
8+
class DagEndpoints(ABC):
    """
    Abstract grouping of the algorithm endpoints for Directed Acyclic Graphs (DAGs).

    Subclasses have to implement every abstract property declared here before
    they can be instantiated.
    """

    @property
    @abstractmethod
    def longest_path(self) -> LongestPathEndpoints:
        """Return the endpoints of the Longest Path algorithm for DAGs."""
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
5+
from pandas import DataFrame
6+
7+
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
8+
from graphdatascience.procedure_surface.api.default_values import ALL_LABELS, ALL_TYPES
9+
10+
11+
class LongestPathEndpoints(ABC):
    """Abstract interface of the Longest Path algorithm endpoints for DAGs."""

    @abstractmethod
    def stream(
        self,
        G: GraphV2,
        relationship_weight_property: str | None = None,
        relationship_types: list[str] = ALL_TYPES,
        node_labels: list[str] = ALL_LABELS,
        sudo: bool = False,
        log_progress: bool = True,
        username: str | None = None,
        concurrency: int | None = None,
        job_id: str | None = None,
    ) -> DataFrame:
        """
        Run the longest path algorithm on a DAG and return the paths as a DataFrame.

        Parameters
        ----------
        G
            The graph object to run the algorithm on.
        relationship_weight_property
            Name of the relationship property to use as weights.
        relationship_types
            Restrict the graph to these relationship types. A relationship is kept if it has any of the given types.
        node_labels
            Restrict the graph to these node labels. A node is kept if it has any of the given labels.
        sudo
            Turn off the memory guard.
        log_progress
            Whether to display progress logging.
        username
            Lets an administrator impersonate another user in order to access that user's graphs.
        concurrency
            How many concurrent threads to use.
        job_id
            Identifier assigned to the computation.

        Returns
        -------
        DataFrame
            One row per result, with the columns index, sourceNode, targetNode, totalCost, nodeIds and costs.
        """
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from __future__ import annotations
2+
3+
from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
4+
from graphdatascience.procedure_surface.api.pathfinding.dag_endpoints import DagEndpoints
5+
from graphdatascience.procedure_surface.api.pathfinding.longest_path_endpoints import LongestPathEndpoints
6+
from graphdatascience.procedure_surface.arrow.pathfinding.longest_path_arrow_endpoints import (
7+
LongestPathArrowEndpoints,
8+
)
9+
10+
11+
class DagArrowEndpoints(DagEndpoints):
    """DAG endpoint container whose algorithms are reached through an Arrow client."""

    def __init__(
        self,
        arrow_client: AuthenticatedArrowClient,
        show_progress: bool = False,
    ):
        # Built once up front; the property below always hands back this same instance.
        self._longest_path = LongestPathArrowEndpoints(
            arrow_client=arrow_client,
            show_progress=show_progress,
        )

    @property
    def longest_path(self) -> LongestPathEndpoints:
        """Return the longest path endpoints created in the constructor."""
        return self._longest_path
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from __future__ import annotations
2+
3+
from pandas import DataFrame
4+
5+
from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
6+
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
7+
from graphdatascience.procedure_surface.api.default_values import ALL_LABELS, ALL_TYPES
8+
from graphdatascience.procedure_surface.api.pathfinding.longest_path_endpoints import LongestPathEndpoints
9+
from graphdatascience.procedure_surface.arrow.relationship_endpoints_helper import RelationshipEndpointsHelper
10+
from graphdatascience.procedure_surface.arrow.stream_result_mapper import map_shortest_path_stream_result
11+
12+
13+
class LongestPathArrowEndpoints(LongestPathEndpoints):
    """Longest Path endpoints that run the algorithm as a job via ``RelationshipEndpointsHelper``."""

    def __init__(
        self,
        arrow_client: AuthenticatedArrowClient,
        show_progress: bool = False,
    ):
        # No write-back client is handed over; this class only offers `stream`.
        helper = RelationshipEndpointsHelper(arrow_client, write_back_client=None, show_progress=show_progress)
        self._endpoints_helper = helper

    def stream(
        self,
        G: GraphV2,
        relationship_weight_property: str | None = None,
        relationship_types: list[str] = ALL_TYPES,
        node_labels: list[str] = ALL_LABELS,
        sudo: bool = False,
        log_progress: bool = True,
        username: str | None = None,
        concurrency: int | None = None,
        job_id: str | None = None,
    ) -> DataFrame:
        """
        Run the ``v2/pathfinding.longestPath`` job on ``G`` and return the streamed result.

        The keyword arguments are forwarded, under their camelCase names, to the
        helper's ``create_base_config``.
        """
        options = {
            "relationshipWeightProperty": relationship_weight_property,
            "relationshipTypes": relationship_types,
            "nodeLabels": node_labels,
            "sudo": sudo,
            "logProgress": log_progress,
            "username": username,
            "concurrency": concurrency,
            "jobId": job_id,
        }
        job_config = self._endpoints_helper.create_base_config(G, **options)

        paths = self._endpoints_helper.run_job_and_stream("v2/pathfinding.longestPath", G, job_config)
        # The mapper's return value is ignored, so it presumably adjusts `paths` in place -- TODO confirm.
        map_shortest_path_stream_result(paths)

        return paths
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from __future__ import annotations
2+
3+
from graphdatascience.procedure_surface.api.pathfinding.dag_endpoints import DagEndpoints
4+
from graphdatascience.procedure_surface.api.pathfinding.longest_path_endpoints import LongestPathEndpoints
5+
from graphdatascience.procedure_surface.cypher.pathfinding.longest_path_cypher_endpoints import (
6+
LongestPathCypherEndpoints,
7+
)
8+
from graphdatascience.query_runner.query_runner import QueryRunner
9+
10+
11+
class DagCypherEndpoints(DagEndpoints):
    """DAG endpoint container whose algorithms are called through a ``QueryRunner``."""

    def __init__(self, query_runner: QueryRunner):
        # The longest path endpoints are built once and share the same query runner.
        self._longest_path = LongestPathCypherEndpoints(query_runner)
        self._query_runner = query_runner

    @property
    def longest_path(self) -> LongestPathEndpoints:
        """Return the longest path endpoints created in the constructor."""
        return self._longest_path
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from __future__ import annotations
2+
3+
from pandas import DataFrame
4+
5+
from graphdatascience.call_parameters import CallParameters
6+
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
7+
from graphdatascience.procedure_surface.api.default_values import ALL_LABELS, ALL_TYPES
8+
from graphdatascience.procedure_surface.api.pathfinding.longest_path_endpoints import LongestPathEndpoints
9+
from graphdatascience.procedure_surface.utils.config_converter import ConfigConverter
10+
from graphdatascience.query_runner.query_runner import QueryRunner
11+
12+
13+
class LongestPathCypherEndpoints(LongestPathEndpoints):
    """Longest Path endpoints that call the ``gds.dag.longestPath.stream`` procedure."""

    def __init__(self, query_runner: QueryRunner):
        self._query_runner = query_runner

    def stream(
        self,
        G: GraphV2,
        relationship_weight_property: str | None = None,
        relationship_types: list[str] = ALL_TYPES,
        node_labels: list[str] = ALL_LABELS,
        sudo: bool = False,
        log_progress: bool = True,
        username: str | None = None,
        concurrency: int | None = None,
        job_id: str | None = None,
    ) -> DataFrame:
        """
        Call ``gds.dag.longestPath.stream`` for the graph named ``G.name()``.

        The keyword arguments are converted to a GDS configuration by
        ``ConfigConverter.convert_to_gds_config``; ``log_progress`` is also passed
        to the query runner as its ``logging`` argument.
        """
        gds_config = ConfigConverter.convert_to_gds_config(
            relationshipWeightProperty=relationship_weight_property,
            relationshipTypes=relationship_types,
            nodeLabels=node_labels,
            sudo=sudo,
            logProgress=log_progress,
            username=username,
            concurrency=concurrency,
            jobId=job_id,
        )
        call_params = CallParameters(graph_name=G.name(), config=gds_config)
        # Presumably fills in a job id when the caller gave none -- confirm in CallParameters.
        call_params.ensure_job_id_in_config()

        # The `path` column is deliberately left out of the yielded columns.
        result_columns = ["index", "sourceNode", "targetNode", "totalCost", "nodeIds", "costs"]
        return self._query_runner.call_procedure(
            "gds.dag.longestPath.stream",
            params=call_params,
            logging=log_progress,
            yields=result_columns,
        )

graphdatascience/session/session_v2_endpoints.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from graphdatascience.procedure_surface.api.node_embedding.hashgnn_endpoints import HashGNNEndpoints
3636
from graphdatascience.procedure_surface.api.node_embedding.node2vec_endpoints import Node2VecEndpoints
3737
from graphdatascience.procedure_surface.api.pathfinding.all_shortest_path_endpoints import AllShortestPathEndpoints
38+
from graphdatascience.procedure_surface.api.pathfinding.dag_endpoints import DagEndpoints
3839
from graphdatascience.procedure_surface.api.pathfinding.k_spanning_tree_endpoints import KSpanningTreeEndpoints
3940
from graphdatascience.procedure_surface.api.pathfinding.max_flow_endpoints import MaxFlowEndpoints
4041
from graphdatascience.procedure_surface.api.pathfinding.prize_steiner_tree_endpoints import PrizeSteinerTreeEndpoints
@@ -102,6 +103,7 @@
102103
from graphdatascience.procedure_surface.arrow.pathfinding.all_shortest_path_arrow_endpoints import (
103104
AllShortestPathArrowEndpoints,
104105
)
106+
from graphdatascience.procedure_surface.arrow.pathfinding.dag_arrow_endpoints import DagArrowEndpoints
105107
from graphdatascience.procedure_surface.arrow.pathfinding.k_spanning_tree_arrow_endpoints import (
106108
KSpanningTreeArrowEndpoints,
107109
)
@@ -230,6 +232,13 @@ def closeness_centrality(self) -> ClosenessEndpoints:
230232
"""
231233
return ClosenessArrowEndpoints(self._arrow_client, self._write_back_client, show_progress=self._show_progress)
232234

235+
@property
def dag(self) -> DagEndpoints:
    """
    Expose the endpoints for Directed Acyclic Graph (DAG) algorithms.

    Every access builds a new ``DagArrowEndpoints`` from this object's
    ``_arrow_client`` and ``_show_progress`` setting.
    """
    dag_endpoints = DagArrowEndpoints(self._arrow_client, show_progress=self._show_progress)
    return dag_endpoints
241+
233242
@property
234243
def degree_centrality(self) -> DegreeEndpoints:
235244
"""
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from typing import Generator
2+
3+
import pytest
4+
5+
from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
6+
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
7+
from graphdatascience.procedure_surface.arrow.pathfinding.longest_path_arrow_endpoints import LongestPathArrowEndpoints
8+
from graphdatascience.tests.integrationV2.procedure_surface.arrow.graph_creation_helper import create_graph
9+
10+
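# Cypher statement creating the weighted test DAG (Directed Acyclic Graph) for the longest path tests:
# six nodes (a-f) connected by seven LINK relationships, each carrying a `cost` property.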
11+
dag_graph = """
12+
CREATE
13+
(a: Node {id: 0}),
14+
(b: Node {id: 1}),
15+
(c: Node {id: 2}),
16+
(d: Node {id: 3}),
17+
(e: Node {id: 4}),
18+
(f: Node {id: 5}),
19+
(a)-[:LINK {cost: 1.0}]->(b),
20+
(a)-[:LINK {cost: 2.0}]->(c),
21+
(b)-[:LINK {cost: 3.0}]->(d),
22+
(c)-[:LINK {cost: 2.0}]->(d),
23+
(d)-[:LINK {cost: 1.0}]->(e),
24+
(c)-[:LINK {cost: 5.0}]->(f),
25+
(f)-[:LINK {cost: 1.0}]->(e)
26+
"""
27+
28+
29+
@pytest.fixture
def sample_dag(arrow_client: AuthenticatedArrowClient) -> Generator[GraphV2, None, None]:
    """Yield the graph that ``create_graph`` provides under the name ``dag`` for the ``dag_graph`` data."""
    with create_graph(arrow_client, "dag", dag_graph) as projected_graph:
        yield projected_graph
33+
34+
35+
@pytest.fixture
def longest_path_endpoints(
    arrow_client: AuthenticatedArrowClient,
) -> Generator[LongestPathArrowEndpoints, None, None]:
    """Yield the Arrow longest path endpoints under test, built with default progress settings."""
    endpoints = LongestPathArrowEndpoints(arrow_client)
    yield endpoints
40+
41+
42+
def test_longest_path_stream(longest_path_endpoints: LongestPathArrowEndpoints, sample_dag: GraphV2) -> None:
    """Streaming with the ``cost`` weights yields six rows with exactly the documented columns."""
    expected_columns = {"index", "sourceNode", "targetNode", "totalCost", "nodeIds", "costs"}

    result_df = longest_path_endpoints.stream(G=sample_dag, relationship_weight_property="cost")

    assert len(result_df) == 6
    assert expected_columns == set(result_df.columns)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from typing import Generator
2+
3+
import pytest
4+
5+
from graphdatascience.procedure_surface.api.catalog.graph_api import GraphV2
6+
from graphdatascience.procedure_surface.cypher.pathfinding.longest_path_cypher_endpoints import (
7+
LongestPathCypherEndpoints,
8+
)
9+
from graphdatascience.query_runner.query_runner import QueryRunner
10+
from graphdatascience.tests.integrationV2.procedure_surface.cypher.cypher_graph_helper import create_graph
11+
12+
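# Cypher statement creating the weighted test DAG (Directed Acyclic Graph) for the longest path tests:
# six nodes (a-f) connected by seven LINK relationships, each carrying a `cost` property.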
13+
dag_graph = """
14+
CREATE
15+
(a: Node {id: 0}),
16+
(b: Node {id: 1}),
17+
(c: Node {id: 2}),
18+
(d: Node {id: 3}),
19+
(e: Node {id: 4}),
20+
(f: Node {id: 5}),
21+
(a)-[:LINK {cost: 1.0}]->(b),
22+
(a)-[:LINK {cost: 2.0}]->(c),
23+
(b)-[:LINK {cost: 3.0}]->(d),
24+
(c)-[:LINK {cost: 2.0}]->(d),
25+
(d)-[:LINK {cost: 1.0}]->(e),
26+
(c)-[:LINK {cost: 5.0}]->(f),
27+
(f)-[:LINK {cost: 1.0}]->(e)
28+
"""
29+
30+
31+
@pytest.fixture
def sample_dag(query_runner: QueryRunner) -> Generator[GraphV2, None, None]:
    """Yield the graph that ``create_graph`` provides under the name ``dag``, using the projection query below."""
    # Projects every relationship together with all properties of its end nodes and of the relationship itself.
    dag_projection = """
        MATCH (source)-[r]->(target)
        WITH gds.graph.project('dag', source, target, {
            sourceNodeProperties: properties(source),
            targetNodeProperties: properties(target),
            relationshipProperties: properties(r)
        }) AS G
        RETURN G
    """

    with create_graph(query_runner, "dag", dag_graph, dag_projection) as projected_graph:
        yield projected_graph
45+
46+
47+
@pytest.fixture
def longest_path_endpoints(query_runner: QueryRunner) -> Generator[LongestPathCypherEndpoints, None, None]:
    """Yield the Cypher longest path endpoints under test."""
    endpoints = LongestPathCypherEndpoints(query_runner)
    yield endpoints
50+
51+
52+
def test_longest_path_stream(longest_path_endpoints: LongestPathCypherEndpoints, sample_dag: GraphV2) -> None:
    """Streaming with the ``cost`` weights yields six rows with exactly the documented columns."""
    expected_columns = {"index", "sourceNode", "targetNode", "totalCost", "nodeIds", "costs"}

    result_df = longest_path_endpoints.stream(G=sample_dag, relationship_weight_property="cost")

    assert len(result_df) == 6
    assert expected_columns == set(result_df.columns)

0 commit comments

Comments
 (0)