Skip to content

Commit c67ad11

Browse files
frankmcsherryantiguruclaude
authored
Release preparation (#722)
* Slow but correct impls * WIP * Wildly overcomplicated * Remove commented code * Removed one-off trait * Remove unread counters * Convert silent errors to panics * Simplify logic * More commented code removed * Extract unconditional behavior * Idiomatic Rust * Less time cloning * Idiomatic capability use * Ref keyword to borrow * Further tightening * Tidy explanatory text, and reflect frontier * Tighten comments, remove mutable borrow * Prefer insert_ref to insert * Use containers for interesting (keys, time) (#710) * Use containers for interesting (keys, time) * Remove use of owned keys in reduce.rs * Remove all uses of KeyOwn * Remove KeyOwn * Improve documentation * Walk back overly prescriptive dogs^3 constraints * Remove TimelyStack and all dependent types (#715) Remove TimelyStack, TStack layout, ColumnationChunker, ColInternalMerger, and all Col* type aliases (ColValSpine, ColKeySpine, ColValBuilder, etc.) from the codebase. The columnation crate dependency is retained. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Track timely `master` changes, around `Child` scope changes (#714) * Track timely's Child changes * Tidy bounds * Absorb next wave of changes * Remove unused, and Arranged impls * Remove timestamp generic from Arranged * Standardize T for timestamps, Tr for traces * Correct local timely reference * Convert G generics to T * Remove as Timestamp prompts * Remove T generics bound by Tr * Remove Scope::clone calls * Further tightening of traits * Correct docs * Relifetimed scopes (#718) * Bring core DD into line * Clean up examples and tests * Interactive update (#719) * Example DDIR with SCC * Parse from concrete syntax * Further examples and improvements * Further improvements to track columnar * Move DDIR to its own crate Extract the DD IR interpreter into a `ddir/` crate, separating shared infrastructure (parse, IR, lower) from backend-specific rendering. - Split parse into `parse/applicative.rs` (.ddir) and `parse/pipe.rs` (.ddp) - Extract `ir.rs` (Node, Program, RowLike) from `lower.rs` - Move programs and examples from differential-dataflow/examples/ to ddir/ - Remove DDIR_DEBUG, DDIR_REACHABILITY, DDIR_PRINT env vars (use INSPECT) - Fix off-by-one in columnar harness timestamping - Unify inspect output format between vec and col backends - Update programs to output compact counts via arrange+inspect Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Optimized IR * Relocate to interactive/ * Further clean-up --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * DDIR README * Continue to track timely master (#720) * Target timely 0.29 (#721) --------- Co-authored-by: Moritz Hoffmann <antiguru@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 775967a commit c67ad11

133 files changed

Lines changed: 3824 additions & 4669 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ rust-version = "1.86"
2323

2424
[workspace.dependencies]
2525
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.22.0" }
26-
timely = { version = "0.28", default-features = false }
26+
timely = { version = "0.29", default-features = false }
2727
columnar = { version = "0.12", default-features = false }
28-
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
28+
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
2929
#timely = { path = "../timely-dataflow/timely/", default-features = false }
3030

3131
[workspace.lints.clippy]

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Alternately, here is a fragment that computes the set of nodes reachable from a
2222
```rust
2323
let reachable =
2424
roots.iterate(|scope, reach|
25-
edges.enter(&scope)
25+
edges.enter(scope)
2626
.semijoin(reach)
2727
.map(|(src, dst)| dst)
2828
.concat(reach)
@@ -337,7 +337,7 @@ edges.iterate(|scope, inner| {
337337
.map(|(node,_)| node);
338338

339339
// keep edges between active vertices
340-
edges.enter(&scope)
340+
edges.enter(scope)
341341
.semijoin(active)
342342
.map(|(src,dst)| (dst,src))
343343
.semijoin(active)

advent_of_code_2017/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ workspace = true
1111

1212
[dependencies]
1313
differential-dataflow = { workspace = true }
14-
timely = { git = "https://github.com/frankmcsherry/timely-dataflow" }
14+
timely = { workspace = true }

advent_of_code_2017/src/bin/day_06.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn main() {
2424

2525
let stable = banks.iterate(|scope, iter|
2626
iter.map_in_place(|banks| recycle(banks))
27-
.concat(banks.enter(&scope))
27+
.concat(banks.enter(scope))
2828
.distinct()
2929
);
3030

@@ -43,7 +43,7 @@ fn main() {
4343
loop_point
4444
.iterate(|scope, iter|
4545
iter.map_in_place(|banks| recycle(banks))
46-
.concat(loop_point.enter(&scope))
46+
.concat(loop_point.enter(scope))
4747
.distinct()
4848
)
4949
.map(|_| ((),()))

advent_of_code_2017/src/bin/day_07.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,10 +1104,10 @@ tvhftq (35)";
11041104

11051105
let total_weights: VecCollection<_,String> = weights
11061106
.iterate(|scope, inner| {
1107-
parents.enter(&scope)
1107+
parents.enter(scope)
11081108
.semijoin(inner)
11091109
.map(|(_, parent)| parent)
1110-
.concat(weights.enter(&scope))
1110+
.concat(weights.enter(scope))
11111111
});
11121112

11131113
parents

advent_of_code_2017/src/bin/day_08.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1107,7 +1107,7 @@ wui inc -120 if i > -2038";
11071107
.map(|_| ((0, String::new()), 0))
11081108
.iterate(|scope, valid| {
11091109

1110-
let edits = edits.enter(&scope);
1110+
let edits = edits.enter(scope);
11111111

11121112
valid
11131113
.prefix_sum_at(edits.map(|(key,_)| key), 0, |_k,x,y| *x + *y)

advent_of_code_2017/src/bin/day_09.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ where
120120
if input.len() > 1 { result = combine(result, &(input[1].0).1); }
121121
output.push((result, 1));
122122
})
123-
.concat(unit_ranges.enter(&scope))
123+
.concat(unit_ranges.enter(scope))
124124
)
125125
}
126126

@@ -154,10 +154,10 @@ where
154154
.iterate(|scope, state| {
155155
aggregates
156156
.filter(|&((_, log),_)| log < 64) // the log = 64 interval doesn't help us here (overflows).
157-
.enter(&scope)
157+
.enter(scope)
158158
.map(|((pos, log), data)| (pos, (log, data)))
159159
.join_map(state, move |&pos, &(log, ref data), state| (pos + (1 << log), combine(state, data)))
160-
.concat(init_state.enter(&scope))
160+
.concat(init_state.enter(scope))
161161
.distinct()
162162
})
163163
.consolidate()

advent_of_code_2017/src/bin/day_12.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2035,8 +2035,8 @@ fn main() {
20352035
let labels =
20362036
nodes
20372037
.iterate(|scope, label| {
2038-
let edges = edges.enter(&scope);
2039-
let nodes = nodes.enter(&scope);
2038+
let edges = edges.enter(scope);
2039+
let nodes = nodes.enter(scope);
20402040
label
20412041
.join_map(edges, |_src, &lbl, &tgt| (tgt, lbl))
20422042
.concat(nodes)

diagnostics/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[package]
22
name = "diagnostics"
33
version = "0.1.0"
4+
publish = false
45
edition.workspace = true
56
rust-version.workspace = true
67

diagnostics/src/logging.rs

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use differential_dataflow::operators::arrange::TraceAgent;
3434
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
3535
use differential_dataflow::{AsCollection, VecCollection};
3636

37-
use timely::communication::Allocate;
3837
use timely::container::CapacityContainerBuilder;
3938
use timely::dataflow::channels::pact::Pipeline;
4039
use timely::dataflow::operators::capture::{Event, EventLink, Replay, Capture};
@@ -244,12 +243,11 @@ fn quantize(time: Duration, interval: Duration) -> Duration {
244243
}
245244

246245
/// Quantize timestamps in a collection's inner stream.
247-
fn quantize_collection<S, D>(
248-
collection: VecCollection<S, D, i64>,
246+
fn quantize_collection<'scope, D>(
247+
collection: VecCollection<'scope, Duration, D, i64>,
249248
interval: Duration,
250-
) -> VecCollection<S, D, i64>
249+
) -> VecCollection<'scope, Duration, D, i64>
251250
where
252-
S: Scope<Timestamp = Duration>,
253251
D: differential_dataflow::Data,
254252
{
255253
collection
@@ -276,7 +274,7 @@ where
276274
///
277275
/// Returns a [`LoggingState`] with trace handles and a [`SinkHandle`] for
278276
/// the WebSocket thread.
279-
pub fn register<A: Allocate>(worker: &mut Worker<A>, log_logging: bool) -> LoggingState {
277+
pub fn register(worker: &mut Worker, log_logging: bool) -> LoggingState {
280278
let start = Instant::now();
281279

282280
// Event links for logging capture (worker-internal, Rc-based).
@@ -377,8 +375,8 @@ pub fn register<A: Allocate>(worker: &mut Worker<A>, log_logging: bool) -> Loggi
377375
}
378376
}
379377

380-
fn install_loggers<A: Allocate>(
381-
worker: &mut Worker<A>,
378+
fn install_loggers(
379+
worker: &mut Worker,
382380
t_link: Rc<EventLink<Duration, Vec<(Duration, TimelyEvent)>>>,
383381
d_link: Rc<EventLink<Duration, Vec<(Duration, DifferentialEvent)>>>,
384382
) {
@@ -402,11 +400,11 @@ fn install_loggers<A: Allocate>(
402400
// ============================================================================
403401

404402
/// Internal: collections before arrangement, used for the cross-join.
405-
struct TimelyCollections<S: Scope> {
406-
operators: VecCollection<S, (usize, String, Vec<usize>), i64>,
407-
channels: VecCollection<S, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
408-
elapsed: VecCollection<S, usize, i64>,
409-
messages: VecCollection<S, usize, i64>,
403+
struct TimelyCollections<'scope> {
404+
operators: VecCollection<'scope, Duration, (usize, String, Vec<usize>), i64>,
405+
channels: VecCollection<'scope, Duration, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
406+
elapsed: VecCollection<'scope, Duration, usize, i64>,
407+
messages: VecCollection<'scope, Duration, usize, i64>,
410408
}
411409

412410
#[derive(Default)]
@@ -416,16 +414,16 @@ struct TimelyDemuxState {
416414
}
417415

418416
/// Build timely logging collections and arrangements.
419-
fn construct_timely<S: Scope<Timestamp = Duration>>(
420-
scope: &mut S,
421-
stream: Stream<S, Vec<(Duration, TimelyEvent)>>,
422-
) -> (TimelyTraces, TimelyCollections<S>) {
417+
fn construct_timely<'scope>(
418+
scope: Scope<'scope, Duration>,
419+
stream: Stream<'scope, Duration, Vec<(Duration, TimelyEvent)>>,
420+
) -> (TimelyTraces, TimelyCollections<'scope>) {
423421
type OpUpdate = ((usize, String, Vec<usize>), Duration, i64);
424422
type ChUpdate = ((usize, Vec<usize>, (usize, usize), (usize, usize)), Duration, i64);
425423
type ElUpdate = (usize, Duration, i64);
426424
type MsgUpdate = (usize, Duration, i64);
427425

428-
let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope.clone());
426+
let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope);
429427
let mut input = demux.new_input(stream, Pipeline);
430428

431429
let (op_out, operates) = demux.new_output::<Vec<OpUpdate>>();
@@ -536,24 +534,24 @@ fn construct_timely<S: Scope<Timestamp = Duration>>(
536534
// ============================================================================
537535

538536
/// Internal: collections before arrangement, used for the cross-join.
539-
struct DifferentialCollections<S: Scope> {
540-
arrangement_batches: VecCollection<S, usize, i64>,
541-
arrangement_records: VecCollection<S, usize, i64>,
542-
sharing: VecCollection<S, usize, i64>,
543-
batcher_records: VecCollection<S, usize, i64>,
544-
batcher_size: VecCollection<S, usize, i64>,
545-
batcher_capacity: VecCollection<S, usize, i64>,
546-
batcher_allocations: VecCollection<S, usize, i64>,
537+
struct DifferentialCollections<'scope> {
538+
arrangement_batches: VecCollection<'scope, Duration, usize, i64>,
539+
arrangement_records: VecCollection<'scope, Duration, usize, i64>,
540+
sharing: VecCollection<'scope, Duration, usize, i64>,
541+
batcher_records: VecCollection<'scope, Duration, usize, i64>,
542+
batcher_size: VecCollection<'scope, Duration, usize, i64>,
543+
batcher_capacity: VecCollection<'scope, Duration, usize, i64>,
544+
batcher_allocations: VecCollection<'scope, Duration, usize, i64>,
547545
}
548546

549547
/// Build differential logging collections and arrangements.
550-
fn construct_differential<S: Scope<Timestamp = Duration>>(
551-
scope: &mut S,
552-
stream: Stream<S, Vec<(Duration, DifferentialEvent)>>,
553-
) -> (DifferentialTraces, DifferentialCollections<S>) {
548+
fn construct_differential<'scope>(
549+
scope: Scope<'scope, Duration>,
550+
stream: Stream<'scope, Duration, Vec<(Duration, DifferentialEvent)>>,
551+
) -> (DifferentialTraces, DifferentialCollections<'scope>) {
554552
type Update = (usize, Duration, i64);
555553

556-
let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone());
554+
let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope);
557555
let mut input = demux.new_input(stream, Pipeline);
558556

559557
let (bat_out, batches) = demux.new_output::<Vec<Update>>();

0 commit comments

Comments
 (0)