Skip to content

Commit 570ce63

Browse files
authored
DRILL-8242: Fix output for HttpHelperFunctions (#2568)
* DRILL-8242: Fix output for HttpHelperFunctions - add test to check the http_get function output * DRILL-8242: Fix output for HttpHelperFunctions * DRILL-8242: Fix old approach for other functions * DRILL-8242: replace String with InputStream for SimpleHTTP requests * Fix TestHttpPlugin#testSlowResponse. Sometimes DATA_READ ERROR: Read timed out can be returned
1 parent b2e2653 commit 570ce63

20 files changed

Lines changed: 165 additions & 116 deletions

File tree

contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public static class HttpGetFunction implements DrillSimpleFunc {
4545
@Param
4646
NullableVarCharHolder[] inputReaders;
4747

48-
@Output
48+
@Output // todo: remove. Not used in this UDF
4949
ComplexWriter writer;
5050

5151
@Inject
@@ -75,32 +75,25 @@ public void eval() {
7575
// as an approximation of null-if-null handling.
7676
if (args == null) {
7777
// Return empty map
78-
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
79-
mapWriter.start();
80-
mapWriter.end();
8178
return;
8279
}
8380

8481
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
8582

8683
// Make the API call
87-
String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeSimpleGetRequest(finalUrl);
84+
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
8885

8986
// If the result string is null or empty, return an empty map
90-
if (results == null || results.length() == 0) {
87+
if (results == null) {
9188
// Return empty map
92-
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
93-
mapWriter.start();
94-
mapWriter.end();
9589
return;
9690
}
9791

9892
try {
99-
jsonLoaderBuilder.fromString(results);
93+
jsonLoaderBuilder.fromStream(results);
10094
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
10195
loader.startBatch();
10296
jsonLoader.readBatch();
103-
loader.close();
10497
} catch (Exception e) {
10598
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
10699
}
@@ -119,7 +112,7 @@ public static class HttpGetFromStoragePluginFunction implements DrillSimpleFunc
119112
@Param
120113
NullableVarCharHolder[] inputReaders;
121114

122-
@Output
115+
@Output // todo: remove. Not used in this UDF
123116
ComplexWriter writer;
124117

125118
@Inject
@@ -174,34 +167,27 @@ public void eval() {
174167
// as an approximation of null-if-null handling.
175168
if (args == null) {
176169
// Return empty map
177-
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
178-
mapWriter.start();
179-
mapWriter.end();
180170
return;
181171
}
182172

183-
String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(
173+
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(
184174
plugin,
185175
endpointConfig,
186176
drillbitContext,
187177
args
188-
);
178+
).getInputStream();
189179

190180
// If the result string is null or empty, return an empty map
191-
if (results == null || results.length() == 0) {
181+
if (results == null) {
192182
// Return empty map
193-
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
194-
mapWriter.start();
195-
mapWriter.end();
196183
return;
197184
}
198185

199186
try {
200-
jsonLoaderBuilder.fromString(results);
187+
jsonLoaderBuilder.fromStream(results);
201188
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
202189
loader.startBatch();
203190
jsonLoader.readBatch();
204-
loader.close();
205191
} catch (Exception e) {
206192
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
207193
}

contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import okhttp3.RequestBody;
3131
import okhttp3.Response;
3232

33+
import okhttp3.ResponseBody;
3334
import org.apache.commons.lang3.StringUtils;
3435
import org.apache.drill.common.exceptions.EmptyErrorContext;
3536
import org.apache.drill.common.logical.OAuthConfig;
@@ -873,7 +874,7 @@ public static HttpStoragePlugin getStoragePlugin(DrillbitContext context, String
873874
* @param args An optional list of parameter arguments which will be included in the URL
874875
* @return A String of the results.
875876
*/
876-
public static String makeAPICall(
877+
public static SimpleHttp apiCall(
877878
HttpStoragePlugin plugin,
878879
HttpApiConfig endpointConfig,
879880
DrillbitContext context,
@@ -895,16 +896,14 @@ public static String makeAPICall(
895896
}
896897

897898
// Now get the client
898-
SimpleHttp client = new SimpleHttpBuilder()
899+
return new SimpleHttpBuilder()
899900
.pluginConfig(pluginConfig)
900901
.endpointConfig(endpointConfig)
901902
.tempDir(new File(context.getConfig().getString(ExecConstants.DRILL_TMP_DIR)))
902903
.url(HttpUrl.parse(finalUrl))
903904
.proxyConfig(proxyConfig)
904905
.tokenTable(plugin.getTokenTable())
905906
.build();
906-
907-
return client.getResultsFromApiCall();
908907
}
909908

910909
public static OkHttpClient getSimpleHttpClient() {
@@ -915,7 +914,29 @@ public static OkHttpClient getSimpleHttpClient() {
915914
.build();
916915
}
917916

918-
public static String makeSimpleGetRequest(String url) {
917+
public static String getRequestAndStringResponse(String url) {
918+
try {
919+
return makeSimpleGetRequest(url).string();
920+
} catch (IOException e) {
921+
throw UserException
922+
.dataReadError(e)
923+
.message("HTTP request failed")
924+
.build(logger);
925+
}
926+
}
927+
928+
public static InputStream getRequestAndStreamResponse(String url) {
929+
try {
930+
return makeSimpleGetRequest(url).byteStream();
931+
} catch (IOException e) {
932+
throw UserException
933+
.dataReadError(e)
934+
.message("HTTP request failed")
935+
.build(logger);
936+
}
937+
}
938+
939+
public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
919940
OkHttpClient client = getSimpleHttpClient();
920941
Request.Builder requestBuilder = new Request.Builder()
921942
.url(url);
@@ -924,15 +945,8 @@ public static String makeSimpleGetRequest(String url) {
924945
Request request = requestBuilder.build();
925946

926947
// Execute the request
927-
try {
928948
Response response = client.newCall(request).execute();
929-
return response.body().string();
930-
} catch (IOException e) {
931-
throw UserException
932-
.dataReadError(e)
933-
.message("HTTP request failed")
934-
.build(logger);
935-
}
949+
return response.body();
936950
}
937951

938952
/**

contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1127,7 +1127,8 @@ public void testSlowResponse() throws Exception {
11271127
client.queryBuilder().sql(sql).rowSet();
11281128
fail();
11291129
} catch (Exception e) {
1130-
assertTrue("Not timeout exception, " + e, e.getMessage().contains("DATA_READ ERROR: timeout"));
1130+
assertTrue("Not timeout exception, " + e,
1131+
e.getMessage().contains("DATA_READ ERROR: timeout") || e.getMessage().contains("DATA_READ ERROR: Read timed out"));
11311132
}
11321133
}
11331134
}

contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,16 @@
2424
import okhttp3.mockwebserver.RecordedRequest;
2525
import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
2626
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
27+
import org.apache.drill.common.types.TypeProtos;
2728
import org.apache.drill.common.util.DrillFileUtils;
29+
import org.apache.drill.exec.physical.impl.project.ProjectMemoryManager;
2830
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
2931
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
32+
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
3033
import org.apache.drill.exec.physical.rowSet.RowSet;
34+
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
35+
import org.apache.drill.exec.record.metadata.SchemaBuilder;
36+
import org.apache.drill.exec.record.metadata.TupleMetadata;
3137
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
3238
import org.apache.drill.exec.store.http.util.SimpleHttp;
3339
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -37,6 +43,7 @@
3743
import org.apache.drill.test.ClusterFixture;
3844
import org.apache.drill.test.ClusterTest;
3945
import org.apache.drill.test.LogFixture;
46+
import org.apache.drill.test.rowSet.RowSetUtilities;
4047
import org.junit.BeforeClass;
4148
import org.junit.Test;
4249

@@ -47,6 +54,8 @@
4754
import java.util.List;
4855
import java.util.Map;
4956

57+
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
58+
import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
5059
import static org.junit.Assert.assertEquals;
5160
import static org.junit.Assert.assertTrue;
5261
import static org.junit.Assert.fail;
@@ -63,9 +72,11 @@ public class TestHttpUDFFunctions extends ClusterTest {
6372
public static void setup() throws Exception {
6473
logFixture = LogFixture.builder()
6574
.toConsole()
75+
.logger(ProjectMemoryManager.class, CURRENT_LOG_LEVEL)
6676
.logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL)
6777
.logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
6878
.logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
79+
.logger(ResultSetLoaderImpl.class, CURRENT_LOG_LEVEL)
6980
.build();
7081
startCluster(ClusterFixture.builder(dirTestWatcher));
7182
TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
@@ -98,6 +109,33 @@ public void testHttpGetWithNoParams() throws Exception {
98109

99110
RowSet results = client.queryBuilder().sql(sql).rowSet();
100111
assertEquals(1, results.rowCount());
112+
TupleMetadata expectedSchema = new SchemaBuilder()
113+
.addMap("result")
114+
.addMap("results")
115+
.addNullable("sunrise", TypeProtos.MinorType.VARCHAR)
116+
.addNullable("sunset", TypeProtos.MinorType.VARCHAR)
117+
.addNullable("solar_noon", TypeProtos.MinorType.VARCHAR)
118+
.addNullable("day_length", TypeProtos.MinorType.VARCHAR)
119+
.addNullable("civil_twilight_begin", TypeProtos.MinorType.VARCHAR)
120+
.addNullable("civil_twilight_end", TypeProtos.MinorType.VARCHAR)
121+
.addNullable("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR)
122+
.addNullable("nautical_twilight_end", TypeProtos.MinorType.VARCHAR)
123+
.addNullable("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR)
124+
.addNullable("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR)
125+
.resumeMap()
126+
.addNullable("status", TypeProtos.MinorType.VARCHAR)
127+
.resumeSchema()
128+
.build();
129+
130+
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
131+
.addRow(singleMap(
132+
mapValue(
133+
mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM",
134+
"6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"),
135+
"OK")))
136+
.build();
137+
138+
RowSetUtilities.verify(expected, results);
101139
results.clear();
102140

103141
RecordedRequest recordedRequest = server.takeRequest();

exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
2424
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
2525
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
26+
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
2627
import org.apache.drill.exec.record.VectorAccessibleComplexWriter;
2728
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
2829

@@ -56,26 +57,39 @@ protected HoldingContainer generateEvalBody(ClassGenerator<?> classGenerator, Ho
5657
JBlock sub = new JBlock(true, true);
5758
JBlock topSub = sub;
5859

59-
JVar complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
60+
JVar rsLoader = null;
61+
JVar complexWriter = null;
62+
JInvocation container = null;
6063

64+
for (JVar workspaceJVar : workspaceJVars) {
65+
if ("ResultSetLoader".equals(workspaceJVar.type().name())) {
66+
rsLoader = workspaceJVar;
67+
}
68+
}
69+
if (rsLoader == null) {
70+
complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
6171

62-
JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
6372

73+
container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
74+
}
6475
//Default name is "col", if not passed in a reference name for the output vector.
6576
String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath();
6677

6778
JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class);
68-
classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
79+
if (rsLoader == null) {
80+
classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
81+
}
6982

7083
JClass projBatchClass = classGenerator.getModel().ref(ProjectRecordBatch.class);
7184
JExpression projBatch = JExpr.cast(projBatchClass, classGenerator.getMappingSet().getOutgoing());
72-
73-
classGenerator.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter));
74-
75-
76-
classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex()));
77-
78-
sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
85+
if (rsLoader == null) {
86+
classGenerator.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter));
87+
classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex()));
88+
sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
89+
} else {
90+
classGenerator.getSetupBlock().add(projBatch.invoke("addLoader").arg(rsLoader));
91+
sub.decl(classGenerator.getModel()._ref(ResultSetLoader.class), getReturnValue().getName(), rsLoader);
92+
}
7993

8094
// add the subblock after the out declaration.
8195
classGenerator.getEvalBlock().add(topSub);

exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,9 @@ public BufferManagerImpl(BufferAllocator allocator) {
3434

3535
@Override
3636
public void close() {
37-
managedBuffers.forEach(new LongObjectPredicate<DrillBuf>() {
38-
@Override
39-
public boolean apply(long key, DrillBuf value) {
40-
value.release();
41-
return true;
42-
}
37+
managedBuffers.forEach((LongObjectPredicate<DrillBuf>) (key, value) -> {
38+
value.release();
39+
return true;
4340
});
4441
managedBuffers.clear();
4542
}

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ public ValueVectorWriteExpression addOutputVector(String name, LogicalExpression
9595

9696
@Override
9797
public void addComplexField(FieldReference ref) {
98-
initComplexWriters();
98+
if (projectBatch.rsLoader == null) {
99+
initComplexWriters();
100+
}
99101
if (projectBatch.complexFieldReferencesList == null) {
100102
projectBatch.complexFieldReferencesList = Lists.newArrayList();
101103
} else {

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public void update() {
278278
+ ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {} ms"
279279
+ ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(),
280280
rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
281-
(batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
281+
(batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
282282

283283
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
284284
updateIncomingStats();

0 commit comments

Comments
 (0)