diff --git a/api/src/main/java/io/grpc/Grpc.java b/api/src/main/java/io/grpc/Grpc.java index baa9f5f0ab6..a45c613fd18 100644 --- a/api/src/main/java/io/grpc/Grpc.java +++ b/api/src/main/java/io/grpc/Grpc.java @@ -56,6 +56,13 @@ private Grpc() { public static final Attributes.Key TRANSPORT_ATTR_SSL_SESSION = Attributes.Key.create("io.grpc.Grpc.TRANSPORT_ATTR_SSL_SESSION"); + /** + * The value for the custom label of per-RPC metrics. Defaults to empty string when unset. Must + * not be set to {@code null}. + */ + public static final CallOptions.Key CALL_OPTION_CUSTOM_LABEL = + CallOptions.Key.createWithDefault("io.grpc.Grpc.CALL_OPTION_CUSTOM_LABEL", ""); + /** * Annotation for transport attributes. It follows the annotation semantics defined * by {@link Attributes}. diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 3e5137e0034..eeac0f12835 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.CUSTOM_LABEL_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; @@ -39,6 +40,7 @@ import io.grpc.Deadline; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; @@ -94,6 +96,7 @@ final class OpenTelemetryMetricsModule { private final Supplier stopwatchSupplier; private final boolean localityEnabled; private final boolean backendServiceEnabled; + private final boolean customLabelEnabled; private final ImmutableList plugins; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, @@ -103,6 +106,7 @@ final class OpenTelemetryMetricsModule { this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); + this.customLabelEnabled = optionalLabels.contains(CUSTOM_LABEL_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); } @@ -249,7 +253,7 @@ public void streamClosed(Status status) { statusCode = Code.DEADLINE_EXCEEDED; } } - attemptsState.attemptEnded(); + attemptsState.attemptEnded(info.getCallOptions()); recordFinishedAttempt(); } @@ -273,6 +277,10 @@ void recordFinishedAttempt() { } builder.put(BACKEND_SERVICE_KEY, savedBackendService); } + if (module.customLabelEnabled) { + builder.put( + CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } @@ -383,7 +391,7 @@ private ClientTracer newClientTracer(StreamInfo info) { } // Called whenever each attempt is ended. - void attemptEnded() { + void attemptEnded(CallOptions callOptions) { boolean shouldRecordFinishedCall = false; synchronized (lock) { if (--activeStreams == 0) { @@ -395,11 +403,11 @@ void attemptEnded() { } } if (shouldRecordFinishedCall) { - recordFinishedCall(); + recordFinishedCall(callOptions); } } - void callEnded(Status status) { + void callEnded(Status status, CallOptions callOptions) { callStopWatch.stop(); this.status = status; boolean shouldRecordFinishedCall = false; @@ -415,11 +423,11 @@ void callEnded(Status status) { } } if (shouldRecordFinishedCall) { - recordFinishedCall(); + recordFinishedCall(callOptions); } } - void recordFinishedCall() { + void recordFinishedCall(CallOptions callOptions) { Context otelContext = otelContextWithBaggage(); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); @@ -430,11 +438,13 @@ void recordFinishedCall() { callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS); // Base attributes - io.opentelemetry.api.common.Attributes baseAttributes = - io.opentelemetry.api.common.Attributes.of( - METHOD_KEY, fullMethodName, - TARGET_KEY, target - ); + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target); + if (module.customLabelEnabled) { + builder.put(CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } + io.opentelemetry.api.common.Attributes baseAttributes = builder.build(); // Duration if (module.resource.clientCallDurationCounter() != null) { @@ -660,6 +670,7 @@ public ClientCall interceptCall( callOptions = plugin.filterCallOptions(callOptions); } } + final CallOptions finalCallOptions = callOptions; // Only record method name as an attribute if isSampledToLocalTracing is set to true, // which is true for all generated methods. Otherwise, programatically // created methods result in high cardinality metrics. @@ -679,7 +690,7 @@ public void start(Listener responseListener, Metadata headers) { new SimpleForwardingClientCallListener(responseListener) { @Override public void onClose(Status status, Metadata trailers) { - tracerFactory.callEnded(status); + tracerFactory.callEnded(status, finalCallOptions); super.onClose(status, trailers); } }, diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index 2c7123198c4..c09a1a2beca 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -38,6 +38,9 @@ public final class OpenTelemetryConstants { public static final AttributeKey BACKEND_SERVICE_KEY = AttributeKey.stringKey("grpc.lb.backend_service"); + public static final AttributeKey CUSTOM_LABEL_KEY = + AttributeKey.stringKey("grpc.client.call.custom"); + public static final AttributeKey DISCONNECT_ERROR_KEY = AttributeKey.stringKey("grpc.disconnect_error"); diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 58759294fca..3fdeaf08fbb 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -322,7 +322,7 @@ public void clientBasicMetrics() { tracer.inboundMessage(1); tracer.inboundWireSize(154); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes clientAttributes = io.opentelemetry.api.common.Attributes.of( @@ -453,7 +453,7 @@ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsen fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); tracer.outboundMessage(0); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes finalAttributes = io.opentelemetry.api.common.Attributes.of( @@ -827,7 +827,7 @@ public void recordAttemptMetrics() { fakeClock.forwardTime(24, MILLISECONDS); // RPC succeeded tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes clientAttributes2 = io.opentelemetry.api.common.Attributes.of( @@ -995,7 +995,7 @@ public void recordAttemptMetrics_withRetryMetricsEnabled() { tracer.streamClosed(Status.OK); // RPC succeeded // --- The overall call ends --- - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); // Define attributes for assertions io.opentelemetry.api.common.Attributes finalAttributes @@ -1087,7 +1087,7 @@ public void recordAttemptMetrics_withHedgedCalls() { hedgeTracer2.streamClosed(Status.OK); // Second hedge succeeds // --- The overall call ends --- - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); // Define attributes for assertions io.opentelemetry.api.common.Attributes finalAttributes @@ -1141,7 +1141,7 @@ public void clientStreamNeverCreatedStillRecordMetrics() { method.getFullMethodName(), emptyList()); fakeClock.forwardTime(3000, MILLISECONDS); Status status = Status.DEADLINE_EXCEEDED.withDescription("5 seconds"); - callAttemptsTracerFactory.callEnded(status); + callAttemptsTracerFactory.callEnded(status, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attemptStartedAttributes = io.opentelemetry.api.common.Attributes.of( @@ -1255,7 +1255,7 @@ public void clientLocalityMetrics_present() { tracer.addOptionalLabel("grpc.lb.locality", "the-moon"); tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1319,7 +1319,7 @@ public void clientLocalityMetrics_missing() { ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1388,7 +1388,7 @@ public void clientBackendServiceMetrics_present() { tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon"); tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1453,7 +1453,7 @@ public void clientBackendServiceMetrics_missing() { ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 83e9d482bc5..a2846fd04c8 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -33,6 +33,7 @@ import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ConnectivityState; +import io.grpc.Grpc; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -141,20 +142,22 @@ final class CachingRlsLbClient { "grpc.lb.rls.default_target_picks", "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target", - "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(), + "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), + Arrays.asList("grpc.client.call.custom"), false); TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks", "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default " + "target is also returned by the RLS server, RPCs sent to that target from the cache " + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", - "grpc.lb.pick_result"), Collections.emptyList(), + "grpc.lb.pick_result"), + Arrays.asList("grpc.client.call.custom"), false); FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks", "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the " + "RLS channel being throttled", "{pick}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"), - Collections.emptyList(), false); + Arrays.asList("grpc.client.call.custom"), false); CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries", "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"), @@ -1033,7 +1036,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1, Arrays.asList(helper.getChannelTarget(), lookupService, childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)), - Collections.emptyList()); + Arrays.asList(determineCustomLabel(args))); } return pickResult; } else if (response.hasError()) { @@ -1041,7 +1044,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { return useFallback(args); } helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1, - Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList()); + Arrays.asList(helper.getChannelTarget(), lookupService), + Arrays.asList(determineCustomLabel(args))); return PickResult.withError( convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); @@ -1061,7 +1065,7 @@ private PickResult useFallback(PickSubchannelArgs args) { helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1, Arrays.asList(helper.getChannelTarget(), lookupService, fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)), - Collections.emptyList()); + Arrays.asList(determineCustomLabel(args))); } return pickResult; } @@ -1076,6 +1080,10 @@ private String determineMetricsPickResult(PickResult pickResult) { } } + private String determineCustomLabel(PickSubchannelArgs args) { + return args.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL); + } + // GuardedBy CachingRlsLbClient.lock void close() { synchronized (lock) { // Lock is already held, but ErrorProne can't tell diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 8d16d1bd74c..740c330e1e3 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -382,7 +382,7 @@ public void metricsWithRealChannel() throws Exception { eq(1L), eq(Arrays.asList("directaddress:///fake-bigtable.googleapis.com", "localhost:8972", "defaultTarget", "complete")), - eq(Arrays.asList())); + eq(Arrays.asList(""))); } @Test @@ -687,7 +687,7 @@ private void verifyLongCounterAdd(String name, int times, long value, verify(mockMetricRecorder, times(times)).addLongCounter( eqMetricInstrumentName(name), eq(value), eq(Lists.newArrayList(channelTarget, "localhost:8972", dataPlaneTargetLabel, pickResult)), - eq(Lists.newArrayList())); + eq(Lists.newArrayList(""))); } // This one is for verifying the failed_pick metric specifically. @@ -696,7 +696,7 @@ private void verifyFailedPicksCounterAdd(int times, long value) { verify(mockMetricRecorder, times(times)).addLongCounter( eqMetricInstrumentName("grpc.lb.rls.failed_picks"), eq(value), eq(Lists.newArrayList(channelTarget, "localhost:8972")), - eq(Lists.newArrayList())); + eq(Lists.newArrayList(""))); } @SuppressWarnings("TypeParameterUnusedInFormals")