diff --git a/Cargo.toml b/Cargo.toml index 660f267c3..f6f5b8734 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,8 @@ resolver = "2" edition = "2021" [workspace.dependencies] -columnar = "0.3" +# columnar = "0.3" +columnar = { path = "../columnar" } [profile.release] opt-level = 3 diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index 22afc44c0..dbb6b0407 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -114,13 +114,13 @@ struct Broadcaster { impl Push for Broadcaster { fn push(&mut self, element: &mut Option) { - // Push defensive copies to pushers after the first. - for pusher in self.pushers.iter_mut().skip(1) { - self.spare.clone_from(element); - pusher.push(&mut self.spare); - } + // // Push defensive copies to pushers after the first. + // for pusher in self.pushers.iter_mut().skip(1) { + // self.spare.clone_from(element); + // pusher.push(&mut self.spare); + // } // Push the element itself at the first pusher. - for pusher in self.pushers.iter_mut().take(1) { + for pusher in self.pushers.iter_mut() {//}.take(1) { pusher.push(element); } } diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index c957755c5..054804562 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -94,12 +94,12 @@ impl Pull for Puller { #[inline] fn pull(&mut self) -> &mut Option { let mut borrow = self.source.borrow_mut(); - // if let Some(element) = self.current.take() { - // // TODO : Arbitrary constant. - // if borrow.1.len() < 16 { - // borrow.1.push_back(element); - // } - // } + if let Some(element) = self.current.take() { + // TODO : Arbitrary constant. + if borrow.1.len() < 16 { + borrow.1.push_back(element); + } + } self.current = borrow.0.pop_front(); &mut self.current } diff --git a/container/src/lib.rs b/container/src/lib.rs index 9394f1ec0..0449dc9b9 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -165,6 +165,10 @@ impl> PushInto for CapacityContainerBuil // Maybe flush if self.current.at_capacity() { self.pending.push_back(std::mem::take(&mut self.current)); + if let Some(spare) = self.empty.take() { + self.current = spare; + self.current.clear(); + } } } } @@ -186,9 +190,12 @@ impl ContainerBuilder for CapacityContainerBuild fn finish(&mut self) -> Option<&mut C> { if !self.current.is_empty() { self.pending.push_back(std::mem::take(&mut self.current)); + if let Some(spare) = &mut self.empty { + std::mem::swap(&mut self.current, spare); + self.current.clear(); + } } - self.empty = self.pending.pop_front(); - self.empty.as_mut() + self.extract() } } diff --git a/timely/Cargo.toml b/timely/Cargo.toml index bc672931c..6e455650f 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -19,6 +19,7 @@ default = ["getopts"] getopts = ["getopts-dep", "timely_communication/getopts"] [dependencies] +bytemuck = "1.18.0" columnar = { workspace = true } getopts-dep = { package = "getopts", version = "0.2.21", optional = true } bincode = { version = "1.0" } @@ -32,5 +33,4 @@ crossbeam-channel = "0.5" smallvec = { version = "1.13.2", features = ["serde", "const_generics"] } [dev-dependencies] -bytemuck = "1.18.0" rand = { version = "0.8", features = ["small_rng"] } diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 12d362f3c..0ae030b51 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -1,7 +1,7 @@ //! Wordcount based on flatcontainer. use { - std::collections::HashMap, + std::collections::BTreeMap, timely::{Container, container::CapacityContainerBuilder}, timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, timely::dataflow::InputHandleCore, @@ -21,8 +21,6 @@ fn main() { type Container = Column; - use columnar::Len; - let config = timely::Config { communication: timely::CommunicationConfig::ProcessBinary(3), worker: timely::WorkerConfig::default(), @@ -37,9 +35,7 @@ fn main() { worker.dataflow::(|scope| { input .to_stream(scope) - .unary( - Pipeline, - "Split", + .unary(Pipeline, "Split", |_cap, _info| { move |input, output| { while let Some((time, data)) = input.next() { @@ -58,38 +54,33 @@ fn main() { ExchangeCore::,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64), "WordCount", |_capability, _info| { - let mut queues = HashMap::new(); - let mut counts = HashMap::new(); + let mut queues = Vec::new(); + let mut counts = BTreeMap::new(); move |input, output| { while let Some((time, data)) = input.next() { - queues - .entry(time.retain()) - .or_insert(Vec::new()) - .push(std::mem::take(data)); + queues.push((time.retain(), std::mem::take(data))); } for (key, val) in queues.iter_mut() { if !input.frontier().less_equal(key.time()) { let mut session = output.session(key); - for batch in val.drain(..) { - for wordcount in batch.iter() { - let total = - if let Some(count) = counts.get_mut(wordcount.text) { - *count += wordcount.diff; - *count - } - else { - counts.insert(wordcount.text.to_string(), *wordcount.diff); - *wordcount.diff - }; - session.give(WordCountReference { text: wordcount.text, diff: total }); + for wordcount in val.iter() { + let total = + if let Some(count) = counts.get_mut(wordcount.text) { + *count += wordcount.diff; + *count } + else { + counts.insert(wordcount.text.to_string(), *wordcount.diff); + *wordcount.diff + }; + session.give(WordCountReference { text: wordcount.text, diff: total }); } } } - queues.retain(|_key, val| !val.is_empty()); + queues.retain(|(key, _val)| input.frontier().less_equal(key.time()) ); } }, ) @@ -99,13 +90,35 @@ fn main() { }); // introduce data and watch! - for round in 0..10 { - input.send(WordCountReference { text: "flat container", diff: 1 }); - input.advance_to(round + 1); - while probe.less_than(input.time()) { - worker.step(); + + if let Some(filename) = std::env::args().nth(1) { + let file = std::fs::File::open(filename).unwrap(); + use std::io::BufRead; + let mut lines = std::io::BufReader::new(file); + let mut text = String::default(); + let mut index = 0; + while lines.read_line(&mut text).unwrap() > 0 { + if index % worker.peers() == worker.index() { + input.send(WordCountReference { text: &text, diff: 1 }); + } + text.clear(); + input.advance_to(index + 1); + while probe.less_than(input.time()) { + worker.step(); + } + index += 1; } } + else { + for round in 0.. { + input.send(WordCountReference { text: "flat container", diff: 1 }); + input.advance_to(round + 1); + while probe.less_than(input.time()) { + worker.step(); + } + } + } + }) .unwrap(); } @@ -177,9 +190,9 @@ mod container { type Iter<'a> = IterOwn<>::Borrowed<'a>>; fn iter<'a>(&'a self) -> Self::Iter<'a> { match self { - Column::Typed(t) => t.borrow().into_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(), + Column::Typed(t) => t.borrow().into_index_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), } } @@ -187,9 +200,9 @@ mod container { type DrainIter<'a> = IterOwn<>::Borrowed<'a>>; fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { match self { - Column::Typed(t) => t.borrow().into_iter(), - Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_iter(), - Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_iter(), + Column::Typed(t) => t.borrow().into_index_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), } } } @@ -327,9 +340,12 @@ mod builder { fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { self.pending.push_back(Column::Typed(std::mem::take(&mut self.current))); + if let Some(Column::Typed(spare)) = self.empty.take() { + self.current = spare; + self.current.clear(); + } } - self.empty = self.pending.pop_front(); - self.empty.as_mut() + self.extract() } } diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 557b7c7e1..ec9ee679b 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -145,6 +145,7 @@ mod encoding { // We will pad out anything we write to make the result `u64` aligned. impl Bytesable for Bincode { fn from_bytes(bytes: Bytes) -> Self { + println!("Bincode::from_bytes: {:?}", std::any::type_name::()); let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed"); let typed_size = ::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize; assert_eq!(bytes.len(), (typed_size + 7) & !7); diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 3c5a7f00f..4b9e2fa00 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -10,7 +10,9 @@ use crate::Bincode; /// A progress update message consisting of source worker id, sequence number and lists of /// message and internal updates -pub type ProgressMsg = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>; +// pub type ProgressMsg = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>; + +pub type ProgressMsg = crate::dataflow::channels::Message<(), Column<((Location, T), i64)>>; /// Manages broadcasting of progress updates to and receiving updates from workers. pub struct Progcaster { @@ -28,11 +30,13 @@ pub struct Progcaster { channel_identifier: usize, /// An optional logger to record progress messages. progress_logging: Option>, + + container: <((Location, T), i64) as columnar::Columnar>::Container, } impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied worker. - pub fn new(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option, progress_logging: Option>) -> Progcaster { + pub fn new(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option, progress_logging: Option>) -> Progcaster where ::Container: Clone + Send { let channel_identifier = worker.new_identifier(); let (pusher, puller) = worker.broadcast(channel_identifier, addr); @@ -49,6 +53,7 @@ impl Progcaster { identifier, channel_identifier, progress_logging, + container: Default::default(), } } @@ -88,14 +93,31 @@ impl Progcaster { }); }); - let payload = (self.source, self.counter, std::mem::take(changes)); - let mut to_push = Some(Bincode { payload }); + use columnar::{Clear, Push}; + self.container.clear(); + for item in changes.drain() { + self.container.push(&item); + } + let message = crate::dataflow::channels::Message { + time: (), + data: Column::Typed(std::mem::take(&mut self.container)), + from: self.source, + seq: self.counter, + }; + // self.container.0.push(self.source); + // self.container.push((self.source, self.counter, &changes.unstable_internal_updates()[..])); + + // let payload = (self.source, self.counter, std::mem::take(changes)); + let mut to_push: Option> = Some(message); self.pusher.push(&mut to_push); self.pusher.done(); if let Some(pushed) = to_push { - *changes = pushed.payload.2; - changes.clear(); + if let Column::Typed(t) = pushed.data { + self.container = t; + } + // *changes = pushed.2; + // changes.clear(); } self.counter += 1; @@ -107,12 +129,15 @@ impl Progcaster { while let Some(message) = self.puller.pull() { - let source = message.0; - let counter = message.1; - let recv_changes = &mut message.2; + let source = message.from; + let counter = message.seq; + let recv_changes = &mut message.data; let channel = self.channel_identifier; + use columnar::Columnar; + use timely_container::Container; + // See comments above about the relatively high cost of this logging, and our // options for improving it if performance limits users who want other logging. self.progress_logging.as_ref().map(|l| { @@ -123,11 +148,11 @@ impl Progcaster { for ((location, time), diff) in recv_changes.iter() { match location.port { - Port::Target(port) => { - messages.push((location.node, port, time.clone(), *diff)) + crate::progress::PortReference::Target(port) => { + messages.push((location.node, port, T::into_owned(time), *diff)) }, - Port::Source(port) => { - internal.push((location.node, port, time.clone(), *diff)) + crate::progress::PortReference::Source(port) => { + internal.push((location.node, port, T::into_owned(time), *diff)) } } } @@ -144,10 +169,161 @@ impl Progcaster { }); // We clone rather than drain to avoid deserialization. - for &(ref update, delta) in recv_changes.iter() { - changes.update(update.clone(), delta); + for (update, delta) in recv_changes.iter() { + changes.update(<(Location, T) as Columnar>::into_owned(update), *delta); + } + } + + } +} + + +pub use container::Column; +mod container { + + use columnar::Columnar; + use columnar::Container as FooBozzle; + + use timely_bytes::arc::Bytes; + + /// A container based on a columnar store, encoded in aligned bytes. + pub enum Column { + /// The typed variant of the container. + Typed(C::Container), + /// The binary variant of the container. + Bytes(Bytes), + /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. + /// + /// Reasons could include misalignment, cloning of data, or wanting + /// to release the `Bytes` as a scarce resource. + Align(Box<[u64]>), + } + + impl Default for Column { + fn default() -> Self { Self::Typed(Default::default()) } + } + + impl Clone for Column where C::Container: Clone { + fn clone(&self) -> Self { + match self { + Column::Typed(t) => Column::Typed(t.clone()), + Column::Bytes(b) => { + assert!(b.len() % 8 == 0); + let mut alloc: Vec = vec![0; b.len() / 8]; + bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]); + Self::Align(alloc.into()) + }, + Column::Align(a) => Column::Align(a.clone()), + } + } + } + + use columnar::{Clear, Len, Index, FromBytes}; + use columnar::bytes::{EncodeDecode, Indexed}; + use columnar::common::IterOwn; + + use crate::Container; + impl Container for Column { + fn len(&self) -> usize { + match self { + Column::Typed(t) => t.len(), + Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(), + Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(), + } + } + // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. + fn clear(&mut self) { + match self { + Column::Typed(t) => t.clear(), + Column::Bytes(_) => *self = Column::Typed(Default::default()), + Column::Align(_) => *self = Column::Typed(Default::default()), + } + } + + type ItemRef<'a> = C::Ref<'a>; + type Iter<'a> = IterOwn<>::Borrowed<'a>>; + fn iter<'a>(&'a self) -> Self::Iter<'a> { + match self { + Column::Typed(t) => t.borrow().into_index_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), + } + } + + type Item<'a> = C::Ref<'a>; + type DrainIter<'a> = IterOwn<>::Borrowed<'a>>; + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + match self { + Column::Typed(t) => t.borrow().into_index_iter(), + Column::Bytes(b) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), + Column::Align(a) => <>::Borrowed<'a> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), + } + } + } + + use crate::container::SizableContainer; + impl SizableContainer for Column { + fn at_capacity(&self) -> bool { + match self { + Self::Typed(t) => { + let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow()); + length_in_bytes >= (1 << 20) + }, + Self::Bytes(_) => true, + Self::Align(_) => true, + } + } + fn ensure_capacity(&mut self, _stash: &mut Option) { } + } + + use crate::container::PushInto; + impl PushInto for Column where C::Container: columnar::Push { + #[inline] + fn push_into(&mut self, item: T) { + use columnar::Push; + match self { + Column::Typed(t) => t.push(item), + Column::Align(_) | Column::Bytes(_) => { + // We really oughtn't be calling this in this case. + // We could convert to owned, but need more constraints on `C`. + unimplemented!("Pushing into Column::Bytes without first clearing"); + } + } + } + } + + impl crate::dataflow::channels::ContainerBytes for Column { + fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self { + // Our expectation / hope is that `bytes` is `u64` aligned and sized. + // If the alignment is borked, we can relocate. IF the size is borked, + // not sure what we do in that case. + assert!(bytes.len() % 8 == 0); + if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { + Self::Bytes(bytes) + } + else { + println!("Re-locating bytes for alignment reasons"); + let mut alloc: Vec = vec![0; bytes.len() / 8]; + bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]); + Self::Align(alloc.into()) } } + fn length_in_bytes(&self) -> usize { + match self { + // We'll need one u64 for the length, then the length rounded up to a multiple of 8. + Column::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()), + Column::Bytes(b) => b.len(), + Column::Align(a) => 8 * a.len(), + } + } + + fn into_bytes(&self, writer: &mut W) { + match self { + Column::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() }, + Column::Bytes(b) => writer.write_all(b).unwrap(), + Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), + } + } } } diff --git a/timely/src/progress/timestamp.rs b/timely/src/progress/timestamp.rs index 8b88fb731..93013a093 100644 --- a/timely/src/progress/timestamp.rs +++ b/timely/src/progress/timestamp.rs @@ -9,7 +9,7 @@ use crate::ExchangeData; use crate::order::PartialOrder; /// A composite trait for types that serve as timestamps in timely dataflow. -pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+ExchangeData+Hash+Ord { +pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+ExchangeData+Hash+Ord + columnar::Columnar { /// A type summarizing action on a timestamp along a dataflow path. type Summary : PathSummary + 'static; /// A minimum value suitable as a default.