Skip to content

Commit e46c579

Browse files
committed
Initial Work
1 parent 08b794b commit e46c579

5 files changed

Lines changed: 295 additions & 15 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.drill.exec.store.druid;
20+
21+
import com.fasterxml.jackson.core.JsonProcessingException;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.node.ObjectNode;
24+
import org.apache.drill.common.exceptions.CustomErrorContext;
25+
import org.apache.drill.common.exceptions.UserException;
26+
import org.apache.drill.common.expression.SchemaPath;
27+
import org.apache.drill.exec.ops.FragmentContext;
28+
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
29+
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
30+
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
31+
import org.apache.drill.exec.record.metadata.TupleMetadata;
32+
import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec;
33+
import org.apache.drill.exec.store.druid.common.DruidFilter;
34+
import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
35+
import org.apache.drill.exec.store.druid.druid.ScanQuery;
36+
import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder;
37+
import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
38+
import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
39+
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
40+
import org.apache.drill.exec.vector.BaseValueVector;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
44+
import java.math.BigInteger;
45+
import java.util.ArrayList;
46+
import java.util.List;
47+
48+
public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
49+
private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class);
50+
51+
private static final ObjectMapper objectMapper = new ObjectMapper();
52+
53+
private final DruidStoragePlugin plugin;
54+
private final DruidSubScan.DruidSubScanSpec scanSpec;
55+
private final List<String> columns;
56+
private final DruidFilter filter;
57+
private final DruidQueryClient druidQueryClient;
58+
private final FragmentContext fragmentContext;
59+
60+
private final TupleMetadata schema;
61+
private BigInteger nextOffset = BigInteger.ZERO;
62+
private int maxRecordsToRead = -1;
63+
64+
private JsonLoaderBuilder jsonBuilder;
65+
66+
private JsonLoader jsonLoader;
67+
private ResultSetLoader resultSetLoader;
68+
69+
private CustomErrorContext errorContext;
70+
71+
72+
public DruidBatchRecordReader(DruidSubScanSpec subScanSpec,
73+
List<SchemaPath> projectedColumns,
74+
int maxRecordsToRead,
75+
FragmentContext context,
76+
DruidStoragePlugin plugin) {
77+
columns = new ArrayList<>();
78+
setColumns(projectedColumns);
79+
this.maxRecordsToRead = maxRecordsToRead;
80+
this.plugin = plugin;
81+
scanSpec = subScanSpec;
82+
this.schema = scanSpec.getSchema();
83+
fragmentContext = context;
84+
this.filter = subScanSpec.getFilter();
85+
this.druidQueryClient = plugin.getDruidQueryClient();
86+
}
87+
88+
@Override
89+
public boolean open(SchemaNegotiator negotiator) {
90+
resultSetLoader = negotiator.build();
91+
errorContext = negotiator.parentErrorContext();
92+
93+
negotiator
94+
jsonBuilder = new JsonLoaderBuilder()
95+
.resultSetLoader(resultSetLoader)
96+
.errorContext(errorContext);
97+
98+
99+
100+
return true;
101+
}
102+
103+
@Override
104+
public boolean next() {
105+
boolean result = false;
106+
try {
107+
String query = getQuery();
108+
DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query);
109+
setNextOffset(druidScanResponse);
110+
111+
for (ObjectNode eventNode : druidScanResponse.getEvents()) {
112+
jsonLoader = jsonBuilder
113+
.fromString(eventNode.asText())
114+
.build();
115+
result = jsonLoader.readBatch();
116+
}
117+
return result;
118+
} catch (Exception e) {
119+
throw UserException
120+
.dataReadError(e)
121+
.message("Failure while executing druid query: " + e.getMessage())
122+
.addContext(errorContext)
123+
.build(logger);
124+
}
125+
}
126+
127+
@Override
128+
public void close() {
129+
if (jsonLoader != null) {
130+
jsonLoader.close();
131+
jsonLoader = null;
132+
}
133+
134+
if (! nextOffset.equals(BigInteger.ZERO)) {
135+
nextOffset = BigInteger.ZERO;
136+
}
137+
}
138+
139+
private String getQuery() throws JsonProcessingException {
140+
int queryThreshold =
141+
maxRecordsToRead >= 0
142+
? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, maxRecordsToRead)
143+
: BaseValueVector.INITIAL_VALUE_ALLOCATION;
144+
ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder();
145+
ScanQuery scanQuery =
146+
scanQueryBuilder.build(
147+
scanSpec.dataSourceName,
148+
columns,
149+
filter,
150+
nextOffset,
151+
queryThreshold,
152+
scanSpec.getMinTime(),
153+
scanSpec.getMaxTime()
154+
);
155+
return objectMapper.writeValueAsString(scanQuery);
156+
}
157+
158+
private void setNextOffset(DruidScanResponse druidScanResponse) {
159+
nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
160+
}
161+
}

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@
2323
import com.fasterxml.jackson.annotation.JsonTypeName;
2424
import com.fasterxml.jackson.annotation.JsonIgnore;
2525

26+
import org.apache.calcite.avatica.Meta;
2627
import org.apache.drill.common.PlanStringBuilder;
2728
import org.apache.drill.common.expression.SchemaPath;
29+
import org.apache.drill.exec.metastore.MetadataProviderManager;
2830
import org.apache.drill.exec.physical.EndpointAffinity;
2931
import org.apache.drill.exec.physical.base.AbstractGroupScan;
3032
import org.apache.drill.exec.physical.base.GroupScan;
3133
import org.apache.drill.exec.physical.base.PhysicalOperator;
3234
import org.apache.drill.exec.physical.base.ScanStats;
3335
import org.apache.drill.exec.proto.CoordinationProtos;
36+
import org.apache.drill.exec.record.metadata.TupleMetadata;
3437
import org.apache.drill.exec.store.StoragePluginRegistry;
3538

3639
import org.apache.drill.exec.store.schedule.AffinityCreator;
@@ -44,6 +47,7 @@
4447
import org.slf4j.Logger;
4548
import org.slf4j.LoggerFactory;
4649

50+
import java.io.IOException;
4751
import java.util.ArrayList;
4852
import java.util.List;
4953

@@ -55,6 +59,8 @@ public class DruidGroupScan extends AbstractGroupScan {
5559
private final DruidScanSpec scanSpec;
5660
private final DruidStoragePlugin storagePlugin;
5761

62+
private MetadataProviderManager metadataProviderManager;
63+
5864
private List<SchemaPath> columns;
5965
private boolean filterPushedDown = false;
6066
private int maxRecordsToRead;
@@ -73,19 +79,20 @@ public DruidGroupScan(@JsonProperty("userName") String userName,
7379
pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
7480
scanSpec,
7581
columns,
76-
maxRecordsToRead);
82+
maxRecordsToRead, null);
7783
}
7884

7985
public DruidGroupScan(String userName,
8086
DruidStoragePlugin storagePlugin,
8187
DruidScanSpec scanSpec,
8288
List<SchemaPath> columns,
83-
int maxRecordsToRead) {
89+
int maxRecordsToRead, MetadataProviderManager metadataProviderManager) {
8490
super(userName);
8591
this.storagePlugin = storagePlugin;
8692
this.scanSpec = scanSpec;
8793
this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
8894
this.maxRecordsToRead = maxRecordsToRead;
95+
this.metadataProviderManager = metadataProviderManager;
8996
init();
9097
}
9198

@@ -102,6 +109,7 @@ private DruidGroupScan(DruidGroupScan that) {
102109
this.filterPushedDown = that.filterPushedDown;
103110
this.druidWorkList = that.druidWorkList;
104111
this.assignments = that.assignments;
112+
this.metadataProviderManager = that.metadataProviderManager;
105113
}
106114

107115
@Override
@@ -163,7 +171,8 @@ private void init() {
163171
getScanSpec().getFilter(),
164172
getDatasourceSize(),
165173
getDataSourceMinTime(),
166-
getDataSourceMaxTime()
174+
getDataSourceMaxTime(),
175+
getSchema()
167176
)
168177
);
169178
druidWorkList.add(druidWork);
@@ -225,12 +234,13 @@ public DruidSubScan getSpecificScan(int minorFragmentId) {
225234
druidWork.getDruidSubScanSpec().getFilter(),
226235
druidWork.getDruidSubScanSpec().getDataSourceSize(),
227236
druidWork.getDruidSubScanSpec().getMinTime(),
228-
druidWork.getDruidSubScanSpec().getMaxTime()
237+
druidWork.getDruidSubScanSpec().getMaxTime(),
238+
druidWork.getDruidSubScanSpec().getSchema()
229239
)
230240
);
231241
}
232242

233-
return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead);
243+
return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead, getSchema());
234244
}
235245

236246
@JsonIgnore
@@ -283,13 +293,25 @@ public int getMaxRecordsToRead() {
283293
return maxRecordsToRead;
284294
}
285295

296+
/**
 * Returns the schema provided through the metadata provider manager, or
 * {@code null} when no provided schema is available or it cannot be read.
 */
public TupleMetadata getSchema() {
if (metadataProviderManager == null) {
return null;
}
try {
return metadataProviderManager.getSchemaProvider().read().getSchema();
} catch (IOException | NullPointerException e) {
// NOTE(review): the NPE catch stands in for null checks on
// getSchemaProvider() and read(); any null in the chain (or an unreadable
// schema file) is treated as "no provided schema". Consider explicit null
// checks instead of catching NullPointerException.
return null;
}
}
306+
286307
@Override
287308
public String toString() {
288309
return new PlanStringBuilder(this)
289-
.field("druidScanSpec", scanSpec)
290-
.field("columns", columns)
291-
.field("druidStoragePlugin", storagePlugin)
292-
.toString();
310+
.field("druidScanSpec", scanSpec)
311+
.field("columns", columns)
312+
.field("druidStoragePlugin", storagePlugin)
313+
.field("schema", getSchema())
314+
.toString();
293315
}
294316

295317
@Override

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,25 @@
1717
*/
1818
package org.apache.drill.exec.store.druid;
1919

20+
import org.apache.drill.common.exceptions.ChildErrorContext;
2021
import org.apache.drill.common.exceptions.ExecutionSetupException;
22+
import org.apache.drill.common.exceptions.UserException;
2123
import org.apache.drill.common.expression.SchemaPath;
24+
import org.apache.drill.common.types.TypeProtos.MinorType;
25+
import org.apache.drill.common.types.Types;
2226
import org.apache.drill.exec.ops.ExecutorFragmentContext;
2327
import org.apache.drill.exec.physical.impl.BatchCreator;
2428
import org.apache.drill.exec.physical.impl.ScanBatch;
29+
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
30+
import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
31+
import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
32+
import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
33+
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
2534
import org.apache.drill.exec.record.CloseableRecordBatch;
2635
import org.apache.drill.exec.record.RecordBatch;
36+
import org.apache.drill.exec.server.options.OptionManager;
2737
import org.apache.drill.exec.store.RecordReader;
38+
import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec;
2839
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
2940
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
3041
import org.slf4j.Logger;
@@ -53,4 +64,38 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc
5364
logger.debug("Number of record readers initialized - {}", readers.size());
5465
return new ScanBatch(subScan, context, readers);
5566
}
67+
68+
private ScanFrameworkBuilder createBuilder(OptionManager options,
69+
DruidSubScan subScan,
70+
DruidSubScanSpec scanSpec) {
71+
ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
72+
builder.projection(subScan.getColumns());
73+
builder.providedSchema(subScan.getSchema());
74+
builder.setUserName(subScan.getUserName());
75+
// Provide custom error context
76+
builder.errorContext(new ChildErrorContext(builder.errorContext()) {});
77+
78+
// Reader
79+
ReaderFactory readerFactory = new DruidReaderFactory(subScan);
80+
builder.setReaderFactory(readerFactory);
81+
builder.nullType(Types.optional(MinorType.VARCHAR));
82+
83+
return builder;
84+
}
85+
86+
private static class DruidReaderFactory() implements ReaderFactory {
87+
88+
private final DruidSubScan subScan;
89+
public DruidReaderFactory(DruidSubScan subScan) {
90+
this.subScan = subScan;
91+
}
92+
93+
@Override
94+
public void bind(ManagedScanFramework framework) { }
95+
96+
@Override
97+
public ManagedReader<SchemaNegotiator> next() {
98+
return new DruidBatchRecordReader();
99+
}
100+
}
56101
}

0 commit comments

Comments
 (0)