Skip to content

Commit c4bef0e

Browse files
committed
feat: Implement content-addressed storage and node retrieval in TransactionalGraph
Complete critical missing functionality in the transactional graph layer:

1. Content-Addressed Storage in add_node:
   - Serialize nodes to JSON
   - Store content using VersionedRocksDbStorage.store_content()
   - Use content hash in write set operations
   - Enables proper persistence of node data

2. Complete get_node Implementation:
   - Check write set first for uncommitted changes
   - Handle deleted nodes (return None)
   - Retrieve modified nodes from content store by hash
   - Fall back to reading from snapshot for unchanged nodes
   - Properly implements MVCC read semantics

3. Fix update_node to Store Content:
   - Store both old and new versions in content store
   - Generate content hashes via store_content()
   - Maintains proper before/after images for rollback

4. Implement ReadOnlyTransactionalGraph::get_node:
   - Read directly from specific snapshot
   - Used for version checkout and historical queries
   - No write set to check (read-only)

These changes enable the versioning API handlers to function with actual persistent storage instead of returning placeholder data.

Related to: codebase audit - complete transactional graph implementation
1 parent ab2da00 commit c4bef0e

File tree

1 file changed

+119
-59
lines changed

1 file changed

+119
-59
lines changed

crates/codegraph-graph/src/transactional_graph.rs

Lines changed: 119 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -173,19 +173,30 @@ impl GraphStore for TransactionalGraph {
173173
async fn add_node(&mut self, node: CodeNode) -> Result<()> {
174174
let transaction_id = self.get_current_transaction_id()?;
175175

176-
// Generate content hash for the node
177-
let content_hash = {
178-
let serialized =
179-
serde_json::to_vec(&node).map_err(|e| CodeGraphError::Database(e.to_string()))?;
176+
// Serialize the node for content-addressed storage
177+
let serialized =
178+
serde_json::to_vec(&node).map_err(|e| CodeGraphError::Database(e.to_string()))?;
180179

181-
use sha2::{Digest, Sha256};
182-
let mut hasher = Sha256::new();
183-
hasher.update(&serialized);
184-
format!("{:x}", hasher.finalize())
180+
// Store the content and get the hash
181+
let content_hash = {
182+
let storage = self.storage.clone();
183+
let serialized_clone = serialized.clone();
184+
tokio::task::spawn_blocking(move || {
185+
let handle = tokio::runtime::Handle::current();
186+
handle.block_on(async move {
187+
let mut guard = storage.write();
188+
guard.store_content(&serialized_clone).await
189+
})
190+
})
191+
.await
192+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
185193
};
186194

187-
// Add to transaction's write set
188-
let write_op = WriteOperation::Insert(node.id);
195+
// Add to transaction's write set with the content hash
196+
let write_op = WriteOperation::Update {
197+
old_content_hash: String::new(), // New insert has no old content
198+
new_content_hash: content_hash,
199+
};
189200
{
190201
let storage = self.storage.clone();
191202
let node_id = node.id;
@@ -203,9 +214,6 @@ impl GraphStore for TransactionalGraph {
203214
.map_err(|e| CodeGraphError::Threading(e.to_string()))??;
204215
}
205216

206-
// Store the actual content in the storage with the hash
207-
// TODO: Implement content-addressed storage for the node data
208-
209217
Ok(())
210218
}
211219

@@ -226,44 +234,73 @@ impl GraphStore for TransactionalGraph {
226234
}
227235

228236
// Check if this node is in the current transaction's write set first
229-
let is_in_write = {
237+
let transaction = {
230238
let storage = self.storage.clone();
231239
tokio::task::spawn_blocking(move || {
232240
let handle = tokio::runtime::Handle::current();
233241
handle.block_on(async move {
234242
let guard = storage.read();
235-
Ok::<_, CodeGraphError>(guard.get_transaction(transaction_id).await?)
243+
guard.get_transaction(transaction_id).await
236244
})
237245
})
238246
.await
239247
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
240248
};
241-
if let Some(transaction) = is_in_write {
242-
if transaction.write_set.contains_key(&id) {
249+
250+
if let Some(tx) = transaction {
251+
if let Some(write_op) = tx.write_set.get(&id) {
243252
// Node is being modified in this transaction
244-
// TODO: Return the modified version from the write set
253+
match write_op {
254+
WriteOperation::Delete(_) => {
255+
// Node was deleted in this transaction
256+
return Ok(None);
257+
}
258+
WriteOperation::Update { new_content_hash, .. } => {
259+
// Retrieve the modified version from content store
260+
let content_opt = {
261+
let storage = self.storage.clone();
262+
let hash = new_content_hash.clone();
263+
tokio::task::spawn_blocking(move || {
264+
let handle = tokio::runtime::Handle::current();
265+
handle.block_on(async move {
266+
let guard = storage.read();
267+
guard.get_content(&hash).await
268+
})
269+
})
270+
.await
271+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
272+
};
273+
274+
if let Some(content) = content_opt {
275+
let node: CodeNode = serde_json::from_slice(&content)
276+
.map_err(|e| CodeGraphError::Database(e.to_string()))?;
277+
return Ok(Some(node));
278+
}
279+
}
280+
WriteOperation::Insert(_) => {
281+
// This shouldn't happen as we use Update for inserts now
282+
}
283+
}
245284
}
246-
}
247285

248-
// If not in write set, read from the snapshot
249-
let maybe_tx = {
250-
let storage = self.storage.clone();
251-
tokio::task::spawn_blocking(move || {
252-
let handle = tokio::runtime::Handle::current();
253-
handle.block_on(async move {
254-
let guard = storage.read();
255-
Ok::<_, CodeGraphError>(guard.get_transaction(transaction_id).await?)
286+
// If not in write set, read from the transaction's snapshot
287+
let snapshot_id = tx.snapshot_id;
288+
let node_opt = {
289+
let storage = self.storage.clone();
290+
tokio::task::spawn_blocking(move || {
291+
let handle = tokio::runtime::Handle::current();
292+
handle.block_on(async move {
293+
let guard = storage.read();
294+
guard.read_node_at_snapshot(id, snapshot_id).await
295+
})
256296
})
257-
})
258-
.await
259-
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
260-
};
261-
if let Some(_transaction) = maybe_tx {
262-
// TODO: Read node from the transaction's snapshot
263-
// This would involve reading from the snapshot's content store
297+
.await
298+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
299+
};
300+
301+
return Ok(node_opt);
264302
}
265303

266-
// For now, return None as placeholder
267304
Ok(None)
268305
}
269306

@@ -273,28 +310,41 @@ impl GraphStore for TransactionalGraph {
273310
// Get the current version of the node to create before/after images
274311
let old_node = self.get_node(node.id).await?;
275312

276-
let (old_hash, new_hash) = {
277-
let old_hash = if let Some(ref old) = old_node {
278-
let serialized =
279-
serde_json::to_vec(old).map_err(|e| CodeGraphError::Database(e.to_string()))?;
280-
281-
use sha2::{Digest, Sha256};
282-
let mut hasher = Sha256::new();
283-
hasher.update(&serialized);
284-
format!("{:x}", hasher.finalize())
285-
} else {
286-
String::new()
287-
};
288-
289-
let new_serialized =
290-
serde_json::to_vec(&node).map_err(|e| CodeGraphError::Database(e.to_string()))?;
313+
// Get old hash if node exists
314+
let old_hash = if let Some(ref old) = old_node {
315+
let serialized =
316+
serde_json::to_vec(old).map_err(|e| CodeGraphError::Database(e.to_string()))?;
317+
let storage = self.storage.clone();
318+
let serialized_clone = serialized.clone();
319+
tokio::task::spawn_blocking(move || {
320+
let handle = tokio::runtime::Handle::current();
321+
handle.block_on(async move {
322+
let mut guard = storage.write();
323+
guard.store_content(&serialized_clone).await
324+
})
325+
})
326+
.await
327+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
328+
} else {
329+
String::new()
330+
};
291331

292-
use sha2::{Digest, Sha256};
293-
let mut hasher = Sha256::new();
294-
hasher.update(&new_serialized);
295-
let new_hash = format!("{:x}", hasher.finalize());
332+
// Serialize and store the new node content
333+
let new_serialized =
334+
serde_json::to_vec(&node).map_err(|e| CodeGraphError::Database(e.to_string()))?;
296335

297-
(old_hash, new_hash)
336+
let new_hash = {
337+
let storage = self.storage.clone();
338+
let serialized_clone = new_serialized.clone();
339+
tokio::task::spawn_blocking(move || {
340+
let handle = tokio::runtime::Handle::current();
341+
handle.block_on(async move {
342+
let mut guard = storage.write();
343+
guard.store_content(&serialized_clone).await
344+
})
345+
})
346+
.await
347+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??
298348
};
299349

300350
// Add to transaction's write set
@@ -398,11 +448,21 @@ impl GraphStore for ReadOnlyTransactionalGraph {
398448
}
399449

400450
async fn get_node(&self, id: NodeId) -> Result<Option<CodeNode>> {
401-
// TODO: Read node from the specific snapshot
402-
// This involves looking up the content hash for this node in the snapshot
403-
// and then retrieving the content from the content store
451+
// Read node from the specific snapshot
452+
let storage = self.storage.clone();
453+
let snapshot_id = self.snapshot_id;
404454

405-
Ok(None)
455+
let node_opt = tokio::task::spawn_blocking(move || {
456+
let handle = tokio::runtime::Handle::current();
457+
handle.block_on(async move {
458+
let guard = storage.read();
459+
guard.read_node_at_snapshot(id, snapshot_id).await
460+
})
461+
})
462+
.await
463+
.map_err(|e| CodeGraphError::Threading(e.to_string()))??;
464+
465+
Ok(node_opt)
406466
}
407467

408468
async fn update_node(&mut self, _node: CodeNode) -> Result<()> {

0 commit comments

Comments
 (0)