From 6d92ed2e1ce7b0116b197849b565b27e76a60df9 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 13 May 2026 11:57:21 +0900 Subject: [PATCH 1/5] feat(client): resumable single-node (non-merkle) uploads (auto-resume from cached per-wave proofs) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors PR #84's merkle-resume design for the regular payment path: persist each wave's per-chunk PaymentProof bytes to disk after batch_pay confirms, before the wave's PUT phase. If the upload dies mid-file (network flake, slow close-K, client crash, Ctrl-C), the next attempt loads the cached proofs and skips quote+pay for any chunk whose address matches the current encryption. Storage layout: <data_dir>/payments/single/<timestamp>_<file_key> via the new ant-core::data::client::cached_single module. Subdirectory keeps single-node and merkle caches from colliding on filename. Receipts expire after 24h to match QUOTE_MAX_AGE_SECS in ant-node. Threaded a resume_key: Option<&str> through batch_upload_chunks_with_events and upload_waves_single so callers that don't have a stable file identity (the direct batch API at the public surface) opt out by passing None. Wave-level append rather than full rewrite per chunk so the cost of caching scales with chunk count, not chunk count squared. Failure-mode tolerance: every save/load/delete is wrapped in try_* that logs but never bubbles up — a busted cache never blocks a real upload. Asked by Nic on PR #84 follow-up. 
--- ant-core/src/data/client/batch.rs | 82 +++- ant-core/src/data/client/cached_single.rs | 434 ++++++++++++++++++++++ ant-core/src/data/client/file.rs | 25 +- ant-core/src/data/client/mod.rs | 1 + 4 files changed, 531 insertions(+), 11 deletions(-) create mode 100644 ant-core/src/data/client/cached_single.rs diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 7f1fc89..1636d2c 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -352,7 +352,7 @@ impl Client { chunks: Vec, ) -> Result<(Vec, String, u128)> { let (addresses, storage, gas, _stats) = self - .batch_upload_chunks_with_events(chunks, None, 0, 0) + .batch_upload_chunks_with_events(chunks, None, 0, 0, None) .await?; Ok((addresses, storage, gas)) } @@ -363,12 +363,20 @@ impl Client { /// `stored_offset` is the number of chunks already stored in previous waves /// (so events report cumulative progress). `file_total` is the total chunk /// count across ALL waves (for the `total` field in events). + /// + /// When `resume_key` is `Some`, per-wave payment proofs are persisted + /// to `/payments/single/_` via + /// `crate::data::client::cached_single` so that a partial-upload + /// failure can be resumed on the next attempt without paying twice. + /// The caller is responsible for deleting the cache entry on full + /// success (typically `upload_with_options` in `file.rs`). pub async fn batch_upload_chunks_with_events( &self, chunks: Vec, progress: Option<&mpsc::Sender>, stored_offset: usize, file_total: usize, + resume_key: Option<&str>, ) -> Result<(Vec, String, u128, WaveAggregateStats)> { if chunks.is_empty() { return Ok(( @@ -387,6 +395,20 @@ impl Client { (current adaptive caps — quote: {quote_cap}, store: {store_cap})" ); + // Load any previously-cached single-node receipt for this + // upload. 
Each chunk whose address is in the cache will skip + // the quote + pay phases and have its `PaidChunk` constructed + // directly from the cached proof + fresh quoted peers. The + // caller is responsible for deleting the cache on full + // success; we only read here, never write the load result back. + let cached_proofs: HashMap> = match resume_key { + Some(key) => match crate::data::client::cached_single::try_load_for_file(key) { + Some((_, receipt)) => receipt.proofs, + None => HashMap::new(), + }, + None => HashMap::new(), + }; + let mut all_addresses = Vec::with_capacity(total_chunks); let mut seen_addresses: HashSet = HashSet::new(); @@ -492,15 +514,63 @@ impl Client { continue; } - info!( - "Wave {wave_num}/{wave_count}: paying for {} chunks", - prepared_chunks.len() - ); - let (paid_chunks, wave_storage, wave_gas) = self.batch_pay(prepared_chunks).await?; + // Split prepared chunks into "already paid in a previous + // attempt" (cached) and "needs payment" (fresh). Cached + // chunks build a `PaidChunk` from the cached proof + the + // freshly-quoted peers, bypassing the EVM transaction. + let mut needs_pay: Vec = Vec::with_capacity(prepared_chunks.len()); + let mut cached_paid: Vec = Vec::new(); + for prep in prepared_chunks { + if let Some(proof_bytes) = cached_proofs.get(&prep.address).cloned() { + cached_paid.push(PaidChunk { + content: prep.content, + address: prep.address, + quoted_peers: prep.quoted_peers, + proof_bytes, + }); + } else { + needs_pay.push(prep); + } + } + if !cached_paid.is_empty() { + info!( + "Wave {wave_num}/{wave_count}: reusing {} cached payment proofs", + cached_paid.len() + ); + } + + let (mut paid_chunks, wave_storage, wave_gas) = if needs_pay.is_empty() { + (Vec::new(), "0".to_string(), 0u128) + } else { + info!( + "Wave {wave_num}/{wave_count}: paying for {} chunks", + needs_pay.len() + ); + self.batch_pay(needs_pay).await? 
+ }; if let Ok(cost) = wave_storage.parse::() { total_storage += cost; } total_gas = total_gas.saturating_add(wave_gas); + + // Persist the freshly-paid wave's proofs so a later + // failure can resume without re-paying. + if let Some(key) = resume_key { + if !paid_chunks.is_empty() { + let new_proofs: HashMap<[u8; 32], Vec> = paid_chunks + .iter() + .map(|pc| (pc.address, pc.proof_bytes.clone())) + .collect(); + crate::data::client::cached_single::try_append_wave( + key, + new_proofs, + &wave_storage, + wave_gas, + ); + } + } + + paid_chunks.extend(cached_paid); pending_store = Some(paid_chunks); } diff --git a/ant-core/src/data/client/cached_single.rs b/ant-core/src/data/client/cached_single.rs new file mode 100644 index 0000000..931ca60 --- /dev/null +++ b/ant-core/src/data/client/cached_single.rs @@ -0,0 +1,434 @@ +//! On-disk cache for single-node (non-merkle) chunk payment proofs. +//! +//! Why this exists +//! --------------- +//! Single-node uploads break the file into payment waves. Each wave is +//! one EVM transaction that produces N per-chunk payment proofs (one +//! per chunk in the wave). The proof bytes are what the storer needs +//! to accept a PUT — without them, the on-chain payment is "stranded": +//! the chain saw the tokens move but the client can no longer prove to +//! a storer that any specific chunk was paid for. +//! +//! Before this module, those proofs lived only in process memory. If +//! the upload died mid-file (network flake, residual close-K stress, +//! a Ctrl-C), every wave already paid for was unrecoverable and the +//! user had to re-quote and re-pay on the next attempt. +//! +//! This module persists the `(chunk_address, proof_bytes)` pair to +//! disk **immediately after each wave's `batch_pay` confirms**, before +//! the wave's PUT phase begins. On the next upload attempt for the same +//! source file, the cache is loaded and any chunk whose address matches +//! the current encryption skips quote+pay and goes straight to PUT. 
+//! +//! Lifecycle +//! --------- +//! * **append_wave** — called once per successfully paid wave, before +//! the PUT phase. Adds the wave's `(addr, proof_bytes)` entries to +//! the on-disk receipt and updates the cumulative cost figures. +//! * **load_for_file** — called once at the top of the upload. If a +//! non-expired cached receipt exists for the file, the proofs are +//! merged into the upload plan and the matching chunks skip quoting +//! and payment. +//! * **delete_for_file** — called after a fully successful upload to +//! remove the receipt so a future re-upload of the same path pays +//! anew. +//! * **cleanup_outdated** — called opportunistically inside +//! `load_for_file` to garbage-collect receipts past the expiry +//! window. +//! +//! Filename format +//! --------------- +//! Same as `cached_merkle`: `_` under +//! `/payments/single/`. The subdirectory keeps single-node +//! and merkle caches from colliding (they have different on-disk +//! schemas) and makes it easy for a user to wipe one without touching +//! the other. +//! +//! Expiry +//! ------ +//! On-chain quote receipts have a finite validity window +//! (`QUOTE_MAX_AGE_SECS` in `ant-node`, currently 24 h). After that, +//! storers reject the proof even if the file is otherwise resumable. +//! The cache uses a conservative 24 h expiry to match. +//! +//! Failure-mode tolerance +//! ---------------------- +//! All public-facing API (`try_*` variants) swallows IO and +//! serialization errors with a `warn!` log. A busted cache never +//! prevents a real upload — at worst the user re-pays. 
+ +use crate::config; +use crate::error::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs::{self, DirEntry, File}; +use std::hash::{Hash, Hasher}; +use std::io::{BufReader, BufWriter}; +use std::path::{Path, PathBuf}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{debug, info, warn}; + +/// Cached single-node receipts older than this are removed from disk. +/// +/// Conservative match for `QUOTE_MAX_AGE_SECS` in `ant-node` (24 h). +/// After that window, storers will reject the cached proof even if +/// the file is otherwise resumable, so keeping the cache wouldn't help. +const PAYMENT_EXPIRATION_SECS: u64 = 24 * 60 * 60; + +/// Subdirectory under the platform-appropriate data dir. +/// +/// `payments/single` rather than `payments/` directly so the merkle +/// cache (in `payments/`) and this cache cannot collide on filename. +const PAYMENTS_SUBDIR: &str = "payments/single"; + +/// On-disk schema for a single-node (non-merkle) upload receipt. +/// +/// Designed to be appended to: each successful wave adds its chunk +/// proofs to `proofs` and bumps the cumulative cost fields. The whole +/// file is rewritten on each append (the size is bounded by the chunk +/// count, so this is fine in practice — a 1 GB upload at 1 MB/chunk +/// gives ~1000 entries × ~1 KB proof ≈ 1 MB receipt file). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SingleNodeReceipt { + /// Per-chunk serialized `PaymentProof` bytes, keyed by content address. + pub proofs: HashMap<[u8; 32], Vec>, + /// Unix timestamp (seconds) the first wave was paid. Used for the + /// 24 h expiry check. + pub first_pay_timestamp: u64, + /// Cumulative storage cost in atto, summed across all paid waves. + pub storage_cost_atto: String, + /// Cumulative gas cost in wei, summed across all paid waves. 
+ pub gas_cost_wei: u128, +} + +impl SingleNodeReceipt { + fn new(now_secs: u64) -> Self { + Self { + proofs: HashMap::new(), + first_pay_timestamp: now_secs, + storage_cost_atto: "0".to_string(), + gas_cost_wei: 0, + } + } +} + +fn payments_dir() -> Result { + let dir = config::data_dir()?.join(PAYMENTS_SUBDIR); + fs::create_dir_all(&dir)?; + Ok(dir) +} + +/// Short non-cryptographic hash of the source file path string, used +/// as the on-disk cache key. +/// +/// Same scheme as `cached_merkle::file_hash_key`. Collisions are +/// content-validated against the current encrypted chunk addresses +/// before being trusted. +fn file_hash_key(file_path: &str) -> String { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + file_path.hash(&mut hasher); + format!("{:016x}", hasher.finish()) +} + +fn receipt_path(dir: &Path, ts: u64, key: &str) -> PathBuf { + dir.join(format!("{ts}_{key}")) +} + +/// Append a wave's worth of paid-chunk proofs to the on-disk receipt. +/// +/// If no receipt exists yet for this file, one is created with the +/// current time as `first_pay_timestamp`. Otherwise the existing +/// file is loaded, extended with the new proofs, and rewritten. +pub fn append_wave( + file_path: &str, + new_proofs: HashMap<[u8; 32], Vec>, + wave_storage_cost_atto: &str, + wave_gas_cost_wei: u128, +) -> Result { + let dir = payments_dir()?; + let key = file_hash_key(file_path); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + + // Find an existing receipt for this file (non-expired) and load + // it, or create a fresh one stamped with now(). + let (path, mut receipt) = match find_existing(&dir, &key)? 
{ + Some((p, r)) => (p, r), + None => (receipt_path(&dir, now, &key), SingleNodeReceipt::new(now)), + }; + + receipt.proofs.extend(new_proofs); + if let (Ok(prev), Ok(add)) = ( + receipt.storage_cost_atto.parse::(), + wave_storage_cost_atto.parse::(), + ) { + receipt.storage_cost_atto = prev.saturating_add(add).to_string(); + } + receipt.gas_cost_wei = receipt.gas_cost_wei.saturating_add(wave_gas_cost_wei); + + write_receipt(&path, &receipt)?; + debug!( + "Appended {} proofs to single-node receipt for {file_path:?} ({})", + receipt.proofs.len(), + path.display() + ); + Ok(path) +} + +/// Best-effort `append_wave`. Logs on failure, returns nothing. +/// +/// Intended for the hot path: if we can't persist the receipt the +/// upload still proceeds, the user just loses resume capability for +/// that wave. +pub fn try_append_wave( + file_path: &str, + new_proofs: HashMap<[u8; 32], Vec>, + wave_storage_cost_atto: &str, + wave_gas_cost_wei: u128, +) { + if let Err(e) = append_wave( + file_path, + new_proofs, + wave_storage_cost_atto, + wave_gas_cost_wei, + ) { + warn!( + "Failed to cache single-node payment receipt for {file_path:?}: {e}. \ + Upload will proceed without resume support for this wave." + ); + } +} + +/// Load the cached single-node receipt for a source file path, if any. +/// +/// Side-effect: opportunistically removes expired receipts. +pub fn load_for_file(file_path: &str) -> Result> { + cleanup_outdated(); + let dir = payments_dir()?; + let key = file_hash_key(file_path); + find_existing(&dir, &key) +} + +/// Best-effort load. Logs and returns `None` on error. +pub fn try_load_for_file(file_path: &str) -> Option<(PathBuf, SingleNodeReceipt)> { + match load_for_file(file_path) { + Ok(opt) => opt, + Err(e) => { + warn!( + "Failed to look up cached single-node receipt for {file_path:?}: {e}. \ + Starting a fresh upload." 
+ ); + None + } + } +} + +pub fn delete_for_file(file_path: &str) -> Result<()> { + let dir = payments_dir()?; + let key = file_hash_key(file_path); + if let Ok(read_dir) = fs::read_dir(&dir) { + for entry in read_dir.flatten() { + let path = entry.path(); + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + if name.contains(&key) { + let _ = fs::remove_file(&path); + debug!("Deleted cached single-node receipt {}", path.display()); + } + } + } + } + Ok(()) +} + +pub fn try_delete_for_file(file_path: &str) { + if let Err(e) = delete_for_file(file_path) { + warn!( + "Failed to delete cached single-node receipt for {file_path:?}: {e}. \ + Will be cleaned up after expiry." + ); + } +} + +/// Garbage-collect cached receipts past the expiry window. +pub fn cleanup_outdated() { + let Ok(dir) = payments_dir() else { + return; + }; + let Ok(read_dir) = fs::read_dir(&dir) else { + return; + }; + for entry in read_dir.flatten() { + if is_expired_entry(&entry) { + let path = entry.path(); + info!( + "Removing expired cached single-node payment file: {}", + path.display() + ); + let _ = fs::remove_file(path); + } + } +} + +fn find_existing(dir: &Path, key: &str) -> Result> { + let read_dir = match fs::read_dir(dir) { + Ok(rd) => rd, + Err(e) => { + debug!("Could not read payments dir {}: {e}", dir.display()); + return Ok(None); + } + }; + for entry in read_dir.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + if !name.contains(key) { + continue; + } + if is_expired_filename(name) { + continue; + } + match read_receipt(&path) { + Ok(receipt) => { + info!( + "Found previous single-node upload attempt, resuming with \ + {} cached proofs from {}", + receipt.proofs.len(), + path.display() + ); + return Ok(Some((path, receipt))); + } + Err(e) => { + warn!( + "Cached single-node receipt at {} is unreadable ({e}). 
\ + Ignoring and starting a fresh upload.", + path.display() + ); + } + } + } + Ok(None) +} + +fn is_expired_entry(entry: &DirEntry) -> bool { + let path = entry.path(); + if !path.is_file() { + return false; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + return false; + }; + is_expired_filename(name) +} + +fn is_expired_filename(name: &str) -> bool { + let ts_str = match name.split_once('_') { + Some((ts, _)) => ts, + None => return false, + }; + let Ok(ts) = ts_str.parse::() else { + return false; + }; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + now > ts.saturating_add(PAYMENT_EXPIRATION_SECS) +} + +fn read_receipt(path: &Path) -> Result { + let handle = File::open(path)?; + let receipt: SingleNodeReceipt = rmp_serde::decode::from_read(BufReader::new(handle)) + .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + Ok(receipt) +} + +fn write_receipt(path: &Path, receipt: &SingleNodeReceipt) -> Result<()> { + let handle = File::create(path)?; + rmp_serde::encode::write(&mut BufWriter::new(handle), receipt) + .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn dummy_receipt(ts: u64) -> SingleNodeReceipt { + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([1u8; 32], vec![1, 2, 3]); + SingleNodeReceipt { + proofs, + first_pay_timestamp: ts, + storage_cost_atto: "100".to_string(), + gas_cost_wei: 200, + } + } + + #[test] + fn file_hash_key_is_stable() { + assert_eq!(file_hash_key("/tmp/a"), file_hash_key("/tmp/a")); + assert_ne!(file_hash_key("/tmp/a"), file_hash_key("/tmp/b")); + } + + #[test] + fn expired_filename_detected() { + let stale = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .saturating_sub(PAYMENT_EXPIRATION_SECS + 60); + assert!(is_expired_filename(&format!("{stale}_abc"))); + + let fresh = 
SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .saturating_sub(60); + assert!(!is_expired_filename(&format!("{fresh}_abc"))); + } + + #[test] + fn malformed_filename_is_not_expired() { + assert!(!is_expired_filename("nonsense")); + assert!(!is_expired_filename("not_a_number_abc")); + } + + #[test] + fn roundtrip_save_load_delete() -> Result<()> { + let file_path = format!( + "/tmp/anselme-resumable-single-test-{}", + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() + ); + let mut wave1: HashMap<[u8; 32], Vec> = HashMap::new(); + wave1.insert([2u8; 32], vec![10, 20]); + let path1 = append_wave(&file_path, wave1, "50", 100)?; + assert!(path1.exists()); + + let mut wave2: HashMap<[u8; 32], Vec> = HashMap::new(); + wave2.insert([3u8; 32], vec![30, 40]); + let path2 = append_wave(&file_path, wave2, "70", 50)?; + // Same file path: should be the same on-disk path (we don't + // create a new timestamped file per wave). + assert_eq!(path1, path2); + + let (loaded_path, loaded) = load_for_file(&file_path)?.expect("receipt should load"); + assert_eq!(loaded_path, path1); + assert_eq!(loaded.proofs.len(), 2); + assert!(loaded.proofs.contains_key(&[2u8; 32])); + assert!(loaded.proofs.contains_key(&[3u8; 32])); + // Cumulative cost summed across waves. 
+ assert_eq!(loaded.storage_cost_atto, "120"); + assert_eq!(loaded.gas_cost_wei, 150); + + delete_for_file(&file_path)?; + assert!(load_for_file(&file_path)?.is_none()); + Ok(()) + } + +} diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 70f0a88..5439742 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -1344,8 +1344,13 @@ impl Client { Ok(result) => result, Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { info!("Merkle needs more peers ({msg}), falling back to wave-batch"); - let (stored, sc, gc, fb_stats) = - self.upload_waves_single(&spill, progress.as_ref()).await?; + let (stored, sc, gc, fb_stats) = self + .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key)) + .await?; + // Full file success on the single-node fallback path: + // the cached single-node receipt (if any) is no longer + // needed. + crate::data::client::cached_single::try_delete_for_file(&file_path_key); return Ok(FileUploadResult { data_map, chunks_stored: stored, @@ -1371,8 +1376,11 @@ impl Client { crate::data::client::cached_merkle::try_delete_for_file(&file_path_key); (stored, PaymentMode::Merkle, sc, gc, stats) } else { - let (stored, sc, gc, stats) = - self.upload_waves_single(&spill, progress.as_ref()).await?; + let (stored, sc, gc, stats) = self + .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key)) + .await?; + // Full file success: drop any cached single-node receipt. 
+ crate::data::client::cached_single::try_delete_for_file(&file_path_key); (stored, PaymentMode::Single, sc, gc, stats) }; @@ -1450,6 +1458,7 @@ impl Client { &self, spill: &ChunkSpill, progress: Option<&mpsc::Sender>, + resume_key: Option<&str>, ) -> Result<(usize, String, u128, WaveAggregateStats)> { let mut total_stored = 0usize; let mut total_storage = Amount::ZERO; @@ -1480,7 +1489,13 @@ impl Client { .await; } let (addresses, wave_storage, wave_gas, wave_stats) = self - .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks) + .batch_upload_chunks_with_events( + wave_data, + progress, + total_stored, + total_chunks, + resume_key, + ) .await?; total_stored += addresses.len(); if let Ok(cost) = wave_storage.parse::() { diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index 5e23ccb..55dd476 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -7,6 +7,7 @@ pub mod adaptive; pub mod batch; pub mod cache; pub(crate) mod cached_merkle; +pub(crate) mod cached_single; pub mod chunk; pub mod data; pub mod file; From 412c8f023066fc5c9f6a115021bcf7bceee2a778 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 13 May 2026 14:47:00 +0900 Subject: [PATCH 2/5] chore(cached-single): fmt + drop unused dummy_receipt test helper --- ant-core/src/data/client/cached_single.rs | 24 ++++++----------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/ant-core/src/data/client/cached_single.rs b/ant-core/src/data/client/cached_single.rs index 931ca60..04a541a 100644 --- a/ant-core/src/data/client/cached_single.rs +++ b/ant-core/src/data/client/cached_single.rs @@ -359,17 +359,6 @@ fn write_receipt(path: &Path, receipt: &SingleNodeReceipt) -> Result<()> { mod tests { use super::*; - fn dummy_receipt(ts: u64) -> SingleNodeReceipt { - let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); - proofs.insert([1u8; 32], vec![1, 2, 3]); - SingleNodeReceipt { - proofs, - 
first_pay_timestamp: ts, - storage_cost_atto: "100".to_string(), - gas_cost_wei: 200, - } - } - #[test] fn file_hash_key_is_stable() { assert_eq!(file_hash_key("/tmp/a"), file_hash_key("/tmp/a")); @@ -401,10 +390,11 @@ mod tests { #[test] fn roundtrip_save_load_delete() -> Result<()> { - let file_path = format!( - "/tmp/anselme-resumable-single-test-{}", - SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() - ); + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-resumable-single-test-{nanos}"); let mut wave1: HashMap<[u8; 32], Vec> = HashMap::new(); wave1.insert([2u8; 32], vec![10, 20]); let path1 = append_wave(&file_path, wave1, "50", 100)?; @@ -413,8 +403,7 @@ mod tests { let mut wave2: HashMap<[u8; 32], Vec> = HashMap::new(); wave2.insert([3u8; 32], vec![30, 40]); let path2 = append_wave(&file_path, wave2, "70", 50)?; - // Same file path: should be the same on-disk path (we don't - // create a new timestamped file per wave). + // Same file path: one on-disk receipt per upload, appended across waves. assert_eq!(path1, path2); let (loaded_path, loaded) = load_for_file(&file_path)?.expect("receipt should load"); @@ -430,5 +419,4 @@ mod tests { assert!(load_for_file(&file_path)?.is_none()); Ok(()) } - } From 75a01ecb8b87221812163b47b71c6787319c2b55 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 13 May 2026 17:32:51 +0900 Subject: [PATCH 3/5] fix(cached-single): payment-loss hardening of the on-disk receipt cache Adversarial review of #88 found a dozen money-loss paths in the single-node resume cache. This is the consolidated fix. Atomicity & concurrency - write_receipt_atomic: .tmp + BufWriter::into_inner check + flush + sync_all + rename + parent-dir fsync. Replaces the prior truncate-then-write that lost paid waves on crash or concurrent CLI. - ReceiptLock: per-key fs2 exclusive lock on a .lock sidecar guards append/drop/delete. 
Two concurrent ant-file-upload invocations on the same path now serialize at the receipt boundary instead of last-writer-wins on the proof set. - recover_orphaned_tmps: under the lock, recover the newest readable _.tmp left by a crash between sync_all and rename. Unlink older .tmp siblings (their content is a subset by the load-extend- write invariant) and any corrupt .tmp. - dedupe_canonical_receipts: pick newest canonical by ts-prefix and unlink older siblings. Prevents the non-deterministic resolution that first-match iteration would yield. Stale-proof handling (no remote text trust) - prune_locally_expired_proofs in batch.rs decodes each cached PaymentProof and drops entries whose quote.timestamp is past the storer-side QUOTE_MAX_AGE_SECS budget minus a 5-minute safety margin. Replaces the prior substring match on storer error text, which a Byzantine peer could spoof to force double-payment. - drop_proofs_for_file now takes &[(addr, expected_bytes)] and does compare-and-swap on the on-disk bytes. A concurrent re-pay's fresh proof is never clobbered by a stale prune list computed earlier. Schema, cost, and key stability - SingleNodeReceipt gains a version: u8 field. read_receipt rejects versions above SCHEMA_VERSION = 1 as unreadable. - storage_cost_atto now sums as ant_protocol::evm::Amount (U256) instead of u128, so very large uploads don't silently saturate. - find_existing's unreadable arm unlinks the corrupt file instead of letting it occupy the directory for 24 h. - file.rs canonicalizes the cache key (with display-string fallback) so ./foo and /abs/foo hit the same receipt. - batch.rs seeds total_storage/total_gas from the loaded receipt so the returned tally reflects this-file-total, not just freshly-paid. - delete_for_file also unlinks matching .tmp residue so a future upload of the same path can't resurrect a deleted receipt. 
Tests - 24 new tests in cached_single + 5 in batch covering atomicity, lock exclusion (2/8 reproducible regression without the lock), tmp recovery + dedupe, CAS-on-bytes drop, schema-version rejection, cost-overflow safety, canonical dedupe, unreadable auto-unlink, and concurrent drop+append consistency. - proof_is_safely_fresh tests for the local pre-flight expiry check. Verification: 280 lib tests pass, clippy -D warnings clean, fmt clean, release build clean. --- ant-core/src/data/client/batch.rs | 259 ++++- ant-core/src/data/client/cached_single.rs | 1173 ++++++++++++++++++++- ant-core/src/data/client/file.rs | 14 +- 3 files changed, 1416 insertions(+), 30 deletions(-) diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 1636d2c..7a072d5 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -14,7 +14,9 @@ use ant_protocol::evm::{ Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash, RewardsAddress, TxHash, }; -use ant_protocol::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment}; +use ant_protocol::payment::{ + deserialize_proof, serialize_single_node_proof, PaymentProof, SingleNodePayment, +}; use ant_protocol::transport::{MultiAddr, PeerId}; use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK}; use bytes::Bytes; @@ -401,20 +403,45 @@ impl Client { // directly from the cached proof + fresh quoted peers. The // caller is responsible for deleting the cache on full // success; we only read here, never write the load result back. 
- let cached_proofs: HashMap> = match resume_key { - Some(key) => match crate::data::client::cached_single::try_load_for_file(key) { - Some((_, receipt)) => receipt.proofs, - None => HashMap::new(), - }, - None => HashMap::new(), - }; + // + // Before trusting any cached proof, decode it locally and drop + // any whose quote.timestamp is past the storer's per-quote age + // budget (`QUOTE_MAX_AGE_SECS`, mirrored here as + // `CACHED_PROOF_EXPIRY_SECS`). The previous design trusted a + // substring match on remote error text, which a Byzantine + // storer could spoof to force double-payment. Local pre-flight + // is decision-pure: we never hand a doomed proof to a storer, + // and the cache is updated under our own lock with no remote + // text involved. + // `cached_cost` carries the cumulative cost from waves paid in + // a previous run so the returned tally reflects total spend on + // this file, not just freshly-paid chunks. Without this the + // user's "this upload cost X" message under-reports by the + // resumed waves' cost. + let (cached_proofs, cached_storage, cached_gas): (HashMap>, Amount, u128) = + match resume_key { + Some(key) => match crate::data::client::cached_single::try_load_for_file(key) { + Some((_, receipt)) => { + let prior_storage = receipt + .storage_cost_atto + .parse::() + .unwrap_or(Amount::ZERO); + let prior_gas = receipt.gas_cost_wei; + let kept = prune_locally_expired_proofs(key, receipt.proofs); + (kept, prior_storage, prior_gas) + } + None => (HashMap::new(), Amount::ZERO, 0u128), + }, + None => (HashMap::new(), Amount::ZERO, 0u128), + }; let mut all_addresses = Vec::with_capacity(total_chunks); let mut seen_addresses: HashSet = HashSet::new(); - // Accumulate costs across waves. - let mut total_storage = Amount::ZERO; - let mut total_gas: u128 = 0; + // Accumulate costs across waves, seeded with cumulative from + // any cached receipt loaded above. 
+ let mut total_storage = cached_storage; + let mut total_gas: u128 = cached_gas; let mut agg_stats = WaveAggregateStats::default(); // Deduplicate chunks by content address. @@ -849,6 +876,121 @@ fn log_wave_summary(result: &WaveResult) { ); } +/// Safety margin subtracted from the storer's `QUOTE_MAX_AGE_SECS` (24 h) +/// when deciding to trust a cached proof. +/// +/// A proof whose oldest `quote.timestamp` is closer than this to the +/// storer's hard limit is treated as already-expired locally. The +/// margin covers (a) clock skew between client and storer, (b) the +/// in-flight time between the local check and the storer's +/// `validate_quote_timestamps` call, and (c) the time spent uploading +/// the chunk body. 5 minutes is generous for all three combined and +/// cheap: a wrongly-kept proof costs an extra retry round trip, a +/// wrongly-dropped proof costs one re-pay (cheap chunk). +const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 300; + +/// Storer-side budget for a quote's age. Mirrors `QUOTE_MAX_AGE_SECS` +/// in `ant-node/src/payment/verifier.rs`. If this value drifts on the +/// node side, the worst case is the client either keeps proofs slightly +/// past the storer limit (forced re-pay on next retry, no money lost) +/// or drops them slightly early (one extra re-pay, no money lost). +/// Either way, no payment is double-spent or stranded. +const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 60 * 60; + +/// Drop cached `proof_bytes` whose quote timestamps are too close to +/// the storer's expiry window to safely reuse. +/// +/// Why this exists +/// --------------- +/// The cache stores `(chunk_address, proof_bytes)` so a retried upload +/// can skip re-paying. The proof bytes embed `quote.timestamp`s. 
Each
+/// storer evaluates each `quote.timestamp` independently against its
+/// 24 h `QUOTE_MAX_AGE_SECS` budget, so close to the 24 h boundary
+/// (or on a multi-day-old cache that survived past the receipt's outer
+/// expiry for some reason) the storer rejects what the client still
+/// believes is fresh.
+///
+/// The previous design trusted a substring match on the storer's
+/// returned error text to detect this and invalidate the cache after
+/// the fact. That allowed a Byzantine storer to spoof the marker and
+/// force the client to re-pay fresh proofs (double-payment). This
+/// implementation is decision-pure: we decode the proof locally and
+/// only re-use it if every embedded quote is comfortably within the
+/// budget. No remote text involved.
+///
+/// Side-effect: dropped entries are removed from the on-disk cache so
+/// they don't reappear on the next load.
+fn prune_locally_expired_proofs(
+    resume_key: &str,
+    proofs: HashMap<[u8; 32], Vec<u8>>,
+) -> HashMap<[u8; 32], Vec<u8>> {
+    let now = std::time::SystemTime::now();
+    let max_safe_age = Duration::from_secs(
+        CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
+    );
+    let mut kept: HashMap<[u8; 32], Vec<u8>> = HashMap::with_capacity(proofs.len());
+    // Pair each expired address with the EXACT bytes we observed at
+    // load time. The cache-side drop only removes the entry if those
+    // bytes still match, so a concurrent re-pay that refreshed the
+    // proof under its own lock is not clobbered (CAS semantics, fixes
+    // the TOCTOU between unlocked-load and locked-drop).
+    let mut expired: Vec<([u8; 32], Vec<u8>)> = Vec::new();
+    for (addr, bytes) in proofs {
+        match deserialize_proof(&bytes) {
+            Ok((proof, _tx_hashes)) => {
+                if proof_is_safely_fresh(&proof, now, max_safe_age) {
+                    kept.insert(addr, bytes);
+                } else {
+                    expired.push((addr, bytes));
+                }
+            }
+            Err(_) => {
+                // Unreadable cached entry: drop it so it doesn't sit
+                // here forever. The chunk will re-quote+re-pay.
+ expired.push((addr, bytes)); + } + } + } + if !expired.is_empty() { + info!( + "Pruning {} stale cached proofs (quote.timestamp past safe-reuse window) \ + before resume", + expired.len() + ); + crate::data::client::cached_single::try_drop_proofs_for_file(resume_key, &expired); + } + kept +} + +/// True iff every quote in the proof has a timestamp not older than +/// `now - max_safe_age` AND not implausibly in the future. A future +/// timestamp is a clock-skew signal: if the storer's clock matches +/// `now` and the quote claims a future time, the storer will reject +/// it with the future-skew branch of `validate_quote_timestamps`. We +/// drop those too rather than send them and lose the round trip. +fn proof_is_safely_fresh( + proof: &ProofOfPayment, + now: std::time::SystemTime, + max_safe_age: Duration, +) -> bool { + for (_peer, quote) in &proof.peer_quotes { + match now.duration_since(quote.timestamp) { + Ok(age) => { + if age > max_safe_age { + return false; + } + } + Err(_) => { + // Future-dated quote relative to local clock. Treat as + // unsafe to reuse — the storer's clock-skew tolerance + // is unknown to us here, so any forward drift is risky. + return false; + } + } + } + true +} + /// Compile-time assertions that batch method futures are Send. #[cfg(test)] mod send_assertions { @@ -982,4 +1124,99 @@ mod tests { let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap(); assert_eq!(paid.len(), 2); } + + // ---- prune_locally_expired_proofs ---- + // + // Build synthetic ProofOfPayment instances with controlled + // timestamps to verify the local pre-flight stale-proof check. + // This is the "no remote text trust" replacement for the prior + // substring-matching invalidation path. A bug here is a direct + // wallet leak (drop-too-eager = re-pay; keep-too-long = doomed + // PUT round trip but no payment loss). 
+ + fn make_proof_with_timestamps(timestamps: &[std::time::SystemTime]) -> ProofOfPayment { + let peer_quotes = timestamps + .iter() + .enumerate() + .map(|(i, ts)| { + let quote = PaymentQuote { + content: xor_name::XorName([0u8; 32]), + timestamp: *ts, + price: Amount::from(1u64), + rewards_address: RewardsAddress::new([1u8; 20]), + pub_key: vec![], + signature: vec![], + }; + (EncodedPeerId::from([i as u8; 32]), quote) + }) + .collect(); + ProofOfPayment { peer_quotes } + } + + #[test] + fn proof_is_safely_fresh_accepts_recent_quote() { + let proof = make_proof_with_timestamps(&[std::time::SystemTime::now()]); + assert!(proof_is_safely_fresh( + &proof, + std::time::SystemTime::now(), + Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS), + )); + } + + #[test] + fn proof_is_safely_fresh_rejects_quote_past_safe_window() { + // 23h57m old: past the 24h - 5min safe-reuse threshold but + // still within the storer's hard 24h limit. The whole point + // of the safety margin is to drop these locally before + // burning a doomed PUT round trip. + let too_old = std::time::SystemTime::now() - Duration::from_secs(23 * 60 * 60 + 57 * 60); + let proof = make_proof_with_timestamps(&[too_old]); + let max_safe = Duration::from_secs( + CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS), + ); + assert!( + !proof_is_safely_fresh(&proof, std::time::SystemTime::now(), max_safe), + "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)" + ); + } + + #[test] + fn proof_is_safely_fresh_rejects_if_any_quote_is_stale() { + // The storer rejects on a per-quote basis: a proof with even + // one stale quote will fail on every retry. We must drop it. 
+ let now = std::time::SystemTime::now(); + let fresh = now; + let stale = now - Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS); + let proof = make_proof_with_timestamps(&[fresh, fresh, stale, fresh]); + let max_safe = Duration::from_secs( + CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS), + ); + assert!(!proof_is_safely_fresh(&proof, now, max_safe)); + } + + #[test] + fn proof_is_safely_fresh_rejects_future_dated_quote() { + // Forward clock skew: the storer's future-skew tolerance is + // unknown here, so any forward-drifted quote is unsafe to + // re-use. We bail rather than burn a round trip. + let now = std::time::SystemTime::now(); + let future = now + Duration::from_secs(3600); + let proof = make_proof_with_timestamps(&[future]); + let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS); + assert!(!proof_is_safely_fresh(&proof, now, max_safe)); + } + + #[test] + fn proof_is_safely_fresh_empty_quotes_is_vacuously_safe() { + // No quotes = no storer-side timestamp check to fail. The + // proof is structurally invalid for other reasons, but + // this function's contract is "no stale timestamp present", + // which is trivially true for an empty list. + let proof = make_proof_with_timestamps(&[]); + assert!(proof_is_safely_fresh( + &proof, + std::time::SystemTime::now(), + Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS), + )); + } } diff --git a/ant-core/src/data/client/cached_single.rs b/ant-core/src/data/client/cached_single.rs index 04a541a..c433110 100644 --- a/ant-core/src/data/client/cached_single.rs +++ b/ant-core/src/data/client/cached_single.rs @@ -56,14 +56,33 @@ //! All public-facing API (`try_*` variants) swallows IO and //! serialization errors with a `warn!` log. A busted cache never //! prevents a real upload — at worst the user re-pays. +//! +//! Filesystem requirements +//! ----------------------- +//! The atomic-write and exclusive-lock guarantees assume the data +//! 
directory lives on a local filesystem with working `flock(2)` (or +//! `LockFileEx` on Windows). On Linux NFS, `flock` is emulated via +//! `fcntl` POSIX locks and may degrade to per-host advisory-only; +//! SMB shares mounted on Linux are similarly fragile. Two +//! concurrent CLI processes on different hosts both pointing at the +//! same shared `payments/single/` directory could therefore lose a +//! wave's proofs to a last-writer-wins race. The platform-default +//! data dir (`~/.local/share/autonomi`, `~/Library/Application +//! Support`, `%LOCALAPPDATA%`) is local, so this is a concern only +//! for users who explicitly redirect the data dir to network +//! storage. No code-level mitigation is planned; if this becomes a +//! reported problem the right fix is a per-host instance lock on +//! `payments/single/.exclusive` at the daemon level. use crate::config; use crate::error::Result; +use ant_protocol::evm::Amount; +use fs2::FileExt; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::fs::{self, DirEntry, File}; +use std::fs::{self, DirEntry, File, OpenOptions}; use std::hash::{Hash, Hasher}; -use std::io::{BufReader, BufWriter}; +use std::io::{BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{debug, info, warn}; @@ -90,6 +109,18 @@ const PAYMENTS_SUBDIR: &str = "payments/single"; /// gives ~1000 entries × ~1 KB proof ≈ 1 MB receipt file). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SingleNodeReceipt { + /// On-disk schema version. + /// + /// Bumped when fields change incompatibly. A version this client + /// doesn't recognize is treated as unreadable in `read_receipt` + /// (returns an error → `find_existing` logs + unlinks → next + /// attempt pays anew). 
`#[serde(default)]` so receipts written + /// before this field existed deserialize as `version: 0`, which + /// is still treated as known-current (the field's only purpose + /// is rejecting *future* schemas the running binary doesn't + /// understand, not migrating in-flight v0 receipts). + #[serde(default)] + pub version: u8, /// Per-chunk serialized `PaymentProof` bytes, keyed by content address. pub proofs: HashMap<[u8; 32], Vec>, /// Unix timestamp (seconds) the first wave was paid. Used for the @@ -101,9 +132,15 @@ pub struct SingleNodeReceipt { pub gas_cost_wei: u128, } +/// Highest schema version this binary knows how to read. Receipts +/// with a higher version are rejected (the user must have upgraded +/// and downgraded between attempts). +const SCHEMA_VERSION: u8 = 1; + impl SingleNodeReceipt { fn new(now_secs: u64) -> Self { Self { + version: SCHEMA_VERSION, proofs: HashMap::new(), first_pay_timestamp: now_secs, storage_cost_atto: "0".to_string(), @@ -139,6 +176,15 @@ fn receipt_path(dir: &Path, ts: u64, key: &str) -> PathBuf { /// If no receipt exists yet for this file, one is created with the /// current time as `first_pay_timestamp`. Otherwise the existing /// file is loaded, extended with the new proofs, and rewritten. +/// +/// Atomicity & concurrency +/// ----------------------- +/// The whole read-modify-write is guarded by an exclusive advisory +/// lock on a `.lock` sidecar so two concurrent invocations of the +/// CLI on the same file path serialize at this boundary rather than +/// last-writer-wins on the receipt content. The write itself is +/// `tmp + fsync + rename` so an interrupted write never leaves a +/// truncated or partial receipt on disk. 
pub fn append_wave( file_path: &str, new_proofs: HashMap<[u8; 32], Vec>, @@ -152,6 +198,16 @@ pub fn append_wave( .map(|d| d.as_secs()) .unwrap_or(0); + let _guard = ReceiptLock::acquire(&dir, &key)?; + + // Crash-recovery: if a previous `write_receipt_atomic` was killed + // between `sync_all(tmp)` and `rename(tmp -> canonical)`, the + // fully-flushed `.tmp` sibling holds the only copy of the newest + // wave's proofs. Rename it into place (or unlink it if it's + // corrupt) under our exclusive lock so the upcoming + // `find_existing` sees the recovered receipt. + recover_orphaned_tmps(&dir, &key); + // Find an existing receipt for this file (non-expired) and load // it, or create a fresh one stamped with now(). let (path, mut receipt) = match find_existing(&dir, &key)? { @@ -160,15 +216,21 @@ pub fn append_wave( }; receipt.proofs.extend(new_proofs); + // Sum costs as U256 (Amount). A wave's storage cost is wei-scale + // atto-token and a multi-TB upload's cumulative can plausibly + // overflow u128 (2^128 ≈ 3.4e38; a few thousand chunks at high + // gas pricing already reach 1e36 atto). Parsing failure on + // either side drops that wave's contribution rather than + // silently zeroing the running total. if let (Ok(prev), Ok(add)) = ( - receipt.storage_cost_atto.parse::(), - wave_storage_cost_atto.parse::(), + receipt.storage_cost_atto.parse::(), + wave_storage_cost_atto.parse::(), ) { receipt.storage_cost_atto = prev.saturating_add(add).to_string(); } receipt.gas_cost_wei = receipt.gas_cost_wei.saturating_add(wave_gas_cost_wei); - write_receipt(&path, &receipt)?; + write_receipt_atomic(&path, &receipt)?; debug!( "Appended {} proofs to single-node receipt for {file_path:?} ({})", receipt.proofs.len(), @@ -177,6 +239,100 @@ pub fn append_wave( Ok(path) } +/// Remove specific chunk proofs from the cached receipt for a file, +/// but only if the on-disk proof bytes still match the bytes the +/// caller observed at load time (compare-and-swap semantics). 
+/// +/// Why the "if unchanged" check matters +/// ------------------------------------ +/// `load_for_file` releases its exclusive lock before returning the +/// snapshot. Between the load and a subsequent drop, another process +/// (or the same process from a different code path) can lock, observe +/// that a chunk needs re-payment, pay it, and append a FRESH proof +/// for the same address. Without a content check, this drop would +/// clobber the fresh proof and strand the just-completed on-chain +/// payment — see test `toctou_load_then_drop_evicts_concurrently_refreshed_proof`. +/// +/// Caller passes `(address, expected_bytes)` pairs. Under the lock, +/// we drop the address only if its current on-disk bytes still equal +/// `expected_bytes`. If they differ, a concurrent re-pay won the race +/// and we leave the new entry intact. +/// +/// If the receipt becomes empty after the drop, the file is removed +/// from disk so a fresh upload starts cleanly. +pub fn drop_proofs_for_file(file_path: &str, expected: &[([u8; 32], Vec)]) -> Result<()> { + if expected.is_empty() { + return Ok(()); + } + let dir = payments_dir()?; + let key = file_hash_key(file_path); + let _guard = ReceiptLock::acquire(&dir, &key)?; + recover_orphaned_tmps(&dir, &key); + let Some((path, mut receipt)) = find_existing(&dir, &key)? 
else { + return Ok(()); + }; + let before = receipt.proofs.len(); + let mut refreshed = 0usize; + for (addr, expected_bytes) in expected { + match receipt.proofs.get(addr) { + Some(current) if current == expected_bytes => { + receipt.proofs.remove(addr); + } + Some(_) => { + refreshed += 1; + } + None => {} + } + } + if refreshed > 0 { + info!( + "Skipped dropping {refreshed} stale proofs whose bytes changed since load \ + (concurrent re-pay refreshed them — keeping the fresh proof)" + ); + } + let dropped = before.saturating_sub(receipt.proofs.len()); + if dropped == 0 { + return Ok(()); + } + if receipt.proofs.is_empty() { + if let Err(e) = fs::remove_file(&path) { + // remove_file failed (eg. EACCES). Fall back to writing + // the empty receipt atomically so the on-disk content is + // not stale — an empty proofs map still forces the next + // attempt to re-quote+re-pay every chunk, which is the + // intended outcome of "every cached proof is stale". + warn!( + "Could not remove emptied single-node receipt {} ({e}); \ + writing empty receipt instead", + path.display() + ); + write_receipt_atomic(&path, &receipt)?; + } else { + debug!( + "Dropped final {dropped} proofs from single-node receipt for {file_path:?}; \ + receipt removed" + ); + } + return Ok(()); + } + write_receipt_atomic(&path, &receipt)?; + debug!( + "Dropped {dropped} stale proofs from single-node receipt for {file_path:?} ({})", + path.display() + ); + Ok(()) +} + +/// Best-effort `drop_proofs_for_file`. Logs on failure. +pub fn try_drop_proofs_for_file(file_path: &str, expected: &[([u8; 32], Vec)]) { + if let Err(e) = drop_proofs_for_file(file_path, expected) { + warn!( + "Failed to drop stale proofs from cached single-node receipt for \ + {file_path:?}: {e}. Stale entries may be retried next attempt." + ); + } +} + /// Best-effort `append_wave`. Logs on failure, returns nothing. 
/// /// Intended for the hot path: if we can't persist the receipt the @@ -203,11 +359,16 @@ pub fn try_append_wave( /// Load the cached single-node receipt for a source file path, if any. /// -/// Side-effect: opportunistically removes expired receipts. +/// Side-effect: opportunistically removes expired receipts and recovers +/// orphaned `.tmp` files from a crashed previous write. pub fn load_for_file(file_path: &str) -> Result> { cleanup_outdated(); let dir = payments_dir()?; let key = file_hash_key(file_path); + // Recover under the same lock that append_wave/drop hold so we + // can't race them mid-rename. + let _guard = ReceiptLock::acquire(&dir, &key)?; + recover_orphaned_tmps(&dir, &key); find_existing(&dir, &key) } @@ -228,15 +389,29 @@ pub fn try_load_for_file(file_path: &str) -> Option<(PathBuf, SingleNodeReceipt) pub fn delete_for_file(file_path: &str) -> Result<()> { let dir = payments_dir()?; let key = file_hash_key(file_path); + // Lock so we don't race with an in-flight append_wave from another + // process. The lock sidecar itself is excluded from removal. + let _guard = ReceiptLock::acquire(&dir, &key)?; if let Ok(read_dir) = fs::read_dir(&dir) { for entry in read_dir.flatten() { let path = entry.path(); - if let Some(name) = path.file_name().and_then(|n| n.to_str()) { - if name.contains(&key) { - let _ = fs::remove_file(&path); - debug!("Deleted cached single-node receipt {}", path.display()); - } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + // Skip the lock sidecar (still held by `_guard`). + if name.ends_with(".lock") { + continue; + } + if !name.contains(&key) { + continue; } + // Also unlink matching `.tmp` siblings — otherwise an + // interrupted write left behind from this key's last + // crash would be promoted to canonical by + // `recover_orphaned_tmps` on the next upload of the same + // path, resurrecting a receipt the user explicitly deleted. 
+ let _ = fs::remove_file(&path); + debug!("Deleted cached single-node receipt {}", path.display()); } } Ok(()) @@ -271,6 +446,138 @@ pub fn cleanup_outdated() { } } +/// Recover or unlink any `.tmp` sidecar for this key. +/// +/// A crash between `sync_all(tmp)` and `rename(tmp -> canonical)` in +/// `write_receipt_atomic` leaves a fully-flushed `.tmp` on disk. It's +/// the ONLY copy of the newest wave's proofs (the canonical file still +/// holds the pre-append state, or doesn't exist for a fresh upload). +/// Without recovery, `find_existing` skips it via the `.tmp` filter and +/// the wave's payment is silently lost on the next attempt. +/// +/// Recovery is safe to run only while the receipt lock is held — +/// otherwise we could race an in-flight `write_receipt_atomic` that +/// has just opened its own `.tmp`. +/// +/// For each `<...>_.tmp` we find: deserialize it. If valid, +/// rename to its canonical name (strip the `.tmp` suffix). If invalid, +/// unlink — it's a torn or zero-byte file from a kill mid-write. +fn recover_orphaned_tmps(dir: &Path, key: &str) { + let Ok(read_dir) = fs::read_dir(dir) else { + return; + }; + + // Collect candidates first so we can pick the newest one + // deterministically. Two-pass design covers the case where two + // separate crashes left two distinct `_.tmp` and + // `_.tmp` siblings: a naïve loop would rename BOTH + // into their own canonical names (different filenames, same + // key), and `find_existing` would non-deterministically pick + // one to load. That's a bounded but real payment-loss bug — + // proofs in the unloaded receipt are silently discarded. 
+ let mut candidates: Vec<(u64, PathBuf, bool)> = Vec::new(); + for entry in read_dir.flatten() { + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + if !name.ends_with(".tmp") || !name.contains(key) { + continue; + } + let ts = name + .split_once('_') + .and_then(|(ts, _)| ts.parse::().ok()) + .unwrap_or(0); + let readable = read_receipt(&path).is_ok(); + candidates.push((ts, path, readable)); + } + + // Sort descending by timestamp so the newest readable .tmp wins. + candidates.sort_by(|a, b| b.0.cmp(&a.0)); + + let mut recovered = false; + for (_, path, readable) in candidates { + if recovered || !readable { + // Either we already promoted a newer .tmp to canonical + // (this older one is superseded and would clobber it on + // rename), OR this .tmp is corrupt — either way, unlink. + let _ = fs::remove_file(&path); + continue; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + let canonical_name = &name[..name.len() - ".tmp".len()]; + let canonical = path.with_file_name(canonical_name); + match fs::rename(&path, &canonical) { + Ok(()) => { + info!( + "Recovered orphaned receipt {} -> {}", + path.display(), + canonical.display() + ); + recovered = true; + } + Err(e) => warn!( + "Could not recover orphaned receipt {} ({e})", + path.display() + ), + } + } + + dedupe_canonical_receipts(dir, key); +} + +/// Keep at most one canonical receipt per key — the one with the +/// largest timestamp prefix. +/// +/// Multiple canonical receipts for the same key can arise if a +/// previous `append_wave` raced an aborted recovery (eg. before the +/// `.lock` was added). They can also arise transiently when +/// `recover_orphaned_tmps` promotes a `_.tmp` to +/// canonical while a `_` already exists from an earlier +/// successful write. 
Without deduping, `find_existing` returns the +/// first match from directory iteration — order is filesystem- +/// dependent, so half the proofs end up silently unreachable. +fn dedupe_canonical_receipts(dir: &Path, key: &str) { + let Ok(read_dir) = fs::read_dir(dir) else { + return; + }; + let mut canonicals: Vec<(u64, PathBuf)> = Vec::new(); + for entry in read_dir.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + if name.ends_with(".tmp") || name.ends_with(".lock") { + continue; + } + if !name.contains(key) { + continue; + } + let ts = name + .split_once('_') + .and_then(|(ts, _)| ts.parse::().ok()) + .unwrap_or(0); + canonicals.push((ts, path)); + } + if canonicals.len() <= 1 { + return; + } + canonicals.sort_by(|a, b| b.0.cmp(&a.0)); + for (_, stale) in canonicals.iter().skip(1) { + warn!( + "Removing duplicate canonical receipt {} (older sibling of \ + a newer recovered receipt)", + stale.display() + ); + let _ = fs::remove_file(stale); + } +} + fn find_existing(dir: &Path, key: &str) -> Result> { let read_dir = match fs::read_dir(dir) { Ok(rd) => rd, @@ -287,6 +594,10 @@ fn find_existing(dir: &Path, key: &str) -> Result Result { + // Unlink so corrupt receipts can't accumulate on + // disk for up to 24 h (the filename-timestamp + // expiry doesn't reap them — only the canonical + // timestamp is checked, and a corrupt-but-recent + // receipt would be silently kept). Callers always + // hold the receipt lock when this runs, so unlinking + // here cannot race a concurrent rename. warn!( "Cached single-node receipt at {} is unreadable ({e}). 
\ - Ignoring and starting a fresh upload.", + Unlinking and starting a fresh upload.", path.display() ); + let _ = fs::remove_file(&path); } } } @@ -323,6 +642,11 @@ fn is_expired_entry(entry: &DirEntry) -> bool { let Some(name) = path.file_name().and_then(|n| n.to_str()) else { return false; }; + // Don't reap lock sidecars or in-flight tmp files via filename + // timestamp parsing — they aren't receipts. + if name.ends_with(".lock") || name.ends_with(".tmp") { + return false; + } is_expired_filename(name) } @@ -345,16 +669,134 @@ fn read_receipt(path: &Path) -> Result { let handle = File::open(path)?; let receipt: SingleNodeReceipt = rmp_serde::decode::from_read(BufReader::new(handle)) .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + if receipt.version > SCHEMA_VERSION { + // Future schema written by a newer binary the user downgraded + // from. Treat as unreadable so the caller unlinks it; the + // alternative (silently re-paying) is no worse, and unlink + // keeps the cache directory from accumulating poison. + return Err(crate::error::Error::Io(std::io::Error::other(format!( + "cached receipt has unknown schema version {} (this binary supports up to {SCHEMA_VERSION})", + receipt.version + )))); + } Ok(receipt) } -fn write_receipt(path: &Path, receipt: &SingleNodeReceipt) -> Result<()> { - let handle = File::create(path)?; - rmp_serde::encode::write(&mut BufWriter::new(handle), receipt) - .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; +/// Atomic write via `.tmp` + `fsync(tmp)` + `rename` + `fsync(dir)`. +/// +/// `File::create` (the prior implementation) truncated the destination +/// before writing, so a crash or concurrent reader mid-write saw a +/// zero-byte or partial receipt — payment proofs gone, on-chain payment +/// stranded. `rename(2)` is atomic on POSIX: either the new contents +/// replace the old or nothing changes. 
We then fsync the parent +/// directory so the rename itself is durable: without that, a power +/// cut after rename could leave the directory entry unflushed and the +/// next boot would see the old (now-stale) name. +/// +/// The BufWriter is held in a named local and explicitly +/// `into_inner()`-checked. The prior version constructed it inline as +/// the argument to `rmp_serde::encode::write`, which meant any flush +/// error during BufWriter drop was silently swallowed and a truncated +/// msgpack file could be renamed into place. +fn write_receipt_atomic(path: &Path, receipt: &SingleNodeReceipt) -> Result<()> { + let tmp_path = tmp_path_for(path); + { + let handle = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&tmp_path)?; + let mut writer = BufWriter::new(handle); + if let Err(e) = rmp_serde::encode::write(&mut writer, receipt) { + let _ = fs::remove_file(&tmp_path); + return Err(crate::error::Error::Io(std::io::Error::other( + e.to_string(), + ))); + } + let mut handle = writer.into_inner().map_err(|e| { + let _ = fs::remove_file(&tmp_path); + crate::error::Error::Io(std::io::Error::other(format!( + "BufWriter flush failed: {e}" + ))) + })?; + if let Err(e) = handle.flush() { + let _ = fs::remove_file(&tmp_path); + return Err(e.into()); + } + if let Err(e) = handle.sync_all() { + let _ = fs::remove_file(&tmp_path); + return Err(e.into()); + } + } + if let Err(e) = fs::rename(&tmp_path, path) { + let _ = fs::remove_file(&tmp_path); + return Err(e.into()); + } + // fsync the parent dir so the rename itself is durable on power + // loss. On macOS this requires opening the dir read-only; on Linux + // O_RDONLY is the only option that works for directories anyway. + // Best-effort: if the parent can't be fsync'd we still consider + // the rename committed, since most modern filesystems (ext4, + // APFS) journal directory metadata. 
+ if let Some(parent) = path.parent() { + if let Ok(dir) = File::open(parent) { + let _ = dir.sync_all(); + } + } Ok(()) } +fn tmp_path_for(path: &Path) -> PathBuf { + let mut tmp = path.to_path_buf(); + let name = path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("receipt"); + tmp.set_file_name(format!("{name}.tmp")); + tmp +} + +/// Advisory exclusive file lock on a per-file sidecar. +/// +/// Two concurrent `ant file upload` invocations on the same source path +/// would otherwise race: both read the existing receipt, both extend +/// it with their own wave's proofs, both write — and the later write +/// silently loses the earlier wave's proofs. That stranded the on-chain +/// payment for the first wave. The lock makes `append_wave` and +/// `drop_proofs_for_file` mutually exclusive across processes. +/// +/// `fs2::FileExt::lock_exclusive` translates to `flock(2)` on Unix and +/// `LockFileEx` on Windows. The lock releases when the underlying +/// `File` is dropped. +struct ReceiptLock { + file: File, +} + +impl ReceiptLock { + fn acquire(dir: &Path, key: &str) -> Result { + let path = dir.join(format!("{key}.lock")); + let file = OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(false) + .open(&path)?; + file.lock_exclusive()?; + Ok(Self { file }) + } +} + +impl Drop for ReceiptLock { + fn drop(&mut self) { + // The sidecar file is left on disk by design: deleting it + // would race with another waiter that has already `open`-ed + // it but not yet `lock_exclusive`-ed it — they'd silently + // hold a lock on an unlinked inode and not actually exclude + // us. A stale empty `.lock` file is harmless. 
+ let _ = FileExt::unlock(&self.file); + } +} + #[cfg(test)] mod tests { use super::*; @@ -388,6 +830,224 @@ mod tests { assert!(!is_expired_filename("not_a_number_abc")); } + #[test] + fn drop_proofs_removes_only_specified_addresses() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-drop-proofs-test-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([1u8; 32], vec![1]); + proofs.insert([2u8; 32], vec![2]); + proofs.insert([3u8; 32], vec![3]); + append_wave(&file_path, proofs, "30", 60)?; + + drop_proofs_for_file(&file_path, &[([2u8; 32], vec![2])])?; + + let (_, loaded) = load_for_file(&file_path)?.expect("receipt still present"); + assert_eq!(loaded.proofs.len(), 2); + assert!(loaded.proofs.contains_key(&[1u8; 32])); + assert!(!loaded.proofs.contains_key(&[2u8; 32])); + assert!(loaded.proofs.contains_key(&[3u8; 32])); + + delete_for_file(&file_path)?; + Ok(()) + } + + #[test] + fn drop_proofs_skips_drop_if_bytes_have_changed() -> Result<()> { + // CAS semantics: caller passes the bytes they observed; the + // drop is a no-op if a concurrent writer refreshed those + // bytes. This is the TOCTOU fix — without it, a stale-list + // computed at load time can clobber a fresh proof appended + // mid-prune. + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-drop-cas-test-{nanos}"); + let mut old: HashMap<[u8; 32], Vec> = HashMap::new(); + old.insert([5u8; 32], vec![0xAA]); + append_wave(&file_path, old, "10", 20)?; + + // Simulate a concurrent re-pay that refreshed the proof + // bytes for [5; 32] between load and drop. + let mut fresh: HashMap<[u8; 32], Vec> = HashMap::new(); + fresh.insert([5u8; 32], vec![0xBB]); + append_wave(&file_path, fresh, "0", 0)?; + + // Caller's stale view was vec![0xAA]; CAS must reject the drop. 
+ drop_proofs_for_file(&file_path, &[([5u8; 32], vec![0xAA])])?; + + let (_, loaded) = load_for_file(&file_path)?.expect("receipt still present"); + assert_eq!( + loaded.proofs.get(&[5u8; 32]), + Some(&vec![0xBB]), + "fresh proof must NOT be clobbered by a CAS drop with stale bytes" + ); + + delete_for_file(&file_path)?; + Ok(()) + } + + #[test] + fn drop_proofs_removes_receipt_file_when_emptied() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-drop-empty-test-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([7u8; 32], vec![7]); + append_wave(&file_path, proofs, "10", 20)?; + + drop_proofs_for_file(&file_path, &[([7u8; 32], vec![7])])?; + + assert!( + load_for_file(&file_path)?.is_none(), + "empty receipt should be removed" + ); + Ok(()) + } + + #[test] + fn drop_proofs_unknown_address_is_noop() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-drop-noop-test-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([9u8; 32], vec![9]); + append_wave(&file_path, proofs, "10", 20)?; + + drop_proofs_for_file(&file_path, &[([42u8; 32], vec![42])])?; + + let (_, loaded) = load_for_file(&file_path)?.expect("receipt still present"); + assert_eq!(loaded.proofs.len(), 1); + assert!(loaded.proofs.contains_key(&[9u8; 32])); + + delete_for_file(&file_path)?; + Ok(()) + } + + #[test] + fn drop_proofs_on_missing_receipt_is_noop() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-drop-missing-test-{nanos}"); + drop_proofs_for_file(&file_path, &[([0u8; 32], vec![0])])?; + assert!(load_for_file(&file_path)?.is_none()); + Ok(()) + } + + #[test] + fn write_receipt_atomic_leaves_no_tmp_file() -> Result<()> 
{ + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-atomic-tmp-test-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([5u8; 32], vec![5]); + let receipt_path = append_wave(&file_path, proofs, "1", 2)?; + let tmp = tmp_path_for(&receipt_path); + assert!(!tmp.exists(), "tmp sibling must be cleaned up after rename"); + assert!(receipt_path.exists()); + delete_for_file(&file_path)?; + Ok(()) + } + + #[test] + fn find_existing_ignores_lock_and_tmp_sidecars() -> Result<()> { + // Two real receipts plus stray .lock and .tmp files in the + // same directory should not confuse find_existing or get + // GC'd by cleanup_outdated. Crash-during-write leaves .tmp + // siblings behind; concurrent locks leave .lock siblings. + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-sidecar-test-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([6u8; 32], vec![6]); + let receipt_path = append_wave(&file_path, proofs, "1", 2)?; + + // Drop a stray .tmp file alongside (simulates crash during + // a previous atomic write). + let dir = receipt_path.parent().expect("receipt has parent dir"); + let stray_tmp = dir.join("123456_deadbeef.tmp"); + fs::write(&stray_tmp, b"garbage")?; + + let (loaded_path, loaded) = load_for_file(&file_path)?.expect("receipt still loaded"); + assert_eq!(loaded_path, receipt_path); + assert_eq!(loaded.proofs.len(), 1); + assert!(stray_tmp.exists(), "stray tmp not auto-deleted by load"); + + // cleanup_outdated must not touch .tmp / .lock by mistake + // even if their filename prefix would parse as a long-ago + // timestamp. 
+ cleanup_outdated(); + assert!(stray_tmp.exists()); + + let _ = fs::remove_file(&stray_tmp); + delete_for_file(&file_path)?; + Ok(()) + } + + #[test] + fn concurrent_append_waves_do_not_lose_proofs() -> Result<()> { + // Two threads appending to the SAME file_path. Without the + // exclusive lock + atomic rename, both threads read the old + // receipt, both extend with their own proofs, both write — + // last writer wins and the other thread's proofs are lost + // (== on-chain payment for those chunks is stranded). With + // the lock, both waves' proofs must survive. + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-concurrent-test-{nanos}"); + let fp1 = file_path.clone(); + let fp2 = file_path.clone(); + + let t1 = std::thread::spawn(move || { + let mut wave: HashMap<[u8; 32], Vec> = HashMap::new(); + for i in 0u8..32 { + wave.insert([i; 32], vec![i]); + } + append_wave(&fp1, wave, "10", 20) + }); + let t2 = std::thread::spawn(move || { + let mut wave: HashMap<[u8; 32], Vec> = HashMap::new(); + for i in 32u8..64 { + wave.insert([i; 32], vec![i]); + } + append_wave(&fp2, wave, "10", 20) + }); + t1.join().expect("thread1 panicked")?; + t2.join().expect("thread2 panicked")?; + + let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load"); + assert_eq!( + loaded.proofs.len(), + 64, + "all 64 proofs must survive concurrent appends" + ); + for i in 0u8..64 { + assert!( + loaded.proofs.contains_key(&[i; 32]), + "proof {i} lost in concurrent append" + ); + } + + delete_for_file(&file_path)?; + Ok(()) + } + #[test] fn roundtrip_save_load_delete() -> Result<()> { let nanos = SystemTime::now() @@ -419,4 +1079,485 @@ mod tests { assert!(load_for_file(&file_path)?.is_none()); Ok(()) } + + /// Stronger version of `concurrent_append_waves_do_not_lose_proofs`. 
+ /// + /// The 2-thread test fails when the lock is removed but the failure + /// mode is an `Os { code: 2, NotFound }` from `rename(2)` colliding + /// on a fresh canonical path — not the silent proof loss the lock + /// is supposed to prevent. This pre-seeds a receipt so both + /// concurrent appenders run the read-modify-write path against the + /// same existing canonical file. Without the lock the last writer + /// overwrites the others and proofs are silently dropped while + /// every `append_wave` call returns `Ok`. + #[test] + fn concurrent_append_after_existing_receipt_keeps_all_proofs() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-concurrent-silent-test-{nanos}"); + + let mut seed: HashMap<[u8; 32], Vec> = HashMap::new(); + seed.insert([200u8; 32], vec![200]); + seed.insert([201u8; 32], vec![201]); + seed.insert([202u8; 32], vec![202]); + seed.insert([203u8; 32], vec![203]); + append_wave(&file_path, seed, "1", 1)?; + + const THREADS: u8 = 16; + const PER_THREAD: u8 = 8; + let handles: Vec<_> = (0..THREADS) + .map(|t| { + let fp = file_path.clone(); + std::thread::spawn(move || { + let mut wave: HashMap<[u8; 32], Vec> = HashMap::new(); + let base = t.wrapping_mul(PER_THREAD); + for i in 0..PER_THREAD { + let addr = base.wrapping_add(i); + wave.insert([addr; 32], vec![addr]); + } + append_wave(&fp, wave, "1", 1) + }) + }) + .collect(); + for h in handles { + h.join().expect("appender thread panicked")?; + } + + let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load"); + for k in [200u8, 201, 202, 203] { + assert!( + loaded.proofs.contains_key(&[k; 32]), + "seed proof {k} disappeared (silent loss)" + ); + } + for t in 0..THREADS { + for i in 0..PER_THREAD { + let addr = t.wrapping_mul(PER_THREAD).wrapping_add(i); + assert!( + loaded.proofs.contains_key(&[addr; 32]), + "appended proof {addr} disappeared (silent loss)" + ); + } + } 
+ + delete_for_file(&file_path)?; + Ok(()) + } + + /// `delete_for_file` must also unlink matching `.tmp` siblings + /// for the deleted key — otherwise a crashed-write residue from + /// this same key would be promoted back to canonical by + /// `recover_orphaned_tmps` on the next upload of the same path, + /// resurrecting a receipt the user explicitly deleted. The + /// `.lock` sidecar must be preserved (we hold it). + #[test] + fn delete_for_file_unlinks_tmp_residue_and_keeps_lock() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-delete-skip-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([0x42; 32], vec![0x42]); + let receipt_path = append_wave(&file_path, proofs, "1", 1)?; + let dir = receipt_path.parent().expect("receipt has parent dir"); + let key = file_hash_key(&file_path); + + let stray_tmp = dir.join(format!("9999_{key}.tmp")); + fs::write(&stray_tmp, b"in-flight")?; + let lock_sidecar = dir.join(format!("{key}.lock")); + assert!(lock_sidecar.exists(), "append_wave should leave a .lock"); + + delete_for_file(&file_path)?; + + assert!(!receipt_path.exists(), "canonical receipt deleted"); + assert!( + !stray_tmp.exists(), + "delete_for_file must unlink .tmp residue (prevents zombie resurrection)" + ); + assert!( + lock_sidecar.exists(), + "delete_for_file must not delete the .lock sidecar" + ); + + let _ = fs::remove_file(&lock_sidecar); + Ok(()) + } + + /// A `.tmp` sibling holding a fully-valid serialized receipt is the + /// crash-mid-rename case: payment proofs are in the .tmp file but + /// the canonical name does not yet exist. `recover_orphaned_tmps` + /// (called from `load_for_file` and `append_wave`) must rename it + /// into place. Without recovery the wave's payment is silently lost. 
+ #[test] + fn orphaned_tmp_with_valid_receipt_is_recovered() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-orphan-recover-{nanos}"); + let key = file_hash_key(&file_path); + let dir = payments_dir()?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let canonical = receipt_path(&dir, now, &key); + let tmp = tmp_path_for(&canonical); + + let mut r = SingleNodeReceipt::new(now); + r.proofs.insert([0xEE; 32], vec![0xEE, 0xEF]); + r.storage_cost_atto = "13".into(); + r.gas_cost_wei = 7; + let handle = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&tmp)?; + rmp_serde::encode::write(&mut BufWriter::new(handle), &r) + .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + assert!(tmp.exists()); + assert!(!canonical.exists()); + + let (loaded_path, loaded) = load_for_file(&file_path)?.expect("orphan recovered"); + assert!( + loaded.proofs.contains_key(&[0xEE; 32]), + "recovered proof bytes lost" + ); + assert!( + !loaded_path.to_string_lossy().ends_with(".tmp"), + "loaded path should be canonical, not .tmp" + ); + assert!(loaded_path.exists()); + assert!(!tmp.exists(), "orphan .tmp should have been renamed away"); + + delete_for_file(&file_path)?; + let _ = fs::remove_file(dir.join(format!("{key}.lock"))); + Ok(()) + } + + /// A `.tmp` sibling holding garbage is the crash-mid-write case: + /// the write was interrupted before `sync_all`. `recover_orphaned_tmps` + /// must unlink it rather than rename the corrupt bytes onto the + /// canonical path. 
+ #[test] + fn orphaned_tmp_with_garbage_is_unlinked() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-orphan-unlink-{nanos}"); + let key = file_hash_key(&file_path); + let dir = payments_dir()?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let canonical = receipt_path(&dir, now, &key); + let tmp = tmp_path_for(&canonical); + fs::write(&tmp, b"not valid msgpack")?; + assert!(tmp.exists()); + + let result = load_for_file(&file_path)?; + assert!(result.is_none(), "no usable receipt should be present"); + assert!( + !tmp.exists(), + "garbage orphan .tmp should have been unlinked" + ); + assert!( + !canonical.exists(), + "garbage must not be renamed to canonical" + ); + + let _ = fs::remove_file(dir.join(format!("{key}.lock"))); + Ok(()) + } + + /// Atomic-write proof: a torn `.tmp` lying around must + /// never replace the live canonical receipt. The original + /// `write_receipt_atomic_leaves_no_tmp_file` test is theatre + /// (asserts cleanup not atomicity); this one fails if the write + /// path ever regresses to truncate-in-place. + #[test] + fn write_receipt_atomic_preserves_existing_on_torn_tmp() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-atomic-preserve-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([0xAA; 32], vec![0xAA]); + let canonical = append_wave(&file_path, proofs, "10", 20)?; + let canonical_bytes_before = fs::read(&canonical)?; + + // Plant a torn .tmp (zero-byte): simulates kill between open + // and write. recover_orphaned_tmps must unlink it and NOT + // rename it over canonical. + let tmp = tmp_path_for(&canonical); + fs::write(&tmp, b"")?; + + // Force recovery by reloading. 
+ let (_, loaded) = load_for_file(&file_path)?.expect("canonical preserved"); + assert_eq!(loaded.proofs.len(), 1); + assert!(loaded.proofs.contains_key(&[0xAA; 32])); + assert!(!tmp.exists(), "torn .tmp unlinked"); + assert_eq!( + fs::read(&canonical)?, + canonical_bytes_before, + "canonical bytes unchanged by torn .tmp recovery" + ); + + delete_for_file(&file_path)?; + Ok(()) + } + + /// Mixed drop + append concurrency: 8 threads alternating drops + /// of one address and appends of another. The CAS-on-bytes drop + /// + exclusive lock must keep every appended proof reachable. + #[test] + fn concurrent_drop_and_append_keep_consistent_state() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-drop-append-concurrent-{nanos}"); + + // Seed with one proof for address [99; 32] so the dropper + // has something to remove. + let mut seed: HashMap<[u8; 32], Vec> = HashMap::new(); + seed.insert([99u8; 32], vec![99]); + append_wave(&file_path, seed, "1", 1)?; + + let mut handles = Vec::new(); + for i in 0u8..8 { + let fp = file_path.clone(); + handles.push(std::thread::spawn(move || -> Result<()> { + if i % 2 == 0 { + // Even thread: append a fresh proof at address [i; 32]. + let mut wave: HashMap<[u8; 32], Vec> = HashMap::new(); + wave.insert([i; 32], vec![i, i, i]); + append_wave(&fp, wave, "1", 1)?; + } else { + // Odd thread: try to drop [99; 32]. CAS expected + // bytes match seed, so the first to win removes + // it; later attempts no-op. + drop_proofs_for_file(&fp, &[([99u8; 32], vec![99])])?; + } + Ok(()) + })); + } + for h in handles { + h.join().expect("thread panicked")?; + } + + // Every appended even-index proof must be present. + if let Some((_, loaded)) = load_for_file(&file_path)? 
{ + for i in (0u8..8).step_by(2) { + assert!( + loaded.proofs.contains_key(&[i; 32]), + "appended proof {i} must survive concurrent drop+append" + ); + assert_eq!(loaded.proofs.get(&[i; 32]), Some(&vec![i, i, i])); + } + } else { + // Edge case: all drops ran before any append AND the seed + // [99; 32] dropper emptied the receipt before the + // appenders re-created it. With our atomic ordering this + // shouldn't happen — assert it doesn't. + panic!("receipt should still exist with all appended proofs"); + } + + delete_for_file(&file_path)?; + Ok(()) + } + + /// Cost overflow safety: two waves each contributing nearly + /// u128::MAX/1 atto must sum without silently dropping the + /// overflow contribution. Pre-fix this saturated; with U256 sums + /// the result is exact. + #[test] + fn wave_cost_above_u128_max_does_not_silently_drop_cumulative() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-cost-overflow-{nanos}"); + + // 2^127 — fits in u128. Sum of two = 2^128, which overflows + // u128 but is exact in U256. + let near_half_max = "170141183460469231731687303715884105728"; // 2^127 + let mut w1: HashMap<[u8; 32], Vec> = HashMap::new(); + w1.insert([1u8; 32], vec![1]); + append_wave(&file_path, w1, near_half_max, 0)?; + + let mut w2: HashMap<[u8; 32], Vec> = HashMap::new(); + w2.insert([2u8; 32], vec![2]); + append_wave(&file_path, w2, near_half_max, 0)?; + + let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load"); + // Expected: 2 * 2^127 = 2^128. + let expected = "340282366920938463463374607431768211456"; + assert_eq!( + loaded.storage_cost_atto, expected, + "cumulative cost must NOT silently saturate at u128::MAX" + ); + + delete_for_file(&file_path)?; + Ok(()) + } + + /// cleanup_outdated must skip .tmp siblings even when their + /// filename timestamp prefix would parse as ancient. 
Otherwise + /// an in-flight write's .tmp would get reaped mid-flight. + #[test] + fn cleanup_outdated_skips_tmp_even_with_ancient_prefix() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-cleanup-tmp-skip-{nanos}"); + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([0xAA; 32], vec![0xAA]); + append_wave(&file_path, proofs, "1", 1)?; + + let dir = payments_dir()?; + let key = file_hash_key(&file_path); + // Year 1970 + 1 second. + let ancient_tmp = dir.join(format!("1_{key}.tmp")); + fs::write(&ancient_tmp, b"in-flight")?; + + cleanup_outdated(); + + assert!( + ancient_tmp.exists(), + "cleanup_outdated must not reap .tmp by ancient timestamp prefix" + ); + + let _ = fs::remove_file(&ancient_tmp); + delete_for_file(&file_path)?; + Ok(()) + } + + /// Duplicate canonical receipts (eg. residue from a buggier + /// earlier version, or a `_.tmp` recovered over a + /// pre-existing `_` canonical) must be deduped: + /// newest timestamp wins, older are unlinked. + #[test] + fn duplicate_canonical_receipts_are_deduplicated_keeping_newest() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-dedupe-canonical-{nanos}"); + let dir = payments_dir()?; + let key = file_hash_key(&file_path); + + // Use recent timestamps so a concurrent test's + // `cleanup_outdated` (which walks the shared payments dir + // unfiltered by key) doesn't reap our hand-written receipts + // before the dedup runs. 
+ let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let old_ts = now.saturating_sub(120); + let new_ts = now.saturating_sub(60); + let old_path = dir.join(format!("{old_ts}_{key}")); + let new_path = dir.join(format!("{new_ts}_{key}")); + let mut old = SingleNodeReceipt::new(old_ts); + old.proofs.insert([1u8; 32], vec![1]); + let mut new = SingleNodeReceipt::new(new_ts); + new.proofs.insert([2u8; 32], vec![2]); + write_receipt_atomic(&old_path, &old)?; + write_receipt_atomic(&new_path, &new)?; + assert!(old_path.exists() && new_path.exists()); + + // Trigger dedupe via a recovery-pass entry point. + let _guard = ReceiptLock::acquire(&dir, &key)?; + dedupe_canonical_receipts(&dir, &key); + drop(_guard); + + assert!( + !old_path.exists(), + "older canonical receipt must be unlinked" + ); + assert!(new_path.exists(), "newer canonical receipt must survive"); + + delete_for_file(&file_path)?; + Ok(()) + } + + /// An unreadable canonical receipt (corrupt msgpack) must be + /// unlinked, not left to occupy the directory for up to 24 h. + /// Pre-fix the file just got logged as "unreadable" and skipped. + #[test] + fn unreadable_canonical_receipt_is_unlinked_by_find_existing() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-unreadable-canonical-{nanos}"); + let dir = payments_dir()?; + let key = file_hash_key(&file_path); + + // Recent timestamp so is_expired_filename returns false. + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let canonical = dir.join(format!("{now}_{key}")); + fs::write(&canonical, b"this is not msgpack")?; + assert!(canonical.exists()); + + // load_for_file -> find_existing must unlink the corrupt file. 
+ let result = load_for_file(&file_path)?; + assert!(result.is_none(), "no usable receipt"); + assert!( + !canonical.exists(), + "corrupt canonical receipt should be unlinked, not left for 24 h" + ); + + Ok(()) + } + + /// A receipt written by a future schema version (eg. user + /// downgraded the binary between attempts) must be treated as + /// unreadable so the corruption-unlink path kicks in. + #[test] + fn future_schema_version_is_treated_as_unreadable() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-future-schema-{nanos}"); + let dir = payments_dir()?; + let key = file_hash_key(&file_path); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let canonical = dir.join(format!("{now}_{key}")); + let receipt = SingleNodeReceipt { + version: SCHEMA_VERSION.saturating_add(1), + proofs: { + let mut m: HashMap<[u8; 32], Vec> = HashMap::new(); + m.insert([1u8; 32], vec![1]); + m + }, + first_pay_timestamp: now, + storage_cost_atto: "10".to_string(), + gas_cost_wei: 20, + }; + write_receipt_atomic(&canonical, &receipt)?; + assert!(canonical.exists()); + + let result = load_for_file(&file_path)?; + assert!(result.is_none(), "future schema must be rejected"); + assert!(!canonical.exists(), "rejected receipt must be unlinked"); + + Ok(()) + } } diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 5439742..df7b11e 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -1285,9 +1285,17 @@ impl Client { // // For the merkle path, attempt to resume from a cached // receipt before paying again. The cache is keyed by the - // source file path; a successful upload deletes the cache so - // a subsequent re-upload of the same path will pay anew. 
- let file_path_key = path.display().to_string(); + // CANONICAL source path so `./foo`, `/abs/foo`, and any + // symlink alias all resolve to the same cache entry — a + // crash-and-retry from a different cwd or via a different + // alias still hits the receipt. Canonicalize may fail (the + // file could have been moved between phase 1 and here); we + // fall back to the display string in that case, which + // preserves pre-fix behaviour rather than dropping cache + // resume entirely. + let file_path_key = std::fs::canonicalize(path) + .map(|p| p.display().to_string()) + .unwrap_or_else(|_| path.display().to_string()); let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self .should_use_merkle(chunk_count, mode) { From 45d287b5e4a52e0a35e5077d7761125a3a48e893 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 13 May 2026 17:35:51 +0900 Subject: [PATCH 4/5] fix(cached-single): satisfy clippy::unnecessary_sort_by on rust 1.95 CI uses Rust 1.95 which promotes clippy::unnecessary_sort_by to deny; local was 1.94 and didn't fire. Rewrite the two descending-sort calls in recover_orphaned_tmps and dedupe_canonical_receipts to use sort_by_key + std::cmp::Reverse (the lint's suggested form). --- ant-core/src/data/client/cached_single.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ant-core/src/data/client/cached_single.rs b/ant-core/src/data/client/cached_single.rs index c433110..e576917 100644 --- a/ant-core/src/data/client/cached_single.rs +++ b/ant-core/src/data/client/cached_single.rs @@ -493,7 +493,7 @@ fn recover_orphaned_tmps(dir: &Path, key: &str) { } // Sort descending by timestamp so the newest readable .tmp wins.
- candidates.sort_by(|a, b| b.0.cmp(&a.0)); + candidates.sort_by_key(|c| std::cmp::Reverse(c.0)); let mut recovered = false; for (_, path, readable) in candidates { @@ -567,7 +567,7 @@ fn dedupe_canonical_receipts(dir: &Path, key: &str) { if canonicals.len() <= 1 { return; } - canonicals.sort_by(|a, b| b.0.cmp(&a.0)); + canonicals.sort_by_key(|c| std::cmp::Reverse(c.0)); for (_, stale) in canonicals.iter().skip(1) { warn!( "Removing duplicate canonical receipt {} (older sibling of \ From e9a93f6eaf27a59cb16857185ab4da57facb3690 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 13 May 2026 18:02:39 +0900 Subject: [PATCH 5/5] fix(cached-single): 4 payment-loss bugs found by codex high in final review 1. Receipt filename TTL was keyed on the FIRST wave's timestamp. A wave paid at T0+23h50m got dropped wholesale at T0+24h via cleanup_outdated even though the late wave's proof was only 10 minutes old. Fix: rotate the canonical filename to `<latest_wave_ts>_<key>` on every successful append_wave, so the on-disk TTL tracks "time since most recent paid wave" instead of "time since first wave". The receipt survives as long as it keeps being used; stale individual proofs are still pruned by the per-quote.timestamp check in batch.rs. 2. proof_is_safely_fresh rejected ANY future-dated quote, but the ant-node verifier tolerates up to QUOTE_FUTURE_SKEW_TOLERANCE_SECS = 300s of forward skew. A client clock 60s slow would prune perfectly fresh proofs and force re-payment. Fix: mirror the 300s tolerance in CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS and thread max_future_skew through proof_is_safely_fresh. 3. file_hash_key used std::collections::hash_map::DefaultHasher whose output is explicitly NOT stable across rustc releases. User pays on binary A, upgrades within 24h, retries on binary B, cache miss, re-pay. Fix: BLAKE3 of the canonical path string, truncated to 128 bits. Adds blake3 = "1" to ant-core. 4.
dedupe_canonical_receipts was content-blind: it unlinked older siblings purely by timestamp without merging their proofs. Pre-existing duplicates from buggier earlier binaries or manual file recovery could hold proofs only in the older sibling. Blind unlink stranded those payments. Fix: union all readable siblings' proofs into the newest, sum costs, take min(first_pay_timestamp), atomically rewrite the winner, then unlink the rest. 3 new tests: - file_hash_key_uses_stable_digest_across_invocations pins the expected BLAKE3 digest so a future regression to a non-stable hash fails loudly. - append_wave_rotates_filename_so_late_waves_dont_age_out verifies the canonical filename's timestamp tracks the latest wave (not the first) and that proofs survive the rotation. - duplicate_canonical_receipts_are_merged_then_older_unlinked replaces the prior lossy test, asserts older sibling's proof is merged into the winner and costs are summed correctly. Verification: 283 lib tests pass (was 280; +3), clippy -D warnings clean, fmt clean, release build clean. 
--- Cargo.lock | 1 + ant-core/Cargo.toml | 1 + ant-core/src/data/client/batch.rs | 90 +++++-- ant-core/src/data/client/cached_single.rs | 296 +++++++++++++++++++--- 4 files changed, 327 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5b5246..506454e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -857,6 +857,7 @@ dependencies = [ "anyhow", "async-stream", "axum", + "blake3", "bytes", "flate2", "fs2", diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index 073b354..4016c48 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -12,6 +12,7 @@ readme = "../README.md" async-stream = "0.3" axum = "0.8" flate2 = "1" +blake3 = "1" fs2 = "0.4" futures-core = "0.3" futures-util = "0.3" diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 7a072d5..6221f25 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -897,6 +897,18 @@ const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 300; /// Either way, no payment is double-spent or stranded. const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 60 * 60; +/// How far a cached quote's `timestamp` may be in the future before we +/// classify it as too-skewed-to-trust and prune. +/// +/// Mirrors `QUOTE_FUTURE_SKEW_TOLERANCE_SECS = 300` in +/// `ant-node/src/payment/verifier.rs`. If the client's clock runs +/// slow relative to the storer that issued the quote, a perfectly +/// valid proof can appear future-dated to the client — rejecting any +/// forward drift would re-pay those chunks on every retry. Allow the +/// same 5-minute window the storer does so the client and node agree +/// on which proofs are fresh. +const CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS: u64 = 300; + /// Drop cached `proof_bytes` whose quote timestamps are too close to /// the storer's expiry window to safely reuse. 
/// @@ -928,6 +940,7 @@ fn prune_locally_expired_proofs( let max_safe_age = Duration::from_secs( CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS), ); + let max_future_skew = Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS); let mut kept: HashMap> = HashMap::with_capacity(proofs.len()); // Pair each expired address with the EXACT bytes we observed at // load time. The cache-side drop only removes the entry if those @@ -938,7 +951,7 @@ fn prune_locally_expired_proofs( for (addr, bytes) in proofs { match deserialize_proof(&bytes) { Ok((proof, _tx_hashes)) => { - if proof_is_safely_fresh(&proof, now, max_safe_age) { + if proof_is_safely_fresh(&proof, now, max_safe_age, max_future_skew) { kept.insert(addr, bytes); } else { expired.push((addr, bytes)); @@ -963,15 +976,16 @@ fn prune_locally_expired_proofs( } /// True iff every quote in the proof has a timestamp not older than -/// `now - max_safe_age` AND not implausibly in the future. A future -/// timestamp is a clock-skew signal: if the storer's clock matches -/// `now` and the quote claims a future time, the storer will reject -/// it with the future-skew branch of `validate_quote_timestamps`. We -/// drop those too rather than send them and lose the round trip. +/// `now - max_safe_age` AND not further in the future than +/// `max_future_skew`. The forward-skew check mirrors the storer's +/// `QUOTE_FUTURE_SKEW_TOLERANCE_SECS` (300s) so a slow-running client +/// clock doesn't cause us to wrongly prune perfectly fresh proofs +/// that the storer would still accept. fn proof_is_safely_fresh( proof: &ProofOfPayment, now: std::time::SystemTime, max_safe_age: Duration, + max_future_skew: Duration, ) -> bool { for (_peer, quote) in &proof.peer_quotes { match now.duration_since(quote.timestamp) { @@ -980,11 +994,10 @@ fn proof_is_safely_fresh( return false; } } - Err(_) => { - // Future-dated quote relative to local clock. 
Treat as - // unsafe to reuse — the storer's clock-skew tolerance - // is unknown to us here, so any forward drift is risky. - return false; + Err(future) => { + if future.duration() > max_future_skew { + return false; + } } } } @@ -1153,6 +1166,10 @@ mod tests { ProofOfPayment { peer_quotes } } + fn default_max_future_skew() -> Duration { + Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS) + } + #[test] fn proof_is_safely_fresh_accepts_recent_quote() { let proof = make_proof_with_timestamps(&[std::time::SystemTime::now()]); @@ -1160,6 +1177,7 @@ mod tests { &proof, std::time::SystemTime::now(), Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS), + default_max_future_skew(), )); } @@ -1175,7 +1193,12 @@ mod tests { CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS), ); assert!( - !proof_is_safely_fresh(&proof, std::time::SystemTime::now(), max_safe), + !proof_is_safely_fresh( + &proof, + std::time::SystemTime::now(), + max_safe, + default_max_future_skew(), + ), "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)" ); } @@ -1191,19 +1214,45 @@ mod tests { let max_safe = Duration::from_secs( CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS), ); - assert!(!proof_is_safely_fresh(&proof, now, max_safe)); + assert!(!proof_is_safely_fresh( + &proof, + now, + max_safe, + default_max_future_skew(), + )); + } + + #[test] + fn proof_is_safely_fresh_accepts_slight_future_skew_within_node_tolerance() { + // Client clock 60s slow. Quote claims 60s in the future of + // our local view. Node tolerates 300s forward skew, so the + // storer would accept this quote — we must too, or we'd + // wrongly prune fresh proofs and force re-payment. 
+ let now = std::time::SystemTime::now(); + let slight_future = now + Duration::from_secs(60); + let proof = make_proof_with_timestamps(&[slight_future]); + let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS); + assert!( + proof_is_safely_fresh(&proof, now, max_safe, default_max_future_skew()), + "60s-future quote must be accepted (within node's 300s skew tolerance)" + ); } #[test] - fn proof_is_safely_fresh_rejects_future_dated_quote() { - // Forward clock skew: the storer's future-skew tolerance is - // unknown here, so any forward-drifted quote is unsafe to - // re-use. We bail rather than burn a round trip. + fn proof_is_safely_fresh_rejects_far_future_dated_quote() { + // 1 hour in the future of our local clock. Exceeds the + // node's 300s forward-skew tolerance and the storer would + // reject it — we drop it locally to avoid a round trip. let now = std::time::SystemTime::now(); - let future = now + Duration::from_secs(3600); - let proof = make_proof_with_timestamps(&[future]); + let far_future = now + Duration::from_secs(3600); + let proof = make_proof_with_timestamps(&[far_future]); let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS); - assert!(!proof_is_safely_fresh(&proof, now, max_safe)); + assert!(!proof_is_safely_fresh( + &proof, + now, + max_safe, + default_max_future_skew(), + )); } #[test] @@ -1217,6 +1266,7 @@ mod tests { &proof, std::time::SystemTime::now(), Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS), + default_max_future_skew(), )); } } diff --git a/ant-core/src/data/client/cached_single.rs b/ant-core/src/data/client/cached_single.rs index e576917..d65c8c8 100644 --- a/ant-core/src/data/client/cached_single.rs +++ b/ant-core/src/data/client/cached_single.rs @@ -81,7 +81,6 @@ use fs2::FileExt; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs::{self, DirEntry, File, OpenOptions}; -use std::hash::{Hash, Hasher}; use std::io::{BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; use 
std::time::{SystemTime, UNIX_EPOCH}; @@ -155,16 +154,28 @@ fn payments_dir() -> Result { Ok(dir) } -/// Short non-cryptographic hash of the source file path string, used -/// as the on-disk cache key. +/// Stable digest of the canonical path string, used as the on-disk +/// cache key. /// -/// Same scheme as `cached_merkle::file_hash_key`. Collisions are -/// content-validated against the current encrypted chunk addresses -/// before being trusted. +/// **Must be stable across binary versions** — the user can pay a +/// wave on binary A, upgrade or downgrade between attempts, and +/// expect resume to find the receipt on binary B. The standard- +/// library `DefaultHasher` (`std::collections::hash_map::DefaultHasher`) +/// is explicitly documented as NOT stable across rustc releases, so +/// using it here would silently lose resumability on any toolchain +/// upgrade. BLAKE3 gives a permanent, fixed-output digest. The first +/// 16 bytes are plenty: with the lock-protected `find_existing` we +/// content-validate cache hits against the current encrypted chunk +/// addresses, so a 128-bit collision space is well beyond practical +/// concern. fn file_hash_key(file_path: &str) -> String { - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - file_path.hash(&mut hasher); - format!("{:016x}", hasher.finish()) + let digest = blake3::hash(file_path.as_bytes()); + let bytes = digest.as_bytes(); + let mut out = String::with_capacity(32); + for byte in &bytes[..16] { + out.push_str(&format!("{byte:02x}")); + } + out } fn receipt_path(dir: &Path, ts: u64, key: &str) -> PathBuf { @@ -173,9 +184,25 @@ fn receipt_path(dir: &Path, ts: u64, key: &str) -> PathBuf { /// Append a wave's worth of paid-chunk proofs to the on-disk receipt. /// -/// If no receipt exists yet for this file, one is created with the -/// current time as `first_pay_timestamp`. Otherwise the existing -/// file is loaded, extended with the new proofs, and rewritten. 
+/// If no receipt exists yet for this file, one is created. Otherwise +/// the existing file is loaded, extended with the new proofs, and +/// rewritten under a fresh `<ts>_<key>` filename (with the old +/// canonical unlinked right afterwards). +/// +/// Why the filename rotates on every append +/// ---------------------------------------- +/// The 24-hour TTL is enforced by parsing the timestamp prefix from +/// the canonical filename (`cleanup_outdated` + `is_expired_filename`). +/// If we kept the original filename across waves, a receipt holding a +/// wave paid 23 h ago plus a wave paid 1 minute ago would be deleted +/// wholesale at the 24-hour mark — stranding the fresh wave's +/// payment. Rotating the filename to `<ts>_<key>` on every successful +/// append makes the on-disk TTL track "time since most recent paid +/// wave" instead of "time since first wave", matching the semantic +/// users expect: the cache survives as long as it keeps being used. +/// Individual stale proofs inside the receipt are pruned by +/// `prune_locally_expired_proofs` in `batch.rs`, which checks each +/// `quote.timestamp` against the storer's per-quote budget. /// /// Atomicity & concurrency /// ----------------------- @@ -210,10 +237,11 @@ pub fn append_wave( // Find an existing receipt for this file (non-expired) and load // it, or create a fresh one stamped with now(). - let (path, mut receipt) = match find_existing(&dir, &key)? { - Some((p, r)) => (p, r), - None => (receipt_path(&dir, now, &key), SingleNodeReceipt::new(now)), + let (old_path, mut receipt) = match find_existing(&dir, &key)? { + Some((p, r)) => (Some(p), r), + None => (None, SingleNodeReceipt::new(now)), }; + let new_path = receipt_path(&dir, now, &key); receipt.proofs.extend(new_proofs); // Sum costs as U256 (Amount).
A wave's storage cost is wei-scale @@ -230,13 +258,25 @@ } receipt.gas_cost_wei = receipt.gas_cost_wei.saturating_add(wave_gas_cost_wei); - write_receipt_atomic(&path, &receipt)?; + write_receipt_atomic(&new_path, &receipt)?; + // Unlink the old canonical (different filename), if any. Order: + // write-new then unlink-old means a crash between them leaves + // both files on disk briefly; `find_existing` may return either + // (directory iteration order is filesystem-dependent); a later + // `dedupe_canonical_receipts` merges and unlinks the older. No + // proofs are ever lost in the gap because both files hold the + // same load-extend-write content; new_path is a strict superset. + if let Some(old) = old_path { + if old != new_path { + let _ = fs::remove_file(&old); + } + } debug!( "Appended {} proofs to single-node receipt for {file_path:?} ({})", receipt.proofs.len(), - path.display() + new_path.display() ); - Ok(path) + Ok(new_path) } /// Remove specific chunk proofs from the cached receipt for a file, @@ -528,17 +568,24 @@ fn recover_orphaned_tmps(dir: &Path, key: &str) { dedupe_canonical_receipts(dir, key); } -/// Keep at most one canonical receipt per key — the one with the -/// largest timestamp prefix. +/// Keep at most one canonical receipt per key, **merging** the proof +/// content of every readable sibling into the winning one before +/// unlinking the rest. /// /// Multiple canonical receipts for the same key can arise if a -/// previous `append_wave` raced an aborted recovery (eg. before the -/// `.lock` was added). They can also arise transiently when -/// `recover_orphaned_tmps` promotes a `<ts>_<key>.tmp` to -/// canonical while a `<ts>_<key>` already exists from an earlier -/// successful write. Without deduping, `find_existing` returns the -/// first match from directory iteration — order is filesystem- -/// dependent, so half the proofs end up silently unreachable.
+/// previous `append_wave` raced an aborted recovery, if a buggier +/// older binary wrote without rotating, or if manual file recovery +/// dropped a backup alongside the live file. Without merging, an +/// older sibling can hold proofs the newer one never saw — eg. the +/// older was written before a partial `delete_for_file` was +/// interrupted, leaving the older as the only carrier of some +/// waves' on-chain payments. Blind newest-wins would strand those. +/// +/// Strategy: read every readable canonical sibling for the key, union +/// their `proofs` maps and sum costs into the newest-timestamp +/// canonical (overwriting it atomically), then unlink the rest. +/// Unreadable siblings are unlinked without contributing — they +/// can't strand a payment that's already corrupt-on-disk. fn dedupe_canonical_receipts(dir: &Path, key: &str) { let Ok(read_dir) = fs::read_dir(dir) else { return; @@ -568,14 +615,81 @@ fn dedupe_canonical_receipts(dir: &Path, key: &str) { return; } canonicals.sort_by_key(|c| std::cmp::Reverse(c.0)); + + // Identify the winner (newest), then fold every other readable + // sibling into it. + let (winner_ts, winner_path) = canonicals[0].clone(); + let mut winner = match read_receipt(&winner_path) { + Ok(r) => r, + Err(_) => { + // Newest is corrupt: unlink it and let the next-newest + // become the winner on the recursive retry. + warn!( + "Newest canonical {} unreadable; unlinking and retrying dedupe", + winner_path.display() + ); + let _ = fs::remove_file(&winner_path); + return dedupe_canonical_receipts(dir, key); + } + }; + + let mut merged_from = 0usize; for (_, stale) in canonicals.iter().skip(1) { - warn!( - "Removing duplicate canonical receipt {} (older sibling of \ - a newer recovered receipt)", - stale.display() - ); + match read_receipt(stale) { + Ok(other) => { + // Union proofs: an entry only present in the older + // sibling represents a paid wave the newer never saw. 
+ // Keep the WINNER's bytes when both have the same + // address (newer paid wave's proof — by load-extend- + // write semantics newer should hold the same proof + // unless a buggier binary wrote independently). + let mut added = 0usize; + for (addr, bytes) in other.proofs { + winner.proofs.entry(addr).or_insert_with(|| { + added += 1; + bytes + }); + } + if let (Ok(w), Ok(o)) = ( + winner.storage_cost_atto.parse::<Amount>(), + other.storage_cost_atto.parse::<Amount>(), + ) { + winner.storage_cost_atto = w.saturating_add(o).to_string(); + } + winner.gas_cost_wei = winner.gas_cost_wei.saturating_add(other.gas_cost_wei); + winner.first_pay_timestamp = + winner.first_pay_timestamp.min(other.first_pay_timestamp); + merged_from += 1; + info!( + "Merged {added} proofs from older canonical {} into winner {}", + stale.display(), + winner_path.display() + ); + } + Err(_) => { + warn!( + "Dropping unreadable duplicate canonical {} (no recoverable proofs)", + stale.display() + ); + } + } + let _ = fs::remove_file(stale); + } + + if merged_from > 0 { + // Rewrite the winner under its own filename with the merged + // content. Same path, write-tmp-and-rename keeps the on-disk + // state coherent across the rewrite. + if let Err(e) = write_receipt_atomic(&winner_path, &winner) { + warn!( + "Could not rewrite merged canonical receipt {} ({e}); \ + winner retains pre-merge content and the older proofs \ + are lost. Best-effort: leaving on-disk state as-is.", + winner_path.display() + ); + } + } + let _ = winner_ts; } fn find_existing(dir: &Path, key: &str) -> Result<Option<(PathBuf, SingleNodeReceipt)>> { @@ -807,6 +921,85 @@ mod tests { assert_ne!(file_hash_key("/tmp/a"), file_hash_key("/tmp/b")); }
+ // Pin the expected hex digest so a future change away from + // BLAKE3 (or back to DefaultHasher) trips this test loudly. + // First 16 bytes of BLAKE3("/tmp/anselme-cache-stable-test"): + let expected = "491a1a569cd6c544074a70504b2b5183"; + assert_eq!(file_hash_key("/tmp/anselme-cache-stable-test"), expected); + } + + /// Reproduces codex finding #1: receipt filename used to embed + /// the FIRST wave's timestamp. A wave paid 23h after that first + /// wave would get dropped by filename-TTL at the 24h mark even + /// though it's only an hour old. + /// + /// Post-fix: `append_wave` rotates the canonical filename to + /// `<ts>_<key>` on every successful append, so the filename + /// timestamp tracks the LAST paid wave. The receipt survives as + /// long as it keeps being used. + #[test] + fn append_wave_rotates_filename_so_late_waves_dont_age_out() -> Result<()> { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let file_path = format!("/tmp/anselme-ttl-rotation-test-{nanos}"); + + let mut wave1: HashMap<[u8; 32], Vec<u8>> = HashMap::new(); + wave1.insert([1u8; 32], vec![1]); + let path_after_wave1 = append_wave(&file_path, wave1, "10", 20)?; + let ts_after_wave1 = path_after_wave1 + .file_name() + .and_then(|n| n.to_str()) + .and_then(|n| n.split_once('_')) + .and_then(|(ts, _)| ts.parse::<u64>().ok()) + .expect("wave1 receipt name parses"); + + // Sleep just long enough that `now` advances by at least 1 + // second. Without this, both waves can land on the same + // timestamp and the rotation is a no-op for this test + // (still correct semantics, just not observable here).
+ std::thread::sleep(std::time::Duration::from_millis(1100)); + + let mut wave2: HashMap<[u8; 32], Vec<u8>> = HashMap::new(); + wave2.insert([2u8; 32], vec![2]); + let path_after_wave2 = append_wave(&file_path, wave2, "5", 10)?; + let ts_after_wave2 = path_after_wave2 + .file_name() + .and_then(|n| n.to_str()) + .and_then(|n| n.split_once('_')) + .and_then(|(ts, _)| ts.parse::<u64>().ok()) + .expect("wave2 receipt name parses"); + + assert_ne!( + path_after_wave1, path_after_wave2, + "filename must rotate so TTL tracks LAST wave, not first" + ); + assert!( + ts_after_wave2 > ts_after_wave1, + "rotated filename's timestamp must be strictly newer" + ); + assert!( + !path_after_wave1.exists(), + "old canonical must be unlinked after the rewrite" + ); + assert!(path_after_wave2.exists()); + + // The merged receipt contains BOTH waves' proofs at the new + // path — the older entries are NOT lost in the rotation. + let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load"); + assert!(loaded.proofs.contains_key(&[1u8; 32])); + assert!(loaded.proofs.contains_key(&[2u8; 32])); + + delete_for_file(&file_path)?; + Ok(()) + } + #[test] fn expired_filename_detected() { let stale = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -1440,12 +1633,14 @@ mod tests { Ok(()) } - /// Duplicate canonical receipts (eg. residue from a buggier - /// earlier version, or a `<ts>_<key>.tmp` recovered over a - /// pre-existing `<ts>_<key>` canonical) must be deduped: - /// newest timestamp wins, older are unlinked. + /// Duplicate canonical receipts must be merged before the older + /// is unlinked — never blindly newest-wins. The older may hold + /// proofs the newer never saw (residue from a buggier binary, + /// manual file recovery, interrupted operation). Blind unlink + /// would strand any on-chain payment whose proof lives only in + /// the older sibling.
#[test] - fn duplicate_canonical_receipts_are_deduplicated_keeping_newest() -> Result<()> { + fn duplicate_canonical_receipts_are_merged_then_older_unlinked() -> Result<()> { let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() @@ -1467,24 +1662,43 @@ mod tests { let old_path = dir.join(format!("{old_ts}_{key}")); let new_path = dir.join(format!("{new_ts}_{key}")); let mut old = SingleNodeReceipt::new(old_ts); - old.proofs.insert([1u8; 32], vec![1]); + old.proofs.insert([1u8; 32], vec![0xAA]); + old.storage_cost_atto = "10".to_string(); + old.gas_cost_wei = 20; let mut new = SingleNodeReceipt::new(new_ts); - new.proofs.insert([2u8; 32], vec![2]); + new.proofs.insert([2u8; 32], vec![0xBB]); + new.storage_cost_atto = "30".to_string(); + new.gas_cost_wei = 40; write_receipt_atomic(&old_path, &old)?; write_receipt_atomic(&new_path, &new)?; assert!(old_path.exists() && new_path.exists()); - // Trigger dedupe via a recovery-pass entry point. let _guard = ReceiptLock::acquire(&dir, &key)?; dedupe_canonical_receipts(&dir, &key); drop(_guard); assert!( !old_path.exists(), - "older canonical receipt must be unlinked" + "older canonical receipt must be unlinked after merge" ); assert!(new_path.exists(), "newer canonical receipt must survive"); + // The winner now holds BOTH proofs and the SUMMED costs — + // the older's proof was NOT stranded. + let merged = read_receipt(&new_path)?; + assert!( + merged.proofs.contains_key(&[1u8; 32]), + "older sibling's proof must be merged into the winner" + ); + assert!(merged.proofs.contains_key(&[2u8; 32])); + assert_eq!(merged.proofs.len(), 2); + assert_eq!(merged.storage_cost_atto, "40", "costs must be summed"); + assert_eq!(merged.gas_cost_wei, 60); + assert_eq!( + merged.first_pay_timestamp, old_ts, + "first_pay_timestamp must be the MIN across merged siblings" + ); + delete_for_file(&file_path)?; Ok(()) }