Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func (bc *Components) GetLastState() types.State {
return types.State{}
}

// Start starts all components and monitors for critical errors
// Start starts all components and monitors for critical errors.
// It is blocking and returns when the context is cancelled or an error occurs
func (bc *Components) Start(ctx context.Context) error {
ctxWithCancel, cancel := context.WithCancel(ctx)

Expand Down Expand Up @@ -137,6 +138,7 @@ func NewSyncComponents(
metrics *Metrics,
blockOpts BlockOptions,
) (*Components, error) {
logger.Info().Msg("Starting in sync-mode")
cacheManager, err := cache.NewManager(config, store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create cache manager: %w", err)
Expand Down Expand Up @@ -200,6 +202,7 @@ func NewAggregatorComponents(
metrics *Metrics,
blockOpts BlockOptions,
) (*Components, error) {
logger.Info().Msg("Starting in aggregator-mode")
cacheManager, err := cache.NewManager(config, store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create cache manager: %w", err)
Expand Down
32 changes: 2 additions & 30 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (e *Executor) initializeState() error {
LastBlockHeight: e.genesis.InitialHeight - 1,
LastBlockTime: e.genesis.StartTime,
AppHash: stateRoot,
DAHeight: 0,
DAHeight: e.genesis.DAStartHeight,
}
}

Expand Down Expand Up @@ -633,35 +633,7 @@ func (e *Executor) validateBlock(lastState types.State, header *types.SignedHead
return fmt.Errorf("invalid header: %w", err)
}

// Validate header against data
if err := types.Validate(header, data); err != nil {
return fmt.Errorf("header-data validation failed: %w", err)
}

// Check chain ID
if header.ChainID() != lastState.ChainID {
return fmt.Errorf("chain ID mismatch: expected %s, got %s",
lastState.ChainID, header.ChainID())
}

// Check height
expectedHeight := lastState.LastBlockHeight + 1
if header.Height() != expectedHeight {
return fmt.Errorf("invalid height: expected %d, got %d",
expectedHeight, header.Height())
}

// Check timestamp
if header.Height() > 1 && lastState.LastBlockTime.After(header.Time()) {
return fmt.Errorf("block time must be strictly increasing")
}

// Check app hash
if !bytes.Equal(header.AppHash, lastState.AppHash) {
return fmt.Errorf("app hash mismatch")
}

return nil
return lastState.AssertValidForNextState(header, data)
}

// sendCriticalError sends a critical error to the error channel without blocking
Expand Down
16 changes: 8 additions & 8 deletions block/internal/syncing/da_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil)

events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
require.Len(t, events, 1)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Header with no data hash present should trigger empty data creation (per current logic)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil)

events := r.processBlobs(context.Background(), [][]byte{hb}, 88)
require.Len(t, events, 1)
Expand All @@ -223,7 +223,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil)
hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil, nil)
gotH := r.tryDecodeHeader(hb, 123)
require.NotNil(t, gotH)
assert.Equal(t, sh.Hash().String(), gotH.Hash().String())
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) {

// Prepare header/data blobs
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 9, addr, pub, signer, 1)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data, nil)

cfg := config.DefaultConfig()
cfg.DA.Namespace = "nsHdr"
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {

// Create header and data for the same block height but from different DA heights
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil)

// Process header from DA height 100 first
events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100)
Expand Down Expand Up @@ -361,9 +361,9 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
data4Bin, data4 := makeSignedDataBytes(t, gen.ChainID, 4, addr, pub, signer, 2)
data5Bin, data5 := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1)

hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data)
hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data)
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data)
hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data, nil)
hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data, nil)
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil)

// Process multiple headers from DA height 200 - should be stored as pending
events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
Expand Down
60 changes: 41 additions & 19 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (s *Syncer) GetLastState() types.State {

stateCopy := *state
stateCopy.AppHash = bytes.Clone(state.AppHash)
stateCopy.LastHeaderHash = bytes.Clone(state.LastHeaderHash)

return stateCopy
}
Expand All @@ -182,21 +183,34 @@ func (s *Syncer) initializeState() error {
// Load state from store
state, err := s.store.GetState(s.ctx)
if err != nil {
// Use genesis state if no state exists
// Initialize new chain state for a fresh full node (no prior state on disk)
// Mirror executor initialization to ensure AppHash matches headers produced by the sequencer.
stateRoot, _, initErr := s.exec.InitChain(
s.ctx,
s.genesis.StartTime,
s.genesis.InitialHeight,
s.genesis.ChainID,
)
if initErr != nil {
return fmt.Errorf("failed to initialize execution client: %w", initErr)
}

state = types.State{
ChainID: s.genesis.ChainID,
InitialHeight: s.genesis.InitialHeight,
LastBlockHeight: s.genesis.InitialHeight - 1,
LastBlockTime: s.genesis.StartTime,
DAHeight: 0,
DAHeight: s.genesis.DAStartHeight,
AppHash: stateRoot,
}
}

if state.DAHeight < s.genesis.DAStartHeight {
return fmt.Errorf("DA height (%d) is lower than DA start height (%d)", state.DAHeight, s.genesis.DAStartHeight)
}
s.SetLastState(state)

// Set DA height
daHeight := max(state.DAHeight, s.genesis.DAStartHeight)
s.SetDAHeight(daHeight)
s.SetDAHeight(state.DAHeight)

s.logger.Info().
Uint64("height", state.LastBlockHeight).
Expand Down Expand Up @@ -385,7 +399,14 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
if err := s.trySyncNextBlock(event); err != nil {
s.logger.Error().Err(err).Msg("failed to sync next block")
// If the error is not due to an validation error, re-store the event as pending
if !errors.Is(err, errInvalidBlock) {
switch {
case errors.Is(err, errInvalidBlock):
// do not reschedule
case errors.Is(err, errInvalidState):
s.sendCriticalError(fmt.Errorf("invalid state detected (block-height %d, state-height %d) "+
"- block references do not match local state. Manual intervention required: %w", event.Header.Height(),
s.GetLastState().LastBlockHeight, err))
default:
s.cache.SetPendingEvent(height, event)
}
return
Expand All @@ -402,8 +423,12 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
}
}

// errInvalidBlock is returned when a block is failing validation
var errInvalidBlock = errors.New("invalid block")
var (
// errInvalidBlock is returned when a block is failing validation
errInvalidBlock = errors.New("invalid block")
// errInvalidState is returned when the state has diverged from the DA blocks
errInvalidState = errors.New("invalid state")
)

// trySyncNextBlock attempts to sync the next available block
// the event is always the next block in sequence as processHeightEvent ensures it.
Expand All @@ -425,10 +450,13 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
// Compared to the executor logic where the current block needs to be applied first,
// here only the previous block needs to be applied to proceed to the verification.
// The header validation must be done before applying the block to avoid executing gibberish
if err := s.validateBlock(header, data); err != nil {
if err := s.validateBlock(currentState, data, header); err != nil {
// remove header as da included (not per se needed, but keep cache clean)
s.cache.RemoveHeaderDAIncluded(headerHash)
return errors.Join(errInvalidBlock, fmt.Errorf("failed to validate block: %w", err))
if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
return errors.Join(errInvalidBlock, err)
}
return err
}

// Apply block
Expand Down Expand Up @@ -534,23 +562,17 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade
// NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct
// or if the data was gibberish and somehow passed all validation prior but the header was correct
// we are still losing both in the pending event. This should never happen.
func (s *Syncer) validateBlock(
header *types.SignedHeader,
data *types.Data,
) error {
func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error {
// Set custom verifier for aggregator node signature
header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider)

// Validate header with data
if err := header.ValidateBasicWithData(data); err != nil {
return fmt.Errorf("invalid header: %w", err)
}

// Validate header against data
if err := types.Validate(header, data); err != nil {
return fmt.Errorf("header-data validation failed: %w", err)
if err := currState.AssertValidForNextState(header, data); err != nil {
return errors.Join(errInvalidState, err)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {
Return(nil, errors.New("temporary failure")).Once()

// Second call - success (should reset backoff and increment DA height)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
data := &types.Data{
Metadata: &types.Metadata{
ChainID: gen.ChainID,
Expand Down
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
heightEvents := make([]common.DAHeightEvent, totalHeights)
for i := uint64(0); i < totalHeights; i++ {
blockHeight, daHeight := i+gen.InitialHeight, i+daHeightOffset
_, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil)
_, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil, nil)
d := &types.Data{Metadata: &types.Metadata{ChainID: gen.ChainID, Height: blockHeight, Time: uint64(time.Now().UnixNano())}}
heightEvents[i] = common.DAHeightEvent{Header: sh, Data: d, DaHeight: daHeight}
}
Expand Down
Loading
Loading