Skip to content

Commit b8cd2cd

Browse files
committed
more fixes
1 parent add4e67 commit b8cd2cd

3 files changed

Lines changed: 192 additions & 37 deletions

File tree

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 60 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,12 @@
2424
import com.google.spanner.v1.CommitRequest;
2525
import com.google.spanner.v1.DirectedReadOptions;
2626
import com.google.spanner.v1.ExecuteSqlRequest;
27-
import com.google.spanner.v1.Group;
2827
import com.google.spanner.v1.Mutation;
2928
import com.google.spanner.v1.ReadRequest;
3029
import com.google.spanner.v1.RoutingHint;
31-
import com.google.spanner.v1.Tablet;
3230
import com.google.spanner.v1.TransactionOptions;
3331
import com.google.spanner.v1.TransactionSelector;
3432
import java.util.ArrayList;
35-
import java.util.HashSet;
3633
import java.util.List;
3734
import java.util.Objects;
3835
import java.util.Set;
@@ -55,6 +52,7 @@
5552
@InternalApi
5653
public final class ChannelFinder {
5754
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
55+
private static final int CACHE_UPDATE_DRAIN_BATCH_SIZE = 64;
5856
private static final int MAX_CACHE_UPDATE_THREADS =
5957
Math.max(2, Runtime.getRuntime().availableProcessors());
6058
private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool();
@@ -132,39 +130,12 @@ private boolean shouldProcessUpdate(CacheUpdate update) {
132130
}
133131

134132
public void update(CacheUpdate update) {
133+
Set<String> currentAddresses;
135134
synchronized (updateLock) {
136-
long currentId = databaseId.get();
137-
long updateDatabaseId = update.getDatabaseId();
138-
if (updateDatabaseId != 0 && currentId != updateDatabaseId) {
139-
if (currentId != 0) {
140-
recipeCache.clear();
141-
rangeCache.clear();
142-
}
143-
databaseId.set(updateDatabaseId);
144-
}
145-
if (update.hasKeyRecipes()) {
146-
recipeCache.addRecipes(update.getKeyRecipes());
147-
}
148-
rangeCache.addRanges(update);
149-
150-
// Notify the lifecycle manager about server addresses so it can create endpoints
151-
// in the background and start probing, and evict stale endpoints atomically.
152-
if (lifecycleManager != null && finderKey != null) {
153-
Set<String> currentAddresses = new HashSet<>();
154-
for (Group group : update.getGroupList()) {
155-
for (Tablet tablet : group.getTabletsList()) {
156-
String addr = tablet.getServerAddress();
157-
if (!addr.isEmpty()) {
158-
currentAddresses.add(addr);
159-
}
160-
}
161-
}
162-
// Also include addresses from existing cached tablets not in this update.
163-
currentAddresses.addAll(rangeCache.getActiveAddresses());
164-
// Atomically ensure endpoints exist and evict stale ones.
165-
lifecycleManager.updateActiveAddresses(finderKey, currentAddresses);
166-
}
135+
applyUpdateLocked(update);
136+
currentAddresses = snapshotActiveAddressesLocked();
167137
}
138+
publishLifecycleUpdate(currentAddresses);
168139
}
169140

170141
public void updateAsync(CacheUpdate update) {
@@ -187,10 +158,12 @@ public void updateAsync(CacheUpdate update) {
187158
}
188159

189160
private void drainPendingUpdate() {
161+
List<PendingCacheUpdate> batch = new ArrayList<>(CACHE_UPDATE_DRAIN_BATCH_SIZE);
190162
while (true) {
191-
PendingCacheUpdate toApply;
192-
while ((toApply = pendingUpdates.poll()) != null) {
193-
update(toApply.update);
163+
drainBatch(batch);
164+
if (!batch.isEmpty()) {
165+
applyBatch(batch);
166+
batch.clear();
194167
}
195168
drainScheduled.set(false);
196169
if (pendingUpdates.isEmpty() || !drainScheduled.compareAndSet(false, true)) {
@@ -199,6 +172,56 @@ private void drainPendingUpdate() {
199172
}
200173
}
201174

175+
private void drainBatch(List<PendingCacheUpdate> batch) {
176+
PendingCacheUpdate toApply;
177+
while (batch.size() < CACHE_UPDATE_DRAIN_BATCH_SIZE
178+
&& (toApply = pendingUpdates.poll()) != null) {
179+
batch.add(toApply);
180+
}
181+
}
182+
183+
private void applyBatch(List<PendingCacheUpdate> batch) {
184+
Set<String> currentAddresses;
185+
synchronized (updateLock) {
186+
for (PendingCacheUpdate pendingUpdate : batch) {
187+
applyUpdateLocked(pendingUpdate.update);
188+
}
189+
currentAddresses = snapshotActiveAddressesLocked();
190+
}
191+
publishLifecycleUpdate(currentAddresses);
192+
}
193+
194+
private void applyUpdateLocked(CacheUpdate update) {
195+
long currentId = databaseId.get();
196+
long updateDatabaseId = update.getDatabaseId();
197+
if (updateDatabaseId != 0 && currentId != updateDatabaseId) {
198+
if (currentId != 0) {
199+
recipeCache.clear();
200+
rangeCache.clear();
201+
}
202+
databaseId.set(updateDatabaseId);
203+
}
204+
if (update.hasKeyRecipes()) {
205+
recipeCache.addRecipes(update.getKeyRecipes());
206+
}
207+
rangeCache.addRanges(update);
208+
}
209+
210+
@Nullable
211+
private Set<String> snapshotActiveAddressesLocked() {
212+
if (lifecycleManager == null || finderKey == null) {
213+
return null;
214+
}
215+
return rangeCache.getActiveAddresses();
216+
}
217+
218+
private void publishLifecycleUpdate(@Nullable Set<String> currentAddresses) {
219+
if (currentAddresses == null) {
220+
return;
221+
}
222+
lifecycleManager.updateActiveAddressesAsync(finderKey, currentAddresses);
223+
}
224+
202225
/**
203226
* Test-only hook used by {@link KeyAwareChannel#awaitPendingCacheUpdates()} to wait until the
204227
* async cache update worker has finished applying the latest pending update.

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Map;
3030
import java.util.Set;
3131
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentLinkedQueue;
33+
import java.util.concurrent.ExecutorService;
3234
import java.util.concurrent.Executors;
3335
import java.util.concurrent.ScheduledExecutorService;
3436
import java.util.concurrent.ScheduledFuture;
@@ -103,6 +105,11 @@ static final class EndpointState {
103105
private final ChannelEndpointCache endpointCache;
104106
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
105107
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();
108+
private final Map<String, Long> finderGenerations = new ConcurrentHashMap<>();
109+
private final Map<String, PendingActiveAddressUpdate> pendingActiveAddressUpdates =
110+
new ConcurrentHashMap<>();
111+
private final Set<String> queuedFinderKeys = ConcurrentHashMap.newKeySet();
112+
private final ConcurrentLinkedQueue<String> queuedFinders = new ConcurrentLinkedQueue<>();
106113

107114
/**
108115
* Active addresses reported by each ChannelFinder, keyed by database id.
@@ -118,15 +125,27 @@ static final class EndpointState {
118125

119126
private final Object activeAddressLock = new Object();
120127

128+
private final ExecutorService activeAddressUpdateExecutor;
121129
private final ScheduledExecutorService scheduler;
122130
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
131+
private final AtomicBoolean activeAddressDrainScheduled = new AtomicBoolean(false);
123132
private final long probeIntervalSeconds;
124133
private final Duration idleEvictionDuration;
125134
private final Clock clock;
126135
private final String defaultEndpointAddress;
127136

128137
private ScheduledFuture<?> evictionFuture;
129138

139+
private static final class PendingActiveAddressUpdate {
140+
private final Set<String> activeAddresses;
141+
private final long generation;
142+
143+
private PendingActiveAddressUpdate(Set<String> activeAddresses, long generation) {
144+
this.activeAddresses = activeAddresses;
145+
this.generation = generation;
146+
}
147+
}
148+
130149
EndpointLifecycleManager(ChannelEndpointCache endpointCache) {
131150
this(
132151
endpointCache,
@@ -146,6 +165,13 @@ static final class EndpointState {
146165
this.idleEvictionDuration = idleEvictionDuration;
147166
this.clock = clock;
148167
this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress();
168+
this.activeAddressUpdateExecutor =
169+
Executors.newSingleThreadExecutor(
170+
r -> {
171+
Thread t = new Thread(r, "spanner-active-address-update");
172+
t.setDaemon(true);
173+
return t;
174+
});
149175
this.scheduler =
150176
Executors.newScheduledThreadPool(
151177
2,
@@ -213,6 +239,59 @@ private void clearTransientFailureEvictionMarker(String address) {
213239
}
214240
}
215241

242+
/**
243+
* Enqueues active-address reconciliation on a dedicated worker so cache-map updates do not block
244+
* on endpoint lifecycle bookkeeping.
245+
*/
246+
void updateActiveAddressesAsync(String finderKey, Set<String> activeAddresses) {
247+
if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
248+
return;
249+
}
250+
synchronized (activeAddressLock) {
251+
long generation = finderGenerations.getOrDefault(finderKey, 0L);
252+
pendingActiveAddressUpdates.put(
253+
finderKey, new PendingActiveAddressUpdate(new HashSet<>(activeAddresses), generation));
254+
if (queuedFinderKeys.add(finderKey)) {
255+
queuedFinders.add(finderKey);
256+
}
257+
}
258+
scheduleActiveAddressDrain();
259+
}
260+
261+
private void scheduleActiveAddressDrain() {
262+
if (!activeAddressDrainScheduled.compareAndSet(false, true)) {
263+
return;
264+
}
265+
activeAddressUpdateExecutor.execute(this::drainPendingActiveAddressUpdates);
266+
}
267+
268+
private void drainPendingActiveAddressUpdates() {
269+
while (true) {
270+
String finderKey = queuedFinders.poll();
271+
if (finderKey == null) {
272+
activeAddressDrainScheduled.set(false);
273+
if (queuedFinders.isEmpty() || !activeAddressDrainScheduled.compareAndSet(false, true)) {
274+
return;
275+
}
276+
continue;
277+
}
278+
279+
queuedFinderKeys.remove(finderKey);
280+
PendingActiveAddressUpdate pendingUpdate = pendingActiveAddressUpdates.remove(finderKey);
281+
if (pendingUpdate == null) {
282+
continue;
283+
}
284+
285+
synchronized (activeAddressLock) {
286+
long currentGeneration = finderGenerations.getOrDefault(finderKey, 0L);
287+
if (currentGeneration != pendingUpdate.generation) {
288+
continue;
289+
}
290+
}
291+
updateActiveAddresses(finderKey, pendingUpdate.activeAddresses);
292+
}
293+
}
294+
216295
/**
217296
* Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle
218297
* eviction timer for this endpoint.
@@ -295,6 +374,9 @@ void unregisterFinder(String finderKey) {
295374
return;
296375
}
297376
synchronized (activeAddressLock) {
377+
finderGenerations.merge(finderKey, 1L, Long::sum);
378+
pendingActiveAddressUpdates.remove(finderKey);
379+
queuedFinderKeys.remove(finderKey);
298380
if (activeAddressesPerFinder.remove(finderKey) == null) {
299381
return;
300382
}
@@ -588,6 +670,7 @@ void shutdown() {
588670
}
589671

590672
logger.log(Level.FINE, "Shutting down endpoint lifecycle manager");
673+
activeAddressUpdateExecutor.shutdownNow();
591674

592675
if (evictionFuture != null) {
593676
evictionFuture.cancel(false);
@@ -602,6 +685,9 @@ void shutdown() {
602685
}
603686
endpoints.clear();
604687
transientFailureEvictedAddresses.clear();
688+
pendingActiveAddressUpdates.clear();
689+
queuedFinderKeys.clear();
690+
queuedFinders.clear();
605691

606692
scheduler.shutdown();
607693
try {

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/ChannelFinderTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.CountDownLatch;
3434
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.Future;
3537
import java.util.concurrent.TimeUnit;
3638
import java.util.concurrent.atomic.AtomicLong;
3739
import org.junit.Test;
@@ -112,6 +114,28 @@ public void updateAsyncProcessesDatabaseTransitionWithoutRangesOrGroups() throws
112114
assertThat(rangeCache(finder).size()).isEqualTo(0);
113115
}
114116

117+
@Test
118+
public void updateDoesNotBlockOnLifecycleManagerAddressReconciliation() throws Exception {
119+
BlockingLifecycleManager lifecycleManager =
120+
new BlockingLifecycleManager(new FakeEndpointCache());
121+
ChannelFinder finder = new ChannelFinder(new FakeEndpointCache(), lifecycleManager, "db-1");
122+
ExecutorService executor = Executors.newSingleThreadExecutor();
123+
try {
124+
Future<?> future = executor.submit(() -> finder.update(singleRangeUpdate(0)));
125+
126+
future.get(1, TimeUnit.SECONDS);
127+
assertThat(lifecycleManager.updateStarted.await(5, TimeUnit.SECONDS)).isTrue();
128+
assertThat(rangeCache(finder).size()).isEqualTo(1);
129+
130+
lifecycleManager.releaseUpdate.countDown();
131+
assertThat(lifecycleManager.updateFinished.await(5, TimeUnit.SECONDS)).isTrue();
132+
} finally {
133+
lifecycleManager.releaseUpdate.countDown();
134+
lifecycleManager.shutdown();
135+
executor.shutdownNow();
136+
}
137+
}
138+
115139
private static CacheUpdate singleRangeUpdate(int index) {
116140
String startKey = String.format("k%05d", index);
117141
String limitKey = String.format("k%05d", index + 1);
@@ -262,4 +286,26 @@ public String authority() {
262286
return "fake";
263287
}
264288
}
289+
290+
private static final class BlockingLifecycleManager extends EndpointLifecycleManager {
291+
private final CountDownLatch updateStarted = new CountDownLatch(1);
292+
private final CountDownLatch releaseUpdate = new CountDownLatch(1);
293+
private final CountDownLatch updateFinished = new CountDownLatch(1);
294+
295+
private BlockingLifecycleManager(ChannelEndpointCache endpointCache) {
296+
super(endpointCache);
297+
}
298+
299+
@Override
300+
void updateActiveAddresses(String finderKey, java.util.Set<String> activeAddresses) {
301+
updateStarted.countDown();
302+
try {
303+
releaseUpdate.await(5, TimeUnit.SECONDS);
304+
} catch (InterruptedException e) {
305+
Thread.currentThread().interrupt();
306+
} finally {
307+
updateFinished.countDown();
308+
}
309+
}
310+
}
265311
}

0 commit comments

Comments
 (0)