diff --git a/pkg/pyroscope/modules_experimental.go b/pkg/pyroscope/modules_experimental.go index 705c25f833..45a92c429e 100644 --- a/pkg/pyroscope/modules_experimental.go +++ b/pkg/pyroscope/modules_experimental.go @@ -4,11 +4,9 @@ import ( "context" "fmt" "slices" - "time" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/netutil" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" @@ -221,6 +219,7 @@ func (f *Pyroscope) initSegmentWriterClient() (_ services.Service, err error) { logger, f.reg, f.segmentWriterRing, placement, + grpc.WithStatsHandler(util.GrpcClientStatsHandler(f.reg)), ) if err != nil { return nil, err @@ -309,6 +308,7 @@ func (f *Pyroscope) initMetastoreClient() (services.Service, error) { f.logger, f.Cfg.Metastore.GRPCClientConfig, disc, + grpc.WithStatsHandler(util.GrpcClientStatsHandler(f.reg)), ) return f.metastoreClient.Service(), nil } @@ -370,6 +370,7 @@ func (f *Pyroscope) initQueryBackendClient() (services.Service, error) { f.Cfg.QueryBackend.Address, f.Cfg.QueryBackend.GRPCClientConfig, f.Cfg.QueryBackend.ClientTimeout, + grpc.WithStatsHandler(util.GrpcClientStatsHandler(f.reg)), ) if err != nil { return nil, err @@ -448,19 +449,7 @@ func (f *Pyroscope) initHealthServer() (services.Service, error) { } func (f *Pyroscope) grpcClientInterceptors() []grpc.UnaryClientInterceptor { - requestDuration := util.RegisterOrGet(f.reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "pyroscope", - Subsystem: "grpc_client", - Name: "request_duration_seconds", - Help: "Time (in seconds) spent waiting for gRPC response.", - Buckets: prometheus.DefBuckets, - NativeHistogramBucketFactor: 1.1, - NativeHistogramMaxBucketNumber: 50, - NativeHistogramMinResetDuration: time.Hour, - }, []string{"method", "status_code"})) - return []grpc.UnaryClientInterceptor{ - middleware.UnaryClientInstrumentInterceptor(requestDuration, middleware.ReportGRPCStatusOption), otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), } } diff --git a/pkg/util/grpc.go b/pkg/util/grpc.go new file mode 100644 index 0000000000..aa28d54823 --- /dev/null +++ b/pkg/util/grpc.go @@ -0,0 +1,88 @@ +package util + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +type grpcClientStatsKey struct{} + +func GrpcClientStatsHandler(reg prometheus.Registerer) stats.Handler { + return &statsHandler{ + ElapsedDuration: RegisterOrGet(reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "pyroscope", + Subsystem: "grpc_client", + Name: "request_duration_seconds", + Help: "Time (in seconds) required to send and recieve a gRPC request/response.", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 50, + NativeHistogramMinResetDuration: time.Hour, + }, []string{"method", "status_code"})), + + RequestDecompressedBytes: RegisterOrGet(reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "pyroscope", + Subsystem: "grpc_client", + Name: "request_body_bytes", + Help: "Number of decompressed bytes in the request body.", + }, []string{"method"})), + + ResponseDecompressedBytes: RegisterOrGet(reg, prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "pyroscope", + Subsystem: "grpc_client", + Name: "response_body_bytes", + Help: "Number of decompressed bytes in the response body.", + }, []string{"method"})), + } +} + +type statsHandler struct { + ElapsedDuration *prometheus.HistogramVec + RequestDecompressedBytes *prometheus.HistogramVec + ResponseDecompressedBytes *prometheus.HistogramVec +} + +func (s *statsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {} + +func (s *statsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) { + if !rpcStats.IsClient() { + return + } + + info, ok := ctx.Value(grpcClientStatsKey{}).(*stats.RPCTagInfo) + if !ok { + return + } + + switch msg := rpcStats.(type) { + case *stats.InPayload: + s.ResponseDecompressedBytes.With(prometheus.Labels{ + "method": info.FullMethodName, + }).Observe(float64(msg.Length)) + + case *stats.OutPayload: + s.RequestDecompressedBytes.With(prometheus.Labels{ + "method": info.FullMethodName, + }).Observe(float64(msg.Length)) + + case *stats.End: + statusCode, _ := status.FromError(msg.Error) + + s.ElapsedDuration.With(prometheus.Labels{ + "method": info.FullMethodName, + "status_code": statusCode.Code().String(), + }).Observe(msg.EndTime.Sub(msg.BeginTime).Seconds()) + } +} + +func (s *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +func (s *statsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + return context.WithValue(ctx, grpcClientStatsKey{}, info) +}