Skip to content

Commit 81d15f2

Browse files
committed
chore(storage): exposed fastOpenBidiRead to Storage interface.
1 parent 1c738a9 commit 81d15f2

4 files changed

Lines changed: 123 additions & 0 deletions

File tree

java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSessionAdapter.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,29 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.ApiFutures;
21+
import com.google.cloud.storage.StorageDataClient.FastOpenObjectReadSession;
2122
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.util.concurrent.MoreExecutors;
2324
import java.io.IOException;
25+
import java.util.Objects;
2426

2527
final class BlobReadSessionAdapter implements BlobReadSession {
2628

2729
@VisibleForTesting final ObjectReadSession session;
30+
private ObjectReadSessionStreamRead<?> fastOpenRead;
31+
private final ReadProjectionConfig<?> fastOpenConfig;
2832

2933
BlobReadSessionAdapter(ObjectReadSession session) {
34+
this(session, null, null);
35+
}
36+
37+
private BlobReadSessionAdapter(
38+
ObjectReadSession session,
39+
ObjectReadSessionStreamRead<?> fastOpenRead,
40+
ReadProjectionConfig<?> fastOpenConfig) {
3041
this.session = session;
42+
this.fastOpenRead = fastOpenRead;
43+
this.fastOpenConfig = fastOpenConfig;
3144
}
3245

3346
@Override
@@ -40,6 +53,17 @@ public BlobInfo getBlobInfo() {
4053
@SuppressWarnings({"rawtypes", "unchecked"})
4154
@Override
4255
public <Projection> Projection readAs(ReadProjectionConfig<Projection> config) {
56+
synchronized (this) {
57+
if (fastOpenRead != null && Objects.equals(config, fastOpenConfig)) {
58+
Projection projection = (Projection) fastOpenRead.project();
59+
fastOpenRead = null;
60+
if (projection instanceof ApiFuture) {
61+
ApiFuture apiFuture = (ApiFuture) projection;
62+
return (Projection) StorageException.coalesceAsync(apiFuture);
63+
}
64+
return projection;
65+
}
66+
}
4367
Projection projection = session.readAs(config);
4468
if (projection instanceof ApiFuture) {
4569
ApiFuture apiFuture = (ApiFuture) projection;
@@ -59,4 +83,14 @@ static ApiFuture<BlobReadSession> wrap(ApiFuture<ObjectReadSession> session) {
5983
BlobReadSessionAdapter::new,
6084
MoreExecutors.directExecutor());
6185
}
86+
87+
static <Projection> ApiFuture<BlobReadSession> wrap(
88+
ApiFuture<FastOpenObjectReadSession<Projection>> session,
89+
ReadProjectionConfig<Projection> config) {
90+
return ApiFutures.transform(
91+
StorageException.coalesceAsync(session),
92+
fastOpen ->
93+
new BlobReadSessionAdapter(fastOpen.getSession(), fastOpen.getRead(), config),
94+
MoreExecutors.directExecutor());
95+
}
6296
}

java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,6 +1487,31 @@ public ApiFuture<BlobReadSession> blobReadSession(BlobId id, BlobSourceOption...
14871487
return BlobReadSessionAdapter.wrap(session);
14881488
}
14891489

1490+
@Override
1491+
public <Projection> ApiFuture<BlobReadSession> blobReadSession(
1492+
BlobId id, ReadProjectionConfig<Projection> config, BlobSourceOption... options) {
1493+
Opts<ObjectSourceOpt> opts = Opts.unwrap(options);
1494+
Object object = codecs.blobId().encode(id);
1495+
1496+
BidiReadObjectSpec.Builder spec =
1497+
BidiReadObjectSpec.newBuilder().setBucket(object.getBucket()).setObject(object.getName());
1498+
1499+
long generation = object.getGeneration();
1500+
if (generation > 0) {
1501+
spec.setGeneration(generation);
1502+
}
1503+
BidiReadObjectRequest.Builder b = BidiReadObjectRequest.newBuilder();
1504+
b.setReadObjectSpec(spec);
1505+
opts.bidiReadObjectRequest().apply(b);
1506+
BidiReadObjectRequest req = b.build();
1507+
1508+
GrpcCallContext context = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
1509+
ApiFuture<StorageDataClient.FastOpenObjectReadSession<Projection>> session =
1510+
storageDataClient.fastOpenReadSession(req, context, config);
1511+
1512+
return BlobReadSessionAdapter.wrap(session, config);
1513+
}
1514+
14901515
@Override
14911516
public GrpcStorageOptions getOptions() {
14921517
return (GrpcStorageOptions) super.getOptions();

java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,6 +1504,49 @@ public ApiFuture<BlobReadSession> blobReadSession(BlobId id, BlobSourceOption...
15041504
}
15051505
}
15061506

1507+
@Override
1508+
public <Projection> ApiFuture<BlobReadSession> blobReadSession(
1509+
BlobId id, ReadProjectionConfig<Projection> config, BlobSourceOption... options) {
1510+
Span blobReadSessionSpan =
1511+
tracer
1512+
.spanBuilder(BLOB_READ_SESSION)
1513+
.setAttribute("gsutil.uri", id.toGsUtilUriWithGeneration())
1514+
.startSpan();
1515+
try (Scope ignore1 = blobReadSessionSpan.makeCurrent()) {
1516+
Context blobReadSessionContext = Context.current();
1517+
Span ready = tracer.spanBuilder(BLOB_READ_SESSION + "/ready").startSpan();
1518+
ApiFuture<BlobReadSession> blobReadSessionApiFuture =
1519+
delegate.blobReadSession(id, config, options);
1520+
ApiFuture<BlobReadSession> futureDecorated =
1521+
ApiFutures.transform(
1522+
blobReadSessionApiFuture,
1523+
delegate -> {
1524+
ready.end();
1525+
return new OtelDecoratingBlobReadSession(
1526+
delegate, id, blobReadSessionContext, blobReadSessionSpan);
1527+
},
1528+
MoreExecutors.directExecutor());
1529+
ApiFutures.addCallback(
1530+
futureDecorated,
1531+
(OnFailureApiFutureCallback<BlobReadSession>)
1532+
t -> {
1533+
blobReadSessionSpan.recordException(t);
1534+
blobReadSessionSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
1535+
blobReadSessionSpan.end();
1536+
ready.recordException(t);
1537+
ready.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
1538+
ready.end();
1539+
},
1540+
MoreExecutors.directExecutor());
1541+
return futureDecorated;
1542+
} catch (Throwable t) {
1543+
blobReadSessionSpan.recordException(t);
1544+
blobReadSessionSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
1545+
blobReadSessionSpan.end();
1546+
throw t;
1547+
}
1548+
}
1549+
15071550
@Override
15081551
public BlobAppendableUpload blobAppendableUpload(
15091552
BlobInfo blobInfo, BlobAppendableUploadConfig uploadConfig, BlobWriteOption... options) {

java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5955,6 +5955,27 @@ default ApiFuture<BlobReadSession> blobReadSession(BlobId id, BlobSourceOption..
59555955
return throwGrpcOnly(fmtMethodName("blobReadSession", BlobId.class, BlobSourceOption.class));
59565956
}
59575957

5958+
/**
5959+
* Asynchronously set up a new {@link BlobReadSession} for the specified {@link BlobId}, {@link
5960+
* ReadProjectionConfig} and {@code options}.
5961+
*
5962+
* <p>This is a "fast-open" version of {@link #blobReadSession(BlobId, BlobSourceOption...)} which
5963+
* allows the initial read to be started at the same time as the session is opened.
5964+
*
5965+
* @param id the blob to read from
5966+
* @param config the configuration for the initial read
5967+
* @param options blob source options
5968+
* @since 2.65.0 This new api is in preview and is subject to breaking changes.
5969+
*/
5970+
@BetaApi
5971+
@TransportCompatibility({Transport.GRPC})
5972+
default <Projection> ApiFuture<BlobReadSession> blobReadSession(
5973+
BlobId id, ReadProjectionConfig<Projection> config, BlobSourceOption... options) {
5974+
return throwGrpcOnly(
5975+
fmtMethodName(
5976+
"blobReadSession", BlobId.class, ReadProjectionConfig.class, BlobSourceOption.class));
5977+
}
5978+
59585979
/**
59595980
* Create a new {@link BlobAppendableUpload} for the specified {@code blobInfo} and {@code
59605981
* options}.

0 commit comments

Comments
 (0)