From 44dd413bf599db247b55da75e55ea278d229ef88 Mon Sep 17 00:00:00 2001 From: chatton Date: Tue, 2 Dec 2025 10:38:18 +0000 Subject: [PATCH] chore: resubmit header on failed verification --- pkg/exporters/verifier/verifier.go | 55 ++++++++++++++---------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/pkg/exporters/verifier/verifier.go b/pkg/exporters/verifier/verifier.go index 8a5b306..5ffde23 100644 --- a/pkg/exporters/verifier/verifier.go +++ b/pkg/exporters/verifier/verifier.go @@ -113,12 +113,27 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error } // processBlocks processes blocks from the queue -func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue <-chan *types.Header) { +func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) { logger := e.logger.With().Int("worker_id", workerID).Logger() logger.Debug().Msg("worker started") for header := range blockQueue { - e.verifyBlock(ctx, m, header) + reEnqueue := e.verifyBlock(ctx, m, header) + if reEnqueue { + // re-queue with delay in background + go func(h *types.Header) { + select { + case <-time.After(5 * time.Minute): + select { + case blockQueue <- h: + logger.Debug().Uint64("block", h.Number.Uint64()).Msg("re-queued block after cooldown") + case <-ctx.Done(): + // graceful shutdown, don't send + } + case <-ctx.Done(): + } + }(header) + } } logger.Debug().Msg("worker stopped") @@ -135,7 +150,7 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight, } // verifyBlock attempts to verify a DA height for a given block status. -func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) { +func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool { blockHeight := header.Number.Uint64() // check if block has transactions @@ -175,7 +190,7 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header * case <-ctx.Done(): // context cancelled during graceful shutdown, not an error logger.Debug().Msg("block verification stopped due to shutdown") - return + return false case <-time.After(interval): // proceed with retry } @@ -226,19 +241,8 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header * Dur("duration", time.Since(startTime)). Msg("header blob verified on Celestia") e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) - return - } - - // verification failed - if retries >= len(retryIntervals)+1 { - logger.Error(). - Uint64("da_height", daHeight). - Dur("duration", time.Since(startTime)). - Msg("max retries reached - header blob not verified") - e.onVerified(m, namespace, blockHeight, daHeight, false, 0) - return + return false } - logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry") case "data": if len(blockResultWithBlobs.DataBlob) == 0 { @@ -246,7 +250,7 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header * Dur("duration", time.Since(startTime)). Msg("empty data block - no verification needed") e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) - return + return false } // perform actual verification between bytes from ev-node and Celestia. @@ -262,27 +266,18 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header * Dur("duration", time.Since(startTime)). Msg("data blob verified on Celestia") e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) - return - } - - // verification failed - if retries >= len(retryIntervals)+1 { - logger.Error(). - Uint64("da_height", daHeight). - Dur("duration", time.Since(startTime)). - Msg("max retries reached - data blob not verified") - e.onVerified(m, namespace, blockHeight, daHeight, false, 0) - return + return false } logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry") default: logger.Error().Str("namespace", namespace).Msg("unknown namespace type") - return + return false } } // if loop completes without success, log final error - logger.Error().Msg("max retries exhausted - ALERT: failed to verify block") + logger.Error().Msg("max retries exhausted: failed to verify block") e.onVerified(m, namespace, blockHeight, 0, false, 0) + return true }