Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 25 additions & 30 deletions pkg/exporters/verifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,27 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
}

// processBlocks processes blocks from the queue
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue <-chan *types.Header) {
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) {
logger := e.logger.With().Int("worker_id", workerID).Logger()
logger.Debug().Msg("worker started")

for header := range blockQueue {
e.verifyBlock(ctx, m, header)
reEnqueue := e.verifyBlock(ctx, m, header)
if reEnqueue {
// re-queue with delay in background
go func(h *types.Header) {
select {
case <-time.After(5 * time.Minute):
select {
case blockQueue <- h:
logger.Debug().Uint64("block", h.Number.Uint64()).Msg("re-queued block after cooldown")
case <-ctx.Done():
// graceful shutdown, don't send
}
case <-ctx.Done():
}
}(header)
}
}

logger.Debug().Msg("worker stopped")
Expand All @@ -135,7 +150,7 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight,
}

// verifyBlock attempts to verify a DA height for a given block status.
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) {
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool {
blockHeight := header.Number.Uint64()

// check if block has transactions
Expand Down Expand Up @@ -175,7 +190,7 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
case <-ctx.Done():
// context cancelled during graceful shutdown, not an error
logger.Debug().Msg("block verification stopped due to shutdown")
return
return false
case <-time.After(interval):
// proceed with retry
}
Expand Down Expand Up @@ -226,27 +241,16 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
Dur("duration", time.Since(startTime)).
Msg("header blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return
}

// verification failed
if retries >= len(retryIntervals)+1 {
logger.Error().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("max retries reached - header blob not verified")
e.onVerified(m, namespace, blockHeight, daHeight, false, 0)
return
Comment on lines -233 to -239
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ends up being unreachable and is handled by the case at the very end of the fn

return false
}
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")

case "data":
if len(blockResultWithBlobs.DataBlob) == 0 {
logger.Info().
Dur("duration", time.Since(startTime)).
Msg("empty data block - no verification needed")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return
return false
}

// perform actual verification between bytes from ev-node and Celestia.
Expand All @@ -262,27 +266,18 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
Dur("duration", time.Since(startTime)).
Msg("data blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return
}

// verification failed
if retries >= len(retryIntervals)+1 {
logger.Error().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("max retries reached - data blob not verified")
e.onVerified(m, namespace, blockHeight, daHeight, false, 0)
return
return false
}
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")

default:
logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
return
return false
}
}

// if loop completes without success, log final error
logger.Error().Msg("max retries exhausted - ALERT: failed to verify block")
logger.Error().Msg("max retries exhausted: failed to verify block")
e.onVerified(m, namespace, blockHeight, 0, false, 0)
return true
}