|
32 | 32 | import java.nio.channels.ClosedSelectorException; |
33 | 33 | import java.util.Arrays; |
34 | 34 | import java.util.Map; |
35 | | -import java.util.concurrent.Executors; |
36 | 35 | import java.util.concurrent.RejectedExecutionException; |
37 | | -import java.util.concurrent.ThreadFactory; |
38 | 36 | import java.util.concurrent.TimeoutException; |
39 | 37 | import java.util.concurrent.atomic.AtomicInteger; |
40 | 38 | import org.eclipse.jetty.client.BytesRequestContent; |
|
48 | 46 | import org.eclipse.jetty.http.HttpHeader; |
49 | 47 | import org.eclipse.jetty.http.HttpMethod; |
50 | 48 | import org.eclipse.jetty.io.EofException; |
| 49 | +import org.eclipse.jetty.util.thread.QueuedThreadPool; |
51 | 50 | import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; |
52 | 51 | import org.eclipse.jetty.util.thread.Scheduler; |
53 | 52 |
|
@@ -86,20 +85,38 @@ static JettyHttpApiHostClient create(String url, Config config) { |
86 | 85 | boolean daemon = false; |
87 | 86 | Scheduler scheduler = |
88 | 87 | new ScheduledExecutorScheduler(schedulerName, daemon, myLoader, myThreadGroup); |
89 | | - ThreadFactory factory = |
90 | | - runnable -> { |
91 | | - Thread t = new Thread(myThreadGroup, runnable); |
92 | | - t.setName("JettyHttpApiHostClient-" + threadCount.incrementAndGet()); |
93 | | - t.setDaemon(true); |
94 | | - return t; |
95 | | - }; |
96 | | - // By default HttpClient will use a QueuedThreadPool with minThreads=8 and maxThreads=200. |
97 | | - // 8 threads is probably too much for most apps, especially since asynchronous I/O means that |
98 | | - // 8 concurrent API requests probably don't need that many threads. It's also not clear |
99 | | - // what advantage we'd get from using a QueuedThreadPool with a smaller minThreads value, versus |
100 | | - // just one of the standard java.util.concurrent pools. Here we have minThreads=1, maxThreads=∞, |
101 | | - // and idleTime=60 seconds. maxThreads=200 and maxThreads=∞ are probably equivalent in practice. |
102 | | - httpClient.setExecutor(Executors.newCachedThreadPool(factory)); |
| 88 | + /* |
| 89 | + * Thread Pool Configuration & Bug Analysis: |
| 90 | + * |
| 91 | + * In previous versions of the runtime, an unbounded CachedThreadPool was used here: |
| 92 | + * `httpClient.setExecutor(Executors.newCachedThreadPool(factory));` |
| 93 | + * |
| 94 | + * Under high load (e.g., when a customer's custom retry logic aggressively retries failing |
| 95 | + * RPCs like `BeginTransaction`), an unbounded thread pool creates thousands of threads instantly. |
| 96 | + * This leads to a system collapse: |
| 97 | + * 1. JVM Overload: The Java container becomes severely memory and CPU constrained. |
| 98 | + * 2. Appserver Flooded: The avalanche of concurrent requests from the Java container floods the |
| 99 | + * C++ Appserver proxy. |
| 100 | + * 3. Triggering the C++ Bug Mask: Under massive load, the C++ Appserver's gRPC calls to the |
| 101 | + * Datastore fail with UNAVAILABLE or RESOURCE_EXHAUSTED errors. |
| 102 | + * 4. The Response: Because these aren't standard application errors, the C++ code |
| 103 | + * (DatastoreClientHelper::DoneImpl) masks them as `Error::INTERNAL_ERROR` and returns the |
| 104 | + * message "Internal Datastore Error" to the Java client to prevent leaking internal |
| 105 | + * infrastructure details. |
| 106 | + * 5. The Java client throws DatastoreFailureException, triggering the customer's loop again. |
| 107 | + * |
| 108 | + * To prevent this "retry storm", we explicitly use a bounded QueuedThreadPool. |
| 109 | + * We cap the threads at the configured `maxConnectionsPerDestination`, falling back |
| 110 | + * to 100 when it is unset (note: Jetty's own built-in default for that setting is 64), |
| 111 | + * instead of a hardcoded 200, to prevent severe memory pressure (thread stack sizes) |
| 111 | + * on smaller AppEngine instance classes such as F1 (256MB) or F2 (512MB). |
| 112 | + * If the system experiences a spike, Jetty will safely queue the outgoing RPCs, preventing the |
| 113 | + * JVM and the Appserver from being overwhelmed and eliminating the INTERNAL_ERROR fallback loop. |
| 114 | + * |
| 114 | + * NOTE(review): QueuedThreadPool requires maxThreads >= minThreads (10 here), so a configured |
| 114 | + * maxConnectionsPerDestination below 10 would throw IllegalArgumentException at construction — |
| 114 | + * consider clamping maxThreads to at least the minThreads value; verify against Config bounds. |
| 114 | + */ |
| 115 | + int maxThreads = config.maxConnectionsPerDestination().orElse(100); |
| 116 | + QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, 10, 60000, null, myThreadGroup); |
| 117 | + threadPool.setName("JettyHttpApiHostClient"); |
| 118 | + threadPool.setDaemon(true); |
| 119 | + httpClient.setExecutor(threadPool); |
103 | 120 | httpClient.setScheduler(scheduler); |
104 | 121 | config.maxConnectionsPerDestination().ifPresent(httpClient::setMaxConnectionsPerDestination); |
105 | 122 | try { |
|
0 commit comments