Skip to content

Commit d67c57d

Browse files
committed
WIP
1 parent e46c579 commit d67c57d

2 files changed

Lines changed: 43 additions & 11 deletions

File tree

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
5757
private final DruidQueryClient druidQueryClient;
5858
private final FragmentContext fragmentContext;
5959

60+
private final DruidSubScan subScan;
61+
6062
private final TupleMetadata schema;
6163
private BigInteger nextOffset = BigInteger.ZERO;
6264
private int maxRecordsToRead = -1;
@@ -69,17 +71,18 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
6971
private CustomErrorContext errorContext;
7072

7173

72-
public DruidBatchRecordReader(DruidSubScanSpec subScanSpec,
74+
public DruidBatchRecordReader(DruidSubScan subScan,
75+
DruidSubScanSpec subScanSpec,
7376
List<SchemaPath> projectedColumns,
7477
int maxRecordsToRead,
7578
FragmentContext context,
7679
DruidStoragePlugin plugin) {
80+
this.subScan = subScan;
7781
columns = new ArrayList<>();
78-
setColumns(projectedColumns);
7982
this.maxRecordsToRead = maxRecordsToRead;
8083
this.plugin = plugin;
8184
scanSpec = subScanSpec;
82-
this.schema = scanSpec.getSchema();
85+
this.schema = subScan.getSchema();
8386
fragmentContext = context;
8487
this.filter = subScanSpec.getFilter();
8588
this.druidQueryClient = plugin.getDruidQueryClient();
@@ -89,8 +92,8 @@ public DruidBatchRecordReader(DruidSubScanSpec subScanSpec,
8992
public boolean open(SchemaNegotiator negotiator) {
9093
resultSetLoader = negotiator.build();
9194
errorContext = negotiator.parentErrorContext();
95+
negotiator.setErrorContext(errorContext);
9296

93-
negotiator
9497
jsonBuilder = new JsonLoaderBuilder()
9598
.resultSetLoader(resultSetLoader)
9699
.errorContext(errorContext);

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.drill.common.exceptions.ChildErrorContext;
2121
import org.apache.drill.common.exceptions.ExecutionSetupException;
22-
import org.apache.drill.common.exceptions.UserException;
2322
import org.apache.drill.common.expression.SchemaPath;
2423
import org.apache.drill.common.types.TypeProtos.MinorType;
2524
import org.apache.drill.common.types.Types;
@@ -41,12 +40,33 @@
4140
import org.slf4j.Logger;
4241
import org.slf4j.LoggerFactory;
4342

43+
import java.util.Iterator;
4444
import java.util.List;
4545

4646
public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
4747

4848
private static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class);
4949

50+
/*
51+
@Override
52+
public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
53+
SplunkSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
54+
Preconditions.checkArgument(children.isEmpty());
55+
56+
try {
57+
ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
58+
return builder.buildScanOperator(context, subScan);
59+
} catch (UserException e) {
60+
// Rethrow user exceptions directly
61+
throw e;
62+
} catch (Throwable e) {
63+
// Wrap all others
64+
throw new ExecutionSetupException(e);
65+
}
66+
}
67+
*/
68+
69+
5070
@Override
5171
public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
5272
Preconditions.checkArgument(children.isEmpty());
@@ -55,6 +75,7 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc
5575

5676
for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) {
5777
try {
78+
ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan, context);
5879
columns = subScan.getColumns();
5980
readers.add(new DruidRecordReader(scanSpec, columns, subScan.getMaxRecordsToRead(), context, subScan.getStorageEngine()));
6081
} catch (Exception ex) {
@@ -67,7 +88,7 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc
6788

6889
private ScanFrameworkBuilder createBuilder(OptionManager options,
6990
DruidSubScan subScan,
70-
DruidSubScanSpec scanSpec) {
91+
ExecutorFragmentContext context) {
7192
ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
7293
builder.projection(subScan.getColumns());
7394
builder.providedSchema(subScan.getSchema());
@@ -76,26 +97,34 @@ private ScanFrameworkBuilder createBuilder(OptionManager options,
7697
builder.errorContext(new ChildErrorContext(builder.errorContext()) {});
7798

7899
// Reader
79-
ReaderFactory readerFactory = new DruidReaderFactory(subScan);
100+
ReaderFactory readerFactory = new DruidReaderFactory(subScan, context);
80101
builder.setReaderFactory(readerFactory);
81102
builder.nullType(Types.optional(MinorType.VARCHAR));
82103

83104
return builder;
84105
}
85106

86-
private static class DruidReaderFactory() implements ReaderFactory {
87-
107+
private static class DruidReaderFactory implements ReaderFactory {
88108
private final DruidSubScan subScan;
89-
public DruidReaderFactory(DruidSubScan subScan) {
109+
private final ExecutorFragmentContext context;
110+
private final Iterator<DruidSubScanSpec> subScanSpecIterator;
111+
public DruidReaderFactory(DruidSubScan subScan, ExecutorFragmentContext context) {
90112
this.subScan = subScan;
113+
this.context = context;
114+
this.subScanSpecIterator = subScan.getScanSpec().iterator();
91115
}
92116

93117
@Override
94118
public void bind(ManagedScanFramework framework) { }
95119

96120
@Override
97121
public ManagedReader<SchemaNegotiator> next() {
98-
return new DruidBatchRecordReader();
122+
return new DruidBatchRecordReader(subScan,
123+
subScanSpecIterator.next(),
124+
subScan.getColumns(),
125+
subScan.getMaxRecordsToRead(),
126+
context,
127+
subScan.getStorageEngine());
99128
}
100129
}
101130
}

0 commit comments

Comments
 (0)