From 9e60084ce6ea5547db2a8939a0d9a689079b60ab Mon Sep 17 00:00:00 2001 From: nidhiii-27 Date: Thu, 21 May 2026 14:21:21 +0530 Subject: [PATCH] feat(storage): enable App-Centric Observability (ACO) support in Otel --- .../com/google/cloud/storage/AcoSpan.java | 166 ++++++++++++ .../google/cloud/storage/AcoSpanBuilder.java | 240 ++++++++++++++++++ .../cloud/storage/BucketMetadataCache.java | 131 ++++++++++ .../cloud/storage/MultipartUploadClient.java | 5 +- .../OtelMultipartUploadClientDecorator.java | 18 +- .../cloud/storage/OtelStorageDecorator.java | 44 +++- .../cloud/storage/ITOpenTelemetryMPUTest.java | 40 +++ .../cloud/storage/ITOpenTelemetryTest.java | 133 ++++++++++ .../OtelStorageDecoratorAcoUnitTest.java | 234 +++++++++++++++++ 9 files changed, 1002 insertions(+), 9 deletions(-) create mode 100644 java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpan.java create mode 100644 java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpanBuilder.java create mode 100644 java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketMetadataCache.java create mode 100644 java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/OtelStorageDecoratorAcoUnitTest.java diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpan.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpan.java new file mode 100644 index 000000000000..4c89a0ca6e09 --- /dev/null +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpan.java @@ -0,0 +1,166 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.StatusCode; +import java.util.concurrent.TimeUnit; + +final class AcoSpan implements Span { + private final Span delegate; + private final String bucketName; + private final OtelStorageDecorator parent; + + AcoSpan(Span delegate, String bucketName, OtelStorageDecorator parent) { + this.delegate = delegate; + this.bucketName = bucketName; + this.parent = parent; + } + + private void applyCacheAttributes() { + if (bucketName != null && parent != null) { + BucketMetadataCache.BucketMetadata md = parent.bucketMetadataCache.get(bucketName); + if (md != null && !md.fetchPending) { + delegate.setAttribute("gcp.resource.destination.id", md.resource); + delegate.setAttribute("gcp.resource.destination.location", md.location); + } + } + } + + @Override + public void end() { + applyCacheAttributes(); + delegate.end(); + } + + @Override + public void end(long timestamp, TimeUnit unit) { + applyCacheAttributes(); + delegate.end(timestamp, unit); + } + + @Override + public Span recordException(Throwable exception) { + delegate.recordException(exception); + handleException(exception); + return this; + } + + @Override + public Span recordException(Throwable exception, Attributes attributes) { + delegate.recordException(exception, attributes); + handleException(exception); + return this; + } + + private void handleException(Throwable exception) { + if (exception instanceof StorageException && parent != null) { + StorageException se = (StorageException) exception; + if (se.getCode() == 404 && se.getMessage() != null) { + String msg = se.getMessage().toLowerCase(java.util.Locale.US); + if (msg.contains("bucket not found") || msg.contains("bucket does not exist")) { + parent.bucketMetadataCache.remove(bucketName); + } + } + } + } + + @Override + public Span setAttribute(String k, String v) { + delegate.setAttribute(k, v); + return this; + } + + @Override + public Span setAttribute(String k, long v) { + delegate.setAttribute(k, v); + return this; + } + + @Override + public Span setAttribute(String k, double v) { + delegate.setAttribute(k, v); + return this; + } + + @Override + public Span setAttribute(String k, boolean v) { + delegate.setAttribute(k, v); + return this; + } + + @Override + public Span setAttribute(AttributeKey k, T v) { + delegate.setAttribute(k, v); + return this; + } + + @Override + public Span addEvent(String n) { + delegate.addEvent(n); + return this; + } + + @Override + public Span addEvent(String n, Attributes a) { + delegate.addEvent(n, a); + return this; + } + + @Override + public Span addEvent(String n, long t, TimeUnit u) { + delegate.addEvent(n, t, u); + return this; + } + + @Override + public Span addEvent(String n, Attributes a, long t, TimeUnit u) { + delegate.addEvent(n, a, t, u); + return this; + } + + @Override + public Span setStatus(StatusCode c) { + delegate.setStatus(c); + return this; + } + + @Override + public Span setStatus(StatusCode c, String d) { + delegate.setStatus(c, d); + return this; + } + + @Override + public Span updateName(String name) { + delegate.updateName(name); + return this; + } + + @Override + public SpanContext getSpanContext() { + return delegate.getSpanContext(); + } + + @Override + public boolean isRecording() { + return delegate.isRecording(); + } +} diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpanBuilder.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpanBuilder.java new file mode 100644 index 000000000000..4163708e5d51 --- /dev/null +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/AcoSpanBuilder.java @@ -0,0 +1,240 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.cloud.Tuple; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +final class AcoSpanBuilder implements SpanBuilder { + + private static final Logger LOGGER = Logger.getLogger(AcoSpanBuilder.class.getName()); + private static final String MULTI_REGION = "multi-region"; + private static final String DUAL_REGION = "dual-region"; + private static final String PLACEHOLDER_BUCKET_LOCATION = "global"; + private static final String PLACEHOLDER_RESOURCE_PREFIX = "projects/_/buckets/"; + + private final SpanBuilder delegate; + private final OtelStorageDecorator parent; + private String bucketName; + + AcoSpanBuilder(SpanBuilder delegate, OtelStorageDecorator parent) { + this.delegate = delegate; + this.parent = parent; + } + + @Override + public SpanBuilder setAttribute(String key, String value) { + delegate.setAttribute(key, value); + if ("gsutil.uri".equals(key)) { + String name = extractBucketName(value); + if (name != null && !name.isEmpty()) { + this.bucketName = name; + } + } + return this; + } + + @Override + public SpanBuilder setAttribute(AttributeKey key, T value) { + delegate.setAttribute(key, value); + if ("gsutil.uri".equals(key.getKey()) && value instanceof String) { + String name = extractBucketName((String) value); + if (name != null && !name.isEmpty()) { + this.bucketName = name; + } + } + return this; + } + + @Override + public Span startSpan() { + if (bucketName != null && parent != null) { + checkCacheAndTriggerFetch( + parent.delegate, parent.bucketMetadataCache, parent.getCacheExecutor(), bucketName); + return new AcoSpan(delegate.startSpan(), bucketName, parent); + } + return delegate.startSpan(); + } + + @Override + public SpanBuilder setNoParent() { + delegate.setNoParent(); + return this; + } + + @Override + public SpanBuilder setAttribute(String key, boolean value) { + delegate.setAttribute(key, value); + return this; + } + + @Override + public SpanBuilder setAttribute(String key, double value) { + delegate.setAttribute(key, value); + return this; + } + + @Override + public SpanBuilder setAttribute(String key, long value) { + delegate.setAttribute(key, value); + return this; + } + + @Override + public SpanBuilder setSpanKind(SpanKind k) { + delegate.setSpanKind(k); + return this; + } + + @Override + public SpanBuilder setStartTimestamp(long t, TimeUnit u) { + delegate.setStartTimestamp(t, u); + return this; + } + + @Override + public SpanBuilder setParent(Context c) { + delegate.setParent(c); + return this; + } + + @Override + public SpanBuilder addLink(SpanContext c) { + delegate.addLink(c); + return this; + } + + @Override + public SpanBuilder addLink(SpanContext c, Attributes a) { + delegate.addLink(c, a); + return this; + } + + static ExecutorService newCacheExecutor() { + int poolSize = Math.max(4, Runtime.getRuntime().availableProcessors()); + java.util.concurrent.ThreadPoolExecutor executor = + new java.util.concurrent.ThreadPoolExecutor( + poolSize, + poolSize, + 5L, + TimeUnit.SECONDS, + new java.util.concurrent.LinkedBlockingQueue<>(10000), + r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("gcs-aco-metadata-cache-pool"); + return t; + }); + executor.allowCoreThreadTimeOut(true); + return executor; + } + + static String extractBucketName(String uri) { + if (uri == null || !uri.startsWith("gs://")) { + return null; + } + String remainder = uri.substring(5); + int firstSlash = remainder.indexOf('/'); + if (firstSlash == -1) { + return remainder; + } + return remainder.substring(0, firstSlash); + } + + static Tuple fetch(Storage delegate, String bucketName) { + Bucket bucket = delegate.get(bucketName); + if (bucket == null) { + return null; + } + + String projectId = bucket.getProject() != null ? bucket.getProject().toString() : null; + String resource; + if (projectId != null && !projectId.isEmpty()) { + resource = "projects/" + projectId + "/buckets/" + bucketName; + } else { + resource = PLACEHOLDER_RESOURCE_PREFIX + bucketName; + } + + String location = + bucket.getLocation() != null ? bucket.getLocation().toLowerCase(Locale.US) : "global"; + String locationType = + bucket.getLocationType() != null + ? bucket.getLocationType().toLowerCase(Locale.US) + : "region"; + + if (MULTI_REGION.equals(locationType) || DUAL_REGION.equals(locationType)) { + location = "global"; + } + + return Tuple.of(resource, location); + } + + static void checkCacheAndTriggerFetch( + Storage delegate, + BucketMetadataCache bucketMetadataCache, + ExecutorService cacheExecutor, + String bucketName) { + if (!bucketMetadataCache.putPendingIfAbsent( + bucketName, PLACEHOLDER_RESOURCE_PREFIX + bucketName, PLACEHOLDER_BUCKET_LOCATION)) { + return; + } + + try { + cacheExecutor.submit( + () -> { + try { + Tuple layout = fetch(delegate, bucketName); + if (layout != null) { + bucketMetadataCache.put(bucketName, layout, false); + } else { + bucketMetadataCache.remove(bucketName); + } + } catch (StorageException e) { + if (e.getCode() == 404) { + bucketMetadataCache.remove(bucketName); + } else if (e.getCode() == 403) { + bucketMetadataCache.put( + bucketName, + PLACEHOLDER_RESOURCE_PREFIX + bucketName, + PLACEHOLDER_BUCKET_LOCATION, + false); + } else { + LOGGER.log(Level.WARNING, "Background GetBucket failed", e); + bucketMetadataCache.remove(bucketName); + } + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Background GetBucket failed", e); + bucketMetadataCache.remove(bucketName); + } + }); + } catch (java.util.concurrent.RejectedExecutionException e) { + LOGGER.log(Level.WARNING, "Background prefetch task rejected due to pool saturation", e); + bucketMetadataCache.remove(bucketName); + } + } +} diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketMetadataCache.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketMetadataCache.java new file mode 100644 index 000000000000..aa3de76aed97 --- /dev/null +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketMetadataCache.java @@ -0,0 +1,131 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.cloud.Tuple; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +final class BucketMetadataCache { + + private static final int DEFAULT_CAPACITY = 10000; + private final ReentrantLock lock = new ReentrantLock(); + private final Map cache; + + BucketMetadataCache(int capacity) { + this.cache = + new LinkedHashMap(capacity, 0.75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > capacity; + } + }; + } + + static BucketMetadataCache getBucketMetadataCache() { + return new BucketMetadataCache(DEFAULT_CAPACITY); + } + + BucketMetadata get(String bucketName) { + lock.lock(); + try { + return cache.get(bucketName); + } finally { + lock.unlock(); + } + } + + void put(String bucketName, BucketMetadata metadata) { + lock.lock(); + try { + cache.put(bucketName, metadata); + } finally { + lock.unlock(); + } + } + + void put(String bucketName, String resource, String location, boolean pending) { + lock.lock(); + try { + cache.put(bucketName, new BucketMetadata(resource, location, pending)); + } finally { + lock.unlock(); + } + } + + void put(String bucketName, Tuple layout, boolean pending) { + lock.lock(); + try { + cache.put(bucketName, new BucketMetadata(layout.x(), layout.y(), pending)); + } finally { + lock.unlock(); + } + } + + void remove(String bucketName) { + lock.lock(); + try { + cache.remove(bucketName); + } finally { + lock.unlock(); + } + } + + void clear() { + lock.lock(); + try { + cache.clear(); + } finally { + lock.unlock(); + } + } + + boolean containsKey(String bucketName) { + lock.lock(); + try { + return cache.containsKey(bucketName); + } finally { + lock.unlock(); + } + } + + boolean putPendingIfAbsent(String bucketName, String resource, String location) { + lock.lock(); + try { + if (cache.containsKey(bucketName)) { + return false; + } + cache.put(bucketName, new BucketMetadata(resource, location, true)); + return true; + } finally { + lock.unlock(); + } + } + + static final class BucketMetadata { + final String resource; + final String location; + final boolean fetchPending; + + BucketMetadata(String resource, String location, boolean fetchPending) { + this.resource = resource; + this.location = location; + this.fetchPending = fetchPending; + } + } +} diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java index cf8353f1f6f5..7edc5acc0abb 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java @@ -118,7 +118,10 @@ public static MultipartUploadClient create(MultipartUploadSettings config) { options.createRetrier(), MultipartUploadHttpRequestManager.createFrom(options), options.getRetryAlgorithmManager()); + Storage service = options.getService(); + OtelStorageDecorator osd = + service instanceof OtelStorageDecorator ? (OtelStorageDecorator) service : null; return OtelMultipartUploadClientDecorator.decorate( - client, options.getOpenTelemetry(), Transport.HTTP); + client, osd, options.getOpenTelemetry(), Transport.HTTP); } } diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java index f5e7080fed75..34f15a5dd8ac 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java @@ -49,11 +49,18 @@ final class OtelMultipartUploadClientDecorator extends MultipartUploadClient { private final Tracer tracer; private OtelMultipartUploadClientDecorator( - MultipartUploadClient delegate, OpenTelemetry otel, Attributes baseAttributes) { + MultipartUploadClient delegate, + OtelStorageDecorator parentDecorator, + OpenTelemetry otel, + Attributes baseAttributes) { this.delegate = delegate; this.tracer = OtelStorageDecorator.TracerDecorator.decorate( - null, otel, baseAttributes, MultipartUploadClient.class.getName() + "/"); + parentDecorator, + null, + otel, + baseAttributes, + MultipartUploadClient.class.getName() + "/"); } @Override @@ -172,7 +179,10 @@ public ListMultipartUploadsResponse listMultipartUploads(ListMultipartUploadsReq } static MultipartUploadClient decorate( - MultipartUploadClient delegate, OpenTelemetry otel, Transport transport) { + MultipartUploadClient delegate, + OtelStorageDecorator parentDecorator, + OpenTelemetry otel, + Transport transport) { if (otel == OpenTelemetry.noop()) { return delegate; } @@ -185,6 +195,6 @@ static MultipartUploadClient decorate( .put("rpc.system", "XML") .put("service.name", "storage.googleapis.com") .build(); - return new OtelMultipartUploadClientDecorator(delegate, otel, baseAttributes); + return new OtelMultipartUploadClientDecorator(delegate, parentDecorator, otel, baseAttributes); } } diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java index 291db00ae5d3..2e707f3168aa 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java @@ -58,6 +58,7 @@ import java.nio.file.Path; import java.util.List; import java.util.Locale; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.UnaryOperator; import org.checkerframework.checker.nullness.qual.NonNull; @@ -75,13 +76,16 @@ final class OtelStorageDecorator implements Storage { private final OpenTelemetry otel; private final Attributes baseAttributes; private final Tracer tracer; + final BucketMetadataCache bucketMetadataCache; + private volatile ExecutorService cacheExecutor; private OtelStorageDecorator(Storage delegate, OpenTelemetry otel, Attributes baseAttributes) { this.delegate = delegate; this.otel = otel; this.baseAttributes = baseAttributes; this.tracer = - TracerDecorator.decorate(null, otel, baseAttributes, Storage.class.getName() + "/"); + TracerDecorator.decorate(this, null, otel, baseAttributes, Storage.class.getName() + "/"); + this.bucketMetadataCache = BucketMetadataCache.getBucketMetadataCache(); } @Override @@ -1423,7 +1427,17 @@ public boolean deleteNotification(String bucket, String notificationId) { @Override public void close() throws Exception { - delegate.close(); + try { + bucketMetadataCache.clear(); + synchronized (this) { + if (cacheExecutor != null) { + cacheExecutor.shutdownNow(); + cacheExecutor.awaitTermination(5, TimeUnit.MINUTES); + } + } + } finally { + delegate.close(); + } } @Override @@ -1562,16 +1576,19 @@ static UnaryOperator retryContextDecorator(OpenTelemetry otel) { } static final class TracerDecorator implements Tracer { + @Nullable private final OtelStorageDecorator parentDecorator; @Nullable private final Context parentContextOverride; private final Tracer delegate; private final Attributes baseAttributes; private final String spanNamePrefix; TracerDecorator( + @Nullable OtelStorageDecorator parentDecorator, @Nullable Context parentContextOverride, Tracer delegate, Attributes baseAttributes, String spanNamePrefix) { + this.parentDecorator = parentDecorator; this.parentContextOverride = parentContextOverride; this.delegate = delegate; this.baseAttributes = baseAttributes; @@ -1579,6 +1596,7 @@ static final class TracerDecorator implements Tracer { } static TracerDecorator decorate( + @Nullable OtelStorageDecorator parentDecorator, @Nullable Context parentContextOverride, OpenTelemetry otel, Attributes baseAttributes, @@ -1588,7 +1606,8 @@ static TracerDecorator decorate( requireNonNull(spanNamePrefix, "spanNamePrefix must be non null"); Tracer tracer = otel.getTracer(OTEL_SCOPE_NAME, StorageOptions.getDefaultInstance().getLibraryVersion()); - return new TracerDecorator(parentContextOverride, tracer, baseAttributes, spanNamePrefix); + return new TracerDecorator( + parentDecorator, parentContextOverride, tracer, baseAttributes, spanNamePrefix); } @Override @@ -1598,7 +1617,7 @@ public SpanBuilder spanBuilder(String spanName) { if (parentContextOverride != null) { spanBuilder.setParent(parentContextOverride); } - return spanBuilder; + return new AcoSpanBuilder(spanBuilder, parentDecorator); } } @@ -1671,6 +1690,7 @@ public OtelDecoratedBlobWriteSession(BlobWriteSession delegate, Span sessionSpan this.sessionSpan = sessionSpan; this.tracer = TracerDecorator.decorate( + OtelStorageDecorator.this, Context.current(), otel, OtelStorageDecorator.this.baseAttributes, @@ -1794,6 +1814,7 @@ public OtelDecoratedCopyWriter(CopyWriter copyWriter, Span span) { this.parentContext = Context.current(); this.tracer = TracerDecorator.decorate( + OtelStorageDecorator.this, Context.current(), otel, OtelStorageDecorator.this.baseAttributes, @@ -2127,6 +2148,7 @@ private OtelDecoratingBlobAppendableUpload(BlobAppendableUpload delegate, Span u this.uploadSpan = uploadSpan; this.tracer = TracerDecorator.decorate( + OtelStorageDecorator.this, Context.current(), otel, OtelStorageDecorator.this.baseAttributes, @@ -2163,6 +2185,7 @@ private OtelDecoratingAppendableUploadWriteableByteChannel( this.openSpan = openSpan; this.tracer = TracerDecorator.decorate( + OtelStorageDecorator.this, Context.current(), otel, OtelStorageDecorator.this.baseAttributes, @@ -2276,4 +2299,17 @@ public boolean isOpen() { } } } + + ExecutorService getCacheExecutor() { + ExecutorService result = cacheExecutor; + if (result == null) { + synchronized (this) { + result = cacheExecutor; + if (result == null) { + cacheExecutor = result = AcoSpanBuilder.newCacheExecutor(); + } + } + } + return result; + } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java index e1a83ba6ebda..e059e2eea1c4 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java @@ -88,6 +88,10 @@ public void checkMPUInstrumentation() throws Exception { .key(objectName) .build()); + // Dynamically wait for the background prefetch to resolve to ensure 100% deterministic, + // non-flaky tracing + pollUntilMetadataResolved((OtelStorageDecorator) storage, bucket.getName()); + byte[] data = "Hello, World!".getBytes(StandardCharsets.UTF_8); RequestBody body = RequestBody.of(ByteBuffer.wrap(data)); UploadPartResponse upload = @@ -132,17 +136,53 @@ public void checkMPUInstrumentation() throws Exception { assertThat(uploadSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); assertThat(uploadSpan.getAttributes().get(AttributeKey.longKey("partNumber"))).isEqualTo(1); + assertThat( + uploadSpan.getAttributes().get(AttributeKey.stringKey("gcp.resource.destination.id"))) + .contains("buckets/" + bucket.getName()); + assertThat( + uploadSpan + .getAttributes() + .get(AttributeKey.stringKey("gcp.resource.destination.location"))) + .isNotEqualTo("global"); SpanData completeSpan = spans.get(2); assertThat(completeSpan.getName()) .isEqualTo("com.google.cloud.storage.MultipartUploadClient/completeMultipartUpload"); assertThat(completeSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + assertThat( + completeSpan.getAttributes().get(AttributeKey.stringKey("gcp.resource.destination.id"))) + .contains("buckets/" + bucket.getName()); + assertThat( + completeSpan + .getAttributes() + .get(AttributeKey.stringKey("gcp.resource.destination.location"))) + .isNotEqualTo("global"); SpanData listSpan = spans.get(3); assertThat(listSpan.getName()) .isEqualTo("com.google.cloud.storage.MultipartUploadClient/listMultipartUploads"); assertThat(listSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) .isEqualTo(String.format("gs://%s/", bucket.getName())); + assertThat(listSpan.getAttributes().get(AttributeKey.stringKey("gcp.resource.destination.id"))) + .contains("buckets/" + bucket.getName()); + assertThat( + listSpan + .getAttributes() + .get(AttributeKey.stringKey("gcp.resource.destination.location"))) + .isNotEqualTo("global"); + } + + private static void pollUntilMetadataResolved(OtelStorageDecorator osd, String bucketName) + throws Exception { + for (int i = 0; i < 100; i++) { + BucketMetadataCache.BucketMetadata meta = osd.bucketMetadataCache.get(bucketName); + if (meta != null && !meta.fetchPending) { + return; + } + Thread.sleep(50); + } + throw new AssertionError( + "Timeout waiting for ACO metadata prefetch to resolve for bucket: " + bucketName); } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java index 3b8957bbac64..3aea77007818 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java @@ -79,12 +79,145 @@ public void checkInstrumentation() throws Exception { .isEqualTo(transport.name().toLowerCase())); } + @Test + public void testAcoSuccessFlow() throws Exception { + TestExporter exporter = new TestExporter(); + + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build()) + .build(); + StorageOptions storageOptions = + storage.getOptions().toBuilder().setOpenTelemetry(openTelemetrySdk).build(); + try (Storage storage = storageOptions.getService()) { + storage.create(BlobInfo.newBuilder(bucket, generator.randomObjectName()).build()); + pollUntilMetadataResolved((OtelStorageDecorator) storage, bucket.getName()); + storage.create(BlobInfo.newBuilder(bucket, generator.randomObjectName()).build()); + } + + assertThat(exporter.getExportedSpans().size()).isAtLeast(2); + SpanData span1 = exporter.getExportedSpans().get(0); + SpanData span2 = exporter.getExportedSpans().get(1); + + assertAll( + () -> assertThat(getAttributeValue(span1, "gcp.client.service")).isEqualTo("Storage"), + () -> + assertThat(getAttributeValue(span1, "rpc.system")) + .isEqualTo(transport.name().toLowerCase()), + () -> assertThat(getAttributeValue(span2, "gcp.client.service")).isEqualTo("Storage"), + () -> + assertThat(getAttributeValue(span2, "gcp.resource.destination.id")) + .contains("buckets/" + bucket.getName()), + () -> + assertThat(getAttributeValue(span2, "gcp.resource.destination.location")) + .isNotEqualTo("global")); + } + + @Test + public void testAcoNonExistentBucketNoAttributes() throws Exception { + TestExporter exporter = new TestExporter(); + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build()) + .build(); + StorageOptions storageOptions = + storage.getOptions().toBuilder().setOpenTelemetry(openTelemetrySdk).build(); + String nonExistentBucket = "non-existent-bucket-" + generator.randomBucketName(); + + try (Storage storage = storageOptions.getService()) { + storage.get(nonExistentBucket); + pollUntilMetadataEvicted((OtelStorageDecorator) storage, nonExistentBucket); + storage.get(nonExistentBucket); + } + + // We should have at least 2 get spans + assertThat(exporter.getExportedSpans().size()).isAtLeast(2); + SpanData getSpan1 = exporter.getExportedSpans().get(0); + SpanData getSpan2 = exporter.getExportedSpans().get(1); + + assertAll( + () -> assertThat(getAttributeValue(getSpan2, "gcp.resource.destination.id")).isNull(), + () -> + assertThat(getAttributeValue(getSpan2, "gcp.resource.destination.location")).isNull()); + } + + @Test + public void testAcoForbiddenBucketFallbackAttributes() throws Exception { + TestExporter exporter = new TestExporter(); + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build()) + .build(); + StorageOptions storageOptions = + storage.getOptions().toBuilder().setOpenTelemetry(openTelemetrySdk).build(); + + try (Storage storage = storageOptions.getService()) { + try { + storage.get("test"); + } catch (StorageException e) { + // Expected 403 Forbidden + } + pollUntilMetadataResolved((OtelStorageDecorator) storage, "test"); + try { + storage.get("test"); + } catch (StorageException e) { + // Expected 403 Forbidden + } + } + + assertThat(exporter.getExportedSpans().size()).isAtLeast(2); + SpanData getSpan1 = exporter.getExportedSpans().get(0); + SpanData getSpan2 = exporter.getExportedSpans().get(1); + + assertAll( + () -> + assertThat(getAttributeValue(getSpan2, "gcp.resource.destination.id")) + .isEqualTo("projects/_/buckets/test"), + () -> + assertThat(getAttributeValue(getSpan2, "gcp.resource.destination.location")) + .isEqualTo("global")); + } + @Test public void noOpDoesNothing() { assertThat(storage.getOptions().getOpenTelemetry()).isSameInstanceAs(OpenTelemetry.noop()); storage.create(BlobInfo.newBuilder(bucket, generator.randomObjectName()).build()); } + private static void pollUntilMetadataResolved(OtelStorageDecorator osd, String bucketName) + throws Exception { + for (int i = 0; i < 100; i++) { + BucketMetadataCache.BucketMetadata meta = osd.bucketMetadataCache.get(bucketName); + if (meta != null && !meta.fetchPending) { + return; + } + Thread.sleep(50); + } + throw new AssertionError( + "Timeout waiting for ACO metadata prefetch to resolve for bucket: " + bucketName); + } + + private static void pollUntilMetadataEvicted(OtelStorageDecorator osd, String bucketName) + throws Exception { + for (int i = 0; i < 100; i++) { + if (!osd.bucketMetadataCache.containsKey(bucketName)) { + return; + } + Thread.sleep(50); + } + throw new AssertionError( + "Timeout waiting for ACO metadata prefetch to evict for nonexistent bucket: " + bucketName); + } + private static String getAttributeValue(SpanData spanData, String key) { return spanData.getAttributes().get(AttributeKey.stringKey(key)); } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/OtelStorageDecoratorAcoUnitTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/OtelStorageDecoratorAcoUnitTest.java new file mode 100644 index 000000000000..77895f04f4cb --- /dev/null +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/OtelStorageDecoratorAcoUnitTest.java @@ -0,0 +1,234 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.Tracer; +import java.math.BigInteger; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class OtelStorageDecoratorAcoUnitTest { + + private OpenTelemetry mockOtel; + + @Before + public void setUp() { + mockOtel = mock(OpenTelemetry.class); + Tracer mockTracer = mock(Tracer.class); + SpanBuilder mockSpanBuilder = mock(SpanBuilder.class); + Span mockSpan = mock(Span.class); + + Mockito.when(mockOtel.getTracer(Mockito.anyString(), Mockito.anyString())) + .thenReturn(mockTracer); + Mockito.when(mockTracer.spanBuilder(Mockito.anyString())).thenReturn(mockSpanBuilder); + Mockito.when(mockSpanBuilder.setAllAttributes(Mockito.any())).thenReturn(mockSpanBuilder); + Mockito.when(mockSpanBuilder.startSpan()).thenReturn(mockSpan); + } + + @Test + public void testAcoSuccessFlow() throws Exception { + Storage mockStorage = mock(Storage.class); + Bucket mockBucket = mock(Bucket.class); + + Mockito.when(mockStorage.get("success-bucket")).thenReturn(mockBucket); + Mockito.when(mockBucket.getProject()).thenReturn(BigInteger.valueOf(12345)); + Mockito.when(mockBucket.getLocation()).thenReturn("us-east1"); + Mockito.when(mockBucket.getLocationType()).thenReturn("region"); + + try (Storage decoratedStorage = + OtelStorageDecorator.decorate( + mockStorage, mockOtel, TransportCompatibility.Transport.HTTP)) { + OtelStorageDecorator osd = (OtelStorageDecorator) decoratedStorage; + + AcoSpanBuilder.checkCacheAndTriggerFetch( + osd.delegate, osd.bucketMetadataCache, osd.getCacheExecutor(), "success-bucket"); + + // Wait for background task to finish cleanly + osd.getCacheExecutor().shutdown(); + osd.getCacheExecutor().awaitTermination(5, TimeUnit.SECONDS); + + BucketMetadataCache.BucketMetadata meta = osd.bucketMetadataCache.get("success-bucket"); + assertNotNull(meta); + assertEquals("projects/12345/buckets/success-bucket", meta.resource); + assertEquals("us-east1", meta.location); + assertFalse(meta.fetchPending); + } + } + + @Test + public void testAco404NotFoundFlowWithException() throws Exception { + Storage mockStorage = mock(Storage.class); + StorageException ex = new StorageException(404, "Bucket not found"); + Mockito.when(mockStorage.get("nonexistent-bucket")).thenThrow(ex); + + try (Storage decoratedStorage = + OtelStorageDecorator.decorate( + mockStorage, mockOtel, TransportCompatibility.Transport.HTTP)) { + OtelStorageDecorator osd = (OtelStorageDecorator) decoratedStorage; + + AcoSpanBuilder.checkCacheAndTriggerFetch( + osd.delegate, osd.bucketMetadataCache, osd.getCacheExecutor(), "nonexistent-bucket"); + + // Wait for background task to finish + osd.getCacheExecutor().shutdown(); + osd.getCacheExecutor().awaitTermination(5, TimeUnit.SECONDS); + + // Verified not found -> Entry must be cleanly evicted (null) + BucketMetadataCache.BucketMetadata meta = osd.bucketMetadataCache.get("nonexistent-bucket"); + assertNull(meta); + } + } + + @Test + public void testAco404NotFoundFlowWithNull() throws Exception { + Storage mockStorage = mock(Storage.class); + Mockito.when(mockStorage.get("nonexistent-bucket")).thenReturn(null); + + try (Storage decoratedStorage = + OtelStorageDecorator.decorate( + mockStorage, mockOtel, TransportCompatibility.Transport.HTTP)) { + OtelStorageDecorator osd = (OtelStorageDecorator) decoratedStorage; + + AcoSpanBuilder.checkCacheAndTriggerFetch( + osd.delegate, osd.bucketMetadataCache, osd.getCacheExecutor(), "nonexistent-bucket"); + + // Wait for background task to finish + osd.getCacheExecutor().shutdown(); + osd.getCacheExecutor().awaitTermination(5, TimeUnit.SECONDS); + + // Verified not found -> Entry must be cleanly evicted (null) + BucketMetadataCache.BucketMetadata meta = osd.bucketMetadataCache.get("nonexistent-bucket"); + assertNull(meta); + } + } + + @Test + public void testAco403ForbiddenFlow() throws Exception { + Storage mockStorage = mock(Storage.class); + StorageException ex = new StorageException(403, "Access Denied"); + Mockito.when(mockStorage.get("forbidden-bucket")).thenThrow(ex); + + try (Storage decoratedStorage = + OtelStorageDecorator.decorate( + mockStorage, mockOtel, TransportCompatibility.Transport.HTTP)) { + OtelStorageDecorator osd = (OtelStorageDecorator) decoratedStorage; + + AcoSpanBuilder.checkCacheAndTriggerFetch( + osd.delegate, osd.bucketMetadataCache, osd.getCacheExecutor(), "forbidden-bucket"); + + // Wait for background task to finish + osd.getCacheExecutor().shutdown(); + osd.getCacheExecutor().awaitTermination(5, TimeUnit.SECONDS); + + // Forbidden -> Fallback values retained with pending = false (Do Not Retry) + BucketMetadataCache.BucketMetadata meta = osd.bucketMetadataCache.get("forbidden-bucket"); + assertNotNull(meta); + assertEquals("projects/_/buckets/forbidden-bucket", meta.resource); + assertEquals("global", meta.location); + assertFalse(meta.fetchPending); + } + } + + @Test + public void testAcoThunderingHerdProtection() throws Exception { + Storage mockStorage = mock(Storage.class); + Mockito.when(mockStorage.get("concurrent-bucket")) + .thenAnswer( + invocation -> { + Thread.sleep(100); + return null; + }); + + try (Storage decoratedStorage = + OtelStorageDecorator.decorate( + mockStorage, mockOtel, TransportCompatibility.Transport.HTTP)) { + OtelStorageDecorator osd = (OtelStorageDecorator) decoratedStorage; + + // Trigger twice concurrently + AcoSpanBuilder.checkCacheAndTriggerFetch( + osd.delegate, osd.bucketMetadataCache, osd.getCacheExecutor(), "concurrent-bucket"); + AcoSpanBuilder.checkCacheAndTriggerFetch( + osd.delegate, osd.bucketMetadataCache, osd.getCacheExecutor(), "concurrent-bucket"); + + // Wait for background tasks + osd.getCacheExecutor().shutdown(); + osd.getCacheExecutor().awaitTermination(5, TimeUnit.SECONDS); + + // Verify get was called exactly once (no duplicate fetches) + Mockito.verify(mockStorage, Mockito.times(1)).get("concurrent-bucket"); + } + } + + @Test + public void testAcoAcoSpanEndSkipsPending() throws Exception { + Storage mockStorage = mock(Storage.class); + try (Storage decoratedStorage = + OtelStorageDecorator.decorate( + mockStorage, mockOtel, TransportCompatibility.Transport.HTTP)) { + OtelStorageDecorator osd = (OtelStorageDecorator) decoratedStorage; + + // Manually put a pending placeholder entry + osd.bucketMetadataCache.put( + "pending-bucket", "projects/_/buckets/pending-bucket", "global", true); + + Span mockSpan = mock(Span.class); + AcoSpan acoSpan = new AcoSpan(mockSpan, "pending-bucket", osd); + + // Call end() while pending + acoSpan.end(); + + // Verify OTel span setAttribute was never called since cache entry was pending + Mockito.verify(mockSpan, Mockito.never()) + .setAttribute(Mockito.anyString(), Mockito.anyString()); + } + } + + @Test + public void testAcoAcoSpanEndAppliesResolved() throws Exception { + Storage mockStorage = mock(Storage.class); + try (Storage decoratedStorage = + OtelStorageDecorator.decorate( + mockStorage, mockOtel, TransportCompatibility.Transport.HTTP)) { + OtelStorageDecorator osd = (OtelStorageDecorator) decoratedStorage; + + // Manually put a resolved non-pending entry + osd.bucketMetadataCache.put( + "resolved-bucket", "projects/123/buckets/resolved-bucket", "us-east1", false); + + Span mockSpan = mock(Span.class); + AcoSpan acoSpan = new AcoSpan(mockSpan, "resolved-bucket", osd); + + acoSpan.end(); + + // Verify OTel span attributes were set successfully + Mockito.verify(mockSpan) + .setAttribute("gcp.resource.destination.id", "projects/123/buckets/resolved-bucket"); + Mockito.verify(mockSpan).setAttribute("gcp.resource.destination.location", "us-east1"); + } + } +}