Skip to content

Commit 652c76b

Browse files
authored
Enable all previously enabled crates (#679)
Enable crates previously disabled, and fix their compile errors. Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
1 parent b54f7bf commit 652c76b

20 files changed

Lines changed: 233 additions & 221 deletions

Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ members = [
33
"differential-dataflow",
44
# "advent_of_code_2017",
55
"dogsdogsdogs",
6-
#"experiments",
7-
#"interactive",
8-
#"server",
9-
#"server/dataflows/degr_dist",
10-
#"server/dataflows/neighborhood",
11-
#"server/dataflows/random_graph",
12-
#"server/dataflows/reachability",
6+
"experiments",
7+
"interactive",
8+
"server",
9+
"server/dataflows/degr_dist",
10+
"server/dataflows/neighborhood",
11+
"server/dataflows/random_graph",
12+
"server/dataflows/reachability",
1313
#"tpchlike",
1414
#"doop",
1515
"mdbook",

experiments/src/bin/arrange.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ fn main() {
5151
let (handle, data) = scope.new_collection();
5252

5353
let probe = match comp {
54-
Comp::Nothing => data.probe(),
55-
Comp::Exchange => data.inner.exchange(|&(x,_,_): &((usize,()),_,_)| x.0 as u64).probe(),
56-
Comp::Arrange => data.arrange_by_self().stream.probe(),
57-
Comp::Count => data.arrange_by_self().count_total().probe(),
58-
Comp::Distinct => data.arrange_by_self().distinct_total().probe(),
54+
Comp::Nothing => data.probe().0,
55+
Comp::Exchange => data.inner.exchange(|&(x,_,_): &((usize,()),_,_)| x.0 as u64).probe().0,
56+
Comp::Arrange => data.arrange_by_self().stream.probe().0,
57+
Comp::Count => data.arrange_by_self().count_total().probe().0,
58+
Comp::Distinct => data.arrange_by_self().distinct_total().probe().0,
5959
};
6060

6161
(handle, probe)

experiments/src/bin/attend.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ fn main() {
1717

1818
let (input, graph) = scope.new_collection();
1919

20+
let graph2 = graph.clone();
2021
let organizers = graph.explode(|(x,y)| Some((x, (1,0))).into_iter().chain(Some((y, (0,1))).into_iter()))
2122
.threshold_total(|_,w| if w.1 == 0 { 1 } else { 0 });
2223

2324
organizers
25+
.clone()
2426
.iterate(|scope, attend| {
25-
graph.enter(&scope)
27+
graph2.enter(&scope)
2628
.semijoin(attend)
2729
.map(|(_,y)| y)
2830
.threshold_total(|_,w| if w >= &3 { 1 } else { 0 })

experiments/src/bin/deals-interactive.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ fn main() {
4545
// each edge should exist in both directions.
4646
let graph = graph.arrange_by_key();
4747

48-
let probe =
49-
interactive(&graph, query1, query2, query3)
48+
let (probe, _) =
49+
interactive(graph, query1, query2, query3)
5050
.filter(move |_| inspect)
5151
// .map(|_| ())
5252
.consolidate()
@@ -205,34 +205,38 @@ fn main() {
205205
}
206206

207207
fn interactive<G: Scope>(
208-
edges: &Arrange<G, Node, Node, isize>,
208+
edges: Arrange<G, Node, Node, isize>,
209209
tc_1: VecCollection<G, Node>,
210210
tc_2: VecCollection<G, Node>,
211211
sg_x: VecCollection<G, Node>
212212
) -> VecCollection<G, Node>
213213
where G::Timestamp: Lattice{
214214

215215
// descendants of tc_1:
216+
let tc_1_enter = tc_1.clone();
217+
let edges_q1 = edges.clone();
216218
let query1 =
217219
tc_1.map(|x| (x,x))
218220
.iterate(|scope, inner|
219-
edges
221+
edges_q1
220222
.enter(&scope)
221223
.join_core(inner.arrange_by_key(), |_,&y,&q| [(y,q)])
222-
.concat(tc_1.enter(&scope).map(|x| (x,x)))
224+
.concat(tc_1_enter.enter(&scope).map(|x| (x,x)))
223225
.distinct()
224226
)
225227
.map(|(x,q)| (q,x));
226228

227229
// ancestors of tc_2:
230+
let tc_2_enter = tc_2.clone();
231+
let edges_q2 = edges.clone();
228232
let query2 =
229233
tc_2.map(|x| (x,x))
230234
.iterate(|scope, inner|
231-
edges
235+
edges_q2
232236
.as_collection(|&k,&v| (v,k))
233237
.enter(&scope)
234238
.join_core(inner.arrange_by_key(), |_,&y,&q| [(y,q)])
235-
.concat(tc_2.enter(&scope).map(|x| (x,x)))
239+
.concat(tc_2_enter.enter(&scope).map(|x| (x,x)))
236240
.distinct()
237241
)
238242
.map(|(x,q)| (q,x));
@@ -242,31 +246,35 @@ where G::Timestamp: Lattice{
242246
// sg(X,Y) <- magic(X), par(X,Xp), par(Y,Yp), sg(Xp,Yp).
243247

244248
// ancestors of sg_x:
249+
let sg_x_enter = sg_x.clone();
250+
let sg_x_semijoin = sg_x.clone();
251+
let edges_magic = edges.clone();
245252
let magic =
246253
sg_x.iterate(|scope, inner|
247-
edges
254+
edges_magic
248255
.as_collection(|&k,&v| (v,k))
249256
.enter(&scope)
250257
.semijoin(inner)
251258
.map(|(_x,y)| y)
252-
.concat(sg_x.enter(&scope))
259+
.concat(sg_x_enter.enter(&scope))
253260
.distinct()
254261
);
255262

256263
let magic_edges =
257-
edges
258-
.join_core(magic.arrange_by_self(), |k,v,_| [(k.clone(), v.clone())])
264+
edges.clone()
265+
.join_core(magic.clone().arrange_by_self(), |k,v,_| [(k.clone(), v.clone())])
259266
.map(|(x,y)|(y,x))
260-
.semijoin(magic)
267+
.semijoin(magic.clone())
261268
.map(|(x,y)|(y,x));
262269

270+
let magic_enter = magic.clone();
263271
let query3 =
264272
magic
265273
.map(|x| (x,x)) // for query q, sg(x,x)
266274
.iterate(|scope, inner| {
267275

268276
let edges = edges.enter(&scope);
269-
let magic = magic.enter(&scope);
277+
let magic = magic_enter.enter(&scope);
270278
let magic_edges = magic_edges.enter(&scope);
271279

272280
let result =
@@ -279,7 +287,7 @@ where G::Timestamp: Lattice{
279287
// result.map(|_| ()).consolidate().inspect(|x| println!("\t{:?}", x));
280288
result
281289
})
282-
.semijoin(sg_x);
290+
.semijoin(sg_x_semijoin);
283291

284292
query1.concat(query2).concat(query3).map(|(q,_)| q)
285293
}

experiments/src/bin/deals.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ fn main() {
4444
let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
4545

4646
match program.as_str() {
47-
"tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
48-
"sg" => sg(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("sg count: {:?}", x)).probe(),
47+
"tc" => tc(graph.clone()).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
48+
"sg" => sg(graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("sg count: {:?}", x)).probe(),
4949
_ => panic!("must specify one of 'tc', 'sg'.")
5050
};
5151

@@ -83,54 +83,54 @@ fn main() {
8383
use timely::order::Product;
8484

8585
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
86-
fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> VecCollection<G, Edge, Present> {
86+
fn tc<G: Scope<Timestamp=()>>(edges: EdgeArranged<G, Node, Node, Present>) -> VecCollection<G, Edge, Present> {
8787

8888
// repeatedly update minimal distances each node can be reached from each root
8989
edges.stream.scope().iterative::<Iter,_,_>(|scope| {
9090

91-
let inner = Variable::new(scope, Product::new(Default::default(), 1));
92-
let edges = edges.enter(&inner.scope());
91+
let (inner, inner_collection) = Variable::new(scope, Product::new(Default::default(), 1));
92+
let edges = edges.enter(scope);
9393

9494
let result =
95-
inner
95+
inner_collection
9696
.map(|(x,y)| (y,x))
9797
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
98-
.join_core(edges, |_y,&x,&z| Some((x, z)))
98+
.join_core(edges.clone(), |_y,&x,&z| Some((x, z)))
9999
.concat(edges.as_collection(|&k,&v| (k,v)))
100100
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
101-
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
101+
.threshold_semigroup(|_,_,x: Option<&Present>| if x.is_none() { Some(Present) } else { None })
102102
;
103103

104-
inner.set(result);
104+
inner.set(result.clone());
105105
result.leave()
106106
}
107107
)
108108
}
109109

110110
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
111-
fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> VecCollection<G, Edge, Present> {
111+
fn sg<G: Scope<Timestamp=()>>(edges: EdgeArranged<G, Node, Node, Present>) -> VecCollection<G, Edge, Present> {
112112

113-
let peers = edges.join_core(edges, |_,&x,&y| Some((x,y))).filter(|&(x,y)| x != y);
113+
let peers = edges.clone().join_core(edges.clone(), |_,&x,&y| Some((x,y))).filter(|&(x,y)| x != y);
114114

115115
// repeatedly update minimal distances each node can be reached from each root
116116
peers.scope().iterative::<Iter,_,_>(|scope| {
117117

118-
let inner = Variable::new(scope, Product::new(Default::default(), 1));
119-
let edges = edges.enter(&inner.scope());
120-
let peers = peers.enter(&inner.scope());
118+
let (inner, inner_collection) = Variable::new(scope, Product::new(Default::default(), 1));
119+
let edges = edges.enter(scope);
120+
let peers = peers.enter(scope);
121121

122122
let result =
123-
inner
123+
inner_collection
124124
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
125-
.join_core(edges, |_,&x,&z| Some((x, z)))
125+
.join_core(edges.clone(), |_,&x,&z| Some((x, z)))
126126
.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
127127
.join_core(edges, |_,&x,&z| Some((x, z)))
128128
.concat(peers)
129129
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
130-
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
130+
.threshold_semigroup(|_,_,x: Option<&Present>| if x.is_none() { Some(Present) } else { None })
131131
;
132132

133-
inner.set(result);
133+
inner.set(result.clone());
134134
result.leave()
135135
}
136136
)

0 commit comments

Comments
 (0)