Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Development]
<!-- Do Not Erase This Section - Used for tracking unreleased changes -->

## [0.50.4 - 2026-01-15]

### Fixed
- **Compute / hop (cuDF)**: Hop label tracking could raise an error or force a host sync because it relied on pandas Index conversions; it now stays engine-native. Example: `g.chain([n(), e(hops=2, label_node_hops='nh', label_edge_hops='eh', label_seeds=True)])`.

## [0.50.3 - 2026-01-14]

### Fixed
Expand Down
64 changes: 44 additions & 20 deletions graphistry/compute/hop.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,23 @@ def _combine_first_no_warn(target, fill):
if not TYPE_CHECKING:
DataFrameT = df_cons(engine_concrete)
concat = df_concat(engine_concrete)

def _domain_unique(series):
    """Return the distinct non-null values of ``series``.

    When ``engine_concrete`` is ``Engine.PANDAS`` the values are wrapped in
    a ``pd.Index``; for any other engine the result of ``unique()`` is
    returned unchanged.
    """
    distinct = series.dropna().unique()
    if engine_concrete != Engine.PANDAS:
        return distinct
    return pd.Index(distinct)

def _domain_is_empty(domain) -> bool:
    """Report whether ``domain`` is ``None`` or has zero elements.

    ``len()`` is used rather than truthiness so that array-like containers
    whose boolean value is ambiguous can still be checked.
    """
    if domain is None:
        return True
    return len(domain) == 0

def _domain_union(left, right):
    """Combine two ID domains into one.

    An empty (or ``None``) side is treated as the identity: the other side
    is returned untouched. Otherwise, on the pandas engine with a
    ``pd.Index`` on the left, ``right`` is appended to ``left``; in every
    other case the two are concatenated and de-duplicated.
    """
    left_empty = _domain_is_empty(left)
    if left_empty or _domain_is_empty(right):
        # Whichever side is non-empty (or ``right`` when ``left`` is empty) wins.
        return right if left_empty else left

    append_as_index = engine_concrete == Engine.PANDAS and isinstance(left, pd.Index)
    if append_as_index:
        return left.append(right)

    combined = concat([left, right], ignore_index=True, sort=False)
    return combined.drop_duplicates()

nodes = df_to_engine(nodes, engine_concrete) if nodes is not None else None
target_wave_front = df_to_engine(target_wave_front, engine_concrete) if target_wave_front is not None else None
Expand Down Expand Up @@ -479,10 +496,8 @@ def resolve_label_col(requested: Optional[str], df, default_base: str) -> Option
node_hop_col = None
if track_edge_hops:
edge_hop_col = resolve_label_col(label_edge_hops, edges_indexed, '_hop')
seen_edge_marker_col = generate_safe_column_name('__gfql_edge_seen__', edges_indexed, prefix='__seen_', suffix='__')
if track_node_hops:
node_hop_col = resolve_label_col(label_node_hops, g2._nodes, '_hop')
seen_node_marker_col = generate_safe_column_name('__gfql_node_seen__', g2._nodes, prefix='__seen_', suffix='__')

wave_front = starting_nodes[[g2._node]][:0]

Expand All @@ -498,10 +513,13 @@ def resolve_label_col(requested: Optional[str], df, default_base: str) -> Option

node_hop_records = None
edge_hop_records = None
seen_node_ids = None
seen_edge_ids = None

if track_node_hops and label_seeds and node_hop_col is not None:
seed_nodes = starting_nodes[[g2._node]].drop_duplicates()
node_hop_records = seed_nodes.assign(**{node_hop_col: 0})
seen_node_ids = _domain_unique(seed_nodes[g2._node])

if debugging_hop and logger.isEnabledFor(logging.DEBUG):
logger.debug('~~~~~~~~~~ LOOP PRE ~~~~~~~~~~~')
Expand Down Expand Up @@ -636,7 +654,6 @@ def resolve_label_col(requested: Optional[str], df, default_base: str) -> Option
max_reached_hop = current_hop

if track_edge_hops and edge_hop_col is not None:
assert seen_edge_marker_col is not None
edge_label_candidates : List[DataFrameT] = []
if hop_edges_forward is not None:
edge_label_candidates.append(hop_edges_forward[[EDGE_ID]])
Expand All @@ -649,43 +666,50 @@ def resolve_label_col(requested: Optional[str], df, default_base: str) -> Option
labeled_edges = edge_df_iter.assign(**{edge_hop_col: current_hop})
if edge_hop_records is None:
edge_hop_records = labeled_edges
seen_edge_ids = _domain_unique(labeled_edges[EDGE_ID])
else:
edge_seen = edge_hop_records[[EDGE_ID]].assign(**{seen_edge_marker_col: 1})
merged_edge_labels = safe_merge(
labeled_edges,
edge_seen,
on=EDGE_ID,
how='left',
engine=engine_concrete
seen_edge_ids = (
seen_edge_ids
if seen_edge_ids is not None
else _domain_unique(edge_hop_records[EDGE_ID])
)
new_edge_labels = merged_edge_labels[merged_edge_labels[seen_edge_marker_col].isna()].drop(columns=[seen_edge_marker_col])
if _domain_is_empty(seen_edge_ids):
new_edge_labels = labeled_edges
else:
new_mask = ~labeled_edges[EDGE_ID].isin(seen_edge_ids)
new_edge_labels = labeled_edges[new_mask]
if len(new_edge_labels) > 0:
edge_hop_records = concat(
[edge_hop_records, new_edge_labels],
ignore_index=True,
sort=False
).drop_duplicates(subset=[EDGE_ID])
new_edge_ids = _domain_unique(new_edge_labels[EDGE_ID])
seen_edge_ids = _domain_union(seen_edge_ids, new_edge_ids)

if track_node_hops and node_hop_col is not None:
assert seen_node_marker_col is not None
if node_hop_records is None:
node_hop_records = new_node_ids.assign(**{node_hop_col: current_hop})
seen_node_ids = _domain_unique(node_hop_records[g2._node])
else:
node_seen = node_hop_records[[g2._node]].assign(**{seen_node_marker_col: 1})
merged_node_labels = safe_merge(
new_node_ids,
node_seen,
on=g2._node,
how='left',
engine=engine_concrete
seen_node_ids = (
seen_node_ids
if seen_node_ids is not None
else _domain_unique(node_hop_records[g2._node])
)
new_node_labels = merged_node_labels[merged_node_labels[seen_node_marker_col].isna()].drop(columns=[seen_node_marker_col])
if _domain_is_empty(seen_node_ids):
new_node_labels = new_node_ids
else:
new_mask = ~new_node_ids[g2._node].isin(seen_node_ids)
new_node_labels = new_node_ids[new_mask]
if len(new_node_labels) > 0:
node_hop_records = concat(
[node_hop_records, new_node_labels.assign(**{node_hop_col: current_hop})],
ignore_index=True,
sort=False
).drop_duplicates(subset=[g2._node])
new_node_ids_domain = _domain_unique(new_node_labels[g2._node])
seen_node_ids = _domain_union(seen_node_ids, new_node_ids_domain)

if debugging_hop and logger.isEnabledFor(logging.DEBUG):
logger.debug('~~~~~~~~~~ LOOP STEP MERGES 1 ~~~~~~~~~~~')
Expand Down
26 changes: 26 additions & 0 deletions graphistry/tests/compute/test_hop.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,32 @@ def test_hop_exact_forward(self, g_long_forwards_chain, n_a):
{'s': 'c', 'd': 'd'}
]

def test_hop_labels_forward(self, g_long_forwards_chain: CGFull, n_a):
    """Forward 3-hop traversal labels every node and edge with its hop number."""
    # Drives the hop-label tracking path (seen-ID bookkeeping meant to be cuDF-safe).
    hop_kwargs = {
        'nodes': n_a,
        'hops': 3,
        'to_fixed_point': False,
        'direction': 'forward',
        'label_node_hops': 'nh',
        'label_edge_hops': 'eh',
        'label_seeds': True,
    }
    g2 = g_long_forwards_chain.hop(**hop_kwargs)

    # Both label columns must exist before their contents are inspected.
    for frame, label_col in ((g2._nodes, 'nh'), (g2._edges, 'eh')):
        assert label_col in frame.columns
    # No row may be left unlabeled.
    for frame, label_col in ((g2._nodes, 'nh'), (g2._edges, 'eh')):
        assert frame[label_col].isna().sum() == 0

    node_hops = {}
    for record in g2._nodes[['v', 'nh']].to_dict(orient='records'):
        node_hops[record['v']] = int(record['nh'])
    assert node_hops == {'a': 0, 'b': 1, 'c': 2, 'd': 3}

    edge_hops = {}
    for record in g2._edges[['s', 'd', 'eh']].to_dict(orient='records'):
        edge_hops[(record['s'], record['d'])] = int(record['eh'])
    assert edge_hops == {('a', 'b'): 1, ('b', 'c'): 2, ('c', 'd'): 3}

def test_hop_exact_back(self, g_long_forwards_chain: CGFull, n_d, n_a):
g_reverse = g_long_forwards_chain.nodes(
g_long_forwards_chain._nodes[
Expand Down
Loading