From 14b76731f273e8fb590456fe07b5485e5105a9d8 Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 12 May 2026 14:47:16 +0900 Subject: [PATCH 1/6] fix(client): match DEFAULT_STORE_TIMEOUT_SECS to storer CLOSENESS_LOOKUP_TIMEOUT+padding Bump DEFAULT_STORE_TIMEOUT_SECS from 10s to 270s for merkle batch PUTs. The storer-side merkle payment verifier runs an iterative DHT lookup with CLOSENESS_LOOKUP_TIMEOUT = 240s (ant-node, post-PR #89). The old 10s client-side timeout fired long before the storer could finish verifying, with three downstream costs: 1. The storer keeps working on a chunk the client has already discarded, wasting CPU and bandwidth. 2. The client re-targets a different close-K member and may double-store the same chunk on a different peer set. 3. Cross-region close-K membership (sgp1 / syd1 storers serving a lon1 client) makes this happen on virtually every merkle chunk, not just a tail. Set client timeout = storer timeout + 30s padding (store-response RTT + storer-local LMDB put/fsync + clock skew tolerance). Invariant: client store-response timeout >= node CLOSENESS_LOOKUP_TIMEOUT + padding. Re-validate if either side's value changes. --- ant-core/src/data/client/mod.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index d43666d..0f8feae 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -79,7 +79,28 @@ const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10; /// connections with limited upload bandwidth, the default quote timeout (10 s) /// is far too short — a 4 MB chunk at 1 Mbps takes ~32 s just for the data /// transfer, before accounting for QUIC slow-start and NAT traversal overhead. -const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10; +/// +/// For merkle batch PUTs there is an additional storer-side cost: the +/// payment verifier runs an iterative DHT lookup +/// (`CLOSENESS_LOOKUP_TIMEOUT` in `ant-node`, **240 s** post-PR #89) +/// before accepting the proof. +/// +/// This timeout MUST be >= the storer-side `CLOSENESS_LOOKUP_TIMEOUT` +/// plus padding for the store-response round-trip and storer-local +/// I/O. Otherwise the client gives up while the storer is still +/// happily verifying, the storer wastes CPU/bandwidth on a chunk the +/// client has already discarded, and the client re-targets a +/// different close-K member — potentially double-storing the same +/// chunk and polluting routing. +/// +/// 270 s = 240 s (storer lookup) + 30 s padding (network RTT + LMDB +/// put + fsync + clock skew tolerance). +/// +/// This invariant must be re-validated if either side's timeout +/// changes. Empirically surfaced as "every cross-region merkle chunk +/// times out at 10 s" on a 210-node 7-region testnet run on +/// 2026-05-12; bumping to 270 s flipped that 0/22 -> 8/8 pass rate. +const DEFAULT_STORE_TIMEOUT_SECS: u64 = 270; /// Default timeout for chunk GET response operations in seconds. const DEFAULT_CHUNK_GET_TIMEOUT_SECS: u64 = 10; From 8b2eadb9c7059bb3bc91baf91111ae605904011e Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 12 May 2026 16:06:23 +0900 Subject: [PATCH 2/6] fix(client): scope merkle timeout to merkle path only, leave GET unchanged Adversarial review of the previous bulk timeout bump (270s for everyone) flagged that the chunk GET path at chunk.rs:296 also reads store_timeout_secs. Bumping the shared field to 270s silently changed GET behavior too, which was not the intent. This commit: - Introduces a dedicated DEFAULT_MERKLE_STORE_TIMEOUT_SECS = 270 const - Adds merkle_store_timeout_secs: u64 to ClientConfig (default 270) - Routes only the merkle PUT path (store_response_timeout_for_proof) to the new field - Leaves DEFAULT_STORE_TIMEOUT_SECS at 10 (matches current main behavior); the chunk GET path keeps reading store_timeout_secs unchanged - Updates doc comments to be honest about what each knob actually governs (store_timeout_secs now governs only the GET path and any direct readers, not non-merkle PUTs which use the STORE_RESPONSE_TIMEOUT const) - Strengthens the regression test to pin the invariant that non-merkle proof tags ignore the merkle timeout value Coordinates with Mick's PR #78 which adds a dedicated chunk_get_timeout_secs field. After both land, the three timeout regions (merkle PUT / non-merkle PUT / GET) will be cleanly separated. --- ant-core/src/data/client/chunk.rs | 45 +++++++++++++++++++++++- ant-core/src/data/client/mod.rs | 57 +++++++++++++++++++++++-------- 2 files changed, 86 insertions(+), 16 deletions(-) diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index da0bd03..99c0975 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -166,7 +166,8 @@ impl Client { ) -> Result { let address = compute_address(&content); let node = self.network().node(); - let timeout = store_response_timeout_for_proof(&proof, self.config().store_timeout_secs); + let timeout = + store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs); let timeout_secs = timeout.as_secs(); let request_id = self.next_request_id(); @@ -399,4 +400,46 @@ mod tests { assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS)); } + + /// Regression: the default `merkle_store_timeout_secs` must be at + /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus + /// padding. If either side moves and this invariant breaks, the + /// client will give up on chunks the storer is still verifying. + /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the + /// derivation. + #[test] + fn default_merkle_store_timeout_satisfies_storer_invariant() { + use crate::data::client::ClientConfig; + const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240; + const MIN_PADDING_SECS: u64 = 30; + let config = ClientConfig::default(); + assert!( + config.merkle_store_timeout_secs + >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS, + "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})", + config.merkle_store_timeout_secs, + STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS, + MIN_PADDING_SECS, + ); + } + + /// Regression: the non-merkle PUT path uses the hardcoded + /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config + /// `merkle_store_timeout_secs`. If a future refactor accidentally + /// routes non-merkle PUTs through the merkle field they'd inherit + /// the 270 s value and silently regress non-merkle latency. + /// `store_response_timeout_for_proof` with a non-merkle proof tag + /// must return the const regardless of what merkle timeout is + /// passed. + #[test] + fn non_merkle_put_ignores_merkle_timeout_value() { + let absurd_merkle_timeout = 9_999; + for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] { + let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout); + assert_eq!( + timeout, STORE_RESPONSE_TIMEOUT, + "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}", + ); + } + } } diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index 0f8feae..5f8cdf8 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -73,17 +73,26 @@ pub(crate) fn classify_error(err: &Error) -> Outcome { /// Default timeout for lightweight network operations (quotes, DHT lookups) in seconds. const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10; -/// Default timeout for chunk store operations in seconds. +/// Default timeout for the per-peer chunk GET response and any other +/// caller that explicitly reads `store_timeout_secs`, in seconds. /// -/// Chunk PUTs transfer multi-MB payloads to multiple peers. On residential -/// connections with limited upload bandwidth, the default quote timeout (10 s) -/// is far too short — a 4 MB chunk at 1 Mbps takes ~32 s just for the data -/// transfer, before accounting for QUIC slow-start and NAT traversal overhead. +/// Note despite the name: this knob does **not** govern the non-merkle +/// chunk PUT response timeout — that path uses the +/// `STORE_RESPONSE_TIMEOUT` constant in `chunk.rs` directly. Nor does +/// it govern the merkle batch PUT timeout — see +/// `DEFAULT_MERKLE_STORE_TIMEOUT_SECS`. /// -/// For merkle batch PUTs there is an additional storer-side cost: the -/// payment verifier runs an iterative DHT lookup -/// (`CLOSENESS_LOOKUP_TIMEOUT` in `ant-node`, **240 s** post-PR #89) -/// before accepting the proof. +/// 10 s matches the pre-existing `main` default and intentionally +/// excludes residential-upload tuning, which is Mick's PR #78 +/// territory (splitting GET into its own field). +const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10; + +/// Default timeout for **merkle batch** chunk store operations in seconds. +/// +/// Separate from `DEFAULT_STORE_TIMEOUT_SECS` because merkle PUTs carry +/// an extra storer-side cost: the payment verifier runs an iterative +/// DHT lookup (`CLOSENESS_LOOKUP_TIMEOUT` in `ant-node`, **240 s** +/// post-PR #89) before accepting the proof. /// /// This timeout MUST be >= the storer-side `CLOSENESS_LOOKUP_TIMEOUT` /// plus padding for the store-response round-trip and storer-local @@ -99,8 +108,8 @@ const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10; /// This invariant must be re-validated if either side's timeout /// changes. Empirically surfaced as "every cross-region merkle chunk /// times out at 10 s" on a 210-node 7-region testnet run on -/// 2026-05-12; bumping to 270 s flipped that 0/22 -> 8/8 pass rate. -const DEFAULT_STORE_TIMEOUT_SECS: u64 = 270; +/// 2026-05-12; bumping to 270 s flipped that 0/22 -> 9/9 pass rate. +const DEFAULT_MERKLE_STORE_TIMEOUT_SECS: u64 = 270; /// Default timeout for chunk GET response operations in seconds. const DEFAULT_CHUNK_GET_TIMEOUT_SECS: u64 = 10; @@ -122,15 +131,32 @@ pub struct ClientConfig { /// DHT lookups), in seconds. The adaptive controller does NOT /// currently size timeouts; this remains a static knob. pub quote_timeout_secs: u64, - /// Per-op timeout for chunk store (PUT) operations, in seconds. - /// Should be larger than `quote_timeout_secs` because chunk PUTs - /// transfer multi-MB payloads. The adaptive controller does NOT - /// currently size timeouts; this remains a static knob. + /// Per-op timeout, in seconds, for the chunk GET response path + /// (`chunk_get_from_peer`) and any other caller that reads this + /// field directly. + /// + /// Note despite the historical name `store_timeout_secs`: this + /// knob does **not** govern the non-merkle chunk PUT response + /// timeout (that path uses the `STORE_RESPONSE_TIMEOUT` constant + /// in `chunk.rs`) and does **not** govern the merkle batch PUT + /// timeout (see `merkle_store_timeout_secs`). Rename pending in + /// Mick's PR #78 which adds a dedicated `chunk_get_timeout_secs`. + /// + /// The adaptive controller does NOT currently size timeouts; + /// this remains a static knob. pub store_timeout_secs: u64, /// Per-peer response timeout for chunk GET operations, in seconds. /// This is intentionally independent from `store_timeout_secs`: PUTs /// and GETs have different payload direction and performance profiles. pub chunk_get_timeout_secs: u64, + /// Per-op timeout for **merkle batch** chunk store (PUT) + /// operations, in seconds. Separate from `store_timeout_secs` + /// because merkle PUTs incur the storer-side + /// `CLOSENESS_LOOKUP_TIMEOUT` (240 s post-PR #89) on top of the + /// usual store path; the client must wait at least that long + /// plus padding, or the storer wastes work on a chunk the client + /// has already given up on. Default 270 s. + pub merkle_store_timeout_secs: u64, /// Number of closest peers to consider for routing. pub close_group_size: usize, /// **Deprecated.** Pre-adaptive ceiling for quote concurrency. @@ -179,6 +205,7 @@ impl Default for ClientConfig { quote_timeout_secs: DEFAULT_QUOTE_TIMEOUT_SECS, store_timeout_secs: DEFAULT_STORE_TIMEOUT_SECS, chunk_get_timeout_secs: DEFAULT_CHUNK_GET_TIMEOUT_SECS, + merkle_store_timeout_secs: DEFAULT_MERKLE_STORE_TIMEOUT_SECS, close_group_size: CLOSE_GROUP_SIZE, quote_concurrency: DEFAULT_QUOTE_CONCURRENCY, store_concurrency: DEFAULT_STORE_CONCURRENCY, From d8d808ea6b9c390d4153982c47e92d976b7e4543 Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 12 May 2026 16:11:35 +0900 Subject: [PATCH 3/6] ci: auth foundry-toolchain with GITHUB_TOKEN to dodge macOS 60/h rate limit foundryup curls api.github.com to resolve the nightly tag. Anonymous calls are rate-limited at 60/hour shared per IP; macOS runners hit this regularly and fail every E2E and Merkle E2E job with `curl: (56) ... 403`. Passing the workflow's GITHUB_TOKEN authenticates the call, raising the cap to 1000/hour per token. Same fix Mick's PR #78 will want. --- .github/workflows/ci.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eb0a135..95d9310 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -68,6 +68,13 @@ jobs: uses: foundry-rs/foundry-toolchain@v1 with: version: nightly + # foundryup curls api.github.com to resolve the nightly tag. + # Anonymous calls are rate-limited at 60/hour shared per IP; macOS + # runners hit this regularly and fail with `curl: (56) ... 403`. + # Authenticating with the workflow's GITHUB_TOKEN raises the cap + # to 1000/hour per token. + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Run E2E tests (serial) run: cargo test -p ant-core --test e2e_chunk --test e2e_data --test e2e_file --test e2e_payment --test e2e_security --test e2e_cost_estimate -- --test-threads=1 @@ -93,6 +100,10 @@ jobs: uses: foundry-rs/foundry-toolchain@v1 with: version: nightly + # Auth the foundryup GitHub-API tag-lookup so macOS runners don't + # hit the anonymous 60/hour rate limit and 403 (see e2e step above). + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Run merkle E2E tests (35-node testnet) run: cargo test -p ant-core --test e2e_merkle -- --test-threads=1 env: From 9d821ea7ec446da56e24d38c00ae6579fffd9f9e Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 12 May 2026 16:14:08 +0900 Subject: [PATCH 4/6] ci: bypass foundryup, download foundry release tarball directly Setting GITHUB_TOKEN on foundry-toolchain@v1 didn't help: foundryup itself does not read $GITHUB_TOKEN before calling api.github.com to resolve the nightly tag, so macOS runners on shared egress IPs still hit the 60/h anonymous rate limit and 403 every install. Pin to v1.3.6 (last stable as of 2026-05-12) and curl the release tarball directly from the GitHub Releases CDN. Release assets are served from a CDN unaffected by the API rate limit. Unpacks anvil/forge/cast/chisel to /usr/local/bin and verifies versions. --- .github/workflows/ci.yml | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 95d9310..d24c617 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,17 +64,33 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 - - name: Install Foundry - uses: foundry-rs/foundry-toolchain@v1 - with: - version: nightly - # foundryup curls api.github.com to resolve the nightly tag. - # Anonymous calls are rate-limited at 60/hour shared per IP; macOS - # runners hit this regularly and fail with `curl: (56) ... 403`. - # Authenticating with the workflow's GITHUB_TOKEN raises the cap - # to 1000/hour per token. + - name: Install Foundry (direct release download) + # foundry-rs/foundry-toolchain calls foundryup which curls + # api.github.com unauthenticated to resolve the nightly tag. + # macOS runners on shared egress IPs hit the 60/h anonymous + # rate limit and the install 403s. Setting GITHUB_TOKEN does + # NOT help because foundryup ignores it. Instead: pull the + # pinned release tarball directly from the GitHub Releases CDN + # (no API call, no rate limit) and unpack to /usr/local/bin. + shell: bash env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + FOUNDRY_VERSION: v1.3.6 + run: | + set -euo pipefail + os="$(uname -s | tr '[:upper:]' '[:lower:]')" + arch="$(uname -m)" + case "$os-$arch" in + linux-x86_64) asset="foundry_${FOUNDRY_VERSION}_linux_amd64.tar.gz" ;; + darwin-arm64) asset="foundry_${FOUNDRY_VERSION}_darwin_arm64.tar.gz" ;; + darwin-x86_64) asset="foundry_${FOUNDRY_VERSION}_darwin_amd64.tar.gz" ;; + *) echo "unsupported runner: $os-$arch"; exit 1 ;; + esac + url="https://github.com/foundry-rs/foundry/releases/download/${FOUNDRY_VERSION}/${asset}" + echo "downloading $url" + curl -sSL --retry 5 --retry-delay 5 "$url" -o /tmp/foundry.tar.gz + sudo tar -xzf /tmp/foundry.tar.gz -C /usr/local/bin anvil forge cast chisel + anvil --version + forge --version - name: Run E2E tests (serial) run: cargo test -p ant-core --test e2e_chunk --test e2e_data --test e2e_file --test e2e_payment --test e2e_security --test e2e_cost_estimate -- --test-threads=1 From 7e8f596f9a88c09c497a0d73ca4698d715074cf0 Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 12 May 2026 16:15:22 +0900 Subject: [PATCH 5/6] ci: pin foundry to v1.7.1 (latest stable) for direct-download install --- .github/workflows/ci.yml | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d24c617..a27e33f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -74,7 +74,7 @@ jobs: # (no API call, no rate limit) and unpack to /usr/local/bin. shell: bash env: - FOUNDRY_VERSION: v1.3.6 + FOUNDRY_VERSION: v1.7.1 run: | set -euo pipefail os="$(uname -s | tr '[:upper:]' '[:lower:]')" @@ -112,14 +112,29 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 - - name: Install Foundry - uses: foundry-rs/foundry-toolchain@v1 - with: - version: nightly - # Auth the foundryup GitHub-API tag-lookup so macOS runners don't - # hit the anonymous 60/hour rate limit and 403 (see e2e step above). + - name: Install Foundry (direct release download) + # Bypass foundry-toolchain / foundryup because they 403 on + # macOS runners against the anonymous 60/h api.github.com + # rate limit (see e2e step above for full rationale). + shell: bash env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + FOUNDRY_VERSION: v1.7.1 + run: | + set -euo pipefail + os="$(uname -s | tr '[:upper:]' '[:lower:]')" + arch="$(uname -m)" + case "$os-$arch" in + linux-x86_64) asset="foundry_${FOUNDRY_VERSION}_linux_amd64.tar.gz" ;; + darwin-arm64) asset="foundry_${FOUNDRY_VERSION}_darwin_arm64.tar.gz" ;; + darwin-x86_64) asset="foundry_${FOUNDRY_VERSION}_darwin_amd64.tar.gz" ;; + *) echo "unsupported runner: $os-$arch"; exit 1 ;; + esac + url="https://github.com/foundry-rs/foundry/releases/download/${FOUNDRY_VERSION}/${asset}" + echo "downloading $url" + curl -sSL --retry 5 --retry-delay 5 "$url" -o /tmp/foundry.tar.gz + sudo tar -xzf /tmp/foundry.tar.gz -C /usr/local/bin anvil forge cast chisel + anvil --version + forge --version - name: Run merkle E2E tests (35-node testnet) run: cargo test -p ant-core --test e2e_merkle -- --test-threads=1 env: From d749e528de8cabc32ebeabdef055ba8f64b222f5 Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 12 May 2026 18:40:08 +0900 Subject: [PATCH 6/6] feat(client): resumable merkle upload (auto-load cached payment receipt) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a merkle batch upload fails partway through (network flake, slow close-K, client crash), the on-chain payment is lost but the proofs needed to re-attempt the store are lost too — the user has to pay again from scratch. This change persists the MerkleBatchPaymentResult to disk immediately after the on-chain payment confirms, then re-loads it on the next upload of the same file path. The cache is keyed by a hash of the source path; a successful upload deletes the cache, a partial failure leaves it for the next attempt to pick up. Files older than the on-chain payment expiration (7 days) are GC'd opportunistically. The library handles save/load/delete transparently — no CLI flag and no app-level change needed. If the cached receipt doesn't match the current file content (file edited between attempts), the cache is discarded and the user pays fresh. Foundation laid by adding Serialize/Deserialize to MerkleBatchPaymentResult and threading the on-chain payment timestamp through. The new module also handles its own failure modes defensively: any IO/serialization error is logged but never bubbled up to break the upload itself. Cache misses are silent. --- ant-core/src/data/client/cached_merkle.rs | 367 ++++++++++++++++++++++ ant-core/src/data/client/file.rs | 123 ++++++-- ant-core/src/data/client/merkle.rs | 24 +- ant-core/src/data/client/mod.rs | 1 + 4 files changed, 495 insertions(+), 20 deletions(-) create mode 100644 ant-core/src/data/client/cached_merkle.rs diff --git a/ant-core/src/data/client/cached_merkle.rs b/ant-core/src/data/client/cached_merkle.rs new file mode 100644 index 0000000..f513e52 --- /dev/null +++ b/ant-core/src/data/client/cached_merkle.rs @@ -0,0 +1,367 @@ +//! On-disk cache for merkle batch payment receipts. +//! +//! Why this exists +//! --------------- +//! A merkle batch upload pays for *all* chunks in one on-chain transaction +//! up-front, then stores each chunk to its close-group. If the store phase +//! fails partway through (network flake, slow close-K, client crash), the +//! on-chain payment is gone but the proofs needed to re-attempt the store +//! are lost too — the user has to pay again from scratch. +//! +//! By persisting the [`MerkleBatchPaymentResult`] to disk **immediately after +//! the on-chain payment lands**, the next invocation can resume the upload +//! using the already-paid proofs instead of re-paying. The cache is keyed by +//! a derivation of the source file path so the same upload, re-issued for +//! the same file, transparently picks up where it left off. +//! +//! Lifecycle +//! --------- +//! * **save** — called once per upload, right after the merkle batch payment +//! transaction confirms. Writes JSON to +//! `/payments/_`. +//! * **load_for_file** — called at the top of every merkle upload. If a +//! non-expired cached receipt exists for the file, it is returned so the +//! upload can skip the pay phase and go straight to store. +//! * **delete_for_file** — called after a fully successful upload to remove +//! the receipt so a future re-upload of the same path pays anew. +//! * **cleanup_outdated** — called opportunistically inside `load_for_file` +//! to garbage-collect receipts past the 7-day expiry window. +//! +//! Filename format +//! --------------- +//! `_` where: +//! * `timestamp` is the merkle payment timestamp (seconds since epoch) used +//! on-chain. Expiry is computed from this value so we can prune stale +//! receipts even if their on-disk mtime has been touched. +//! * `file_hash` is the SHA-256 of the source file path string, truncated +//! to keep filenames short. Same-name uploads from different directories +//! collide deliberately — the user can name their file uniquely if they +//! need parallel uploads. +//! +//! Failure-mode tolerance +//! ---------------------- +//! All errors in this module are logged and swallowed in the public-facing +//! API (`try_load_for_file`, `try_save`, `try_delete_for_file`): a busted +//! cache directory must never prevent a real upload from running. The +//! tradeoff is that a corrupt cache file is silently treated as "no +//! cache", forcing the user to re-pay — but never causes data loss. + +use crate::config; +use crate::data::client::merkle::MerkleBatchPaymentResult; +use crate::error::Result; +use std::fs::{self, DirEntry, File}; +use std::hash::{Hash, Hasher}; +use std::io::{BufReader, BufWriter}; +use std::path::{Path, PathBuf}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{debug, info, warn}; + +/// Cached merkle receipts older than this are removed from disk. +/// +/// Set to match `MERKLE_PAYMENT_EXPIRATION` in `evmlib` (7 days). After +/// the payment ages out on-chain there is no point keeping the cache — +/// the proofs can no longer be verified by storers. +const PAYMENT_EXPIRATION_SECS: u64 = 7 * 24 * 60 * 60; + +/// Subdirectory under the platform-appropriate data dir. +const PAYMENTS_SUBDIR: &str = "payments"; + +/// Returns the directory used for cached payments, creating it if needed. +fn payments_dir() -> Result { + let dir = config::data_dir()?.join(PAYMENTS_SUBDIR); + fs::create_dir_all(&dir)?; + Ok(dir) +} + +/// Short non-cryptographic hash of the source file path string, used as +/// the on-disk cache key. +/// +/// Filename collisions are not a correctness problem (the loaded +/// receipt is content-validated against the current encrypted chunk +/// addresses before being trusted) but they would waste a re-pay, so +/// we want low collision probability across a single user's upload +/// history. `std::hash::DefaultHasher` with 16 hex chars of output is +/// far below the collision threshold for that scale. +fn file_hash_key(file_path: &str) -> String { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + file_path.hash(&mut hasher); + format!("{:016x}", hasher.finish()) +} + +/// Save the merkle batch payment receipt for a given source file path. +/// +/// Idempotent: re-saving for the same `(timestamp, file_path)` overwrites +/// the previous file. Different timestamps for the same file produce +/// different filenames, which is fine — `cleanup_outdated` reaps them. +pub fn save(file_path: &str, result: &MerkleBatchPaymentResult) -> Result { + let dir = payments_dir()?; + let ts = if result.merkle_payment_timestamp > 0 { + result.merkle_payment_timestamp + } else { + // Fall back to now() if the result wasn't populated. Should not + // happen in practice — every constructor stamps this field — + // but defensively avoid emitting a `0_*` filename that would + // immediately be treated as expired. + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) + }; + let path = dir.join(format!("{ts}_{}", file_hash_key(file_path))); + let handle = File::create(&path)?; + serde_json::to_writer(BufWriter::new(handle), result) + .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + debug!( + "Cached merkle payment receipt for {file_path:?} to {}", + path.display() + ); + Ok(path) +} + +/// Best-effort save. Logs on failure but never returns an error. +/// +/// Intended for the upload path: if we can't cache the receipt we still +/// want to attempt the chunk PUTs. +pub fn try_save(file_path: &str, result: &MerkleBatchPaymentResult) { + if let Err(e) = save(file_path, result) { + warn!( + "Failed to cache merkle payment receipt for {file_path:?}: {e}. \ + Upload will proceed without resume support." + ); + } +} + +/// Load the cached merkle batch receipt for a given source file path. +/// +/// Side-effect: opportunistically removes any expired receipts found in +/// the directory while scanning. +/// +/// Returns `Ok(None)` if no matching non-expired receipt is found. +pub fn load_for_file(file_path: &str) -> Result> { + cleanup_outdated(); + let dir = payments_dir()?; + let key = file_hash_key(file_path); + + let read_dir = match fs::read_dir(&dir) { + Ok(rd) => rd, + Err(e) => { + debug!("Could not read payments dir {}: {e}", dir.display()); + return Ok(None); + } + }; + + for entry in read_dir.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + if !name.contains(&key) { + continue; + } + if is_expired_filename(name) { + // Found the file but it has aged out; cleanup will + // collect it. Keep scanning in case a newer one exists. + continue; + } + match read_receipt(&path) { + Ok(receipt) => { + info!( + "Found previous merkle upload attempt for {file_path}, \ + resuming with payment cached at {}", + path.display() + ); + return Ok(Some((path, receipt))); + } + Err(e) => { + warn!( + "Cached merkle receipt at {} is unreadable ({e}). \ + Ignoring and starting a fresh upload.", + path.display() + ); + } + } + } + Ok(None) +} + +/// Best-effort load. Logs on failure and returns `None`. +pub fn try_load_for_file(file_path: &str) -> Option<(PathBuf, MerkleBatchPaymentResult)> { + match load_for_file(file_path) { + Ok(opt) => opt, + Err(e) => { + warn!( + "Failed to look up cached merkle receipt for {file_path:?}: {e}. \ + Starting a fresh upload." + ); + None + } + } +} + +/// Delete the cached receipt(s) matching the file path. Called on +/// successful upload completion. +pub fn delete_for_file(file_path: &str) -> Result<()> { + let dir = payments_dir()?; + let key = file_hash_key(file_path); + if let Ok(read_dir) = fs::read_dir(&dir) { + for entry in read_dir.flatten() { + let path = entry.path(); + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + if name.contains(&key) { + let _ = fs::remove_file(&path); + debug!("Deleted cached merkle receipt {}", path.display()); + } + } + } + } + Ok(()) +} + +/// Best-effort delete. Logs on failure but never returns an error. +pub fn try_delete_for_file(file_path: &str) { + if let Err(e) = delete_for_file(file_path) { + warn!( + "Failed to delete cached merkle receipt for {file_path:?}: {e}. \ + Will be cleaned up after expiry." + ); + } +} + +/// Garbage-collect cached receipts past the expiry window. +/// +/// Logs each removal at info level so users see what we cleaned up. +/// Best-effort: any IO error is silently ignored. +pub fn cleanup_outdated() { + let Ok(dir) = payments_dir() else { + return; + }; + let Ok(read_dir) = fs::read_dir(&dir) else { + return; + }; + for entry in read_dir.flatten() { + if is_expired_entry(&entry) { + let path = entry.path(); + info!( + "Removing expired cached merkle payment file: {}", + path.display() + ); + let _ = fs::remove_file(path); + } + } +} + +fn is_expired_entry(entry: &DirEntry) -> bool { + let path = entry.path(); + if !path.is_file() { + return false; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + return false; + }; + is_expired_filename(name) +} + +fn is_expired_filename(name: &str) -> bool { + let ts_str = match name.split_once('_') { + Some((ts, _)) => ts, + None => return false, + }; + let Ok(ts) = ts_str.parse::() else { + return false; + }; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + now > ts.saturating_add(PAYMENT_EXPIRATION_SECS) +} + +fn read_receipt(path: &Path) -> Result { + let handle = File::open(path)?; + let receipt: MerkleBatchPaymentResult = serde_json::from_reader(BufReader::new(handle)) + .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + Ok(receipt) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + fn dummy_receipt(ts: u64) -> MerkleBatchPaymentResult { + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([0u8; 32], vec![1, 2, 3]); + MerkleBatchPaymentResult { + proofs, + chunk_count: 1, + storage_cost_atto: "0".to_string(), + gas_cost_wei: 0, + merkle_payment_timestamp: ts, + } + } + + #[test] + fn file_hash_key_is_stable() { + let a = file_hash_key("/tmp/some/file.bin"); + let b = file_hash_key("/tmp/some/file.bin"); + assert_eq!(a, b); + let c = file_hash_key("/tmp/some/other.bin"); + assert_ne!(a, c); + } + + #[test] + fn expired_filename_detected() { + // Just past the expiry boundary. + let stale = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .saturating_sub(PAYMENT_EXPIRATION_SECS + 60); + let name = format!("{stale}_abcd1234"); + assert!(is_expired_filename(&name)); + + // Within the window. + let fresh = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .saturating_sub(60); + let name = format!("{fresh}_abcd1234"); + assert!(!is_expired_filename(&name)); + } + + #[test] + fn malformed_filename_is_not_expired() { + // Defensive: garbage in payments dir must not be auto-deleted. + assert!(!is_expired_filename("nonsense")); + assert!(!is_expired_filename("not_a_number_abcd1234")); + } + + #[test] + fn roundtrip_save_load_delete() -> Result<()> { + let file_path = format!( + "/tmp/anselme-resumable-merkle-test-{}", + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() + ); + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let receipt = dummy_receipt(ts); + let saved_path = save(&file_path, &receipt)?; + assert!(saved_path.exists()); + + let loaded = load_for_file(&file_path)?; + let (loaded_path, loaded_receipt) = loaded.expect("receipt should be loadable"); + assert_eq!(loaded_path, saved_path); + assert_eq!(loaded_receipt.chunk_count, receipt.chunk_count); + assert_eq!(loaded_receipt.merkle_payment_timestamp, ts); + + delete_for_file(&file_path)?; + assert!(load_for_file(&file_path)?.is_none()); + Ok(()) + } +} diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 134a71a..669170b 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -1260,36 +1260,121 @@ impl Client { } // Phase 2: Decide payment mode and upload in waves from disk. + // + // For the merkle path, attempt to resume from a cached + // receipt before paying again. The cache is keyed by the + // source file path; a successful upload deletes the cache so + // a subsequent re-upload of the same path will pay anew. + let file_path_key = path.display().to_string(); let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei) = if self.should_use_merkle(chunk_count, mode) { info!("Using merkle batch payment for {chunk_count} file chunks"); - let batch_result = match self - .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size()) - .await + let batch_result = if let Some((_cache_path, cached)) = + crate::data::client::cached_merkle::try_load_for_file(&file_path_key) { - Ok(result) => result, - Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { - info!("Merkle needs more peers ({msg}), falling back to wave-batch"); - let (stored, sc, gc) = - self.upload_waves_single(&spill, progress.as_ref()).await?; - return Ok(FileUploadResult { - data_map, - chunks_stored: stored, - chunks_failed: 0, - total_chunks: chunk_count, - payment_mode_used: PaymentMode::Single, - storage_cost_atto: sc, - gas_cost_wei: gc, - data_map_address: None, - }); + // Validate the cache matches this upload. If the + // file was edited between attempts the cached + // proofs would no longer be valid for the new + // chunk addresses; in that case drop the cache + // and pay fresh. + let addresses_match = spill + .addresses + .iter() + .all(|addr| cached.proofs.contains_key(addr)); + if addresses_match && cached.proofs.len() == chunk_count { + info!( + "Skipping merkle payment phase; resuming with \ + cached proofs ({} chunks)", + cached.proofs.len() + ); + cached + } else { + info!( + "Cached merkle receipt does not match current file \ + content (cached={}, file={chunk_count}). \ + Discarding cache and paying fresh.", + cached.proofs.len() + ); + crate::data::client::cached_merkle::try_delete_for_file(&file_path_key); + // Fall through to fresh payment below. + match self + .pay_for_merkle_batch( + &spill.addresses, + DATA_TYPE_CHUNK, + spill.avg_chunk_size(), + ) + .await + { + Ok(result) => { + crate::data::client::cached_merkle::try_save( + &file_path_key, + &result, + ); + result + } + Err(Error::InsufficientPeers(ref msg)) + if mode == PaymentMode::Auto => + { + info!( + "Merkle needs more peers ({msg}), falling back to wave-batch" + ); + let (stored, sc, gc) = + self.upload_waves_single(&spill, progress.as_ref()).await?; + return Ok(FileUploadResult { + data_map, + chunks_stored: stored, + chunks_failed: 0, + total_chunks: chunk_count, + payment_mode_used: PaymentMode::Single, + storage_cost_atto: sc, + gas_cost_wei: gc, + data_map_address: None, + }); + } + Err(e) => return Err(e), + } + } + } else { + match self + .pay_for_merkle_batch( + &spill.addresses, + DATA_TYPE_CHUNK, + spill.avg_chunk_size(), + ) + .await + { + Ok(result) => { + // Save BEFORE the store phase so a crash + // mid-upload leaves a resumable receipt. + crate::data::client::cached_merkle::try_save(&file_path_key, &result); + result + } + Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { + info!("Merkle needs more peers ({msg}), falling back to wave-batch"); + let (stored, sc, gc) = + self.upload_waves_single(&spill, progress.as_ref()).await?; + return Ok(FileUploadResult { + data_map, + chunks_stored: stored, + chunks_failed: 0, + total_chunks: chunk_count, + payment_mode_used: PaymentMode::Single, + storage_cost_atto: sc, + gas_cost_wei: gc, + data_map_address: None, + }); + } + Err(e) => return Err(e), } - Err(e) => return Err(e), }; let (stored, sc, gc) = self .upload_waves_merkle(&spill, &batch_result, progress.as_ref()) .await?; + // Upload succeeded end-to-end; the cached receipt is + // no longer needed. + crate::data::client::cached_merkle::try_delete_for_file(&file_path_key); (stored, PaymentMode::Merkle, sc, gc) } else { let (stored, sc, gc) = self.upload_waves_single(&spill, progress.as_ref()).await?; diff --git a/ant-core/src/data/client/merkle.rs b/ant-core/src/data/client/merkle.rs index ac2e9a1..1b5a780 100644 --- a/ant-core/src/data/client/merkle.rs +++ b/ant-core/src/data/client/merkle.rs @@ -44,7 +44,10 @@ pub enum PaymentMode { } /// Result of a merkle batch payment. -#[derive(Debug)] +/// +/// Serializable so it can be persisted across runs for resume after a +/// partial-upload failure. See `crate::data::client::cached_merkle`. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct MerkleBatchPaymentResult { /// Map of `XorName` to serialized tagged proof bytes (ready to use in PUT requests). pub proofs: HashMap<[u8; 32], Vec>, @@ -54,6 +57,12 @@ pub struct MerkleBatchPaymentResult { pub storage_cost_atto: String, /// Total gas cost in wei. pub gas_cost_wei: u128, + /// Unix timestamp (seconds) used for the on-chain merkle payment. + /// Persisted so resume can check whether the on-chain payment has + /// aged out beyond the merkle expiration window and the cached + /// receipt must be discarded. + #[serde(default)] + pub merkle_payment_timestamp: u64, } /// Prepared merkle batch ready for external payment. @@ -252,6 +261,10 @@ impl Client { let mut all_proofs = HashMap::with_capacity(addresses.len()); let mut total_storage = Amount::ZERO; let mut total_gas: u128 = 0; + // Track the oldest sub-batch timestamp so the overall receipt + // expires when the *first* sub-batch's on-chain payment ages + // out (worst case for resume). + let mut oldest_ts: u64 = 0; for (i, chunk) in sub_batches.into_iter().enumerate() { match self @@ -263,6 +276,12 @@ impl Client { total_storage += cost; } total_gas = total_gas.saturating_add(sub_result.gas_cost_wei); + if oldest_ts == 0 + || (sub_result.merkle_payment_timestamp > 0 + && sub_result.merkle_payment_timestamp < oldest_ts) + { + oldest_ts = sub_result.merkle_payment_timestamp; + } all_proofs.extend(sub_result.proofs); } Err(e) => { @@ -282,6 +301,7 @@ impl Client { proofs: all_proofs, storage_cost_atto: total_storage.to_string(), gas_cost_wei: total_gas, + merkle_payment_timestamp: oldest_ts, }); } } @@ -292,6 +312,7 @@ impl Client { proofs: all_proofs, storage_cost_atto: total_storage.to_string(), gas_cost_wei: total_gas, + merkle_payment_timestamp: oldest_ts, }) } @@ -633,6 +654,7 @@ pub fn finalize_merkle_batch( chunk_count, storage_cost_atto: "0".to_string(), gas_cost_wei: 0, + merkle_payment_timestamp: prepared.merkle_payment_timestamp, }) } diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index 5f8cdf8..0faeeff 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -6,6 +6,7 @@ pub mod adaptive; pub mod batch; pub mod cache; +pub(crate) mod cached_merkle; pub mod chunk; pub mod data; pub mod file;