1 change: 1 addition & 0 deletions README.md
@@ -75,6 +75,7 @@ Metrics will be available at `http://localhost:2112/metrics`
- `--balance.addresses`: Comma-separated Celestia addresses to monitor (enables balance checking)
- `--balance.consensus-rpc-urls`: Comma-separated consensus RPC URLs for balance queries (required if balance.addresses is set)
- `--balance.scrape-interval`: Balance check scrape interval in seconds (default: 30)
- `--verifier.workers`: Number of concurrent workers for block verification (default: 50)
- `--verbose`: Enable verbose logging (default: false)
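
For context, a hypothetical invocation exercising the new flag might look like the following (the `ev-metrics monitor` command name and the flag values here are illustrative assumptions, not taken from this diff; other required flags are omitted):

```bash
ev-metrics monitor \
  --verifier.workers 100 \
  --balance.scrape-interval 30 \
  --verbose
```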

### Example with Custom Endpoints
4 changes: 4 additions & 0 deletions cmd/monitor.go
@@ -42,6 +42,7 @@ const (
flagBalanceAddresses = "balance.addresses"
flagBalanceRpcUrls = "balance.consensus-rpc-urls"
flagBalanceScrapeInterval = "balance.scrape-interval"
flagVerifierWorkers = "verifier.workers"

metricsPath = "/metrics"
)
@@ -67,6 +68,7 @@ type flagValues struct {
balanceAddresses string
balanceRpcUrls string
balanceScrapeInterval int
verifierWorkers int
}

func NewMonitorCmd() *cobra.Command {
@@ -99,6 +101,7 @@ func NewMonitorCmd() *cobra.Command {
cmd.Flags().StringVar(&flags.balanceAddresses, flagBalanceAddresses, "", "Comma-separated celestia addresses to monitor (enables balance checking)")
cmd.Flags().StringVar(&flags.balanceRpcUrls, flagBalanceRpcUrls, "", "Comma-separated consensus rpc urls for balance queries (required if balance.addresses is set)")
cmd.Flags().IntVar(&flags.balanceScrapeInterval, flagBalanceScrapeInterval, 30, "Balance check scrape interval in seconds (default: 30)")
cmd.Flags().IntVar(&flags.verifierWorkers, flagVerifierWorkers, 50, "Number of concurrent workers for block verification (default: 50)")

if err := cmd.MarkFlagRequired(flagHeaderNS); err != nil {
panic(err)
@@ -204,6 +207,7 @@ func monitorAndExportMetrics(_ *cobra.Command, _ []string) error {
cfg.HeaderNS,
cfg.DataNS,
flags.chainID,
flags.verifierWorkers,
logger,
),
}
49 changes: 46 additions & 3 deletions pkg/exporters/verifier/verifier.go
@@ -2,13 +2,15 @@ package verifier

import (
"context"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/evstack/ev-metrics/internal/clients/celestia"
"github.com/evstack/ev-metrics/internal/clients/evm"
"github.com/evstack/ev-metrics/internal/clients/evnode"
"github.com/evstack/ev-metrics/pkg/metrics"
"github.com/rs/zerolog"
"time"
)

var _ metrics.Exporter = &exporter{}
@@ -20,6 +22,7 @@ func NewMetricsExporter(
evmClient *evm.Client,
headerNS, dataNS []byte,
chainID string,
workers int,
logger zerolog.Logger,
) metrics.Exporter {
return &exporter{
Contributor

critical

The number of workers is not validated. If a user provides a non-positive value for --verifier.workers (e.g., 0 or -1), it will cause issues. A negative value will cause a panic when creating the blockQueue channel, and a value of 0 will lead to a deadlock as blocks will be added to the queue but never processed. It's crucial to validate this input and fail fast if it's invalid. I suggest panicking to ensure the application doesn't start in an invalid state.

Suggested change
return &exporter{
if workers <= 0 {
	panic("number of verifier workers must be a positive integer")
}
return &exporter{

Collaborator Author

It has a default value at the flag level, so this will never be hit.
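
As an aside, a minimal standalone Go sketch (not part of this change) of the failure modes the reviewer describes: with a negative worker count, `make` panics when the buffered channel is created; with zero workers, queued blocks are never consumed.

```go
package main

import "fmt"

func main() {
	for _, workers := range []int{-1, 0} {
		func() {
			defer func() {
				if r := recover(); r != nil {
					// workers == -1: the runtime panics with "makechan: size out of range"
					fmt.Printf("workers=%d panicked: %v\n", workers, r)
				}
			}()
			queue := make(chan int, workers*2) // panics for a negative capacity
			// With workers == 0 there are no consumers, so a non-blocking
			// probe shows the first send could never complete.
			select {
			case queue <- 1:
				fmt.Printf("workers=%d: send succeeded\n", workers)
			default:
				fmt.Printf("workers=%d: send would block forever (no consumers)\n", workers)
			}
		}()
	}
}
```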

@@ -29,6 +32,7 @@
headerNS: headerNS,
dataNS: dataNS,
chainID: chainID,
workers: workers,
logger: logger.With().Str("component", "verification_monitor").Logger(),
}
}
@@ -41,6 +45,7 @@ type exporter struct {
headerNS []byte
dataNS []byte
chainID string
workers int
logger zerolog.Logger
}

@@ -53,14 +58,33 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
}
defer sub.Unsubscribe()

// create buffered channel for block queue
blockQueue := make(chan *types.Header, e.workers*2)

// start work pool
var workerGroup sync.WaitGroup
for i := 0; i < e.workers; i++ {
workerGroup.Add(1)
workerID := i
go func() {
defer workerGroup.Done()
e.processBlocks(ctx, m, workerID, blockQueue)
}()
}

e.logger.Info().Int("workers", e.workers).Msg("started verification work pool")

// ticker to refresh submission duration metric every 10 seconds
refreshTicker := time.NewTicker(10 * time.Second)
defer refreshTicker.Stop()

// main subscription loop
for {
select {
case <-ctx.Done():
e.logger.Info().Msg("stopping block verification")
close(blockQueue)
workerGroup.Wait()
return nil
case <-refreshTicker.C:
// ensure that submission duration is always included in the 60 second window.
Expand All @@ -75,12 +99,31 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
Time("arrival_time", arrivalTime).
Msg("received block header from subscription")

// spawn a goroutine to handle this block's retries
go e.verifyBlock(ctx, m, header)
// send block to work pool, blocking until space is available
select {
case blockQueue <- header:
// block queued successfully
case <-ctx.Done():
close(blockQueue)
workerGroup.Wait()
return nil
}
}
}
}

// processBlocks processes blocks from the queue
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue <-chan *types.Header) {
logger := e.logger.With().Int("worker_id", workerID).Logger()
logger.Debug().Msg("worker started")

for header := range blockQueue {
e.verifyBlock(ctx, m, header)
}

logger.Debug().Msg("worker stopped")
}

func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight, daHeight uint64, verified bool, submissionDuration time.Duration) {
if verified {
m.RecordSubmissionDaHeight(e.chainID, namespace, daHeight)