
Commit 84207e6

Updates for timely 0.25 (#647)
* Updates for timely 0.25
* Restore proper Cargo.toml
* Update columnar example to timely 0.25

Signed-off-by: Moritz Hoffmann <[email protected]>
Co-authored-by: Moritz Hoffmann <[email protected]>
1 parent 44168f1 commit 84207e6

28 files changed, +140 -122 lines changed


Cargo.toml

Lines changed: 2 additions & 2 deletions

@@ -17,8 +17,8 @@ resolver = "2"
 
 [workspace.dependencies]
 differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.17.0" }
-timely = { version = "0.24", default-features = false }
-columnar = { version = "0.10", default-features = false }
+timely = { version = "0.25", default-features = false }
+columnar = { version = "0.11", default-features = false }
 #timely = { path = "../timely-dataflow/timely/", default-features = false }
 
 [profile.release]

differential-dataflow/examples/columnar.rs

Lines changed: 8 additions & 8 deletions

@@ -298,7 +298,7 @@ pub mod storage {
 pub mod val {
 
 use std::fmt::Debug;
-use columnar::{Container, ContainerOf, Index, Len, Push};
+use columnar::{Borrow, Container, ContainerOf, Index, Len, Push};
 use columnar::Vecs;
 
 use crate::layout::ColumnarUpdate as Update;
@@ -406,7 +406,7 @@ pub mod storage {
 
 pub mod key {
 
-use columnar::{Container, ContainerOf, Index, Len, Push};
+use columnar::{Borrow, Container, ContainerOf, Index, Len, Push};
 use columnar::Vecs;
 
 use crate::layout::ColumnarUpdate as Update;
@@ -532,7 +532,7 @@ mod column_builder {
 self.current.push(item);
 if self.current.len() > 1024 * 1024 {
 // TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
 let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
 refs.sort();
 let storage = ValStorage::form(refs.into_iter());
@@ -570,7 +570,7 @@ mod column_builder {
 fn finish(&mut self) -> Option<&mut Self::Container> {
 if !self.current.is_empty() {
 // TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
 let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
 refs.sort();
 let storage = ValStorage::form(refs.into_iter());
@@ -612,7 +612,7 @@ mod column_builder {
 self.current.push(item);
 if self.current.len() > 1024 * 1024 {
 // TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
 let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
 refs.sort();
 let storage = KeyStorage::form(refs.into_iter());
@@ -642,7 +642,7 @@ mod column_builder {
 fn finish(&mut self) -> Option<&mut Self::Container> {
 if !self.current.is_empty() {
 // TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
 let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
 refs.sort();
 let storage = KeyStorage::form(refs.into_iter());
@@ -763,7 +763,7 @@ pub mod arrangement {
 pub use batch_container::Coltainer;
 pub mod batch_container {
 
-use columnar::{Columnar, Container, Clear, Push, Index, Len};
+use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len};
 use differential_dataflow::trace::implementations::BatchContainer;
 
 /// Container, anchored by `C` to provide an owned type.
@@ -815,7 +815,7 @@ pub mod arrangement {
 pub mod batcher {
 
 use std::ops::Range;
-use columnar::{Columnar, Container, Index, Len, Push};
+use columnar::{Borrow, Columnar, Container, Index, Len, Push};
 use differential_dataflow::trace::implementations::chainless_batcher as chainless;
 use differential_dataflow::difference::{Semigroup, IsZero};
 use timely::progress::frontier::{Antichain, AntichainRef};
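
The only change in this example is the added `Borrow` import at every site that calls `.borrow()`. A plausible reading (not stated in the commit message) is that columnar 0.11 moves `borrow()` onto a dedicated `Borrow` trait, so the trait must be in scope at each call site. A self-contained sketch of that effect, using local stand-in types rather than columnar's real ones:

// Stand-ins for illustration only; this is not columnar's actual API.
mod fake_columnar {
    pub struct Ints(pub Vec<i32>);

    // Hypothetical split: `borrow()` lives on its own trait.
    pub trait Borrow {
        fn borrow(&self) -> &[i32];
    }

    impl Borrow for Ints {
        fn borrow(&self) -> &[i32] { &self.0 }
    }
}

fn main() {
    // Without this import the `current.borrow()` call below does not resolve,
    // which mirrors why the diff adds `Borrow` to each `use columnar::{...}` list.
    use fake_columnar::Borrow;

    let current = fake_columnar::Ints(vec![3, 1, 2]);
    let mut refs = current.borrow().to_vec();
    refs.sort();
    assert_eq!(refs, vec![1, 2, 3]);
}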

differential-dataflow/examples/iterate_container.rs

Lines changed: 1 addition & 2 deletions

@@ -40,8 +40,7 @@ fn wrap<G: Scope, C: timely::Container>(stream: &StreamCore<G, C>) -> StreamCore
 builder.build(move |_capability| move |_frontier| {
 let mut output = output.activate();
 input.for_each(|time, data| {
-let mut session = output.session(&time);
-session.give_container(&mut ContainerWrapper(std::mem::take(data)));
+output.give(&time, &mut ContainerWrapper(std::mem::take(data)));
 });
 });
 stream_out
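
The session-plus-`give_container` pair becomes a single `give(&time, container)` call on the activated output. A minimal stand-alone sketch of that calling shape, with a local stand-in handle (not timely's real output handle):

// Stand-in output handle for illustration; timely's handle is more involved.
struct Output<T, C> { sent: Vec<(T, C)> }

impl<T: Clone, C: Default> Output<T, C> {
    // One call takes the time and a mutable container and drains it,
    // mirroring `output.give(&time, &mut ContainerWrapper(...))` in the diff.
    fn give(&mut self, time: &T, container: &mut C) {
        self.sent.push((time.clone(), std::mem::take(container)));
    }
}

fn main() {
    let mut output = Output { sent: Vec::new() };
    let time = 0u64;
    let mut data = vec![String::from("record")];
    output.give(&time, &mut std::mem::take(&mut data));
    assert_eq!(output.sent.len(), 1);
    assert!(data.is_empty());
}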

differential-dataflow/examples/multitemporal.rs

Lines changed: 2 additions & 1 deletion

@@ -59,7 +59,8 @@ fn main() {
 let time = Pair::new(arguments[1], arguments[2]);
 if capability.time().less_equal(&time) {
 input
-.session(capability.clone())
+.activate()
+.session(&capability)
 .give((arguments[0], time, arguments[3]));
 } else {
 println!("Requested time {:?} no longer open (input from {:?})", time, capability.time());
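
Here the input handle grows an explicit `activate()` step, and the session now borrows the capability rather than taking a clone. A local sketch of that call chain with placeholder types (not timely's):

// Placeholder types for illustration; timely's input handles differ.
struct Capability(u64);
struct Input { rows: Vec<(u64, &'static str)> }
struct ActiveInput<'a> { inner: &'a mut Input }
struct Session<'a> { inner: &'a mut Input, time: u64 }

impl Input {
    // Post-change shape: activate first, then open a session on a borrowed capability.
    fn activate(&mut self) -> ActiveInput<'_> { ActiveInput { inner: self } }
}

impl<'a> ActiveInput<'a> {
    fn session(self, capability: &Capability) -> Session<'a> {
        Session { inner: self.inner, time: capability.0 }
    }
}

impl<'a> Session<'a> {
    fn give(&mut self, datum: &'static str) {
        self.inner.rows.push((self.time, datum));
    }
}

fn main() {
    let mut input = Input { rows: Vec::new() };
    let capability = Capability(3);
    // Mirrors `input.activate().session(&capability).give(...)` from the diff.
    input.activate().session(&capability).give("update");
    assert_eq!(input.rows, vec![(3u64, "update")]);
}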

differential-dataflow/examples/progress.rs

Lines changed: 1 addition & 1 deletion

@@ -121,7 +121,7 @@ fn frontier<G, T>(
 ) -> VecCollection<G, (Location, T)>
 where
 G: Scope<Timestamp: Lattice+Ord>,
-T: Timestamp<Summary: differential_dataflow::ExchangeData>,
+T: Timestamp<Summary: differential_dataflow::ExchangeData>+std::hash::Hash,
 {
 // Translate node and edge transitions into a common Location to Location edge with an associated Summary.
 let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary)));
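
This file, the graph algorithms below (propagate, scc, sequential), and `VecCollection::delay` all gain a `Hash` bound on the timestamp type. A small generic sketch of what such a tightened bound permits; `distinct_times` is a placeholder, not a function from this repository:

use std::collections::HashSet;
use std::hash::Hash;

// Placeholder function whose only point is the added `Hash` bound,
// mirroring `T: Timestamp<...> + std::hash::Hash` in the diff above.
fn distinct_times<T: Eq + Hash + Clone>(times: &[T]) -> usize {
    // Collecting timestamps into a hash-based set is only well-typed
    // once `T: Hash` is part of the bound.
    times.iter().cloned().collect::<HashSet<T>>().len()
}

fn main() {
    // Integer timestamps satisfy the strengthened bound.
    assert_eq!(distinct_times(&[1u64, 1, 2, 3, 3]), 3);
}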

differential-dataflow/src/algorithms/graphs/propagate.rs

Lines changed: 3 additions & 3 deletions

@@ -16,7 +16,7 @@ use crate::operators::arrange::arrangement::ArrangeByKey;
 /// method to limit the introduction of labels.
 pub fn propagate<G, N, L, R>(edges: &VecCollection<G, (N,N), R>, nodes: &VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
 where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
 N: ExchangeData+Hash,
 R: ExchangeData+Abelian,
 R: Multiply<R, Output=R>,
@@ -33,7 +33,7 @@ where
 /// method to limit the introduction of labels.
 pub fn propagate_at<G, N, L, F, R>(edges: &VecCollection<G, (N,N), R>, nodes: &VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
 where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
 N: ExchangeData+Hash,
 R: ExchangeData+Abelian,
 R: Multiply<R, Output=R>,
@@ -60,7 +60,7 @@ where
 R: Multiply<R, Output=R>,
 R: From<i8>,
 L: ExchangeData,
-Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
+Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static,
 F: Fn(&L)->u64+Clone+'static,
 {
 // Morally the code performs the following iterative computation. However, in the interest of a simplified

differential-dataflow/src/algorithms/graphs/scc.rs

Lines changed: 2 additions & 2 deletions

@@ -35,7 +35,7 @@ where
 /// Returns the subset of edges in the same strongly connected component.
 pub fn strongly_connected<G, N, R>(graph: &VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
 where
-G: Scope<Timestamp: Lattice + Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
 N: ExchangeData + Hash,
 R: ExchangeData + Abelian,
 R: Multiply<R, Output=R>,
@@ -51,7 +51,7 @@ where
 fn trim_edges<G, N, R>(cycle: &VecCollection<G, (N,N), R>, edges: &VecCollection<G, (N,N), R>)
 -> VecCollection<G, (N,N), R>
 where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
 N: ExchangeData + Hash,
 R: ExchangeData + Abelian,
 R: Multiply<R, Output=R>,

differential-dataflow/src/algorithms/graphs/sequential.rs

Lines changed: 1 addition & 1 deletion

@@ -11,7 +11,7 @@ use crate::hashable::Hashable;
 
 fn _color<G, N>(edges: &VecCollection<G, (N,N)>) -> VecCollection<G,(N,Option<u32>)>
 where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
 N: ExchangeData+Hash,
 {
 // need some bogus initial values.

differential-dataflow/src/capture.rs

Lines changed: 19 additions & 11 deletions

@@ -228,6 +228,7 @@ pub mod source {
 use std::marker::{Send, Sync};
 use std::sync::Arc;
 use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
+use timely::dataflow::operators::generic::OutputBuilder;
 use timely::progress::Timestamp;
 use timely::scheduling::SyncActivator;
 
@@ -313,8 +314,11 @@
 let activator2 = scope.activator_for(Rc::clone(&address));
 let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
 let mut source = source_builder(activator);
-let (mut updates_out, updates) = messages_op.new_output();
-let (mut progress_out, progress) = messages_op.new_output();
+let (updates_out, updates) = messages_op.new_output();
+let mut updates_out = OutputBuilder::from(updates_out);
+let (progress_out, progress) = messages_op.new_output();
+let mut progress_out = OutputBuilder::from(progress_out);
+
 messages_op.build(|capabilities| {
 
 // A Weak that communicates whether the returned token has been dropped.
@@ -387,8 +391,11 @@
 // Step 2: The UPDATES operator.
 let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
 let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
-let (mut changes_out, changes) = updates_op.new_output();
-let (mut counts_out, counts) = updates_op.new_output();
+let (changes_out, changes) = updates_op.new_output();
+let mut changes_out = OutputBuilder::from(changes_out);
+let (counts_out, counts) = updates_op.new_output();
+let mut counts_out = OutputBuilder::from(counts_out);
+
 updates_op.build(move |_capability| {
 // Deduplicates updates, and ships novel updates and the counts for each time.
 // For simplicity, this operator ships updates as they are discovered to be new.
@@ -438,7 +445,8 @@
 );
 let mut counts =
 progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
-let (mut frontier_out, frontier) = progress_op.new_output();
+let (frontier_out, frontier) = progress_op.new_output();
+let mut frontier_out = OutputBuilder::from(frontier_out);
 progress_op.build(move |_capability| {
 // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.
 
@@ -554,7 +562,7 @@ pub mod sink {
 use timely::progress::{Antichain, ChangeBatch, Timestamp};
 use timely::dataflow::{Scope, Stream};
 use timely::dataflow::channels::pact::{Exchange, Pipeline};
-use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};
+use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
 
 use crate::{lattice::Lattice, ExchangeData};
 use super::{Writer, Message, Progress};
@@ -583,7 +591,8 @@
 let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
 let reactivator = stream.scope().activator_for(builder.operator_info().address);
 let mut input = builder.new_input(stream, Pipeline);
-let (mut updates_out, updates) = builder.new_output();
+let (updates_out, updates) = builder.new_output();
+let mut updates_out = OutputBuilder::from(updates_out);
 
 builder.build_reschedule(
 move |_capability| {
@@ -650,7 +659,6 @@
 
 builder.build_reschedule(|_capabilities| {
 move |frontiers| {
-let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);
 
 // We want to drain inputs no matter what.
 // We could do this after the next step, as we are certain these timestamps will
@@ -667,9 +675,9 @@
 // If our frontier advances strictly, we have the opportunity to issue a progress statement.
 if <_ as PartialOrder>::less_than(
 &frontier.borrow(),
-&input.frontier.frontier(),
+&frontiers[0].frontier(),
 ) {
-let new_frontier = input.frontier.frontier();
+let new_frontier = frontiers[0].frontier();
 
 // Extract the timestamp counts to announce.
 let mut announce = Vec::new();
@@ -691,7 +699,7 @@
 send_queue.push_back(Message::Progress(progress));
 
 // Advance our frontier to track our progress utterance.
-frontier = input.frontier.frontier().to_owned();
+frontier = frontiers[0].frontier().to_owned();
 
 while let Some(message) = send_queue.front() {
 if let Some(duration) = sink.poll(message) {
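
Two patterns repeat through capture.rs: `new_output()` now returns a plain handle that the operator wraps with `OutputBuilder::from(...)`, and frontier inspection reads `frontiers[0]` directly instead of going through the removed `FrontieredInputHandle`. A self-contained sketch of the wrapping step with local stand-in types (not timely's real builder or handles):

// Local stand-ins for illustration only; timely's types and methods differ.
struct RawOutputHandle { name: &'static str }

struct OutputBuilder { raw: RawOutputHandle }

impl From<RawOutputHandle> for OutputBuilder {
    fn from(raw: RawOutputHandle) -> Self { OutputBuilder { raw } }
}

struct Operator;

impl Operator {
    // Post-change shape: the builder hands back the raw handle and a stream
    // token, and the caller chooses how to wrap the handle.
    fn new_output(&mut self) -> (RawOutputHandle, &'static str) {
        (RawOutputHandle { name: "updates" }, "updates_stream")
    }
}

fn main() {
    let mut messages_op = Operator;
    // Mirrors the diff:
    //   let (updates_out, updates) = messages_op.new_output();
    //   let mut updates_out = OutputBuilder::from(updates_out);
    let (updates_out, updates) = messages_op.new_output();
    let updates_out = OutputBuilder::from(updates_out);
    assert_eq!(updates_out.raw.name, "updates");
    assert_eq!(updates, "updates_stream");
}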

differential-dataflow/src/collection.rs

Lines changed: 1 addition & 0 deletions

@@ -496,6 +496,7 @@ impl<G: Scope, D: Clone+'static, R: Clone+'static> VecCollection<G, D, R> {
 /// to all of the data timestamps).
 pub fn delay<F>(&self, func: F) -> VecCollection<G, D, R>
 where
+G::Timestamp: Hash,
 F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
 {
 let mut func1 = func.clone();
