2424import org .apache .drill .common .exceptions .CustomErrorContext ;
2525import org .apache .drill .common .exceptions .UserException ;
2626import org .apache .drill .common .expression .SchemaPath ;
27- import org .apache .drill .exec .ops .FragmentContext ;
2827import org .apache .drill .exec .physical .impl .scan .framework .ManagedReader ;
2928import org .apache .drill .exec .physical .impl .scan .framework .SchemaNegotiator ;
3029import org .apache .drill .exec .physical .resultSet .ResultSetLoader ;
4746
4847public class DruidBatchRecordReader implements ManagedReader <SchemaNegotiator > {
4948 private static final Logger logger = LoggerFactory .getLogger (DruidBatchRecordReader .class );
50-
5149 private static final ObjectMapper objectMapper = new ObjectMapper ();
52-
5350 private final DruidStoragePlugin plugin ;
5451 private final DruidSubScan .DruidSubScanSpec scanSpec ;
5552 private final List <String > columns ;
5653 private final DruidFilter filter ;
5754 private final DruidQueryClient druidQueryClient ;
58- private final FragmentContext fragmentContext ;
59-
60- private final DruidSubScan subScan ;
6155
62- private final TupleMetadata schema ;
6356 private BigInteger nextOffset = BigInteger .ZERO ;
6457 private int maxRecordsToRead = -1 ;
65-
6658 private JsonLoaderBuilder jsonBuilder ;
67-
6859 private JsonLoader jsonLoader ;
6960 private ResultSetLoader resultSetLoader ;
70-
7161 private CustomErrorContext errorContext ;
7262
7363
7464 public DruidBatchRecordReader (DruidSubScan subScan ,
7565 DruidSubScanSpec subScanSpec ,
7666 List <SchemaPath > projectedColumns ,
7767 int maxRecordsToRead ,
78- FragmentContext context ,
7968 DruidStoragePlugin plugin ) {
80- this .subScan = subScan ;
81- columns = new ArrayList <>();
69+ this .columns = new ArrayList <>();
8270 this .maxRecordsToRead = maxRecordsToRead ;
8371 this .plugin = plugin ;
84- scanSpec = subScanSpec ;
85- this .schema = subScan .getSchema ();
86- fragmentContext = context ;
72+ this .scanSpec = subScanSpec ;
8773 this .filter = subScanSpec .getFilter ();
8874 this .druidQueryClient = plugin .getDruidQueryClient ();
8975 }
@@ -96,10 +82,9 @@ public boolean open(SchemaNegotiator negotiator) {
9682
9783 jsonBuilder = new JsonLoaderBuilder ()
9884 .resultSetLoader (resultSetLoader )
85+ .standardOptions (negotiator .queryOptions ())
9986 .errorContext (errorContext );
10087
101-
102-
10388 return true ;
10489 }
10590
@@ -112,9 +97,10 @@ public boolean next() {
11297 setNextOffset (druidScanResponse );
11398
11499 for (ObjectNode eventNode : druidScanResponse .getEvents ()) {
115- jsonLoader = jsonBuilder
116- .fromString (eventNode .asText ())
100+ JsonLoader jsonLoader = jsonBuilder
101+ .fromString (eventNode .toString ())
117102 .build ();
103+
118104 result = jsonLoader .readBatch ();
119105 }
120106 return result ;
0 commit comments