Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/src/main/java/io/grpc/Grpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ private Grpc() {
public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION =
Attributes.Key.create("io.grpc.Grpc.TRANSPORT_ATTR_SSL_SESSION");

/**
* Call option carrying the value of the custom label attached to per-RPC metrics. The key is
* created with an empty-string default, so the label value is {@code ""} when the option is
* unset. Must not be set to {@code null}.
*/
public static final CallOptions.Key<String> CALL_OPTION_CUSTOM_LABEL =
CallOptions.Key.createWithDefault("io.grpc.Grpc.CALL_OPTION_CUSTOM_LABEL", "");

/**
* Annotation for transport attributes. It follows the annotation semantics defined
* by {@link Attributes}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.CUSTOM_LABEL_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
Expand All @@ -39,6 +40,7 @@
import io.grpc.Deadline;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
Expand Down Expand Up @@ -94,6 +96,7 @@ final class OpenTelemetryMetricsModule {
private final Supplier<Stopwatch> stopwatchSupplier;
private final boolean localityEnabled;
private final boolean backendServiceEnabled;
private final boolean customLabelEnabled;
private final ImmutableList<OpenTelemetryPlugin> plugins;

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
Expand All @@ -103,6 +106,7 @@ final class OpenTelemetryMetricsModule {
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
this.customLabelEnabled = optionalLabels.contains(CUSTOM_LABEL_KEY.getKey());
this.plugins = ImmutableList.copyOf(plugins);
}

Expand Down Expand Up @@ -249,7 +253,7 @@ public void streamClosed(Status status) {
statusCode = Code.DEADLINE_EXCEEDED;
}
}
attemptsState.attemptEnded();
attemptsState.attemptEnded(info.getCallOptions());
recordFinishedAttempt();
}

Expand All @@ -273,6 +277,10 @@ void recordFinishedAttempt() {
}
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
}
if (module.customLabelEnabled) {
builder.put(
CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
}
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
plugin.addLabels(builder);
}
Expand Down Expand Up @@ -383,7 +391,7 @@ private ClientTracer newClientTracer(StreamInfo info) {
}

// Called whenever each attempt is ended.
void attemptEnded() {
void attemptEnded(CallOptions callOptions) {
boolean shouldRecordFinishedCall = false;
synchronized (lock) {
if (--activeStreams == 0) {
Expand All @@ -395,11 +403,11 @@ void attemptEnded() {
}
}
if (shouldRecordFinishedCall) {
recordFinishedCall();
recordFinishedCall(callOptions);
}
}

void callEnded(Status status) {
void callEnded(Status status, CallOptions callOptions) {
callStopWatch.stop();
this.status = status;
boolean shouldRecordFinishedCall = false;
Expand All @@ -415,11 +423,11 @@ void callEnded(Status status) {
}
}
if (shouldRecordFinishedCall) {
recordFinishedCall();
recordFinishedCall(callOptions);
}
}

void recordFinishedCall() {
void recordFinishedCall(CallOptions callOptions) {
Context otelContext = otelContextWithBaggage();
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
Expand All @@ -430,11 +438,13 @@ void recordFinishedCall() {
callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);

// Base attributes
io.opentelemetry.api.common.Attributes baseAttributes =
io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, fullMethodName,
TARGET_KEY, target
);
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target);
if (module.customLabelEnabled) {
builder.put(CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
}
io.opentelemetry.api.common.Attributes baseAttributes = builder.build();

// Duration
if (module.resource.clientCallDurationCounter() != null) {
Expand Down Expand Up @@ -660,6 +670,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
callOptions = plugin.filterCallOptions(callOptions);
}
}
final CallOptions finalCallOptions = callOptions;
// Only record method name as an attribute if isSampledToLocalTracing is set to true,
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
Expand All @@ -679,7 +690,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
tracerFactory.callEnded(status);
tracerFactory.callEnded(status, finalCallOptions);
super.onClose(status, trailers);
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public final class OpenTelemetryConstants {
public static final AttributeKey<String> BACKEND_SERVICE_KEY =
AttributeKey.stringKey("grpc.lb.backend_service");

// Attribute key for the optional per-call custom label ("grpc.client.call.custom").
public static final AttributeKey<String> CUSTOM_LABEL_KEY =
AttributeKey.stringKey("grpc.client.call.custom");

public static final AttributeKey<String> DISCONNECT_ERROR_KEY =
AttributeKey.stringKey("grpc.disconnect_error");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public void clientBasicMetrics() {
tracer.inboundMessage(1);
tracer.inboundWireSize(154);
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes clientAttributes
= io.opentelemetry.api.common.Attributes.of(
Expand Down Expand Up @@ -453,7 +453,7 @@ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsen
fakeClock.forwardTime(100, TimeUnit.MILLISECONDS);
tracer.outboundMessage(0);
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes finalAttributes
= io.opentelemetry.api.common.Attributes.of(
Expand Down Expand Up @@ -827,7 +827,7 @@ public void recordAttemptMetrics() {
fakeClock.forwardTime(24, MILLISECONDS);
// RPC succeeded
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes clientAttributes2
= io.opentelemetry.api.common.Attributes.of(
Expand Down Expand Up @@ -995,7 +995,7 @@ public void recordAttemptMetrics_withRetryMetricsEnabled() {
tracer.streamClosed(Status.OK); // RPC succeeded

// --- The overall call ends ---
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

// Define attributes for assertions
io.opentelemetry.api.common.Attributes finalAttributes
Expand Down Expand Up @@ -1087,7 +1087,7 @@ public void recordAttemptMetrics_withHedgedCalls() {
hedgeTracer2.streamClosed(Status.OK); // Second hedge succeeds

// --- The overall call ends ---
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

// Define attributes for assertions
io.opentelemetry.api.common.Attributes finalAttributes
Expand Down Expand Up @@ -1141,7 +1141,7 @@ public void clientStreamNeverCreatedStillRecordMetrics() {
method.getFullMethodName(), emptyList());
fakeClock.forwardTime(3000, MILLISECONDS);
Status status = Status.DEADLINE_EXCEEDED.withDescription("5 seconds");
callAttemptsTracerFactory.callEnded(status);
callAttemptsTracerFactory.callEnded(status, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes attemptStartedAttributes
= io.opentelemetry.api.common.Attributes.of(
Expand Down Expand Up @@ -1255,7 +1255,7 @@ public void clientLocalityMetrics_present() {
tracer.addOptionalLabel("grpc.lb.locality", "the-moon");
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
Expand Down Expand Up @@ -1319,7 +1319,7 @@ public void clientLocalityMetrics_missing() {
ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
Expand Down Expand Up @@ -1388,7 +1388,7 @@ public void clientBackendServiceMetrics_present() {
tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon");
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
Expand Down Expand Up @@ -1453,7 +1453,7 @@ public void clientBackendServiceMetrics_missing() {
ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
Expand Down
20 changes: 14 additions & 6 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.Grpc;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
Expand Down Expand Up @@ -141,20 +142,22 @@ final class CachingRlsLbClient {
"grpc.lb.rls.default_target_picks",
"EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
"grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(),
"grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"),
Arrays.asList("grpc.client.call.custom"),
false);
TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
"EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
+ "target is also returned by the RLS server, RPCs sent to that target from the cache "
+ "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
"grpc.lb.pick_result"), Collections.emptyList(),
"grpc.lb.pick_result"),
Arrays.asList("grpc.client.call.custom"),
false);
FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
"EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
+ "RLS channel being throttled", "{pick}",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
Collections.emptyList(), false);
Arrays.asList("grpc.client.call.custom"), false);
CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
"EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
Expand Down Expand Up @@ -1033,15 +1036,16 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
Arrays.asList(helper.getChannelTarget(), lookupService,
childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
Collections.emptyList());
Arrays.asList(determineCustomLabel(args)));
}
return pickResult;
} else if (response.hasError()) {
if (hasFallback) {
return useFallback(args);
}
helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
Arrays.asList(helper.getChannelTarget(), lookupService),
Arrays.asList(determineCustomLabel(args)));
return PickResult.withError(
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
Expand All @@ -1061,7 +1065,7 @@ private PickResult useFallback(PickSubchannelArgs args) {
helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
Arrays.asList(helper.getChannelTarget(), lookupService,
fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
Collections.emptyList());
Arrays.asList(determineCustomLabel(args)));
}
return pickResult;
}
Expand All @@ -1076,6 +1080,10 @@ private String determineMetricsPickResult(PickResult pickResult) {
}
}

/**
 * Looks up the custom metrics label carried by the call options of the given pick.
 *
 * @param args the pick arguments whose call options are consulted
 * @return the value stored under {@code Grpc.CALL_OPTION_CUSTOM_LABEL}
 */
private String determineCustomLabel(PickSubchannelArgs args) {
  final String customLabel = args.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL);
  return customLabel;
}

// GuardedBy CachingRlsLbClient.lock
void close() {
synchronized (lock) { // Lock is already held, but ErrorProne can't tell
Expand Down
6 changes: 3 additions & 3 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ public void metricsWithRealChannel() throws Exception {
eq(1L),
eq(Arrays.asList("directaddress:///fake-bigtable.googleapis.com", "localhost:8972",
"defaultTarget", "complete")),
eq(Arrays.asList()));
eq(Arrays.asList("")));
}

@Test
Expand Down Expand Up @@ -687,7 +687,7 @@ private void verifyLongCounterAdd(String name, int times, long value,
verify(mockMetricRecorder, times(times)).addLongCounter(
eqMetricInstrumentName(name), eq(value),
eq(Lists.newArrayList(channelTarget, "localhost:8972", dataPlaneTargetLabel, pickResult)),
eq(Lists.newArrayList()));
eq(Lists.newArrayList("")));
}

// This one is for verifying the failed_pick metric specifically.
Expand All @@ -696,7 +696,7 @@ private void verifyFailedPicksCounterAdd(int times, long value) {
verify(mockMetricRecorder, times(times)).addLongCounter(
eqMetricInstrumentName("grpc.lb.rls.failed_picks"), eq(value),
eq(Lists.newArrayList(channelTarget, "localhost:8972")),
eq(Lists.newArrayList()));
eq(Lists.newArrayList("")));
}

@SuppressWarnings("TypeParameterUnusedInFormals")
Expand Down
Loading