2 changes: 1 addition & 1 deletion communication/examples/comm_hello.rs
@@ -11,7 +11,7 @@ fn main() {

println!("worker {} of {} started", allocator.index(), allocator.peers());

// allocates pair of senders list and one receiver.
// allocates a pair of senders list and one receiver.
let (mut senders, mut receiver) = allocator.allocate(0);

// send typed data along each channel
4 changes: 2 additions & 2 deletions communication/src/initialize.rs
@@ -143,7 +143,7 @@ impl Configuration {
/// let guards = timely_communication::initialize(config, |mut allocator| {
/// println!("worker {} started", allocator.index());
///
/// // allocates pair of senders list and one receiver.
/// // allocates a pair of senders list and one receiver.
/// let (mut senders, mut receiver) = allocator.allocate(0);
///
/// // send typed data along each channel
@@ -215,7 +215,7 @@ pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
/// let guards = timely_communication::initialize_from(builders, Box::new(()), |mut allocator| {
/// println!("worker {} started", allocator.index());
///
/// // allocates pair of senders list and one receiver.
/// // allocates a pair of senders list and one receiver.
/// let (mut senders, mut receiver) = allocator.allocate(0);
///
/// // send typed data along each channel
12 changes: 6 additions & 6 deletions communication/src/lib.rs
@@ -3,15 +3,15 @@
//! This crate is part of the timely dataflow system, used primarily for its inter-worker communication.
//! It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
//!
//! Threads are spawned with an [`allocator::Generic`](./allocator/generic/enum.Generic.html), whose `allocate` method returns a pair of several send endpoints and one
//! Threads are spawned with an [`allocator::Generic`](allocator::generic::Generic), whose
//! [`allocate`](allocator::generic::Generic::allocate) method returns a pair of several send endpoints and one
//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
//!
//! To be communicated, a type must implement the [`Serialize`](./trait.Serialize.html) trait. A default implementation of `Serialize` is
//! provided for any type implementing [`Abomonation`](../abomonation/trait.Abomonation.html). To implement other serialization strategies, wrap your type
//! and implement `Serialize` for your wrapper.
//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait when using the
//! `bincode` feature or the [`Abomonation`](abomonation::Abomonation) trait when not.
//!
//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](./trait.Push.html) and [`Pull`](./trait.Pull.html)
//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](Push) and [`Pull`](Pull)
//! traits), which is used for more precise control of resources.
//!
//! # Examples
@@ -25,7 +25,7 @@
//! let guards = timely_communication::initialize(config, |mut allocator| {
//! println!("worker {} started", allocator.index());
//!
//! // allocates pair of senders list and one receiver.
//! // allocates a pair of senders list and one receiver.
//! let (mut senders, mut receiver) = allocator.allocate(0);
//!
//! // send typed data along each channel
2 changes: 1 addition & 1 deletion mdbook/src/chapter_0/chapter_0_2.md
@@ -18,4 +18,4 @@ Is timely dataflow always applicable? The intent of this research project is to

Under the covers, your computer (the one on which you are reading this text) is a dataflow processor. When your computer *reads memory* it doesn't actually wander off to find the memory, it introduces a read request into your memory controller, an independent component that will eventually return with the associated cache line. Your computer then gets back to work on whatever it was doing, hoping the responses from the controller return in a timely fashion.

Academically, I treat "my computer can do this, but timely dataflow cannot" as a bug. There are degrees, of course, and timely datalow isn't on par with the processor's custom hardware designed to handle low level requests efficiently, but *algorithmically*, the goal is that anything you can do efficiently with a computer you should be able to express in timely dataflow.
Academically, I treat "my computer can do this, but timely dataflow cannot" as a bug. There are degrees, of course, and timely dataflow isn't on par with the processor's custom hardware designed to handle low level requests efficiently, but *algorithmically*, the goal is that anything you can do efficiently with a computer you should be able to express in timely dataflow.
2 changes: 1 addition & 1 deletion mdbook/src/chapter_1/chapter_1_1.md
@@ -6,7 +6,7 @@ Dataflow programming is fundamentally about describing your program as independe

Let's write an overly simple dataflow program. Remember our `examples/hello.rs` program? We are going to revisit that, but with some **timestamp** aspects removed. The goal is to get a sense for dataflow with all of its warts, and to get you excited for the next section where we bring back the timestamps. :)

Here is a reduced version of `examples/hello.rs` that just feeds data in to our dataflow, without paying any attention to progress made. In particular, we have removed the `probe()` operation, the resulting `probe` variable, and the use of `probe` to determine how long we should step the worker before introducing more data.
Here is a reduced version of `examples/hello.rs` that just feeds data into our dataflow, without paying any attention to progress made. In particular, we have removed the `probe()` operation, the resulting `probe` variable, and the use of `probe` to determine how long we should step the worker before introducing more data.

```rust
#![allow(unused_variables)]
6 changes: 3 additions & 3 deletions mdbook/src/chapter_1/chapter_1_2.md
@@ -2,15 +2,15 @@

When dataflow programs move data around arbitrarily, it becomes hard to correlate the produced outputs with the supplied inputs. If we supply a stream of bank transactions as input, and the output is a stream of bank balances, how can we know which input transactions are reflected in which output balances?

The standard approach to this problem is to install *timestamps* on the data. Each records gets a logical timestamp associated with it that indicates *when* it should be thought to happen. This is not necessarily "when" in terms of the date, time, or specific nanosecond the record was emitted; a timestamp could simply be a sequence number identifying a batch of input records. Or, and we will get in to the terrifying details later, it could be much more complicated than this.
The standard approach to this problem is to install *timestamps* on the data. Each records gets a logical timestamp associated with it that indicates *when* it should be thought to happen. This is not necessarily "when" in terms of the date, time, or specific nanosecond the record was emitted; a timestamp could simply be a sequence number identifying a batch of input records. Or, and we will get into the terrifying details later, it could be much more complicated than this.

Timestamps are what allow us to correlate inputs and outputs. When we introduce records with some logical timestamp, unless our dataflow computation changes the timestamps, we expect to see corresponding outputs with that same timestamp.

## An example

Remember from the dataflow section how when we remove the coordination from our `examples/hello.rs` program, the output was produced in some horrible order? In fact, each of those records had a timestamp associated with it that would reveal the correct order; we just weren't printing the timestamp because `inspect` doesn't have access to it.

Let's change the program to print out the timestamp with each record. This shouldn't be very thrilling output, because the timestamp is exactly the same as the number itself, but that didn't have to be the case. We are just going to replace the line
Let's change the program to print out the timestamp with each record. This shouldn't be a very thrilling output, because the timestamp is exactly the same as the number itself, but that didn't have to be the case. We are just going to replace the line

```rust,ignore
.inspect(move |x| println!("worker {}:\thello {}", index, x))
@@ -53,6 +53,6 @@ The timestamps are the `(Root, i)` things for various values of `i`. These happe

Timestamps are not only helpful for dataflow users, but for the operators themselves. With time we will start to write more interesting dataflow operators, and it may be important for them to understand which records should be thought to come before others.

Imagine, for example, a dataflow operator whose job is to report the "sum so far", where "so far" should be with respect to the timestamp (as opposed to whatever arbitary order the operator receives the records). Such an operator can't simply take its input records, add them to a total, and produce the result. The input records may no longer be ordered by timestamp, and the produced summations may not reflect any partial sum of the input. Instead, the operator needs to look at the timestamps on the records, and incorporate the numbers in order of their timestamps.
Imagine, for example, a dataflow operator whose job is to report the "sum so far", where "so far" should be with respect to the timestamp (as opposed to whatever arbitrary order the operator receives the records). Such an operator can't simply take its input records, add them to a total, and produce the result. The input records may no longer be ordered by timestamp, and the produced summations may not reflect any partial sum of the input. Instead, the operator needs to look at the timestamps on the records, and incorporate the numbers in order of their timestamps.

Of course, such an operator works great as long as it expects exactly one record for each timestamp. Things get harder for it if it might receive multiple records at each timestamp, or perhaps none. To address this, the underlying system will have to help the operator reason about the progress of its input, up next.
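
The reordering idea behind such an operator can be sketched in a few lines of plain Rust. This is only an illustration of buffering records by timestamp and accumulating them in timestamp order, not a timely operator; the names and data here are invented for the example.

```rust
// Buffer (timestamp, value) records that arrive out of order, then
// accumulate them in timestamp order to produce correct partial sums.
use std::collections::BTreeMap;

fn main() {
    // Records arriving in some arbitrary order.
    let records = vec![(3u64, 30i64), (1, 10), (2, 20), (0, 5)];

    // Index records by timestamp; BTreeMap keeps keys sorted.
    let mut by_time: BTreeMap<u64, i64> = BTreeMap::new();
    for (time, value) in records {
        *by_time.entry(time).or_insert(0) += value;
    }

    // Emit the "sum so far" in timestamp order.
    let mut total = 0;
    for (time, value) in by_time {
        total += value;
        println!("sum so far at time {}: {}", time, total);
    }
}
```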
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2.md
@@ -2,7 +2,7 @@

Let's talk about how to create timely dataflows.

This section will be a bit of a tour through the dataflow construction process, ignoring for the moment details about the interesting ways in which you can get data in to and out of your dataflow; those will show up in the "Running Timely Dataflows" section. For now we are going to work with examples with fixed input data and no interactivity to speak of, focusing on what we can cause to happen to that data.
This section will be a bit of a tour through the dataflow construction process, ignoring for the moment details about the interesting ways in which you can get data into and out of your dataflow; those will show up in the "Running Timely Dataflows" section. For now we are going to work with examples with fixed input data and no interactivity to speak of, focusing on what we can cause to happen to that data.

Here is a relatively simple example, taken from `timely/examples/simple.rs`, that turns the numbers zero through nine into a stream, and then feeds them through an `inspect` operator printing them to the screen.

2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_1.md
@@ -30,7 +30,7 @@ fn main() {
}
```

There will be more to do to get data in to `input`, and we aren't going to worry about that at the moment. But, now you know two of the places you can get data from!
There will be more to do to get data into `input`, and we aren't going to worry about that at the moment. But, now you know two of the places you can get data from!

## Other sources

6 changes: 3 additions & 3 deletions mdbook/src/chapter_2/chapter_2_3.md
@@ -4,9 +4,9 @@ In between introducing streams of data and inspecting or capturing the output, w

## Mapping

One of the simplest thing one can do with a stream of data is to transform each record into a new record. In database terminology this would be called "projection", where you extract some fields from a larger record, but as we are in a more rich programming language we can perform arbitrary transformations.
One of the simplest things one can do with a stream of data is to transform each record into a new record. In database terminology this would be called "projection", where you extract some fields from a larger record, but as we are in a more rich programming language we can perform arbitrary transformations.

The `map` operator takes as argument a closure from the input data type to an output data type that you get to define. The result is the stream of records corresponding to the application of your closure to each record in the input stream.
The `map` operator takes as an argument a closure from the input data type to an output data type that you get to define. The result is the stream of records corresponding to the application of your closure to each record in the input stream.

The following program should print out the numbers one through ten.

@@ -115,7 +115,7 @@ Unlike `map`, the predicate passed to the `filter` operator does not receive *ow

## Logical Partitioning

There are two operators for spliting and combining streams, `partition` and `concat` respectively.
There are two operators for splitting and combining streams, `partition` and `concat` respectively.

The `partition` operator takes two arguments, a number of resulting streams to produce, and a closure which takes each record to a pair of the target partition identifier and the output record. The output of `partition` is a list of streams, where each stream contains those elements mapped to the stream under the closure.
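
As a rough sketch of how the two compose, again assuming the `Partition` and `Concat` operators exported from `timely::dataflow::operators`, one might split a stream of numbers by parity and then glue the pieces back together:

```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};

fn main() {
    timely::example(|scope| {
        // route each number to one of two streams based on parity...
        let streams = (0..10u64).to_stream(scope)
                                .partition(2, |x| (x % 2, x));
        // ...and then merge the two streams back into one.
        streams[0].concat(&streams[1])
                  .inspect(|x| println!("seen: {}", x));
    });
}
```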

8 changes: 4 additions & 4 deletions mdbook/src/chapter_2/chapter_2_4.md
@@ -40,7 +40,7 @@ Most of what is interesting lies in the closure, so let's first tidy up some loo

The heart of the logic lies in the closure that binds `input` and `output`. These two are handles respectively to the operator's input (from which it can read records) and the operator's output (to which it can send records).

The input handle `input` has one primary method, `next`, which may return a pair of timestamp and batch of data. Rust really likes you to demonstrate a commitment to only looking at valid data, and our `while` loop does what is called deconstruction: we acknowledge the optional structure and only execute in the case the `Option` variant is `Some`, containing data. The `next` method could also return `None`, indicating that there is no more data available at the moment. It is strongly recommended that you take the hint and stop trying to read inputs at that point; timely gives you the courtesy of executing whatever code you want in this closure, but if you never release control back to the system you'll break things (timely employs ["cooperative multitasking"](https://en.wikipedia.org/wiki/Cooperative_multitasking)).
The input handle `input` has one primary method, `next`, which may return a pair consisting of a `CapabilityRef<Timestamp>` and a batch of data. Rust really likes you to demonstrate a commitment to only looking at valid data, and our `while` loop does what is called deconstruction: we acknowledge the optional structure and only execute in the case the `Option` variant is `Some`, containing data. The `next` method could also return `None`, indicating that there is no more data available at the moment. It is strongly recommended that you take the hint and stop trying to read inputs at that point; timely gives you the courtesy of executing whatever code you want in this closure, but if you never release control back to the system you'll break things (timely employs ["cooperative multitasking"](https://en.wikipedia.org/wiki/Cooperative_multitasking)).

The output handle `output` has one primary method, `session`, which starts up an output session at the indicated time. The resulting session can be given data in various ways: (i) element at a time with `give`, (ii) iterator at a time with `give_iterator`, and (iii) vector at a time with `give_content`. Internally it is buffering up the output and flushing automatically when the session goes out of scope, which happens above when we go around the `while` loop.
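
A toy model of this `next`/`session` pattern, written in plain Rust with stand-in types rather than timely's actual handles, may help make the shape of the closure concrete:

```rust
// Stand-ins for timely's input and output handles, to show the pattern of
// draining (time, batch) pairs and forwarding each batch through a session.
use std::collections::VecDeque;

struct Input { queue: VecDeque<(u64, Vec<String>)> }
struct Output { sent: Vec<(u64, String)> }

impl Input {
    // Like the real `next`: returns Some((time, batch)) while data is available.
    fn next(&mut self) -> Option<(u64, Vec<String>)> { self.queue.pop_front() }
}

impl Output {
    // Like the real `session`: everything given through it is tagged with `time`.
    fn session(&mut self, time: u64) -> Session { Session { time, output: self } }
}

struct Session<'a> { time: u64, output: &'a mut Output }

impl<'a> Session<'a> {
    fn give(&mut self, record: String) { self.output.sent.push((self.time, record)); }
}

fn main() {
    let mut input = Input {
        queue: VecDeque::from(vec![
            (0, vec!["a".to_string()]),
            (1, vec!["b".to_string(), "c".to_string()]),
        ]),
    };
    let mut output = Output { sent: Vec::new() };

    // The shape of the operator closure: read while data remains, then yield.
    while let Some((time, data)) = input.next() {
        let mut session = output.session(time);
        for record in data { session.give(record); }
    }

    println!("{:?}", output.sent);
}
```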

@@ -54,7 +54,7 @@ There is also a method `operators::source` which .. has no inputs. You can't cal

### Capabilities

We skipped a discussion of the `capability` argument, and we need to dig in to that now.
We skipped a discussion of the `capability` argument, and we need to dig into that now.

One of timely dataflow's main features is its ability to track whether an operator may or may not in the future receive more records bearing a certain timestamp. The way that timely does this is by requiring that its operators, like the ones we have written, hold *capabilities* for sending data at any timestamp. A capability is an instance of the `Capability<Time>` type, which looks to the outside world like an instance of `Time`, but which `output` will demand to see before it allows you to create a session.

@@ -112,7 +112,7 @@ Our next step is to define and return a closure that takes `output` as a paramet

The closure does a bit of a dance to capture the current time (not a capability, in this case), create a session with this time and send whatever the time happens to be as data, then downgrade the capability to be one timestep in the future. If it turns out that this is greater than twenty we discard the capability.

The system is smart enough to notice when you downgrade and discard capabilities, and it understand that these actions represent irreversible actions on your part that can now be communicated to others in the dataflow. As this closure is repeatedly executed, the timestamp of the capability will advance and the system will be able to indicate this to downstream operators.
The system is smart enough to notice when you downgrade and discard capabilities, and it understands that these actions represent irreversible actions on your part that can now be communicated to others in the dataflow. As this closure is repeatedly executed, the timestamp of the capability will advance and the system will be able to indicate this to downstream operators.
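
The downgrade-then-discard pattern can be modeled in a few lines of plain Rust, using an `Option<u64>` as a stand-in for the actual `Capability` type rather than timely's API:

```rust
// Emit the current time, advance ("downgrade") by one step, and discard the
// stand-in capability once the time passes twenty.
fn main() {
    let mut capability: Option<u64> = Some(0);

    // Each pass of this loop models one execution of the source closure.
    while let Some(time) = capability {
        println!("emitting at time {}", time); // stand-in for output.session(..).give(..)
        if time > 20 {
            capability = None;                 // discarding the capability
        } else {
            capability = Some(time + 1);       // downgrading to one step later
        }
    }
}
```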

### Stateful operators

@@ -218,7 +218,7 @@ fn main() {

As an exercise, this example could be improved in a few ways. How might you change it so that the data are still sent in the order they are received, but messages may be sent as soon as they are received if their time is currently in the frontier? This would avoid buffering messages that are ready to go, and would only buffer messages that are out-of-order, potentially reducing the memory footprint and improving the effective latency.

Before ending the section, let's rewrite this example without the `notificator`, in an attempt to de-mystify how it works. Whether you use a notificator or not is up to you; they are mostly about staying sane in what can be a confusing setting, and you can totally skip them once you have internalized how capabilities and frontiers work.
Before ending the section, let's rewrite this example without the `notificator`, in an attempt to demystify how it works. Whether you use a notificator or not is up to you; they are mostly about staying sane in what can be a confusing setting, and you can totally skip them once you have internalized how capabilities and frontiers work.

```rust
extern crate timely;
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_5.md
@@ -369,7 +369,7 @@ You can check out the result in [`examples/wordcount.rs`](https://github.com/Tim
Echidnatron%
```

We kept sending the same word over and over, so its count goes up. Neat. If you'd like to run it with two workers, you just need to put `-- -w2` at the end of the command, like so:
We kept sending the same word over and over, so its count went up. Neat. If you'd like to run it with two workers, you just need to put `-- -w2` at the end of the command, like so:

```ignore
Echidnatron% cargo run --example wordcount -- -w2