Skip to content

Commit c58d8a0

Browse files
authored
DRILL-8248: Fix http_request for several rows (#2573)
1 parent fee1a60 commit c58d8a0

9 files changed

Lines changed: 182 additions & 97 deletions

File tree

contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java

Lines changed: 43 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -52,55 +52,57 @@ public static class HttpGetFunction implements DrillSimpleFunc {
5252
OptionManager options;
5353

5454
@Inject
55-
ResultSetLoader loader;
55+
ResultSetLoader rsLoader;
5656

5757
@Workspace
58-
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
58+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
59+
60+
@Workspace
61+
org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
5962

6063
@Override
6164
public void setup() {
62-
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
63-
.resultSetLoader(loader)
64-
.standardOptions(options);
65+
stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
66+
rsLoader.startBatch();
6567
}
6668

6769
@Override
6870
public void eval() {
6971
// Get the URL
7072
String url = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
71-
7273
// Process Positional Arguments
7374
java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
7475
// If the arg list is null, indicating at least one null arg, return an empty map
7576
// as an approximation of null-if-null handling.
7677
if (args == null) {
77-
// Return empty map
7878
return;
7979
}
80-
8180
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
82-
8381
// Make the API call
8482
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
85-
8683
// If the result string is null or empty, return an empty map
8784
if (results == null) {
88-
// Return empty map
8985
return;
9086
}
91-
9287
try {
93-
jsonLoaderBuilder.fromStream(results);
94-
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
95-
loader.startBatch();
96-
jsonLoader.readBatch();
88+
stream.setValue(results);
89+
if (jsonLoader == null) {
90+
jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(rsLoader, options, stream);
91+
}
92+
org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
93+
rowWriter.start();
94+
if (jsonLoader.parser().next()) {
95+
rowWriter.save();
96+
}
9797
} catch (Exception e) {
98-
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
98+
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
99+
.message("Error while reading JSON. ")
100+
.addContext(e.getMessage())
101+
.build();
99102
}
100103
}
101104
}
102105

103-
104106
@FunctionTemplate(names = {"http_request", "httpRequest"},
105107
scope = FunctionTemplate.FunctionScope.SIMPLE,
106108
isVarArg = true)
@@ -122,17 +124,20 @@ public static class HttpGetFromStoragePluginFunction implements DrillSimpleFunc
122124
DrillbitContext drillbitContext;
123125

124126
@Inject
125-
ResultSetLoader loader;
127+
ResultSetLoader rsLoader;
126128

127129
@Workspace
128-
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
130+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader;
129131

130132
@Workspace
131133
org.apache.drill.exec.store.http.HttpStoragePlugin plugin;
132134

133135
@Workspace
134136
org.apache.drill.exec.store.http.HttpApiConfig endpointConfig;
135137

138+
@Workspace
139+
org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<java.io.InputStream> stream;
140+
136141
@Override
137142
public void setup() {
138143
String schemaPath = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
@@ -154,10 +159,9 @@ public void setup() {
154159
endpointName,
155160
plugin.getConfig()
156161
);
157-
162+
stream = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>();
158163
// Add JSON configuration from Storage plugin, if present.
159-
jsonLoaderBuilder = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.setupJsonBuilder(endpointConfig, loader, options);
160-
164+
rsLoader.startBatch();
161165
}
162166

163167
@Override
@@ -167,30 +171,30 @@ public void eval() {
167171
// If the arg list is null, indicating at least one null arg, return an empty map
168172
// as an approximation of null-if-null handling.
169173
if (args == null) {
170-
// Return empty map
171174
return;
172175
}
173-
174-
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(
175-
plugin,
176-
endpointConfig,
177-
drillbitContext,
178-
args
179-
).getInputStream();
180-
176+
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
177+
.getInputStream();
181178
// If the result string is null or empty, return an empty map
182179
if (results == null) {
183-
// Return empty map
184180
return;
185181
}
186-
187182
try {
188-
jsonLoaderBuilder.fromStream(results);
189-
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
190-
loader.startBatch();
191-
jsonLoader.readBatch();
183+
stream.setValue(results);
184+
if (jsonLoader == null) {
185+
// Add JSON configuration from Storage plugin, if present.
186+
jsonLoader = org.apache.drill.exec.store.http.udfs.HttpUdfUtils.createJsonLoader(endpointConfig, rsLoader, options, stream);
187+
}
188+
org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer();
189+
rowWriter.start();
190+
if (jsonLoader.parser().next()) {
191+
rowWriter.save();
192+
}
192193
} catch (Exception e) {
193-
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
194+
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
195+
.message("Error while reading JSON. ")
196+
.addContext(e.getMessage())
197+
.build();
194198
}
195199
}
196200
}

contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpUdfUtils.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,42 +21,50 @@
2121
import org.apache.commons.lang3.StringUtils;
2222
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
2323
import org.apache.drill.exec.server.options.OptionManager;
24-
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
25-
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
24+
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
25+
import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
2626
import org.apache.drill.exec.store.http.HttpApiConfig;
27-
import org.apache.drill.exec.store.http.HttpJsonOptions;
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

30+
import java.io.InputStream;
31+
3132
public class HttpUdfUtils {
3233

3334
private static final Logger logger = LoggerFactory.getLogger(HttpUdfUtils.class);
3435

35-
public static JsonLoaderBuilder setupJsonBuilder(HttpApiConfig endpointConfig, ResultSetLoader loader, OptionManager options) {
36-
loader.setTargetRowCount(1);
36+
public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader,
37+
OptionManager options,
38+
SingleElementIterator<InputStream> stream) {
39+
return createJsonLoader(null, rsLoader, options, stream);
40+
}
41+
public static JsonLoaderImpl createJsonLoader(HttpApiConfig endpointConfig, ResultSetLoader rsLoader,
42+
OptionManager options, SingleElementIterator<InputStream> stream) {
3743
// Add JSON configuration from Storage plugin, if present.
38-
HttpJsonOptions jsonOptions = endpointConfig.jsonOptions();
39-
JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder()
40-
.resultSetLoader(loader)
41-
.maxRows(1)
42-
.standardOptions(options);
43-
44+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder =
45+
new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
46+
.resultSetLoader(rsLoader)
47+
.standardOptions(options)
48+
.fromStream(() -> stream);
4449
// Add data path if present
45-
if (StringUtils.isNotEmpty(endpointConfig.dataPath())) {
46-
jsonLoaderBuilder.dataPath(endpointConfig.dataPath());
47-
}
48-
49-
if (jsonOptions != null) {
50-
// Add options from endpoint configuration to jsonLoader
51-
JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options);
52-
jsonLoaderBuilder.options(jsonLoaderOptions);
50+
if (endpointConfig != null) {
51+
if (StringUtils.isNotEmpty(endpointConfig.dataPath())) {
52+
jsonLoaderBuilder.dataPath(endpointConfig.dataPath());
53+
}
54+
// Add JSON configuration from Storage plugin, if present.
55+
org.apache.drill.exec.store.http.HttpJsonOptions jsonOptions = endpointConfig.jsonOptions();
56+
if (jsonOptions != null) {
57+
// Add options from endpoint configuration to jsonLoader
58+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions jsonLoaderOptions = jsonOptions.getJsonOptions(options);
59+
jsonLoaderBuilder.options(jsonLoaderOptions);
5360

54-
// Add provided schema if present
55-
if (jsonOptions.schema() != null) {
56-
logger.debug("Found schema: {}", jsonOptions.schema());
57-
jsonLoaderBuilder.providedSchema(jsonOptions.schema());
61+
// Add provided schema if present
62+
if (jsonOptions.schema() != null) {
63+
logger.debug("Found schema: {}", jsonOptions.schema());
64+
jsonLoaderBuilder.providedSchema(jsonOptions.schema());
65+
}
5866
}
5967
}
60-
return jsonLoaderBuilder;
68+
return (org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl) jsonLoaderBuilder.build();
6169
}
6270
}

contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.drill.exec.record.metadata.SchemaBuilder;
3737
import org.apache.drill.exec.record.metadata.TupleMetadata;
3838
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
39+
import org.apache.drill.exec.store.http.udfs.HttpUdfUtils;
3940
import org.apache.drill.exec.store.http.util.SimpleHttp;
4041
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
4142
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
@@ -68,7 +69,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
6869
private static String TEST_JSON_PAGE1;
6970
private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT;
7071
protected static LogFixture logFixture;
71-
private final static Level CURRENT_LOG_LEVEL = Level.INFO;
72+
private final static Level CURRENT_LOG_LEVEL = Level.DEBUG;
7273

7374
@BeforeClass
7475
public static void setup() throws Exception {
@@ -79,6 +80,7 @@ public static void setup() throws Exception {
7980
.logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
8081
.logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
8182
.logger(ResultSetLoaderImpl.class, CURRENT_LOG_LEVEL)
83+
.logger(HttpUdfUtils.class, CURRENT_LOG_LEVEL)
8284
.build();
8385
startCluster(ClusterFixture.builder(dirTestWatcher));
8486
TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
@@ -115,7 +117,7 @@ public static void setup() throws Exception {
115117
configs.put("basicJson", basicJson);
116118

117119
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
118-
new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
120+
new HttpStoragePluginConfig(false, configs, 200, "globaluser", "globalpass", "",
119121
80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
120122
UsernamePasswordCredentials.USERNAME, "globaluser",
121123
UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
@@ -130,6 +132,8 @@ public void testProvidedSchema() throws Exception {
130132
server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
131133
RowSet results = client.queryBuilder().sql(sql).rowSet();
132134

135+
assertEquals(1, results.rowCount());
136+
133137
TupleMetadata expectedSchema = new SchemaBuilder()
134138
.addMap("data")
135139
.addNullable("col_1", MinorType.FLOAT8)
@@ -147,6 +151,33 @@ public void testProvidedSchema() throws Exception {
147151
}
148152
}
149153

154+
@Test
155+
public void testSeveralRowsAndRequests() throws Exception {
156+
String sql = "SELECT http_request('local.basicJson', `col1`) as data FROM cp.`/data/p4.json`";
157+
try (MockWebServer server = startServer()) {
158+
server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
159+
server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
160+
RowSet results = client.queryBuilder().sql(sql).rowSet();
161+
162+
assertEquals(2, results.rowCount());
163+
164+
TupleMetadata expectedSchema = new SchemaBuilder()
165+
.addMap("data")
166+
.addNullable("col_1", MinorType.FLOAT8)
167+
.addNullable("col_2", MinorType.FLOAT8)
168+
.addNullable("col_3", MinorType.FLOAT8)
169+
.resumeSchema()
170+
.build();
171+
172+
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
173+
.addRow(singleMap(mapValue(1.0, 2.0, 3.0)))
174+
.addRow(singleMap(mapValue(4.0, 5.0, 6.0)))
175+
.build();
176+
177+
RowSetUtilities.verify(expected, results);
178+
}
179+
}
180+
150181
@Test
151182
public void testHttpGetWithNoParams() throws Exception {
152183
try (MockWebServer server = startServer()) {
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"col1": "apache"}
2+
{"col1": "ddr"}

contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.drill.exec.record.metadata.ColumnMetadata;
2727
import org.apache.drill.exec.record.metadata.MetadataUtils;
2828
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
29+
import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator;
2930
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
3031
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
3132
import org.apache.drill.exec.store.kafka.MetaDataField;
@@ -38,7 +39,6 @@
3839

3940
import java.io.ByteArrayInputStream;
4041
import java.io.InputStream;
41-
import java.util.Iterator;
4242
import java.util.Properties;
4343
import java.util.StringJoiner;
4444

@@ -156,24 +156,4 @@ public String toString() {
156156
.add("resultSetLoader=" + resultSetLoader)
157157
.toString();
158158
}
159-
160-
public static class SingleElementIterator<T> implements Iterator<T> {
161-
private T value;
162-
163-
@Override
164-
public boolean hasNext() {
165-
return value != null;
166-
}
167-
168-
@Override
169-
public T next() {
170-
T value = this.value;
171-
this.value = null;
172-
return value;
173-
}
174-
175-
public void setValue(T value) {
176-
this.value = value;
177-
}
178-
}
179159
}

0 commit comments

Comments
 (0)