Skip to content

Commit ea18efe

Browse files
Mohammad Linjawi authored and committed
[VL][Delta] Fix Delta scan CI regressions
1 parent d2d279e commit ea18efe

6 files changed

Lines changed: 23 additions & 31 deletions

File tree

cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,7 @@ roaring::Roaring64Map deserializeDeltaBitmapArray(std::string_view serializedPay
184184
return result;
185185
}
186186

187-
VELOX_USER_FAIL("Unexpected Delta bitmap array magic number {} for {}", magic, dvPath);
187+
VELOX_USER_FAIL("Unexpected magic number {} for {}", magic, dvPath);
188188
}
189189

190190
} // namespace

cpp/velox/compute/delta/DeltaSplit.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,7 @@ struct DeltaDeletionVectorDescriptor {
9797
/// Format: If offset is None then <storageType><pathOrInlineDv>
9898
/// Otherwise <storageType><pathOrInlineDv>@<offset>
9999
std::string uniqueId() const {
100-
char storageTypeChar;
100+
char storageTypeChar = 'u';
101101
switch (storageType) {
102102
case DeltaDeletionVectorStorageType::kUuidPath:
103103
storageTypeChar = 'u';

cpp/velox/compute/delta/DeltaUuidUtils.cpp

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -122,7 +122,7 @@ std::string DeltaUuidUtils::encodeBytesToBase85(std::string_view data) {
122122
}
123123

124124
DeltaUuidUtils::Uuid DeltaUuidUtils::decodeZ85ToUuid(const std::string& z85) {
125-
VELOX_CHECK_EQ(z85.length(), 20, "Z85 encoded UUID must be exactly 20 characters");
125+
VELOX_USER_CHECK_EQ(z85.length(), 20, "Z85 encoded UUID must be exactly 20 characters");
126126

127127
Uuid uuid;
128128

@@ -136,11 +136,11 @@ DeltaUuidUtils::Uuid DeltaUuidUtils::decodeZ85ToUuid(const std::string& z85) {
136136

137137
std::string DeltaUuidUtils::decodeBase85ToBytes(std::string_view encoded, size_t decodedSize) {
138138
if (encoded.empty()) {
139-
VELOX_CHECK_EQ(decodedSize, 0, "Expected decoded Base85 size to be 0 for empty input");
139+
VELOX_USER_CHECK_EQ(decodedSize, 0, "Expected decoded Base85 size to be 0 for empty input");
140140
return "";
141141
}
142142

143-
VELOX_CHECK_EQ(encoded.size() % 5, 0, "Base85 encoded payload must be aligned to 5-character blocks");
143+
VELOX_USER_CHECK_EQ(encoded.size() % 5, 0, "Base85 encoded payload must be aligned to 5-character blocks");
144144

145145
const size_t maxDecodedSize = (encoded.size() / 5) * 4;
146146
VELOX_CHECK_LE(
@@ -201,7 +201,7 @@ std::pair<std::string, DeltaUuidUtils::Uuid> DeltaUuidUtils::extractUuidFromZ85(
201201
// Z85-encoded UUID is always 20 characters
202202
// Any characters before that are the random prefix
203203
if (z85.length() < 20) {
204-
VELOX_FAIL("Z85 string too short to contain UUID: {}", z85);
204+
VELOX_USER_FAIL("Z85 string too short to contain UUID: {}", z85);
205205
}
206206

207207
std::string randomPrefix;
@@ -223,13 +223,22 @@ std::pair<std::string, DeltaUuidUtils::Uuid> DeltaUuidUtils::extractUuidFromZ85(
223223
std::string
224224
DeltaUuidUtils::reconstructUuidPath(const std::string& tableDir, const std::string& randomPrefix, const Uuid& uuid) {
225225
std::ostringstream oss;
226-
oss << tableDir;
226+
const auto normalizedTableDir =
227+
!tableDir.empty() && tableDir.back() == '/' ? tableDir.substr(0, tableDir.size() - 1) : tableDir;
228+
oss << normalizedTableDir;
227229

228230
if (!randomPrefix.empty()) {
229-
oss << "/" << randomPrefix;
231+
if (!normalizedTableDir.empty()) {
232+
oss << "/";
233+
}
234+
oss << randomPrefix;
235+
}
236+
237+
if (!normalizedTableDir.empty() || !randomPrefix.empty()) {
238+
oss << "/";
230239
}
231240

232-
oss << "/deletion_vector_" << uuidToString(uuid) << ".bin";
241+
oss << "deletion_vector_" << uuidToString(uuid) << ".bin";
233242
return oss.str();
234243
}
235244

cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -573,7 +573,7 @@ TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationSuccess) {
573573
auto reader = std::make_unique<DeltaDeletionVectorReader>(nullptr, pool_.get(), ioStats_);
574574

575575
// Should succeed with correct cardinality
576-
EXPECT_NO_THROW(reader->loadInlineDeletionVector(inlineData, 5));
576+
EXPECT_NO_THROW(reader->loadInlineDeletionVector(inlineData, std::nullopt, 5));
577577

578578
// Verify cardinality
579579
EXPECT_EQ(reader->estimatedDeletedRowCount(), 5);
@@ -586,7 +586,7 @@ TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationMismatchThrows) {
586586
auto reader = std::make_unique<DeltaDeletionVectorReader>(nullptr, pool_.get(), ioStats_);
587587

588588
// Should throw with incorrect cardinality
589-
EXPECT_THROW(reader->loadInlineDeletionVector(inlineData, 3), VeloxUserError);
589+
EXPECT_THROW(reader->loadInlineDeletionVector(inlineData, std::nullopt, 3), VeloxUserError);
590590
}
591591

592592
TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationFileSuccess) {
@@ -635,7 +635,7 @@ TEST_F(DeltaDeletionVectorReaderTest, LargeCardinalityValidation) {
635635
auto reader = std::make_unique<DeltaDeletionVectorReader>(nullptr, pool_.get(), ioStats_);
636636

637637
// Should succeed with correct cardinality
638-
EXPECT_NO_THROW(reader->loadInlineDeletionVector(inlineData, 1000));
638+
EXPECT_NO_THROW(reader->loadInlineDeletionVector(inlineData, std::nullopt, 1000));
639639
EXPECT_EQ(reader->estimatedDeletedRowCount(), 1000);
640640
}
641641

cpp/velox/compute/delta/tests/DeltaUuidUtilsTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,7 @@ TEST_F(DeltaUuidUtilsTest, DecodeZ85InvalidLength) {
6161

6262
TEST_F(DeltaUuidUtilsTest, DecodeZ85InvalidCharacter) {
6363
// Test with invalid character (not in Z85 alphabet)
64-
std::string encoded = "~~~~~"; // '~' is not in Z85 alphabet
64+
std::string encoded(20, '~'); // '~' is not in Z85 alphabet
6565

6666
EXPECT_THROW(DeltaUuidUtils::decodeZ85ToUuid(encoded), facebook::velox::VeloxUserError);
6767
}

gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala

Lines changed: 1 addition & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -19,10 +19,8 @@ package org.apache.gluten.extension
1919
import org.apache.gluten.execution.DeltaScanTransformer
2020
import org.apache.gluten.extension.columnar.FallbackTags
2121
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
22-
import org.apache.gluten.sql.shims.SparkShimLoader
2322

2423
import org.apache.spark.sql.delta.DeltaParquetFileFormat
25-
import org.apache.spark.sql.delta.DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME
2624
import org.apache.spark.sql.delta.files.TahoeFileIndex
2725
import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
2826
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -32,11 +30,8 @@ case class OffloadDeltaScan() extends OffloadSingleNode {
3230
case scan: FileSourceScanExec if isDeltaScan(scan) && isDeltaLogScan(scan) =>
3331
FallbackTags.add(scan, "fallback Delta _delta_log scan")
3432
scan
35-
case scan: FileSourceScanExec if isDvPreparedDeltaScan(scan) =>
36-
DeltaScanTransformer(scan)
3733
case scan: FileSourceScanExec if isDeltaScan(scan) =>
38-
FallbackTags.add(scan, "fallback plain Delta scan without DV preprocessing")
39-
scan
34+
DeltaScanTransformer(scan)
4035
case other => other
4136
}
4237

@@ -55,18 +50,6 @@ case class OffloadDeltaScan() extends OffloadSingleNode {
5550
scan.relation.location.isInstanceOf[PreparedDeltaFileIndex]
5651
}
5752

58-
private def isDvPreparedDeltaScan(scan: FileSourceScanExec): Boolean = {
59-
isDeltaScan(scan) && hasDeletionVectorMarkers(scan)
60-
}
61-
62-
private def hasDeletionVectorMarkers(scan: FileSourceScanExec): Boolean = {
63-
val sparkShims = SparkShimLoader.getSparkShims
64-
scan.output.exists(_.name == IS_ROW_DELETED_COLUMN_NAME) ||
65-
scan.requiredSchema.fieldNames.contains(IS_ROW_DELETED_COLUMN_NAME) ||
66-
scan.output.exists(attr => sparkShims.isRowIndexMetadataColumn(attr.name)) ||
67-
scan.requiredSchema.fieldNames.exists(sparkShims.isRowIndexMetadataColumn)
68-
}
69-
7053
private def isDeltaLogScan(scan: FileSourceScanExec): Boolean = {
7154
scan.relation.location.rootPaths.exists {
7255
path =>

0 commit comments

Comments
 (0)