Skip to content

Commit a91276c

Browse files
author
Adam Soos
committed
RCB-597: create request class for multithreading
1 parent c39682d commit a91276c

2 files changed

Lines changed: 124 additions & 2 deletions

File tree

api/src/main/java/com/basistech/rosette/api/HttpRosetteAPI.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,18 @@
6767
import java.net.URISyntaxException;
6868
import java.nio.charset.StandardCharsets;
6969
import java.util.ArrayList;
70+
import java.util.Collection;
7071
import java.util.Collections;
7172
import java.util.HashSet;
7273
import java.util.List;
7374
import java.util.Properties;
7475
import java.util.Set;
76+
import java.util.concurrent.ExecutorService;
77+
import java.util.concurrent.Executors;
7578
import java.util.concurrent.Future;
79+
import java.util.concurrent.TimeUnit;
7680
import java.util.regex.Pattern;
81+
import java.util.stream.Collectors;
7782
import java.util.zip.GZIPInputStream;
7883

7984
import static java.net.HttpURLConnection.HTTP_OK;
@@ -98,6 +103,8 @@ public class HttpRosetteAPI extends AbstractRosetteAPI {
98103
private List<Header> additionalHeaders;
99104
private int connectionConcurrency = 2;
100105
private boolean closeClientOnClose = true;
106+
private ExecutorService threadPool;
107+
private int shutdownWait;
101108

102109
private HttpRosetteAPI() {
103110
// use builder
@@ -119,7 +126,8 @@ private HttpRosetteAPI() {
119126
*/
120127
HttpRosetteAPI(String key, String urlToCall, Integer failureRetries,
121128
CloseableHttpClient httpClient, List<Header> additionalHeaders,
122-
Integer connectionConcurrency, boolean onlyAcceptKnownFields) throws HttpRosetteAPIException {
129+
Integer connectionConcurrency, boolean onlyAcceptKnownFields,
130+
Integer shutdownWait) throws HttpRosetteAPIException {
123131
urlBase = urlToCall == null ? urlBase : TRAILING_SLASHES.matcher(urlToCall.trim()).replaceAll("");
124132
if (failureRetries != null && failureRetries >= 1) {
125133
this.failureRetries = failureRetries;
@@ -128,6 +136,10 @@ private HttpRosetteAPI() {
128136
if (connectionConcurrency != null) {
129137
this.connectionConcurrency = connectionConcurrency;
130138
}
139+
this.threadPool = Executors.newFixedThreadPool(this.connectionConcurrency * 2);
140+
if (shutdownWait != null) {
141+
this.shutdownWait = shutdownWait;
142+
}
131143

132144
mapper = ApiModelMixinModule.setupObjectMapper(new ObjectMapper());
133145
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, onlyAcceptKnownFields);
@@ -601,11 +613,57 @@ private <T extends Object> T getResponse(HttpResponse httpResponse, Class<T> cla
601613
}
602614
}
603615

616+
/**
617+
* Creates a RosetteRequest which can be sent concurrently through this HttpRosetteAPI
618+
*
619+
* @param endpoint the endpoint to which the request is sent to
620+
* @param request the request object
621+
* @param responseClass Response's class
622+
* @return RequestThread which when started sends the predefined request through this http client
623+
* @param <R> Type of the response
624+
*/
625+
public <R extends Response> RosetteRequest<R> createRosetteRequest(String endpoint,
626+
Request request,
627+
Class<R> responseClass) {
628+
return new RosetteRequest<>(this, request, endpoint, responseClass);
629+
}
630+
631+
/**
632+
* Sends the request concurrently
633+
* @param request the request to be sent
634+
* @return A Future with the response of the request
635+
* @param <R> type of the response object
636+
*/
637+
public <R extends Response> Future<R> submitRequest(RosetteRequest<R> request) {
638+
return this.threadPool.submit(request);
639+
}
640+
641+
/**
642+
* Sends the requests concurrently
643+
* @param requests list of the requests to be sent
644+
* @return A list of Futures with the responses to the requests
645+
*/
646+
public List<Future<? extends Response>> submitRequests(Collection<RosetteRequest<? extends Response>> requests) {
647+
return requests.stream()
648+
.map(this::submitRequest)
649+
.collect(Collectors.toList());
650+
}
651+
652+
604653
@Override
605654
public void close() throws IOException {
606655
if (closeClientOnClose) {
607656
httpClient.close();
608657
}
658+
try {
659+
this.threadPool.shutdown();
660+
boolean terminated = this.threadPool.awaitTermination(this.shutdownWait, TimeUnit.SECONDS);
661+
if (!terminated) {
662+
this.threadPool.shutdownNow();
663+
}
664+
} catch (InterruptedException e) {
665+
this.threadPool.shutdownNow();
666+
}
609667
}
610668

611669
/**
@@ -619,6 +677,7 @@ public static class Builder {
619677
private CloseableHttpClient httpClient;
620678
private List<Header> additionalHeaders = new ArrayList<>();
621679
private boolean onlyAcceptKnownFields;
680+
private Integer shutdownWait;
622681

623682
/**
624683
* Specify the API key. This is required for use with the public API, and
@@ -701,6 +760,16 @@ public Builder onlyAcceptKnownFields(boolean onlyAcceptKnownFields) {
701760
return this;
702761
}
703762

763+
/**
764+
* Specify how long in seconds the api should wait for pending requests before forcefully terminating
765+
* @param shutdownWaitSeconds the length of the wait in seconds. The default is 0.
766+
* @return this.
767+
*/
768+
public Builder shutdownWait(int shutdownWaitSeconds) {
769+
this.shutdownWait = shutdownWaitSeconds;
770+
return this;
771+
}
772+
704773
/**
705774
* Build the API object.
706775
* @return the new API object.
@@ -713,7 +782,8 @@ public HttpRosetteAPI build() throws HttpRosetteAPIException {
713782
httpClient,
714783
additionalHeaders,
715784
concurrency,
716-
onlyAcceptKnownFields);
785+
onlyAcceptKnownFields,
786+
shutdownWait);
717787
}
718788
}
719789
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2023 Basis Technology Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.basistech.rosette.api;
17+
18+
import com.basistech.rosette.apimodel.Request;
19+
import com.basistech.rosette.apimodel.Response;
20+
21+
import java.util.concurrent.Callable;
22+
23+
/**
24+
* This class encompasses a future request that can be sent concurrently
25+
* @param <R> type of the response object
26+
*/
27+
public class RosetteRequest<R extends Response> implements Callable<R> {
28+
private final HttpRosetteAPI api;
29+
private final Request request;
30+
private final String servicePath;
31+
private final Class<R> responseClass;
32+
private R response;
33+
34+
RosetteRequest(HttpRosetteAPI api,
35+
Request request,
36+
String servicePath, Class<R> responseClass) {
37+
this.api = api;
38+
this.request = request;
39+
this.servicePath = servicePath;
40+
this.responseClass = responseClass;
41+
}
42+
43+
@Override
44+
public R call() {
45+
this.response = api.perform(this.servicePath, this.request, this.responseClass);
46+
return this.response;
47+
}
48+
49+
public R getResponse() {
50+
return this.response;
51+
}
52+
}

0 commit comments

Comments
 (0)