feat(gas): add batch listen #3478
Pull Request Review: Batch Listen Feature
This PR introduces batch signal listening functionality to the workflow engine, allowing workflows to receive multiple signals in a single operation. This is a significant enhancement that can improve throughput and reduce overhead when dealing with multiple signals.
Summary
Changes: 1,215 additions, 732 deletions across 16 files
Positive Aspects
Architecture & Design
Code Quality
Critical Issues
1. Breaking Change: Macro-generated
PR Review: feat(gas): add batch listen
Summary
This PR introduces batch signal listening to the Gasoline workflow engine, allowing workflows to listen for and process multiple signals at once instead of one at a time. This is a significant architectural improvement that can reduce latency and improve throughput for workflows that process multiple signals.
Code Quality & Best Practices
Strengths:
Issues & Suggestions:
Potential Bugs
Performance Considerations
Improvements:
Concerns:
Security Concerns
Test Coverage
Missing:
Recommended Tests:
// Test batch listen with multiple signals
#[tokio::test]
async fn test_listen_n_multiple_signals() {
    // Send 5 signals, listen for 3, verify we get the oldest 3
}

// Test listen_n with limit larger than available signals
#[tokio::test]
async fn test_listen_n_limit_exceeds_available() {
    // Send 2 signals, listen for 5, verify we get 2
}

// Test backward compatibility
#[tokio::test]
async fn test_old_signal_event_conversion() {
    // Load workflow with old Signal events, verify they convert to Signals
}

// Test signal ordering across types
#[tokio::test]
async fn test_listen_n_mixed_signal_types() {
    // Send signals of type A and B interleaved by time
    // Verify they're received in chronological order
}
Architecture & Design
Positive:
Questions:
Summary & Recommendations
This is a well-structured PR that adds valuable batching capabilities to the workflow engine. However, it needs:
Must Fix:
Should Fix:
Nice to Have:
The core implementation is solid, but the missing tests and potential edge cases around limits and ordering need to be addressed before merging.
Generated by Claude Code
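The recommended tests above can be prototyped against a small in-memory model before they are wired to the real engine. The sketch below is only that: `PendingSignals`, `publish`, and this `listen_n` are hypothetical stand-ins rather than Gasoline's API, and the model assumes a batch listen returns the oldest pending signals first and removes what it returns.

```rust
// Hypothetical in-memory stand-in for a workflow's pending signals.
// None of these names come from Gasoline; they only model the semantics
// the recommended tests expect.
#[derive(Debug, Clone, PartialEq)]
pub struct Signal {
    pub create_ts: i64,
    pub name: String,
}

#[derive(Default)]
pub struct PendingSignals {
    queue: Vec<Signal>,
}

impl PendingSignals {
    pub fn publish(&mut self, create_ts: i64, name: &str) {
        self.queue.push(Signal {
            create_ts,
            name: name.to_string(),
        });
    }

    /// Returns up to `limit` signals, oldest first, and removes them
    /// from the queue (modelling acknowledgement).
    pub fn listen_n(&mut self, limit: usize) -> Vec<Signal> {
        // Stable sort keeps publish order for equal timestamps.
        self.queue.sort_by_key(|s| s.create_ts);
        let n = limit.min(self.queue.len());
        self.queue.drain(..n).collect()
    }
}

#[test]
fn listen_n_returns_oldest_and_respects_limit() {
    let mut pending = PendingSignals::default();
    pending.publish(30, "c");
    pending.publish(10, "a");
    pending.publish(20, "b");

    // Limit smaller than available: the two oldest come back in order.
    let names: Vec<String> = pending.listen_n(2).into_iter().map(|s| s.name).collect();
    assert_eq!(names, vec!["a".to_string(), "b".to_string()]);

    // Limit larger than available: only the remaining signal is returned.
    assert_eq!(pending.listen_n(5).len(), 1);
}
```

The same two cases (limit below and above the number of pending signals) map onto the first two recommended tests; the real versions would publish through the engine instead of this model.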
Code Review
Summary
This PR adds batch signal listening capabilities to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation instead of one at a time. This is a significant architectural improvement that should improve efficiency when dealing with high-volume signal processing.
Architecture Changes
✅ Well-designed changes:
Code Quality
✅ Strengths:
Potential Issues
1. EventType Mismatch in insert::signals_event (packages/gasoline/src/db/kv/keys/history.rs:1645)
pub fn signals_event(/* ... */) -> Result<()> {
    common(
        subspace,
        tx,
        workflow_id,
        location,
        EventType::Signal, // ❌ Should be EventType::Signals
        version,
        create_ts,
    )
}
This appears to use
Recommended fix:
EventType::Signals, // Use the new batch signal type
2. Missing Validation on Signal Count
The
pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}
In the database deserialization code (packages/gasoline/src/db/kv/debug.rs:1545), there is a zip operation that could silently drop data if lengths do not match. Consider adding validation:
pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}
impl SignalsEvent {
    pub fn new(names: Vec<String>, bodies: Vec<Box<serde_json::value::RawValue>>) -> Result<Self> {
        ensure!(names.len() == bodies.len(), "signals names and bodies must have same length");
        ensure!(!names.is_empty(), "signals event must contain at least one signal");
        Ok(Self { names, bodies })
    }
}
3. Removed CustomListener Without Clear Migration Path
The PR removes the
If any users were using
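On the length mismatch raised in issue 2: Rust's `Iterator::zip` stops at the shorter input, so extra names or bodies vanish without any error. The sketch below makes that concrete. It is a simplified illustration in which bodies are plain `String`s instead of `RawValue`, and `pair_unchecked` and `pair_checked` are hypothetical helper names, not functions from the PR.

```rust
/// Plain zip: stops at the shorter input, dropping the extras silently.
pub fn pair_unchecked(names: Vec<String>, bodies: Vec<String>) -> Vec<(String, String)> {
    names.into_iter().zip(bodies).collect()
}

/// Checked variant: refuses mismatched lengths instead of truncating.
/// (Hypothetical helper; the error type is a plain String for brevity.)
pub fn pair_checked(
    names: Vec<String>,
    bodies: Vec<String>,
) -> Result<Vec<(String, String)>, String> {
    if names.len() != bodies.len() {
        return Err(format!(
            "signals names ({}) and bodies ({}) must have same length",
            names.len(),
            bodies.len()
        ));
    }
    Ok(names.into_iter().zip(bodies).collect())
}
```

With three names and two bodies, `pair_unchecked` yields two pairs and the third name is lost, while `pair_checked` returns an error that a deserializer could surface.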
4. Potential Performance Issue: Sort After Parallel Fetch
In
    .buffer_unordered(1024)
    .try_collect::<Vec<_>>()
    .await?;
// Sort by ts
signals.sort_by_key(|key| key.create_ts);
// Apply limit
Ok(signals.into_iter().take(limit).collect())
The code fetches up to
Recommendation: Consider limiting the total fetch earlier or documenting the tradeoff.
Performance Considerations
✅ Good:
Security
✅ No obvious security concerns
Test Coverage
❌ Critical Gap: No Tests for Batch Listening
The PR adds significant new functionality but I could not find any tests for:
Recommendation: Add tests covering:
Example test case:
#[workflow_test]
async fn test_batch_signal_listen() {
    // Publish 5 signals
    // ctx.listen_n::<MySignal>(10) should return all 5
    // Verify order matches timestamps
}
Additional Observations
Recommendations
Must Fix:
Should Consider:
Nice to Have:
Verdict
This is a well-architected feature that makes logical sense for the workflow engine. However, it has a critical bug (wrong EventType) and lacks test coverage. Once these issues are addressed, this will be a solid improvement.
Status: Needs Changes 🔴
The EventType bug could cause production issues with history replay, and the lack of tests makes it risky to merge. Please address the critical issues before merging.
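For the sort-then-truncate step discussed in issue 4, one option for the in-memory part is to select the oldest `limit` entries before sorting, so only the kept entries are fully ordered. This is a sketch under assumptions: `SignalKey` here is a hypothetical two-field stand-in for the real key type, and the approach does not reduce how much is fetched from the database, which is the larger part of the concern.

```rust
// Hypothetical stand-in for the fetched signal keys.
#[derive(Debug, Clone, PartialEq)]
pub struct SignalKey {
    pub create_ts: i64,
    pub signal_id: u64,
}

/// Returns the `limit` oldest keys in ascending timestamp order.
pub fn oldest_n(mut signals: Vec<SignalKey>, limit: usize) -> Vec<SignalKey> {
    if limit == 0 {
        return Vec::new();
    }
    if signals.len() > limit {
        // Partition so the `limit` smallest timestamps occupy the front
        // (in arbitrary order), then drop everything else.
        signals.select_nth_unstable_by_key(limit - 1, |s| s.create_ts);
        signals.truncate(limit);
    }
    // Only the retained entries need a full sort.
    signals.sort_by_key(|s| s.create_ts);
    signals
}
```

Selection is linear on average in the number of fetched keys, so the full `O(n log n)` sort is paid only on at most `limit` entries. Ties on `create_ts` are broken arbitrarily by the unstable selection.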
PR Review: Add Batch Listen
Overview
This PR introduces batch signal listening capabilities to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation.
Strengths
1. Well-Structured API Changes
2. Comprehensive Database Implementation
3. Good Error Handling
4. Logging and Observability
Concerns
1. Breaking API Change
Recommendations:
2. Potential Index Corruption 🔴
Recommendations:
3. Empty Vec Semantics
Recommendation: Document when empty vecs are expected vs errors
4. Test Coverage Gaps 🟡
5. Performance 💭
6. Magic Number 🟡
7. Removed EventData::Empty
Security & Safety
✅ No security concerns
Conclusion
Well-implemented feature! Main concerns:
Recommendation: Add breaking change docs and batch tests before merging.
Overall: Approve with comments 🎉
PR Review: feat(gas): add batch listen
This PR introduces batch signal listening to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation. This is a significant performance optimization.
Architecture Changes
Core API Changes:
Database Layer:
History Management:
Code Quality - Strengths
Areas for Improvement
Potential Bugs
Performance
Positive:
Concerns:
Security
No significant issues. Proper conflict ranges prevent races, signal ack prevents double-delivery.
Test Coverage
Missing: No test files in PR. Critical tests needed:
Best Practices
Follows CLAUDE.md conventions well (structured logging, lowercase messages, anyhow::Result, proper timestamps).
Minor: Some functions exceed reasonable length (180 lines). Consider extracting helpers.
Final Recommendation
APPROVE with minor recommendations
Well-architected change with meaningful performance improvements. Code quality is high and follows conventions. Before merging:
Nice work! The parallel signal fetching is particularly well done.
PR Review: feat(gas): add batch listen
Summary
This PR introduces batch signal listening functionality to the Gasoline workflow engine, allowing workflows to receive multiple signals at once instead of one at a time. This is a significant architectural change that affects the core signal handling mechanisms.
Code Quality & Architecture
Strengths
Areas of Concern
1. Breaking API Change in Listen trait (High Severity)
The Listen::listen signature changed from
async fn listen(ctx: &mut ListenCtx) -> WorkflowResult
to
async fn listen(ctx: &mut ListenCtx, limit: usize) -> WorkflowResult<Vec>
Issue: This is a breaking change for any custom implementations of the Listen trait outside the codebase. The #[signal] macro was updated to generate the new signature, but any hand-written implementations will break.
Recommendation: Consider deprecation path or document this as a breaking change in release notes.
2. Inconsistent Error Handling (Medium Severity)
In workflow.rs:703-709, the .context() error message for an empty vec case could be more helpful for debugging.
Recommendation: Use a more specific error or add debug assertions.
3. Removed CustomListener Trait (Medium Severity)
The PR removes the CustomListener trait and custom_listener method without explanation.
Files affected:
Recommendation: Document why this was removed.
4. History Storage Complexity (Medium Severity)
The new Signals event type stores multiple signals with indexed storage. The deserialization logic in db/kv/debug.rs has checks that assume indexes arrive in order (0, 1, 2...).
Recommendation: Add documentation about ordering guarantees.
5. Missing Test Coverage (High Severity)
No new tests were added for the batch listen functionality.
Missing test scenarios:
Recommendation: Add comprehensive tests in tests/workflows/signal_test.rs.
Performance Considerations
Positive Impact
Potential Concerns
Migration & Compatibility
Breaking Changes
Questions
Recommendations Summary
Must Fix (High Priority)
Should Fix (Medium Priority)
Conclusion
This is a solid architectural improvement that enables important workflow batching capabilities. The implementation is generally well-structured, but needs test coverage, documentation of breaking changes, and clarification on database migration/compatibility. The code quality is good and follows the existing patterns in the codebase.
Overall Assessment: Please address the test coverage and breaking change documentation before merging.
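To illustrate concerns 1 and 2 together, the sketch below shows one possible shape for a batch `listen` plus a single-signal convenience wrapper whose empty-result error names the signal type. It is deliberately simplified: the real trait is async and returns `WorkflowResult`, whereas this version is synchronous with `String` errors, and `listen_one`, `MySignal`, and the `pending` field are hypothetical names introduced only for illustration.

```rust
// Simplified, synchronous stand-in for the listen context.
pub struct ListenCtx {
    pub pending: Vec<String>,
}

pub trait Listen: Sized {
    /// Batch form: returns up to `limit` signals, possibly fewer.
    fn listen(ctx: &mut ListenCtx, limit: usize) -> Result<Vec<Self>, String>;

    /// Hypothetical wrapper for callers that want exactly one signal.
    /// An empty batch becomes an error that includes the signal type name.
    fn listen_one(ctx: &mut ListenCtx) -> Result<Self, String> {
        Self::listen(ctx, 1)?.into_iter().next().ok_or_else(|| {
            format!(
                "listen for {} returned no signals",
                std::any::type_name::<Self>()
            )
        })
    }
}

#[derive(Debug, PartialEq)]
pub struct MySignal(pub String);

impl Listen for MySignal {
    fn listen(ctx: &mut ListenCtx, limit: usize) -> Result<Vec<Self>, String> {
        // Take at most `limit` pending bodies, oldest first.
        let n = limit.min(ctx.pending.len());
        Ok(ctx.pending.drain(..n).map(MySignal).collect())
    }
}
```

A default method like this keeps single-signal call sites short, but it does not spare hand-written implementations from updating to the batch signature, so the breaking change would still need to be documented.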
Code Review: feat(gas): add batch listen
Overview
This PR adds batch listening capability to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation. This is a significant enhancement that improves efficiency for workflows that need to process multiple signals.
✅ Strengths
Pull Request Review: feat(gas): add batch listen
This PR adds batch signal listening capabilities to the Gasoline workflow engine, allowing workflows to efficiently receive multiple signals at once. Overall, this is a well-structured enhancement with good attention to detail. Below are my findings:
Strengths
1. API Design
2. Database Implementation
The pull_next_signals implementation (lines 1912-2092 in db/kv/mod.rs) is well done:
3. History Event Handling
4. Macro Updates
The #[signal] macro properly adapts to the new trait signature, requiring all implementations to return Vec.
Issues Found
1. Critical: Potential Signal Loss in pull_next_signals
Location: engine/packages/gasoline/src/db/kv/mod.rs:1938-1970
You are applying the limit twice:
Problem: If you have 3 signal types and limit=5:
This means signals with later timestamps might never be considered, even if they arrived before signals in other streams.
Fix: Remove the limit from the RangeOption and only apply it after sorting, or use a larger multiplier like limit * owned_filter.len().
2. Error Handling: Empty Vec Validation
Location: engine/packages/gasoline/src/ctx/workflow.rs:703-709
The error message does not include context about which signal type failed. Consider adding the type name to the error context.
3. Missing Documentation
Location: Multiple files
The new public APIs lack doc comments:
Add comprehensive doc comments explaining the limit parameter, ordering guarantees, return value guarantees, and example usage.
4. Inconsistent Naming in ListenCtx
Location: engine/packages/gasoline/src/ctx/listen.rs:17,27
You renamed last_try to last_attempt which is good, but the field documentation does not explain what "last attempt" means in the context of the retry logic.
Potential Issues
1. Performance: Unbounded Buffering
Location: engine/packages/gasoline/src/db/kv/mod.rs:2052
You are processing up to 1024 signals concurrently with .buffer_unordered(1024). For workflows with very large batches, this could cause memory pressure. Is there a realistic upper bound on the limit parameter?
2. Removed custom_listener API
Location: engine/packages/gasoline/src/ctx/versioned_workflow.rs:128-136
You completely removed the custom_listener API. Was this intentional? Were there any external uses of the CustomListener trait? This is a breaking API change.
3. Metric Granularity Lost
Location: engine/packages/gasoline/src/ctx/listen.rs:47-82
Previously, SIGNAL_PULL_DURATION was recorded per signal. Now it is not recorded at all. This removes observability into signal fetch performance. Consider adding a metric for batch size and batch pull duration.
Minor Issues
1. Type Annotation Redundancy
Location: engine/packages/gasoline/src/db/kv/mod.rs:1923
Could use explicit Vec instead of Vec<_> for clarity.
2. Unused Parameter
Location: engine/packages/gasoline/src/db/kv/mod.rs:1915,1919
Parameters prefixed with _ (_workflow_name, _loop_location) suggest they are intentionally unused. Verify these are not needed for the new batch logic.
3. Comment Formatting
Location: Multiple history key implementations
The new IndexedSignalIdKey, IndexedNameKey, etc. lack module-level documentation explaining the indexing scheme and why it is necessary.
Testing Recommendations
Code Quality Summary
Recommendation
Request Changes - Address the critical signal loss issue in pull_next_signals before merging. The other issues are important but less critical.
Priority Fixes:
Great work on this feature! The core implementation is solid; it just needs these refinements before merging.
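One way to probe the double-limit concern from Issues Found 1 is a small comparison of the two strategies on plain timestamps. The sketch below is a model, not the PR's code: both function names are hypothetical, and it assumes each per-type range read yields that type's oldest entries first, which is an assumption about the read and not something confirmed here.

```rust
/// Takes the oldest `limit` timestamps from each stream, then the oldest
/// `limit` overall. Models "limit in the range read, then limit after sort".
pub fn limit_per_stream_then_global(streams: &[Vec<i64>], limit: usize) -> Vec<i64> {
    let mut merged: Vec<i64> = Vec::new();
    for stream in streams {
        let mut s = stream.clone();
        s.sort(); // assumption: the range read is oldest-first
        s.truncate(limit); // first limit: per signal type
        merged.extend(s);
    }
    merged.sort();
    merged.truncate(limit); // second limit: across all types
    merged
}

/// Reference strategy: no per-stream limit, only the final one.
pub fn global_only(streams: &[Vec<i64>], limit: usize) -> Vec<i64> {
    let mut all: Vec<i64> = streams.iter().flatten().copied().collect();
    all.sort();
    all.truncate(limit);
    all
}
```

Under the oldest-first assumption the two functions should agree, because any signal among the overall oldest `limit` is also among the oldest `limit` of its own type. If a per-type read were not ordered by timestamp, the first limit could drop older signals, which is the situation the suggested fix guards against. A test over the real `pull_next_signals` with several signal types would settle which case applies.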