3939import com .google .cloud .spanner .spi .v1 .SpannerRpc .Option ;
4040import com .google .common .annotations .VisibleForTesting ;
4141import com .google .common .base .Preconditions ;
42+ import com .google .common .base .Supplier ;
4243import com .google .common .collect .ImmutableMap ;
4344import com .google .common .util .concurrent .MoreExecutors ;
4445import com .google .protobuf .ByteString ;
5960import java .util .EnumMap ;
6061import java .util .Map ;
6162import java .util .concurrent .ThreadLocalRandom ;
63+ import java .util .concurrent .atomic .AtomicInteger ;
6264import java .util .concurrent .atomic .AtomicLong ;
65+ import java .util .concurrent .locks .Condition ;
66+ import java .util .concurrent .locks .ReentrantLock ;
6367import java .util .logging .Logger ;
6468import javax .annotation .Nullable ;
6569import javax .annotation .concurrent .GuardedBy ;
@@ -345,14 +349,17 @@ static Builder newBuilder() {
345349 }
346350
347351 private TimestampBound bound ;
348- private final Object txnLock = new Object ();
352+ private final ReentrantLock txnLock = new ReentrantLock ();
353+ private final Condition hasNoPendingStarts = txnLock .newCondition ();
349354
350355 @ GuardedBy ("txnLock" )
351356 private Timestamp timestamp ;
352357
353358 @ GuardedBy ("txnLock" )
354359 private ByteString transactionId ;
355360
361+ private final AtomicInteger pendingStarts = new AtomicInteger (0 );
362+
356363 private final Map <SpannerRpc .Option , ?> channelHint ;
357364
358365 MultiUseReadOnlyTransaction (Builder builder ) {
@@ -407,6 +414,28 @@ TransactionSelector getTransactionSelector() {
407414 return selector ;
408415 }
409416
417+ private ListenableAsyncResultSet createAsyncResultSet (
418+ Supplier <ResultSet > resultSetSupplier , int bufferRows ) {
419+ pendingStarts .incrementAndGet ();
420+ return new AsyncResultSetImpl (
421+ executorProvider ,
422+ () -> {
423+ try {
424+ return resultSetSupplier .get ();
425+ } finally {
426+ if (pendingStarts .decrementAndGet () == 0 ) {
427+ txnLock .lock ();
428+ try {
429+ hasNoPendingStarts .signalAll ();
430+ } finally {
431+ txnLock .unlock ();
432+ }
433+ }
434+ }
435+ },
436+ bufferRows );
437+ }
438+
410439 @ Override
411440 public ListenableAsyncResultSet readAsync (
412441 String table , KeySet keys , Iterable <String > columns , ReadOption ... options ) {
@@ -415,8 +444,8 @@ public ListenableAsyncResultSet readAsync(
415444 readOptions .hasBufferRows ()
416445 ? readOptions .bufferRows ()
417446 : AsyncResultSetImpl .DEFAULT_BUFFER_SIZE ;
418- return new AsyncResultSetImpl (
419- executorProvider , () -> readInternal (table , null , keys , columns , options ), bufferRows );
447+ return createAsyncResultSet (
448+ () -> readInternal (table , null , keys , columns , options ), bufferRows );
420449 }
421450
422451 @ Override
@@ -427,10 +456,8 @@ public ListenableAsyncResultSet readUsingIndexAsync(
427456 readOptions .hasBufferRows ()
428457 ? readOptions .bufferRows ()
429458 : AsyncResultSetImpl .DEFAULT_BUFFER_SIZE ;
430- return new AsyncResultSetImpl (
431- executorProvider ,
432- () -> readInternal (table , checkNotNull (index ), keys , columns , options ),
433- bufferRows );
459+ return createAsyncResultSet (
460+ () -> readInternal (table , checkNotNull (index ), keys , columns , options ), bufferRows );
434461 }
435462
436463 @ Override
@@ -440,8 +467,7 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
440467 readOptions .hasBufferRows ()
441468 ? readOptions .bufferRows ()
442469 : AsyncResultSetImpl .DEFAULT_BUFFER_SIZE ;
443- return new AsyncResultSetImpl (
444- executorProvider ,
470+ return createAsyncResultSet (
445471 () ->
446472 executeQueryInternal (
447473 statement , com .google .spanner .v1 .ExecuteSqlRequest .QueryMode .NORMAL , options ),
@@ -450,20 +476,38 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
450476
451477 @ Override
452478 public Timestamp getReadTimestamp () {
453- synchronized (txnLock ) {
479+ txnLock .lock ();
480+ try {
454481 assertTimestampAvailable (timestamp != null );
455482 return timestamp ;
483+ } finally {
484+ txnLock .unlock ();
456485 }
457486 }
458487
459488 ByteString getTransactionId () {
460- synchronized (txnLock ) {
489+ txnLock .lock ();
490+ try {
461491 return transactionId ;
492+ } finally {
493+ txnLock .unlock ();
462494 }
463495 }
464496
465497 @ Override
466498 public void close () {
499+ txnLock .lock ();
500+ try {
501+ while (pendingStarts .get () > 0 ) {
502+ try {
503+ hasNoPendingStarts .await ();
504+ } catch (InterruptedException e ) {
505+ throw SpannerExceptionFactory .propagateInterrupt (e );
506+ }
507+ }
508+ } finally {
509+ txnLock .unlock ();
510+ }
467511 ByteString id = getTransactionId ();
468512 if (id != null && !id .isEmpty ()) {
469513 rpc .clearTransactionAffinity (id );
@@ -477,7 +521,8 @@ public void close() {
477521 * Multiplexed Session.
478522 */
479523 void initFallbackTransaction () {
480- synchronized (txnLock ) {
524+ txnLock .lock ();
525+ try {
481526 span .addAnnotation ("Creating Transaction" );
482527 TransactionOptions .Builder options = TransactionOptions .newBuilder ();
483528 if (timestamp != null ) {
@@ -494,6 +539,8 @@ void initFallbackTransaction() {
494539 .setOptions (options )
495540 .build ();
496541 initTransactionInternal (request );
542+ } finally {
543+ txnLock .unlock ();
497544 }
498545 }
499546
@@ -507,7 +554,8 @@ void initTransaction() {
507554 // RTT, but optimal if the first read is slow. As the client library is now using streaming
508555 // reads, a possible optimization could be to use the first read in the transaction to begin
509556 // it implicitly.
510- synchronized (txnLock ) {
557+ txnLock .lock ();
558+ try {
511559 if (transactionId != null ) {
512560 return ;
513561 }
@@ -520,6 +568,8 @@ void initTransaction() {
520568 .setOptions (options )
521569 .build ();
522570 initTransactionInternal (request );
571+ } finally {
572+ txnLock .unlock ();
523573 }
524574 }
525575
0 commit comments