|
| 1 | +/// Streaming asymmetric join between updates (K,V1) and an arrangement on (K,V2). |
| 2 | +/// |
| 3 | +/// The asymmetry is that the join only responds to streamed updates, not to changes in the arrangement. |
| 4 | +/// Streamed updates join only with matching arranged updates at lesser times *in the total order*, and |
| 5 | +/// subject to a predicate supplied by the user (roughly: strictly less, or not). |
| 6 | +/// |
| 7 | +/// This behavior can ensure that any pair of matching updates interact exactly once. |
| 8 | +/// |
| 9 | +/// There are various forms of this operator with tangled closures about how to emit the outputs and |
| 10 | +/// wrangle the logical compaction frontier in order to preserve the distinctions around times that are |
| 11 | +/// strictly less (conventional compaction logic would collapse unequal times to the frontier, and lose |
| 12 | +/// the distinction). |
| 13 | +/// |
| 14 | +/// The methods also carry an auxiliary time next to the value, which is used to advance the joined times. |
| 15 | +/// This is .. a byproduct of wanting to allow advancing times a la `join_function`, without breaking the |
| 16 | +/// coupling by total order on "initial time". |
| 17 | +/// |
| 18 | +/// The doc comments for individual methods are a bit of a mess. Sorry. |
| 19 | +
|
| 20 | +use std::collections::VecDeque; |
| 21 | +use std::ops::Mul; |
| 22 | + |
| 23 | +use timely::ContainerBuilder; |
| 24 | +use timely::container::CapacityContainerBuilder; |
| 25 | +use timely::dataflow::{Scope, Stream}; |
| 26 | +use timely::dataflow::channels::pact::{Pipeline, Exchange}; |
| 27 | +use timely::dataflow::operators::Operator; |
| 28 | +use timely::PartialOrder; |
| 29 | +use timely::progress::{Antichain, ChangeBatch, Timestamp}; |
| 30 | +use timely::progress::frontier::MutableAntichain; |
| 31 | + |
| 32 | +use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable}; |
| 33 | +use differential_dataflow::difference::{Monoid, Semigroup}; |
| 34 | +use differential_dataflow::lattice::Lattice; |
| 35 | +use differential_dataflow::operators::arrange::Arranged; |
| 36 | +use differential_dataflow::trace::{Cursor, TraceReader}; |
| 37 | +use differential_dataflow::consolidation::{consolidate, consolidate_updates}; |
| 38 | +use differential_dataflow::trace::implementations::BatchContainer; |
| 39 | + |
| 40 | +use timely::dataflow::operators::CapabilitySet; |
| 41 | + |
| 42 | +/// A binary equijoin that responds to updates on only its first input. |
| 43 | +/// |
| 44 | +/// This operator responds to inputs of the form |
| 45 | +/// |
| 46 | +/// ```ignore |
| 47 | +/// ((key, val1, time1), initial_time, diff1) |
| 48 | +/// ``` |
| 49 | +/// |
| 50 | +/// where `initial_time` is less or equal to `time1`, and produces as output |
| 51 | +/// |
| 52 | +/// ```ignore |
| 53 | +/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2) |
| 54 | +/// ``` |
| 55 | +/// |
| 56 | +/// for each `((key, val2), time2, diff2)` present in `arrangement`, where |
| 57 | +/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. |
| 58 | +/// This last constraint is important to ensure that we correctly produce |
| 59 | +/// all pairs of output updates across multiple `half_join` operators. |
| 60 | +/// |
| 61 | +/// Notice that the time is hoisted up into data. The expectation is that |
| 62 | +/// once out of the "delta flow region", the updates will be `delay`d to the |
| 63 | +/// times specified in the payloads. |
| 64 | +pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>( |
| 65 | + stream: VecCollection<G, (K, V, G::Timestamp), R>, |
| 66 | + arrangement: Arranged<G, Tr>, |
| 67 | + frontier_func: FF, |
| 68 | + comparison: CF, |
| 69 | + mut output_func: S, |
| 70 | +) -> VecCollection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output> |
| 71 | +where |
| 72 | + G: Scope<Timestamp = Tr::Time>, |
| 73 | + K: Hashable + ExchangeData, |
| 74 | + V: ExchangeData, |
| 75 | + R: ExchangeData + Monoid, |
| 76 | + Tr: TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static, |
| 77 | + R: Mul<Tr::Diff, Output: Semigroup>, |
| 78 | + FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static, |
| 79 | + CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, |
| 80 | + DOut: Clone+'static, |
| 81 | + S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static, |
| 82 | +{ |
| 83 | + let output_func = move |builder: &mut CapacityContainerBuilder<Vec<_>>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| { |
| 84 | + for (time, diff2) in output.drain(..) { |
| 85 | + let diff = diff1.clone() * diff2.clone(); |
| 86 | + let dout = (output_func(k, v1, v2), time.clone()); |
| 87 | + use timely::container::PushInto; |
| 88 | + builder.push_into((dout, initial.clone(), diff)); |
| 89 | + } |
| 90 | + }; |
| 91 | + half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder<Vec<_>>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func) |
| 92 | + .as_collection() |
| 93 | +} |
| 94 | + |
/// An unsafe variant of `half_join` where the `output_func` closure takes
/// an additional vector of `(time, diff)` tuples as input and writes its
/// outputs to a container builder. The container builder
/// can, but isn't required to, accept `(data, time, diff)` triplets.
/// This allows for more flexibility, but is more error-prone.
///
/// This operator responds to inputs of the form
///
/// ```ignore
/// ((key, val1, time1), initial_time, diff1)
/// ```
///
/// where `initial_time` is less or equal to `time1`, and produces as output
///
/// ```ignore
/// output_func(session, key, val1, val2, initial_time, diff1, &[lub(time1, time2), diff2])
/// ```
///
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
///
/// The `yield_function` allows the caller to indicate when the operator should
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, Y, S, CB>(
    stream: VecCollection<G, (K, V, G::Timestamp), R>,
    mut arrangement: Arranged<G, Tr>,
    frontier_func: FF,
    comparison: CF,
    yield_function: Y,
    mut output_func: S,
) -> Stream<G, CB::Container>
where
    G: Scope<Timestamp = Tr::Time>,
    K: Hashable + ExchangeData,
    V: ExchangeData,
    R: ExchangeData + Monoid,
    Tr: for<'a> TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
    CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
    Y: Fn(std::time::Instant, usize) -> bool + 'static,
    S: FnMut(&mut CB, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
    CB: ContainerBuilder,
{
    // No need to block physical merging for this operator.
    arrangement.trace.set_physical_compaction(Antichain::new().borrow());
    let mut arrangement_trace = Some(arrangement.trace);
    let arrangement_stream = arrangement.stream;

    // Route stream updates by key hash so they land on the worker holding
    // the corresponding part of the arrangement.
    let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

    // Stash for (time, diff) accumulation.
    let mut output_buffer = Vec::new();

    // Unified blobs: each blob holds data in (T, D, R) order, with a stuck_count
    // tracking how many elements at the back are not yet eligible for processing.
    // The ready prefix is sorted by (D, T, R) for cursor traversal.
    let mut blobs: Vec<Blob<(K, V, G::Timestamp), G::Timestamp, R>> = Vec::new();

    let scope = stream.scope();
    stream.inner.binary_frontier(arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activator = scope.activator_for(info.address);

        move |(input1, frontier1), (input2, frontier2), output| {

            // Drain all input into a single buffer, flipping each update from
            // (data, time, diff) to (time, data, diff) so that consolidation
            // below sorts primarily by the initial time.
            let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new();
            let mut caps = CapabilitySet::new();
            input1.for_each(|capability, data| {
                caps.insert(capability.retain(0));
                arriving.extend(data.drain(..).map(|(d, t, r)| (t, d, r)));
            });

            // Drain input batches; although we do not observe them, we want access to the input
            // to observe the frontier and to drive scheduling.
            input2.for_each(|_, _| { });

            // Local variables to track if and when we should exit early.
            let mut yielded = false;
            let timer = std::time::Instant::now();
            let mut work = 0;

            if let Some(ref mut trace) = arrangement_trace {

                let frontier = frontier2.frontier();

                // Determine the total-order minimum of the arrangement frontier,
                // used to partition arrivals into immediately-eligible vs stuck.
                // An empty frontier (arrangement complete) leaves `time_con` empty,
                // which makes everything eligible below.
                let mut time_con = Tr::TimeContainer::with_capacity(1);
                if let Some(min_time) = frontier.iter().min() {
                    time_con.push_own(min_time);
                }
                // An update is eligible once no retained frontier time still
                // `comparison`-precedes its initial time: all matching arranged
                // updates have then been sealed into the trace.
                let eligible = |initial: &G::Timestamp| -> bool {
                    !(0..time_con.len()).any(|i| comparison(time_con.index(i), initial))
                };

                // Form a new blob from arrivals.
                // consolidate_updates sorts by (T, D, R) — the stuck order.
                consolidate_updates(&mut arriving);

                if !arriving.is_empty() {
                    // Hold capabilities for the lower frontier of the arriving times,
                    // so we can still emit outputs at (and downgrade through) them.
                    let mut lower = MutableAntichain::new();
                    lower.update_iter(arriving.iter().map(|(t, _, _)| (t.clone(), 1)));
                    let mut blob_caps = CapabilitySet::new();
                    for time in lower.frontier().iter() {
                        blob_caps.insert(caps.delayed(time));
                    }

                    // Determine how many records are stuck (ineligible).
                    // Data is sorted by (T, D, R) and eligibility is monotone in T,
                    // so stuck records form a suffix.
                    let stuck_count = arriving.iter().rev()
                        .take_while(|(t, _, _)| !eligible(t))
                        .count();

                    let mut data: VecDeque<_> = arriving.into();

                    // Sort the ready prefix by (D, T, R) for cursor traversal.
                    let ready_len = data.len() - stuck_count;
                    if ready_len > 0 {
                        // VecDeque slices: make_contiguous then sort the prefix.
                        let slice = data.make_contiguous();
                        slice[..ready_len].sort_by(|(t1, d1, r1), (t2, d2, r2)| {
                            (d1, t1, r1).cmp(&(d2, t2, r2))
                        });
                    }

                    blobs.push(Blob {
                        caps: blob_caps,
                        lower,
                        data,
                        stuck_count,
                    });
                }

                // Nibble: only when all ready elements have been drained (stuck_count == len),
                // check if stuck records have become eligible and promote them.
                for blob in blobs.iter_mut().filter(|b| b.stuck_count == b.data.len()) {

                    // Count how many stuck records (from the front, which has the
                    // lowest initial times) are now eligible.
                    let newly_ready = blob.data.iter().take_while(|(t, _, _)| eligible(t)).count();

                    if newly_ready > 0 {
                        blob.stuck_count -= newly_ready;

                        // Sort the newly-ready prefix by (D, T, R) for cursor traversal.
                        let slice = blob.data.make_contiguous();
                        slice[..newly_ready].sort_by(|(t1, d1, r1), (t2, d2, r2)| {
                            (d1, t1, r1).cmp(&(d2, t2, r2))
                        });

                        // Downgrade capabilities.
                        // NOTE(review): `lower` is rebuilt from scratch here (rather
                        // than applying deltas) because promotion changes no counts,
                        // only the sorted order; the frontier should be unchanged.
                        let mut new_lower = MutableAntichain::new();
                        new_lower.update_iter(blob.data.iter().map(|(t, _, _)| (t.clone(), 1)));
                        blob.lower = new_lower;
                        blob.caps.downgrade(&blob.lower.frontier());
                    }
                }

                // Process ready elements from blobs.
                for blob in blobs.iter_mut().filter(|b| b.data.len() > b.stuck_count) {
                    if yielded { break; }

                    // One builder per held capability; outputs are routed to the
                    // builder whose capability covers the update's initial time.
                    let mut builders = (0..blob.caps.len()).map(|_| CB::default()).collect::<Vec<_>>();

                    let (mut cursor, storage) = trace.cursor();
                    let mut key_con = Tr::KeyContainer::with_capacity(1);
                    let mut removals: ChangeBatch<G::Timestamp> = ChangeBatch::new();

                    // Process ready elements from the front.
                    while blob.data.len() > blob.stuck_count {
                        yielded = yielded || yield_function(timer, work);
                        if yielded { break; }

                        // Peek at the front element. It's in (T, D, R) storage order,
                        // but the ready prefix has been sorted by (D, T, R).
                        let (ref initial, (ref key, ref val1, ref time), ref diff1) = blob.data[0];

                        // `unwrap` is justified: `caps` tracks the lower frontier of
                        // `data`, so some capability is `less_equal` every stored time.
                        let builder_idx = blob.caps.iter().position(|c| c.time().less_equal(initial)).unwrap();

                        key_con.clear(); key_con.push_own(&key);
                        cursor.seek_key(&storage, key_con.index(0));
                        if cursor.get_key(&storage) == key_con.get(0) {
                            while let Some(val2) = cursor.get_val(&storage) {
                                // Collect matching arranged times strictly preceding
                                // `initial` (per `comparison`), advanced by `time`.
                                cursor.map_times(&storage, |t, d| {
                                    if comparison(t, initial) {
                                        let mut t = Tr::owned_time(t);
                                        t.join_assign(time);
                                        output_buffer.push((t, Tr::owned_diff(d)))
                                    }
                                });
                                consolidate(&mut output_buffer);
                                work += output_buffer.len();
                                output_func(&mut builders[builder_idx], key, val1, val2, initial, diff1, &mut output_buffer);
                                output_buffer.clear();
                                cursor.step_val(&storage);
                            }
                            cursor.rewind_vals(&storage);
                        }

                        // Ship any completed containers while the loop runs, so
                        // memory does not accumulate in the builder.
                        while let Some(container) = builders[builder_idx].extract() {
                            output.session(&blob.caps[builder_idx]).give_container(container);
                        }

                        let (initial, _, _) = blob.data.pop_front().unwrap();
                        removals.update(initial, -1);
                    }

                    // Flush all builders: `finish` may emit partial containers.
                    for builder_idx in 0 .. blob.caps.len() {
                        while let Some(container) = builders[builder_idx].finish() {
                            output.session(&blob.caps[builder_idx]).give_container(container);
                        }
                    }

                    // Apply all removals in bulk and downgrade once.
                    if blob.data.is_empty() {
                        // Eagerly release the blob's resources.
                        blob.lower = MutableAntichain::new();
                        blob.caps = CapabilitySet::new();
                        blob.data = VecDeque::default();
                    } else {
                        blob.lower.update_iter(removals.drain());
                        blob.caps.downgrade(&blob.lower.frontier());
                    }
                }

                // Remove fully-consumed blobs.
                blobs.retain(|blob| !blob.data.is_empty());
            }

            // Re-activate if we have blobs with ready elements to process.
            // (Fully-stuck blobs are woken instead by frontier progress on input2.)
            if blobs.iter().any(|b| b.data.len() > b.stuck_count) {
                activator.activate();
            }

            // The logical merging frontier depends on input1 and all blobs.
            // We use `frontier_func` rather than raw times so the caller can
            // preserve the strictly-less distinction under compaction.
            let mut frontier = Antichain::new();
            for time in frontier1.frontier().iter() {
                frontier_func(time, &mut frontier);
            }
            for blob in blobs.iter() {
                for cap in blob.caps.iter() {
                    frontier_func(cap.time(), &mut frontier);
                }
            }
            arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

            // Once input1 completes and all pending work is drained, drop the
            // trace handle to release the arrangement.
            if frontier1.is_empty() && blobs.is_empty() {
                arrangement_trace = None;
            }
        }
    })
}
| 352 | + |
/// A unified blob of updates. Data is stored as `(T, D, R)` tuples in a VecDeque.
/// The last `stuck_count` elements are stuck (not yet eligible for processing),
/// sorted by `(T, D, R)` from consolidation. The ready prefix (everything before
/// the stuck tail) is sorted by `(D, T, R)` for efficient cursor traversal.
/// Ready elements are consumed from the front via `pop_front`.
struct Blob<D, T: Timestamp, R> {
    /// Capabilities covering the lower frontier of the times in `data`,
    /// downgraded as ready elements are consumed.
    caps: CapabilitySet<T>,
    /// Multiset of the times present in `data`; its frontier drives `caps`.
    lower: MutableAntichain<T>,
    /// Updates as `(initial_time, payload, diff)` triples.
    data: VecDeque<(T, D, R)>,
    /// Number of stuck (ineligible) elements at the back of `data`.
    stuck_count: usize,
}
0 commit comments