Skip to content

Commit 87db01b

Browse files
Merge pull request #699 from TimelyDataflow/master-next
Prepare for 0.21
2 parents f397826 + 56568ff commit 87db01b

22 files changed

Lines changed: 4435 additions & 1852 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ members = [
1313
#"tpchlike",
1414
#"doop",
1515
"mdbook",
16+
"diagnostics",
1617
]
1718
resolver = "2"
1819

@@ -22,8 +23,8 @@ rust-version = "1.86"
2223

2324
[workspace.dependencies]
2425
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.20.0" }
25-
timely = { version = "0.27", default-features = false }
26-
columnar = { version = "0.11", default-features = false }
26+
timely = { version = "0.28", default-features = false }
27+
columnar = { version = "0.12", default-features = false }
2728
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
2829
#timely = { path = "../timely-dataflow/timely/", default-features = false }
2930

diagnostics/Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "diagnostics"
3+
version = "0.1.0"
4+
edition.workspace = true
5+
rust-version.workspace = true
6+
7+
[dependencies]
8+
differential-dataflow.workspace = true
9+
timely = { workspace = true, features = ["getopts"] }
10+
serde = { version = "1", features = ["derive"] }
11+
serde_json = "1"
12+
tungstenite = "0.26"

diagnostics/examples/scc-bench.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
//! SCC benchmark with diagnostics.
2+
//!
3+
//! Usage: scc-bench [timely args] [--log-logging] [nodes [edges [batch [rounds]]]]
4+
//!
5+
//! Supports standard timely arguments like `-w4` for 4 workers. With multiple
6+
//! workers, all workers' diagnostics are exchanged to worker 0 for serving.
7+
//!
8+
//! Start this, then serve the diagnostics UI and open it in a browser:
9+
//!
10+
//! cd diagnostics && python3 -m http.server 8000
11+
//! open http://localhost:8000/index.html
12+
//!
13+
//! Click Connect (defaults to ws://localhost:51371).
14+
//!
15+
//! Pass --log-logging to also see the diagnostics dataflow itself.
16+
17+
use std::hash::{Hash, Hasher};
18+
use std::collections::hash_map::DefaultHasher;
19+
20+
use timely::dataflow::operators::probe::Handle;
21+
22+
use differential_dataflow::input::Input;
23+
use differential_dataflow::algorithms::graphs::scc::strongly_connected;
24+
25+
use diagnostics::logging;
26+
use diagnostics::server::Server;
27+
28+
fn hash_to_u64<T: Hash>(value: &T) -> u64 {
29+
let mut hasher = DefaultHasher::new();
30+
value.hash(&mut hasher);
31+
hasher.finish()
32+
}
33+
34+
fn edge_for(index: usize, nodes: usize) -> (usize, usize) {
35+
let h1 = hash_to_u64(&index);
36+
let h2 = hash_to_u64(&h1);
37+
((h1 as usize) % nodes, (h2 as usize) % nodes)
38+
}
39+
40+
fn main() {
41+
let timer = std::time::Instant::now();
42+
43+
// Extract our flags before timely consumes its args.
44+
let log_logging = std::env::args().any(|a| a == "--log-logging");
45+
46+
// timely::execute_from_args handles -w, -n, -p, -h and passes the rest through.
47+
timely::execute_from_args(std::env::args(), move |worker| {
48+
// Register diagnostics on each worker.
49+
let state = logging::register(worker, log_logging);
50+
51+
// Start the WebSocket server on worker 0 only. With multiple
52+
// workers (-w N), all workers' data is exchanged to worker 0
53+
// via the DD arrangements, so the browser sees everything.
54+
//
55+
// Non-server workers must drop the SinkHandle so its BatchLogger
56+
// sends a capability retraction and the client input Replay can
57+
// advance. Without this, the Replay holds its frontier at time 0
58+
// forever, blocking the cross-join from producing output.
59+
let _server = if worker.index() == 0 {
60+
Some(Server::start(51371, state.sink))
61+
} else {
62+
drop(state.sink);
63+
None
64+
};
65+
66+
// Parse positional args (skip flags consumed by timely and ourselves).
67+
let positional: Vec<String> = std::env::args()
68+
.skip(1)
69+
.filter(|a| !a.starts_with('-'))
70+
.collect();
71+
let nodes: usize = positional.get(0).and_then(|s| s.parse().ok()).unwrap_or(100_000);
72+
let edges: usize = positional.get(1).and_then(|s| s.parse().ok()).unwrap_or(200_000);
73+
let batch: usize = positional.get(2).and_then(|s| s.parse().ok()).unwrap_or(1_000);
74+
let rounds: usize = positional.get(3).and_then(|s| s.parse().ok()).unwrap_or(usize::MAX);
75+
76+
if worker.index() == 0 {
77+
println!("nodes: {nodes}, edges: {edges}, batch: {batch}, rounds: {}, workers: {}",
78+
if rounds == usize::MAX { "∞".to_string() } else { rounds.to_string() },
79+
worker.peers());
80+
}
81+
82+
let mut probe = Handle::new();
83+
let mut input = worker.dataflow(|scope| {
84+
let (input, graph) = scope.new_collection::<(usize, usize), isize>();
85+
let _scc = strongly_connected(graph).probe_with(&mut probe);
86+
input
87+
});
88+
89+
let index = worker.index();
90+
let peers = worker.peers();
91+
92+
// Load initial edges (partitioned across workers).
93+
let timer_load = std::time::Instant::now();
94+
for i in (0..edges).filter(|i| i % peers == index) {
95+
input.insert(edge_for(i, nodes));
96+
}
97+
input.advance_to(1);
98+
input.flush();
99+
while probe.less_than(input.time()) {
100+
worker.step();
101+
}
102+
if index == 0 {
103+
println!("{:?}\t{:?}\tloaded {edges} edges", timer.elapsed(), timer_load.elapsed());
104+
}
105+
106+
// Apply changes in rounds.
107+
for round in 0..rounds {
108+
let timer_round = std::time::Instant::now();
109+
for i in (0..batch).filter(|i| i % peers == index) {
110+
input.remove(edge_for(round * batch + i, nodes));
111+
input.insert(edge_for(edges + round * batch + i, nodes));
112+
}
113+
input.advance_to(round + 2);
114+
input.flush();
115+
while probe.less_than(input.time()) {
116+
worker.step();
117+
}
118+
if index == 0 {
119+
println!("{:?}\t{:?}\tround {round} ({} changes)",
120+
timer.elapsed(), timer_round.elapsed(), batch * 2);
121+
}
122+
}
123+
}).unwrap();
124+
125+
println!("{:?}\tshut down", timer.elapsed());
126+
}

diagnostics/examples/smoke.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
//! Smoke test: run a small DD computation with diagnostics and a WS server.
2+
//!
3+
//! Start this, then open a browser console and connect:
4+
//! let ws = new WebSocket("ws://localhost:51371");
5+
//! ws.onmessage = e => console.log(JSON.parse(e.data));
6+
//!
7+
//! You should see operator, channel, and stat updates flowing.
8+
9+
use std::hash::{Hash, Hasher};
10+
use std::collections::hash_map::DefaultHasher;
11+
use std::time::Duration;
12+
13+
use timely::dataflow::operators::probe::Handle;
14+
use differential_dataflow::input::Input;
15+
16+
use diagnostics::logging;
17+
use diagnostics::server::Server;
18+
19+
fn hash_to_u64<T: Hash>(value: &T) -> u64 {
20+
let mut hasher = DefaultHasher::new();
21+
value.hash(&mut hasher);
22+
hasher.finish()
23+
}
24+
25+
fn edge_for(index: usize, nodes: usize) -> (usize, usize) {
26+
let h1 = hash_to_u64(&index);
27+
let h2 = hash_to_u64(&h1);
28+
((h1 as usize) % nodes, (h2 as usize) % nodes)
29+
}
30+
31+
fn main() {
32+
let timer = std::time::Instant::now();
33+
34+
timely::execute(timely::Config::thread(), move |worker| {
35+
// Register diagnostics (log_logging = true to see the diagnostics dataflow itself).
36+
let state = logging::register(worker, true);
37+
38+
// Start the WebSocket server on worker 0 only.
39+
// Non-server workers drop the SinkHandle so the client input
40+
// Replay can advance its frontier.
41+
let _server = if worker.index() == 0 {
42+
Some(Server::start(51371, state.sink))
43+
} else {
44+
drop(state.sink);
45+
None
46+
};
47+
48+
// Build a user dataflow.
49+
let nodes = 1000;
50+
let edges = 2000;
51+
let batch = 100;
52+
53+
let mut probe = Handle::new();
54+
let mut input = worker.dataflow(|scope| {
55+
let (input, graph) = scope.new_collection::<(usize, usize), isize>();
56+
graph
57+
.map(|(src, _dst)| src)
58+
.probe_with(&mut probe);
59+
input
60+
});
61+
62+
// Load initial edges.
63+
for i in 0..edges {
64+
input.insert(edge_for(i, nodes));
65+
}
66+
input.advance_to(1);
67+
input.flush();
68+
while probe.less_than(input.time()) {
69+
worker.step();
70+
}
71+
if worker.index() == 0 {
72+
eprintln!("{:?}\tloaded {edges} edges", timer.elapsed());
73+
}
74+
75+
// Run rounds of changes, keeping the server alive for browsers to connect.
76+
for round in 0..usize::MAX {
77+
let round_timer = std::time::Instant::now();
78+
for i in 0..batch {
79+
input.remove(edge_for(round * batch + i, nodes));
80+
input.insert(edge_for(edges + round * batch + i, nodes));
81+
}
82+
input.advance_to(round + 2);
83+
input.flush();
84+
while probe.less_than(input.time()) {
85+
worker.step();
86+
}
87+
if worker.index() == 0 && round % 100 == 0 {
88+
eprintln!("{:?}\t{:?}\tround {round}", timer.elapsed(), round_timer.elapsed());
89+
}
90+
91+
// Slow down so there's time to connect a browser.
92+
std::thread::sleep(Duration::from_millis(10));
93+
}
94+
})
95+
.unwrap();
96+
}

0 commit comments

Comments
 (0)