Skip to content

Commit 7944101

Browse files
committed
Consolidate storage types to avoid double allocations
1 parent 2a0fa88 commit 7944101

1 file changed

Lines changed: 97 additions & 154 deletions

File tree

dogsdogsdogs/src/operators/half_join2.rs

Lines changed: 97 additions & 154 deletions
Original file line number | Diff line number | Diff line change
@@ -147,14 +147,10 @@ where
147147
// Stash for (time, diff) accumulation.
148148
let mut output_buffer = Vec::new();
149149

150-
// Stage 1: Stuck blobs sorted by (initial, data). Nibbled from the front
151-
// as the arrangement frontier advances to determine eligible records.
152-
let mut stuck: Vec<StuckBlob<(K, V, G::Timestamp), G::Timestamp, R>> = Vec::new();
153-
// Stage 2: Ready blobs sorted by (data, initial). Consumed from the front
154-
// one record at a time, yield-safe.
155-
let mut ready: Vec<ReadyBlob<(K, V, G::Timestamp), G::Timestamp, R>> = Vec::new();
156-
// Buffer for new arrivals, stored as (initial, data, diff) for direct consolidation.
157-
let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new();
150+
// Unified blobs: each blob holds data in (T, D, R) order, with a stuck_count
151+
// tracking how many elements at the back are not yet eligible for processing.
152+
// The ready prefix is sorted by (D, T, R) for cursor traversal.
153+
let mut blobs: Vec<Blob<(K, V, G::Timestamp), G::Timestamp, R>> = Vec::new();
158154

159155
let scope = stream.scope();
160156
stream.inner.binary_frontier(arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {
@@ -164,7 +160,8 @@ where
164160

165161
move |(input1, frontier1), (input2, frontier2), output| {
166162

167-
// Drain all input for this activation into a single buffer.
163+
// Drain all input into a single buffer.
164+
let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new();
168165
let mut caps = CapabilitySet::new();
169166
input1.for_each(|capability, data| {
170167
caps.insert(capability.retain(0));
@@ -194,138 +191,90 @@ where
194191
!(0..time_con.len()).any(|i| comparison(time_con.index(i), initial))
195192
};
196193

197-
// Form new blobs from this activation's arrivals, partitioning
198-
// immediately-eligible records from those that must wait.
194+
// Form a new blob from arrivals.
195+
// consolidate_updates sorts by (T, D, R) — the stuck order.
196+
consolidate_updates(&mut arriving);
197+
199198
if !arriving.is_empty() {
200-
consolidate_updates(&mut arriving);
201-
202-
if !arriving.is_empty() {
203-
// Partition: eligible records go straight to a ready blob,
204-
// the rest go to a stuck blob. `arriving` is sorted by
205-
// (initial, data) after consolidation, and eligibility is
206-
// monotone in total-order initial time, so we can split
207-
// with a linear scan.
208-
let split = arriving.iter().position(|&(ref t, _, _)| !eligible(t)).unwrap_or(arriving.len());
209-
210-
// Stuck portion (tail).
211-
if split < arriving.len() {
212-
let stuck_data: Vec<_> = arriving.drain(split..).collect();
213-
let mut lower = MutableAntichain::new();
214-
lower.update_iter(stuck_data.iter().map(|(t, _, _)| (t.clone(), 1)));
215-
let mut stuck_caps = CapabilitySet::new();
216-
for time in lower.frontier().iter() {
217-
stuck_caps.insert(caps.delayed(time));
218-
}
219-
stuck.push(StuckBlob {
220-
caps: stuck_caps,
221-
lower,
222-
data: stuck_data.into(),
223-
});
224-
}
199+
let mut lower = MutableAntichain::new();
200+
lower.update_iter(arriving.iter().map(|(t, _, _)| (t.clone(), 1)));
201+
let mut blob_caps = CapabilitySet::new();
202+
for time in lower.frontier().iter() {
203+
blob_caps.insert(caps.delayed(time));
204+
}
225205

226-
// Eligible portion (head) — rearrange to (data, initial, diff)
227-
// and consolidate for key-sorted cursor traversal.
228-
if !arriving.is_empty() {
229-
let mut active_data: Vec<_> = arriving.drain(..)
230-
.map(|(t, d, r)| (d, t, r))
231-
.collect();
232-
consolidate_updates(&mut active_data);
233-
234-
if !active_data.is_empty() {
235-
let mut pile_lower = MutableAntichain::new();
236-
pile_lower.update_iter(active_data.iter().map(|(_, t, _)| (t.clone(), 1)));
237-
let mut ready_caps = CapabilitySet::new();
238-
for time in pile_lower.frontier().iter() {
239-
ready_caps.insert(caps.delayed(time));
240-
}
241-
ready.push(ReadyBlob {
242-
caps: ready_caps,
243-
lower: pile_lower,
244-
data: VecDeque::from(active_data),
245-
});
246-
}
247-
}
206+
// Determine how many records are stuck (ineligible).
207+
// Data is sorted by (T, D, R) and eligibility is monotone in T,
208+
// so stuck records form a suffix.
209+
let stuck_count = arriving.iter().rev()
210+
.take_while(|(t, _, _)| !eligible(t))
211+
.count();
212+
213+
let mut data: VecDeque<_> = arriving.into();
214+
215+
// Sort the ready prefix by (D, T, R) for cursor traversal.
216+
let ready_len = data.len() - stuck_count;
217+
if ready_len > 0 {
218+
// VecDeque slices: make_contiguous then sort the prefix.
219+
let slice = data.make_contiguous();
220+
slice[..ready_len].sort_by(|(t1, d1, r1), (t2, d2, r2)| {
221+
(d1, t1, r1).cmp(&(d2, t2, r2))
222+
});
248223
}
224+
225+
blobs.push(Blob {
226+
caps: blob_caps,
227+
lower,
228+
data,
229+
stuck_count,
230+
});
249231
}
250232

251-
// Stage 1: nibble stuck blobs to produce new ready piles.
252-
{
253-
// Collect all eligible records across all blobs into one pile.
254-
let mut eligible_vec = Vec::new();
255-
let mut eligible_caps = CapabilitySet::new();
256-
257-
for blob in stuck.iter_mut() {
258-
// Pop eligible records from the front. The deque is sorted by
259-
// initial time in total order, and `comparison` is monotone,
260-
// so we stop at the first ineligible record.
261-
let before = eligible_vec.len();
262-
while let Some((initial, _, _)) = blob.data.front() {
263-
if !eligible(initial) {
264-
break;
265-
}
266-
eligible_vec.push(blob.data.pop_front().unwrap());
267-
}
233+
// Nibble: only when all ready elements have been drained (stuck_count == len),
234+
// check if stuck records have become eligible and promote them.
235+
for blob in blobs.iter_mut().filter(|b| b.stuck_count == b.data.len()) {
268236

269-
if eligible_vec.len() > before {
270-
// Grab caps for the eligible records before downgrading the blob.
271-
let mut frontier = Antichain::new();
272-
for (initial, _, _) in eligible_vec[before..].iter() {
273-
frontier.insert(initial.clone());
274-
}
275-
for time in frontier.iter() {
276-
eligible_caps.insert(blob.caps.delayed(time));
277-
}
237+
// Count how many stuck records (from the front, which has the
238+
// lowest initial times) are now eligible.
239+
let newly_ready = blob.data.iter().take_while(|(t, _, _)| eligible(t)).count();
278240

279-
// Remove eligible times from the blob's antichain and downgrade.
280-
blob.lower.update_iter(eligible_vec[before..].iter().map(|(t, _, _)| (t.clone(), -1)));
281-
blob.caps.downgrade(&blob.lower.frontier());
282-
}
283-
}
241+
if newly_ready > 0 {
242+
blob.stuck_count -= newly_ready;
284243

285-
stuck.retain(|blob| !blob.data.is_empty());
286-
287-
if !eligible_vec.is_empty() {
288-
// Rearrange to (data, initial, diff) and consolidate.
289-
// consolidate_updates sorts by (data, initial) which is
290-
// the order we want for the active blob.
291-
let mut active_data: Vec<_> = eligible_vec.into_iter()
292-
.map(|(t, d, r)| (d, t, r))
293-
.collect();
294-
consolidate_updates(&mut active_data);
295-
296-
if !active_data.is_empty() {
297-
let mut pile_lower = MutableAntichain::new();
298-
pile_lower.update_iter(active_data.iter().map(|(_, t, _)| (t.clone(), 1)));
299-
eligible_caps.downgrade(&pile_lower.frontier());
300-
301-
ready.push(ReadyBlob {
302-
caps: eligible_caps,
303-
lower: pile_lower,
304-
data: VecDeque::from(active_data),
305-
});
306-
}
244+
// Sort the newly-ready prefix by (D, T, R) for cursor traversal.
245+
let slice = blob.data.make_contiguous();
246+
slice[..newly_ready].sort_by(|(t1, d1, r1), (t2, d2, r2)| {
247+
(d1, t1, r1).cmp(&(d2, t2, r2))
248+
});
249+
250+
// Downgrade capabilities.
251+
let mut new_lower = MutableAntichain::new();
252+
new_lower.update_iter(blob.data.iter().map(|(t, _, _)| (t.clone(), 1)));
253+
blob.lower = new_lower;
254+
blob.caps.downgrade(&blob.lower.frontier());
307255
}
308256
}
309257

310-
// Stage 2: drain ready piles (key-sorted, yield-safe).
311-
for pile in ready.iter_mut() {
258+
// Process ready elements from blobs.
259+
for blob in blobs.iter_mut().filter(|b| b.data.len() > b.stuck_count) {
312260
if yielded { break; }
313261

314-
// For each pile, we'll set up container builders for each distinct capability in the capset.
315-
// The closure gets invoked on a container builder, and if there are containers to extract we
316-
// ship them with the associated capability. We flush at the end.
317-
318-
let mut builders = (0..pile.caps.len()).map(|_| CB::default()).collect::<Vec<_>>();
262+
let mut builders = (0..blob.caps.len()).map(|_| CB::default()).collect::<Vec<_>>();
319263

320264
let (mut cursor, storage) = trace.cursor();
321265
let mut key_con = Tr::KeyContainer::with_capacity(1);
322266
let mut removals: ChangeBatch<G::Timestamp> = ChangeBatch::new();
323267

324-
while let Some(((ref key, ref val1, ref time), ref initial, ref diff1)) = pile.data.front() {
268+
// Process ready elements from the front.
269+
while blob.data.len() > blob.stuck_count {
325270
yielded = yielded || yield_function(timer, work);
326271
if yielded { break; }
327272

328-
let builder_idx = pile.caps.iter().position(|c| c.time().less_equal(initial)).unwrap();
273+
// Peek at the front element. It's in (T, D, R) storage order,
274+
// but the ready prefix has been sorted by (D, T, R).
275+
let (ref initial, (ref key, ref val1, ref time), ref diff1) = blob.data[0];
276+
277+
let builder_idx = blob.caps.iter().position(|c| c.time().less_equal(initial)).unwrap();
329278

330279
key_con.clear(); key_con.push_own(&key);
331280
cursor.seek_key(&storage, key_con.index(0));
@@ -340,7 +289,6 @@ where
340289
});
341290
consolidate(&mut output_buffer);
342291
work += output_buffer.len();
343-
// TODO: Worry about how to avoid reconstructing sessions so often.
344292
output_func(&mut builders[builder_idx], key, val1, val2, initial, diff1, &mut output_buffer);
345293
output_buffer.clear();
346294
cursor.step_val(&storage);
@@ -349,73 +297,68 @@ where
349297
}
350298

351299
while let Some(container) = builders[builder_idx].extract() {
352-
output.session(&pile.caps[builder_idx]).give_container(container);
300+
output.session(&blob.caps[builder_idx]).give_container(container);
353301
}
354302

355-
let (_, initial, _) = pile.data.pop_front().unwrap();
303+
let (initial, _, _) = blob.data.pop_front().unwrap();
356304
removals.update(initial, -1);
357305
}
358306

359-
for builder_idx in 0 .. pile.caps.len() {
307+
for builder_idx in 0 .. blob.caps.len() {
360308
while let Some(container) = builders[builder_idx].finish() {
361-
output.session(&pile.caps[builder_idx]).give_container(container);
309+
output.session(&blob.caps[builder_idx]).give_container(container);
362310
}
363311
}
364312

365313
// Apply all removals in bulk and downgrade once.
366-
if !removals.is_empty() {
367-
pile.lower.update_iter(removals.drain());
368-
pile.caps.downgrade(&pile.lower.frontier());
314+
if blob.data.is_empty() {
315+
// Eagerly release the blob's resources.
316+
blob.lower = MutableAntichain::new();
317+
blob.caps = CapabilitySet::new();
318+
blob.data = VecDeque::default();
319+
} else {
320+
blob.lower.update_iter(removals.drain());
321+
blob.caps.downgrade(&blob.lower.frontier());
369322
}
370323
}
371-
ready.retain(|pile| !pile.data.is_empty());
324+
325+
// Remove fully-consumed blobs.
326+
blobs.retain(|blob| !blob.data.is_empty());
372327
}
373328

374-
// Re-activate if we have ready piles to process.
375-
if !ready.is_empty() {
329+
// Re-activate if we have blobs with ready elements to process.
330+
if blobs.iter().any(|b| b.data.len() > b.stuck_count) {
376331
activator.activate();
377332
}
378333

379-
// The logical merging frontier depends on input1, stuck, and ready blobs.
334+
// The logical merging frontier depends on input1 and all blobs.
380335
let mut frontier = Antichain::new();
381336
for time in frontier1.frontier().iter() {
382337
frontier_func(time, &mut frontier);
383338
}
384-
for blob in stuck.iter() {
385-
for cap in blob.caps.iter() {
386-
frontier_func(cap.time(), &mut frontier);
387-
}
388-
}
389-
for blob in ready.iter() {
339+
for blob in blobs.iter() {
390340
for cap in blob.caps.iter() {
391341
frontier_func(cap.time(), &mut frontier);
392342
}
393343
}
394344
arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
395345

396-
if frontier1.is_empty() && stuck.is_empty() && ready.is_empty() {
346+
if frontier1.is_empty() && blobs.is_empty() {
397347
arrangement_trace = None;
398348
}
399349
}
400350
})
401351
}
402352

403-
/// A unified blob of updates awaiting processing.
///
/// Data is stored as `(T, D, R)` tuples in a `VecDeque`. The last `stuck_count`
/// elements are "stuck" (their initial time is not yet eligible for processing)
/// and remain sorted by `(T, D, R)`, the order produced by `consolidate_updates`.
/// The ready prefix (everything before the stuck tail) is re-sorted by
/// `(D, T, R)` so that a trace cursor can traverse it in key order. Ready
/// elements are consumed one at a time from the front via `pop_front`.
struct Blob<D, T: Timestamp, R> {
    /// Capabilities covering the frontier of times in `lower`; used to open
    /// output sessions, and downgraded as records are retired.
    caps: CapabilitySet<T>,
    /// Multiset of the times of all records still in `data`; its frontier
    /// drives downgrades of `caps`.
    lower: MutableAntichain<T>,
    /// Update triples `(initial time, data, diff)`.
    data: VecDeque<(T, D, R)>,
    /// Number of stuck (ineligible) elements at the back of `data`.
    stuck_count: usize,
}

0 commit comments

Comments (0)