diff --git a/README.md b/README.md
index a738256..5ae1bb0 100644
--- a/README.md
+++ b/README.md
@@ -75,6 +75,7 @@ Metrics will be available at `http://localhost:2112/metrics`
 - `--balance.addresses`: Comma-separated Celestia addresses to monitor (enables balance checking)
 - `--balance.consensus-rpc-urls`: Comma-separated consensus RPC URLs for balance queries (required if balance.addresses is set)
 - `--balance.scrape-interval`: Balance check scrape interval in seconds (default: 30)
+- `--verifier.workers`: Number of concurrent workers for block verification (default: 50)
 - `--verbose`: Enable verbose logging (default: false)
 
 ### Example with Custom Endpoints
diff --git a/cmd/monitor.go b/cmd/monitor.go
index 57dc084..c0f6a42 100644
--- a/cmd/monitor.go
+++ b/cmd/monitor.go
@@ -42,6 +42,7 @@ const (
 	flagBalanceAddresses      = "balance.addresses"
 	flagBalanceRpcUrls        = "balance.consensus-rpc-urls"
 	flagBalanceScrapeInterval = "balance.scrape-interval"
+	flagVerifierWorkers       = "verifier.workers"
 
 	metricsPath = "/metrics"
 )
@@ -67,6 +68,7 @@ type flagValues struct {
 	balanceAddresses      string
 	balanceRpcUrls        string
 	balanceScrapeInterval int
+	verifierWorkers       int
 }
 
 func NewMonitorCmd() *cobra.Command {
@@ -99,6 +101,7 @@ func NewMonitorCmd() *cobra.Command {
 	cmd.Flags().StringVar(&flags.balanceAddresses, flagBalanceAddresses, "", "Comma-separated celestia addresses to monitor (enables balance checking)")
 	cmd.Flags().StringVar(&flags.balanceRpcUrls, flagBalanceRpcUrls, "", "Comma-separated consensus rpc urls for balance queries (required if balance.addresses is set)")
 	cmd.Flags().IntVar(&flags.balanceScrapeInterval, flagBalanceScrapeInterval, 30, "Balance check scrape interval in seconds (default: 30)")
+	cmd.Flags().IntVar(&flags.verifierWorkers, flagVerifierWorkers, 50, "Number of concurrent workers for block verification (default: 50)")
 
 	if err := cmd.MarkFlagRequired(flagHeaderNS); err != nil {
 		panic(err)
@@ -204,6 +207,7 @@ func monitorAndExportMetrics(_ *cobra.Command, _ []string) error {
 			cfg.HeaderNS,
 			cfg.DataNS,
 			flags.chainID,
+			flags.verifierWorkers,
 			logger,
 		),
 	}
diff --git a/pkg/exporters/verifier/verifier.go b/pkg/exporters/verifier/verifier.go
index da7f98e..8a5b306 100644
--- a/pkg/exporters/verifier/verifier.go
+++ b/pkg/exporters/verifier/verifier.go
@@ -2,13 +2,15 @@ package verifier
 
 import (
 	"context"
+	"sync"
+	"time"
 
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/evstack/ev-metrics/internal/clients/celestia"
 	"github.com/evstack/ev-metrics/internal/clients/evm"
 	"github.com/evstack/ev-metrics/internal/clients/evnode"
 	"github.com/evstack/ev-metrics/pkg/metrics"
 	"github.com/rs/zerolog"
-	"time"
 )
 
 var _ metrics.Exporter = &exporter{}
@@ -20,6 +22,7 @@ func NewMetricsExporter(
 	evmClient *evm.Client,
 	headerNS, dataNS []byte,
 	chainID string,
+	workers int,
 	logger zerolog.Logger,
 ) metrics.Exporter {
 	return &exporter{
@@ -29,6 +32,7 @@
 		headerNS: headerNS,
 		dataNS:   dataNS,
 		chainID:  chainID,
+		workers:  workers,
 		logger:   logger.With().Str("component", "verification_monitor").Logger(),
 	}
 }
@@ -41,6 +45,7 @@
 	headerNS []byte
 	dataNS   []byte
 	chainID  string
+	workers  int
 	logger   zerolog.Logger
 }
 
@@ -53,14 +58,33 @@
 	}
 	defer sub.Unsubscribe()
 
+	// create buffered channel for block queue
+	blockQueue := make(chan *types.Header, e.workers*2)
+
+	// start work pool
+	var workerGroup sync.WaitGroup
+	for i := 0; i < e.workers; i++ {
+		workerGroup.Add(1)
+		workerID := i
+		go func() {
+			defer workerGroup.Done()
+			e.processBlocks(ctx, m, workerID, blockQueue)
+		}()
+	}
+
+	e.logger.Info().Int("workers", e.workers).Msg("started verification work pool")
+
 	// ticker to refresh submission duration metric every 10 seconds
 	refreshTicker := time.NewTicker(10 * time.Second)
 	defer refreshTicker.Stop()
 
+	// main subscription loop
 	for {
 		select {
 		case <-ctx.Done():
 			e.logger.Info().Msg("stopping block verification")
+			close(blockQueue)
+			workerGroup.Wait()
 			return nil
 		case <-refreshTicker.C:
 			// ensure that submission duration is always included in the 60 second window.
@@ -75,12 +99,31 @@
 				Time("arrival_time", arrivalTime).
 				Msg("received block header from subscription")
 
-			// spawn a goroutine to handle this block's retries
-			go e.verifyBlock(ctx, m, header)
+			// send block to work pool, blocking until space is available
+			select {
+			case blockQueue <- header:
+				// block queued successfully
+			case <-ctx.Done():
+				close(blockQueue)
+				workerGroup.Wait()
+				return nil
+			}
 		}
 	}
 }
 
+// processBlocks processes blocks from the queue
+func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue <-chan *types.Header) {
+	logger := e.logger.With().Int("worker_id", workerID).Logger()
+	logger.Debug().Msg("worker started")
+
+	for header := range blockQueue {
+		e.verifyBlock(ctx, m, header)
+	}
+
+	logger.Debug().Msg("worker stopped")
+}
+
 func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight, daHeight uint64, verified bool, submissionDuration time.Duration) {
 	if verified {
 		m.RecordSubmissionDaHeight(e.chainID, namespace, daHeight)
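
Note: the verifier change above replaces an unbounded `go e.verifyBlock(...)` per header with a fixed pool, capping the number of concurrent verification calls. Below is a minimal, self-contained sketch of that pattern under the assumptions visible in the diff (queue buffered at workers*2, close-then-Wait shutdown); the names main, queue, and job are illustrative and not part of this repository.

// Sketch of the bounded worker-pool shape used in ExportMetrics,
// reduced to a standalone program. Job type, names, and the timeout
// are illustrative only.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	const workers = 4
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// buffered queue so the producer can run ahead of slow workers
	queue := make(chan int, workers*2)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		workerID := i
		go func() {
			defer wg.Done()
			// the range loop ends once the producer closes the queue,
			// mirroring processBlocks in the diff
			for job := range queue {
				fmt.Printf("worker %d handled job %d\n", workerID, job)
			}
		}()
	}

	// producer: enqueue until the context is cancelled, then drain and stop
	for job := 0; ; job++ {
		select {
		case queue <- job:
			// queued successfully
		case <-ctx.Done():
			close(queue) // lets the worker range loops terminate
			wg.Wait()    // mirrors workerGroup.Wait() before returning
			return
		}
	}
}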