Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pyiceberg/table/delete_file_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def filter_by_seq(self, seq: int) -> list[DataFile]:
start_idx = bisect_left(self._seqs, seq)
return [delete_file for delete_file, _ in self._files[start_idx:]]

def referenced_delete_files(self) -> list[DataFile]:
    """Return all delete files held by this group, ignoring their sequence numbers."""
    self._ensure_indexed()
    return [entry[0] for entry in self._files]


def _has_path_bounds(delete_file: DataFile) -> bool:
lower = delete_file.lower_bounds
Expand Down Expand Up @@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
deletes.update(path_deletes.filter_by_seq(seq_num))

return deletes

def referenced_data_files(self) -> list[DataFile]:
    """Collect every delete file registered in both the partition and path indexes."""
    collected: list[DataFile] = []
    # Partition-scoped groups first, then path-scoped ones, preserving the
    # original traversal order.
    for group in (*self._by_partition.values(), *self._by_path.values()):
        collected.extend(group.referenced_delete_files())
    return collected
123 changes: 122 additions & 1 deletion pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@
from pyiceberg.exceptions import ValidationException
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.manifest import (
INITIAL_SEQUENCE_NUMBER,
DataFile,
ManifestContent,
ManifestEntry,
ManifestEntryStatus,
ManifestFile,
)
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, Operation.OVERWRITE}


def _validation_history(
Expand Down Expand Up @@ -216,6 +225,61 @@ def _added_data_files(
yield entry


def _added_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> DeleteFileIndex:
    """Return matching delete files that have been added to the table since a starting snapshot.

    Args:
        table: Table to get the history from
        starting_snapshot: Starting snapshot to get the history from
        data_filter: Optional filter to match data files
        partition_set: Optional set of partitions to match data files
        parent_snapshot: Parent snapshot to get the history from

    Returns:
        DeleteFileIndex
    """
    index = DeleteFileIndex()

    # Delete files only exist from format v2 onwards; with no parent snapshot
    # there is no history to scan.
    if parent_snapshot is None or table.format_version < 2:
        return index

    manifests, snapshot_ids = _validation_history(
        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
    )

    for manifest in manifests:
        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
            matches = _filter_manifest_entries(
                entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
            )
            if matches:
                index.add_delete_file(entry, entry.data_file.partition)

    return index


def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
Comment thread
gabeiglio marked this conversation as resolved.
Outdated
"""Find the starting sequence number from a snapshot.

Args:
table: Table to find snapshot from
starting_snapshot: Snapshot from where to start looking

Returns
Sequence number as int
"""
if starting_snapshot is not None:
if snapshot := table.snapshot_by_id(starting_snapshot.snapshot_id):
Comment thread
gabeiglio marked this conversation as resolved.
Outdated
if seq := snapshot.sequence_number:
return seq
return INITIAL_SEQUENCE_NUMBER


def _validate_added_data_files(
table: Table,
starting_snapshot: Snapshot,
Expand All @@ -235,3 +299,60 @@ def _validate_added_data_files(
if any(conflicting_entries):
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")


def _validate_no_new_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> None:
    """Validate no new delete files matching a filter have been added to the table since starting a snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find added data files
        partition_set: Dictionary of partition spec to set of partition records
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If any conflicting delete file was added.
    """
    added = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)

    if not added.is_empty():
        conflicting = added.referenced_data_files()
        raise ValidationException(
            f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting}"
        )


def _validate_no_new_delete_files_for_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    data_files: set[DataFile],
    parent_snapshot: Snapshot | None,
) -> None:
    """Validate no new delete files must be applied for data files that have been added to the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find added data files
        data_files: data files to validate have no new deletes
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If a newly added delete file applies to one of the data files.
    """
    # Nothing to check without prior state; delete files only exist for format v2+.
    if parent_snapshot is None or table.format_version < 2:
        return

    added_deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
    start_seq = _starting_sequence_number(table, starting_snapshot)

    # Fail on any delete file that applies to files written in or before the starting snapshot.
    for data_file in data_files:
        if added_deletes.for_data_file(start_seq, data_file):
            raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")
161 changes: 160 additions & 1 deletion tests/table/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@

from pyiceberg.exceptions import ValidationException
from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import (
_added_data_files,
_added_delete_files,
_deleted_data_files,
_validate_added_data_files,
_validate_deleted_data_files,
_validate_no_new_delete_files,
_validate_no_new_delete_files_for_data_files,
_validation_history,
)

Expand Down Expand Up @@ -350,3 +353,159 @@ class DummyEntry:
data_filter=None,
parent_snapshot=oldest_snapshot,
)


@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
def test_validate_added_delete_files_non_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Non-conflicting operations must yield an empty delete-file index."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    # Rewrite the summaries of the most recent snapshots to the operation under test.
    history_depth = 100
    snapshots = table.snapshots()
    for offset in range(1, history_depth + 1):
        snapshots[-offset] = snapshots[-offset].model_copy(update={"summary": Summary(operation=operation)})

    table.metadata = table.metadata.model_copy(update={"snapshots": snapshots})

    oldest_snapshot = table.snapshots()[-history_depth]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def fake_manifests(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Resolve manifests via the fixture mapping, keyed by snapshot id."""
        return mock_manifests.get(self.snapshot_id, [])

    def fake_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        """Produce a single ADDED entry per manifest."""
        entry = ManifestEntry.from_args(
            status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
        )
        return [entry]

    with (
        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=fake_manifests),
        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=fake_fetch_manifest_entry),
    ):
        dfi = _added_delete_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )

        assert dfi.is_empty()
        assert len(dfi.referenced_data_files()) == 0


@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
def test_validate_added_delete_files_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Conflicting operations must surface the added delete file in the index."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    snapshot_history = 100
    snapshots = table.snapshots()
    for i in range(1, snapshot_history + 1):
        altered_snapshot = snapshots[-i].model_copy(update={"summary": Summary(operation=operation)})
        snapshots[-i] = altered_snapshot

    table.metadata = table.metadata.model_copy(
        update={"snapshots": snapshots},
    )

    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    mock_delete_file = DataFile.from_args(
        content=DataFileContent.POSITION_DELETES,
        file_path="s3://dummy/path",
    )
    mock_delete_file.spec_id = 0

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        snapshot_id = self.snapshot_id
        if snapshot_id in mock_manifests:
            return mock_manifests[snapshot_id]
        return []

    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        # Return a single ADDED delete-file entry. (The original built a
        # throwaway entry first and immediately overwrote it with result[-1];
        # that dead construction is removed.)
        return [
            ManifestEntry.from_args(
                status=ManifestEntryStatus.ADDED,
                snapshot_id=self.added_snapshot_id,
                sequence_number=10000,
                data_file=mock_delete_file,
            )
        ]

    with (
        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
    ):
        dfi = _added_delete_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )

        assert not dfi.is_empty()
        assert dfi.referenced_data_files()[0] == mock_delete_file


def test_validate_no_new_delete_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """A non-empty delete-file index must trigger a ValidationException."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    parent = table.snapshots()[0]
    current = cast(Snapshot, table.current_snapshot())

    # Force the index to report conflicts regardless of actual table contents.
    with (
        patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty", return_value=False),
        pytest.raises(ValidationException),
    ):
        _validate_no_new_delete_files(
            table=table,
            starting_snapshot=current,
            data_filter=None,
            partition_set=None,
            parent_snapshot=parent,
        )


def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """Any delete file applying to a data file must trigger a ValidationException."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    parent = table.snapshots()[0]
    current = cast(Snapshot, table.current_snapshot())
    dummy_data_file = DataFile.from_args()

    # Force the index to report a matching delete for every data file.
    with (
        patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[dummy_data_file]),
        pytest.raises(ValidationException),
    ):
        _validate_no_new_delete_files_for_data_files(
            table=table,
            starting_snapshot=current,
            data_filter=None,
            data_files={dummy_data_file},
            parent_snapshot=parent,
        )