Skip to content

Commit bfdc9ca

Browse files
committed
Introduce VecMerger to efficiently merge owning vectors
1 parent 47c4ec1 commit bfdc9ca

1 file changed

Lines changed: 145 additions & 2 deletions

File tree

differential-dataflow/src/trace/implementations/merge_batcher.rs

Lines changed: 145 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,154 @@ pub mod container {
278278
);
279279
}
280280

281-
/// A `Merger` using internal iteration for `Vec` containers.
282-
pub type VecInternalMerger<D, T, R> = InternalMerger<Vec<(D, T, R)>>;
281+
/// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
282+
pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
283283
/// A `Merger` using internal iteration for `TimelyStack` containers.
284284
pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
285285

286+
/// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
///
/// The struct stores no data of its own: its single field is a `PhantomData`
/// marker that ties the type to the `(D, T, R)` record type of its chunks.
287+
pub struct VecMerger<D, T, R> {
288+
// Marker only; lets the struct name `D`, `T`, `R` without holding values of them.
_marker: PhantomData<(D, T, R)>,
289+
}
290+
291+
impl<D, T, R> Default for VecMerger<D, T, R> {
292+
fn default() -> Self { Self { _marker: PhantomData } }
293+
}
294+
295+
impl<D, T, R> VecMerger<D, T, R> {
296+
fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
297+
let mut container = stash.pop().unwrap_or_default();
298+
container.ensure_capacity(&mut None);
299+
container
300+
}
301+
/// Ensure `queue` is non-empty by pulling from `iter` if needed.
302+
/// Returns `true` if `queue` has data, `false` if both are exhausted.
303+
/// Recycles drained queues into `stash` for allocation reuse.
304+
fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) -> bool {
305+
while queue.is_empty() {
306+
if stash.len() < 2 {
307+
let mut recycled = Vec::from(std::mem::take(queue));
308+
recycled.clear();
309+
stash.push(recycled);
310+
}
311+
match iter.next() {
312+
Some(chunk) => *queue = std::collections::VecDeque::from(chunk),
313+
None => return false,
314+
}
315+
}
316+
true
317+
}
318+
}
319+
320+
impl<D, T, R> Merger for VecMerger<D, T, R>
321+
where
322+
D: Ord + Clone + 'static,
323+
T: Ord + Clone + PartialOrder + 'static,
324+
R: crate::difference::Semigroup + Clone + 'static,
325+
{
326+
type Chunk = Vec<(D, T, R)>;
327+
type Time = T;
328+
329+
fn merge(
330+
&mut self,
331+
list1: Vec<Vec<(D, T, R)>>,
332+
list2: Vec<Vec<(D, T, R)>>,
333+
output: &mut Vec<Vec<(D, T, R)>>,
334+
stash: &mut Vec<Vec<(D, T, R)>>,
335+
) {
336+
use std::cmp::Ordering;
337+
use std::collections::VecDeque;
338+
339+
let mut iter1 = list1.into_iter();
340+
let mut iter2 = list2.into_iter();
341+
let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
342+
let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
343+
344+
let mut result = self.empty(stash);
345+
346+
// Merge while both queues can be kept non-empty.
347+
while Self::refill(&mut q1, &mut iter1, stash) && Self::refill(&mut q2, &mut iter2, stash) {
348+
while !q1.is_empty() && !q2.is_empty() {
349+
let (d1, t1, _) = q1.front().unwrap();
350+
let (d2, t2, _) = q2.front().unwrap();
351+
match (d1, t1).cmp(&(d2, t2)) {
352+
Ordering::Less => {
353+
result.push(q1.pop_front().unwrap());
354+
}
355+
Ordering::Greater => {
356+
result.push(q2.pop_front().unwrap());
357+
}
358+
Ordering::Equal => {
359+
let (d, t, mut r1) = q1.pop_front().unwrap();
360+
let (_, _, r2) = q2.pop_front().unwrap();
361+
r1.plus_equals(&r2);
362+
if !r1.is_zero() {
363+
result.push((d, t, r1));
364+
}
365+
}
366+
}
367+
368+
if result.at_capacity() {
369+
output.push(std::mem::take(&mut result));
370+
result = self.empty(stash);
371+
}
372+
}
373+
}
374+
375+
// Push partial result and remaining data from both sides.
376+
if !result.is_empty() { output.push(result); }
377+
for q in [q1, q2] {
378+
if !q.is_empty() { output.push(Vec::from(q)); }
379+
}
380+
output.extend(iter1);
381+
output.extend(iter2);
382+
}
383+
384+
fn extract(
385+
&mut self,
386+
merged: Vec<Vec<(D, T, R)>>,
387+
upper: AntichainRef<T>,
388+
frontier: &mut Antichain<T>,
389+
ship: &mut Vec<Vec<(D, T, R)>>,
390+
kept: &mut Vec<Vec<(D, T, R)>>,
391+
stash: &mut Vec<Vec<(D, T, R)>>,
392+
) {
393+
let mut keep = self.empty(stash);
394+
let mut ready = self.empty(stash);
395+
396+
for chunk in merged {
397+
for (data, time, diff) in chunk {
398+
if upper.less_equal(&time) {
399+
frontier.insert_with(&time, |time| time.clone());
400+
keep.push((data, time, diff));
401+
} else {
402+
ready.push((data, time, diff));
403+
}
404+
}
405+
if keep.at_capacity() {
406+
kept.push(std::mem::take(&mut keep));
407+
keep = self.empty(stash);
408+
}
409+
if ready.at_capacity() {
410+
ship.push(std::mem::take(&mut ready));
411+
ready = self.empty(stash);
412+
}
413+
}
414+
if !keep.is_empty() { kept.push(keep); }
415+
if !ready.is_empty() { ship.push(ready); }
416+
}
417+
418+
fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
419+
(chunk.len(), 0, 0, 0)
420+
}
421+
}
422+
423+
// NOTE: `InternalMerge` is still implemented for `Vec<(D, T, R)>`.
424+
//
425+
// The `VecMerger` type implements `Merger` directly and avoids
426+
// cloning by draining inputs. The `InternalMerge` impl is retained
427+
// because `reduce` requires `Builder::Input: InternalMerge`.
428+
286429
/// A merger that uses internal iteration via [`InternalMerge`].
287430
pub struct InternalMerger<MC> {
288431
_marker: PhantomData<MC>,

0 commit comments

Comments
 (0)