1818
1919import com .google .api .core .InternalApi ;
2020import com .google .common .annotations .VisibleForTesting ;
21- import com .google .common .util .concurrent .ThreadFactoryBuilder ;
2221import com .google .spanner .v1 .BeginTransactionRequest ;
2322import com .google .spanner .v1 .CacheUpdate ;
2423import com .google .spanner .v1 .CommitRequest ;
3231import java .util .ArrayList ;
3332import java .util .List ;
3433import java .util .Objects ;
35- import java .util .Set ;
36- import java .util .concurrent .ConcurrentLinkedQueue ;
37- import java .util .concurrent .ExecutorService ;
38- import java .util .concurrent .LinkedBlockingQueue ;
3934import java .util .concurrent .ThreadLocalRandom ;
40- import java .util .concurrent .ThreadPoolExecutor ;
41- import java .util .concurrent .TimeUnit ;
42- import java .util .concurrent .atomic .AtomicBoolean ;
4335import java .util .concurrent .atomic .AtomicLong ;
4436import java .util .function .Predicate ;
4537import javax .annotation .Nullable ;
5244@ InternalApi
5345public final class ChannelFinder {
5446 private static final Predicate <String > NO_EXCLUDED_ENDPOINTS = address -> false ;
55- private static final int CACHE_UPDATE_DRAIN_BATCH_SIZE = 64 ;
56- private static final int MAX_CACHE_UPDATE_THREADS =
57- Math .max (2 , Runtime .getRuntime ().availableProcessors ());
58- private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool ();
5947
6048 private final Object updateLock = new Object ();
6149 private final AtomicLong databaseId = new AtomicLong ();
6250 private final KeyRecipeCache recipeCache = new KeyRecipeCache ();
6351 private final KeyRangeCache rangeCache ;
64- private final ConcurrentLinkedQueue <PendingCacheUpdate > pendingUpdates =
65- new ConcurrentLinkedQueue <>();
66- private final AtomicBoolean drainScheduled = new AtomicBoolean ();
67- private volatile java .util .concurrent .CountDownLatch drainingLatch =
68- new java .util .concurrent .CountDownLatch (0 );
69- @ Nullable private final EndpointLifecycleManager lifecycleManager ;
70- @ Nullable private final String finderKey ;
7152
7253 public ChannelFinder (ChannelEndpointCache endpointCache ) {
73- this (endpointCache , null , null );
54+ this (endpointCache , null );
7455 }
7556
7657 public ChannelFinder (
7758 ChannelEndpointCache endpointCache , @ Nullable EndpointLifecycleManager lifecycleManager ) {
78- this (endpointCache , lifecycleManager , null );
79- }
80-
81- ChannelFinder (
82- ChannelEndpointCache endpointCache ,
83- @ Nullable EndpointLifecycleManager lifecycleManager ,
84- @ Nullable String finderKey ) {
8559 this .rangeCache = new KeyRangeCache (Objects .requireNonNull (endpointCache ), lifecycleManager );
86- this .lifecycleManager = lifecycleManager ;
87- this .finderKey = finderKey ;
8860 }
8961
9062 void useDeterministicRandom () {
9163 rangeCache .useDeterministicRandom ();
9264 }
9365
94- private static ExecutorService createCacheUpdatePool () {
95- ThreadPoolExecutor executor =
96- new ThreadPoolExecutor (
97- MAX_CACHE_UPDATE_THREADS ,
98- MAX_CACHE_UPDATE_THREADS ,
99- 30L ,
100- TimeUnit .SECONDS ,
101- new LinkedBlockingQueue <>(),
102- new ThreadFactoryBuilder ()
103- .setDaemon (true )
104- .setNameFormat ("spanner-cache-update-%d" )
105- .build ());
106- executor .allowCoreThreadTimeOut (true );
107- return executor ;
108- }
109-
110- private static final class PendingCacheUpdate {
111- private final CacheUpdate update ;
112-
113- private PendingCacheUpdate (CacheUpdate update ) {
114- this .update = update ;
115- }
116- }
117-
11866 private boolean isMaterialUpdate (CacheUpdate update ) {
11967 return update .getGroupCount () > 0
12068 || update .getRangeCount () > 0
@@ -130,65 +78,16 @@ private boolean shouldProcessUpdate(CacheUpdate update) {
13078 }
13179
13280 public void update (CacheUpdate update ) {
133- Set <String > currentAddresses ;
13481 synchronized (updateLock ) {
13582 applyUpdateLocked (update );
136- currentAddresses = snapshotActiveAddressesLocked ();
13783 }
138- publishLifecycleUpdate (currentAddresses );
13984 }
14085
14186 public void updateAsync (CacheUpdate update ) {
14287 if (!shouldProcessUpdate (update )) {
14388 return ;
14489 }
145- pendingUpdates .add (new PendingCacheUpdate (update ));
146- if (drainScheduled .compareAndSet (false , true )) {
147- java .util .concurrent .CountDownLatch latch = new java .util .concurrent .CountDownLatch (1 );
148- drainingLatch = latch ;
149- CACHE_UPDATE_POOL .execute (
150- () -> {
151- try {
152- drainPendingUpdate ();
153- } finally {
154- latch .countDown ();
155- }
156- });
157- }
158- }
159-
160- private void drainPendingUpdate () {
161- List <PendingCacheUpdate > batch = new ArrayList <>(CACHE_UPDATE_DRAIN_BATCH_SIZE );
162- while (true ) {
163- drainBatch (batch );
164- if (!batch .isEmpty ()) {
165- applyBatch (batch );
166- batch .clear ();
167- }
168- drainScheduled .set (false );
169- if (pendingUpdates .isEmpty () || !drainScheduled .compareAndSet (false , true )) {
170- return ;
171- }
172- }
173- }
174-
175- private void drainBatch (List <PendingCacheUpdate > batch ) {
176- PendingCacheUpdate toApply ;
177- while (batch .size () < CACHE_UPDATE_DRAIN_BATCH_SIZE
178- && (toApply = pendingUpdates .poll ()) != null ) {
179- batch .add (toApply );
180- }
181- }
182-
183- private void applyBatch (List <PendingCacheUpdate > batch ) {
184- Set <String > currentAddresses ;
185- synchronized (updateLock ) {
186- for (PendingCacheUpdate pendingUpdate : batch ) {
187- applyUpdateLocked (pendingUpdate .update );
188- }
189- currentAddresses = snapshotActiveAddressesLocked ();
190- }
191- publishLifecycleUpdate (currentAddresses );
90+ update (update );
19291 }
19392
19493 private void applyUpdateLocked (CacheUpdate update ) {
@@ -207,42 +106,11 @@ private void applyUpdateLocked(CacheUpdate update) {
207106 rangeCache .addRanges (update );
208107 }
209108
210- @ Nullable
211- private Set <String > snapshotActiveAddressesLocked () {
212- if (lifecycleManager == null || finderKey == null ) {
213- return null ;
214- }
215- return rangeCache .getActiveAddresses ();
216- }
217-
218- private void publishLifecycleUpdate (@ Nullable Set <String > currentAddresses ) {
219- if (currentAddresses == null ) {
220- return ;
221- }
222- lifecycleManager .updateActiveAddressesAsync (finderKey , currentAddresses );
223- }
224-
225109 /**
226- * Test-only hook used by {@link KeyAwareChannel#awaitPendingCacheUpdates()} to wait until the
227- * async cache update worker has finished applying the latest pending update.
110+ * Test-only hook retained for compatibility with callers that previously waited on async work.
228111 */
229112 @ VisibleForTesting
230- void awaitPendingUpdates () throws InterruptedException {
231- long deadline = System .nanoTime () + java .util .concurrent .TimeUnit .SECONDS .toNanos (5 );
232- while (System .nanoTime () < deadline ) {
233- java .util .concurrent .CountDownLatch latch = drainingLatch ;
234- if (latch != null ) {
235- long remainingNanos = deadline - System .nanoTime ();
236- if (remainingNanos <= 0 ) {
237- break ;
238- }
239- latch .await (remainingNanos , java .util .concurrent .TimeUnit .NANOSECONDS );
240- }
241- if (pendingUpdates .isEmpty () && !drainScheduled .get ()) {
242- return ;
243- }
244- }
245- }
113+ void awaitPendingUpdates () {}
246114
247115 public ChannelEndpoint findServer (ReadRequest .Builder reqBuilder ) {
248116 return findServer (reqBuilder , preferLeader (reqBuilder .getTransaction ()), NO_EXCLUDED_ENDPOINTS );
0 commit comments