4 changes: 2 additions & 2 deletions Cargo.toml
@@ -17,8 +17,8 @@ resolver = "2"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.17.0" }
timely = { version = "0.24", default-features = false }
columnar = { version = "0.10", default-features = false }
timely = { version = "0.25", default-features = false }
columnar = { version = "0.11", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[profile.release]
16 changes: 8 additions & 8 deletions differential-dataflow/examples/columnar.rs
@@ -298,7 +298,7 @@ pub mod storage {
pub mod val {

use std::fmt::Debug;
-use columnar::{Container, ContainerOf, Index, Len, Push};
+use columnar::{Borrow, Container, ContainerOf, Index, Len, Push};
use columnar::Vecs;

use crate::layout::ColumnarUpdate as Update;
@@ -406,7 +406,7 @@

pub mod key {

-use columnar::{Container, ContainerOf, Index, Len, Push};
+use columnar::{Borrow, Container, ContainerOf, Index, Len, Push};
use columnar::Vecs;

use crate::layout::ColumnarUpdate as Update;
@@ -532,7 +532,7 @@ mod column_builder {
self.current.push(item);
if self.current.len() > 1024 * 1024 {
// TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
refs.sort();
let storage = ValStorage::form(refs.into_iter());
@@ -570,7 +570,7 @@ mod column_builder {
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.current.is_empty() {
// TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
refs.sort();
let storage = ValStorage::form(refs.into_iter());
@@ -612,7 +612,7 @@ mod column_builder {
self.current.push(item);
if self.current.len() > 1024 * 1024 {
// TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
refs.sort();
let storage = KeyStorage::form(refs.into_iter());
@@ -642,7 +642,7 @@ mod column_builder {
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.current.is_empty() {
// TODO: Consolidate the batch?
-use columnar::{Container, Index};
+use columnar::{Borrow, Index};
let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
refs.sort();
let storage = KeyStorage::form(refs.into_iter());
@@ -763,7 +763,7 @@ pub mod arrangement {
pub use batch_container::Coltainer;
pub mod batch_container {

-use columnar::{Columnar, Container, Clear, Push, Index, Len};
+use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len};
use differential_dataflow::trace::implementations::BatchContainer;

/// Container, anchored by `C` to provide an owned type.
@@ -815,7 +815,7 @@ pub mod arrangement {
pub mod batcher {

use std::ops::Range;
-use columnar::{Columnar, Container, Index, Len, Push};
+use columnar::{Borrow, Columnar, Container, Index, Len, Push};
use differential_dataflow::trace::implementations::chainless_batcher as chainless;
use differential_dataflow::difference::{Semigroup, IsZero};
use timely::progress::frontier::{Antichain, AntichainRef};
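A note on the change this file repeats at every seal site: `Borrow` is now imported alongside the other columnar traits. The sketch below restates the pattern with its surrounding calls, on the assumption (not stated in the diff itself) that columnar 0.11 moved `borrow()` off the `Container` trait and onto a dedicated `Borrow` trait, so it must be in scope wherever a container is borrowed for reading. This is a fragment lifted from the seal paths above, not a standalone function.

```rust
// columnar 0.10: `use columnar::{Container, Index};` sufficed here.
// columnar 0.11 (assumed): `borrow()` comes from the new `Borrow` trait.
use columnar::{Borrow, Index};

// The seal path itself is unchanged: borrow the current container, sort
// references to its rows, and form sorted storage from them.
let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
refs.sort();
let storage = ValStorage::form(refs.into_iter());
```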
3 changes: 1 addition & 2 deletions differential-dataflow/examples/iterate_container.rs
@@ -40,8 +40,7 @@ fn wrap<G: Scope, C: timely::Container>(stream: &StreamCore<G, C>) -> StreamCore
builder.build(move |_capability| move |_frontier| {
let mut output = output.activate();
input.for_each(|time, data| {
-let mut session = output.session(&time);
-session.give_container(&mut ContainerWrapper(std::mem::take(data)));
+output.give(&time, &mut ContainerWrapper(std::mem::take(data)));
});
});
stream_out
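For operators built the same way as `wrap` above, the output change is mechanical. A minimal before/after, assuming timely 0.25's output handles accept a time and container directly instead of requiring an explicit session (which is what this hunk suggests):

```rust
input.for_each(|time, data| {
    // timely 0.24: open a session for the time, then give it the container.
    // let mut session = output.session(&time);
    // session.give_container(&mut ContainerWrapper(std::mem::take(data)));

    // timely 0.25: hand the container to the output together with its time.
    output.give(&time, &mut ContainerWrapper(std::mem::take(data)));
});
```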
3 changes: 2 additions & 1 deletion differential-dataflow/examples/multitemporal.rs
@@ -59,7 +59,8 @@ fn main() {
let time = Pair::new(arguments[1], arguments[2]);
if capability.time().less_equal(&time) {
input
-.session(capability.clone())
+.activate()
+.session(&capability)
.give((arguments[0], time, arguments[3]));
} else {
println!("Requested time {:?} no longer open (input from {:?})", time, capability.time());
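Input handles appear to change in a complementary way: a session is now opened on an activated handle and borrows its capability rather than consuming a clone. A sketch with the names from the example above (the `.activate()` step is an assumption drawn from this hunk, not from timely documentation):

```rust
if capability.time().less_equal(&time) {
    input
        .activate()            // timely 0.25: activate the handle first
        .session(&capability)  // the session borrows the capability
        .give((arguments[0], time, arguments[3]));
}
```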
2 changes: 1 addition & 1 deletion differential-dataflow/examples/progress.rs
@@ -121,7 +121,7 @@ fn frontier<G, T>(
) -> VecCollection<G, (Location, T)>
where
G: Scope<Timestamp: Lattice+Ord>,
-T: Timestamp<Summary: differential_dataflow::ExchangeData>,
+T: Timestamp<Summary: differential_dataflow::ExchangeData>+std::hash::Hash,
{
// Translate node and edge transitions into a common Location to Location edge with an associated Summary.
let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary)));
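This extra `Hash` bound is the most common adjustment in the PR: it recurs in propagate.rs, scc.rs, and sequential.rs below, as `Time: Hash` on `TraceReader`, and as a new bound on `VecCollection::delay`. A plausible reading, though the diff does not say so, is that timely 0.25 hashes timestamps somewhere in its exchange or delay machinery. A minimal sketch of the bound a generic helper now carries:

```rust
use std::hash::Hash;
use timely::dataflow::Scope;
use differential_dataflow::lattice::Lattice;

// Hypothetical helper: `Lattice + Ord` were already required on the scope's
// timestamp; `Hash` is the addition this PR makes at each such site.
fn helper<G>(scope: &G)
where
    G: Scope<Timestamp: Lattice + Ord + Hash>,
{
    let _ = scope;
}
```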
6 changes: 3 additions & 3 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
@@ -16,7 +16,7 @@ use crate::operators::arrange::arrangement::ArrangeByKey;
/// method to limit the introduction of labels.
pub fn propagate<G, N, L, R>(edges: &VecCollection<G, (N,N), R>, nodes: &VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
@@ -33,7 +33,7 @@ where
/// method to limit the introduction of labels.
pub fn propagate_at<G, N, L, F, R>(edges: &VecCollection<G, (N,N), R>, nodes: &VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
@@ -60,7 +60,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
-Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
+Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
4 changes: 2 additions & 2 deletions differential-dataflow/src/algorithms/graphs/scc.rs
@@ -35,7 +35,7 @@ where
/// Returns the subset of edges in the same strongly connected component.
pub fn strongly_connected<G, N, R>(graph: &VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
-G: Scope<Timestamp: Lattice + Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
@@ -51,7 +51,7 @@ where
fn trim_edges<G, N, R>(cycle: &VecCollection<G, (N,N), R>, edges: &VecCollection<G, (N,N), R>)
-> VecCollection<G, (N,N), R>
where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
2 changes: 1 addition & 1 deletion differential-dataflow/src/algorithms/graphs/sequential.rs
@@ -11,7 +11,7 @@ use crate::hashable::Hashable;

fn _color<G, N>(edges: &VecCollection<G, (N,N)>) -> VecCollection<G,(N,Option<u32>)>
where
-G: Scope<Timestamp: Lattice+Ord>,
+G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData+Hash,
{
// need some bogus initial values.
30 changes: 19 additions & 11 deletions differential-dataflow/src/capture.rs
@@ -228,6 +228,7 @@ pub mod source {
use std::marker::{Send, Sync};
use std::sync::Arc;
use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
+use timely::dataflow::operators::generic::OutputBuilder;
use timely::progress::Timestamp;
use timely::scheduling::SyncActivator;

@@ -313,8 +314,11 @@ pub mod source {
let activator2 = scope.activator_for(Rc::clone(&address));
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
let mut source = source_builder(activator);
-let (mut updates_out, updates) = messages_op.new_output();
-let (mut progress_out, progress) = messages_op.new_output();
+let (updates_out, updates) = messages_op.new_output();
+let mut updates_out = OutputBuilder::from(updates_out);
+let (progress_out, progress) = messages_op.new_output();
+let mut progress_out = OutputBuilder::from(progress_out);

messages_op.build(|capabilities| {

// A Weak that communicates whether the returned token has been dropped.
@@ -387,8 +391,11 @@ pub mod source {
// Step 2: The UPDATES operator.
let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
-let (mut changes_out, changes) = updates_op.new_output();
-let (mut counts_out, counts) = updates_op.new_output();
+let (changes_out, changes) = updates_op.new_output();
+let mut changes_out = OutputBuilder::from(changes_out);
+let (counts_out, counts) = updates_op.new_output();
+let mut counts_out = OutputBuilder::from(counts_out);

updates_op.build(move |_capability| {
// Deduplicates updates, and ships novel updates and the counts for each time.
// For simplicity, this operator ships updates as they are discovered to be new.
@@ -438,7 +445,8 @@
);
let mut counts =
progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
-let (mut frontier_out, frontier) = progress_op.new_output();
+let (frontier_out, frontier) = progress_op.new_output();
+let mut frontier_out = OutputBuilder::from(frontier_out);
progress_op.build(move |_capability| {
// Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.

@@ -554,7 +562,7 @@ pub mod sink {
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::channels::pact::{Exchange, Pipeline};
-use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};
+use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};

use crate::{lattice::Lattice, ExchangeData};
use super::{Writer, Message, Progress};
@@ -583,7 +591,8 @@
let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(builder.operator_info().address);
let mut input = builder.new_input(stream, Pipeline);
-let (mut updates_out, updates) = builder.new_output();
+let (updates_out, updates) = builder.new_output();
+let mut updates_out = OutputBuilder::from(updates_out);

builder.build_reschedule(
move |_capability| {
@@ -650,7 +659,6 @@

builder.build_reschedule(|_capabilities| {
move |frontiers| {
-let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);

// We want to drain inputs no matter what.
// We could do this after the next step, as we are certain these timestamps will
@@ -667,9 +675,9 @@
// If our frontier advances strictly, we have the opportunity to issue a progress statement.
if <_ as PartialOrder>::less_than(
&frontier.borrow(),
-&input.frontier.frontier(),
+&frontiers[0].frontier(),
) {
-let new_frontier = input.frontier.frontier();
+let new_frontier = frontiers[0].frontier();

// Extract the timestamp counts to announce.
let mut announce = Vec::new();
@@ -691,7 +699,7 @@
send_queue.push_back(Message::Progress(progress));

// Advance our frontier to track our progress utterance.
-frontier = input.frontier.frontier().to_owned();
+frontier = frontiers[0].frontier().to_owned();

while let Some(message) = send_queue.front() {
if let Some(duration) = sink.poll(message) {
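Two migration patterns repeat through this file and reappear in dynamic/mod.rs below. First, `new_output()` now seems to return an unwrapped output that is converted with `OutputBuilder::from` before use; second, `FrontieredInputHandle` is gone, and operator closures read `frontiers[N]` directly. A condensed sketch of both, using only calls that appear in this diff (a fragment, not a compilable operator):

```rust
use timely::dataflow::operators::generic::OutputBuilder;

// (a) Output wrapping, assumed to be required by timely 0.25:
let (updates_out, updates) = builder.new_output();
let mut updates_out = OutputBuilder::from(updates_out);

// (b) Frontier access without FrontieredInputHandle:
builder.build_reschedule(|_capabilities| {
    move |frontiers| {
        // timely 0.24: FrontieredInputHandle::new(&mut input, &frontiers[0])
        let new_frontier = frontiers[0].frontier();
        // ... drain the input and compare `new_frontier` to held state ...
    }
});
```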
1 change: 1 addition & 0 deletions differential-dataflow/src/collection.rs
@@ -496,6 +496,7 @@ impl<G: Scope, D: Clone+'static, R: Clone+'static> VecCollection<G, D, R> {
/// to all of the data timestamps).
pub fn delay<F>(&self, func: F) -> VecCollection<G, D, R>
where
+G::Timestamp: Hash,
F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
{
let mut func1 = func.clone();
7 changes: 0 additions & 7 deletions differential-dataflow/src/containers.rs
@@ -268,8 +268,6 @@ impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
}

mod container {
-use std::ops::Deref;

use columnation::Columnation;

use crate::containers::TimelyStack;
@@ -278,11 +276,6 @@
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.local.len()).unwrap() }
#[inline] fn is_empty(&self) -> bool { self.local.is_empty() }
}
-impl<T: Columnation> timely::container::IterContainer for TimelyStack<T> {
-type ItemRef<'a> = &'a T where Self: 'a;
-type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
-#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
-}
impl<T: Columnation> timely::container::DrainContainer for TimelyStack<T> {
type Item<'a> = &'a T where Self: 'a;
type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
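The deleted `IterContainer` implementation gets no replacement, presumably because timely 0.25 removed the trait or no longer needs it here (an assumption; the diff only shows the deletion). Read-only iteration over a `TimelyStack` still works through the same `Deref` to a slice that the deleted impl itself relied on:

```rust
use columnation::Columnation;
use differential_dataflow::containers::TimelyStack;

// The removed impl returned `self.deref().iter()`, i.e. a slice iterator,
// so plain slice methods remain available without the trait:
fn first_record<T: Columnation>(stack: &TimelyStack<T>) -> Option<&T> {
    stack.iter().next()
}
```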
5 changes: 3 additions & 2 deletions differential-dataflow/src/dynamic/mod.rs
@@ -16,7 +16,7 @@ pub mod pointstamp;
use timely::dataflow::Scope;
use timely::order::Product;
use timely::progress::Timestamp;
-use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
+use timely::dataflow::operators::generic::{OutputBuilder, builder_rc::OperatorBuilder};
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::Antichain;

@@ -42,7 +42,8 @@
pub fn leave_dynamic(&self, level: usize) -> Self {
// Create a unary operator that will strip all but `level-1` timestamp coordinates.
let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
-let (mut output, stream) = builder.new_output();
+let (output, stream) = builder.new_output();
+let mut output = OutputBuilder::from(output);
let mut input = builder.new_input_connection(&self.inner, Pipeline, [(0, Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } }))]);

builder.build(move |_capability| move |_frontier| {
18 changes: 9 additions & 9 deletions differential-dataflow/src/operators/arrange/arrangement.rs
@@ -405,7 +405,7 @@ where
// Initialize to the minimal input frontier.
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

-move |input, output| {
+move |(input, frontier), output| {

// As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
// We don't have to keep all capabilities, but we need to be able to form output messages
@@ -422,12 +422,12 @@
// and sending smaller bites than we might have otherwise done.

// Assert that the frontier never regresses.
-assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
+assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

// Test to see if strict progress has occurred, which happens whenever the new
// frontier isn't equal to the previous. It is only in this case that we have any
// data processing to do.
-if prev_frontier.borrow() != input.frontier().frontier() {
+if prev_frontier.borrow() != frontier.frontier() {
// There are two cases to handle with some care:
//
// 1. If any held capabilities are not in advance of the new input frontier,
@@ -441,20 +441,20 @@
// and feed this to the trace agent (but not along the timely output).

// If there is at least one capability not in advance of the input frontier ...
-if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
+if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

let mut upper = Antichain::new(); // re-used allocation for sealing batches.

// For each capability not in advance of the input frontier ...
for (index, capability) in capabilities.elements().iter().enumerate() {

-if !input.frontier().less_equal(capability.time()) {
+if !frontier.less_equal(capability.time()) {

// Assemble the upper bound on times we can commit with this capabilities.
// We must respect the input frontier, and *subsequent* capabilities, as
// we are pretending to retire the capability changes one by one.
upper.clear();
-for time in input.frontier().frontier().iter() {
+for time in frontier.frontier().iter() {
upper.insert(time.clone());
}
for other_capability in &capabilities.elements()[(index + 1) .. ] {
@@ -490,12 +490,12 @@
}
else {
// Announce progress updates, even without data.
-let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
-writer.seal(input.frontier().frontier().to_owned());
+let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
+writer.seal(frontier.frontier().to_owned());
}

prev_frontier.clear();
-prev_frontier.extend(input.frontier().frontier().iter().cloned());
+prev_frontier.extend(frontier.frontier().iter().cloned());
}

writer.exert();
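The closure signature is the substantive change in this file: the build closure now receives the input handle and its frontier as a tuple, where timely 0.24 exposed the frontier through the handle as `input.frontier()`. A schematic of the new shape, reduced to the frontier test from the hunks above (a sketch, not the full arrange operator):

```rust
move |(input, frontier), output| {
    // timely 0.24: `input.frontier().frontier()`.
    // timely 0.25 (assumed from this diff): the frontier is its own argument.
    if prev_frontier.borrow() != frontier.frontier() {
        // ... seal batches against `frontier`, ship them via `output`,
        //     and advance `prev_frontier`, as in the code above ...
    }
}
```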