Skip to content

Commit fc4c8d5

Browse files
committed
feat: Implement transactional node search with MVCC semantics
Implement find_nodes_by_name with full transactional isolation: 1. TransactionalGraph::find_nodes_by_name: - Retrieves snapshot to get baseline node set - Filters out deleted nodes from write set - Returns modified nodes from write set (uncommitted changes) - Falls back to snapshot for unchanged nodes - Includes newly inserted nodes not yet in snapshot - Provides consistent view across transaction lifetime 2. ReadOnlyTransactionalGraph::find_nodes_by_name: - Reads directly from specific snapshot - No write set considerations (read-only) - Used for historical queries and version checkout This enables proper MVCC (Multi-Version Concurrency Control) behavior: - Transactions see their own uncommitted changes - Transactions are isolated from other concurrent transactions - Reads are consistent within transaction scope - No locking required for reads Related to: codebase audit - complete transactional graph implementation
1 parent c4bef0e commit fc4c8d5

File tree

1 file changed

+118
-8
lines changed

1 file changed

+118
-8
lines changed

crates/codegraph-graph/src/transactional_graph.rs

Lines changed: 118 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -394,13 +394,94 @@ impl GraphStore for TransactionalGraph {
394394
}
395395

396396
async fn find_nodes_by_name(&self, name: &str) -> Result<Vec<CodeNode>> {
397-
let _transaction_id = self.get_current_transaction_id()?;
397+
let transaction_id = self.get_current_transaction_id()?;
398398

399-
// TODO: Implement transactional node search
400-
// This would involve:
401-
// 1. Reading from the current transaction's snapshot
402-
// 2. Applying any pending writes from the transaction
403-
// 3. Returning the merged view
399+
// Get transaction to access snapshot and write set
400+
let transaction = {
401+
let storage = self.storage.clone();
402+
tokio::task::spawn_blocking(move || {
403+
let handle = tokio::runtime::Handle::current();
404+
handle.block_on(async move {
405+
let guard = storage.read();
406+
guard.get_transaction(transaction_id).await
407+
})
408+
})
409+
.await
410+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
411+
};
412+
413+
if let Some(tx) = transaction {
414+
let snapshot_id = tx.snapshot_id;
415+
let write_set = tx.write_set.clone();
416+
417+
// Get the snapshot to iterate through all nodes
418+
let snapshot = {
419+
let storage = self.storage.clone();
420+
tokio::task::spawn_blocking(move || {
421+
let handle = tokio::runtime::Handle::current();
422+
handle.block_on(async move {
423+
let guard = storage.read();
424+
guard.get_snapshot(snapshot_id).await
425+
})
426+
})
427+
.await
428+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
429+
};
430+
431+
if let Some(snap) = snapshot {
432+
let mut result_nodes = Vec::new();
433+
let storage = self.storage.clone();
434+
let name_owned = name.to_string();
435+
436+
// Iterate through all nodes in the snapshot
437+
for (node_id, _content_hash) in snap.node_versions {
438+
// Check if this node was deleted in the write set
439+
if let Some(WriteOperation::Delete(_)) = write_set.get(&node_id) {
440+
continue; // Skip deleted nodes
441+
}
442+
443+
// Try to get the node (will check write set first, then snapshot)
444+
if let Some(node) = self.get_node(node_id).await? {
445+
if node.name == name_owned {
446+
result_nodes.push(node);
447+
}
448+
}
449+
}
450+
451+
// Check write set for newly inserted nodes not in the snapshot
452+
for (node_id, write_op) in write_set {
453+
if !snap.node_versions.contains_key(&node_id) {
454+
// This is a new node added in this transaction
455+
if let WriteOperation::Update { new_content_hash, .. } = write_op {
456+
// Retrieve the node from content store
457+
let content_opt = {
458+
let storage = storage.clone();
459+
let hash = new_content_hash.clone();
460+
tokio::task::spawn_blocking(move || {
461+
let handle = tokio::runtime::Handle::current();
462+
handle.block_on(async move {
463+
let guard = storage.read();
464+
guard.get_content(&hash).await
465+
})
466+
})
467+
.await
468+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
469+
};
470+
471+
if let Some(content) = content_opt {
472+
let node: CodeNode = serde_json::from_slice(&content)
473+
.map_err(|e| CodeGraphError::Database(e.to_string()))?;
474+
if node.name == name_owned {
475+
result_nodes.push(node);
476+
}
477+
}
478+
}
479+
}
480+
}
481+
482+
return Ok(result_nodes);
483+
}
484+
}
404485

405486
Ok(Vec::new())
406487
}
@@ -477,8 +558,37 @@ impl GraphStore for ReadOnlyTransactionalGraph {
477558
))
478559
}
479560

480-
async fn find_nodes_by_name(&self, _name: &str) -> Result<Vec<CodeNode>> {
481-
// TODO: Implement snapshot-based node search
561+
async fn find_nodes_by_name(&self, name: &str) -> Result<Vec<CodeNode>> {
562+
let storage = self.storage.clone();
563+
let snapshot_id = self.snapshot_id;
564+
let name_owned = name.to_string();
565+
566+
// Get the snapshot
567+
let snapshot = tokio::task::spawn_blocking(move || {
568+
let handle = tokio::runtime::Handle::current();
569+
handle.block_on(async move {
570+
let guard = storage.read();
571+
guard.get_snapshot(snapshot_id).await
572+
})
573+
})
574+
.await
575+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??;
576+
577+
if let Some(snap) = snapshot {
578+
let mut result_nodes = Vec::new();
579+
580+
// Iterate through all nodes in the snapshot
581+
for (node_id, _content_hash) in snap.node_versions {
582+
if let Some(node) = self.get_node(node_id).await? {
583+
if node.name == name_owned {
584+
result_nodes.push(node);
585+
}
586+
}
587+
}
588+
589+
return Ok(result_nodes);
590+
}
591+
482592
Ok(Vec::new())
483593
}
484594
}

0 commit comments

Comments
 (0)