 import com.altinity.ice.internal.jvm.Stats;
 import com.altinity.ice.internal.parquet.Metadata;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -66,26 +68,38 @@ public static void run(
       boolean forceTableAuth,
       boolean s3NoSignRequest,
       boolean s3CopyObject,
-      String retryListFile)
-      throws IOException {
+      String retryListFile,
+      int threadCount)
+      throws IOException, InterruptedException {
     if (files.length == 0) {
       // no work to be done
       return;
     }
-    if (forceNoCopy) {
-      noCopy = true;
-    }
+    InsertOptions options =
+        InsertOptions.builder()
+            .skipDuplicates(skipDuplicates)
+            .noCommit(noCommit)
+            .noCopy(noCopy)
+            .forceNoCopy(forceNoCopy)
+            .forceTableAuth(forceTableAuth)
+            .s3NoSignRequest(s3NoSignRequest)
+            .s3CopyObject(s3CopyObject)
+            .threadCount(threadCount)
+            .build();
+
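+    // forceNoCopy implies noCopy; fold it into an effectively-final value so lambdas can capture it.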
+    final InsertOptions finalOptions =
+        options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options;
     Table table = catalog.loadTable(nsTable);
     try (FileIO tableIO = table.io()) {
       final Supplier<S3Client> s3ClientSupplier;
-      if (forceTableAuth) {
+      if (finalOptions.forceTableAuth()) {
         if (!(tableIO instanceof S3FileIO)) {
           throw new UnsupportedOperationException(
               "--force-table-auth is currently only supported for s3:// tables");
         }
         s3ClientSupplier = ((S3FileIO) tableIO)::client;
       } else {
-        s3ClientSupplier = () -> S3.newClient(s3NoSignRequest);
+        s3ClientSupplier = () -> S3.newClient(finalOptions.s3NoSignRequest());
       }
       Lazy<S3Client> s3ClientLazy = new Lazy<>(s3ClientSupplier);
       try {
@@ -136,125 +150,62 @@ public static void run(
               : null) {
         boolean atLeastOneFileAppended = false;
 
-        // TODO: parallel
-        for (final String file : filesExpanded) {
-          DataFile df;
-          try {
-            logger.info("{}: processing", file);
-            logger.info("{}: jvm: {}", file, Stats.gather());
-
-            Function<String, Boolean> checkNotExists =
-                dataFile -> {
-                  if (tableDataFiles.contains(dataFile)) {
-                    if (skipDuplicates) {
-                      logger.info("{}: duplicate (skipping)", file);
-                      return true;
-                    }
-                    throw new AlreadyExistsException(
-                        String.format("%s is already referenced by the table", dataFile));
-                  }
-                  return false;
-                };
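+        // Cap the pool at the number of files, and clamp to at least one thread since
+        // newFixedThreadPool rejects non-positive sizes.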
+        int numThreads = Math.max(1, Math.min(finalOptions.threadCount(), filesExpanded.size()));
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        try {
+          var futures = new ArrayList<Future<DataFile>>();
+          for (final String file : filesExpanded) {
+            futures.add(
+                executor.submit(
+                    () -> {
+                      try {
+                        return processFile(
+                            table,
+                            catalog,
+                            tableIO,
+                            inputIO,
+                            tableDataFiles,
+                            finalOptions,
+                            s3ClientLazy,
+                            dstDataFileSource,
+                            tableSchema,
+                            dataFileNamingStrategy,
+                            file);
+                      } catch (Exception e) {
+                        if (retryLog != null) {
+                          logger.error(
+                              "{}: error (adding to retry list and continuing)", file, e);
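+                          // Assumption: retryLog.add is safe to call concurrently from worker
+                          // threads (or is synchronized internally).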
+                          retryLog.add(file);
+                          return null;
+                        } else {
+                          throw e;
+                        }
+                      }
+                    }));
+          }
 
-            InputFile inputFile =
-                Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO);
-            ParquetMetadata metadata = Metadata.read(inputFile);
-            MessageType type = metadata.getFileMetaData().getSchema();
-            Schema fileSchema =
-                ParquetSchemaUtil.convert(type); // nameMapping applied (when present)
-            if (!sameSchema(table, fileSchema)) {
-              throw new BadRequestException(
-                  String.format("%s's schema doesn't match table's schema", file));
-            }
-            // assuming datafiles can be anywhere when table.location() is empty
-            var noCopyPossible = file.startsWith(table.location()) || forceNoCopy;
-            // TODO: check before uploading anything
-            if (noCopy && !noCopyPossible) {
-              throw new BadRequestException(
-                  file + " cannot be added to catalog without copy"); // TODO: explain
-            }
-            long dataFileSizeInBytes;
-            var dataFile = replacePrefix(file, "s3a://", "s3://");
-            if (noCopy) {
-              if (checkNotExists.apply(dataFile)) {
-                continue;
-              }
-              dataFileSizeInBytes = inputFile.getLength();
-            } else if (s3CopyObject) {
-              if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
-                throw new BadRequestException(
-                    "--s3-copy-object is only supported between s3:// buckets");
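+          // Collect results in submission order; future.get() blocks until that task completes,
+          // so appends happen deterministically even though processing is parallel.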
+          for (var future : futures) {
+            try {
+              DataFile df = future.get();
+              if (df != null) {
+                atLeastOneFileAppended = true;
+                appendOp.appendFile(df);
               }
-              String dstDataFile = dstDataFileSource.get(file);
-              if (checkNotExists.apply(dstDataFile)) {
-                continue;
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new IOException("Interrupted while processing files", e);
+            } catch (ExecutionException e) {
+              if (retryLog == null) {
+                throw new IOException("Error processing files", e.getCause());
               }
-              S3.BucketPath src = S3.bucketPath(dataFile);
-              S3.BucketPath dst = S3.bucketPath(dstDataFile);
-              logger.info("{}: fast copying to {}", file, dstDataFile);
-              CopyObjectRequest copyReq =
-                  CopyObjectRequest.builder()
-                      .sourceBucket(src.bucket())
-                      .sourceKey(src.path())
-                      .destinationBucket(dst.bucket())
-                      .destinationKey(dst.path())
-                      .build();
-              s3ClientLazy.getValue().copyObject(copyReq);
-              dataFileSizeInBytes = inputFile.getLength();
-              dataFile = dstDataFile;
-            } else {
-              String dstDataFile = dstDataFileSource.get(file);
-              if (checkNotExists.apply(dstDataFile)) {
-                continue;
-              }
-              OutputFile outputFile =
-                  tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://"));
-              // TODO: support transferTo below (note that compression, etc. might be different)
-              // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) {
-              // s.transferTo(d); }}
-              Parquet.ReadBuilder readBuilder =
-                  Parquet.read(inputFile)
-                      .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
-                      .project(tableSchema); // TODO: ?
-              // TODO: reuseContainers?
-              Parquet.WriteBuilder writeBuilder =
-                  Parquet.write(outputFile)
-                      .overwrite(
-                          dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME)
-                      .createWriterFunc(GenericParquetWriter::buildWriter)
-                      .schema(tableSchema);
-              logger.info("{}: copying to {}", file, dstDataFile);
-              // file size may have changed due to different compression, etc.
-              dataFileSizeInBytes = copy(readBuilder, writeBuilder);
-              dataFile = dstDataFile;
-            }
-            logger.info("{}: adding data file", file);
-            long recordCount =
-                metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
-            MetricsConfig metricsConfig = MetricsConfig.forTable(table);
-            Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
-            df =
-                new DataFiles.Builder(table.spec())
-                    .withPath(dataFile)
-                    .withFormat("PARQUET")
-                    .withRecordCount(recordCount)
-                    .withFileSizeInBytes(dataFileSizeInBytes)
-                    .withMetrics(metrics)
-                    .build();
-          } catch (Exception e) { // FIXME
-            if (retryLog != null) {
-              logger.error("{}: error (adding to retry list and continuing)", file, e);
-              retryLog.add(file);
-              continue;
-            } else {
-              throw e;
               }
             }
-          atLeastOneFileAppended = true;
-          appendOp.appendFile(df);
+        } finally {
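+          // Shut down first: awaitTermination never completes for a pool that is still accepting work.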
+          executor.shutdown();
+          if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+            executor.shutdownNow();
+          }
         }
 
-        if (!noCommit) {
+        if (!finalOptions.noCommit()) {
           // TODO: log
           if (atLeastOneFileAppended) {
             appendOp.commit();
@@ -276,6 +227,115 @@ public static void run(
       }
     }
 
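+  /**
+   * Processes a single input file: validates its schema against the table, copies data into the
+   * table location unless noCopy applies, and returns the DataFile to append, or null when a
+   * duplicate is skipped. Runs on worker threads, so it must not mutate shared state.
+   */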
+  private static DataFile processFile(
+      Table table,
+      RESTCatalog catalog,
+      FileIO tableIO,
+      FileIO inputIO,
+      Set<String> tableDataFiles,
+      InsertOptions options,
+      Lazy<S3Client> s3ClientLazy,
+      DataFileNamingStrategy dstDataFileSource,
+      Schema tableSchema,
+      DataFileNamingStrategy.Name dataFileNamingStrategy,
+      String file)
+      throws IOException {
+    logger.info("{}: processing", file);
+    logger.info("{}: jvm: {}", file, Stats.gather());
+
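+    // True means the data file is already referenced by the table and may be skipped;
+    // a duplicate with skipDuplicates disabled raises AlreadyExistsException instead.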
+    Function<String, Boolean> checkNotExists =
+        dataFile -> {
+          if (tableDataFiles.contains(dataFile)) {
+            if (options.skipDuplicates()) {
+              logger.info("{}: duplicate (skipping)", file);
+              return true;
+            }
+            throw new AlreadyExistsException(
+                String.format("%s is already referenced by the table", dataFile));
+          }
+          return false;
+        };
+
+    InputFile inputFile = Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO);
+    ParquetMetadata metadata = Metadata.read(inputFile);
+    MessageType type = metadata.getFileMetaData().getSchema();
+    Schema fileSchema = ParquetSchemaUtil.convert(type); // nameMapping applied (when present)
+    if (!sameSchema(table, fileSchema)) {
+      throw new BadRequestException(
+          String.format("%s's schema doesn't match table's schema", file));
+    }
+    // assuming datafiles can be anywhere when table.location() is empty
+    var noCopyPossible = file.startsWith(table.location()) || options.forceNoCopy();
+    // TODO: check before uploading anything
+    if (options.noCopy() && !noCopyPossible) {
+      throw new BadRequestException(
+          file + " cannot be added to catalog without copy"); // TODO: explain
+    }
+    long dataFileSizeInBytes;
+    var dataFile = replacePrefix(file, "s3a://", "s3://");
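+    // Normalizing s3a:// to s3:// keeps catalog paths and duplicate checks on a single scheme.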
+    if (options.noCopy()) {
+      if (checkNotExists.apply(dataFile)) {
+        return null;
+      }
+      dataFileSizeInBytes = inputFile.getLength();
+    } else if (options.s3CopyObject()) {
+      if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
+        throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets");
+      }
+      String dstDataFile = dstDataFileSource.get(file);
+      if (checkNotExists.apply(dstDataFile)) {
+        return null;
+      }
+      S3.BucketPath src = S3.bucketPath(dataFile);
+      S3.BucketPath dst = S3.bucketPath(dstDataFile);
+      logger.info("{}: fast copying to {}", file, dstDataFile);
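+      // CopyObject is a server-side copy within S3: object bytes never pass through this
+      // process, which is what makes this path "fast".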
+      CopyObjectRequest copyReq =
+          CopyObjectRequest.builder()
+              .sourceBucket(src.bucket())
+              .sourceKey(src.path())
+              .destinationBucket(dst.bucket())
+              .destinationKey(dst.path())
+              .build();
+      s3ClientLazy.getValue().copyObject(copyReq);
+      dataFileSizeInBytes = inputFile.getLength();
+      dataFile = dstDataFile;
+    } else {
+      String dstDataFile = dstDataFileSource.get(file);
+      if (checkNotExists.apply(dstDataFile)) {
+        return null;
+      }
+      OutputFile outputFile = tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://"));
+      // TODO: support transferTo below (note that compression, etc. might be different)
+      // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) {
+      // s.transferTo(d); }}
+      Parquet.ReadBuilder readBuilder =
+          Parquet.read(inputFile)
+              .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+              .project(tableSchema); // TODO: ?
+      // TODO: reuseContainers?
+      Parquet.WriteBuilder writeBuilder =
+          Parquet.write(outputFile)
+              .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .schema(tableSchema);
+      logger.info("{}: copying to {}", file, dstDataFile);
+      // file size may have changed due to different compression, etc.
+      dataFileSizeInBytes = copy(readBuilder, writeBuilder);
+      dataFile = dstDataFile;
+    }
+    logger.info("{}: adding data file", file);
+    long recordCount = metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+    Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
+    return new DataFiles.Builder(table.spec())
+        .withPath(dataFile)
+        .withFormat("PARQUET")
+        .withRecordCount(recordCount)
+        .withFileSizeInBytes(dataFileSizeInBytes)
+        .withMetrics(metrics)
+        .build();
+  }
+
   private static boolean sameSchema(Table table, Schema fileSchema) {
     boolean sameSchema;
     Schema tableSchema = table.schema();