From ed4f40241f1080b1c4e25f1ef336f714a15b3edc Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sat, 26 Apr 2025 15:41:52 -0400 Subject: [PATCH 1/5] ice: Refactored multiple files processing to a separate function. --- .../com/altinity/ice/internal/cmd/Insert.java | 246 ++++++++++-------- 1 file changed, 140 insertions(+), 106 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 2a47fc28..55341349 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -72,20 +72,30 @@ public static void run( // no work to be done return; } - if (forceNoCopy) { - noCopy = true; - } + InsertOptions options = + InsertOptions.builder() + .skipDuplicates(skipDuplicates) + .noCommit(noCommit) + .noCopy(noCopy) + .forceNoCopy(forceNoCopy) + .forceTableAuth(forceTableAuth) + .s3NoSignRequest(s3NoSignRequest) + .s3CopyObject(s3CopyObject) + .build(); + + final InsertOptions finalOptions = + options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options; Table table = catalog.loadTable(nsTable); try (FileIO tableIO = table.io()) { final Supplier s3ClientSupplier; - if (forceTableAuth) { + if (finalOptions.forceTableAuth()) { if (!(tableIO instanceof S3FileIO)) { throw new UnsupportedOperationException( "--force-table-auth is currently only supported for s3:// tables"); } s3ClientSupplier = ((S3FileIO) tableIO)::client; } else { - s3ClientSupplier = () -> S3.newClient(s3NoSignRequest); + s3ClientSupplier = () -> S3.newClient(finalOptions.s3NoSignRequest()); } Lazy s3ClientLazy = new Lazy<>(s3ClientSupplier); try { @@ -140,107 +150,22 @@ public static void run( for (final String file : filesExpanded) { DataFile df; try { - logger.info("{}: processing", file); - logger.info("{}: jvm: {}", file, Stats.gather()); - - Function checkNotExists = - dataFile -> { - if (tableDataFiles.contains(dataFile)) { - if (skipDuplicates) { - logger.info("{}: duplicate (skipping)", file); - return true; - } - throw new AlreadyExistsException( - String.format("%s is already referenced by the table", dataFile)); - } - return false; - }; - - InputFile inputFile = - Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO); - ParquetMetadata metadata = Metadata.read(inputFile); - MessageType type = metadata.getFileMetaData().getSchema(); - Schema fileSchema = - ParquetSchemaUtil.convert(type); // nameMapping applied (when present) - if (!sameSchema(table, fileSchema)) { - throw new BadRequestException( - String.format("%s's schema doesn't match table's schema", file)); - } - // assuming datafiles can be anywhere when table.location() is empty - var noCopyPossible = file.startsWith(table.location()) || forceNoCopy; - // TODO: check before uploading anything - if (noCopy && !noCopyPossible) { - throw new BadRequestException( - file + " cannot be added to catalog without copy"); // TODO: explain - } - long dataFileSizeInBytes; - var dataFile = replacePrefix(file, "s3a://", "s3://"); - if (noCopy) { - if (checkNotExists.apply(dataFile)) { - continue; - } - dataFileSizeInBytes = inputFile.getLength(); - } else if (s3CopyObject) { - if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) { - throw new BadRequestException( - "--s3-copy-object is only supported between s3:// buckets"); - } - String dstDataFile = dstDataFileSource.get(file); - if (checkNotExists.apply(dstDataFile)) { - continue; - } - S3.BucketPath src = S3.bucketPath(dataFile); - S3.BucketPath dst = S3.bucketPath(dstDataFile); - logger.info("{}: fast copying to {}", file, dstDataFile); - CopyObjectRequest copyReq = - CopyObjectRequest.builder() - .sourceBucket(src.bucket()) - .sourceKey(src.path()) - .destinationBucket(dst.bucket()) - .destinationKey(dst.path()) - .build(); - s3ClientLazy.getValue().copyObject(copyReq); - dataFileSizeInBytes = inputFile.getLength(); - dataFile = dstDataFile; - } else { - String dstDataFile = dstDataFileSource.get(file); - if (checkNotExists.apply(dstDataFile)) { - continue; - } - OutputFile outputFile = - tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://")); - // TODO: support transferTo below (note that compression, etc. might be different) - // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { - // s.transferTo(d); }} - Parquet.ReadBuilder readBuilder = - Parquet.read(inputFile) - .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) - .project(tableSchema); // TODO: ? - // TODO: reuseContainers? - Parquet.WriteBuilder writeBuilder = - Parquet.write(outputFile) - .overwrite( - dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(tableSchema); - logger.info("{}: copying to {}", file, dstDataFile); - // file size may have changed due to different compression, etc. - dataFileSizeInBytes = copy(readBuilder, writeBuilder); - dataFile = dstDataFile; - } - logger.info("{}: adding data file", file); - long recordCount = - metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); - MetricsConfig metricsConfig = MetricsConfig.forTable(table); - Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); df = - new DataFiles.Builder(table.spec()) - .withPath(dataFile) - .withFormat("PARQUET") - .withRecordCount(recordCount) - .withFileSizeInBytes(dataFileSizeInBytes) - .withMetrics(metrics) - .build(); + processFile( + table, + catalog, + tableIO, + inputIO, + tableDataFiles, + finalOptions, + s3ClientLazy, + dstDataFileSource, + tableSchema, + dataFileNamingStrategy, + file); + if (df == null) { + continue; + } } catch (Exception e) { // FIXME if (retryLog != null) { logger.error("{}: error (adding to retry list and continuing)", file, e); @@ -254,7 +179,7 @@ public static void run( appendOp.appendFile(df); } - if (!noCommit) { + if (!finalOptions.noCommit()) { // TODO: log if (atLeastOneFileAppended) { appendOp.commit(); @@ -276,6 +201,115 @@ public static void run( } } + private static DataFile processFile( + Table table, + RESTCatalog catalog, + FileIO tableIO, + FileIO inputIO, + Set tableDataFiles, + InsertOptions options, + Lazy s3ClientLazy, + DataFileNamingStrategy dstDataFileSource, + Schema tableSchema, + DataFileNamingStrategy.Name dataFileNamingStrategy, + String file) + throws IOException { + logger.info("{}: processing", file); + logger.info("{}: jvm: {}", file, Stats.gather()); + + Function checkNotExists = + dataFile -> { + if (tableDataFiles.contains(dataFile)) { + if (options.skipDuplicates()) { + logger.info("{}: duplicate (skipping)", file); + return true; + } + throw new AlreadyExistsException( + String.format("%s is already referenced by the table", dataFile)); + } + return false; + }; + + InputFile inputFile = Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO); + ParquetMetadata metadata = Metadata.read(inputFile); + MessageType type = metadata.getFileMetaData().getSchema(); + Schema fileSchema = ParquetSchemaUtil.convert(type); // nameMapping applied (when present) + if (!sameSchema(table, fileSchema)) { + throw new BadRequestException( + String.format("%s's schema doesn't match table's schema", file)); + } + // assuming datafiles can be anywhere when table.location() is empty + var noCopyPossible = file.startsWith(table.location()) || options.forceNoCopy(); + // TODO: check before uploading anything + if (options.noCopy() && !noCopyPossible) { + throw new BadRequestException( + file + " cannot be added to catalog without copy"); // TODO: explain + } + long dataFileSizeInBytes; + var dataFile = replacePrefix(file, "s3a://", "s3://"); + if (options.noCopy()) { + if (checkNotExists.apply(dataFile)) { + return null; + } + dataFileSizeInBytes = inputFile.getLength(); + } else if (options.s3CopyObject()) { + if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) { + throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets"); + } + String dstDataFile = dstDataFileSource.get(file); + if (checkNotExists.apply(dstDataFile)) { + return null; + } + S3.BucketPath src = S3.bucketPath(dataFile); + S3.BucketPath dst = S3.bucketPath(dstDataFile); + logger.info("{}: fast copying to {}", file, dstDataFile); + CopyObjectRequest copyReq = + CopyObjectRequest.builder() + .sourceBucket(src.bucket()) + .sourceKey(src.path()) + .destinationBucket(dst.bucket()) + .destinationKey(dst.path()) + .build(); + s3ClientLazy.getValue().copyObject(copyReq); + dataFileSizeInBytes = inputFile.getLength(); + dataFile = dstDataFile; + } else { + String dstDataFile = dstDataFileSource.get(file); + if (checkNotExists.apply(dstDataFile)) { + return null; + } + OutputFile outputFile = tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://")); + // TODO: support transferTo below (note that compression, etc. might be different) + // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { + // s.transferTo(d); }} + Parquet.ReadBuilder readBuilder = + Parquet.read(inputFile) + .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) + .project(tableSchema); // TODO: ? + // TODO: reuseContainers? + Parquet.WriteBuilder writeBuilder = + Parquet.write(outputFile) + .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(tableSchema); + logger.info("{}: copying to {}", file, dstDataFile); + // file size may have changed due to different compression, etc. + dataFileSizeInBytes = copy(readBuilder, writeBuilder); + dataFile = dstDataFile; + } + logger.info("{}: adding data file", file); + long recordCount = metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); + return new DataFiles.Builder(table.spec()) + .withPath(dataFile) + .withFormat("PARQUET") + .withRecordCount(recordCount) + .withFileSizeInBytes(dataFileSizeInBytes) + .withMetrics(metrics) + .build(); + } + private static boolean sameSchema(Table table, Schema fileSchema) { boolean sameSchema; Schema tableSchema = table.schema(); From 2dd678defe7e35bc50c156d1beb9e8f680028069 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 27 Apr 2025 15:04:35 -0400 Subject: [PATCH 2/5] ice: Added thread count as a command line argument, default to number of processors. --- ice/src/main/java/com/altinity/ice/Main.java | 10 ++- .../com/altinity/ice/internal/cmd/Insert.java | 85 ++++++++++++------- 2 files changed, 63 insertions(+), 32 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java index a26ed39f..4787f3e9 100644 --- a/ice/src/main/java/com/altinity/ice/Main.java +++ b/ice/src/main/java/com/altinity/ice/Main.java @@ -171,7 +171,12 @@ void insert( description = "/path/to/file where to save list of files to retry" + " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)") - String retryList) + String retryList, + @CommandLine.Option( + names = {"--thread-count"}, + description = "Number of threads to use for inserting data", + defaultValue = "-1") + int threadCount) throws IOException { if (s3NoSignRequest && s3CopyObject) { throw new UnsupportedOperationException( @@ -203,7 +208,8 @@ void insert( forceTableAuth, s3NoSignRequest, s3CopyObject, - retryList); + retryList, + threadCount == -1 ? Runtime.getRuntime().availableProcessors() : threadCount); } } diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 55341349..3f9b81e1 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -7,10 +7,15 @@ import com.altinity.ice.internal.jvm.Stats; import com.altinity.ice.internal.parquet.Metadata; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -66,7 +71,8 @@ public static void run( boolean forceTableAuth, boolean s3NoSignRequest, boolean s3CopyObject, - String retryListFile) + String retryListFile, + int threadCount) throws IOException { if (files.length == 0) { // no work to be done @@ -81,6 +87,7 @@ public static void run( .forceTableAuth(forceTableAuth) .s3NoSignRequest(s3NoSignRequest) .s3CopyObject(s3CopyObject) + .threadCount(threadCount) .build(); final InsertOptions finalOptions = @@ -146,37 +153,55 @@ public static void run( : null) { boolean atLeastOneFileAppended = false; - // TODO: parallel - for (final String file : filesExpanded) { - DataFile df; - try { - df = - processFile( - table, - catalog, - tableIO, - inputIO, - tableDataFiles, - finalOptions, - s3ClientLazy, - dstDataFileSource, - tableSchema, - dataFileNamingStrategy, - file); - if (df == null) { - continue; - } - } catch (Exception e) { // FIXME - if (retryLog != null) { - logger.error("{}: error (adding to retry list and continuing)", file, e); - retryLog.add(file); - continue; - } else { - throw e; + int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size()); + try (ExecutorService executor = Executors.newFixedThreadPool(numThreads)) { + var futures = new ArrayList>(); + for (final String file : filesExpanded) { + futures.add( + executor.submit( + () -> { + try { + return processFile( + table, + catalog, + tableIO, + inputIO, + tableDataFiles, + finalOptions, + s3ClientLazy, + dstDataFileSource, + tableSchema, + dataFileNamingStrategy, + file); + } catch (Exception e) { + if (retryLog != null) { + logger.error( + "{}: error (adding to retry list and continuing)", file, e); + retryLog.add(file); + return null; + } else { + throw e; + } + } + })); + } + + for (var future : futures) { + try { + DataFile df = future.get(); + if (df != null) { + atLeastOneFileAppended = true; + appendOp.appendFile(df); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while processing files", e); + } catch (ExecutionException e) { + if (retryLog == null) { + throw new IOException("Error processing files", e.getCause()); + } } } - atLeastOneFileAppended = true; - appendOp.appendFile(df); } if (!finalOptions.noCommit()) { From ea830de2a90cb4e911d1d8ef02b0903f65fe695f Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 28 Apr 2025 11:14:58 -0400 Subject: [PATCH 3/5] ice: Added InsertOptions. --- .../ice/internal/cmd/InsertOptions.java | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java new file mode 100644 index 00000000..a99db2cd --- /dev/null +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java @@ -0,0 +1,144 @@ +package com.altinity.ice.internal.cmd; + +public final class InsertOptions { + private final boolean skipDuplicates; + private final boolean noCommit; + private final boolean noCopy; + private final boolean forceNoCopy; + private final boolean forceTableAuth; + private final boolean s3NoSignRequest; + private final boolean s3CopyObject; + private final int threadCount; + + private InsertOptions( + boolean skipDuplicates, + boolean noCommit, + boolean noCopy, + boolean forceNoCopy, + boolean forceTableAuth, + boolean s3NoSignRequest, + boolean s3CopyObject, + int threadCount) { + this.skipDuplicates = skipDuplicates; + this.noCommit = noCommit; + this.noCopy = noCopy; + this.forceNoCopy = forceNoCopy; + this.forceTableAuth = forceTableAuth; + this.s3NoSignRequest = s3NoSignRequest; + this.s3CopyObject = s3CopyObject; + this.threadCount = threadCount; + } + + public static Builder builder() { + return new Builder(); + } + + public boolean skipDuplicates() { + return skipDuplicates; + } + + public boolean noCommit() { + return noCommit; + } + + public boolean noCopy() { + return noCopy; + } + + public boolean forceNoCopy() { + return forceNoCopy; + } + + public boolean forceTableAuth() { + return forceTableAuth; + } + + public boolean s3NoSignRequest() { + return s3NoSignRequest; + } + + public boolean s3CopyObject() { + return s3CopyObject; + } + + public int threadCount() { + return threadCount; + } + + public Builder toBuilder() { + return builder() + .skipDuplicates(skipDuplicates) + .noCommit(noCommit) + .noCopy(noCopy) + .forceNoCopy(forceNoCopy) + .forceTableAuth(forceTableAuth) + .s3NoSignRequest(s3NoSignRequest) + .s3CopyObject(s3CopyObject) + .threadCount(threadCount); + } + + public static final class Builder { + private boolean skipDuplicates; + private boolean noCommit; + private boolean noCopy; + private boolean forceNoCopy; + private boolean forceTableAuth; + private boolean s3NoSignRequest; + private boolean s3CopyObject; + private int threadCount = Runtime.getRuntime().availableProcessors(); + + private Builder() {} + + public Builder skipDuplicates(boolean skipDuplicates) { + this.skipDuplicates = skipDuplicates; + return this; + } + + public Builder noCommit(boolean noCommit) { + this.noCommit = noCommit; + return this; + } + + public Builder noCopy(boolean noCopy) { + this.noCopy = noCopy; + return this; + } + + public Builder forceNoCopy(boolean forceNoCopy) { + this.forceNoCopy = forceNoCopy; + return this; + } + + public Builder forceTableAuth(boolean forceTableAuth) { + this.forceTableAuth = forceTableAuth; + return this; + } + + public Builder s3NoSignRequest(boolean s3NoSignRequest) { + this.s3NoSignRequest = s3NoSignRequest; + return this; + } + + public Builder s3CopyObject(boolean s3CopyObject) { + this.s3CopyObject = s3CopyObject; + return this; + } + + public Builder threadCount(int threadCount) { + this.threadCount = threadCount; + return this; + } + + public InsertOptions build() { + return new InsertOptions( + skipDuplicates, + noCommit, + noCopy, + forceNoCopy, + forceTableAuth, + s3NoSignRequest, + s3CopyObject, + threadCount); + } + } +} From c500040179dbb6a12aa1e1e7a545cfb5f248dd98 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 28 Apr 2025 16:37:08 -0400 Subject: [PATCH 4/5] ice: Changed InsertOptions to record, Added explicit shutdownNow when there is no need to commit transaction. --- ice/src/main/java/com/altinity/ice/Main.java | 2 +- .../com/altinity/ice/internal/cmd/Insert.java | 10 ++++- .../ice/internal/cmd/InsertOptions.java | 37 +++++-------------- 3 files changed, 19 insertions(+), 30 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java index 4787f3e9..31ccb131 100644 --- a/ice/src/main/java/com/altinity/ice/Main.java +++ b/ice/src/main/java/com/altinity/ice/Main.java @@ -209,7 +209,7 @@ void insert( s3NoSignRequest, s3CopyObject, retryList, - threadCount == -1 ? Runtime.getRuntime().availableProcessors() : threadCount); + threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount); } } diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 3f9b81e1..6d54c5b3 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -154,7 +154,8 @@ public static void run( boolean atLeastOneFileAppended = false; int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size()); - try (ExecutorService executor = Executors.newFixedThreadPool(numThreads)) { + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + try { var futures = new ArrayList>(); for (final String file : filesExpanded) { futures.add( @@ -202,6 +203,13 @@ public static void run( } } } + } finally { + // Cancel any remaining tasks since we won't commit the transaction + if (finalOptions.noCommit() || !atLeastOneFileAppended) { + executor.shutdownNow(); + } else { + executor.shutdown(); + } } if (!finalOptions.noCommit()) { diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java index a99db2cd..09c7038c 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java @@ -1,33 +1,14 @@ package com.altinity.ice.internal.cmd; -public final class InsertOptions { - private final boolean skipDuplicates; - private final boolean noCommit; - private final boolean noCopy; - private final boolean forceNoCopy; - private final boolean forceTableAuth; - private final boolean s3NoSignRequest; - private final boolean s3CopyObject; - private final int threadCount; - - private InsertOptions( - boolean skipDuplicates, - boolean noCommit, - boolean noCopy, - boolean forceNoCopy, - boolean forceTableAuth, - boolean s3NoSignRequest, - boolean s3CopyObject, - int threadCount) { - this.skipDuplicates = skipDuplicates; - this.noCommit = noCommit; - this.noCopy = noCopy; - this.forceNoCopy = forceNoCopy; - this.forceTableAuth = forceTableAuth; - this.s3NoSignRequest = s3NoSignRequest; - this.s3CopyObject = s3CopyObject; - this.threadCount = threadCount; - } +public record InsertOptions( + boolean skipDuplicates, + boolean noCommit, + boolean noCopy, + boolean forceNoCopy, + boolean forceTableAuth, + boolean s3NoSignRequest, + boolean s3CopyObject, + int threadCount) { public static Builder builder() { return new Builder(); From 3078e59ca36351442558362539748991c18166d1 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 29 Apr 2025 15:40:50 -0400 Subject: [PATCH 5/5] ice-rest-catalog: Added awaitTermination before calling shutdownNow(). --- ice/src/main/java/com/altinity/ice/Main.java | 2 +- .../com/altinity/ice/internal/cmd/Insert.java | 15 +++------ .../ice/internal/cmd/InsertOptions.java | 32 ------------------- 3 files changed, 5 insertions(+), 44 deletions(-) diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java index 31ccb131..38ffbd4e 100644 --- a/ice/src/main/java/com/altinity/ice/Main.java +++ b/ice/src/main/java/com/altinity/ice/Main.java @@ -177,7 +177,7 @@ void insert( description = "Number of threads to use for inserting data", defaultValue = "-1") int threadCount) - throws IOException { + throws IOException, InterruptedException { if (s3NoSignRequest && s3CopyObject) { throw new UnsupportedOperationException( "--s3-no-sign-request + --s3-copy-object is not supported by AWS (see --help for details)"); diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 6d54c5b3..748c96dd 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -12,10 +12,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -73,7 +70,7 @@ public static void run( boolean s3CopyObject, String retryListFile, int threadCount) - throws IOException { + throws IOException, InterruptedException { if (files.length == 0) { // no work to be done return; @@ -204,12 +201,8 @@ public static void run( } } } finally { - // Cancel any remaining tasks since we won't commit the transaction - if (finalOptions.noCommit() || !atLeastOneFileAppended) { - executor.shutdownNow(); - } else { - executor.shutdown(); - } + executor.awaitTermination(1, TimeUnit.MINUTES); + executor.shutdownNow(); } if (!finalOptions.noCommit()) { diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java index 09c7038c..da85292c 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java @@ -14,38 +14,6 @@ public static Builder builder() { return new Builder(); } - public boolean skipDuplicates() { - return skipDuplicates; - } - - public boolean noCommit() { - return noCommit; - } - - public boolean noCopy() { - return noCopy; - } - - public boolean forceNoCopy() { - return forceNoCopy; - } - - public boolean forceTableAuth() { - return forceTableAuth; - } - - public boolean s3NoSignRequest() { - return s3NoSignRequest; - } - - public boolean s3CopyObject() { - return s3CopyObject; - } - - public int threadCount() { - return threadCount; - } - public Builder toBuilder() { return builder() .skipDuplicates(skipDuplicates)