Skip to content

Commit 7dbbff0

Browse files
author
Adam Soos
committed
RCB-597: move thread pool to example
1 parent b585780 commit 7dbbff0

3 files changed

Lines changed: 28 additions & 122 deletions

File tree

api/src/main/java/com/basistech/rosette/api/HttpRosetteAPI.java

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,13 @@
6767
import java.net.URISyntaxException;
6868
import java.nio.charset.StandardCharsets;
6969
import java.util.ArrayList;
70-
import java.util.Collection;
7170
import java.util.Collections;
7271
import java.util.HashSet;
7372
import java.util.List;
7473
import java.util.Properties;
7574
import java.util.Set;
76-
import java.util.concurrent.ExecutorService;
77-
import java.util.concurrent.Executors;
7875
import java.util.concurrent.Future;
79-
import java.util.concurrent.TimeUnit;
8076
import java.util.regex.Pattern;
81-
import java.util.stream.Collectors;
8277
import java.util.zip.GZIPInputStream;
8378

8479
import static java.net.HttpURLConnection.HTTP_OK;
@@ -103,8 +98,6 @@ public class HttpRosetteAPI extends AbstractRosetteAPI {
10398
private List<Header> additionalHeaders;
10499
private int connectionConcurrency = 2;
105100
private boolean closeClientOnClose = true;
106-
private ExecutorService threadPool;
107-
private int shutdownWait;
108101

109102
private HttpRosetteAPI() {
110103
// use builder
@@ -126,8 +119,7 @@ private HttpRosetteAPI() {
126119
*/
127120
HttpRosetteAPI(String key, String urlToCall, Integer failureRetries,
128121
CloseableHttpClient httpClient, List<Header> additionalHeaders,
129-
Integer connectionConcurrency, boolean onlyAcceptKnownFields,
130-
Integer shutdownWait) throws HttpRosetteAPIException {
122+
Integer connectionConcurrency, boolean onlyAcceptKnownFields) throws HttpRosetteAPIException {
131123
urlBase = urlToCall == null ? urlBase : TRAILING_SLASHES.matcher(urlToCall.trim()).replaceAll("");
132124
if (failureRetries != null && failureRetries >= 1) {
133125
this.failureRetries = failureRetries;
@@ -136,10 +128,6 @@ private HttpRosetteAPI() {
136128
if (connectionConcurrency != null) {
137129
this.connectionConcurrency = connectionConcurrency;
138130
}
139-
this.threadPool = Executors.newFixedThreadPool(this.connectionConcurrency * 2);
140-
if (shutdownWait != null) {
141-
this.shutdownWait = shutdownWait;
142-
}
143131

144132
mapper = ApiModelMixinModule.setupObjectMapper(new ObjectMapper());
145133
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, onlyAcceptKnownFields);
@@ -627,41 +615,12 @@ public RosetteRequest createRosetteRequest(String endpoint,
627615
return new RosetteRequest(this, request, endpoint, responseClass);
628616
}
629617

630-
/**
631-
* Sends the request concurrently
632-
* @param request the request to be sent
633-
* @return A Future with the response of the request
634-
*/
635-
public Future<Response> submitRequest(RosetteRequest request) {
636-
return this.threadPool.submit(request);
637-
}
638-
639-
/**
640-
* Sends the requests concurrently
641-
* @param requests list of the requests to be sent
642-
* @return A list of Futures with the responses to the requests
643-
*/
644-
public List<Future<Response>> submitRequests(Collection<RosetteRequest> requests) {
645-
return requests.stream()
646-
.map(this::submitRequest)
647-
.collect(Collectors.toList());
648-
}
649-
650618

651619
@Override
652620
public void close() throws IOException {
653621
if (closeClientOnClose) {
654622
httpClient.close();
655623
}
656-
try {
657-
this.threadPool.shutdown();
658-
boolean terminated = this.threadPool.awaitTermination(this.shutdownWait, TimeUnit.SECONDS);
659-
if (!terminated) {
660-
this.threadPool.shutdownNow();
661-
}
662-
} catch (InterruptedException e) {
663-
this.threadPool.shutdownNow();
664-
}
665624
}
666625

667626
/**
@@ -675,7 +634,6 @@ public static class Builder {
675634
private CloseableHttpClient httpClient;
676635
private List<Header> additionalHeaders = new ArrayList<>();
677636
private boolean onlyAcceptKnownFields;
678-
private Integer shutdownWait;
679637

680638
/**
681639
* Specify the API key. This is required for use with the public API, and
@@ -758,16 +716,6 @@ public Builder onlyAcceptKnownFields(boolean onlyAcceptKnownFields) {
758716
return this;
759717
}
760718

761-
/**
762-
* Specify how long in seconds the api should wait for pending requests before forcefully terminating
763-
* @param shutdownWaitSeconds the length of the wait in seconds. The default is 0.
764-
* @return this.
765-
*/
766-
public Builder shutdownWait(int shutdownWaitSeconds) {
767-
this.shutdownWait = shutdownWaitSeconds;
768-
return this;
769-
}
770-
771719
/**
772720
* Build the API object.
773721
* @return the new API object.
@@ -780,8 +728,7 @@ public HttpRosetteAPI build() throws HttpRosetteAPIException {
780728
httpClient,
781729
additionalHeaders,
782730
concurrency,
783-
onlyAcceptKnownFields,
784-
shutdownWait);
731+
onlyAcceptKnownFields);
785732
}
786733
}
787734
}

api/src/test/java/com/basistech/rosette/api/RosetteRequestTest.java

Lines changed: 11 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import com.basistech.rosette.apimodel.LanguageOptions;
2424
import com.basistech.rosette.apimodel.LanguageResponse;
2525
import com.basistech.rosette.apimodel.Response;
26-
import org.apache.http.impl.client.CloseableHttpClient;
27-
import org.apache.http.impl.client.HttpClients;
2826
import org.junit.jupiter.api.AfterEach;
2927
import org.junit.jupiter.api.BeforeEach;
3028
import org.junit.jupiter.api.Test;
@@ -40,12 +38,13 @@
4038
import java.util.Date;
4139
import java.util.List;
4240
import java.util.concurrent.ExecutionException;
41+
import java.util.concurrent.ExecutorService;
42+
import java.util.concurrent.Executors;
4343
import java.util.concurrent.Future;
4444
import java.util.concurrent.TimeUnit;
4545

4646
import static org.junit.jupiter.api.Assertions.assertEquals;
4747
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
48-
import static org.junit.jupiter.api.Assertions.assertNull;
4948
import static org.junit.jupiter.api.Assertions.assertTrue;
5049

5150
@ExtendWith(MockServerExtension.class)
@@ -86,9 +85,11 @@ void successfulRequest() throws ExecutionException, InterruptedException {
8685
RosetteRequest entitiesRequest = this.api.createRosetteRequest(AbstractRosetteAPI.ENTITIES_SERVICE_PATH, entitiesRequestData, EntitiesResponse.class);
8786

8887
//testing the request
89-
Future<Response> response = this.api.submitRequest(entitiesRequest);
88+
ExecutorService threadPool = Executors.newFixedThreadPool(1);
89+
Future<Response> response = threadPool.submit(entitiesRequest);
9090
assertInstanceOf(EntitiesResponse.class, response.get());
9191
assertEquals(response.get(), entitiesRequest.getResponse());
92+
threadPool.shutdownNow();
9293
}
9394

9495

@@ -107,9 +108,11 @@ void errorResponse() throws ExecutionException, InterruptedException {
107108
RosetteRequest entitiesRequest = this.api.createRosetteRequest(AbstractRosetteAPI.ENTITIES_SERVICE_PATH, entitiesRequestData, EntitiesResponse.class);
108109

109110
//testing the request
110-
Future<Response> response = this.api.submitRequest(entitiesRequest);
111+
ExecutorService threadPool = Executors.newFixedThreadPool(1);
112+
Future<Response> response = threadPool.submit(entitiesRequest);
111113
assertInstanceOf(ErrorResponse.class, response.get());
112114
assertEquals(response.get(), entitiesRequest.getResponse());
115+
threadPool.shutdownNow();
113116
}
114117

115118
@Test
@@ -144,8 +147,9 @@ void testTiming() throws ExecutionException, InterruptedException {
144147
}
145148

146149
//run requests
150+
ExecutorService threadPool = Executors.newFixedThreadPool(7);
147151
Date d1 = new Date();
148-
List<Future<Response>> responses = this.api.submitRequests(requests);
152+
List<Future<Response>> responses = threadPool.invokeAll(requests);
149153
for (int i = 0; i < responses.size(); i++) {
150154
responses.get(i).get();
151155
}
@@ -168,7 +172,7 @@ void testTiming() throws ExecutionException, InterruptedException {
168172

169173

170174
d1 = new Date();
171-
responses = this.api.submitRequests(requests);
175+
responses = threadPool.invokeAll(requests);
172176
for (int i = 0; i < responses.size(); i++) {
173177
responses.get(i).get();
174178
}
@@ -178,54 +182,6 @@ void testTiming() throws ExecutionException, InterruptedException {
178182
assertTrue(d2.getTime() - d1.getTime() > requests.size() / concurrency * delay); // running faster than this would suggest it exceeds the maximum concurrency
179183
}
180184

181-
private RosetteRequest setupShutdownTest(int shutdownWaitSeconds, int responseDelayMillis, CloseableHttpClient client) {
182-
this.api = new HttpRosetteAPI.Builder()
183-
.url(String.format("http://localhost:%d/rest/v1", mockServer.getPort()))
184-
.shutdownWait(shutdownWaitSeconds)
185-
.httpClient(client)
186-
.build();
187-
188-
//response setup
189-
String entitiesResponse = "{\"entities\" : [ { \"type\" : \"ORGANIZATION\", \"mention\" : \"Securities and Exchange Commission\", \"normalized\" : \"U.S. Securities and Exchange Commission\", \"count\" : 1, \"mentionOffsets\" : [ { \"startOffset\" : 4, \"endOffset\" : 38 } ], \"entityId\" : \"Q953944\", \"confidence\" : 0.39934742, \"linkingConfidence\" : 0.67404154 } ] }";
190-
setupResponse("/rest/v1/entities", entitiesResponse, 200, responseDelayMillis, 1);
191-
192-
//request setup
193-
String entitiesTextData = "The Securities and Exchange Commission today announced the leadership of the agency’s trial unit.";
194-
DocumentRequest<EntitiesOptions> entitiesRequestData = DocumentRequest.<EntitiesOptions>builder()
195-
.content(entitiesTextData)
196-
.build();
197-
return this.api.createRosetteRequest(AbstractRosetteAPI.ENTITIES_SERVICE_PATH, entitiesRequestData, EntitiesResponse.class);
198-
}
199-
200-
@Test
201-
void successfulShutdown() throws IOException {
202-
/* creating http client to avoid closing the httpClient of HttpRosetteApi during close()
203-
* which automatically closes the pending threads. This way the shutdown behaviour can be tested.*/
204-
CloseableHttpClient httpClient = HttpClients.createDefault();
205-
int shutdownWait = 3;
206-
RosetteRequest request = setupShutdownTest(shutdownWait, 1000, httpClient);
207-
this.api.submitRequest(request);
208-
this.api.close();
209-
httpClient.close();
210-
211-
assertInstanceOf(EntitiesResponse.class, request.getResponse()); //got response successfully
212-
}
213-
214-
@Test
215-
void timeoutShutdown() throws IOException {
216-
/* creating http client to avoid closing the httpClient of HttpRosetteApi during close()
217-
* which automatically closes the pending threads. This way the shutdown behaviour can be tested.*/
218-
CloseableHttpClient httpClient = HttpClients.createDefault();
219-
int shutdownWait = 3;
220-
RosetteRequest request = setupShutdownTest(shutdownWait, 5000, httpClient);
221-
this.api.submitRequest(request);
222-
this.api.close();
223-
224-
httpClient.close();
225-
assertNull(request.getResponse()); //didn't get a response, it was forcefully shutdown
226-
}
227-
228-
229185
@AfterEach
230186
void after() throws IOException {
231187
this.api.close();

examples/src/main/java/com/basistech/rosette/examples/ConcurrencyExample.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.Arrays;
3838
import java.util.List;
3939
import java.util.concurrent.ExecutionException;
40+
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.Executors;
4042
import java.util.concurrent.Future;
4143

4244
import static com.basistech.rosette.api.common.AbstractRosetteAPI.ENTITIES_SERVICE_PATH;
@@ -68,7 +70,7 @@ private void run() throws IOException, ExecutionException, InterruptedException
6870
.connectionConcurrency(maximumConcurrency)
6971
.build();
7072

71-
List<RosetteRequest> threads = new ArrayList<>();
73+
List<RosetteRequest> requests = new ArrayList<>();
7274
// Setting up entities request
7375
String entitiesTextData =
7476
"The Securities and Exchange Commission today announced the leadership of the agency’s trial unit. "
@@ -77,21 +79,21 @@ private void run() throws IOException, ExecutionException, InterruptedException
7779
+ "Since December 2016, Ms. Fitzpatrick and Mr. Gottesman have served as Co-Acting Chief Litigation Counsel. "
7880
+ "In that role, they were jointly responsible for supervising the trial unit at the agency’s Washington D.C. headquarters "
7981
+ "as well as coordinating with litigators in the SEC’s 11 regional offices around the country.";
80-
threads.add(
82+
requests.add(
8183
rosetteApi.createRosetteRequest(ENTITIES_SERVICE_PATH,
8284
DocumentRequest.<EntitiesOptions>builder().content(entitiesTextData).build(),
8385
EntitiesResponse.class)
8486
);
8587
// Setting up language request
8688
String languageData = "Por favor Señorita, says the man.";
87-
threads.add(
89+
requests.add(
8890
rosetteApi.createRosetteRequest(LANGUAGE_SERVICE_PATH,
8991
DocumentRequest.<LanguageOptions>builder().content(languageData).build(),
9092
LanguageResponse.class)
9193
);
9294
// Setting up morphology request
9395
// No content is given to this request and it will return an error response
94-
threads.add(
96+
requests.add(
9597
rosetteApi.createRosetteRequest(MORPHOLOGY_SERVICE_PATH + "/" + MorphologicalFeature.COMPLETE,
9698
DocumentRequest.<MorphologyOptions>builder().build(),
9799
MorphologyResponse.class)
@@ -105,38 +107,39 @@ private void run() throws IOException, ExecutionException, InterruptedException
105107
names.add(Name.builder().text(name).build());
106108
}
107109
double threshold = 0.75;
108-
threads.add(
110+
requests.add(
109111
rosetteApi.createRosetteRequest(NAME_DEDUPLICATION_SERVICE_PATH,
110112
NameDeduplicationRequest.builder().names(names).threshold(threshold).build(),
111113
NameDeduplicationResponse.class)
112114
);
113115
//Setting up the tokens request
114116
String tokensData = "北京大学生物系主任办公室内部会议";
115-
threads.add(
117+
requests.add(
116118
rosetteApi.createRosetteRequest(TOKENS_SERVICE_PATH,
117119
DocumentRequest.builder().content(tokensData).build(),
118120
TokensResponse.class)
119121
);
120122
//Setting up the transliteration request
121123
String transliterationData = "ana r2ye7 el gam3a el sa3a 3 el 3asr";
122-
threads.add(
124+
requests.add(
123125
rosetteApi.createRosetteRequest(TRANSLITERATION_SERVICE_PATH,
124126
DocumentRequest.builder().content(transliterationData).build(),
125127
TransliterationResponse.class)
126128
);
127129

128130
// start the threads
129-
List<Future<Response>> futures = rosetteApi.submitRequests(threads);
131+
ExecutorService threadPool = Executors.newFixedThreadPool(maximumConcurrency);
132+
List<Future<Response>> futures = threadPool.invokeAll(requests);
130133

131134
// wait for the threads to finish
132-
for (int i = 0; i < threads.size(); i++) {
135+
for (int i = 0; i < requests.size(); i++) {
133136
futures.get(i).get();
134137
}
135138

136-
for (int i = 0; i < threads.size(); i++) {
137-
System.out.println(responseToJson(threads.get(i).getResponse()));
139+
for (int i = 0; i < requests.size(); i++) {
140+
System.out.println(responseToJson(requests.get(i).getResponse()));
138141
}
139-
rosetteApi.close();
142+
threadPool.shutdown();
140143
}
141144

142145

0 commit comments

Comments
 (0)