Skip to content

Commit 447ec30

Browse files
committed
Dial in buffer sizing
1 parent bfdc9ca commit 447ec30

1 file changed

Lines changed: 18 additions & 2 deletions

File tree

differential-dataflow/src/trace/implementations/merge_batcher.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,20 +293,36 @@ pub mod container {
293293
}
294294

295295
impl<D, T, R> VecMerger<D, T, R> {
296+
/// The target capacity for output buffers, as a power of two.
297+
///
298+
/// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
299+
/// If this is mis-set, there is the potential for more memory churn than anticipated.
300+
fn target_capacity() -> usize {
301+
timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
302+
}
303+
/// Acquire a buffer with the target capacity.
296304
fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
305+
let target = Self::target_capacity();
297306
let mut container = stash.pop().unwrap_or_default();
298-
container.ensure_capacity(&mut None);
307+
container.clear();
308+
// Reuse if at target; otherwise allocate fresh.
309+
if container.capacity() != target {
310+
container = Vec::with_capacity(target);
311+
}
299312
container
300313
}
301314
/// Ensure `queue` is non-empty by pulling from `iter` if needed.
302315
/// Returns `true` if `queue` has data, `false` if both are exhausted.
303316
/// Recycles drained queues into `stash` for allocation reuse.
304317
fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) -> bool {
318+
let target = Self::target_capacity();
305319
while queue.is_empty() {
306320
if stash.len() < 2 {
307321
let mut recycled = Vec::from(std::mem::take(queue));
308322
recycled.clear();
309-
stash.push(recycled);
323+
if recycled.capacity() == target {
324+
stash.push(recycled);
325+
}
310326
}
311327
match iter.next() {
312328
Some(chunk) => *queue = std::collections::VecDeque::from(chunk),

0 commit comments

Comments
 (0)