Skip to content

Commit 6a7110d

Browse files
committed
chore(Datastore): Add Operation and Attempt metrics for HttpJson transport
1 parent d463c15 commit 6a7110d

11 files changed

Lines changed: 843 additions & 186 deletions

File tree

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.datastore;
1818

19+
import com.google.api.core.InternalApi;
1920
import com.google.api.gax.grpc.GrpcStatusCode;
2021
import com.google.api.gax.rpc.ApiException;
2122
import com.google.api.gax.rpc.StatusCode;
@@ -176,7 +177,8 @@ static DatastoreException propagateUserException(Exception ex) {
176177
* @param throwable the throwable to extract the status code from
177178
* @return the status code name, or "UNKNOWN" if not determinable
178179
*/
179-
static String extractStatusCode(Throwable throwable) {
180+
@InternalApi
181+
public static String extractStatusCode(Throwable throwable) {
180182
Throwable current = throwable;
181183
while (current != null) {
182184
if (current instanceof DatastoreException) {

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java

Lines changed: 140 additions & 148 deletions
Large diffs are not rendered by default.

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,20 @@
1818
import static com.google.cloud.BaseService.EXCEPTION_HANDLER;
1919

2020
import com.google.api.core.InternalApi;
21+
import com.google.api.core.ObsoleteApi;
2122
import com.google.api.gax.retrying.RetrySettings;
23+
import com.google.api.gax.rpc.StatusCode;
2224
import com.google.cloud.RetryHelper;
2325
import com.google.cloud.RetryHelper.RetryHelperException;
2426
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
27+
import com.google.cloud.datastore.telemetry.MetricsRecorder;
28+
import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder;
29+
import com.google.cloud.datastore.telemetry.TelemetryConstants;
30+
import com.google.cloud.datastore.telemetry.TelemetryUtils;
2531
import com.google.cloud.datastore.telemetry.TraceUtil;
32+
import com.google.cloud.http.HttpTransportOptions;
33+
import com.google.common.base.Preconditions;
34+
import com.google.common.base.Stopwatch;
2635
import com.google.datastore.v1.AllocateIdsRequest;
2736
import com.google.datastore.v1.AllocateIdsResponse;
2837
import com.google.datastore.v1.BeginTransactionRequest;
@@ -53,7 +62,10 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {
5362
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
5463
private final RetrySettings retrySettings;
5564
private final DatastoreOptions datastoreOptions;
65+
private final MetricsRecorder metricsRecorder;
66+
private final boolean isHttpTransport;
5667

68+
@ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder")
5769
public RetryAndTraceDatastoreRpcDecorator(
5870
DatastoreRpc datastoreRpc,
5971
TraceUtil otelTraceUtil,
@@ -63,6 +75,69 @@ public RetryAndTraceDatastoreRpcDecorator(
6375
this.retrySettings = retrySettings;
6476
this.datastoreOptions = datastoreOptions;
6577
this.otelTraceUtil = otelTraceUtil;
78+
this.metricsRecorder = new NoOpMetricsRecorder();
79+
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
80+
}
81+
82+
private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
83+
this.datastoreRpc = builder.datastoreRpc;
84+
this.otelTraceUtil = builder.otelTraceUtil;
85+
this.retrySettings = builder.retrySettings;
86+
this.datastoreOptions = builder.datastoreOptions;
87+
this.metricsRecorder = builder.metricsRecorder;
88+
this.isHttpTransport = builder.isHttpTransport;
89+
}
90+
91+
public static Builder newBuilder() {
92+
return new Builder();
93+
}
94+
95+
public static class Builder {
96+
private DatastoreRpc datastoreRpc;
97+
private TraceUtil otelTraceUtil;
98+
private RetrySettings retrySettings;
99+
private DatastoreOptions datastoreOptions;
100+
101+
// Defaults configured for this class
102+
private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder();
103+
private boolean isHttpTransport = false;
104+
105+
private Builder() {}
106+
107+
public Builder setDatastoreRpc(DatastoreRpc datastoreRpc) {
108+
this.datastoreRpc = datastoreRpc;
109+
return this;
110+
}
111+
112+
public Builder setTraceUtil(TraceUtil otelTraceUtil) {
113+
this.otelTraceUtil = otelTraceUtil;
114+
return this;
115+
}
116+
117+
public Builder setRetrySettings(RetrySettings retrySettings) {
118+
this.retrySettings = retrySettings;
119+
return this;
120+
}
121+
122+
public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) {
123+
this.datastoreOptions = datastoreOptions;
124+
return this;
125+
}
126+
127+
public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) {
128+
Preconditions.checkNotNull(metricsRecorder, "metricsRecorder can not be null");
129+
this.metricsRecorder = metricsRecorder;
130+
return this;
131+
}
132+
133+
public RetryAndTraceDatastoreRpcDecorator build() {
134+
Preconditions.checkNotNull(datastoreRpc, "datastoreRpc is required");
135+
Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required");
136+
Preconditions.checkNotNull(retrySettings, "retrySettings is required");
137+
Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required");
138+
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
139+
return new RetryAndTraceDatastoreRpcDecorator(this);
140+
}
66141
}
67142

68143
@Override
@@ -110,7 +185,10 @@ public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryReques
110185
? com.google.cloud.datastore.telemetry.TraceUtil
111186
.SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY
112187
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY);
113-
return invokeRpc(() -> datastoreRpc.runAggregationQuery(request), spanName);
188+
return invokeRpc(
189+
() -> datastoreRpc.runAggregationQuery(request),
190+
spanName,
191+
TelemetryConstants.METHOD_RUN_AGGREGATION_QUERY);
114192
}
115193

116194
@Override
@@ -124,14 +202,34 @@ public boolean isClosed() {
124202
}
125203

126204
public <O> O invokeRpc(Callable<O> block, String startSpan) {
127-
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
128-
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
129-
return RetryHelper.runWithRetries(
130-
block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
205+
return invokeRpc(block, startSpan, null);
206+
}
207+
208+
<O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
209+
TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
210+
Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
211+
String operationStatus = StatusCode.Code.UNKNOWN.toString();
212+
try (TraceUtil.Scope ignored = span.makeCurrent()) {
213+
Callable<O> callable =
214+
TelemetryUtils.attemptMetricsCallable(
215+
block, metricsRecorder, datastoreOptions, isHttpTransport, methodName);
216+
O result =
217+
RetryHelper.runWithRetries(
218+
callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
219+
operationStatus = StatusCode.Code.OK.toString();
220+
return result;
131221
} catch (RetryHelperException e) {
222+
operationStatus = DatastoreException.extractStatusCode(e);
132223
span.end(e);
133224
throw DatastoreException.translateAndThrow(e);
134225
} finally {
226+
TelemetryUtils.recordOperationMetrics(
227+
metricsRecorder,
228+
datastoreOptions,
229+
isHttpTransport,
230+
stopwatch,
231+
methodName,
232+
operationStatus);
135233
span.end();
136234
}
137235
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,13 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
208208
}
209209

210210
@Override
211+
@SuppressWarnings("MustBeClosedChecker")
211212
public Scope makeCurrent() {
212-
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
213-
return new Scope(scope);
214-
}
213+
// span.makeCurrent() opens a ThreadLocal scope that binds this span to the current thread.
214+
// We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned
215+
// TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources
216+
// to control when the ThreadLocal context is restored.
217+
return new Scope(span.makeCurrent());
215218
}
216219
}
217220

@@ -238,10 +241,14 @@ static class Context implements TraceUtil.Context {
238241
}
239242

240243
@Override
244+
@SuppressWarnings("MustBeClosedChecker")
241245
public Scope makeCurrent() {
242-
try (io.opentelemetry.context.Scope scope = context.makeCurrent()) {
243-
return new Scope(scope);
244-
}
246+
// context.makeCurrent() opens a ThreadLocal scope that binds this context to the current
247+
// thread.
248+
// We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned
249+
// TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources
250+
// to control when the ThreadLocal context is restored.
251+
return new Scope(context.makeCurrent());
245252
}
246253
}
247254

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ public interface MetricsRecorder {
3838
/** Records the number of attempts a transaction took. */
3939
void recordTransactionAttemptCount(long count, Map<String, String> attributes);
4040

41+
/** Records the latency of a single RPC attempt in milliseconds. */
42+
void recordAttemptLatency(double latencyMs, Map<String, String> attributes);
43+
44+
/** Records the count of a single RPC attempt. */
45+
void recordAttemptCount(long count, Map<String, String> attributes);
46+
47+
/** Records the total latency of an operation (including retries) in milliseconds. */
48+
void recordOperationLatency(double latencyMs, Map<String, String> attributes);
49+
50+
/** Records the count of an operation. */
51+
void recordOperationCount(long count, Map<String, String> attributes);
52+
4153
/**
4254
* Returns a {@link MetricsRecorder} instance based on the provided OpenTelemetry options.
4355
*

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,19 @@
1616

1717
package com.google.cloud.datastore.telemetry;
1818

19+
import com.google.api.core.InternalApi;
1920
import java.util.Map;
2021

2122
/**
2223
* Metrics recorder implementation, used to stub out metrics instrumentation when metrics are
2324
* disabled.
25+
*
26+
* <p>WARNING: This class is intended for internal use only. It was made public to be used across
27+
* packages as a default. It should not be used by external customers and its API may change without
28+
* notice.
2429
*/
25-
class NoOpMetricsRecorder implements MetricsRecorder {
30+
@InternalApi
31+
public class NoOpMetricsRecorder implements MetricsRecorder {
2632

2733
@Override
2834
public void recordTransactionLatency(double latencyMs, Map<String, String> attributes) {
@@ -33,4 +39,24 @@ public void recordTransactionLatency(double latencyMs, Map<String, String> attri
3339
public void recordTransactionAttemptCount(long count, Map<String, String> attributes) {
3440
/* No-Op OTel Operation */
3541
}
42+
43+
@Override
44+
public void recordAttemptLatency(double latencyMs, Map<String, String> attributes) {
45+
/* No-Op OTel Operation */
46+
}
47+
48+
@Override
49+
public void recordAttemptCount(long count, Map<String, String> attributes) {
50+
/* No-Op OTel Operation */
51+
}
52+
53+
@Override
54+
public void recordOperationLatency(double latencyMs, Map<String, String> attributes) {
55+
/* No-Op OTel Operation */
56+
}
57+
58+
@Override
59+
public void recordOperationCount(long count, Map<String, String> attributes) {
60+
/* No-Op OTel Operation */
61+
}
3662
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {
3333

3434
private final DoubleHistogram transactionLatency;
3535
private final LongCounter transactionAttemptCount;
36+
private final DoubleHistogram attemptLatency;
37+
private final LongCounter attemptCount;
38+
private final DoubleHistogram operationLatency;
39+
private final LongCounter operationCount;
3640

3741
OpenTelemetryMetricsRecorder(@Nonnull OpenTelemetry openTelemetry) {
3842
this.openTelemetry = openTelemetry;
@@ -51,6 +55,34 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {
5155
.counterBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT)
5256
.setDescription("Number of attempts to commit a transaction")
5357
.build();
58+
59+
this.attemptLatency =
60+
meter
61+
.histogramBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY)
62+
.setDescription("Latency of a single RPC attempt")
63+
.setUnit("ms")
64+
.build();
65+
66+
this.attemptCount =
67+
meter
68+
.counterBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT)
69+
.setDescription("Number of RPC attempts")
70+
.setUnit("1")
71+
.build();
72+
73+
this.operationLatency =
74+
meter
75+
.histogramBuilder(TelemetryConstants.METRIC_NAME_OPERATION_LATENCY)
76+
.setDescription("Total latency of an operation including retries")
77+
.setUnit("ms")
78+
.build();
79+
80+
this.operationCount =
81+
meter
82+
.counterBuilder(TelemetryConstants.METRIC_NAME_OPERATION_COUNT)
83+
.setDescription("Number of operations")
84+
.setUnit("1")
85+
.build();
5486
}
5587

5688
OpenTelemetry getOpenTelemetry() {
@@ -67,6 +99,26 @@ public void recordTransactionAttemptCount(long count, Map<String, String> attrib
6799
transactionAttemptCount.add(count, toOtelAttributes(attributes));
68100
}
69101

102+
@Override
103+
public void recordAttemptLatency(double latencyMs, Map<String, String> attributes) {
104+
attemptLatency.record(latencyMs, toOtelAttributes(attributes));
105+
}
106+
107+
@Override
108+
public void recordAttemptCount(long count, Map<String, String> attributes) {
109+
attemptCount.add(count, toOtelAttributes(attributes));
110+
}
111+
112+
@Override
113+
public void recordOperationLatency(double latencyMs, Map<String, String> attributes) {
114+
operationLatency.record(latencyMs, toOtelAttributes(attributes));
115+
}
116+
117+
@Override
118+
public void recordOperationCount(long count, Map<String, String> attributes) {
119+
operationCount.add(count, toOtelAttributes(attributes));
120+
}
121+
70122
private static Attributes toOtelAttributes(Map<String, String> attributes) {
71123
AttributesBuilder builder = Attributes.builder();
72124
if (attributes != null) {

0 commit comments

Comments
 (0)