8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

39 changes: 39 additions & 0 deletions src/dataflow/render/context.rs
@@ -138,6 +138,45 @@ where
}
}

/// Applies logic to elements of a collection and returns the results.
///
/// This method allows savvy users to avoid the potential allocation
/// required by `self.collection(expr)` when converting data from an
/// arrangement; if less data are required, the `logic` argument is able
/// to use a reference and produce the minimal amount of data instead.
pub fn flat_map_ref<I, L>(
&self,
relation_expr: &P,
mut logic: L,
) -> Option<(
Collection<S, I::Item, Diff>,
Collection<S, DataflowError, Diff>,
)>
where
I: IntoIterator,
I::Item: Data,
L: FnMut(&V) -> I + 'static,
{
if let Some((oks, err)) = self.collections.get(relation_expr) {
Some((oks.flat_map(move |v| logic(&v)), err.clone()))
Contributor

I was trying to sort out whether this results in a per-row copy. Am I correct in thinking that it doesn't, because there will only be one element in pushers here? https://github.com/TimelyDataflow/timely-dataflow/blob/master/timely/src/dataflow/channels/pushers/tee.rs#L19

Contributor Author

I'm not sure! At least, I think we could have clones if someone calls flat_map_ref multiple times and it resolves to the first case. There is a potential improvement to make to timely, where you could push a FnMut(&Record)->Result upstream, but that is a trickier discussion. A clone should only occur for the second and subsequent cases where a stream actually gets attached to a downstream thing (that is what registers something in the Tee).

Not sure if this helps clarify. Feel free to ask more about it though! Here, or interactively.
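
For what it's worth, here is a standalone sketch of the general pattern being discussed (hypothetical code, not timely's `Tee` implementation): the value can be moved into one attached consumer, and only additional consumers force a clone.

// Illustrative sketch only; names are hypothetical, not timely's API.
fn broadcast<T: Clone>(item: T, consumers: &mut [Vec<T>]) {
    if let Some((last, rest)) = consumers.split_last_mut() {
        // Every consumer except the last receives a clone ...
        for c in rest {
            c.push(item.clone());
        }
        // ... and the final consumer receives the original by move.
        last.push(item);
    }
}

fn main() {
    let mut consumers = vec![Vec::new(), Vec::new()];
    broadcast(String::from("row"), &mut consumers);
    assert_eq!(consumers[0][0], "row");
    assert_eq!(consumers[1][0], "row");
}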

} else if let Some(local) = self.local.get(relation_expr) {
let (oks, errs) = local.values().next().expect("Empty arrangement");
Some((
oks.flat_map_ref(move |_k, v| logic(v)),
errs.as_collection(|k, _v| k.clone()),
))
} else if let Some(trace) = self.trace.get(relation_expr) {
let (_id, oks, errs) = trace.values().next().expect("Empty arrangement");
Some((
// oks.as_collection(|_k, v| v.clone()),
Contributor

stray comment?

Contributor Author

Sure is! Thanks. :D

oks.flat_map_ref(move |_k, v| logic(v)),
errs.as_collection(|k, _v| k.clone()),
))
} else {
None
}
}
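
A hypothetical call site, for reference (a sketch, not part of the PR; `ctx` and `expr` are assumed to be in scope): instead of cloning whole rows via `ctx.collection(&expr)`, the caller packs only the leading column out of each borrowed row.

// Sketch only: assumes `ctx` is a rendering Context and `expr: RelationExpr`.
let (oks, errs) = ctx
    .flat_map_ref(&expr, |row| Some(Row::pack(row.iter().take(1))))
    .expect("input has been rendered");
// `oks` now carries one single-column Row per input row; `errs` passes through unchanged.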

/// Convenience method for accessing `arrangement` when all keys are plain columns
pub fn arrangement_columns(
&self,
202 changes: 106 additions & 96 deletions src/dataflow/render/reduce.rs
@@ -20,11 +20,10 @@ use timely::dataflow::Scope;
use timely::progress::{timestamp::Refines, Timestamp};

use dataflow_types::DataflowError;
use expr::{AggregateExpr, AggregateFunc, RelationExpr, ScalarExpr};
use expr::{AggregateExpr, AggregateFunc, RelationExpr};
use repr::{Datum, Row, RowArena, RowPacker};

use super::context::Context;
use crate::operator::{CollectionExt, StreamExt};
use crate::render::context::Arrangement;

impl<G, T> Context<G, RelationExpr, Row, T>
Expand Down Expand Up @@ -61,63 +60,123 @@ where
// applied a single reduction (which should be good for any consumers
// of the operator and its arrangement).

let keys_clone = group_key.clone();
let relation_expr_clone = relation_expr.clone();

let (ok_input, err_input) = self.collection(input).unwrap();
// Our first step is to extract `(key, vals)` from `input`.
// We do this carefully, attempting to avoid unnecessary allocations
// that would result from cloning rows in input arrangements.
let group_key_clone = group_key.clone();
let aggregates_clone = aggregates.clone();

// Tracks the required number of columns to extract.
let mut columns_needed = 0;
for key in group_key.iter() {
for column in key.support() {
columns_needed = std::cmp::max(columns_needed, column + 1);
}
}
for aggr in aggregates.iter() {
for column in aggr.expr.support() {
columns_needed = std::cmp::max(columns_needed, column + 1);
}
}

let mut row_packer = RowPacker::new();
let (key_val_input, mut err_input): (
Collection<_, Result<(Row, Row), DataflowError>, _>,
_,
) = self
.flat_map_ref(input, move |row| {
let temp_storage = RowArena::new();
let mut results = Vec::new();
// First, evaluate the key selector expressions.
// If any of them error, we produce their errors as output and note
// that the key was not correctly produced.
// TODO(frank): this allocation could be re-used if we were
// more clever about where it came from (e.g. allocated with
// lifetime `'storage` in `flat_map_ref`).
let datums = row.iter().take(columns_needed).collect::<Vec<_>>();
Contributor

can we create the vector outside the closure (then clear/fill it from the iter inside) in order to reuse the allocation?

Contributor Author

We cannot! At least not without a more advanced implementation. Roughly: the lifetime in there is that of 'storage, which is the lifetime of the &Row we get to look at and the lifetime of the associated Datum types. That comes into existence in the flat_map_ref call. With some more work (I tried, and I think I walked away from it) you can stash a vector like that and then offer it for use, but it's horrible in the "not an arrangement" case because there is no such storage, and you would need to re-allocate a vector for each Row.

I'll put a note that we could look into it though!
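
A standalone illustration of the lifetime constraint described above (hypothetical types, not the real `Row`/`Datum`/`flat_map_ref` signature):

// Illustrative sketch only. The callback is higher-ranked over the row's
// lifetime: it must work for *any* `&str` handed to it, so a captured
// `Vec<&str>` buffer has no single lifetime it could borrow from across calls.
fn for_each_row(mut logic: impl FnMut(&str)) {
    let row = String::from("a,b,c");
    logic(&row); // the borrow of `row` ends when this call returns
}

fn main() {
    // Allocating the buffer inside the closure body compiles fine ...
    for_each_row(|row| {
        let datums: Vec<&str> = row.split(',').collect();
        assert_eq!(datums.len(), 3);
    });
    // ... but hoisting `datums` outside the closure and reusing it would
    // require the borrowed contents to outlive each call, which the borrow
    // checker rejects without an owned or arena-backed buffer.
}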

for expr in group_key_clone.iter() {
match expr.eval(&datums, &temp_storage) {
Ok(val) => row_packer.push(val),
Err(e) => {
results.push(Err(e.into()));
}
}
}

// Second, evaluate the value selector.
// If any error occurs we produce both the error as output
// and a `Datum::Null` value, to avoid causing the later
// "ReduceCollation" operator to panic due to absent aggregates.
if results.is_empty() {
let key = row_packer.finish_and_reuse();
for aggr in aggregates_clone.iter() {
match aggr.expr.eval(&datums, &temp_storage) {
Ok(val) => {
row_packer.push(val);
}
Err(e) => {
row_packer.push(Datum::Null);
results.push(Err(e.into()));
}
}
}
let row = row_packer.finish_and_reuse();
results.push(Ok((key, row)));
}
// Return accumulated results.
results
})
.unwrap();

// Demux out the potential errors from key and value selector evaluation.
use timely::dataflow::operators::ok_err::OkErr;
let (ok, err) = key_val_input.inner.ok_err(|(x, t, d)| match x {
Ok(x) => Ok((x, t, d)),
Err(x) => Err((x, t, d)),
});

let ok_input = ok.as_collection();
err_input = err.as_collection().concat(&err_input);

// Distinct is a special case, as there are no aggregates to aggregate.
// In this case, we use a special implementation that does not rely on
// collating aggregates.
let (oks, errs) = if aggregates.is_empty() {
let (ok_collection, err_collection) = ok_input.map_fallible({
let group_key = group_key.clone();
move |row| {
let temp_storage = RowArena::new();
let datums = row.unpack();
let key = Row::try_pack(
group_key.iter().map(|i| i.eval(&datums, &temp_storage)),
)?;
Ok::<_, DataflowError>((key, ()))
}
});
(
ok_collection.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("DistinctBy", {
ok_input.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("DistinctBy", {
|key, _input, output| {
output.push((key.clone(), 1));
}
}),
err_input.concat(&err_collection),
err_input,
)
} else if aggregates.len() == 1 {
// If we have a single aggregate, we need not stage aggregations separately.
build_aggregate_stage(ok_input, err_input, group_key, &aggregates[0], true)
(
build_aggregate_stage(ok_input, 0, &aggregates[0], true),
err_input,
)
} else {
// We'll accumulate partial aggregates here, where each contains updates
// of the form `(key, (index, value))`. This is eventually concatenated,
// and fed into a final reduce to put the elements in order.
let mut ok_partials = Vec::with_capacity(aggregates.len());
let mut err_partials = Vec::with_capacity(aggregates.len());
// Bound the complex dataflow in a region, for better interpretability.
scope.region(|region| {
// Create an iterator over collections, where each is the application
// of one aggregation function whose results are annotated with its
// position in the final results. To be followed by a merge reduction.
for (index, aggr) in aggregates.iter().enumerate() {
// Collect the now-aggregated partial result, annotated with its position.
let (ok_partial, err_partial) = build_aggregate_stage(
ok_input.enter(region),
err_input.enter(region),
group_key,
aggr,
false,
);
let ok_partial =
build_aggregate_stage(ok_input.enter(region), index, aggr, false);
ok_partials.push(
ok_partial
.as_collection(move |key, val| (key.clone(), (index, val.clone())))
.leave(),
);
err_partials.push(err_partial.leave());
}
});

@@ -155,15 +214,14 @@ where
result.extend(key.iter());
for ((_pos, val), cnt) in input.iter() {
assert_eq!(*cnt, 1);
result.push(val.unpack().pop().unwrap());
result.push(val.unpack_first());
}
output.push((result.finish(), 1));
}
});
let errs = differential_dataflow::collection::concatenate(scope, err_partials);
(oks, errs)
(oks, err_input)
};
let index = (0..keys_clone.len()).collect::<Vec<_>>();
let index = (0..group_key.len()).collect::<Vec<_>>();
self.set_local_columns(relation_expr, &index[..], (oks, errs.arrange()));
}
}
@@ -174,81 +232,31 @@ where
/// This method accommodates in-place aggregations like sums, hierarchical aggregations like min and max,
/// and other aggregations that may be neither of those things. It also applies distinctness if required.
fn build_aggregate_stage<G>(
ok_input: Collection<G, Row>,
err_input: Collection<G, DataflowError>,
group_key: &[ScalarExpr],
ok_input: Collection<G, (Row, Row)>,
index: usize,
aggr: &AggregateExpr,
prepend_key: bool,
) -> (Arrangement<G, Row>, Collection<G, DataflowError>)
) -> Arrangement<G, Row>
where
G: Scope,
G::Timestamp: Lattice,
{
let AggregateExpr {
func,
expr,
expr: _,
distinct,
} = aggr.clone();

// It is important that in the case of an error in the value selector we still provide a
// value, so that the aggregation produces an aggregate with the correct key. If we do not,
// the `ReduceCollation` operator panics.
use timely::dataflow::channels::pact::Pipeline;
let (partial, err_partial) =
let mut partial = if !prepend_key {
let mut packer = RowPacker::new();
ok_input.map(move |(key, row)| {
let value = row.iter().nth(index).unwrap();
packer.push(value);
(key, packer.finish_and_reuse())
})
} else {
ok_input
.inner
.unary_fallible(Pipeline, "ReduceStagePreparation", |_cap, _info| {
let group_key = group_key.to_vec();
let mut storage = Vec::new();
move |input, ok_output, err_output| {
input.for_each(|time, data| {
let temp_storage = RowArena::new();
let mut ok_session = ok_output.session(&time);
let mut err_session = err_output.session(&time);
data.swap(&mut storage);
for (row, t, diff) in storage.drain(..) {
// First, evaluate the key selector expressions.
// If any error we produce their errors as output and note
// the fact that the key was not correctly produced.
let mut key_packer = RowPacker::new();
let mut error_free = true;
let datums = row.unpack();
for expr in group_key.iter() {
match expr.eval(&datums, &temp_storage) {
Ok(val) => key_packer.push(val),
Err(e) => {
err_session.give((e.into(), t.clone(), diff));
error_free = false;
}
}
}
// Second, evaluate the value selector.
// If any error occurs we produce both the error as output,
// but also a `Datum::Null` value to avoid causing the later
// "ReduceCollation" operator to panic due to absent aggregates.
if error_free {
let key = key_packer.finish();
match expr.eval(&datums, &temp_storage) {
Ok(val) => {
ok_session.give(((key, Row::pack(Some(val))), t, diff));
}
Err(e) => {
ok_session.give((
(key, Row::pack(Some(Datum::Null))),
t.clone(),
diff,
));
err_session.give((e.into(), t, diff));
}
}
}
}
})
}
});

let mut partial = partial.as_collection();
let err_partial = err_partial.as_collection();
};

// If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
if distinct {
@@ -301,7 +309,7 @@ where
})
};

(ok_out, err_input.concat(&err_partial))
ok_out
}

/// Builds the dataflow for a reduction that can be performed in-place.
@@ -319,6 +327,7 @@ where
G: Scope,
G::Timestamp: Lattice,
{
use differential_dataflow::operators::consolidate::ConsolidateStream;
use timely::dataflow::operators::map::Map;

let float_scale = f64::from(1 << 24);
@@ -330,7 +339,7 @@ where
.explode({
let aggr = aggr.clone();
move |(key, row)| {
let datum = row.unpack()[0];
let datum = row.unpack_first();
let (aggs, nonnulls) = match aggr {
AggregateFunc::CountAll => {
// Nothing beyond the accumulated count is needed.
@@ -373,6 +382,7 @@ where
))
}
})
.consolidate_stream()
.reduce_abelian::<_, OrdValSpine<_, _, _, _>>(
"ReduceAccumulable",
move |key, input, output| {