Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ license = "MIT"
default = ["getopts"]

[dependencies]
getopts = { version = "0.2.14", optional = true}
getopts = { version = "0.2.14", optional = true }
bincode = { version = "1.0", optional = true }
serde_derive = "1.0"
serde = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion communication/examples/comm_hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use timely_communication::{Message, Allocate};
fn main() {

// extract the configuration from user-supplied arguments, initialize the computation.
let config = timely_communication::Configuration::from_args(std::env::args()).unwrap();
let config = timely_communication::Config::from_args(std::env::args()).unwrap();
let guards = timely_communication::initialize(config, |mut allocator| {

println!("worker {} of {} started", allocator.index(), allocator.peers());
Expand Down
128 changes: 72 additions & 56 deletions communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use logging_core::Logger;


/// Possible configurations for the communication infrastructure.
pub enum Configuration {
pub enum Config {
/// Use one thread.
Thread,
/// Use one process with an indicated number of threads.
Expand All @@ -38,81 +38,97 @@ pub enum Configuration {
}
}

#[cfg(feature = "getopts")]
impl Configuration {

/// Returns a `getopts::Options` struct that can be used to print
/// usage information in higher-level systems.
pub fn options() -> getopts::Options {
let mut opts = getopts::Options::new();
impl Config {
/// Installs options into a [`getopts::Options`] struct that corresponds
/// to the parameters in the configuration.
///
/// It is the caller's responsibility to ensure that the installed options
/// do not conflict with any other options that may exist in `opts`, or
/// that may be installed into `opts` in the future.
///
/// This method is only available if the `getopts` feature is enabled, which
/// it is by default.
#[cfg(feature = "getopts")]
pub fn install_options(opts: &mut getopts::Options) {
opts.optopt("w", "threads", "number of per-process worker threads", "NUM");
opts.optopt("p", "process", "identity of this process", "IDX");
opts.optopt("n", "processes", "number of processes", "NUM");
opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE");
opts.optflag("r", "report", "reports connection progress");

opts
}

/// Constructs a new configuration by parsing supplied text arguments.
/// Instantiates a configuration based upon the parsed options in `matches`.
///
/// Most commonly, this uses `std::env::Args()` as the supplied iterator.
pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Configuration,String> {
let opts = Configuration::options();

opts.parse(args)
.map_err(|e| format!("{:?}", e))
.map(|matches| {

// let mut config = Configuration::new(1, 0, Vec::new());
let threads = matches.opt_str("w").map(|x| x.parse().unwrap_or(1)).unwrap_or(1);
let process = matches.opt_str("p").map(|x| x.parse().unwrap_or(0)).unwrap_or(0);
let processes = matches.opt_str("n").map(|x| x.parse().unwrap_or(1)).unwrap_or(1);
let report = matches.opt_present("report");

assert!(process < processes);
/// The `matches` object must have been constructed from a
/// [`getopts::Options`] which contained at least the options installed by
/// [`Self::install_options`].
///
/// This method is only available if the `getopts` feature is enabled, which
/// it is by default.
#[cfg(feature = "getopts")]
pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
let report = matches.opt_present("report");

if processes > 1 {
let mut addresses = Vec::new();
if let Some(hosts) = matches.opt_str("h") {
let reader = ::std::io::BufReader::new(::std::fs::File::open(hosts.clone()).unwrap());
for x in reader.lines().take(processes) {
addresses.push(x.unwrap());
}
if addresses.len() < processes {
panic!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes);
}
if processes > 1 {
let mut addresses = Vec::new();
if let Some(hosts) = matches.opt_str("h") {
let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?;
let reader = ::std::io::BufReader::new(file);
for line in reader.lines().take(processes) {
addresses.push(line.map_err(|e| e.to_string())?);
}
else {
for index in 0..processes {
addresses.push(format!("localhost:{}", 2101 + index));
}
if addresses.len() < processes {
return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes));
}

assert!(processes == addresses.len());
Configuration::Cluster {
threads,
process,
addresses,
report,
log_fn: Box::new( | _ | None),
}
else {
for index in 0..processes {
addresses.push(format!("localhost:{}", 2101 + index));
}
}
else if threads > 1 { Configuration::Process(threads) }
else { Configuration::Thread }
})

assert!(processes == addresses.len());
Ok(Config::Cluster {
threads,
process,
addresses,
report,
log_fn: Box::new( | _ | None),
})
} else if threads > 1 {
Ok(Config::Process(threads))
} else {
Ok(Config::Thread)
}
}

/// Constructs a new configuration by parsing the supplied text arguments.
///
/// Most commonly, callers supply `std::env::args()` as the iterator.
///
/// This method is only available if the `getopts` feature is enabled, which
/// it is by default.
#[cfg(feature = "getopts")]
pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Config, String> {
let mut opts = getopts::Options::new();
Config::install_options(&mut opts);
let matches = opts.parse(args).map_err(|e| e.to_string())?;
Config::from_matches(&matches)
}

/// Attempts to assemble the described communication infrastructure.
pub fn try_build(self) -> Result<(Vec<GenericBuilder>, Box<dyn Any+Send>), String> {
match self {
Configuration::Thread => {
Config::Thread => {
Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
},
Configuration::Process(threads) => {
Config::Process(threads) => {
Ok((Process::new_vector(threads).into_iter().map(|x| GenericBuilder::Process(x)).collect(), Box::new(())))
},
Configuration::Cluster { threads, process, addresses, report, log_fn } => {
Config::Cluster { threads, process, addresses, report, log_fn } => {
match initialize_networking(addresses, process, threads, report, log_fn) {
Ok((stuff, guard)) => {
Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard)))
Expand All @@ -137,7 +153,7 @@ impl Configuration {
/// use timely_communication::Allocate;
///
/// // configure for two threads, just one process.
/// let config = timely_communication::Configuration::Process(2);
/// let config = timely_communication::Config::Process(2);
///
/// // initializes communication, spawns workers
/// let guards = timely_communication::initialize(config, |mut allocator| {
Expand Down Expand Up @@ -190,7 +206,7 @@ impl Configuration {
/// result: Ok(1)
/// ```
pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
config: Configuration,
config: Config,
func: F,
) -> Result<WorkerGuards<T>,String> {
let (allocators, others) = config.try_build()?;
Expand Down
4 changes: 2 additions & 2 deletions communication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! use timely_communication::Allocate;
//!
//! // configure for two threads, just one process.
//! let config = timely_communication::Configuration::Process(2);
//! let config = timely_communication::Config::Process(2);
//!
//! // initializes communication, spawns workers
//! let guards = timely_communication::initialize(config, |mut allocator| {
Expand Down Expand Up @@ -104,7 +104,7 @@ use abomonation::Abomonation;

pub use allocator::Generic as Allocator;
pub use allocator::Allocate;
pub use initialize::{initialize, initialize_from, Configuration, WorkerGuards};
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
pub use message::Message;

/// A composite trait for types that may be used with channels.
Expand Down
3 changes: 3 additions & 0 deletions timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ keywords = ["timely", "dataflow"]
license = "MIT"

[features]
default = ["getopts"]
bincode= ["timely_communication/bincode"]
getopts = ["getopts-dep", "timely_communication/getopts"]

[dependencies]
getopts-dep = { package = "getopts", version = "0.2.14", optional = true }
serde = "1.0"
serde_derive = "1.0"
abomonation = "0.7.3"
Expand Down
3 changes: 1 addition & 2 deletions timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use timely::logging::TimelyEvent;

fn main() {
// initializes and runs a timely dataflow.
let config = timely::Configuration::from_args(::std::env::args()).unwrap();
timely::execute(config, |worker| {
timely::execute_from_args(std::env::args(), |worker| {

let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions timely/examples/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ extern crate timely;

use std::time::{Instant, Duration};

use timely::Configuration;
use timely::Config;
use timely::synchronization::Sequencer;

fn main() {
timely::execute(Configuration::Process(4), |worker| {
timely::execute(Config::process(4), |worker| {

let timer = Instant::now();
let mut sequencer = Sequencer::new(worker, Instant::now());
Expand All @@ -21,6 +21,6 @@ fn main() {
}
worker.step();
}

}).unwrap(); // asserts error-free execution;
}
3 changes: 2 additions & 1 deletion timely/examples/threadless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Inspect, Probe};
use timely::WorkerConfig;

fn main() {

// create a naked single-threaded worker.
let allocator = timely::communication::allocator::Thread::new();
let mut worker = timely::worker::Worker::new(allocator);
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);

// create input and probe handles.
let mut input = InputHandle::new();
Expand Down
4 changes: 2 additions & 2 deletions timely/examples/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ extern crate timely;
extern crate timely_communication;

use timely::dataflow::operators::*;
use timely_communication::Configuration;
use timely::Config;
// use timely::progress::timestamp::RootTimestamp;

fn main() {
timely::execute(Configuration::Thread, |worker| {
timely::execute(Config::thread(), |worker| {
let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
let (input, stream) = scope.new_unordered_input();
stream.inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait Capture<T: Timestamp, D: Data> {
/// let (send, recv) = ::std::sync::mpsc::channel();
/// let send = Arc::new(Mutex::new(send));
///
/// timely::execute(timely::Configuration::Thread, move |worker| {
/// timely::execute(timely::Config::thread(), move |worker| {
///
/// // this is only to validate the output.
/// let send = send.lock().unwrap().clone();
Expand Down Expand Up @@ -76,7 +76,7 @@ pub trait Capture<T: Timestamp, D: Data> {
/// let (send0, recv0) = ::std::sync::mpsc::channel();
/// let send0 = Arc::new(Mutex::new(send0));
///
/// timely::execute(timely::Configuration::Thread, move |worker| {
/// timely::execute(timely::Config::thread(), move |worker| {
///
/// // this is only to validate the output.
/// let send0 = send0.lock().unwrap().clone();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/capture/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub trait Extract<T: Ord, D: Ord> {
/// let (send, recv) = ::std::sync::mpsc::channel();
/// let send = Arc::new(Mutex::new(send));
///
/// timely::execute(timely::Configuration::Thread, move |worker| {
/// timely::execute(timely::Config::thread(), move |worker| {
///
/// // this is only to validate the output.
/// let send = send.lock().unwrap().clone();
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/capture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
//! use timely::dataflow::operators::capture::{EventLink, Replay};
//!
//! timely::execute(timely::Configuration::Thread, |worker| {
//! timely::execute(timely::Config::thread(), |worker| {
//! let handle1 = Rc::new(EventLink::new());
//! let handle2 = Some(handle1.clone());
//!
Expand Down Expand Up @@ -52,7 +52,7 @@
//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
//! use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay};
//!
//! timely::execute(timely::Configuration::Thread, |worker| {
//! timely::execute(timely::Config::thread(), |worker| {
//! let list = TcpListener::bind("127.0.0.1:8000").unwrap();
//! let send = TcpStream::connect("127.0.0.1:8000").unwrap();
//! let recv = list.incoming().next().unwrap().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn notificator_delivers_notifications_in_topo_order() {
/// use timely::dataflow::operators::generic::operator::Operator;
/// use timely::dataflow::channels::pact::Pipeline;
///
/// timely::execute(timely::Configuration::Thread, |worker| {
/// timely::execute(timely::Config::thread(), |worker| {
/// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
/// let (in1_handle, in1) = scope.new_input();
/// let (in2_handle, in2) = scope.new_input();
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub trait Operator<G: Scope, D1: Data> {
/// use timely::dataflow::operators::generic::operator::Operator;
/// use timely::dataflow::channels::pact::Pipeline;
///
/// timely::execute(timely::Configuration::Thread, |worker| {
/// timely::execute(timely::Config::thread(), |worker| {
/// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
/// let (in1_handle, in1) = scope.new_input();
/// let (in2_handle, in2) = scope.new_input();
Expand Down Expand Up @@ -208,7 +208,7 @@ pub trait Operator<G: Scope, D1: Data> {
/// use timely::dataflow::operators::generic::operator::Operator;
/// use timely::dataflow::channels::pact::Pipeline;
///
/// timely::execute(timely::Configuration::Thread, |worker| {
/// timely::execute(timely::Config::thread(), |worker| {
/// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
/// let (in1_handle, in1) = scope.new_input();
/// let (in2_handle, in2) = scope.new_input();
Expand Down
8 changes: 4 additions & 4 deletions timely/src/dataflow/operators/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub trait Input : Scope {
/// use timely::dataflow::operators::{Input, Inspect};
///
/// // construct and execute a timely dataflow
/// timely::execute(Configuration::Thread, |worker| {
/// timely::execute(Config::thread(), |worker| {
///
/// // add an input and base computation off of it
/// let mut input = worker.dataflow(|scope| {
Expand Down Expand Up @@ -71,7 +71,7 @@ pub trait Input : Scope {
/// use timely::dataflow::operators::input::Handle;
///
/// // construct and execute a timely dataflow
/// timely::execute(Configuration::Thread, |worker| {
/// timely::execute(Config::thread(), |worker| {
///
/// // add an input and base computation off of it
/// let mut input = Handle::new();
Expand Down Expand Up @@ -189,7 +189,7 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
/// use timely::dataflow::operators::input::Handle;
///
/// // construct and execute a timely dataflow
/// timely::execute(Configuration::Thread, |worker| {
/// timely::execute(Config::thread(), |worker| {
///
/// // add an input and base computation off of it
/// let mut input = Handle::new();
Expand Down Expand Up @@ -226,7 +226,7 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
/// use timely::dataflow::operators::input::Handle;
///
/// // construct and execute a timely dataflow
/// timely::execute(Configuration::Thread, |worker| {
/// timely::execute(Config::thread(), |worker| {
///
/// // add an input and base computation off of it
/// let mut input = Handle::new();
Expand Down
Loading