Skip to content

Commit 6af0624

Browse files
Add CompletableFuture-based executeAsync overloads to fluent Async for simpler async composition. (#782)
Provide an optional bounded default executor with configurable thread and queue limits. Keep existing Future-based API and default execution behavior unchanged.
1 parent b78c2ad commit 6af0624

3 files changed

Lines changed: 385 additions & 3 deletions

File tree

httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java

Lines changed: 225 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,124 @@
2626
*/
2727
package org.apache.hc.client5.http.fluent;
2828

29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutorService;
2931
import java.util.concurrent.Future;
32+
import java.util.concurrent.LinkedBlockingQueue;
33+
import java.util.concurrent.RejectedExecutionException;
34+
import java.util.concurrent.ThreadPoolExecutor;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicInteger;
3037

38+
import org.apache.hc.core5.annotation.Contract;
39+
import org.apache.hc.core5.annotation.ThreadingBehavior;
3140
import org.apache.hc.core5.concurrent.BasicFuture;
41+
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
3242
import org.apache.hc.core5.concurrent.FutureCallback;
3343
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
44+
import org.apache.hc.core5.util.Args;
3445

3546
/**
3647
* Asynchronous executor for {@link Request}s.
3748
*
3849
* @since 4.3
3950
*/
51+
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
4052
public class Async {
4153

54+
private static final int DEFAULT_MAX_THREADS =
55+
Math.max(2, Math.min(32, Runtime.getRuntime().availableProcessors() * 2));
56+
57+
private static final int DEFAULT_QUEUE_CAPACITY = 1000;
58+
59+
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);
60+
4261
private Executor executor;
43-
private java.util.concurrent.Executor concurrentExec;
62+
private volatile java.util.concurrent.Executor concurrentExec;
63+
private volatile ExecutorService ownedConcurrentExec;
64+
65+
private int maxThreads = DEFAULT_MAX_THREADS;
66+
private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
4467

4568
public static Async newInstance() {
4669
return new Async();
4770
}
4871

4972
Async() {
5073
super();
74+
// Keep legacy behavior by default.
75+
}
76+
77+
public Async maxThreads(final int maxThreads) {
78+
Args.positive(maxThreads, "maxThreads");
79+
this.maxThreads = maxThreads;
80+
rebuildOwnedExecutorIfActive();
81+
return this;
82+
}
83+
84+
public Async queueCapacity(final int queueCapacity) {
85+
Args.positive(queueCapacity, "queueCapacity");
86+
this.queueCapacity = queueCapacity;
87+
rebuildOwnedExecutorIfActive();
88+
return this;
89+
}
90+
91+
/**
92+
* Enables an owned bounded default executor for asynchronous request execution using the
93+
* current {@code maxThreads} and {@code queueCapacity} settings.
94+
*
95+
* @return this instance.
96+
* @since 5.7
97+
*/
98+
public Async useDefaultExecutor() {
99+
return useDefaultExecutor(this.maxThreads, this.queueCapacity);
100+
}
101+
102+
/**
103+
* Enables an owned bounded default executor for asynchronous request execution.
104+
*
105+
* @param maxThreads maximum number of threads.
106+
* @param queueCapacity maximum number of queued tasks.
107+
* @return this instance.
108+
* @since 5.7
109+
*/
110+
public Async useDefaultExecutor(final int maxThreads, final int queueCapacity) {
111+
Args.positive(maxThreads, "maxThreads");
112+
Args.positive(queueCapacity, "queueCapacity");
113+
this.maxThreads = maxThreads;
114+
this.queueCapacity = queueCapacity;
115+
116+
shutdown();
117+
this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity);
118+
this.concurrentExec = this.ownedConcurrentExec;
119+
return this;
120+
}
121+
122+
private void rebuildOwnedExecutorIfActive() {
123+
if (this.ownedConcurrentExec != null) {
124+
shutdown();
125+
this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity);
126+
this.concurrentExec = this.ownedConcurrentExec;
127+
}
128+
}
129+
130+
private static ExecutorService createDefaultExecutor(final int maxThreads, final int queueCapacity) {
131+
final int instanceId = INSTANCE_COUNT.incrementAndGet();
132+
final DefaultThreadFactory threadFactory = new DefaultThreadFactory(
133+
"httpclient5-fluent-async-" + instanceId + "-",
134+
true);
135+
136+
final ThreadPoolExecutor exec = new ThreadPoolExecutor(
137+
maxThreads,
138+
maxThreads,
139+
60L,
140+
TimeUnit.SECONDS,
141+
new LinkedBlockingQueue<>(queueCapacity),
142+
threadFactory,
143+
new ThreadPoolExecutor.CallerRunsPolicy());
144+
145+
exec.allowCoreThreadTimeOut(true);
146+
return exec;
51147
}
52148

53149
public Async use(final Executor executor) {
@@ -57,9 +153,25 @@ public Async use(final Executor executor) {
57153

58154
public Async use(final java.util.concurrent.Executor concurrentExec) {
59155
this.concurrentExec = concurrentExec;
156+
shutdown();
60157
return this;
61158
}
62159

160+
/**
161+
* Shuts down resources owned by this instance, if any.
162+
* <p>
163+
* This method never attempts to shut down executors supplied via {@link #use(java.util.concurrent.Executor)}.
164+
*
165+
* @since 5.7
166+
*/
167+
public void shutdown() {
168+
final ExecutorService exec = this.ownedConcurrentExec;
169+
if (exec != null) {
170+
this.ownedConcurrentExec = null;
171+
exec.shutdown();
172+
}
173+
}
174+
63175
static class ExecRunnable<T> implements Runnable {
64176

65177
private final BasicFuture<T> future;
@@ -100,8 +212,14 @@ public <T> Future<T> execute(
100212
request,
101213
this.executor != null ? this.executor : Executor.newInstance(),
102214
handler);
103-
if (this.concurrentExec != null) {
104-
this.concurrentExec.execute(runnable);
215+
216+
final java.util.concurrent.Executor exec = this.concurrentExec;
217+
if (exec != null) {
218+
try {
219+
exec.execute(runnable);
220+
} catch (final RejectedExecutionException ex) {
221+
future.failed(ex);
222+
}
105223
} else {
106224
final Thread t = new Thread(runnable);
107225
t.setDaemon(true);
@@ -122,4 +240,108 @@ public Future<Content> execute(final Request request) {
122240
return execute(request, new ContentResponseHandler(), null);
123241
}
124242

243+
/**
244+
* Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
245+
* when the response has been fully received and converted by the given response handler.
246+
*
247+
* @param request the request to execute.
248+
* @param handler the response handler.
249+
* @param <T> the handler result type.
250+
* @return a {@code CompletableFuture} producing the handler result.
251+
* @since 5.7
252+
*/
253+
public <T> CompletableFuture<T> executeAsync(final Request request, final HttpClientResponseHandler<T> handler) {
254+
final CompletableFuture<T> cf = new CompletableFuture<>();
255+
execute(request, handler, new FutureCallback<T>() {
256+
257+
@Override
258+
public void completed(final T result) {
259+
cf.complete(result);
260+
}
261+
262+
@Override
263+
public void failed(final Exception ex) {
264+
cf.completeExceptionally(ex);
265+
}
266+
267+
@Override
268+
public void cancelled() {
269+
cf.cancel(false);
270+
}
271+
272+
});
273+
return cf;
274+
}
275+
276+
/**
277+
* Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
278+
* when the response has been fully received and converted by the given response handler. The given
279+
* callback is invoked on completion, failure, or cancellation.
280+
*
281+
* @param request the request to execute.
282+
* @param handler the response handler.
283+
* @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}.
284+
* @param <T> the handler result type.
285+
* @return a {@code CompletableFuture} producing the handler result.
286+
* @since 5.7
287+
*/
288+
public <T> CompletableFuture<T> executeAsync(
289+
final Request request, final HttpClientResponseHandler<T> handler, final FutureCallback<T> callback) {
290+
final CompletableFuture<T> cf = new CompletableFuture<>();
291+
execute(request, handler, new FutureCallback<T>() {
292+
293+
@Override
294+
public void completed(final T result) {
295+
if (callback != null) {
296+
callback.completed(result);
297+
}
298+
cf.complete(result);
299+
}
300+
301+
@Override
302+
public void failed(final Exception ex) {
303+
if (callback != null) {
304+
callback.failed(ex);
305+
}
306+
cf.completeExceptionally(ex);
307+
}
308+
309+
@Override
310+
public void cancelled() {
311+
if (callback != null) {
312+
callback.cancelled();
313+
}
314+
cf.cancel(false);
315+
}
316+
317+
});
318+
return cf;
319+
}
320+
321+
/**
322+
* Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
323+
* when the response has been fully received and converted to {@link Content}.
324+
*
325+
* @param request the request to execute.
326+
* @return a {@code CompletableFuture} producing the response {@code Content}.
327+
* @since 5.7
328+
*/
329+
public CompletableFuture<Content> executeAsync(final Request request) {
330+
return executeAsync(request, new ContentResponseHandler());
331+
}
332+
333+
/**
334+
* Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
335+
* when the response has been fully received and converted to {@link Content}. The given callback
336+
* is invoked on completion, failure, or cancellation.
337+
*
338+
* @param request the request to execute.
339+
* @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}.
340+
* @return a {@code CompletableFuture} producing the response {@code Content}.
341+
* @since 5.7
342+
*/
343+
public CompletableFuture<Content> executeAsync(final Request request, final FutureCallback<Content> callback) {
344+
return executeAsync(request, new ContentResponseHandler(), callback);
345+
}
346+
125347
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.examples.fluent;
28+
29+
import java.util.Arrays;
30+
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
32+
33+
import org.apache.hc.client5.http.fluent.Async;
34+
import org.apache.hc.client5.http.fluent.Request;
35+
36+
/**
37+
* This example demonstrates how the HttpClient fluent API can be used to execute multiple
38+
* requests asynchronously using CompletableFuture.
39+
*/
40+
public class FluentAsyncCompletableFuture {
41+
42+
public static void main(final String... args) throws Exception {
43+
44+
final List<Request> requests = Arrays.asList(
45+
Request.get("http://www.google.com/"),
46+
Request.get("http://www.yahoo.com/"),
47+
Request.get("http://www.apache.org/"),
48+
Request.get("http://www.apple.com/")
49+
);
50+
51+
final Async async = Async.newInstance().useDefaultExecutor(8, 500);
52+
try {
53+
54+
final CompletableFuture<?>[] futures = requests.stream()
55+
.map(r -> async.executeAsync(r)
56+
.thenAccept(content -> System.out.println("Request completed: " + r))
57+
.exceptionally(ex -> {
58+
System.out.println(ex.getMessage() + ": " + r);
59+
return null;
60+
}))
61+
.toArray(CompletableFuture[]::new);
62+
63+
CompletableFuture.allOf(futures).join();
64+
} finally {
65+
async.shutdown();
66+
}
67+
68+
System.out.println("Done");
69+
}
70+
71+
}

0 commit comments

Comments
 (0)