@@ -22,9 +22,9 @@ use std::ops::Mul;
2222
2323use timely:: ContainerBuilder ;
2424use timely:: container:: CapacityContainerBuilder ;
25- use timely:: dataflow:: { Scope , ScopeParent , Stream } ;
25+ use timely:: dataflow:: { Scope , Stream } ;
2626use timely:: dataflow:: channels:: pact:: { Pipeline , Exchange } ;
27- use timely:: dataflow:: operators:: { Capability , Operator , generic :: Session } ;
27+ use timely:: dataflow:: operators:: Operator ;
2828use timely:: PartialOrder ;
2929use timely:: progress:: { Antichain , ChangeBatch , Timestamp } ;
3030use timely:: progress:: frontier:: MutableAntichain ;
@@ -80,27 +80,18 @@ where
8080 DOut : Clone +' static ,
8181 S : FnMut ( & K , & V , Tr :: Val < ' _ > ) ->DOut +' static ,
8282{
83- let output_func = move |session : & mut SessionFor < G , _ > , k : & K , v1 : & V , v2 : Tr :: Val < ' _ > , initial : & G :: Timestamp , diff1 : & R , output : & mut Vec < ( G :: Timestamp , Tr :: Diff ) > | {
83+ let output_func = move |builder : & mut CapacityContainerBuilder < Vec < _ > > , k : & K , v1 : & V , v2 : Tr :: Val < ' _ > , initial : & G :: Timestamp , diff1 : & R , output : & mut Vec < ( G :: Timestamp , Tr :: Diff ) > | {
8484 for ( time, diff2) in output. drain ( ..) {
8585 let diff = diff1. clone ( ) * diff2. clone ( ) ;
8686 let dout = ( output_func ( k, v1, v2) , time. clone ( ) ) ;
87- session. give ( ( dout, initial. clone ( ) , diff) ) ;
87+ use timely:: container:: PushInto ;
88+ builder. push_into ( ( dout, initial. clone ( ) , diff) ) ;
8889 }
8990 } ;
9091 half_join_internal_unsafe :: < _ , _ , _ , _ , _ , _ , _ , _ , _ , CapacityContainerBuilder < Vec < _ > > > ( stream, arrangement, frontier_func, comparison, |_timer, _count| false , output_func)
9192 . as_collection ( )
9293}
9394
94- /// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
95- ///
96- /// This is a shorthand primarily for the reson of readability.
97- type SessionFor < ' a , ' b , G , CB > =
98- Session < ' a , ' b ,
99- <G as ScopeParent >:: Timestamp ,
100- CB ,
101- Capability < <G as ScopeParent >:: Timestamp > ,
102- > ;
103-
10495/// An unsafe variant of `half_join` where the `output_func` closure takes
10596/// additional arguments a vector of `time` and `diff` tuples as input, and
10697/// writes its outputs to a container builder. The container builder
@@ -143,7 +134,7 @@ where
143134 FF : Fn ( & G :: Timestamp , & mut Antichain < G :: Timestamp > ) + ' static ,
144135 CF : Fn ( Tr :: TimeGat < ' _ > , & Tr :: Time ) -> bool + ' static ,
145136 Y : Fn ( std:: time:: Instant , usize ) -> bool + ' static ,
146- S : FnMut ( & mut SessionFor < G , CB > , & K , & V , Tr :: Val < ' _ > , & G :: Timestamp , & R , & mut Vec < ( G :: Timestamp , Tr :: Diff ) > ) + ' static ,
137+ S : FnMut ( & mut CB , & K , & V , Tr :: Val < ' _ > , & G :: Timestamp , & R , & mut Vec < ( G :: Timestamp , Tr :: Diff ) > ) + ' static ,
147138 CB : ContainerBuilder ,
148139{
149140 // No need to block physical merging for this operator.
@@ -180,25 +171,6 @@ where
180171 arriving. extend ( data. drain ( ..) . map ( |( d, t, r) | ( t, d, r) ) ) ;
181172 } ) ;
182173
183- // Form a new blob from this activation's arrivals.
184- if !arriving. is_empty ( ) {
185- consolidate_updates ( & mut arriving) ;
186-
187- if !arriving. is_empty ( ) {
188- let mut lower = MutableAntichain :: new ( ) ;
189- // TODO: consider implementing `update_iter_ref` to avoid clone.
190- lower. update_iter ( arriving. iter ( ) . map ( |( t, _, _) | ( t. clone ( ) , 1 ) ) ) ;
191-
192- caps. downgrade ( & lower. frontier ( ) ) ;
193-
194- stuck. push ( StuckBlob {
195- caps,
196- lower,
197- data : std:: mem:: take ( & mut arriving) . into ( ) ,
198- } ) ;
199- }
200- }
201-
202174 // Drain input batches; although we do not observe them, we want access to the input
203175 // to observe the frontier and to drive scheduling.
204176 input2. for_each ( |_, _| { } ) ;
@@ -212,109 +184,111 @@ where
212184
213185 let frontier = frontier2. frontier ( ) ;
214186
215- // Stage 2 first: drain ready piles (key-sorted, yield-safe).
216- for pile in ready. iter_mut ( ) {
217- if yielded { break ; }
218-
219- let ( mut cursor, storage) = trace. cursor ( ) ;
220- let mut key_con = Tr :: KeyContainer :: with_capacity ( 1 ) ;
221- let mut removals: ChangeBatch < G :: Timestamp > = ChangeBatch :: new ( ) ;
222- // Cache a delayed capability. Reusable for any `initial` that is
223- // greater or equal to the cached time in the partial order.
224- let mut cached_cap: Option < Capability < G :: Timestamp > > = None ;
225-
226- while let Some ( ( ( ref key, ref val1, ref time) , ref initial, ref diff1) ) = pile. data . front ( ) {
227- yielded = yielded || yield_function ( timer, work) ;
228- if yielded { break ; }
187+ // Determine the total-order minimum of the arrangement frontier,
188+ // used to partition arrivals into immediately-eligible vs stuck.
189+ let mut time_con = Tr :: TimeContainer :: with_capacity ( 1 ) ;
190+ if let Some ( min_time) = frontier. iter ( ) . min ( ) {
191+ time_con. push_own ( min_time) ;
192+ }
193+ let eligible = |initial : & G :: Timestamp | -> bool {
194+ !( 0 ..time_con. len ( ) ) . any ( |i| comparison ( time_con. index ( i) , initial) )
195+ } ;
229196
230- // Reuse cached capability if its time is <= initial.
231- if !cached_cap. as_ref ( ) . is_some_and ( |cap| cap. time ( ) . less_equal ( initial) ) {
232- cached_cap = Some ( pile. caps . delayed ( initial) ) ;
197+ // Form new blobs from this activation's arrivals, partitioning
198+ // immediately-eligible records from those that must wait.
199+ if !arriving. is_empty ( ) {
200+ consolidate_updates ( & mut arriving) ;
201+
202+ if !arriving. is_empty ( ) {
203+ // Partition: eligible records go straight to a ready blob,
204+ // the rest go to a stuck blob. `arriving` is sorted by
205+ // (initial, data) after consolidation, and eligibility is
206+ // monotone in total-order initial time, so we can split
207+ // with a linear scan.
208+ let split = arriving. iter ( ) . position ( |& ( ref t, _, _) | !eligible ( t) ) . unwrap_or ( arriving. len ( ) ) ;
209+
210+ // Stuck portion (tail).
211+ if split < arriving. len ( ) {
212+ let stuck_data: Vec < _ > = arriving. drain ( split..) . collect ( ) ;
213+ let mut lower = MutableAntichain :: new ( ) ;
214+ lower. update_iter ( stuck_data. iter ( ) . map ( |( t, _, _) | ( t. clone ( ) , 1 ) ) ) ;
215+ let mut stuck_caps = CapabilitySet :: new ( ) ;
216+ for time in lower. frontier ( ) . iter ( ) {
217+ stuck_caps. insert ( caps. delayed ( time) ) ;
218+ }
219+ stuck. push ( StuckBlob {
220+ caps : stuck_caps,
221+ lower,
222+ data : stuck_data. into ( ) ,
223+ } ) ;
233224 }
234- let cap = cached_cap. as_ref ( ) . unwrap ( ) ;
235225
236- key_con. clear ( ) ; key_con. push_own ( & key) ;
237- cursor. seek_key ( & storage, key_con. index ( 0 ) ) ;
238- if cursor. get_key ( & storage) == key_con. get ( 0 ) {
239- while let Some ( val2) = cursor. get_val ( & storage) {
240- cursor. map_times ( & storage, |t, d| {
241- if comparison ( t, initial) {
242- let mut t = Tr :: owned_time ( t) ;
243- t. join_assign ( time) ;
244- output_buffer. push ( ( t, Tr :: owned_diff ( d) ) )
245- }
226+ // Eligible portion (head) — rearrange to (data, initial, diff)
227+ // and consolidate for key-sorted cursor traversal.
228+ if !arriving. is_empty ( ) {
229+ let mut active_data: Vec < _ > = arriving. drain ( ..)
230+ . map ( |( t, d, r) | ( d, t, r) )
231+ . collect ( ) ;
232+ consolidate_updates ( & mut active_data) ;
233+
234+ if !active_data. is_empty ( ) {
235+ let mut pile_lower = MutableAntichain :: new ( ) ;
236+ pile_lower. update_iter ( active_data. iter ( ) . map ( |( _, t, _) | ( t. clone ( ) , 1 ) ) ) ;
237+ let mut ready_caps = CapabilitySet :: new ( ) ;
238+ for time in pile_lower. frontier ( ) . iter ( ) {
239+ ready_caps. insert ( caps. delayed ( time) ) ;
240+ }
241+ ready. push ( ReadyBlob {
242+ caps : ready_caps,
243+ lower : pile_lower,
244+ data : VecDeque :: from ( active_data) ,
246245 } ) ;
247- consolidate ( & mut output_buffer) ;
248- work += output_buffer. len ( ) ;
249- // TODO: Worry about how to avoid reconstructing sessions so often.
250- output_func ( & mut output. session_with_builder ( & cap) , key, val1, val2, initial, diff1, & mut output_buffer) ;
251- output_buffer. clear ( ) ;
252- cursor. step_val ( & storage) ;
253246 }
254- cursor. rewind_vals ( & storage) ;
255247 }
256-
257- let ( _, initial, _) = pile. data . pop_front ( ) . unwrap ( ) ;
258- removals. update ( initial, -1 ) ;
259- }
260-
261- // Apply all removals in bulk and downgrade once.
262- if !removals. is_empty ( ) {
263- pile. lower . update_iter ( removals. drain ( ) ) ;
264- pile. caps . downgrade ( & pile. lower . frontier ( ) ) ;
265248 }
266249 }
267- ready. retain ( |pile| !pile. data . is_empty ( ) ) ;
268-
269- // Stage 1: nibble blobs to produce new ready piles.
270- if !yielded {
271- // Put the total-order minimum of the frontier into a TimeContainer
272- // so we can call `comparison`. Since `comparison` is monotone with
273- // the total order, only the minimum matters.
274- let mut time_con = Tr :: TimeContainer :: with_capacity ( 1 ) ;
275- if let Some ( min_time) = frontier. iter ( ) . min ( ) {
276- time_con. push_own ( min_time) ;
277- }
278250
251+ // Stage 1: nibble stuck blobs to produce new ready piles.
252+ {
279253 // Collect all eligible records across all blobs into one pile.
280- let mut eligible = Vec :: new ( ) ;
254+ let mut eligible_vec = Vec :: new ( ) ;
281255 let mut eligible_caps = CapabilitySet :: new ( ) ;
282256
283257 for blob in stuck. iter_mut ( ) {
284258 // Pop eligible records from the front. The deque is sorted by
285259 // initial time in total order, and `comparison` is monotone,
286260 // so we stop at the first ineligible record.
287- let before = eligible . len ( ) ;
261+ let before = eligible_vec . len ( ) ;
288262 while let Some ( ( initial, _, _) ) = blob. data . front ( ) {
289- if ( 0 ..time_con . len ( ) ) . any ( |i| comparison ( time_con . index ( i ) , initial) ) {
263+ if ! eligible ( initial) {
290264 break ;
291265 }
292- eligible . push ( blob. data . pop_front ( ) . unwrap ( ) ) ;
266+ eligible_vec . push ( blob. data . pop_front ( ) . unwrap ( ) ) ;
293267 }
294268
295- if eligible . len ( ) > before {
269+ if eligible_vec . len ( ) > before {
296270 // Grab caps for the eligible records before downgrading the blob.
297271 let mut frontier = Antichain :: new ( ) ;
298- for ( initial, _, _) in eligible [ before..] . iter ( ) {
272+ for ( initial, _, _) in eligible_vec [ before..] . iter ( ) {
299273 frontier. insert ( initial. clone ( ) ) ;
300274 }
301275 for time in frontier. iter ( ) {
302276 eligible_caps. insert ( blob. caps . delayed ( time) ) ;
303277 }
304278
305279 // Remove eligible times from the blob's antichain and downgrade.
306- blob. lower . update_iter ( eligible [ before..] . iter ( ) . map ( |( t, _, _) | ( t. clone ( ) , -1 ) ) ) ;
280+ blob. lower . update_iter ( eligible_vec [ before..] . iter ( ) . map ( |( t, _, _) | ( t. clone ( ) , -1 ) ) ) ;
307281 blob. caps . downgrade ( & blob. lower . frontier ( ) ) ;
308282 }
309283 }
310284
311285 stuck. retain ( |blob| !blob. data . is_empty ( ) ) ;
312286
313- if !eligible . is_empty ( ) {
287+ if !eligible_vec . is_empty ( ) {
314288 // Rearrange to (data, initial, diff) and consolidate.
315289 // consolidate_updates sorts by (data, initial) which is
316290 // the order we want for the active blob.
317- let mut active_data: Vec < _ > = eligible . into_iter ( )
291+ let mut active_data: Vec < _ > = eligible_vec . into_iter ( )
318292 . map ( |( t, d, r) | ( d, t, r) )
319293 . collect ( ) ;
320294 consolidate_updates ( & mut active_data) ;
@@ -332,6 +306,69 @@ where
332306 }
333307 }
334308 }
309+
310+ // Stage 2: drain ready piles (key-sorted, yield-safe).
311+ for pile in ready. iter_mut ( ) {
312+ if yielded { break ; }
313+
314+ // For each pile, we'll set up container builders for each distinct capability in the capset.
315+ // The closure gets invoked on a container builder, and if there are containers to extract we
316+ // ship them with the associated capability. We flush at the end.
317+
318+ let mut builders = ( 0 ..pile. caps . len ( ) ) . map ( |_| CB :: default ( ) ) . collect :: < Vec < _ > > ( ) ;
319+
320+ let ( mut cursor, storage) = trace. cursor ( ) ;
321+ let mut key_con = Tr :: KeyContainer :: with_capacity ( 1 ) ;
322+ let mut removals: ChangeBatch < G :: Timestamp > = ChangeBatch :: new ( ) ;
323+
324+ while let Some ( ( ( ref key, ref val1, ref time) , ref initial, ref diff1) ) = pile. data . front ( ) {
325+ yielded = yielded || yield_function ( timer, work) ;
326+ if yielded { break ; }
327+
328+ let builder_idx = pile. caps . iter ( ) . position ( |c| c. time ( ) . less_equal ( initial) ) . unwrap ( ) ;
329+
330+ key_con. clear ( ) ; key_con. push_own ( & key) ;
331+ cursor. seek_key ( & storage, key_con. index ( 0 ) ) ;
332+ if cursor. get_key ( & storage) == key_con. get ( 0 ) {
333+ while let Some ( val2) = cursor. get_val ( & storage) {
334+ cursor. map_times ( & storage, |t, d| {
335+ if comparison ( t, initial) {
336+ let mut t = Tr :: owned_time ( t) ;
337+ t. join_assign ( time) ;
338+ output_buffer. push ( ( t, Tr :: owned_diff ( d) ) )
339+ }
340+ } ) ;
341+ consolidate ( & mut output_buffer) ;
342+ work += output_buffer. len ( ) ;
343+ // TODO: Worry about how to avoid reconstructing sessions so often.
344+ output_func ( & mut builders[ builder_idx] , key, val1, val2, initial, diff1, & mut output_buffer) ;
345+ output_buffer. clear ( ) ;
346+ cursor. step_val ( & storage) ;
347+ }
348+ cursor. rewind_vals ( & storage) ;
349+ }
350+
351+ while let Some ( container) = builders[ builder_idx] . extract ( ) {
352+ output. session ( & pile. caps [ builder_idx] ) . give_container ( container) ;
353+ }
354+
355+ let ( _, initial, _) = pile. data . pop_front ( ) . unwrap ( ) ;
356+ removals. update ( initial, -1 ) ;
357+ }
358+
359+ for builder_idx in 0 .. pile. caps . len ( ) {
360+ while let Some ( container) = builders[ builder_idx] . finish ( ) {
361+ output. session ( & pile. caps [ builder_idx] ) . give_container ( container) ;
362+ }
363+ }
364+
365+ // Apply all removals in bulk and downgrade once.
366+ if !removals. is_empty ( ) {
367+ pile. lower . update_iter ( removals. drain ( ) ) ;
368+ pile. caps . downgrade ( & pile. lower . frontier ( ) ) ;
369+ }
370+ }
371+ ready. retain ( |pile| !pile. data . is_empty ( ) ) ;
335372 }
336373
337374 // Re-activate if we have ready piles to process.
0 commit comments