diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go index f2ebf62ab3..06af78e61f 100644 --- a/apps/evm/cmd/run.go +++ b/apps/evm/cmd/run.go @@ -169,7 +169,10 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger) + basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger) + if err != nil { + return nil, fmt.Errorf("failed to create based sequencer: %w", err) + } logger.Info(). Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()). diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 942940dde5..305b2e2a44 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -131,7 +131,10 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger) + basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger) + if err != nil { + return nil, fmt.Errorf("failed to create based sequencer: %w", err) + } logger.Info(). Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()). diff --git a/apps/testapp/cmd/run.go b/apps/testapp/cmd/run.go index b2035b561d..690a2764db 100644 --- a/apps/testapp/cmd/run.go +++ b/apps/testapp/cmd/run.go @@ -131,7 +131,10 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - basedSeq := based.NewBasedSequencer(fiRetriever, da, nodeConfig, genesis, logger) + basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger) + if err != nil { + return nil, fmt.Errorf("failed to create based sequencer: %w", err) + } logger.Info(). Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()). diff --git a/pkg/blob/README.md b/pkg/blob/README.md index 3424342da5..837dc6c314 100644 --- a/pkg/blob/README.md +++ b/pkg/blob/README.md @@ -3,14 +3,17 @@ This package is a **trimmed copy** of code from `celestia-node` to stay JSON-compatible with the blob RPC without importing the full Cosmos/Celestia dependency set. ## Upstream source + - `blob.go` comes from `celestia-node/blob/blob.go` @ tag `v0.28.4` (release v0.28.4), with unused pieces removed (blob v1, proof helpers, share length calc, appconsts dependency, etc.). - `submit_options.go` mirrors the exported JSON fields of `celestia-node/state/tx_config.go` @ the same tag, leaving out functional options, defaults, and Cosmos keyring helpers. ## Why copy instead of import? + - Avoids pulling Cosmos SDK / celestia-app dependencies into ev-node for the small surface we need (blob JSON and commitment for v0). - Keeps binary size and module graph smaller while remaining wire-compatible with celestia-node's blob service. ## Keeping it in sync + - When celestia-node changes blob JSON or tx config fields, update this package manually: 1. `diff -u pkg/blob/blob.go ../Celestia/celestia-node/blob/blob.go` 2. 
`diff -u pkg/blob/submit_options.go ../Celestia/celestia-node/state/tx_config.go` diff --git a/proto/evnode/v1/state.proto b/proto/evnode/v1/state.proto index 2aa3025676..263d3de993 100644 --- a/proto/evnode/v1/state.proto +++ b/proto/evnode/v1/state.proto @@ -19,3 +19,11 @@ message State { reserved 7; } + +// SequencerDACheckpoint tracks the position in the DA where transactions were last processed +message SequencerDACheckpoint { + // DA block height being processed + uint64 da_height = 1; + // Index of the next transaction to process within the DA block's forced inclusion batch + uint64 tx_index = 2; +} diff --git a/sequencers/based/README.md b/sequencers/based/README.md new file mode 100644 index 0000000000..9b425b5a96 --- /dev/null +++ b/sequencers/based/README.md @@ -0,0 +1,204 @@ +# Based Sequencer + +## Overview + +The Based Sequencer is a sequencer implementation that exclusively retrieves transactions from the Data Availability (DA) layer via the forced inclusion mechanism. Unlike other sequencer types, it does not accept transactions from a mempool or reaper - it treats the DA layer as a transaction queue. + +This design ensures that all transactions are force-included from DA, making the sequencer completely "based" on the DA layer's transaction ordering. + +## Architecture + +### Core Components + +1. **ForcedInclusionRetriever**: Fetches transactions from DA at epoch boundaries +2. **CheckpointStore**: Persists processing position to enable crash recovery +3. **BasedSequencer**: Orchestrates transaction retrieval and batch creation + +### Key Interfaces + +The Based Sequencer implements the `Sequencer` interface from `core/sequencer.go`: + +- `SubmitBatchTxs()` - No-op for based sequencer (transactions are not accepted) +- `GetNextBatch()` - Retrieves the next batch from DA via forced inclusion +- `VerifyBatch()` - Always returns true (all transactions come from DA) + +## Epoch-Based Transaction Retrieval + +### How Epochs Work + +Transactions are retrieved from DA in **epochs**, not individual DA blocks. An epoch is a range of DA blocks defined by `DAEpochForcedInclusion` in the genesis configuration. + +**Example**: If `DAStartHeight = 100` and `DAEpochForcedInclusion = 10`: + +- Epoch 1: DA heights 100-109 +- Epoch 2: DA heights 110-119 +- Epoch 3: DA heights 120-129 + +### Epoch Boundary Fetching + +The `ForcedInclusionRetriever` only returns transactions when queried at the **epoch end** (the last DA height in an epoch): + +```go +// When NOT at epoch end -> returns empty transactions +if daHeight != epochEnd { + return &ForcedInclusionEvent{ + StartDaHeight: daHeight, + EndDaHeight: daHeight, + Txs: [][]byte{}, + }, nil +} + +// When AT epoch end -> fetches entire epoch +// Retrieves ALL transactions from epochStart to epochEnd (inclusive) +``` + +When at an epoch end, the retriever fetches transactions from **all DA blocks in that epoch**: + +1. Fetches forced inclusion blobs from `epochStart` +2. Fetches forced inclusion blobs from each height between start and end +3. Fetches forced inclusion blobs from `epochEnd` +4. Returns all transactions as a single `ForcedInclusionEvent` + +### Why Epoch-Based? 
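+
+Epoch boundaries are plain arithmetic over DA heights. As a minimal sketch (the
+`epochBounds` helper below is hypothetical, not part of this package; the real
+boundary logic lives in the `ForcedInclusionRetriever`), the example ranges above
+can be derived like this:
+
+```go
+// epochBounds returns the first and last DA height of the epoch containing
+// daHeight, assuming daHeight >= daStartHeight and a fixed, nonzero epochSize.
+func epochBounds(daHeight, daStartHeight, epochSize uint64) (start, end uint64) {
+	epoch := (daHeight - daStartHeight) / epochSize
+	start = daStartHeight + epoch*epochSize
+	end = start + epochSize - 1
+	return start, end
+}
+```
+
+With `DAStartHeight = 100` and `DAEpochForcedInclusion = 10`, every height from 100
+through 109 maps to `(100, 109)`, and only a query at height 109 triggers a fetch.
+
+Fetching per epoch rather than per DA block has several practical benefits: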
+ +- **Efficiency**: Reduces the number of DA queries +- **Batching**: Allows processing multiple DA blocks worth of transactions together +- **Determinism**: Clear boundaries for when to fetch from DA +- **Gas optimization**: Fewer DA reads means lower operational costs + +## Checkpoint System + +### Purpose + +The checkpoint system tracks the exact position in the transaction stream to enable crash recovery and ensure no transactions are lost or duplicated. + +### Checkpoint Structure + +```go +type Checkpoint struct { + // DAHeight is the DA block height currently being processed + DAHeight uint64 + + // TxIndex is the index of the next transaction to process + // within the DA block's forced inclusion batch + TxIndex uint64 +} +``` + +### How Checkpoints Work + +#### 1. Initial State + +``` +Checkpoint: (DAHeight: 100, TxIndex: 0) +- Ready to fetch epoch starting at DA height 100 +``` + +#### 2. Fetching Transactions + +When `GetNextBatch()` is called and we're at an epoch end: + +``` +Request: GetNextBatch(maxBytes: 1MB) +Action: Fetch all transactions from epoch (DA heights 100-109) +Result: currentBatchTxs = [tx1, tx2, tx3, ..., txN] (from entire epoch) +``` + +#### 3. Processing Transactions + +Transactions are processed incrementally, respecting `maxBytes`: + +``` +Batch 1: [tx1, tx2] (fits in maxBytes) +Checkpoint: (DAHeight: 100, TxIndex: 2) + +Batch 2: [tx3, tx4, tx5] +Checkpoint: (DAHeight: 100, TxIndex: 5) + +... continue until all transactions from DA height 100 are consumed + +Checkpoint: (DAHeight: 101, TxIndex: 0) +- Moved to next DA height within the same epoch +``` + +#### 4. Checkpoint Persistence + +**Critical**: The checkpoint is persisted to disk **after every batch** of transactions is processed: + +```go +if txCount > 0 { + s.checkpoint.TxIndex += txCount + + // Move to next DA height when current one is exhausted + if s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) { + s.checkpoint.DAHeight++ + s.checkpoint.TxIndex = 0 + s.currentBatchTxs = nil + s.SetDAHeight(s.checkpoint.DAHeight) + } + + // Persist checkpoint to disk + if err := s.checkpointStore.Save(ctx, s.checkpoint); err != nil { + return nil, fmt.Errorf("failed to save checkpoint: %w", err) + } +} +``` + +### Crash Recovery Behavior + +#### Scenario: Crash Mid-Epoch + +**Setup**: + +- Epoch 1 spans DA heights 100-109 +- At DA height 109, fetched all transactions from the epoch +- Processed transactions up to DA height 105, TxIndex 3 +- **Crash occurs** + +**On Restart**: + +1. **Load Checkpoint**: `(DAHeight: 105, TxIndex: 3)` +2. **Lost Cache**: `currentBatchTxs` is empty (in-memory only) +3. **Attempt Fetch**: `RetrieveForcedIncludedTxs(105)` +4. **Result**: Empty (105 is not an epoch end) +5. **Continue**: Increment DA height, keep trying +6. **Eventually**: Reach DA height 109 (epoch end) +7. **Re-fetch**: Retrieve **entire epoch** again (DA heights 100-109) +8. **Resume**: Use checkpoint to skip already-processed transactions + +#### Important Implications + +**The entire epoch will be re-fetched after a crash**, even with fine-grained checkpoints. 
+ +**Why?** + +- Transactions are only available at epoch boundaries +- In-memory cache (`currentBatchTxs`) is lost on restart +- Must wait until the next epoch end to fetch transactions again + +**What the checkpoint prevents**: + +- ✅ Re-execution of already processed transactions +- ✅ Correct resumption within a DA block's transaction list +- ✅ No transaction loss or duplication + +**What the checkpoint does NOT prevent**: + +- ❌ Re-fetching the entire epoch from DA +- ❌ Re-validation of previously fetched transactions + +### Checkpoint Storage + +The checkpoint is stored using a key-value datastore: + +```go +// Checkpoint key in the datastore +checkpointKey = ds.NewKey("/based/checkpoint") + +// Operations +checkpoint, err := checkpointStore.Load(ctx) // Load from disk +err := checkpointStore.Save(ctx, checkpoint) // Save to disk +err := checkpointStore.Delete(ctx) // Delete from disk +``` + +The checkpoint is serialized using Protocol Buffers (`pb.SequencerDACheckpoint`) for efficient storage and cross-version compatibility. diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go index 7beab43b57..8845183e64 100644 --- a/sequencers/based/sequencer.go +++ b/sequencers/based/sequencer.go @@ -3,15 +3,16 @@ package based import ( "context" "errors" + "fmt" "sync/atomic" "time" + ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" "github.com/evstack/ev-node/block" coreda "github.com/evstack/ev-node/core/da" coresequencer "github.com/evstack/ev-node/core/sequencer" - "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" seqcommon "github.com/evstack/ev-node/sequencers/common" ) @@ -25,36 +26,62 @@ var _ coresequencer.Sequencer = (*BasedSequencer)(nil) // BasedSequencer is a sequencer that only retrieves transactions from the DA layer // via the forced inclusion mechanism. It does not accept transactions from the reaper. +// It uses DA as a queue and only persists a checkpoint of where it is in processing. 
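+//
+// Illustrative summary of the flow implemented below: GetNextBatch fetches a
+// full epoch of forced-inclusion transactions once the checkpoint reaches an
+// epoch end, serves them in MaxBytes-sized slices from currentBatchTxs, and
+// persists the (DAHeight, TxIndex) checkpoint after every batch so that a
+// restart does not re-serve already-processed transactions.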
type BasedSequencer struct { fiRetriever ForcedInclusionRetriever - da coreda.DA - config config.Config - genesis genesis.Genesis logger zerolog.Logger - daHeight atomic.Uint64 - txQueue [][]byte + daHeight atomic.Uint64 + checkpointStore *seqcommon.CheckpointStore + checkpoint *seqcommon.Checkpoint + + // Cached transactions from the current DA block being processed + currentBatchTxs [][]byte } // NewBasedSequencer creates a new based sequencer instance func NewBasedSequencer( + ctx context.Context, fiRetriever ForcedInclusionRetriever, - da coreda.DA, - config config.Config, + db ds.Batching, genesis genesis.Genesis, logger zerolog.Logger, -) *BasedSequencer { +) (*BasedSequencer, error) { bs := &BasedSequencer{ - fiRetriever: fiRetriever, - da: da, - config: config, - genesis: genesis, - logger: logger.With().Str("component", "based_sequencer").Logger(), - txQueue: make([][]byte, 0), + fiRetriever: fiRetriever, + logger: logger.With().Str("component", "based_sequencer").Logger(), + checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")), } bs.SetDAHeight(genesis.DAStartHeight) // will be overridden by the executor - return bs + // Load checkpoint from DB, or initialize if none exists + loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + checkpoint, err := bs.checkpointStore.Load(loadCtx) + if err != nil { + if errors.Is(err, seqcommon.ErrCheckpointNotFound) { + // No checkpoint exists, initialize with current DA height + bs.checkpoint = &seqcommon.Checkpoint{ + DAHeight: bs.GetDAHeight(), + TxIndex: 0, + } + } else { + return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) + } + } else { + bs.checkpoint = checkpoint + // If we had a non-zero tx index, we're resuming from a crash mid-block + // The transactions starting from that index are what we need + if checkpoint.TxIndex > 0 { + bs.logger.Debug(). + Uint64("tx_index", checkpoint.TxIndex). + Uint64("da_height", checkpoint.DAHeight). 
+ Msg("resuming from checkpoint within DA epoch") + } + } + + return bs, nil } // SubmitBatchTxs does nothing for a based sequencer as it only pulls from DA @@ -64,106 +91,136 @@ func (s *BasedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.S return &coresequencer.SubmitBatchTxsResponse{}, nil } -// GetNextBatch retrieves the next batch of transactions from the DA layer -// It fetches forced inclusion transactions and returns them as the next batch +// GetNextBatch retrieves the next batch of transactions from the DA layer using the checkpoint +// It treats DA as a queue and only persists where it is in processing func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { - currentDAHeight := s.GetDAHeight() + // If we have no cached transactions or we've consumed all from the current DA block, + // fetch the next DA epoch + daHeight := s.GetDAHeight() + if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) { + daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes) + if err != nil { + return nil, err + } - s.logger.Debug().Uint64("da_height", currentDAHeight).Msg("fetching forced inclusion transactions from DA") + daHeight = daEndHeight + } + + // Create batch from current position up to MaxBytes + batch := s.createBatchFromCheckpoint(req.MaxBytes) + + // Update checkpoint with how many transactions we consumed + if daHeight > 0 || len(batch.Transactions) > 0 { + s.checkpoint.TxIndex += uint64(len(batch.Transactions)) + + // If we've consumed all transactions from this DA epoch, move to next + if s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) { + s.checkpoint.DAHeight = daHeight + 1 + s.checkpoint.TxIndex = 0 + s.currentBatchTxs = nil + + // Update the global DA height + s.SetDAHeight(s.checkpoint.DAHeight) + } + + // Persist checkpoint + if err := s.checkpointStore.Save(ctx, s.checkpoint); err != nil { + return nil, fmt.Errorf("failed to save checkpoint: %w", err) + } + } + + return &coresequencer.GetNextBatchResponse{ + Batch: batch, + Timestamp: time.Time{}, // TODO(@julienrbrt): we need to use DA block timestamp for determinism + BatchData: req.LastBatchData, + }, nil +} + +// fetchNextDAEpoch fetches transactions from the next DA epoch +func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { + currentDAHeight := s.checkpoint.DAHeight + + s.logger.Debug(). + Uint64("da_height", currentDAHeight). + Uint64("tx_index", s.checkpoint.TxIndex). + Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) if err != nil { // Check if forced inclusion is not configured if errors.Is(err, block.ErrForceInclusionNotConfigured) { - return nil, errors.New("forced inclusion not configured") + return 0, block.ErrForceInclusionNotConfigured } else if errors.Is(err, coreda.ErrHeightFromFuture) { - // If we get a height from future error, keep the current DA height and return batch + // If we get a height from future error, stay at current position // We'll retry the same height on the next call until DA produces that block s.logger.Debug(). Uint64("da_height", currentDAHeight). Msg("DA height from future, waiting for DA to produce block") - } else { - s.logger.Error().Err(err).Uint64("da_height", currentDAHeight).Msg("failed to retrieve forced inclusion transactions") - return nil, err - } - } else { - // Update DA height. 
- // If we are in between epochs, we still need to bump the da height. - // At the end of an epoch, we need to bump to go to the next epoch. - if forcedTxsEvent.EndDaHeight >= currentDAHeight { - s.SetDAHeight(forcedTxsEvent.EndDaHeight + 1) + return 0, nil } + return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err) } - // Add forced inclusion transactions to the queue with validation - validTxs := 0 + // Validate and filter transactions + validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) skippedTxs := 0 for _, tx := range forcedTxsEvent.Txs { // Validate blob size against absolute maximum - if !seqcommon.ValidateBlobSize(tx) { + if uint64(len(tx)) > maxBytes { s.logger.Warn(). Uint64("da_height", forcedTxsEvent.StartDaHeight). Int("blob_size", len(tx)). - Msg("forced inclusion blob exceeds absolute maximum size - skipping") + Uint64("max_bytes", maxBytes). + Msg("forced inclusion blob exceeds maximum size - skipping") skippedTxs++ continue } - s.txQueue = append(s.txQueue, tx) - validTxs++ + validTxs = append(validTxs, tx) } s.logger.Info(). - Int("valid_tx_count", validTxs). + Int("valid_tx_count", len(validTxs)). Int("skipped_tx_count", skippedTxs). - Int("queue_size", len(s.txQueue)). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). - Msg("processed forced inclusion transactions from DA") + Msg("fetched forced inclusion transactions from DA") - batch := s.createBatchFromQueue(req.MaxBytes) + // Cache the transactions for this DA epoch + s.currentBatchTxs = validTxs - return &coresequencer.GetNextBatchResponse{ - Batch: batch, - Timestamp: time.Time{}, // TODO(@julienrbrt): we need to use DA block timestamp for determinism - BatchData: req.LastBatchData, - }, nil + return forcedTxsEvent.EndDaHeight, nil } -// createBatchFromQueue creates a batch from the transaction queue respecting MaxBytes -func (s *BasedSequencer) createBatchFromQueue(maxBytes uint64) *coresequencer.Batch { - if len(s.txQueue) == 0 { +// createBatchFromCheckpoint creates a batch from the current checkpoint position respecting MaxBytes +func (s *BasedSequencer) createBatchFromCheckpoint(maxBytes uint64) *coresequencer.Batch { + if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) { return &coresequencer.Batch{Transactions: nil} } - var batch [][]byte + var result [][]byte var totalBytes uint64 - for i, tx := range s.txQueue { + // Start from the checkpoint index + for i := s.checkpoint.TxIndex; i < uint64(len(s.currentBatchTxs)); i++ { + tx := s.currentBatchTxs[i] txSize := uint64(len(tx)) - // Always respect maxBytes, even for the first transaction + if totalBytes+txSize > maxBytes { - // Would exceed max bytes, stop here - s.txQueue = s.txQueue[i:] break } - batch = append(batch, tx) + result = append(result, tx) totalBytes += txSize - - // If this is the last transaction, clear the queue - if i == len(s.txQueue)-1 { - s.txQueue = s.txQueue[:0] - } } // Mark all transactions as force-included since based sequencer only pulls from DA - forceIncludedMask := make([]bool, len(batch)) + forceIncludedMask := make([]bool, len(result)) for i := range forceIncludedMask { forceIncludedMask[i] = true } return &coresequencer.Batch{ - Transactions: batch, + Transactions: result, ForceIncludedMask: forceIncludedMask, } } @@ -178,12 +235,11 @@ func (s *BasedSequencer) VerifyBatch(ctx context.Context, req coresequencer.Veri // SetDAHeight sets the current DA height for the sequencer // This should be 
called when the sequencer needs to sync to a specific DA height -func (c *BasedSequencer) SetDAHeight(height uint64) { - c.daHeight.Store(height) - c.logger.Debug().Uint64("da_height", height).Msg("DA height updated") +func (s *BasedSequencer) SetDAHeight(height uint64) { + s.daHeight.Store(height) } // GetDAHeight returns the current DA height -func (c *BasedSequencer) GetDAHeight() uint64 { - return c.daHeight.Load() +func (s *BasedSequencer) GetDAHeight() uint64 { + return s.daHeight.Load() } diff --git a/sequencers/based/sequencer_test.go b/sequencers/based/sequencer_test.go index 0f994f91f5..30a4bd6118 100644 --- a/sequencers/based/sequencer_test.go +++ b/sequencers/based/sequencer_test.go @@ -2,10 +2,10 @@ package based import ( "context" - "errors" "testing" - "time" + ds "github.com/ipfs/go-datastore" + syncds "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -14,87 +14,42 @@ import ( "github.com/evstack/ev-node/block" coreda "github.com/evstack/ev-node/core/da" coresequencer "github.com/evstack/ev-node/core/sequencer" - "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" ) -// MockDA is a mock implementation of DA for testing -type MockDA struct { +// MockForcedInclusionRetriever is a mock implementation of ForcedInclusionRetriever for testing +type MockForcedInclusionRetriever struct { mock.Mock } -func (m *MockDA) Submit(ctx context.Context, blobs [][]byte, gasPrice float64, namespace []byte) ([][]byte, error) { - args := m.Called(ctx, blobs, gasPrice, namespace) +func (m *MockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*block.ForcedInclusionEvent, error) { + args := m.Called(ctx, daHeight) if args.Get(0) == nil { return nil, args.Error(1) } - return args.Get(0).([][]byte), args.Error(1) + return args.Get(0).(*block.ForcedInclusionEvent), args.Error(1) } -func (m *MockDA) SubmitWithOptions(ctx context.Context, blobs [][]byte, gasPrice float64, namespace []byte, options []byte) ([][]byte, error) { - args := m.Called(ctx, blobs, gasPrice, namespace, options) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).([][]byte), args.Error(1) -} - -func (m *MockDA) GetIDs(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) { - args := m.Called(ctx, height, namespace) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(*coreda.GetIDsResult), args.Error(1) -} - -func (m *MockDA) Get(ctx context.Context, ids [][]byte, namespace []byte) ([][]byte, error) { - args := m.Called(ctx, ids, namespace) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).([][]byte), args.Error(1) -} - -func (m *MockDA) GetProofs(ctx context.Context, ids [][]byte, namespace []byte) ([]coreda.Proof, error) { - args := m.Called(ctx, ids, namespace) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).([]coreda.Proof), args.Error(1) -} +// createTestSequencer is a helper function to create a sequencer for testing +func createTestSequencer(t *testing.T, mockRetriever *MockForcedInclusionRetriever, gen genesis.Genesis) *BasedSequencer { + t.Helper() -func (m *MockDA) Validate(ctx context.Context, ids [][]byte, proofs []coreda.Proof, namespace []byte) ([]bool, error) { - args := m.Called(ctx, ids, proofs, namespace) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).([]bool), args.Error(1) -} + // 
Create in-memory datastore + db := syncds.MutexWrap(ds.NewMapDatastore()) -func (m *MockDA) Commit(ctx context.Context, blobs [][]byte, namespace []byte) ([][]byte, error) { - args := m.Called(ctx, blobs, namespace) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).([][]byte), args.Error(1) + seq, err := NewBasedSequencer(context.Background(), mockRetriever, db, gen, zerolog.Nop()) + require.NoError(t, err) + return seq } func TestBasedSequencer_SubmitBatchTxs(t *testing.T) { - mockDA := new(MockDA) + mockRetriever := new(MockForcedInclusionRetriever) gen := genesis.Genesis{ ChainID: "test-chain", DAEpochForcedInclusion: 10, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" - - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + seq := createTestSequencer(t, mockRetriever, gen) // Submit should succeed but be ignored req := coresequencer.SubmitBatchTxsRequest{ @@ -108,19 +63,19 @@ func TestBasedSequencer_SubmitBatchTxs(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp) - // Transactions should not be added to queue for based sequencer - assert.Equal(t, 0, len(seq.txQueue)) + // Transactions should not be processed for based sequencer + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) } func TestBasedSequencer_GetNextBatch_WithForcedTxs(t *testing.T) { testBlobs := [][]byte{[]byte("tx1"), []byte("tx2")} - mockDA := new(MockDA) - mockDA.On("GetIDs", mock.Anything, uint64(100), mock.Anything).Return(&coreda.GetIDsResult{ - IDs: []coreda.ID{[]byte("id1"), []byte("id2")}, - Timestamp: time.Now(), + mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: testBlobs, + StartDaHeight: 100, + EndDaHeight: 100, }, nil) - mockDA.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(testBlobs, nil) gen := genesis.Genesis{ ChainID: "test-chain", @@ -128,15 +83,7 @@ func TestBasedSequencer_GetNextBatch_WithForcedTxs(t *testing.T) { DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" - - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + seq := createTestSequencer(t, mockRetriever, gen) req := coresequencer.GetNextBatchRequest{ MaxBytes: 1000000, @@ -151,15 +98,20 @@ func TestBasedSequencer_GetNextBatch_WithForcedTxs(t *testing.T) { assert.Equal(t, []byte("tx1"), resp.Batch.Transactions[0]) assert.Equal(t, []byte("tx2"), resp.Batch.Transactions[1]) - // DA height should be updated to epochEnd + 1 - assert.Equal(t, uint64(101), seq.GetDAHeight()) + // Checkpoint should have moved to next DA height + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) - mockDA.AssertExpectations(t) + mockRetriever.AssertExpectations(t) } func TestBasedSequencer_GetNextBatch_EmptyDA(t *testing.T) { - mockDA := new(MockDA) - mockDA.On("GetIDs", mock.Anything, uint64(100), mock.Anything).Return(nil, coreda.ErrBlobNotFound) + mockRetriever := 
new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: [][]byte{}, + StartDaHeight: 100, + EndDaHeight: 100, + }, nil) gen := genesis.Genesis{ ChainID: "test-chain", @@ -167,15 +119,7 @@ func TestBasedSequencer_GetNextBatch_EmptyDA(t *testing.T) { DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" - - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + seq := createTestSequencer(t, mockRetriever, gen) req := coresequencer.GetNextBatchRequest{ MaxBytes: 1000000, @@ -186,26 +130,23 @@ func TestBasedSequencer_GetNextBatch_EmptyDA(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp) require.NotNil(t, resp.Batch) + // Should return empty batch when DA has no transactions assert.Equal(t, 0, len(resp.Batch.Transactions)) - mockDA.AssertExpectations(t) + mockRetriever.AssertExpectations(t) } func TestBasedSequencer_GetNextBatch_NotConfigured(t *testing.T) { - mockDA := new(MockDA) + mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(nil, block.ErrForceInclusionNotConfigured) + gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, DAEpochForcedInclusion: 1, } - // Create config without forced inclusion namespace - cfgNoFI := config.DefaultConfig() - cfgNoFI.DA.ForcedInclusionNamespace = "" - daClient := block.NewDAClient(mockDA, cfgNoFI, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfgNoFI, gen, zerolog.Nop()) + seq := createTestSequencer(t, mockRetriever, gen) req := coresequencer.GetNextBatchRequest{ MaxBytes: 1000000, @@ -215,26 +156,24 @@ func TestBasedSequencer_GetNextBatch_NotConfigured(t *testing.T) { resp, err := seq.GetNextBatch(context.Background(), req) require.Error(t, err) require.Nil(t, resp) + assert.ErrorIs(t, err, block.ErrForceInclusionNotConfigured) + + mockRetriever.AssertExpectations(t) } func TestBasedSequencer_GetNextBatch_WithMaxBytes(t *testing.T) { - testBlobs := [][]byte{ - make([]byte, 50), // 50 bytes - make([]byte, 60), // 60 bytes - make([]byte, 100), // 100 bytes - } - - mockDA := new(MockDA) - // First call returns forced txs at height 100 - mockDA.On("GetIDs", mock.Anything, uint64(100), mock.Anything).Return(&coreda.GetIDsResult{ - IDs: []coreda.ID{[]byte("id1"), []byte("id2"), []byte("id3")}, - Timestamp: time.Now(), - }, nil).Once() - mockDA.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(testBlobs, nil).Once() - - // Subsequent calls at height 101 and 102 (after DA height bumps) should return no new forced txs - mockDA.On("GetIDs", mock.Anything, uint64(101), mock.Anything).Return(nil, coreda.ErrBlobNotFound).Once() - mockDA.On("GetIDs", mock.Anything, uint64(102), mock.Anything).Return(nil, coreda.ErrBlobNotFound).Once() + // Create transactions of known sizes + tx1 := make([]byte, 100) + tx2 := make([]byte, 150) + tx3 := make([]byte, 200) + testBlobs := [][]byte{tx1, tx2, tx3} + + mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: 
testBlobs, + StartDaHeight: 100, + EndDaHeight: 100, + }, nil) gen := genesis.Genesis{ ChainID: "test-chain", @@ -242,19 +181,11 @@ func TestBasedSequencer_GetNextBatch_WithMaxBytes(t *testing.T) { DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" - - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + seq := createTestSequencer(t, mockRetriever, gen) - // First call with max 100 bytes - should get first 2 txs (50 + 60 = 110, but logic allows if batch has content) + // First call with MaxBytes that fits only first 2 transactions req := coresequencer.GetNextBatchRequest{ - MaxBytes: 100, + MaxBytes: 250, LastBatchData: nil, } @@ -262,36 +193,49 @@ func TestBasedSequencer_GetNextBatch_WithMaxBytes(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp) require.NotNil(t, resp.Batch) - // Should get first tx (50 bytes), second tx would exceed limit (50+60=110 > 100) - assert.Equal(t, 1, len(resp.Batch.Transactions)) - assert.Equal(t, 2, len(seq.txQueue)) // 2 remaining in queue + // Should only get first 2 transactions (100 + 150 = 250 bytes) + assert.Equal(t, 2, len(resp.Batch.Transactions)) + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(2), seq.checkpoint.TxIndex) - // Second call should get next tx from queue - resp2, err := seq.GetNextBatch(context.Background(), req) - require.NoError(t, err) - require.NotNil(t, resp2) - require.NotNil(t, resp2.Batch) - assert.Equal(t, 1, len(resp2.Batch.Transactions)) - assert.Equal(t, 1, len(seq.txQueue)) // 1 remaining in queue - - // Third call with larger maxBytes to get the 100-byte tx - req3 := coresequencer.GetNextBatchRequest{ - MaxBytes: 200, + // Second call should get the remaining transaction + req = coresequencer.GetNextBatchRequest{ + MaxBytes: 1000, LastBatchData: nil, } - resp3, err := seq.GetNextBatch(context.Background(), req3) + + resp, err = seq.GetNextBatch(context.Background(), req) require.NoError(t, err) - require.NotNil(t, resp3) - require.NotNil(t, resp3.Batch) - assert.Equal(t, 1, len(resp3.Batch.Transactions)) - assert.Equal(t, 0, len(seq.txQueue)) // Queue should be empty + require.NotNil(t, resp) + require.NotNil(t, resp.Batch) + assert.Equal(t, 1, len(resp.Batch.Transactions)) + assert.Equal(t, 200, len(resp.Batch.Transactions[0])) + + // After consuming all transactions, checkpoint should move to next DA height + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) - mockDA.AssertExpectations(t) + mockRetriever.AssertExpectations(t) } -func TestBasedSequencer_GetNextBatch_FromQueue(t *testing.T) { - mockDA := new(MockDA) - mockDA.On("GetIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil, coreda.ErrBlobNotFound) +func TestBasedSequencer_GetNextBatch_MultipleDABlocks(t *testing.T) { + testBlobs1 := [][]byte{[]byte("tx1"), []byte("tx2")} + testBlobs2 := [][]byte{[]byte("tx3"), []byte("tx4")} + + mockRetriever := new(MockForcedInclusionRetriever) + // First DA block + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: testBlobs1, + StartDaHeight: 100, + EndDaHeight: 100, + }, nil).Once() + + // Second DA block + mockRetriever.On("RetrieveForcedIncludedTxs", 
mock.Anything, uint64(101)).Return(&block.ForcedInclusionEvent{ + Txs: testBlobs2, + StartDaHeight: 101, + EndDaHeight: 101, + }, nil).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -299,49 +243,37 @@ func TestBasedSequencer_GetNextBatch_FromQueue(t *testing.T) { DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" - - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) - - // Pre-populate the queue - seq.txQueue = [][]byte{[]byte("queued_tx1"), []byte("queued_tx2")} + seq := createTestSequencer(t, mockRetriever, gen) req := coresequencer.GetNextBatchRequest{ MaxBytes: 1000000, LastBatchData: nil, } + // First batch from first DA block resp, err := seq.GetNextBatch(context.Background(), req) require.NoError(t, err) require.NotNil(t, resp) - require.NotNil(t, resp.Batch) assert.Equal(t, 2, len(resp.Batch.Transactions)) - assert.Equal(t, []byte("queued_tx1"), resp.Batch.Transactions[0]) - assert.Equal(t, []byte("queued_tx2"), resp.Batch.Transactions[1]) - - // Queue should be empty now - assert.Equal(t, 0, len(seq.txQueue)) -} + assert.Equal(t, []byte("tx1"), resp.Batch.Transactions[0]) + assert.Equal(t, []byte("tx2"), resp.Batch.Transactions[1]) + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) -func TestBasedSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) { - mockDA := new(MockDA) + // Second batch from second DA block + resp, err = seq.GetNextBatch(context.Background(), req) + require.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, 2, len(resp.Batch.Transactions)) + assert.Equal(t, []byte("tx3"), resp.Batch.Transactions[0]) + assert.Equal(t, []byte("tx4"), resp.Batch.Transactions[1]) + assert.Equal(t, uint64(102), seq.checkpoint.DAHeight) - // First call: return a forced tx that will be added to queue - forcedTx := make([]byte, 150) - mockDA.On("GetIDs", mock.Anything, uint64(100), mock.Anything).Return(&coreda.GetIDsResult{ - IDs: []coreda.ID{[]byte("id1")}, - Timestamp: time.Now(), - }, nil).Once() - mockDA.On("Get", mock.Anything, mock.Anything, mock.Anything).Return([][]byte{forcedTx}, nil).Once() + mockRetriever.AssertExpectations(t) +} - // Second call: no new forced txs at height 101 (after first call bumped DA height to epochEnd + 1) - mockDA.On("GetIDs", mock.Anything, uint64(101), mock.Anything).Return(nil, coreda.ErrBlobNotFound).Once() +func TestBasedSequencer_GetNextBatch_ResumesFromCheckpoint(t *testing.T) { + testBlobs := [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")} + mockRetriever := new(MockForcedInclusionRetriever) gen := genesis.Genesis{ ChainID: "test-chain", @@ -349,173 +281,220 @@ func TestBasedSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testin DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" - - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) + seq := createTestSequencer(t, mockRetriever, gen) - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + // Simulate processing first transaction (resuming from checkpoint after restart) + seq.checkpoint.DAHeight = 100 + 
seq.checkpoint.TxIndex = 1 + seq.currentBatchTxs = testBlobs - // First call with maxBytes = 100 - // Forced tx (150 bytes) is added to queue, but batch will be empty since it exceeds maxBytes - req1 := coresequencer.GetNextBatchRequest{ - MaxBytes: 100, + req := coresequencer.GetNextBatchRequest{ + MaxBytes: 1000000, LastBatchData: nil, } - resp1, err := seq.GetNextBatch(context.Background(), req1) + // Should resume from index 1, getting tx2 and tx3 + resp, err := seq.GetNextBatch(context.Background(), req) require.NoError(t, err) - require.NotNil(t, resp1) - require.NotNil(t, resp1.Batch) - assert.Equal(t, 0, len(resp1.Batch.Transactions), "Should have no txs as forced tx exceeds maxBytes") + require.NotNil(t, resp) + assert.Equal(t, 2, len(resp.Batch.Transactions)) + assert.Equal(t, []byte("tx2"), resp.Batch.Transactions[0]) + assert.Equal(t, []byte("tx3"), resp.Batch.Transactions[1]) + + // Should have moved to next DA height + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) +} - // Verify forced tx is in queue - assert.Equal(t, 1, len(seq.txQueue), "Forced tx should be in queue") +func TestBasedSequencer_GetNextBatch_ForcedInclusionExceedsMaxBytes(t *testing.T) { + // Create a transaction that exceeds maxBytes + largeTx := make([]byte, 2000) + testBlobs := [][]byte{largeTx} + + mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: testBlobs, + StartDaHeight: 100, + EndDaHeight: 100, + }, nil) - // Second call with larger maxBytes = 200 - // Should process tx from queue - req2 := coresequencer.GetNextBatchRequest{ - MaxBytes: 200, + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq := createTestSequencer(t, mockRetriever, gen) + + req := coresequencer.GetNextBatchRequest{ + MaxBytes: 1000, // Much smaller than the transaction LastBatchData: nil, } - resp2, err := seq.GetNextBatch(context.Background(), req2) + resp, err := seq.GetNextBatch(context.Background(), req) require.NoError(t, err) - require.NotNil(t, resp2) - require.NotNil(t, resp2.Batch) - assert.Equal(t, 1, len(resp2.Batch.Transactions), "Should include tx from queue") - assert.Equal(t, 150, len(resp2.Batch.Transactions[0])) - - // Queue should now be empty - assert.Equal(t, 0, len(seq.txQueue), "Queue should be empty") + require.NotNil(t, resp) + require.NotNil(t, resp.Batch) + // Should return empty batch since transaction exceeds max bytes + assert.Equal(t, 0, len(resp.Batch.Transactions)) - mockDA.AssertExpectations(t) + mockRetriever.AssertExpectations(t) } -func TestBasedSequencer_GetNextBatch_ForcedInclusionExceedsMaxBytes(t *testing.T) { - mockDA := new(MockDA) - - // Return forced txs where combined they exceed maxBytes - forcedTx1 := make([]byte, 100) - forcedTx2 := make([]byte, 80) - mockDA.On("GetIDs", mock.Anything, uint64(100), mock.Anything).Return(&coreda.GetIDsResult{ - IDs: []coreda.ID{[]byte("id1"), []byte("id2")}, - Timestamp: time.Now(), - }, nil).Once() - mockDA.On("Get", mock.Anything, mock.Anything, mock.Anything).Return([][]byte{forcedTx1, forcedTx2}, nil).Once() +func TestBasedSequencer_VerifyBatch(t *testing.T) { + mockRetriever := new(MockForcedInclusionRetriever) + gen := genesis.Genesis{ + ChainID: "test-chain", + DAEpochForcedInclusion: 10, + } - // Second call at height 101 (after first call bumped DA height to epochEnd + 1) - mockDA.On("GetIDs", 
mock.Anything, uint64(101), mock.Anything).Return(nil, coreda.ErrBlobNotFound).Once() + seq := createTestSequencer(t, mockRetriever, gen) + + req := coresequencer.VerifyBatchRequest{ + Id: []byte("test-chain"), + BatchData: [][]byte{[]byte("tx1"), []byte("tx2")}, + } + + resp, err := seq.VerifyBatch(context.Background(), req) + require.NoError(t, err) + require.NotNil(t, resp) + // Based sequencer always verifies as true since all txs come from DA + assert.True(t, resp.Status) +} +func TestBasedSequencer_SetDAHeight(t *testing.T) { + mockRetriever := new(MockForcedInclusionRetriever) gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, - DAEpochForcedInclusion: 1, + DAEpochForcedInclusion: 10, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" + seq := createTestSequencer(t, mockRetriever, gen) - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) + // Initial height from genesis + assert.Equal(t, uint64(100), seq.GetDAHeight()) - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + // Set new height + seq.SetDAHeight(200) + assert.Equal(t, uint64(200), seq.GetDAHeight()) +} - // First call with maxBytes = 120 - // Should get only first forced tx (100 bytes), second stays in queue - req1 := coresequencer.GetNextBatchRequest{ - MaxBytes: 120, - LastBatchData: nil, - } +func TestBasedSequencer_GetNextBatch_ErrorHandling(t *testing.T) { + mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(nil, block.ErrForceInclusionNotConfigured) - resp1, err := seq.GetNextBatch(context.Background(), req1) - require.NoError(t, err) - require.NotNil(t, resp1) - require.NotNil(t, resp1.Batch) - assert.Equal(t, 1, len(resp1.Batch.Transactions), "Should only include first forced tx") - assert.Equal(t, 100, len(resp1.Batch.Transactions[0])) + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } - // Verify second tx is still in queue - assert.Equal(t, 1, len(seq.txQueue), "Second tx should be in queue") + seq := createTestSequencer(t, mockRetriever, gen) - // Second call - should get the second tx from queue - req2 := coresequencer.GetNextBatchRequest{ - MaxBytes: 120, + req := coresequencer.GetNextBatchRequest{ + MaxBytes: 1000000, LastBatchData: nil, } - resp2, err := seq.GetNextBatch(context.Background(), req2) - require.NoError(t, err) - require.NotNil(t, resp2) - require.NotNil(t, resp2.Batch) - assert.Equal(t, 1, len(resp2.Batch.Transactions), "Should include second tx from queue") - assert.Equal(t, 80, len(resp2.Batch.Transactions[0])) - - // Queue should now be empty - assert.Equal(t, 0, len(seq.txQueue), "Queue should be empty") + resp, err := seq.GetNextBatch(context.Background(), req) + require.Error(t, err) + require.Nil(t, resp) + assert.ErrorIs(t, err, block.ErrForceInclusionNotConfigured) - mockDA.AssertExpectations(t) + mockRetriever.AssertExpectations(t) } -func TestBasedSequencer_VerifyBatch(t *testing.T) { - mockDA := new(MockDA) +func TestBasedSequencer_GetNextBatch_HeightFromFuture(t *testing.T) { + mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(nil, coreda.ErrHeightFromFuture) + gen := genesis.Genesis{ ChainID: "test-chain", + DAStartHeight: 100, 
DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" + seq := createTestSequencer(t, mockRetriever, gen) - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) - - req := coresequencer.VerifyBatchRequest{ - Id: []byte("test-chain"), - BatchData: [][]byte{[]byte("tx1")}, + req := coresequencer.GetNextBatchRequest{ + MaxBytes: 1000000, + LastBatchData: nil, } - resp, err := seq.VerifyBatch(context.Background(), req) + // Should not error, but return empty batch + resp, err := seq.GetNextBatch(context.Background(), req) require.NoError(t, err) - assert.True(t, resp.Status) + require.NotNil(t, resp) + assert.Equal(t, 0, len(resp.Batch.Transactions)) + + // DA height should stay the same + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + + mockRetriever.AssertExpectations(t) } -func TestBasedSequencer_SetDAHeight(t *testing.T) { - mockDA := new(MockDA) +func TestBasedSequencer_CheckpointPersistence(t *testing.T) { + testBlobs := [][]byte{[]byte("tx1"), []byte("tx2")} + + mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: testBlobs, + StartDaHeight: 100, + EndDaHeight: 100, + }, nil) + gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" + // Create persistent datastore + db := syncds.MutexWrap(ds.NewMapDatastore()) - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) + // Create first sequencer + seq1, err := NewBasedSequencer(context.Background(), mockRetriever, db, gen, zerolog.Nop()) + require.NoError(t, err) - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + req := coresequencer.GetNextBatchRequest{ + MaxBytes: 1000000, + LastBatchData: nil, + } - assert.Equal(t, uint64(100), seq.GetDAHeight()) + // Process a batch + resp, err := seq1.GetNextBatch(context.Background(), req) + require.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, 2, len(resp.Batch.Transactions)) - seq.SetDAHeight(200) - assert.Equal(t, uint64(200), seq.GetDAHeight()) + // Create a new sequencer with the same datastore (simulating restart) + seq2, err := NewBasedSequencer(context.Background(), mockRetriever, db, gen, zerolog.Nop()) + require.NoError(t, err) + + // Checkpoint should be loaded from DB + assert.Equal(t, uint64(101), seq2.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq2.checkpoint.TxIndex) + + mockRetriever.AssertExpectations(t) } -func TestBasedSequencer_GetNextBatch_ErrorHandling(t *testing.T) { - mockDA := new(MockDA) - mockDA.On("GetIDs", mock.Anything, uint64(100), mock.Anything).Return(nil, errors.New("DA connection error")) +func TestBasedSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { + mockRetriever := new(MockForcedInclusionRetriever) + + // First DA block returns empty transactions + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: [][]byte{}, + StartDaHeight: 100, + EndDaHeight: 100, + 
}, nil).Once() + + // Second DA block also returns empty transactions + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(101)).Return(&block.ForcedInclusionEvent{ + Txs: [][]byte{}, + StartDaHeight: 101, + EndDaHeight: 101, + }, nil).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -523,27 +502,40 @@ func TestBasedSequencer_GetNextBatch_ErrorHandling(t *testing.T) { DAEpochForcedInclusion: 1, } - cfg := config.DefaultConfig() - cfg.DA.Namespace = "test-ns" - cfg.DA.DataNamespace = "test-data-ns" - cfg.DA.ForcedInclusionNamespace = "test-fi-ns" - - daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop()) - fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop()) - - seq := NewBasedSequencer(fiRetriever, mockDA, cfg, gen, zerolog.Nop()) + seq := createTestSequencer(t, mockRetriever, gen) req := coresequencer.GetNextBatchRequest{ MaxBytes: 1000000, LastBatchData: nil, } - // With new error handling, errors during blob processing return empty batch instead of error + // Initial DA height should be 100 + assert.Equal(t, uint64(100), seq.GetDAHeight()) + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + + // First batch - empty DA block at height 100 resp, err := seq.GetNextBatch(context.Background(), req) require.NoError(t, err) require.NotNil(t, resp) require.NotNil(t, resp.Batch) - assert.Equal(t, 0, len(resp.Batch.Transactions), "Should return empty batch on DA error") + assert.Equal(t, 0, len(resp.Batch.Transactions)) + + // DA height should have increased to 101 even though no transactions were processed + assert.Equal(t, uint64(101), seq.GetDAHeight()) + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + + // Second batch - empty DA block at height 101 + resp, err = seq.GetNextBatch(context.Background(), req) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.Batch) + assert.Equal(t, 0, len(resp.Batch.Transactions)) + + // DA height should have increased to 102 + assert.Equal(t, uint64(102), seq.GetDAHeight()) + assert.Equal(t, uint64(102), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) - mockDA.AssertExpectations(t) + mockRetriever.AssertExpectations(t) } diff --git a/sequencers/common/checkpoint.go b/sequencers/common/checkpoint.go new file mode 100644 index 0000000000..039451d2c9 --- /dev/null +++ b/sequencers/common/checkpoint.go @@ -0,0 +1,90 @@ +package common + +import ( + "context" + "errors" + "fmt" + + ds "github.com/ipfs/go-datastore" + "google.golang.org/protobuf/proto" + + pb "github.com/evstack/ev-node/types/pb/evnode/v1" +) + +// ErrCheckpointNotFound is returned when no checkpoint exists in the datastore +var ErrCheckpointNotFound = errors.New("checkpoint not found") + +// Checkpoint tracks the position in the DA where we last processed transactions +type Checkpoint struct { + // DAHeight is the DA block height we're currently processing or have just finished + DAHeight uint64 + // TxIndex is the index of the next transaction to process within the DA block's forced inclusion batch + // If TxIndex == 0, it means we've finished processing the previous DA block and should fetch the next one + TxIndex uint64 +} + +// CheckpointStore manages persistence of the checkpoint +type CheckpointStore struct { + db ds.Batching + checkpointKey ds.Key +} + +// NewCheckpointStore creates a new checkpoint store +func NewCheckpointStore(db ds.Batching, checkpointkey ds.Key) *CheckpointStore { + return &CheckpointStore{ + 
db: db, + checkpointKey: checkpointkey, + } +} + +// Load loads the checkpoint from the datastore +// Returns ErrCheckpointNotFound if no checkpoint exists +func (cs *CheckpointStore) Load(ctx context.Context) (*Checkpoint, error) { + data, err := cs.db.Get(ctx, cs.checkpointKey) + if err != nil { + if errors.Is(err, ds.ErrNotFound) { + return nil, ErrCheckpointNotFound + } + return nil, fmt.Errorf("failed to load checkpoint: %w", err) + } + + pbCheckpoint := &pb.SequencerDACheckpoint{} + if err := proto.Unmarshal(data, pbCheckpoint); err != nil { + return nil, fmt.Errorf("failed to unmarshal checkpoint: %w", err) + } + + return &Checkpoint{ + DAHeight: pbCheckpoint.DaHeight, + TxIndex: pbCheckpoint.TxIndex, + }, nil +} + +// Save persists the checkpoint to the datastore +func (cs *CheckpointStore) Save(ctx context.Context, checkpoint *Checkpoint) error { + pbCheckpoint := &pb.SequencerDACheckpoint{ + DaHeight: checkpoint.DAHeight, + TxIndex: checkpoint.TxIndex, + } + + data, err := proto.Marshal(pbCheckpoint) + if err != nil { + return fmt.Errorf("failed to marshal checkpoint: %w", err) + } + + if err := cs.db.Put(ctx, cs.checkpointKey, data); err != nil { + return fmt.Errorf("failed to save checkpoint: %w", err) + } + + return nil +} + +// Delete removes the checkpoint from the datastore +func (cs *CheckpointStore) Delete(ctx context.Context) error { + if err := cs.db.Delete(ctx, cs.checkpointKey); err != nil { + if errors.Is(err, ds.ErrNotFound) { + return nil // Already deleted + } + return fmt.Errorf("failed to delete checkpoint: %w", err) + } + return nil +} diff --git a/sequencers/common/checkpoint_test.go b/sequencers/common/checkpoint_test.go new file mode 100644 index 0000000000..88b59ad888 --- /dev/null +++ b/sequencers/common/checkpoint_test.go @@ -0,0 +1,135 @@ +package common + +import ( + "context" + "testing" + + ds "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/require" +) + +var ( + checkpointKey = ds.NewKey("/checkpoint") +) + +func TestCheckpointStore_SaveAndLoad(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + store := NewCheckpointStore(db, checkpointKey) + + // Test loading when no checkpoint exists + _, err := store.Load(ctx) + require.ErrorIs(t, err, ErrCheckpointNotFound) + + // Test saving a checkpoint + checkpoint := &Checkpoint{ + DAHeight: 100, + TxIndex: 5, + } + err = store.Save(ctx, checkpoint) + require.NoError(t, err) + + // Test loading the saved checkpoint + loaded, err := store.Load(ctx) + require.NoError(t, err) + require.Equal(t, checkpoint.DAHeight, loaded.DAHeight) + require.Equal(t, checkpoint.TxIndex, loaded.TxIndex) + + // Test updating the checkpoint + checkpoint.DAHeight = 200 + checkpoint.TxIndex = 10 + err = store.Save(ctx, checkpoint) + require.NoError(t, err) + + loaded, err = store.Load(ctx) + require.NoError(t, err) + require.Equal(t, uint64(200), loaded.DAHeight) + require.Equal(t, uint64(10), loaded.TxIndex) +} + +func TestCheckpointStore_Delete(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + store := NewCheckpointStore(db, checkpointKey) + + // Save a checkpoint + checkpoint := &Checkpoint{ + DAHeight: 100, + TxIndex: 5, + } + err := store.Save(ctx, checkpoint) + require.NoError(t, err) + + // Delete it + err = store.Delete(ctx) + require.NoError(t, err) + + // Verify it's gone + _, err = store.Load(ctx) + require.ErrorIs(t, err, ErrCheckpointNotFound) + + // Delete again should not error + err = store.Delete(ctx) + require.NoError(t, err) +} + +func 
TestCheckpoint_EdgeCases(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + store := NewCheckpointStore(db, checkpointKey) + + // Test with zero values + checkpoint := &Checkpoint{ + DAHeight: 0, + TxIndex: 0, + } + err := store.Save(ctx, checkpoint) + require.NoError(t, err) + + loaded, err := store.Load(ctx) + require.NoError(t, err) + require.Equal(t, uint64(0), loaded.DAHeight) + require.Equal(t, uint64(0), loaded.TxIndex) + + // Test with max uint64 values + checkpoint = &Checkpoint{ + DAHeight: ^uint64(0), + TxIndex: ^uint64(0), + } + err = store.Save(ctx, checkpoint) + require.NoError(t, err) + + loaded, err = store.Load(ctx) + require.NoError(t, err) + require.Equal(t, ^uint64(0), loaded.DAHeight) + require.Equal(t, ^uint64(0), loaded.TxIndex) +} + +func TestCheckpointStore_ConcurrentAccess(t *testing.T) { + ctx := context.Background() + db := ds.NewMapDatastore() + store := NewCheckpointStore(db, checkpointKey) + + // Save initial checkpoint + checkpoint := &Checkpoint{ + DAHeight: 100, + TxIndex: 0, + } + err := store.Save(ctx, checkpoint) + require.NoError(t, err) + + // Test concurrent reads + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + defer func() { done <- true }() + loaded, err := store.Load(ctx) + require.NoError(t, err) + require.NotNil(t, loaded) + }() + } + + for i := 0; i < 10; i++ { + <-done + } +} diff --git a/sequencers/common/size_validation.go b/sequencers/common/size_validation.go index ee781ce205..a88206e280 100644 --- a/sequencers/common/size_validation.go +++ b/sequencers/common/size_validation.go @@ -7,21 +7,3 @@ const ( // Blobs exceeding this size are invalid and should be rejected permanently. AbsoluteMaxBlobSize = 2 * 1024 * 1024 // 2MB ) - -// ValidateBlobSize checks if a single blob exceeds the absolute maximum allowed size. -// This checks against the DA layer limit, not the per-batch limit. -// Returns true if the blob is within the absolute size limit, false otherwise. -func ValidateBlobSize(blob []byte) bool { - return uint64(GetBlobSize(blob)) <= AbsoluteMaxBlobSize -} - -// WouldExceedCumulativeSize checks if adding a blob would exceed the cumulative size limit for a batch. -// Returns true if adding the blob would exceed the limit, false otherwise. -func WouldExceedCumulativeSize(currentSize int, blobSize int, maxBytes uint64) bool { - return uint64(currentSize)+uint64(blobSize) > maxBytes -} - -// GetBlobSize returns the size of a blob in bytes. 
-func GetBlobSize(blob []byte) int { - return len(blob) -} diff --git a/sequencers/common/size_validation_test.go b/sequencers/common/size_validation_test.go deleted file mode 100644 index 103c66d8be..0000000000 --- a/sequencers/common/size_validation_test.go +++ /dev/null @@ -1,141 +0,0 @@ -package common - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestValidateBlobSize(t *testing.T) { - tests := []struct { - name string - blobSize int - want bool - }{ - { - name: "empty blob", - blobSize: 0, - want: true, - }, - { - name: "small blob", - blobSize: 100, - want: true, - }, - { - name: "exactly at limit", - blobSize: int(AbsoluteMaxBlobSize), - want: true, - }, - { - name: "one byte over limit", - blobSize: int(AbsoluteMaxBlobSize) + 1, - want: false, - }, - { - name: "far exceeds limit", - blobSize: int(AbsoluteMaxBlobSize) * 2, - want: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - blob := make([]byte, tt.blobSize) - got := ValidateBlobSize(blob) - assert.Equal(t, tt.want, got) - }) - } -} - -func TestWouldExceedCumulativeSize(t *testing.T) { - tests := []struct { - name string - currentSize int - blobSize int - maxBytes uint64 - want bool - }{ - { - name: "empty batch, small blob", - currentSize: 0, - blobSize: 50, - maxBytes: 100, - want: false, - }, - { - name: "would fit exactly", - currentSize: 50, - blobSize: 50, - maxBytes: 100, - want: false, - }, - { - name: "would exceed by one byte", - currentSize: 50, - blobSize: 51, - maxBytes: 100, - want: true, - }, - { - name: "far exceeds", - currentSize: 80, - blobSize: 100, - maxBytes: 100, - want: true, - }, - { - name: "zero max bytes", - currentSize: 0, - blobSize: 1, - maxBytes: 0, - want: true, - }, - { - name: "current already at limit", - currentSize: 100, - blobSize: 1, - maxBytes: 100, - want: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := WouldExceedCumulativeSize(tt.currentSize, tt.blobSize, tt.maxBytes) - assert.Equal(t, tt.want, got) - }) - } -} - -func TestGetBlobSize(t *testing.T) { - tests := []struct { - name string - blobSize int - want int - }{ - { - name: "empty blob", - blobSize: 0, - want: 0, - }, - { - name: "small blob", - blobSize: 42, - want: 42, - }, - { - name: "large blob", - blobSize: 1024 * 1024, - want: 1024 * 1024, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - blob := make([]byte, tt.blobSize) - got := GetBlobSize(blob) - assert.Equal(t, tt.want, got) - }) - } -} diff --git a/sequencers/single/README.md b/sequencers/single/README.md index 2b6184c3c4..323e76f41e 100644 --- a/sequencers/single/README.md +++ b/sequencers/single/README.md @@ -6,6 +6,8 @@ The single sequencer is a component of the Evolve framework that handles transac The sequencer receives transactions from clients, batches them together, and submits these batches to a Data Availability layer. It maintains transaction and batch queues, handles recovery from crashes, and provides verification mechanisms for batches. +**Key Feature**: The single sequencer implements a checkpoint system for forced inclusion transactions from DA, ensuring that **DA transactions are never re-executed after a crash**. + ```mermaid flowchart LR Client["Client"] --> Sequencer @@ -19,34 +21,40 @@ flowchart LR The main component that orchestrates the entire sequencing process. 
It: -- Receives transactions from clients +- Receives transactions from clients (mempool transactions) +- Retrieves forced inclusion transactions from the DA layer - Maintains transaction and batch queues - Periodically creates and submits batches to the DA layer -- Handles recovery from crashes +- Handles recovery from crashes via checkpoint system - Provides verification mechanisms for batches -### TransactionQueue - -Manages the queue of pending transactions: - -- Stores transactions in memory and in the database -- Provides methods to add transactions and extract batches -- Handles recovery of transactions from the database after a crash - ### BatchQueue -Manages the queue of batches: +Manages the queue of pending **mempool transaction batches**: - Stores batches in memory and in the database - Provides methods to add and retrieve batches - Handles recovery of batches from the database after a crash +- Supports queue size limits for backpressure + +### CheckpointStore -### DAClient +Manages persistence of the forced inclusion checkpoint: -Handles communication with the Data Availability layer: +- Tracks the exact position in the DA transaction stream +- Stores `DAHeight` (current DA block height being processed) +- Stores `TxIndex` (index within the forced inclusion batch) +- Enables crash recovery without re-executing DA transactions +- Persisted after every batch of forced inclusion transactions -- Submits batches to the DA layer -- Retrieves batch status from the DA layer +### ForcedInclusionRetriever + +Retrieves forced inclusion transactions from the DA layer: + +- Fetches transactions at epoch boundaries +- Returns all transactions from an entire epoch +- Validates transaction sizes +- Epoch size is configured via `DAEpochForcedInclusion` in genesis ## Flow of Calls @@ -54,11 +62,13 @@ Handles communication with the Data Availability layer: ```mermaid flowchart TD - A["NewSequencer()"] --> B["LoadLastBatchHashFromDB()"] - B --> C["LoadSeenBatchesFromDB()"] - C --> D["Load Transaction Queue from DB"] - D --> E["Load BatchQueue from DB"] - E --> F["Start batch submission loop"] + A["NewSequencer()"] --> B["Load BatchQueue from DB"] + B --> C["Load Checkpoint from DB"] + C --> D{"Checkpoint exists?"} + D -->|Yes| E["Resume from checkpoint position"] + D -->|No| F["Initialize new checkpoint"] + E --> G["Ready to process"] + F --> G ``` ### Transaction Submission Flow @@ -81,15 +91,21 @@ flowchart TD E --> F["Add to BatchQueue"] ``` -### Batch Retrieval Flow +### Batch Retrieval Flow (with Forced Inclusion) ```mermaid flowchart TD A["GetNextBatch()"] --> B["Validate ID"] - B --> C["Check batch hash match"] - C --> D["If match or both nil"] - D --> E["Get batch from BatchQueue"] - E --> F["Update last batch hash"] + B --> C{"Have cached forced inclusion txs?"} + C -->|No| D["fetchNextDAEpoch()"] + C -->|Yes| E["Process from checkpoint"] + D --> E + E --> F["Create forced inclusion batch from checkpoint position"] + F --> G["Update checkpoint with consumed txs"] + G --> H["Persist checkpoint to disk"] + H --> I["Get mempool batch from BatchQueue"] + I --> J["Combine forced inclusion + mempool txs"] + J --> K["Return combined batch"] ``` ### Batch Verification Flow @@ -101,63 +117,134 @@ flowchart TD C --> D["Return status"] ``` +## Checkpoint System for Forced Inclusion + +### Purpose + +The checkpoint system ensures that **forced inclusion transactions from DA are never re-executed after a crash**. This is critical for correctness and determinism. + +### How It Works + +1. 
**Checkpoint Structure**: + + ```go + type Checkpoint struct { + DAHeight uint64 // Current DA block height being processed + TxIndex uint64 // Index within the forced inclusion batch + } + ``` + +2. **Processing Flow**: + - Fetch forced inclusion transactions from DA at epoch boundaries + - Cache transactions in memory (`cachedForcedInclusionTxs`) + - Process transactions incrementally from the checkpoint position + - Update `TxIndex` after each batch + - When all transactions are consumed, increment `DAHeight` and reset `TxIndex` + - **Persist checkpoint after every batch** + +3. **Crash Recovery**: + - On restart, load checkpoint from disk + - Re-fetch forced inclusion transactions from DA (entire epoch) + - Resume processing from `checkpoint.TxIndex` + - Skip already-processed transactions + +### Example + +``` +Initial state: Checkpoint(DAHeight: 100, TxIndex: 0) +DA returns 3 transactions at height 100 + +Batch 1: Process tx[0] + → Checkpoint(DAHeight: 100, TxIndex: 1) ✅ Persisted + +Batch 2: Process tx[1] + → Checkpoint(DAHeight: 100, TxIndex: 2) ✅ Persisted + +**CRASH OCCURS** + +Restart: Load Checkpoint(DAHeight: 100, TxIndex: 2) from disk + → Re-fetch transactions from DA height 100 + → Resume from tx[2] (skip tx[0] and tx[1]) + → ✅ No re-execution! + +Batch 3: Process tx[2] + → Checkpoint(DAHeight: 101, TxIndex: 0) ✅ Persisted +``` +
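+The crash-recovery path in the example can be read back as code. The following is a minimal sketch only, assuming the `Checkpoint`, `CheckpointStore`, and `ErrCheckpointNotFound` definitions from `sequencers/common` (imported as `seqcommon`) and a hypothetical `daStartHeight` taken from genesis; the real wiring lives in `NewSequencer`: + +```go +// loadOrInitCheckpoint mirrors the startup path: resume from the persisted +// checkpoint when one exists, otherwise start fresh at the genesis DA height. +func loadOrInitCheckpoint(ctx context.Context, store *seqcommon.CheckpointStore, daStartHeight uint64) (*seqcommon.Checkpoint, error) { + cp, err := store.Load(ctx) + if errors.Is(err, seqcommon.ErrCheckpointNotFound) { + // First start: nothing persisted yet. + return &seqcommon.Checkpoint{DAHeight: daStartHeight, TxIndex: 0}, nil + } + if err != nil { + return nil, err + } + // Crash recovery: the caller re-fetches the epoch at cp.DAHeight + // and skips the first cp.TxIndex transactions. + return cp, nil +} +``` +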
+### Comparison with Mempool Transactions + +| Aspect | Forced Inclusion (DA) | Mempool Transactions | | ----------------- | ------------------------------- | ---------------------- | | Source | DA layer via forced inclusion | Client submissions | | Persistence | Checkpoint (DAHeight + TxIndex) | BatchQueue | | Crash Recovery | Resume from checkpoint position | Resume from queue | | Re-execution Risk | None (prevented by checkpoint) | None (prevented by queue) | | Priority | Always first in batch | After forced inclusion | + ## Database Layout -The single sequencer uses a key-value database to store transactions, batches, and metadata. Here's the layout of the database: +The single sequencer uses a key-value database to store batches, checkpoints, and metadata. Here's the layout of the database: ### Keys -| Key Pattern | Description | -|---------------------------|---------------------------------------------------------| -| `l` | Last batch hash | -| `seen:<hash>` | Marker for seen batch hashes | -| `<hash>` | Batch data (hash is the batch hash) | -| `tx:<hash>` | Transaction data (hash is SHA-256 of transaction bytes) | +| Key Pattern | Description | +| -------------------- | ------------------------------------------ | +| `/single/checkpoint` | Checkpoint for forced inclusion processing | +| `batches/<hash>` | Batch data (mempool transactions) | ### Key Details -#### Last Batch Hash Key (`l`) - -- Stores the hash of the last processed batch -- Used for recovery after a crash -- Value: Raw bytes of the hash - -#### Seen Batch Hash Keys (`seen:<hash>`) +#### Checkpoint Key (`/single/checkpoint`) -- Marks batches that have been seen and processed -- Used for batch verification -- Value: `1` (presence indicates the batch has been seen) +- Stores the forced inclusion checkpoint +- Used for crash recovery +- Value: Protobuf-encoded checkpoint data (`SequencerDACheckpoint`) +- Updated after every batch of forced inclusion transactions -#### Batch Keys (`<hash>`) +#### Batch Keys (`batches/<hash>`) -- Stores the actual batch data +- Stores mempool transaction batches - Key is the hex-encoded hash of the batch - Value: Protobuf-encoded batch data - -#### Transaction Keys (`tx:<hash>`) - -- Stores individual transactions -- Key is prefixed with `tx:` followed by the SHA-256 hash of the transaction bytes -- Value: Raw transaction bytes +- Managed by `BatchQueue` ## Recovery Mechanism The single sequencer implements a robust recovery mechanism to handle crashes: -1. On startup, it loads the last batch hash from the database -2. It loads all seen batch hashes into memory -3. It loads all pending transactions from the database into the transaction queue -4. It loads all pending batches from the database into the batch queue -5. It resumes normal operation, continuing from where it left off +### For Forced Inclusion Transactions (DA) + +1. On startup, load checkpoint from database +2. If no checkpoint exists, initialize with genesis DA height +3. Resume from checkpoint position (`DAHeight` + `TxIndex`) +4. Re-fetch forced inclusion transactions from DA +5. Skip already-processed transactions using `TxIndex` +6. Continue processing from where it left off + +**Result**: ✅ No forced inclusion transactions are re-executed + +### For Mempool Transactions (Queue) + +1. On startup, load all pending batches from database into `BatchQueue` +2. Resume processing batches in order +3. Continue normal operation + +**Result**: ✅ No mempool transactions are lost + +### Combined Recovery + +Both systems work together to ensure: -This ensures that no transactions are lost in case of a crash, and the sequencer can continue operating seamlessly. 
+- **Correctness**: No transaction is lost or re-executed +- **Determinism**: Same state after crash recovery +- **Atomicity**: Checkpoint and queue are consistent ## Metrics The sequencer exposes the following metrics: | Metric | Description | -|-------------------------|--------------------------------------------------| +| ----------------------- | ------------------------------------------------ | | `gas_price` | The gas price of DA | | `last_blob_size` | The size in bytes of the last DA blob | | `transaction_status` | Count of transaction statuses for DA submissions | diff --git a/sequencers/single/doc.go b/sequencers/single/doc.go deleted file mode 100644 index b4f6abe716..0000000000 --- a/sequencers/single/doc.go +++ /dev/null @@ -1,4 +0,0 @@ -/* -This package implements a single sequencer. -*/ -package single diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index ea60cf003c..8b114e1e06 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -1,8 +1,10 @@ +// Package single implements a single sequencer. package single import ( "bytes" "context" + "encoding/hex" "errors" "fmt" "sync/atomic" @@ -18,42 +20,35 @@ import ( seqcommon "github.com/evstack/ev-node/sequencers/common" ) -var ( - // ErrInvalidId is returned when the chain id is invalid - ErrInvalidId = errors.New("invalid chain id") -) +// ErrInvalidId is returned when the chain id is invalid +var ErrInvalidId = errors.New("invalid chain id") // ForcedInclusionRetriever defines the interface for retrieving forced inclusion transactions from DA type ForcedInclusionRetriever interface { RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*block.ForcedInclusionEvent, error) } -// pendingForcedInclusionTx represents a forced inclusion transaction that couldn't fit in the current epoch -type pendingForcedInclusionTx struct { - Data []byte - OriginalHeight uint64 -} - var _ coresequencer.Sequencer = (*Sequencer)(nil) // Sequencer implements core sequencing interface type Sequencer struct { - logger zerolog.Logger - - proposer bool + fiRetriever ForcedInclusionRetriever + logger zerolog.Logger + proposer bool Id []byte da coreda.DA batchTime time.Duration - - queue *BatchQueue // single queue for immediate availability + queue *BatchQueue // single queue for immediate availability // Forced inclusion support - fiRetriever ForcedInclusionRetriever - genesis genesis.Genesis - daHeight atomic.Uint64 - pendingForcedInclusionTxs []pendingForcedInclusionTx + daHeight atomic.Uint64 + checkpointStore *seqcommon.CheckpointStore + checkpoint *seqcommon.Checkpoint + + // Cached forced inclusion transactions from the current epoch + cachedForcedInclusionTxs [][]byte } // NewSequencer creates a new Single Sequencer @@ -67,32 +62,57 @@ func NewSequencer( proposer bool, maxQueueSize int, fiRetriever ForcedInclusionRetriever, - gen genesis.Genesis, + genesis genesis.Genesis, ) (*Sequencer, error) { s := &Sequencer{ - logger: logger, - da: da, - batchTime: batchTime, - Id: id, - queue: NewBatchQueue(db, "batches", maxQueueSize), - proposer: proposer, - fiRetriever: fiRetriever, - genesis: gen, - pendingForcedInclusionTxs: make([]pendingForcedInclusionTx, 0), + logger: logger, + da: da, + batchTime: batchTime, + Id: id, + queue: NewBatchQueue(db, "batches", maxQueueSize), + proposer: proposer, + fiRetriever: fiRetriever, + checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), } - s.SetDAHeight(gen.DAStartHeight) // will be overridden by the executor + 
s.SetDAHeight(genesis.DAStartHeight) // will be overridden by the executor loadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + + // Load batch queue from DB + if err := s.queue.Load(loadCtx); err != nil { return nil, fmt.Errorf("failed to load batch queue from DB: %w", err) } - // No DA submission loop here; handled by central manager + // Load checkpoint from DB, or initialize if none exists + checkpoint, err := s.checkpointStore.Load(loadCtx) + if err != nil { + if errors.Is(err, seqcommon.ErrCheckpointNotFound) { + // No checkpoint exists, initialize with current DA height + s.checkpoint = &seqcommon.Checkpoint{ + DAHeight: s.GetDAHeight(), + TxIndex: 0, + } + } else { + return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) + } + } else { + s.checkpoint = checkpoint + // If we had a non-zero tx index, we're resuming from a crash mid-epoch. + // The transactions starting from that index are the ones we still need. + if checkpoint.TxIndex > 0 { + s.logger.Debug(). + Uint64("tx_index", checkpoint.TxIndex). + Uint64("da_height", checkpoint.DAHeight). + Msg("resuming from checkpoint within DA epoch") + } + } + return s, nil } // SubmitBatchTxs implements sequencing.Sequencer. +// It adds mempool transactions to a batch. func (c *Sequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { if !c.isValid(req.Id) { return nil, ErrInvalidId @@ -121,46 +141,26 @@ func (c *Sequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Submit } // GetNextBatch implements sequencing.Sequencer. +// It gets the next batch of mempool transactions and fetches forced inclusion transactions from DA. func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { if !c.isValid(req.Id) { return nil, ErrInvalidId } - currentDAHeight := c.GetDAHeight() - - forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) - if err != nil { - if errors.Is(err, coreda.ErrHeightFromFuture) { - c.logger.Debug(). - Uint64("da_height", currentDAHeight). - Msg("DA height from future, waiting for DA to produce block") - } else if !errors.Is(err, block.ErrForceInclusionNotConfigured) { - c.logger.Error().Err(err).Uint64("da_height", currentDAHeight).Msg("failed to retrieve forced inclusion transactions") + // If we have no cached transactions or we've consumed all from the current cache, + // fetch the next DA epoch + daHeight := c.GetDAHeight() + if len(c.cachedForcedInclusionTxs) == 0 || c.checkpoint.TxIndex >= uint64(len(c.cachedForcedInclusionTxs)) { + daEndHeight, err := c.fetchNextDAEpoch(ctx, req.MaxBytes) + if err != nil { + return nil, err } - // Still create an empty forced inclusion event - forcedTxsEvent = &block.ForcedInclusionEvent{ - Txs: [][]byte{}, - StartDaHeight: currentDAHeight, - EndDaHeight: currentDAHeight, - } - } else { - // Update DA height. - // If we are in between epochs, we still need to bump the da height. - // At the end of an epoch, we need to bump to go to the next epoch. - if forcedTxsEvent.EndDaHeight >= currentDAHeight { - c.SetDAHeight(forcedTxsEvent.EndDaHeight + 1) - } + daHeight = daEndHeight } - // Always try to process forced inclusion transactions (including pending from previous epochs) - forcedTxs := c.processForcedInclusionTxs(forcedTxsEvent, req.MaxBytes) - - c.logger.Debug(). - Int("tx_count", len(forcedTxs)). - Uint64("da_height_start", forcedTxsEvent.StartDaHeight). 
- Uint64("da_height_end", forcedTxsEvent.EndDaHeight). - Msg("retrieved forced inclusion transactions from DA") + // Process forced inclusion transactions from checkpoint position + forcedTxs := c.processForcedInclusionTxsFromCheckpoint(req.MaxBytes) // Calculate size used by forced inclusion transactions forcedTxsSize := 0 @@ -168,6 +168,32 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB forcedTxsSize += len(tx) } + // Update checkpoint after consuming forced inclusion transactions + if daHeight > 0 || len(forcedTxs) > 0 { + c.checkpoint.TxIndex += uint64(len(forcedTxs)) + + // If we've consumed all transactions from this DA epoch, move to next + if c.checkpoint.TxIndex >= uint64(len(c.cachedForcedInclusionTxs)) { + c.checkpoint.DAHeight = daHeight + 1 + c.checkpoint.TxIndex = 0 + c.cachedForcedInclusionTxs = nil + + // Update the global DA height + c.SetDAHeight(c.checkpoint.DAHeight) + } + + // Persist checkpoint + if err := c.checkpointStore.Save(ctx, c.checkpoint); err != nil { + return nil, fmt.Errorf("failed to save checkpoint: %w", err) + } + + c.logger.Debug(). + Int("forced_tx_count", len(forcedTxs)). + Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). + Msg("processed forced inclusion transactions and updated checkpoint") + } + batch, err := c.queue.Next(ctx) if err != nil { return nil, err @@ -187,7 +213,10 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // Would exceed limit, return remaining txs to the front of the queue excludedBatch := coresequencer.Batch{Transactions: batch.Transactions[i:]} if err := c.queue.Prepend(ctx, excludedBatch); err != nil { + // tx will be lost forever, but we shouldn't halt. + // halting doesn't not add any value. c.logger.Error().Err(err). + Str("tx", hex.EncodeToString(tx)). Int("excluded_count", len(batch.Transactions)-i). 
Msg("failed to prepend excluded transactions back to queue") } else { @@ -237,7 +266,6 @@ func (c *Sequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBat } if !c.proposer { - proofs, err := c.da.GetProofs(ctx, req.BatchData, c.Id) if err != nil { return nil, fmt.Errorf("failed to get proofs: %w", err) @@ -255,6 +283,7 @@ func (c *Sequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBat } return &coresequencer.VerifyBatchResponse{Status: true}, nil } + return &coresequencer.VerifyBatchResponse{Status: true}, nil } @@ -266,7 +295,6 @@ func (c *Sequencer) isValid(Id []byte) bool { // This should be called when the sequencer needs to sync to a specific DA height func (c *Sequencer) SetDAHeight(height uint64) { c.daHeight.Store(height) - c.logger.Debug().Uint64("da_height", height).Msg("DA height updated") } // GetDAHeight returns the current DA height @@ -274,95 +302,81 @@ func (c *Sequencer) GetDAHeight() uint64 { return c.daHeight.Load() } -// processForcedInclusionTxs processes forced inclusion transactions with size validation and pending queue management -func (c *Sequencer) processForcedInclusionTxs(event *block.ForcedInclusionEvent, maxBytes uint64) [][]byte { - currentSize := 0 - var newPendingTxs []pendingForcedInclusionTx - var validatedTxs [][]byte +// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint +func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { + currentDAHeight := c.checkpoint.DAHeight - // First, process any pending transactions from previous epochs - for _, pendingTx := range c.pendingForcedInclusionTxs { - txSize := seqcommon.GetBlobSize(pendingTx.Data) - - if !seqcommon.ValidateBlobSize(pendingTx.Data) { - c.logger.Warn(). - Uint64("original_height", pendingTx.OriginalHeight). - Int("blob_size", txSize). - Msg("pending forced inclusion blob exceeds absolute maximum size - skipping") - continue - } + c.logger.Debug(). + Uint64("da_height", currentDAHeight). + Uint64("tx_index", c.checkpoint.TxIndex). + Msg("fetching forced inclusion transactions from DA") - if seqcommon.WouldExceedCumulativeSize(currentSize, txSize, maxBytes) { + forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) + if err != nil { + if errors.Is(err, coreda.ErrHeightFromFuture) { c.logger.Debug(). - Uint64("original_height", pendingTx.OriginalHeight). - Int("current_size", currentSize). - Int("blob_size", txSize). - Msg("pending blob would exceed max size for this epoch - deferring again") - newPendingTxs = append(newPendingTxs, pendingTx) - continue + Uint64("da_height", currentDAHeight). + Msg("DA height from future, waiting for DA to produce block") + return 0, nil + } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { + // Forced inclusion not configured, continue without forced txs + c.cachedForcedInclusionTxs = [][]byte{} + return 0, nil } - validatedTxs = append(validatedTxs, pendingTx.Data) - currentSize += txSize - - c.logger.Debug(). - Uint64("original_height", pendingTx.OriginalHeight). - Int("blob_size", txSize). - Int("current_size", currentSize). 
- Msg("processed pending forced inclusion transaction") + return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err) } - // Now process new transactions from this epoch - for _, tx := range event.Txs { - txSize := seqcommon.GetBlobSize(tx) - - if !seqcommon.ValidateBlobSize(tx) { + // Validate and filter transactions + validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) + skippedTxs := 0 + for _, tx := range forcedTxsEvent.Txs { + if uint64(len(tx)) > maxBytes { c.logger.Warn(). - Uint64("da_height", event.StartDaHeight). - Int("blob_size", txSize). - Msg("forced inclusion blob exceeds absolute maximum size - skipping") + Uint64("da_height", forcedTxsEvent.StartDaHeight). + Int("blob_size", len(tx)). + Uint64("max_bytes", maxBytes). + Msg("forced inclusion blob exceeds maximum size - skipping") + skippedTxs++ continue } + validTxs = append(validTxs, tx) + } - if seqcommon.WouldExceedCumulativeSize(currentSize, txSize, maxBytes) { - c.logger.Debug(). - Uint64("da_height", event.StartDaHeight). - Int("current_size", currentSize). - Int("blob_size", txSize). - Msg("blob would exceed max size for this epoch - deferring to pending queue") - - // Store for next call - newPendingTxs = append(newPendingTxs, pendingForcedInclusionTx{ - Data: tx, - OriginalHeight: event.StartDaHeight, - }) - continue - } + c.logger.Info(). + Int("valid_tx_count", len(validTxs)). + Int("skipped_tx_count", skippedTxs). + Uint64("da_height_start", forcedTxsEvent.StartDaHeight). + Uint64("da_height_end", forcedTxsEvent.EndDaHeight). + Msg("fetched forced inclusion transactions from DA") - validatedTxs = append(validatedTxs, tx) - currentSize += txSize + // Cache the transactions + c.cachedForcedInclusionTxs = validTxs - c.logger.Debug(). - Int("blob_size", txSize). - Int("current_size", currentSize). - Msg("processed forced inclusion transaction") - } + return forcedTxsEvent.EndDaHeight, nil +} - // Update pending queue - c.pendingForcedInclusionTxs = newPendingTxs - if len(newPendingTxs) > 0 { - c.logger.Info(). - Int("new_pending_count", len(newPendingTxs)). - Msg("stored pending forced inclusion transactions for next epoch") +// processForcedInclusionTxsFromCheckpoint processes forced inclusion transactions from checkpoint position +func (c *Sequencer) processForcedInclusionTxsFromCheckpoint(maxBytes uint64) [][]byte { + if len(c.cachedForcedInclusionTxs) == 0 || c.checkpoint.TxIndex >= uint64(len(c.cachedForcedInclusionTxs)) { + return [][]byte{} } - if len(validatedTxs) > 0 { - c.logger.Info(). - Int("processed_tx_count", len(validatedTxs)). - Int("pending_tx_count", len(newPendingTxs)). - Int("current_size", currentSize). 
- Msg("completed processing forced inclusion transactions") + var result [][]byte + var totalBytes uint64 + + // Start from the checkpoint index + for i := c.checkpoint.TxIndex; i < uint64(len(c.cachedForcedInclusionTxs)); i++ { + tx := c.cachedForcedInclusionTxs[i] + txSize := uint64(len(tx)) + + if totalBytes+txSize > maxBytes { + break + } + + result = append(result, tx) + totalBytes += txSize } - return validatedTxs + return result } diff --git a/sequencers/single/sequencer_test.go b/sequencers/single/sequencer_test.go index e7e8096789..6124642acb 100644 --- a/sequencers/single/sequencer_test.go +++ b/sequencers/single/sequencer_test.go @@ -33,6 +33,32 @@ func (m *MockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Con return args.Get(0).(*block.ForcedInclusionEvent), args.Error(1) } +// newTestSequencer creates a sequencer for tests that don't need full initialization +func newTestSequencer(t *testing.T, db ds.Batching, fiRetriever ForcedInclusionRetriever, proposer bool) *Sequencer { + ctx := context.Background() + logger := zerolog.Nop() + + gen := genesis.Genesis{ + ChainID: "test", + DAStartHeight: 100, + } + + seq, err := NewSequencer( + ctx, + logger, + db, + nil, + []byte("test"), + 1*time.Second, + proposer, + 0, // unlimited queue + fiRetriever, + gen, + ) + require.NoError(t, err) + return seq +} + func TestSequencer_SubmitBatchTxs(t *testing.T) { dummyDA := coreda.NewDummyDA(100_000_000, 10*time.Second) db := ds.NewMapDatastore() @@ -135,17 +161,32 @@ func TestSequencer_SubmitBatchTxs_EmptyBatch(t *testing.T) { func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) { db := ds.NewMapDatastore() + ctx := context.Background() logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - queue: NewBatchQueue(db, "batches", 0), // 0 = unlimited for test - Id: []byte("test"), - fiRetriever: mockRetriever, + + gen := genesis.Genesis{ + ChainID: "test", + DAStartHeight: 100, } + + seq, err := NewSequencer( + ctx, + logger, + db, + nil, + []byte("test"), + 1*time.Second, + true, + 0, // unlimited queue + mockRetriever, + gen, + ) + require.NoError(t, err) + defer func() { err := db.Close() if err != nil { @@ -154,7 +195,7 @@ func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) { }() // Test case where lastBatchHash and seq.lastBatchHash are both nil - res, err := seq.GetNextBatch(context.Background(), coresequencer.GetNextBatchRequest{Id: seq.Id}) + res, err := seq.GetNextBatch(ctx, coresequencer.GetNextBatchRequest{Id: seq.Id}) if err != nil { t.Fatalf("Failed to get next batch: %v", err) } @@ -175,17 +216,12 @@ func TestSequencer_GetNextBatch_Success(t *testing.T) { mockBatch := &coresequencer.Batch{Transactions: [][]byte{[]byte("tx1"), []byte("tx2")}} db := ds.NewMapDatastore() - logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). 
Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - queue: NewBatchQueue(db, "batches", 0), // 0 = unlimited for test - Id: []byte("test"), - fiRetriever: mockRetriever, - } + + seq := newTestSequencer(t, db, mockRetriever, true) defer func() { err := db.Close() if err != nil { @@ -239,19 +275,14 @@ func TestSequencer_VerifyBatch(t *testing.T) { t.Run("Proposer Mode", func(t *testing.T) { mockDA := damocks.NewMockDA(t) - logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - Id: Id, - proposer: true, - da: mockDA, - queue: NewBatchQueue(db, "proposer_queue", 0), // 0 = unlimited for test - fiRetriever: mockRetriever, - } + db := ds.NewMapDatastore() + seq := newTestSequencer(t, db, mockRetriever, true) + seq.da = mockDA + defer db.Close() res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: seq.Id, BatchData: batchData}) assert.NoError(err) @@ -265,18 +296,14 @@ func TestSequencer_VerifyBatch(t *testing.T) { t.Run("Non-Proposer Mode", func(t *testing.T) { t.Run("Valid Proofs", func(t *testing.T) { mockDA := damocks.NewMockDA(t) - logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - Id: Id, - proposer: false, - da: mockDA, - queue: NewBatchQueue(db, "valid_proofs_queue", 0), - fiRetriever: mockRetriever, - } + + db := ds.NewMapDatastore() + seq := newTestSequencer(t, db, mockRetriever, false) + seq.da = mockDA + defer db.Close() mockDA.On("GetProofs", context.Background(), batchData, Id).Return(proofs, nil).Once() mockDA.On("Validate", mock.Anything, batchData, proofs, Id).Return([]bool{true, true}, nil).Once() @@ -290,18 +317,14 @@ func TestSequencer_VerifyBatch(t *testing.T) { t.Run("Invalid Proof", func(t *testing.T) { mockDA := damocks.NewMockDA(t) - logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - Id: Id, - proposer: false, - da: mockDA, - queue: NewBatchQueue(db, "invalid_proof_queue", 0), - fiRetriever: mockRetriever, - } + + db := ds.NewMapDatastore() + seq := newTestSequencer(t, db, mockRetriever, false) + seq.da = mockDA + defer db.Close() mockDA.On("GetProofs", context.Background(), batchData, Id).Return(proofs, nil).Once() mockDA.On("Validate", mock.Anything, batchData, proofs, Id).Return([]bool{true, false}, nil).Once() @@ -315,18 +338,14 @@ func TestSequencer_VerifyBatch(t *testing.T) { t.Run("GetProofs Error", func(t *testing.T) { mockDA := damocks.NewMockDA(t) - logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). 
Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - Id: Id, - proposer: false, - da: mockDA, - queue: NewBatchQueue(db, "getproofs_err_queue", 0), - fiRetriever: mockRetriever, - } + + db := ds.NewMapDatastore() + seq := newTestSequencer(t, db, mockRetriever, false) + seq.da = mockDA + defer db.Close() expectedErr := errors.New("get proofs failed") mockDA.On("GetProofs", context.Background(), batchData, Id).Return(nil, expectedErr).Once() @@ -341,18 +360,14 @@ func TestSequencer_VerifyBatch(t *testing.T) { t.Run("Validate Error", func(t *testing.T) { mockDA := damocks.NewMockDA(t) - logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - Id: Id, - proposer: false, - da: mockDA, - queue: NewBatchQueue(db, "validate_err_queue", 0), - fiRetriever: mockRetriever, - } + + db := ds.NewMapDatastore() + seq := newTestSequencer(t, db, mockRetriever, false) + seq.da = mockDA + defer db.Close() expectedErr := errors.New("validate failed") mockDA.On("GetProofs", context.Background(), batchData, Id).Return(proofs, nil).Once() @@ -367,19 +382,14 @@ func TestSequencer_VerifyBatch(t *testing.T) { t.Run("Invalid ID", func(t *testing.T) { mockDA := damocks.NewMockDA(t) - logger := zerolog.Nop() mockRetriever := new(MockForcedInclusionRetriever) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, mock.Anything). Return(nil, block.ErrForceInclusionNotConfigured).Maybe() - seq := &Sequencer{ - logger: logger, - Id: Id, - proposer: false, - da: mockDA, - queue: NewBatchQueue(db, "invalid_queue", 0), - fiRetriever: mockRetriever, - } + db := ds.NewMapDatastore() + seq := newTestSequencer(t, db, mockRetriever, false) + seq.da = mockDA + defer db.Close() invalidId := []byte("invalid") res, err := seq.VerifyBatch(context.Background(), coresequencer.VerifyBatchRequest{Id: invalidId, BatchData: batchData}) @@ -548,12 +558,13 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { EndDaHeight: 100, }, nil).Once() - // Second call should process pending tx at DA height 101 (after first call bumped it to epochEnd + 1) + // Second call won't fetch from DA - tx2 is still in cache + // Only after both txs are consumed will we fetch from DA height 101 mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(101)).Return(&block.ForcedInclusionEvent{ Txs: [][]byte{}, StartDaHeight: 101, EndDaHeight: 101, - }, nil).Once() + }, nil).Maybe() gen := genesis.Genesis{ ChainID: "test-chain", @@ -588,8 +599,8 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { assert.Equal(t, 1, len(resp.Batch.Transactions), "Should only include first forced tx") assert.Equal(t, 100, len(resp.Batch.Transactions[0])) - // Verify pending tx is stored - assert.Equal(t, 1, len(seq.pendingForcedInclusionTxs), "Second tx should be pending") + // Verify checkpoint reflects that we've consumed one tx + assert.Equal(t, uint64(1), seq.checkpoint.TxIndex, "Should have consumed one tx from cache") // Second call - should get the pending forced tx resp2, err := seq.GetNextBatch(ctx, getReq) @@ -598,8 +609,9 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { assert.Equal(t, 1, len(resp2.Batch.Transactions), "Should include pending forced tx") assert.Equal(t, 80, len(resp2.Batch.Transactions[0])) - // Pending queue should now 
be empty - assert.Equal(t, 0, len(seq.pendingForcedInclusionTxs), "Pending queue should be empty") + // Checkpoint should have moved to next DA height after consuming all cached txs + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight, "Should have moved to next DA height") + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex, "TxIndex should be reset") mockFI.AssertExpectations(t) } @@ -612,20 +624,21 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) mockFI := &MockForcedInclusionRetriever{} - // First call returns a large forced tx that gets deferred - largeForcedTx := make([]byte, 150) + // First call returns two 75-byte forced txs; only the first fits in the first batch + largeForcedTx1, largeForcedTx2 := make([]byte, 75), make([]byte, 75) mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ - Txs: [][]byte{largeForcedTx}, + Txs: [][]byte{largeForcedTx1, largeForcedTx2}, StartDaHeight: 100, EndDaHeight: 100, }, nil).Once() - // Second call returns no new forced txs at height 101 (after first call bumped DA height to epochEnd + 1), but pending should still be processed + // Second call won't fetch from DA - forced tx is still in cache + // Only after the forced tx is consumed will we fetch from DA height 101 mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(101)).Return(&block.ForcedInclusionEvent{ Txs: [][]byte{}, StartDaHeight: 101, EndDaHeight: 101, - }, nil).Once() + }, nil).Maybe() gen := genesis.Genesis{ ChainID: "test-chain", @@ -658,22 +671,22 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) require.NoError(t, err) - // First call with maxBytes = 100 - // Large forced tx (150 bytes) won't fit, gets deferred - // Batch tx (50 bytes) should be returned + // First call with maxBytes = 125 getReq := coresequencer.GetNextBatchRequest{ Id: []byte("test-chain"), - MaxBytes: 100, + MaxBytes: 125, LastBatchData: nil, } resp, err := seq.GetNextBatch(ctx, getReq) require.NoError(t, err) require.NotNil(t, resp.Batch) - assert.Equal(t, 1, len(resp.Batch.Transactions), "Should have batch tx only") - assert.Equal(t, 50, len(resp.Batch.Transactions[0])) + assert.Equal(t, 2, len(resp.Batch.Transactions), "Should have 1 batch tx + 1 forced tx") + assert.Equal(t, 75, len(resp.Batch.Transactions[0])) // forced tx is 75 bytes + assert.Equal(t, 50, len(resp.Batch.Transactions[1])) // batch tx is 50 bytes - // Verify pending forced tx is stored - assert.Equal(t, 1, len(seq.pendingForcedInclusionTxs), "Large forced tx should be pending") + // Verify checkpoint shows only the first forced tx was consumed + assert.Equal(t, uint64(1), seq.checkpoint.TxIndex, "Only one forced tx should be consumed") + assert.Greater(t, len(seq.cachedForcedInclusionTxs), 1, "Remaining forced tx should still be cached") // Second call with larger maxBytes = 200 // Should process pending forced tx first @@ -687,10 +700,11 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) require.NoError(t, err) require.NotNil(t, resp2.Batch) assert.Equal(t, 1, len(resp2.Batch.Transactions), "Should include pending forced tx") - assert.Equal(t, 150, len(resp2.Batch.Transactions[0])) + assert.Equal(t, 75, len(resp2.Batch.Transactions[0])) - // Pending queue should now be empty - assert.Equal(t, 0, len(seq.pendingForcedInclusionTxs), "Pending queue should be empty") + // Checkpoint should reflect that forced tx was consumed + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight, "Should have moved to next DA height") + assert.Equal(t, 
uint64(0), seq.checkpoint.TxIndex, "TxIndex should be reset after consuming all") mockFI.AssertExpectations(t) } @@ -706,18 +720,27 @@ func TestSequencer_QueueLimit_Integration(t *testing.T) { Return(nil, block.ErrForceInclusionNotConfigured).Maybe() // Create a sequencer with a small queue limit for testing + ctx := context.Background() logger := zerolog.Nop() - seq := &Sequencer{ - logger: logger, - da: mockDA, - batchTime: time.Second, - Id: []byte("test"), - queue: NewBatchQueue(db, "test_queue", 2), // Very small limit for testing - proposer: true, - fiRetriever: mockRetriever, + + gen := genesis.Genesis{ + ChainID: "test", + DAStartHeight: 100, } - ctx := context.Background() + seq, err := NewSequencer( + ctx, + logger, + db, + mockDA, + []byte("test"), + time.Second, + true, + 2, // Very small limit for testing + mockRetriever, + gen, + ) + require.NoError(t, err) // Test successful batch submission within limit batch1 := createTestBatch(t, 3) @@ -939,3 +962,189 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { // 4. ✅ Continues to throttle when queue fills up again // 5. ✅ Provides backpressure to prevent resource exhaustion } + +func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + // Create forced inclusion txs at DA height 100 + mockFI := &MockForcedInclusionRetriever{} + forcedTx1 := make([]byte, 100) + forcedTx2 := make([]byte, 80) + forcedTx3 := make([]byte, 90) + mockFI.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ + Txs: [][]byte{forcedTx1, forcedTx2, forcedTx3}, + StartDaHeight: 100, + EndDaHeight: 100, + }, nil) + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + } + + // Create first sequencer instance + seq1, err := NewSequencer( + ctx, + logger, + db, + nil, + []byte("test-chain"), + 1*time.Second, + true, + 100, + mockFI, + gen, + ) + require.NoError(t, err) + + // First call - get first forced tx + getReq := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 120, + LastBatchData: nil, + } + + resp1, err := seq1.GetNextBatch(ctx, getReq) + require.NoError(t, err) + require.NotNil(t, resp1.Batch) + assert.Equal(t, 1, len(resp1.Batch.Transactions), "Should get first forced tx") + assert.Equal(t, 100, len(resp1.Batch.Transactions[0])) + + // Verify checkpoint is persisted + assert.Equal(t, uint64(1), seq1.checkpoint.TxIndex, "Checkpoint should show 1 tx consumed") + assert.Equal(t, uint64(100), seq1.checkpoint.DAHeight, "Checkpoint should be at DA height 100") + + // Second call - get second forced tx + resp2, err := seq1.GetNextBatch(ctx, getReq) + require.NoError(t, err) + require.NotNil(t, resp2.Batch) + assert.Equal(t, 1, len(resp2.Batch.Transactions), "Should get second forced tx") + assert.Equal(t, 80, len(resp2.Batch.Transactions[0])) + + // Verify checkpoint updated + assert.Equal(t, uint64(2), seq1.checkpoint.TxIndex, "Checkpoint should show 2 txs consumed") + + // SIMULATE CRASH: Create new sequencer instance with same DB + // This simulates a node restart/crash + seq2, err := NewSequencer( + ctx, + logger, + db, + nil, + []byte("test-chain"), + 1*time.Second, + true, + 100, + mockFI, + gen, + ) + require.NoError(t, err) + + // Verify checkpoint was loaded from disk + assert.Equal(t, uint64(2), seq2.checkpoint.TxIndex, "Checkpoint should be loaded from disk") + assert.Equal(t, 
uint64(100), seq2.checkpoint.DAHeight, "DA height should be loaded from disk") + + // Third call on new sequencer instance - should get third forced tx (NOT re-execute first two) + resp3, err := seq2.GetNextBatch(ctx, getReq) + require.NoError(t, err) + require.NotNil(t, resp3.Batch) + assert.Equal(t, 1, len(resp3.Batch.Transactions), "Should get third forced tx (resume from checkpoint)") + assert.Equal(t, 90, len(resp3.Batch.Transactions[0]), "Should be third tx, not first") + + // Verify checkpoint moved to next DA height after consuming all + assert.Equal(t, uint64(101), seq2.checkpoint.DAHeight, "Should have moved to next DA height") + assert.Equal(t, uint64(0), seq2.checkpoint.TxIndex, "TxIndex should be reset") + + t.Log("✅ Checkpoint system successfully prevented re-execution of DA transactions after crash") + mockFI.AssertExpectations(t) +} + +func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { + db := ds.NewMapDatastore() + ctx := context.Background() + + mockRetriever := new(MockForcedInclusionRetriever) + + // First DA epoch returns empty transactions + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)). + Return(&block.ForcedInclusionEvent{ + Txs: [][]byte{}, + StartDaHeight: 100, + EndDaHeight: 105, + }, nil).Once() + + // Second DA epoch also returns empty transactions + mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(106)). + Return(&block.ForcedInclusionEvent{ + Txs: [][]byte{}, + StartDaHeight: 106, + EndDaHeight: 111, + }, nil).Once() + + gen := genesis.Genesis{ + ChainID: "test", + DAStartHeight: 100, + DAEpochForcedInclusion: 5, + } + + seq, err := NewSequencer( + ctx, + zerolog.Nop(), + db, + nil, + []byte("test"), + 1*time.Second, + true, + 1000, + mockRetriever, + gen, + ) + require.NoError(t, err) + + defer func() { + err := db.Close() + if err != nil { + t.Fatalf("Failed to close sequencer: %v", err) + } + }() + + req := coresequencer.GetNextBatchRequest{ + Id: seq.Id, + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Initial DA height should be 100 + assert.Equal(t, uint64(100), seq.GetDAHeight()) + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + + // First batch - empty DA block at height 100 + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.Batch) + assert.Equal(t, 0, len(resp.Batch.Transactions)) + + // DA height should have increased to 106 even though no transactions were processed + assert.Equal(t, uint64(106), seq.GetDAHeight()) + assert.Equal(t, uint64(106), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + + // Second batch - empty DA block at height 106 + resp, err = seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.Batch) + assert.Equal(t, 0, len(resp.Batch.Transactions)) + + // DA height should have increased to 112 + assert.Equal(t, uint64(112), seq.GetDAHeight()) + assert.Equal(t, uint64(112), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + + mockRetriever.AssertExpectations(t) +} diff --git a/types/epoch_test.go b/types/epoch_test.go index 5787126186..c293bcd350 100644 --- a/types/epoch_test.go +++ b/types/epoch_test.go @@ -91,6 +91,13 @@ func TestCalculateEpochNumber(t *testing.T) { daHeight: 105, expectedEpoch: 6, }, + { + name: "epoch size 0", + daStartHeight: 100, + daEpochSize: 0, + daHeight: 105, + expectedEpoch: 1, + }, } for _, tt := range tests { diff --git 
a/types/pb/evnode/v1/state.pb.go b/types/pb/evnode/v1/state.pb.go index ea7610b7d9..87030d2fd9 100644 --- a/types/pb/evnode/v1/state.pb.go +++ b/types/pb/evnode/v1/state.pb.go @@ -123,6 +123,61 @@ func (x *State) GetLastHeaderHash() []byte { return nil } +// SequencerDACheckpoint tracks the position in the DA where transactions were last processed +type SequencerDACheckpoint struct { + state protoimpl.MessageState `protogen:"open.v1"` + // DA block height being processed + DaHeight uint64 `protobuf:"varint,1,opt,name=da_height,json=daHeight,proto3" json:"da_height,omitempty"` + // Index of the next transaction to process within the DA block's forced inclusion batch + TxIndex uint64 `protobuf:"varint,2,opt,name=tx_index,json=txIndex,proto3" json:"tx_index,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SequencerDACheckpoint) Reset() { + *x = SequencerDACheckpoint{} + mi := &file_evnode_v1_state_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SequencerDACheckpoint) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SequencerDACheckpoint) ProtoMessage() {} + +func (x *SequencerDACheckpoint) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_state_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SequencerDACheckpoint.ProtoReflect.Descriptor instead. +func (*SequencerDACheckpoint) Descriptor() ([]byte, []int) { + return file_evnode_v1_state_proto_rawDescGZIP(), []int{1} +} + +func (x *SequencerDACheckpoint) GetDaHeight() uint64 { + if x != nil { + return x.DaHeight + } + return 0 +} + +func (x *SequencerDACheckpoint) GetTxIndex() uint64 { + if x != nil { + return x.TxIndex + } + return 0 +} + var File_evnode_v1_state_proto protoreflect.FileDescriptor const file_evnode_v1_state_proto_rawDesc = "" + @@ -136,7 +191,10 @@ const file_evnode_v1_state_proto_rawDesc = "" + "\x0flast_block_time\x18\x05 \x01(\v2\x1a.google.protobuf.TimestampR\rlastBlockTime\x12\x1b\n" + "\tda_height\x18\x06 \x01(\x04R\bdaHeight\x12\x19\n" + "\bapp_hash\x18\b \x01(\fR\aappHash\x12(\n" + - "\x10last_header_hash\x18\t \x01(\fR\x0elastHeaderHashJ\x04\b\a\x10\bB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + "\x10last_header_hash\x18\t \x01(\fR\x0elastHeaderHashJ\x04\b\a\x10\b\"O\n" + + "\x15SequencerDACheckpoint\x12\x1b\n" + + "\tda_height\x18\x01 \x01(\x04R\bdaHeight\x12\x19\n" + + "\btx_index\x18\x02 \x01(\x04R\atxIndexB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" var ( file_evnode_v1_state_proto_rawDescOnce sync.Once @@ -150,15 +208,16 @@ func file_evnode_v1_state_proto_rawDescGZIP() []byte { return file_evnode_v1_state_proto_rawDescData } -var file_evnode_v1_state_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_evnode_v1_state_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_evnode_v1_state_proto_goTypes = []any{ (*State)(nil), // 0: evnode.v1.State - (*Version)(nil), // 1: evnode.v1.Version - (*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp + (*SequencerDACheckpoint)(nil), // 1: evnode.v1.SequencerDACheckpoint + (*Version)(nil), // 2: evnode.v1.Version + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp } var file_evnode_v1_state_proto_depIdxs = []int32{ - 1, // 0: evnode.v1.State.version:type_name -> evnode.v1.Version - 2, 
// 1: evnode.v1.State.last_block_time:type_name -> google.protobuf.Timestamp + 2, // 0: evnode.v1.State.version:type_name -> evnode.v1.Version + 3, // 1: evnode.v1.State.last_block_time:type_name -> google.protobuf.Timestamp 2, // [2:2] is the sub-list for method output_type 2, // [2:2] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name @@ -178,7 +237,7 @@ func file_evnode_v1_state_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_evnode_v1_state_proto_rawDesc), len(file_evnode_v1_state_proto_rawDesc)), NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, },