Skip to content

Commit 7dd6a65

Browse files
committed
InternalMerger adopts tail chunks rather than copy them
1 parent c7b474b commit 7dd6a65

1 file changed

Lines changed: 18 additions & 26 deletions

File tree

differential-dataflow/src/trace/implementations/merge_batcher.rs

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -432,12 +432,6 @@ pub mod container {
432432
}
433433
}
434434

435-
/// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
436-
///
437-
/// Note: The `VecMerger` type implements `Merger` directly and avoids
438-
/// cloning by draining inputs. This `InternalMerge` impl is retained
439-
/// because `reduce` requires `Builder::Input: InternalMerge`.
440-
441435
/// A merger that uses internal iteration via [`InternalMerge`].
442436
pub struct InternalMerger<MC> {
443437
_marker: PhantomData<MC>,
@@ -462,6 +456,9 @@ pub mod container {
462456
stash.push(chunk);
463457
}
464458
/// Drain remaining items from one side into `result`/`output`.
459+
///
460+
/// Copies the partially-consumed head into `result`, then appends
461+
/// remaining full chunks directly to `output` without copying.
465462
fn drain_side(
466463
&self,
467464
head: &mut MC,
@@ -471,21 +468,20 @@ pub mod container {
471468
output: &mut Vec<MC>,
472469
stash: &mut Vec<MC>,
473470
) {
474-
while *pos < head.len() {
471+
// Copy the partially-consumed head into result.
472+
if *pos < head.len() {
475473
result.merge_from(
476474
std::slice::from_mut(head),
477475
std::slice::from_mut(pos),
478476
);
479-
if *pos >= head.len() {
480-
let old = std::mem::replace(head, list.next().unwrap_or_default());
481-
self.recycle(old, stash);
482-
*pos = 0;
483-
}
484-
if result.at_capacity() {
485-
output.push(std::mem::take(result));
486-
*result = self.empty(stash);
487-
}
488477
}
478+
// Flush result before appending full chunks.
479+
if !result.is_empty() {
480+
output.push(std::mem::take(result));
481+
*result = self.empty(stash);
482+
}
483+
// Remaining full chunks go directly to output.
484+
output.extend(list);
489485
}
490486
}
491487

@@ -525,20 +521,12 @@ pub mod container {
525521
}
526522
}
527523

528-
// Drain remaining from side 0.
524+
// Drain remaining from each side: copy partial head, then append full chunks.
529525
self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
530-
if !result.is_empty() {
531-
output.push(std::mem::take(&mut result));
532-
result = self.empty(stash);
533-
}
534-
output.extend(list1);
535-
536-
// Drain remaining from side 1.
537526
self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
538527
if !result.is_empty() {
539-
output.push(std::mem::take(&mut result));
528+
output.push(result);
540529
}
541-
output.extend(list2);
542530
}
543531

544532
fn extract(
@@ -579,6 +567,10 @@ pub mod container {
579567
}
580568

581569
/// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
570+
///
571+
/// Note: The `VecMerger` type implements `Merger` directly and avoids
572+
/// cloning by draining inputs. This `InternalMerge` impl is retained
573+
/// because `reduce` requires `Builder::Input: InternalMerge`.
582574
pub mod vec_internal {
583575
use std::cmp::Ordering;
584576
use timely::PartialOrder;

0 commit comments

Comments
 (0)