diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java similarity index 53% rename from spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java rename to spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java index 9c31eb970b56..0152c8e0e6f0 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java @@ -16,15 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.spark.extensions; +package org.apache.iceberg.spark.source; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.RESTCatalogProperties; import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.sql.TestSelect; +import org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(ParameterizedTestExtension.class) @@ -38,8 +49,6 @@ protected static Object[][] parameters() { ImmutableMap.builder() .putAll(SparkCatalogConfig.REST.properties()) .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) - // this flag is typically only set by the server, but we set it from the client for - // testing .put( RESTCatalogProperties.SCAN_PLANNING_MODE, RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) @@ -48,4 +57,36 @@ protected static Object[][] parameters() { } }; } + + @TestTemplate + public void fileIOIsPropagated() { + RESTCatalog catalog = new RESTCatalog(); + catalog.setConf(new Configuration()); + catalog.initialize( + "test", + ImmutableMap.builder() + .putAll(restCatalog.properties()) + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) + .build()); + Table table = catalog.loadTable(tableIdent); + + SparkScanBuilder builder = new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty()); + verifyFileIOHasPlanId(builder.build().toBatch(), table); + verifyFileIOHasPlanId(builder.buildCopyOnWriteScan().toBatch(), table); + } + + private void verifyFileIOHasPlanId(Batch batch, Table table) { + FileIO fileIOForScan = + (FileIO) + assertThat(batch) + .extracting("fileIO") + .isInstanceOf(Supplier.class) + .asInstanceOf(InstanceOfAssertFactories.type(Supplier.class)) + .actual() + .get(); + assertThat(fileIOForScan.properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID); + assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID); + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index fe062f9d7357..c1b2a5873730 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -31,6 +31,7 @@ import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.spark.OrcBatchReadConf; @@ -49,6 +50,7 @@ abstract class BaseBatchReader extends BaseReader taskGroup, Schema tableSchema, Schema expectedSchema, @@ -57,7 +59,13 @@ abstract class BaseBatchReader extends BaseReader implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class); private final Table table; + private final EncryptingFileIO fileIO; private final Schema tableSchema; private final Schema expectedSchema; private final boolean caseSensitive; @@ -85,12 +84,14 @@ abstract class BaseReader implements Closeable { BaseReader( Table table, + FileIO fileIO, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, boolean caseSensitive, boolean cacheDeleteFilesOnExecutors) { this.table = table; + this.fileIO = EncryptingFileIO.combine(fileIO, table().encryption()); this.taskGroup = taskGroup; this.tasks = taskGroup.tasks().iterator(); this.currentIterator = CloseableIterator.empty(); @@ -182,25 +183,14 @@ protected InputFile getInputFile(String location) { private Map inputFiles() { if (lazyInputFiles == null) { - Stream encryptedFiles = - taskGroup.tasks().stream().flatMap(this::referencedFiles).map(this::toEncryptedInputFile); - - // decrypt with the batch call to avoid multiple RPCs to a key server, if possible - Iterable decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator); - - Map files = Maps.newHashMapWithExpectedSize(taskGroup.tasks().size()); - decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted)); - this.lazyInputFiles = ImmutableMap.copyOf(files); + this.lazyInputFiles = + fileIO.bulkDecrypt( + () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator()); } return lazyInputFiles; } - private EncryptedInputFile toEncryptedInputFile(ContentFile file) { - InputFile inputFile = table.io().newInputFile(file.location()); - return EncryptedFiles.encryptedInput(inputFile, file.keyMetadata()); - } - protected Map constantsMap(ContentScanTask task, Schema readSchema) { if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) { StructType partitionType = Partitioning.partitionType(table); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index 53d44e760afe..a7016e3b09b7 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -28,19 +28,27 @@ import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends 
BaseReader { BaseRowReader( Table table, + FileIO fileIO, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, boolean caseSensitive, boolean cacheDeleteFilesOnExecutors) { super( - table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors); + table, + fileIO, + taskGroup, + tableSchema, + expectedSchema, + caseSensitive, + cacheDeleteFilesOnExecutors); } protected CloseableIterable newIterable( diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 9ec0f885775f..3dcfb604ea46 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -26,6 +26,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.OrcBatchReadConf; @@ -53,6 +54,7 @@ class BatchDataReader extends BaseBatchReader OrcBatchReadConf orcBatchReadConf) { this( partition.table(), + partition.io(), partition.taskGroup(), SnapshotUtil.schemaFor(partition.table(), partition.branch()), partition.expectedSchema(), @@ -64,6 +66,7 @@ class BatchDataReader extends BaseBatchReader BatchDataReader( Table table, + FileIO fileIO, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, @@ -73,6 +76,7 @@ class BatchDataReader extends BaseBatchReader boolean cacheDeleteFilesOnExecutors) { super( table, + fileIO, taskGroup, tableSchema, expectedSchema, diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index b8fa129f6a44..365747565165 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -35,6 +35,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.SnapshotUtil; @@ -51,6 +52,7 @@ class ChangelogRowReader extends BaseRowReader ChangelogRowReader(SparkInputPartition partition) { this( partition.table(), + partition.io(), partition.taskGroup(), SnapshotUtil.schemaFor(partition.table(), partition.branch()), partition.expectedSchema(), @@ -60,6 +62,7 @@ class ChangelogRowReader extends BaseRowReader ChangelogRowReader( Table table, + FileIO fileIO, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, @@ -67,6 +70,7 @@ class ChangelogRowReader extends BaseRowReader boolean cacheDeleteFilesOnExecutors) { super( table, + fileIO, taskGroup, tableSchema, ChangelogUtil.dropChangelogMetadata(expectedSchema), diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index e1292647b797..96dd99ea64e3 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -25,6 +25,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; @@ -32,11 +33,19 @@ public class EqualityDeleteRowReader extends RowDataReader { public EqualityDeleteRowReader( CombinedScanTask task, Table table, + FileIO fileIO, Schema tableSchema, Schema expectedSchema, boolean caseSensitive, boolean cacheDeleteFilesOnExecutors) { - super(table, task, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors); + super( + table, + fileIO, + task, + tableSchema, + expectedSchema, + caseSensitive, + cacheDeleteFilesOnExecutors); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java index 9454808f008b..394b3d47b6e4 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java @@ -28,6 +28,7 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ExpressionUtil; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.ContentFileUtil; @@ -46,6 +47,7 @@ class PositionDeletesRowReader extends BaseRowReader PositionDeletesRowReader(SparkInputPartition partition) { this( partition.table(), + partition.io(), partition.taskGroup(), SnapshotUtil.schemaFor(partition.table(), partition.branch()), partition.expectedSchema(), @@ -55,14 +57,20 @@ class PositionDeletesRowReader extends BaseRowReader PositionDeletesRowReader( Table table, + FileIO fileIO, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, boolean caseSensitive, boolean cacheDeleteFilesOnExecutors) { - super( - table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors); + table, + fileIO, + taskGroup, + tableSchema, + expectedSchema, + caseSensitive, + cacheDeleteFilesOnExecutors); int numSplits = taskGroup.tasks().size(); LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name()); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index b2b3c7856389..08aa44f71041 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -28,6 +28,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; @@ -48,6 +49,7 @@ class RowDataReader extends BaseRowReader implements PartitionRead RowDataReader(SparkInputPartition partition) { this( partition.table(), + partition.io(), partition.taskGroup(), 
SnapshotUtil.schemaFor(partition.table(), partition.branch()), partition.expectedSchema(), @@ -57,6 +59,7 @@ class RowDataReader extends BaseRowReader implements PartitionRead RowDataReader( Table table, + FileIO fileIO, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, @@ -64,7 +67,13 @@ class RowDataReader extends BaseRowReader implements PartitionRead boolean cacheDeleteFilesOnExecutors) { super( - table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors); + table, + fileIO, + taskGroup, + tableSchema, + expectedSchema, + caseSensitive, + cacheDeleteFilesOnExecutors); numSplits = taskGroup.tasks().size(); LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name()); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java new file mode 100644 index 000000000000..2a359110c810 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.hadoop.HadoopConfigurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.SerializableSupplier; +import org.apache.spark.util.KnownSizeEstimation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides a serializable {@link FileIO} with a known size estimate. Spark calls its + * {@link org.apache.spark.util.SizeEstimator} class when broadcasting variables and this can be an + * expensive operation, so providing a known size estimate allows that operation to be skipped. + * + *
<p>
This class also implements {@link AutoCloseable} to avoid leaking resources upon broadcasting. + * Broadcast variables are destroyed and cleaned up on the driver and executors once they are + * garbage collected on the driver. The implementation ensures only resources used by copies of the + * main {@link FileIO} are released. + */ +class SerializableFileIOWithSize + implements FileIO, HadoopConfigurable, KnownSizeEstimation, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(SerializableFileIOWithSize.class); + private static final long SIZE_ESTIMATE = 32_768L; + private final transient Object serializationMarker; + private final FileIO fileIO; + + private SerializableFileIOWithSize(FileIO fileIO) { + this.fileIO = fileIO; + this.serializationMarker = new Object(); + } + + @Override + public long estimatedSize() { + return SIZE_ESTIMATE; + } + + public static FileIO wrap(FileIO fileIO) { + return new SerializableFileIOWithSize(fileIO); + } + + @Override + public void close() { + if (null == serializationMarker) { + LOG.debug("Closing FileIO"); + fileIO.close(); + } + } + + @Override + public InputFile newInputFile(String path) { + return fileIO.newInputFile(path); + } + + @Override + public InputFile newInputFile(String path, long length) { + return fileIO.newInputFile(path, length); + } + + @Override + public OutputFile newOutputFile(String path) { + return fileIO.newOutputFile(path); + } + + @Override + public void deleteFile(String path) { + fileIO.deleteFile(path); + } + + @Override + public void initialize(Map properties) { + fileIO.initialize(properties); + } + + @Override + public Map properties() { + return fileIO.properties(); + } + + @Override + public void serializeConfWith( + Function> confSerializer) { + if (fileIO instanceof HadoopConfigurable configurable) { + configurable.serializeConfWith(confSerializer); + } + } + + @Override + public void setConf(Configuration conf) { + if (fileIO instanceof HadoopConfigurable configurable) { + configurable.setConf(conf); + } + } + + @Override + public Configuration getConf() { + if (fileIO instanceof HadoopConfigurable hadoopConfigurable) { + return hadoopConfigurable.getConf(); + } + + return null; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 261d5fa227a5..2109936c96b9 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Objects; +import java.util.function.Supplier; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; @@ -28,6 +29,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.spark.ImmutableOrcBatchReadConf; import org.apache.iceberg.spark.ImmutableParquetBatchReadConf; import org.apache.iceberg.spark.OrcBatchReadConf; @@ -45,6 +47,7 @@ class SparkBatch implements Batch { private final JavaSparkContext sparkContext; private final Table table; + private final Supplier fileIO; private final String branch; private final SparkReadConf readConf; private final Types.StructType groupingKeyType; @@ -59,6 +62,7 @@ class SparkBatch implements Batch { SparkBatch( JavaSparkContext sparkContext, Table table, + 
Supplier fileIO, SparkReadConf readConf, Types.StructType groupingKeyType, List> taskGroups, @@ -66,6 +70,7 @@ class SparkBatch implements Batch { int scanHashCode) { this.sparkContext = sparkContext; this.table = table; + this.fileIO = fileIO; this.branch = readConf.branch(); this.readConf = readConf; this.groupingKeyType = groupingKeyType; @@ -83,6 +88,8 @@ public InputPartition[] planInputPartitions() { // broadcast the table metadata as input partitions will be sent to executors Broadcast tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + Broadcast fileIOBroadcast = + sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get())); String expectedSchemaString = SchemaParser.toJson(expectedSchema); String[][] locations = computePreferredLocations(); @@ -94,6 +101,7 @@ public InputPartition[] planInputPartitions() { groupingKeyType, taskGroups.get(index), tableBroadcast, + fileIOBroadcast, branch, expectedSchemaString, caseSensitive, @@ -106,7 +114,7 @@ public InputPartition[] planInputPartitions() { private String[][] computePreferredLocations() { if (localityEnabled) { - return SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups); + return SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups); } else if (executorCacheLocalityEnabled) { List executorLocations = SparkUtil.executorLocations(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java index 55ea137ca1b0..eba0431e3adf 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java @@ -106,6 +106,7 @@ public Batch toBatch() { return new SparkBatch( sparkContext, table, + null != scan ? scan.fileIO() : table::io, readConf, EMPTY_GROUPING_KEY_TYPE, taskGroups(), diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java index 99b1d78a86b0..a93031780255 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java @@ -24,6 +24,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.types.Types; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.catalyst.InternalRow; @@ -34,6 +35,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab private final Types.StructType groupingKeyType; private final ScanTaskGroup taskGroup; private final Broadcast
tableBroadcast; + private final Broadcast fileIOBroadcast; private final String branch; private final String expectedSchemaString; private final boolean caseSensitive; @@ -46,6 +48,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab Types.StructType groupingKeyType, ScanTaskGroup taskGroup, Broadcast
tableBroadcast, + Broadcast fileIOBroadcast, String branch, String expectedSchemaString, boolean caseSensitive, @@ -54,6 +57,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab this.groupingKeyType = groupingKeyType; this.taskGroup = taskGroup; this.tableBroadcast = tableBroadcast; + this.fileIOBroadcast = fileIOBroadcast; this.branch = branch; this.expectedSchemaString = expectedSchemaString; this.caseSensitive = caseSensitive; @@ -84,6 +88,10 @@ public Table table() { return tableBroadcast.value(); } + public FileIO io() { + return fileIOBroadcast.value(); + } + public String branch() { return branch; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 018b96c7a1d8..06bc16bba847 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Locale; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataOperations; @@ -77,10 +78,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of(); private final Table table; + private final Supplier fileIO; private final String branch; private final boolean caseSensitive; private final String expectedSchema; private final Broadcast
tableBroadcast; + private final Broadcast fileIOBroadcast; private final long splitSize; private final int splitLookback; private final long splitOpenFileCost; @@ -97,15 +100,18 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA SparkMicroBatchStream( JavaSparkContext sparkContext, Table table, + Supplier fileIO, SparkReadConf readConf, Schema expectedSchema, String checkpointLocation) { this.table = table; + this.fileIO = fileIO; this.branch = readConf.branch(); this.caseSensitive = readConf.caseSensitive(); this.expectedSchema = SchemaParser.toJson(expectedSchema); this.localityPreferred = readConf.localityEnabled(); this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + this.fileIOBroadcast = sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get())); this.splitSize = readConf.splitSize(); this.splitLookback = readConf.splitLookback(); this.splitOpenFileCost = readConf.splitOpenFileCost(); @@ -172,6 +178,7 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { EMPTY_GROUPING_KEY_TYPE, combinedScanTasks.get(index), tableBroadcast, + fileIOBroadcast, branch, expectedSchema, caseSensitive, @@ -183,7 +190,9 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { } private String[][] computePreferredLocations(List taskGroups) { - return localityPreferred ? SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups) : null; + return localityPreferred + ? SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups) + : null; } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java index c9726518ee4e..4d9fb7556b4e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java @@ -78,7 +78,14 @@ abstract class SparkPartitioningAwareScan extends S Schema expectedSchema, List filters, Supplier scanReportSupplier) { - super(spark, table, readConf, expectedSchema, filters, scanReportSupplier); + super( + spark, + table, + null != scan ? 
scan.fileIO() : table::io, + readConf, + expectedSchema, + filters, + scanReportSupplier); this.scan = scan; this.preserveDataGrouping = readConf.preserveDataGrouping(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 106b296de098..a921f1446a85 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -33,6 +33,7 @@ import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.metrics.ScanReport; import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -101,6 +102,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { private final JavaSparkContext sparkContext; private final Table table; + private final Supplier fileIO; private final SparkSession spark; private final SparkReadConf readConf; private final boolean caseSensitive; @@ -115,6 +117,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { SparkScan( SparkSession spark, Table table, + Supplier fileIO, SparkReadConf readConf, Schema expectedSchema, List filters, @@ -125,6 +128,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { this.spark = spark; this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); this.table = table; + this.fileIO = fileIO; this.readConf = readConf; this.caseSensitive = readConf.caseSensitive(); this.expectedSchema = expectedSchema; @@ -162,13 +166,20 @@ protected Types.StructType groupingKeyType() { @Override public Batch toBatch() { return new SparkBatch( - sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode()); + sparkContext, + table, + fileIO, + readConf, + groupingKeyType(), + taskGroups(), + expectedSchema, + hashCode()); } @Override public MicroBatchStream toMicroBatchStream(String checkpointLocation) { return new SparkMicroBatchStream( - sparkContext, table, readConf, expectedSchema, checkpointLocation); + sparkContext, table, fileIO, readConf, expectedSchema, checkpointLocation); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java index d2eb4e5a56e9..99e0deabb0fb 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java @@ -41,7 +41,7 @@ class SparkStagedScan extends SparkScan { private List> taskGroups = null; // lazy cache of tasks SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) { - super(spark, table, readConf, expectedSchema, ImmutableList.of(), null); + super(spark, table, table::io, readConf, expectedSchema, ImmutableList.of(), null); this.taskSetId = readConf.scanTaskSetId(); this.splitSize = readConf.splitSize(); this.splitLookback = readConf.splitLookback(); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java index 3fcd7fa24a3e..38dcea259c99 100644 --- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java @@ -69,7 +69,11 @@ public abstract class TestBaseWithCatalog extends TestBase { // status even belonging to the same catalog. Reference: // https://www.sqlite.org/inmemorydb.html CatalogProperties.CLIENT_POOL_SIZE, - "1")); + "1", + "include-credentials", + "true", + "gcs.oauth2.token", + "dummyToken")); protected static RESTCatalog restCatalog; diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java index 1e53710a0f7f..8e26a2f42646 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java @@ -90,7 +90,7 @@ private static class ClosureTrackingReader extends BaseReader tracker = Maps.newHashMap(); ClosureTrackingReader(Table table, List tasks) { - super(table, new BaseCombinedScanTask(tasks), null, null, false, true); + super(table, table.io(), new BaseCombinedScanTask(tasks), null, null, false, true); } @Override diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java index b88f0233e203..be4391aab668 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java @@ -105,7 +105,8 @@ public void testInsert() throws IOException { for (ScanTaskGroup taskGroup : taskGroups) { ChangelogRowReader reader = - new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true); + new ChangelogRowReader( + table, table.io(), taskGroup, table.schema(), table.schema(), false, true); while (reader.next()) { rows.add(reader.get().copy()); } @@ -136,7 +137,8 @@ public void testDelete() throws IOException { for (ScanTaskGroup taskGroup : taskGroups) { ChangelogRowReader reader = - new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true); + new ChangelogRowReader( + table, table.io(), taskGroup, table.schema(), table.schema(), false, true); while (reader.next()) { rows.add(reader.get().copy()); } @@ -170,7 +172,8 @@ public void testDataFileRewrite() throws IOException { for (ScanTaskGroup taskGroup : taskGroups) { ChangelogRowReader reader = - new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true); + new ChangelogRowReader( + table, table.io(), taskGroup, table.schema(), table.schema(), false, true); while (reader.next()) { rows.add(reader.get().copy()); } @@ -197,7 +200,8 @@ public void testMixDeleteAndInsert() throws IOException { for (ScanTaskGroup taskGroup : taskGroups) { ChangelogRowReader reader = - new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true); + new ChangelogRowReader( + table, table.io(), taskGroup, table.schema(), table.schema(), false, true); while (reader.next()) { rows.add(reader.get().copy()); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java index 764e1c6c9370..92aace3dfd5c 100644 --- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java @@ -180,6 +180,7 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException try (PositionDeletesRowReader reader = new PositionDeletesRowReader( table, + table.io(), new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)), positionDeletesTable.schema(), projectedSchema, @@ -220,6 +221,7 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException try (PositionDeletesRowReader reader = new PositionDeletesRowReader( table, + table.io(), new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)), positionDeletesTable.schema(), projectedSchema, @@ -292,6 +294,7 @@ public void readPositionDeletesTableWithDifferentColumnOrdering() throws IOExcep try (PositionDeletesRowReader reader = new PositionDeletesRowReader( table, + table.io(), new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)), positionDeletesTable.schema(), projectedSchema, diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSerializableFileIOWithSize.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSerializableFileIOWithSize.java new file mode 100644 index 000000000000..e025114f4fd9 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSerializableFileIOWithSize.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import org.apache.iceberg.io.FileIO; +import org.junit.jupiter.api.Test; + +class TestSerializableFileIOWithSize { + + @Test + void newInputFileWithLength() { + FileIO mockFileIO = mock(FileIO.class); + FileIO serializableFileIO = SerializableFileIOWithSize.wrap(mockFileIO); + String path = "gs://bucket/path/to/file.parquet"; + long length = 1024L; + + serializableFileIO.newInputFile(path, length); + + verify(mockFileIO).newInputFile(path, length); + } + + @Test + void newInputFileWithoutLength() { + FileIO mockFileIO = mock(FileIO.class); + FileIO serializableFileIO = SerializableFileIOWithSize.wrap(mockFileIO); + String path = "gs://bucket/path/to/file.parquet"; + + serializableFileIO.newInputFile(path); + + verify(mockFileIO).newInputFile(path); + } +} diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 33b5a1d6e600..42ddefe47f8b 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -21,11 +21,13 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; +import java.time.LocalDate; import java.util.List; import java.util.Set; import javax.annotation.Nonnull; @@ -69,6 +71,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.ImmutableParquetBatchReadConf; +import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; import org.apache.iceberg.spark.TestBase; @@ -88,6 +92,7 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -326,7 +331,8 @@ public void testReadEqualityDeleteRows() throws IOException { for (CombinedScanTask task : tasks) { try (EqualityDeleteRowReader reader = - new EqualityDeleteRowReader(task, table, table.schema(), table.schema(), false, true)) { + new EqualityDeleteRowReader( + task, table, table.io(), table.schema(), table.schema(), false, true)) { while (reader.next()) { actualRowSet.add( new InternalRowWrapper( @@ -635,6 +641,61 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio assertThat(rowSet(tblName, tbl, "*")).hasSize(193); } + @TestTemplate + public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException { + assumeThat(format).isEqualTo(FileFormat.PARQUET); + initDateTable(); + + Schema deleteRowSchema = 
dateTable.schema().select("dt"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = + Lists.newArrayList( + dataDelete.copy("dt", LocalDate.parse("2021-09-01")), + dataDelete.copy("dt", LocalDate.parse("2021-09-02")), + dataDelete.copy("dt", LocalDate.parse("2021-09-03"))); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + dateTable, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + dataDeletes.subList(0, 3), + deleteRowSchema); + + dateTable.newRowDelta().addDeletes(eqDeletes).commit(); + + CloseableIterable tasks = + TableScanUtil.planTasks( + dateTable.newScan().planFiles(), + TableProperties.METADATA_SPLIT_SIZE_DEFAULT, + TableProperties.SPLIT_LOOKBACK_DEFAULT, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + + ParquetBatchReadConf conf = ImmutableParquetBatchReadConf.builder().batchSize(7).build(); + + for (CombinedScanTask task : tasks) { + try (BatchDataReader reader = + new BatchDataReader( + // expected column is id, while the equality filter column is dt + dateTable, + dateTable.io(), + task, + dateTable.schema(), + dateTable.schema().select("id"), + false, + conf, + null, + true)) { + while (reader.next()) { + ColumnarBatch columnarBatch = reader.get(); + int numOfCols = columnarBatch.numCols(); + assertThat(numOfCols).as("Number of columns").isEqualTo(1); + assertThat(columnarBatch.column(0).dataType()).as("Column type").isEqualTo(IntegerType); + } + } + } + } + private static final Schema PROJECTION_SCHEMA = new Schema( required(1, "id", Types.IntegerType.get()),