Skip to content

Commit 95cf25a

Browse files
sumedhsakdeo and claude
committed
feat: Add performance observability with tags/metrics separation
Add a pluggable PerfObserver pattern to instrument ArrowScan, manifest reading, delete file resolution, and scan planning with duration tracking and typed attributes split into tags (dimensions) and metrics (values) for compatibility with Prometheus/OTEL/Kafka backends. Includes CompositeObserver with exception isolation, thread-safe global observer, zero-overhead NullPerfObserver fast path, and @timed decorator with _perf_ctx injection. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e1ab55e commit 95cf25a

5 files changed

Lines changed: 693 additions & 145 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 135 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@
128128
DataFileContent,
129129
FileFormat,
130130
)
131+
from pyiceberg.observability import perf_timer
131132
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value
132133
from pyiceberg.schema import (
133134
PartnerAccessor,
@@ -1586,102 +1587,123 @@ def _task_to_record_batches(
15861587
downcast_ns_timestamp_to_us: bool | None = None,
15871588
batch_size: int | None = None,
15881589
) -> Iterator[pa.RecordBatch]:
1589-
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
1590-
with io.new_input(task.file.file_path).open() as fin:
1591-
fragment = arrow_format.make_fragment(fin)
1592-
physical_schema = fragment.physical_schema
1593-
1594-
# For V1 and V2, we only support Timestamp 'us' in Iceberg Schema,
1595-
# therefore it is reasonable to always cast 'ns' timestamp to 'us' on read.
1596-
# For V3 this has to set explicitly to avoid nanosecond timestamp to be down-casted by default
1597-
downcast_ns_timestamp_to_us = (
1598-
downcast_ns_timestamp_to_us if downcast_ns_timestamp_to_us is not None else format_version <= 2
1599-
)
1600-
file_schema = pyarrow_to_schema(
1601-
physical_schema, name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
1602-
)
1603-
1604-
# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
1605-
projected_missing_fields = _get_column_projection_values(
1606-
task.file, projected_schema, table_schema, partition_spec, file_schema.field_ids
1607-
)
1590+
with perf_timer(
1591+
"arrow.read_file",
1592+
file_path=task.file.file_path,
1593+
file_format=str(task.file.file_format),
1594+
) as t:
1595+
t.metric("file_size_bytes", task.file.file_size_in_bytes)
1596+
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
1597+
batch_count = 0
1598+
row_count = 0
1599+
with io.new_input(task.file.file_path).open() as fin:
1600+
fragment = arrow_format.make_fragment(fin)
1601+
physical_schema = fragment.physical_schema
1602+
1603+
# For V1 and V2, we only support Timestamp 'us' in Iceberg Schema,
1604+
# therefore it is reasonable to always cast 'ns' timestamp to 'us' on read.
1605+
# For V3 this has to set explicitly to avoid nanosecond timestamp to be down-casted by default
1606+
downcast_ns_timestamp_to_us = (
1607+
downcast_ns_timestamp_to_us if downcast_ns_timestamp_to_us is not None else format_version <= 2
1608+
)
1609+
file_schema = pyarrow_to_schema(
1610+
physical_schema,
1611+
name_mapping,
1612+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
1613+
format_version=format_version,
1614+
)
16081615

1609-
pyarrow_filter = None
1610-
if bound_row_filter is not AlwaysTrue():
1611-
translated_row_filter = translate_column_names(
1612-
bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_field_values=projected_missing_fields
1616+
# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
1617+
projected_missing_fields = _get_column_projection_values(
1618+
task.file, projected_schema, table_schema, partition_spec, file_schema.field_ids
16131619
)
1614-
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1615-
pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema)
1616-
1617-
file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
1618-
1619-
scanner_kwargs: dict[str, Any] = {
1620-
"fragment": fragment,
1621-
"schema": physical_schema,
1622-
# This will push down the query to Arrow.
1623-
# But in case there are positional deletes, we have to apply them first
1624-
"filter": pyarrow_filter if not positional_deletes else None,
1625-
"columns": [col.name for col in file_project_schema.columns],
1626-
}
1627-
if batch_size is not None:
1628-
scanner_kwargs["batch_size"] = batch_size
16291620

1630-
fragment_scanner = ds.Scanner.from_fragment(**scanner_kwargs)
1621+
pyarrow_filter = None
1622+
if bound_row_filter is not AlwaysTrue():
1623+
translated_row_filter = translate_column_names(
1624+
bound_row_filter,
1625+
file_schema,
1626+
case_sensitive=case_sensitive,
1627+
projected_field_values=projected_missing_fields,
1628+
)
1629+
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1630+
pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema)
1631+
1632+
file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
1633+
1634+
scanner_kwargs: dict[str, Any] = {
1635+
"fragment": fragment,
1636+
"schema": physical_schema,
1637+
# This will push down the query to Arrow.
1638+
# But in case there are positional deletes, we have to apply them first
1639+
"filter": pyarrow_filter if not positional_deletes else None,
1640+
"columns": [col.name for col in file_project_schema.columns],
1641+
}
1642+
if batch_size is not None:
1643+
scanner_kwargs["batch_size"] = batch_size
16311644

1632-
next_index = 0
1633-
batches = fragment_scanner.to_batches()
1634-
for batch in batches:
1635-
next_index = next_index + len(batch)
1636-
current_index = next_index - len(batch)
1637-
current_batch = batch
1645+
fragment_scanner = ds.Scanner.from_fragment(**scanner_kwargs)
16381646

1639-
if positional_deletes:
1640-
# Create the mask of indices that we're interested in
1641-
indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
1642-
current_batch = current_batch.take(indices)
1647+
next_index = 0
1648+
batches = fragment_scanner.to_batches()
1649+
for batch in batches:
1650+
next_index = next_index + len(batch)
1651+
current_index = next_index - len(batch)
1652+
current_batch = batch
16431653

1644-
# skip empty batches
1645-
if current_batch.num_rows == 0:
1646-
continue
1654+
if positional_deletes:
1655+
# Create the mask of indices that we're interested in
1656+
indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
1657+
current_batch = current_batch.take(indices)
16471658

1648-
# Apply the user filter
1649-
if pyarrow_filter is not None:
1650-
# Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
1651-
table = pa.Table.from_batches([current_batch])
1652-
table = table.filter(pyarrow_filter)
1659+
batch_count += 1
16531660
# skip empty batches
1654-
if table.num_rows == 0:
1661+
if current_batch.num_rows == 0:
16551662
continue
16561663

1657-
current_batch = table.combine_chunks().to_batches()[0]
1664+
# Apply the user filter
1665+
if pyarrow_filter is not None:
1666+
# Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
1667+
table = pa.Table.from_batches([current_batch])
1668+
table = table.filter(pyarrow_filter)
1669+
# skip empty batches
1670+
if table.num_rows == 0:
1671+
continue
16581672

1659-
yield _to_requested_schema(
1660-
projected_schema,
1661-
file_project_schema,
1662-
current_batch,
1663-
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
1664-
projected_missing_fields=projected_missing_fields,
1665-
allow_timestamp_tz_mismatch=True,
1666-
)
1673+
current_batch = table.combine_chunks().to_batches()[0]
1674+
1675+
row_count += current_batch.num_rows
1676+
yield _to_requested_schema(
1677+
projected_schema,
1678+
file_project_schema,
1679+
current_batch,
1680+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
1681+
projected_missing_fields=projected_missing_fields,
1682+
allow_timestamp_tz_mismatch=True,
1683+
)
1684+
t.metric("batch_count", batch_count)
1685+
t.metric("row_count", row_count)
16671686

16681687

16691688
def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]:
1670-
deletes_per_file: dict[str, list[ChunkedArray]] = {}
1671-
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
1672-
if len(unique_deletes) > 0:
1673-
executor = ExecutorFactory.get_or_create()
1674-
deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map(
1675-
lambda args: _read_deletes(*args),
1676-
[(io, delete_file) for delete_file in unique_deletes],
1677-
)
1678-
for delete in deletes_per_files:
1679-
for file, arr in delete.items():
1680-
if file in deletes_per_file:
1681-
deletes_per_file[file].append(arr)
1682-
else:
1683-
deletes_per_file[file] = [arr]
1689+
with perf_timer("arrow.read_delete_files") as t:
1690+
deletes_per_file: dict[str, list[ChunkedArray]] = {}
1691+
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
1692+
t.metric("unique_delete_files", len(unique_deletes))
1693+
if len(unique_deletes) > 0:
1694+
executor = ExecutorFactory.get_or_create()
1695+
deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map(
1696+
lambda args: _read_deletes(*args),
1697+
[(io, delete_file) for delete_file in unique_deletes],
1698+
)
1699+
for delete in deletes_per_files:
1700+
for file, arr in delete.items():
1701+
if file in deletes_per_file:
1702+
deletes_per_file[file].append(arr)
1703+
else:
1704+
deletes_per_file[file] = [arr]
16841705

1706+
t.metric("data_files_with_deletes", len(deletes_per_file))
16851707
return deletes_per_file
16861708

16871709

@@ -1887,12 +1909,41 @@ def to_record_batches(
18871909
if order.concurrent_streams < 1:
18881910
raise ValueError(f"concurrent_streams must be >= 1, got {order.concurrent_streams}")
18891911
return self._apply_limit(
1890-
self._iter_batches_arrival(
1891-
task_list, deletes_per_file, order.batch_size, order.concurrent_streams, order.max_buffered_batches
1912+
self._iter_batches_counted(
1913+
self._iter_batches_arrival(
1914+
task_list,
1915+
deletes_per_file,
1916+
order.batch_size,
1917+
order.concurrent_streams,
1918+
order.max_buffered_batches,
1919+
),
1920+
task_count=len(task_list),
18921921
)
18931922
)
18941923

1895-
return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file))
1924+
return self._apply_limit(
1925+
self._iter_batches_counted(
1926+
self._iter_batches_materialized(task_list, deletes_per_file),
1927+
task_count=len(task_list),
1928+
)
1929+
)
1930+
1931+
@staticmethod
1932+
def _iter_batches_counted(
1933+
inner: Iterator[pa.RecordBatch],
1934+
task_count: int,
1935+
) -> Iterator[pa.RecordBatch]:
1936+
"""Wrap an inner batch iterator with aggregate perf_timer tracking."""
1937+
with perf_timer("arrow.to_record_batches") as t:
1938+
t.metric("task_count", task_count)
1939+
batch_count = 0
1940+
row_count = 0
1941+
for batch in inner:
1942+
batch_count += 1
1943+
row_count += batch.num_rows
1944+
yield batch
1945+
t.metric("batch_count", batch_count)
1946+
t.metric("row_count", row_count)
18961947

18971948
def _prepare_tasks_and_deletes(
18981949
self, tasks: Iterable[FileScanTask]

pyiceberg/manifest.py

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from pyiceberg.conversions import to_bytes
3737
from pyiceberg.exceptions import ValidationError
3838
from pyiceberg.io import FileIO, InputFile, OutputFile
39+
from pyiceberg.observability import perf_timer
3940
from pyiceberg.partitioning import PartitionSpec
4041
from pyiceberg.schema import Schema
4142
from pyiceberg.typedef import Record, TableVersion
@@ -869,18 +870,21 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> list
869870
Returns:
870871
An Iterator of manifest entries.
871872
"""
872-
input_file = io.new_input(self.manifest_path)
873-
with AvroFile[ManifestEntry](
874-
input_file,
875-
MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
876-
read_types={-1: ManifestEntry, 2: DataFile},
877-
read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
878-
) as reader:
879-
return [
880-
_inherit_from_manifest(entry, self)
881-
for entry in reader
882-
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
883-
]
873+
with perf_timer("manifest.fetch_entries", manifest_path=self.manifest_path) as t:
874+
input_file = io.new_input(self.manifest_path)
875+
with AvroFile[ManifestEntry](
876+
input_file,
877+
MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
878+
read_types={-1: ManifestEntry, 2: DataFile},
879+
read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
880+
) as reader:
881+
result = [
882+
_inherit_from_manifest(entry, self)
883+
for entry in reader
884+
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
885+
]
886+
t.metric("entry_count", len(result))
887+
return result
884888

885889
def __eq__(self, other: Any) -> bool:
886890
"""Return the equality of two instances of the ManifestFile class."""
@@ -924,19 +928,24 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
924928
Returns:
925929
A tuple of ManifestFile objects.
926930
"""
927-
file = io.new_input(manifest_list)
928-
manifest_files = list(read_manifest_list(file))
929-
930-
result = []
931-
with _manifest_cache_lock:
932-
for manifest_file in manifest_files:
933-
manifest_path = manifest_file.manifest_path
934-
if manifest_path in _manifest_cache:
935-
result.append(_manifest_cache[manifest_path])
936-
else:
937-
_manifest_cache[manifest_path] = manifest_file
938-
result.append(manifest_file)
939-
931+
with perf_timer("manifest.read_list") as t:
932+
file = io.new_input(manifest_list)
933+
manifest_files = list(read_manifest_list(file))
934+
935+
result = []
936+
cache_hits = 0
937+
with _manifest_cache_lock:
938+
for manifest_file in manifest_files:
939+
manifest_path = manifest_file.manifest_path
940+
if manifest_path in _manifest_cache:
941+
result.append(_manifest_cache[manifest_path])
942+
cache_hits += 1
943+
else:
944+
_manifest_cache[manifest_path] = manifest_file
945+
result.append(manifest_file)
946+
947+
t.metric("manifest_count", len(result))
948+
t.metric("cache_hits", cache_hits)
940949
return tuple(result)
941950

942951

0 commit comments

Comments (0)