Skip to content

Commit c7b474b

Browse files
committed
Tidy up structure of logic
1 parent 447ec30 commit c7b474b

1 file changed

Lines changed: 29 additions & 33 deletions

File tree

differential-dataflow/src/trace/implementations/merge_batcher.rs

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -311,25 +311,21 @@ pub mod container {
311311
}
312312
container
313313
}
314-
/// Ensure `queue` is non-empty by pulling from `iter` if needed.
315-
/// Returns `true` if `queue` has data, `false` if both are exhausted.
316-
/// Recycles drained queues into `stash` for allocation reuse.
317-
fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) -> bool {
318-
let target = Self::target_capacity();
319-
while queue.is_empty() {
314+
/// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
315+
fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
316+
if queue.is_empty() {
317+
let target = Self::target_capacity();
320318
if stash.len() < 2 {
321319
let mut recycled = Vec::from(std::mem::take(queue));
322320
recycled.clear();
323321
if recycled.capacity() == target {
324322
stash.push(recycled);
325323
}
326324
}
327-
match iter.next() {
328-
Some(chunk) => *queue = std::collections::VecDeque::from(chunk),
329-
None => return false,
325+
if let Some(chunk) = iter.next() {
326+
*queue = std::collections::VecDeque::from(chunk);
330327
}
331328
}
332-
true
333329
}
334330
}
335331

@@ -359,33 +355,33 @@ pub mod container {
359355

360356
let mut result = self.empty(stash);
361357

362-
// Merge while both queues can be kept non-empty.
363-
while Self::refill(&mut q1, &mut iter1, stash) && Self::refill(&mut q2, &mut iter2, stash) {
364-
while !q1.is_empty() && !q2.is_empty() {
365-
let (d1, t1, _) = q1.front().unwrap();
366-
let (d2, t2, _) = q2.front().unwrap();
367-
match (d1, t1).cmp(&(d2, t2)) {
368-
Ordering::Less => {
369-
result.push(q1.pop_front().unwrap());
370-
}
371-
Ordering::Greater => {
372-
result.push(q2.pop_front().unwrap());
373-
}
374-
Ordering::Equal => {
375-
let (d, t, mut r1) = q1.pop_front().unwrap();
376-
let (_, _, r2) = q2.pop_front().unwrap();
377-
r1.plus_equals(&r2);
378-
if !r1.is_zero() {
379-
result.push((d, t, r1));
380-
}
358+
// Merge while both queues are non-empty.
359+
while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
360+
match (d1, t1).cmp(&(d2, t2)) {
361+
Ordering::Less => {
362+
result.push(q1.pop_front().unwrap());
363+
}
364+
Ordering::Greater => {
365+
result.push(q2.pop_front().unwrap());
366+
}
367+
Ordering::Equal => {
368+
let (d, t, mut r1) = q1.pop_front().unwrap();
369+
let (_, _, r2) = q2.pop_front().unwrap();
370+
r1.plus_equals(&r2);
371+
if !r1.is_zero() {
372+
result.push((d, t, r1));
381373
}
382374
}
375+
}
383376

384-
if result.at_capacity() {
385-
output.push(std::mem::take(&mut result));
386-
result = self.empty(stash);
387-
}
377+
if result.at_capacity() {
378+
output.push(std::mem::take(&mut result));
379+
result = self.empty(stash);
388380
}
381+
382+
// Refill emptied queues from their chains.
383+
if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
384+
if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
389385
}
390386

391387
// Push partial result and remaining data from both sides.

0 commit comments

Comments
 (0)