Skip to content

Commit 6bc311d

Browse files
prestwich and claude authored
fix: reduce excess allocations (ENG-2036) (#49)
* fix: reduce excess allocations in storage workspace (ENG-2036)

  Eliminate unnecessary clones and intermediate collections:
  - drain_above: zip iterators instead of indexed .get().cloned()
  - ColdReceipt::new: destructure and into_iter() logs instead of cloning
  - write_block_to_tx: single-pass loop instead of intermediate Vec
  - SqlColdBackend::new: match on &str instead of .to_owned()

  Add ENG-2036 comments for intentionally deferred items (Filter clone,
  eager into_owned in traversals, format!() SQL queries).

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: add debug_assert for tx/receipt length match

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0120567 commit 6bc311d

6 files changed

Lines changed: 57 additions & 46 deletions

File tree

crates/cold-mdbx/src/backend.rs

Lines changed: 16 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -40,31 +40,25 @@ fn write_block_to_tx(
4040
// Hash-keyed indices use put (keys are not sequential)
4141
tx.queue_put::<ColdBlockHashIndex>(&data.header.hash(), &block)?;
4242

43-
// Store transactions, senders, and build hash index
44-
let tx_meta: Vec<_> = data
45-
.transactions
46-
.iter()
47-
.enumerate()
48-
.map(|(idx, recovered_tx)| {
49-
let tx_idx = idx as u64;
50-
let sender = recovered_tx.signer();
51-
let tx_signed: &TransactionSigned = recovered_tx;
52-
tx.queue_append_dual::<ColdTransactions>(&block, &tx_idx, tx_signed)?;
53-
tx.queue_append_dual::<ColdTxSenders>(&block, &tx_idx, &sender)?;
54-
tx.queue_put::<ColdTxHashIndex>(tx_signed.hash(), &TxLocation::new(block, tx_idx))?;
55-
Ok((*tx_signed.hash(), sender))
56-
})
57-
.collect::<Result<_, MdbxColdError>>()?;
58-
59-
// Compute and store IndexedReceipts with precomputed metadata
43+
// Store transactions, senders, hash indices, and receipts in a single
44+
// pass to avoid an intermediate Vec of (tx_hash, sender) tuples.
6045
let mut first_log_index = 0u64;
6146
let mut prior_cumulative_gas = 0u64;
62-
for (idx, (receipt, (tx_hash, sender))) in data.receipts.into_iter().zip(tx_meta).enumerate() {
47+
debug_assert_eq!(data.transactions.len(), data.receipts.len());
48+
for (idx, (recovered_tx, receipt)) in data.transactions.iter().zip(data.receipts).enumerate() {
49+
let tx_idx = idx as u64;
50+
let sender = recovered_tx.signer();
51+
let tx_signed: &TransactionSigned = recovered_tx;
52+
tx.queue_append_dual::<ColdTransactions>(&block, &tx_idx, tx_signed)?;
53+
tx.queue_append_dual::<ColdTxSenders>(&block, &tx_idx, &sender)?;
54+
tx.queue_put::<ColdTxHashIndex>(tx_signed.hash(), &TxLocation::new(block, tx_idx))?;
55+
56+
let tx_hash = *tx_signed.hash();
6357
let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
6458
prior_cumulative_gas = receipt.inner.cumulative_gas_used;
6559
let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
6660
first_log_index += ir.receipt.inner.logs.len() as u64;
67-
tx.queue_append_dual::<ColdReceipts>(&block, &(idx as u64), &ir)?;
61+
tx.queue_append_dual::<ColdReceipts>(&block, &tx_idx, &ir)?;
6862
}
6963

7064
for (idx, event) in data.signet_events.iter().enumerate() {
@@ -731,6 +725,9 @@ impl ColdStorageRead for MdbxColdBackend {
731725

732726
async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
733727
let env = self.env.clone();
728+
// ENG-2036: clone required to move into spawn_blocking. Eliminating
729+
// this would require changing the ColdStorage trait to pass owned
730+
// Filter, which is a cross-crate API change.
734731
let filter = filter.clone();
735732
let std_deadline = params.deadline.into_std();
736733
let _ = tokio::task::spawn_blocking(move || {

crates/cold-sql/src/backend.rs

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -112,21 +112,20 @@ impl SqlColdBackend {
112112
pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
113113
// Detect backend from a pooled connection.
114114
let conn = pool.acquire().await?;
115-
let backend = conn.backend_name().to_owned();
116-
drop(conn);
115+
let backend = conn.backend_name();
117116

118-
let migration = match backend.as_str() {
119-
"PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
120-
"SQLite" => include_str!("../migrations/001_initial.sql"),
117+
let (migration, is_postgres) = match backend {
118+
"PostgreSQL" => (include_str!("../migrations/001_initial_pg.sql"), true),
119+
"SQLite" => (include_str!("../migrations/001_initial.sql"), false),
121120
other => {
122121
return Err(SqlColdError::Convert(format!(
123122
"unsupported database backend: {other}"
124123
)));
125124
}
126125
};
126+
drop(conn);
127127
// Execute via pool to ensure the migration uses the same
128128
// connection that subsequent queries will use.
129-
let is_postgres = backend == "PostgreSQL";
130129
sqlx::raw_sql(migration).execute(&pool).await?;
131130
Ok(Self { pool, is_postgres })
132131
}
@@ -247,6 +246,10 @@ impl SqlColdBackend {
247246
// Build the parameterised query once. $1 is the block number
248247
// (bound per iteration); remaining parameters are the address
249248
// and topic filters from the user's request.
249+
//
250+
// ENG-2036: format!() allocates here, but the dynamic filter clause
251+
// makes this unavoidable. sqlx's prepared-statement cache mitigates
252+
// repeated execution cost.
250253
let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
251254
let data_sql = format!(
252255
"SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
@@ -1348,6 +1351,8 @@ impl ColdStorageRead for SqlColdBackend {
13481351
let to = filter.get_to_block().unwrap_or(u64::MAX);
13491352

13501353
// Build WHERE clause: block range ($1, $2) + address/topic filters.
1354+
// ENG-2036: dynamic filter clause requires format!(); mitigated by
1355+
// sqlx prepared-statement cache.
13511356
let (filter_clause, params) = build_log_filter_clause(filter, 3);
13521357
let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
13531358

crates/cold/src/cold_receipt.rs

Lines changed: 17 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -47,40 +47,42 @@ impl ColdReceipt {
4747
let block_hash = header.hash();
4848
let block_timestamp = header.timestamp;
4949

50-
let rpc_logs: Vec<RpcLog> = ir
51-
.receipt
50+
// Destructure to move fields instead of cloning logs.
51+
let IndexedReceipt { receipt: typed_receipt, tx_hash, first_log_index, gas_used, sender } =
52+
ir;
53+
let tx_type = typed_receipt.tx_type;
54+
let status = typed_receipt.inner.status;
55+
let cumulative_gas_used = typed_receipt.inner.cumulative_gas_used;
56+
57+
let rpc_logs: Vec<RpcLog> = typed_receipt
5258
.inner
5359
.logs
54-
.iter()
60+
.into_iter()
5561
.enumerate()
5662
.map(|(log_idx, log)| RpcLog {
57-
inner: log.clone(),
63+
inner: log,
5864
block_hash: Some(block_hash),
5965
block_number: Some(block_number),
6066
block_timestamp: Some(block_timestamp),
61-
transaction_hash: Some(ir.tx_hash),
67+
transaction_hash: Some(tx_hash),
6268
transaction_index: Some(transaction_index),
63-
log_index: Some(ir.first_log_index + log_idx as u64),
69+
log_index: Some(first_log_index + log_idx as u64),
6470
removed: false,
6571
})
6672
.collect();
6773

68-
let receipt = ConsensusReceipt {
69-
status: ir.receipt.inner.status,
70-
cumulative_gas_used: ir.receipt.inner.cumulative_gas_used,
71-
logs: rpc_logs,
72-
};
74+
let receipt = ConsensusReceipt { status, cumulative_gas_used, logs: rpc_logs };
7375

7476
Self {
7577
receipt,
76-
tx_type: ir.receipt.tx_type,
77-
tx_hash: ir.tx_hash,
78-
gas_used: ir.gas_used,
78+
tx_type,
79+
tx_hash,
80+
gas_used,
7981
block_number,
8082
block_hash,
8183
block_timestamp,
8284
transaction_index,
83-
from: ir.sender,
85+
from: sender,
8486
}
8587
}
8688
}

crates/hot/src/model/traverse/dual_key.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,9 @@ pub trait DualKeyTraverse<E: HotKvReadError> {
8585
where
8686
Self: Sized,
8787
{
88+
// ENG-2036: into_owned() here eagerly materializes the first entry,
89+
// wasting an allocation if the iterator is never consumed. Deferring
90+
// this requires lifetime changes to the iterator struct.
8891
let first_entry =
8992
self.first()?.map(|(k1, k2, v)| (k1.into_owned(), k2.into_owned(), v.into_owned()));
9093
Ok(RawDualKeyIter {
@@ -107,6 +110,7 @@ pub trait DualKeyTraverse<E: HotKvReadError> {
107110
where
108111
Self: Sized,
109112
{
113+
// ENG-2036: same eager into_owned() as iter() above.
110114
let first_entry = self
111115
.next_dual_above(k1, k2)?
112116
.map(|(k1, k2, v)| (k1.into_owned(), k2.into_owned(), v.into_owned()));

crates/hot/src/model/traverse/kv.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,9 @@ pub trait KvTraverse<E: HotKvReadError> {
4343
where
4444
Self: Sized,
4545
{
46+
// ENG-2036: into_owned() here eagerly materializes the first entry,
47+
// wasting an allocation if the iterator is never consumed. Deferring
48+
// this requires lifetime changes to the iterator struct.
4649
let first_entry = self.first()?.map(|(k, v)| (k.into_owned(), v.into_owned()));
4750
Ok(RawKvIter {
4851
cursor: self,
@@ -65,6 +68,7 @@ pub trait KvTraverse<E: HotKvReadError> {
6568
where
6669
Self: Sized,
6770
{
71+
// ENG-2036: same eager into_owned() as iter() above.
6872
let first_entry = self.lower_bound(key)?.map(|(k, v)| (k.into_owned(), v.into_owned()));
6973
Ok(RawKvIter {
7074
cursor: self,

crates/storage/src/unified.rs

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -266,14 +266,13 @@ impl<H: HotKv> UnifiedStorage<H> {
266266
// 3. Atomically drain cold (best-effort — failure = normal cold lag)
267267
let cold_receipts = self.cold.drain_above(block).await.unwrap_or_default();
268268

269-
// 4. Assemble drained blocks (zip headers with receipts, default empty)
269+
// 4. Assemble drained blocks (zip headers with receipts, default empty).
270+
// Pad cold_receipts to match headers length so zip consumes both
271+
// without cloning.
270272
let drained = headers
271273
.into_iter()
272-
.enumerate()
273-
.map(|(i, header)| {
274-
let receipts = cold_receipts.get(i).cloned().unwrap_or_default();
275-
DrainedBlock { header, receipts }
276-
})
274+
.zip(cold_receipts.into_iter().chain(std::iter::repeat_with(Vec::new)))
275+
.map(|(header, receipts)| DrainedBlock { header, receipts })
277276
.collect();
278277

279278
Ok(drained)

0 commit comments

Comments (0)