6565
6666from __future__ import annotations
6767
68+ import hashlib
6869import logging
6970import time
7071from collections .abc import Callable
8182from vgi_rpc .metadata import (
8283 LOCATION_FETCH_MS_KEY ,
8384 LOCATION_KEY ,
85+ LOCATION_SHA256_KEY ,
8486 LOCATION_SOURCE_KEY ,
8587 LOG_LEVEL_KEY ,
8688 merge_metadata ,
@@ -378,23 +380,31 @@ def is_external_location_batch(batch: pa.RecordBatch, custom_metadata: pa.KeyVal
378380def make_external_location_batch (
379381 schema : pa .Schema ,
380382 url : str ,
383+ sha256 : str | None = None ,
381384) -> tuple [pa .RecordBatch , pa .KeyValueMetadata ]:
382385 """Create a zero-row pointer batch for an externalized location.
383386
384387 Args:
385388 schema: The schema the pointer batch should conform to.
386389 url: The URL where the actual data resides.
390+ sha256: Optional hex-encoded SHA-256 of the stored bytes
391+ (post-compression). Included as ``vgi_rpc.location.sha256``
392+ metadata so consumers can verify data integrity on fetch.
387393
388394 Returns:
389395 A ``(batch, custom_metadata)`` tuple where *batch* is zero-row
390- and *custom_metadata* contains ``vgi_rpc.location``.
396+ and *custom_metadata* contains ``vgi_rpc.location`` (and
397+ optionally ``vgi_rpc.location.sha256``).
391398
392399 """
393400 batch = pa .RecordBatch .from_arrays (
394401 [pa .array ([], type = f .type ) for f in schema ],
395402 schema = schema ,
396403 )
397- custom_metadata = pa .KeyValueMetadata ({LOCATION_KEY : url .encode ()})
404+ meta : dict [bytes , bytes ] = {LOCATION_KEY : url .encode ()}
405+ if sha256 is not None :
406+ meta [LOCATION_SHA256_KEY ] = sha256 .encode ()
407+ custom_metadata = pa .KeyValueMetadata (meta )
398408 return batch , custom_metadata
399409
400410
@@ -478,13 +488,19 @@ def resolve_external_location(
478488
479489 t0 = time .monotonic ()
480490 try :
491+ # Extract expected SHA-256 from pointer metadata (optional — backward compatible)
492+ expected_sha256 : str | None = None
493+ sha256_bytes = custom_metadata .get (LOCATION_SHA256_KEY )
494+ if sha256_bytes is not None :
495+ expected_sha256 = sha256_bytes .decode () if isinstance (sha256_bytes , bytes ) else str (sha256_bytes )
496+
481497 retryer = Retrying (
482498 stop = stop_after_attempt (max_retries + 1 ),
483499 wait = wait_fixed (config .retry_delay_seconds ),
484500 retry = retry_if_exception_type (retry_types ),
485501 reraise = True ,
486502 )
487- result = retryer (_fetch_and_resolve , batch .schema , url , config , on_log , ipc_validation )
503+ result = retryer (_fetch_and_resolve , batch .schema , url , config , on_log , ipc_validation , expected_sha256 )
488504 elapsed_ms = (time .monotonic () - t0 ) * 1000
489505 if span is not None :
490506 span .set_attribute ("fetch_duration_ms" , elapsed_ms )
@@ -513,6 +529,7 @@ def _fetch_and_resolve(
513529 config : ExternalLocationConfig ,
514530 on_log : _OnLog ,
515531 ipc_validation : IpcValidation = IpcValidation .FULL ,
532+ expected_sha256 : str | None = None ,
516533) -> tuple [pa .RecordBatch , pa .KeyValueMetadata | None ]:
517534 """Fetch URL and extract the data batch from the IPC stream."""
518535 from vgi_rpc .rpc import _dispatch_log_or_error
@@ -521,6 +538,12 @@ def _fetch_and_resolve(
521538 data = fetch_url (url , config .fetch_config )
522539 elapsed_ms = (time .monotonic () - t0 ) * 1000
523540
541+ # Verify SHA-256 if present (data is already decompressed = raw IPC bytes)
542+ if expected_sha256 is not None :
543+ actual_sha256 = hashlib .sha256 (data ).hexdigest ()
544+ if actual_sha256 != expected_sha256 :
545+ raise RuntimeError (f"SHA-256 checksum mismatch for { url } : expected { expected_sha256 } , got { actual_sha256 } " )
546+
524547 reader = ValidatedReader (ipc .open_stream (BytesIO (data )), ipc_validation )
525548 data_batches : list [tuple [pa .RecordBatch , pa .KeyValueMetadata | None ]] = []
526549
@@ -617,6 +640,9 @@ def maybe_externalize_collector(
617640 ipc_bytes = buf .getvalue ()
618641 original_bytes : int | None = None
619642
643+ # Compute SHA-256 of the raw IPC bytes (pre-compression) for end-to-end verification
644+ data_sha256 = hashlib .sha256 (ipc_bytes ).hexdigest ()
645+
620646 content_encoding : str | None = None
621647 if config .compression is not None :
622648 import zstandard
@@ -629,14 +655,15 @@ def maybe_externalize_collector(
629655 ipc_bytes , out .output_schema , config .storage , content_encoding = content_encoding , original_bytes = original_bytes
630656 )
631657 _logger .debug (
632- "Batch externalized: %s (%d bytes, compressed=%s)" ,
658+ "Batch externalized: %s (%d bytes, compressed=%s, sha256=%s )" ,
633659 url ,
634660 len (ipc_bytes ),
635661 content_encoding is not None ,
662+ data_sha256 ,
636663 extra = {"url" : url , "size_bytes" : len (ipc_bytes ), "compressed" : content_encoding is not None },
637664 )
638665
639- pointer_batch , pointer_cm = make_external_location_batch (out .output_schema , url )
666+ pointer_batch , pointer_cm = make_external_location_batch (out .output_schema , url , sha256 = data_sha256 )
640667 return [(pointer_batch , pointer_cm )]
641668
642669
@@ -686,6 +713,9 @@ def maybe_externalize_batch(
686713 ipc_bytes = buf .getvalue ()
687714 original_bytes : int | None = None
688715
716+ # Compute SHA-256 of the raw IPC bytes (pre-compression) for end-to-end verification
717+ data_sha256 = hashlib .sha256 (ipc_bytes ).hexdigest ()
718+
689719 content_encoding : str | None = None
690720 if config .compression is not None :
691721 import zstandard
@@ -698,11 +728,12 @@ def maybe_externalize_batch(
698728 ipc_bytes , batch .schema , config .storage , content_encoding = content_encoding , original_bytes = original_bytes
699729 )
700730 _logger .debug (
701- "Batch externalized: %s (%d bytes, compressed=%s)" ,
731+ "Batch externalized: %s (%d bytes, compressed=%s, sha256=%s )" ,
702732 url ,
703733 len (ipc_bytes ),
704734 content_encoding is not None ,
735+ data_sha256 ,
705736 extra = {"url" : url , "size_bytes" : len (ipc_bytes ), "compressed" : content_encoding is not None },
706737 )
707738
708- return make_external_location_batch (batch .schema , url )
739+ return make_external_location_batch (batch .schema , url , sha256 = data_sha256 )
0 commit comments