@@ -608,6 +608,7 @@ private static void checkStreamException(
608608 private Deque <AbstractMessage > requests = new ConcurrentLinkedDeque <>();
609609 private volatile CountDownLatch freezeLock = new CountDownLatch (0 );
610610 private final AtomicInteger freezeAfterReturningNumRows = new AtomicInteger ();
611+ private final AtomicInteger freezeAfterNumRequests = new AtomicInteger (-1 );
611612 private Queue <Exception > exceptions = new ConcurrentLinkedQueue <>();
612613 private boolean stickyGlobalExceptions = false ;
613614 private ConcurrentMap <Statement , StatementResult > statementResults = new ConcurrentHashMap <>();
@@ -813,17 +814,37 @@ public void setIgnoreInlineBeginRequest(boolean ignore) {
813814 }
814815
815816 public void freeze () {
816- freezeLock = new CountDownLatch (1 );
817+ synchronized (lock ) {
818+ freezeLock = new CountDownLatch (1 );
819+ }
817820 }
818821
819822 public void unfreeze () {
820- freezeLock .countDown ();
823+ synchronized (lock ) {
824+ freezeAfterNumRequests .set (-1 );
825+ freezeLock .countDown ();
826+ }
821827 }
822828
823829 public void freezeAfterReturningNumRows (int numRows ) {
824830 freezeAfterReturningNumRows .set (numRows );
825831 }
826832
833+ public void freezeAfter (int numRequests ) {
834+ freezeAfterNumRequests .set (numRequests );
835+ }
836+
837+ private void maybeFreezeAndRecordRequest (AbstractMessage request ) {
838+ synchronized (lock ) {
839+ if (freezeAfterNumRequests .get () >= 0 ) {
840+ if (freezeAfterNumRequests .decrementAndGet () == -1 ) {
841+ freeze ();
842+ }
843+ }
844+ requests .add (request );
845+ }
846+ }
847+
827848 public void setMaxSessionsInOneBatch (int max ) {
828849 this .maxNumSessionsInOneBatch = max ;
829850 }
@@ -836,7 +857,7 @@ public void setMaxTotalSessions(int max) {
836857 public void batchCreateSessions (
837858 BatchCreateSessionsRequest request ,
838859 StreamObserver <BatchCreateSessionsResponse > responseObserver ) {
839- requests . add (request );
860+ maybeFreezeAndRecordRequest (request );
840861 Preconditions .checkNotNull (request .getDatabase ());
841862 String name = null ;
842863 try {
@@ -898,7 +919,7 @@ public void batchCreateSessions(
898919 @ Override
899920 public void createSession (
900921 CreateSessionRequest request , StreamObserver <Session > responseObserver ) {
901- requests . add (request );
922+ maybeFreezeAndRecordRequest (request );
902923 Preconditions .checkNotNull (request .getDatabase ());
903924 Preconditions .checkNotNull (request .getSession ());
904925 String name = generateSessionName (request .getDatabase ());
@@ -938,7 +959,7 @@ public void createSession(
938959
939960 @ Override
940961 public void getSession (GetSessionRequest request , StreamObserver <Session > responseObserver ) {
941- requests . add (request );
962+ maybeFreezeAndRecordRequest (request );
942963 Preconditions .checkNotNull (request .getName ());
943964 try {
944965 getSessionExecutionTime .simulateExecutionTime (exceptions , stickyGlobalExceptions , freezeLock );
@@ -983,7 +1004,7 @@ private <T> void setSessionNotFound(String name, StreamObserver<T> responseObser
9831004 @ Override
9841005 public void listSessions (
9851006 ListSessionsRequest request , StreamObserver <ListSessionsResponse > responseObserver ) {
986- requests . add (request );
1007+ maybeFreezeAndRecordRequest (request );
9871008 try {
9881009 listSessionsExecutionTime .simulateExecutionTime (
9891010 exceptions , stickyGlobalExceptions , freezeLock );
@@ -1006,7 +1027,7 @@ public void listSessions(
10061027
10071028 @ Override
10081029 public void deleteSession (DeleteSessionRequest request , StreamObserver <Empty > responseObserver ) {
1009- requests . add (request );
1030+ maybeFreezeAndRecordRequest (request );
10101031 Preconditions .checkNotNull (request .getName ());
10111032 try {
10121033 deleteSessionExecutionTime .simulateExecutionTime (
@@ -1035,7 +1056,7 @@ void doDeleteSession(Session session) {
10351056
10361057 @ Override
10371058 public void executeSql (ExecuteSqlRequest request , StreamObserver <ResultSet > responseObserver ) {
1038- requests . add (request );
1059+ maybeFreezeAndRecordRequest (request );
10391060 Preconditions .checkNotNull (request .getSession ());
10401061 Session session = getSession (request .getSession ());
10411062 if (session == null ) {
@@ -1133,7 +1154,7 @@ private void returnResultSet(
11331154 @ Override
11341155 public void executeBatchDml (
11351156 ExecuteBatchDmlRequest request , StreamObserver <ExecuteBatchDmlResponse > responseObserver ) {
1136- requests . add (request );
1157+ maybeFreezeAndRecordRequest (request );
11371158 Preconditions .checkNotNull (request .getSession ());
11381159 Session session = getSession (request .getSession ());
11391160 if (session == null ) {
@@ -1241,7 +1262,7 @@ public void executeStreamingSql(
12411262 || !request
12421263 .getSql ()
12431264 .equals (MultiplexedSessionDatabaseClient .DETERMINE_DIALECT_STATEMENT .getSql ())) {
1244- requests . add (request );
1265+ maybeFreezeAndRecordRequest (request );
12451266 }
12461267 Preconditions .checkNotNull (request .getSession ());
12471268 Session session = getSession (request .getSession ());
@@ -1687,7 +1708,7 @@ private <T> void throwTransactionAborted(ByteString transactionId) {
16871708
16881709 @ Override
16891710 public void read (final ReadRequest request , StreamObserver <ResultSet > responseObserver ) {
1690- requests . add (request );
1711+ maybeFreezeAndRecordRequest (request );
16911712 Preconditions .checkNotNull (request .getSession ());
16921713 Session session = getSession (request .getSession ());
16931714 if (session == null ) {
@@ -1720,7 +1741,7 @@ public void read(final ReadRequest request, StreamObserver<ResultSet> responseOb
17201741 @ Override
17211742 public void streamingRead (
17221743 final ReadRequest request , StreamObserver <PartialResultSet > responseObserver ) {
1723- requests . add (request );
1744+ maybeFreezeAndRecordRequest (request );
17241745 Preconditions .checkNotNull (request .getSession ());
17251746 Session session = getSession (request .getSession ());
17261747 if (session == null ) {
@@ -1945,7 +1966,7 @@ public void beginTransaction(
19451966 .getRequestOptions ()
19461967 .getTransactionTag ()
19471968 .equals ("multiplexed-rw-background-begin-txn" )) {
1948- requests . add (request );
1969+ maybeFreezeAndRecordRequest (request );
19491970 }
19501971 Preconditions .checkNotNull (request .getSession ());
19511972 Session session = getSession (request .getSession ());
@@ -2080,7 +2101,7 @@ private void ensureMostRecentTransaction(Session session, ByteString transaction
20802101
20812102 @ Override
20822103 public void commit (CommitRequest request , StreamObserver <CommitResponse > responseObserver ) {
2083- requests . add (request );
2104+ maybeFreezeAndRecordRequest (request );
20842105 Preconditions .checkNotNull (request .getSession ());
20852106 Session session = getSession (request .getSession ());
20862107 if (session == null ) {
@@ -2152,7 +2173,7 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
21522173 @ Override
21532174 public void batchWrite (
21542175 BatchWriteRequest request , StreamObserver <BatchWriteResponse > responseObserver ) {
2155- requests . add (request );
2176+ maybeFreezeAndRecordRequest (request );
21562177 Preconditions .checkNotNull (request .getSession ());
21572178 Session session = getSession (request .getSession ());
21582179 if (session == null ) {
@@ -2181,7 +2202,7 @@ private void commitTransaction(ByteString transactionId) {
21812202
21822203 @ Override
21832204 public void rollback (RollbackRequest request , StreamObserver <Empty > responseObserver ) {
2184- requests . add (request );
2205+ maybeFreezeAndRecordRequest (request );
21852206 Preconditions .checkNotNull (request .getTransactionId ());
21862207 Session session = getSession (request .getSession ());
21872208 if (session == null ) {
@@ -2230,7 +2251,7 @@ public void markCommitRetryOnTransaction(ByteString transactionId) {
22302251 @ Override
22312252 public void partitionQuery (
22322253 PartitionQueryRequest request , StreamObserver <PartitionResponse > responseObserver ) {
2233- requests . add (request );
2254+ maybeFreezeAndRecordRequest (request );
22342255 try {
22352256 partitionQueryExecutionTime .simulateExecutionTime (
22362257 exceptions , stickyGlobalExceptions , freezeLock );
@@ -2249,7 +2270,7 @@ public void partitionQuery(
22492270 @ Override
22502271 public void partitionRead (
22512272 PartitionReadRequest request , StreamObserver <PartitionResponse > responseObserver ) {
2252- requests . add (request );
2273+ maybeFreezeAndRecordRequest (request );
22532274 try {
22542275 partitionReadExecutionTime .simulateExecutionTime (
22552276 exceptions , stickyGlobalExceptions , freezeLock );
@@ -2418,23 +2439,27 @@ public ServerServiceDefinition getServiceDefinition() {
24182439 /** Removes all sessions and transactions. Mocked results are not removed. */
24192440 @ Override
24202441 public void reset () {
2421- requests = new ConcurrentLinkedDeque <>();
2422- exceptions = new ConcurrentLinkedQueue <>();
2423- statementGetCounts = new ConcurrentHashMap <>();
2424- sessions = new ConcurrentHashMap <>();
2425- sessionLastUsed = new ConcurrentHashMap <>();
2426- transactions = new ConcurrentHashMap <>();
2427- transactionsStarted .clear ();
2428- isPartitionedDmlTransaction = new ConcurrentHashMap <>();
2429- abortedTransactions = new ConcurrentHashMap <>();
2430- transactionCounters = new ConcurrentHashMap <>();
2431- partitionTokens = new ConcurrentHashMap <>();
2432- transactionLastUsed = new ConcurrentHashMap <>();
2433- transactionSequenceNo = new ConcurrentHashMap <>();
2434-
2435- numSessionsCreated .set (0 );
2436- stickyGlobalExceptions = false ;
2437- freezeLock .countDown ();
2442+ synchronized (lock ) {
2443+ requests = new ConcurrentLinkedDeque <>();
2444+ exceptions = new ConcurrentLinkedQueue <>();
2445+ statementGetCounts = new ConcurrentHashMap <>();
2446+ sessions = new ConcurrentHashMap <>();
2447+ sessionLastUsed = new ConcurrentHashMap <>();
2448+ transactions = new ConcurrentHashMap <>();
2449+ transactionsStarted .clear ();
2450+ isPartitionedDmlTransaction = new ConcurrentHashMap <>();
2451+ abortedTransactions = new ConcurrentHashMap <>();
2452+ transactionCounters = new ConcurrentHashMap <>();
2453+ partitionTokens = new ConcurrentHashMap <>();
2454+ transactionLastUsed = new ConcurrentHashMap <>();
2455+ transactionSequenceNo = new ConcurrentHashMap <>();
2456+
2457+ numSessionsCreated .set (0 );
2458+ freezeAfterNumRequests .set (-1 );
2459+ freezeAfterReturningNumRows .set (0 );
2460+ stickyGlobalExceptions = false ;
2461+ freezeLock .countDown ();
2462+ }
24382463 }
24392464
24402465 public void removeAllExecutionTimes () {
0 commit comments