Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ timely_logging = { path = "../logging", version = "0.12" }
timely_communication = { path = "../communication", version = "0.12", default-features = false }
timely_container = { path = "../container", version = "0.12" }
crossbeam-channel = "0.5.0"
smallvec = { version = "1.13.2", features = ["serde", "const_generics"] }

[dev-dependencies]
# timely_sort="0.1.6"
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
if !progress.frontiers[0].is_empty() {
// transmit any frontier progress.
let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new());
event_pusher.push(Event::Progress(to_send.into_inner()));
event_pusher.push(Event::Progress(to_send.into_inner().to_vec()));
}

use crate::communication::message::RefOrMut;
Expand Down
2 changes: 1 addition & 1 deletion timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
self.to_push = Some(Message::from_typed((
self.source,
self.counter,
changes.clone().into_inner(),
changes.clone().into_inner().to_vec(),
)));
}

Expand Down
32 changes: 17 additions & 15 deletions timely/src/progress/change_batch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! A collection of updates of the form `(T, i64)`.

use smallvec::SmallVec;

/// A collection of updates of the form `(T, i64)`.
///
/// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating
Expand All @@ -10,14 +12,14 @@
/// that they may provoke a compaction. I've tried to prevent exposing methods that allow surprisingly
/// expensive operations; all operations should take an amortized constant or logarithmic time.
#[derive(Clone, Debug, Eq, PartialEq, Abomonation, Serialize, Deserialize)]
pub struct ChangeBatch<T> {
pub struct ChangeBatch<T, const X: usize = 2> {
// A list of updates to which we append.
updates: Vec<(T, i64)>,
updates: SmallVec<[(T, i64); X]>,
// The length of the prefix of `self.updates` known to be compact.
clean: usize,
}

impl<T> ChangeBatch<T> {
impl<T, const X: usize> ChangeBatch<T, X> {

/// Allocates a new empty `ChangeBatch`.
///
Expand All @@ -29,9 +31,9 @@ impl<T> ChangeBatch<T> {
/// let mut batch = ChangeBatch::<usize>::new();
/// assert!(batch.is_empty());
///```
pub fn new() -> ChangeBatch<T> {
pub fn new() -> Self {
ChangeBatch {
updates: Vec::new(),
updates: SmallVec::new(),
clean: 0,
}
}
Expand All @@ -46,9 +48,9 @@ impl<T> ChangeBatch<T> {
/// let mut batch = ChangeBatch::<usize>::with_capacity(10);
/// assert!(batch.is_empty());
///```
pub fn with_capacity(capacity: usize) -> ChangeBatch<T> {
pub fn with_capacity(capacity: usize) -> Self {
ChangeBatch {
updates: Vec::with_capacity(capacity),
updates: SmallVec::with_capacity(capacity),
clean: 0,
}
}
Expand All @@ -59,7 +61,7 @@ impl<T> ChangeBatch<T> {
}

/// Expose the internal vector of updates.
pub fn unstable_internal_updates(&self) -> &Vec<(T, i64)> { &self.updates }
pub fn unstable_internal_updates(&self) -> &SmallVec<[(T, i64); X]> { &self.updates }

/// Expose the internal value of `clean`.
pub fn unstable_internal_clean(&self) -> usize { self.clean }
Expand All @@ -82,7 +84,7 @@ impl<T> ChangeBatch<T> {
}
}

impl<T> ChangeBatch<T>
impl<T, const X: usize> ChangeBatch<T, X>
where
T: Ord,
{
Expand All @@ -97,7 +99,7 @@ where
/// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
/// assert!(!batch.is_empty());
///```
pub fn new_from(key: T, val: i64) -> ChangeBatch<T> {
pub fn new_from(key: T, val: i64) -> Self {
let mut result = ChangeBatch::new();
result.update(key, val);
result
Expand Down Expand Up @@ -150,9 +152,9 @@ where
/// use timely::progress::ChangeBatch;
///
/// let batch = ChangeBatch::<usize>::new_from(17, 1);
/// assert_eq!(batch.into_inner(), vec![(17, 1)]);
/// assert_eq!(batch.into_inner().to_vec(), vec![(17, 1)]);
///```
pub fn into_inner(mut self) -> Vec<(T, i64)> {
pub fn into_inner(mut self) -> SmallVec<[(T, i64); X]> {
self.compact();
self.updates
}
Expand Down Expand Up @@ -197,7 +199,7 @@ where
/// assert!(batch.is_empty());
///```
#[inline]
pub fn drain(&mut self) -> ::std::vec::Drain<(T, i64)> {
pub fn drain(&mut self) -> smallvec::Drain<[(T, i64); X]> {
self.compact();
self.clean = 0;
self.updates.drain(..)
Expand Down Expand Up @@ -270,7 +272,7 @@ where
/// assert!(!batch2.is_empty());
///```
#[inline]
pub fn drain_into(&mut self, other: &mut ChangeBatch<T>) where T: Clone {
pub fn drain_into(&mut self, other: &mut ChangeBatch<T, X>) where T: Clone {
if other.updates.is_empty() {
::std::mem::swap(self, other);
}
Expand Down Expand Up @@ -311,7 +313,7 @@ where
}
}

impl<T> Default for ChangeBatch<T> {
impl<T, const X: usize> Default for ChangeBatch<T, X> {
fn default() -> Self {
Self::new()
}
Expand Down
28 changes: 17 additions & 11 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Tracks minimal sets of mutually incomparable elements of a partial order.

use smallvec::SmallVec;

use crate::progress::ChangeBatch;
use crate::order::{PartialOrder, TotalOrder};

Expand All @@ -15,7 +17,7 @@ use crate::order::{PartialOrder, TotalOrder};
/// are identical.
#[derive(Debug, Abomonation, Serialize, Deserialize)]
pub struct Antichain<T> {
elements: Vec<T>
elements: SmallVec<[T; 1]>
}

impl<T: PartialOrder> Antichain<T> {
Expand Down Expand Up @@ -190,7 +192,7 @@ impl<T> Antichain<T> {
///
/// let mut frontier = Antichain::<u32>::new();
///```
pub fn new() -> Antichain<T> { Antichain { elements: Vec::new() } }
pub fn new() -> Antichain<T> { Antichain { elements: SmallVec::new() } }

/// Creates a new empty `Antichain` with space for `capacity` elements.
///
Expand All @@ -203,7 +205,7 @@ impl<T> Antichain<T> {
///```
pub fn with_capacity(capacity: usize) -> Self {
Self {
elements: Vec::with_capacity(capacity),
elements: SmallVec::with_capacity(capacity),
}
}

Expand All @@ -216,7 +218,11 @@ impl<T> Antichain<T> {
///
/// let mut frontier = Antichain::from_elem(2);
///```
pub fn from_elem(element: T) -> Antichain<T> { Antichain { elements: vec![element] } }
pub fn from_elem(element: T) -> Antichain<T> {
let mut elements = SmallVec::with_capacity(1);
elements.push(element);
Antichain { elements }
}

/// Clears the contents of the antichain.
///
Expand Down Expand Up @@ -330,8 +336,8 @@ impl<T: PartialOrder> From<Vec<T>> for Antichain<T> {
}
}

impl<T> Into<Vec<T>> for Antichain<T> {
fn into(self) -> Vec<T> {
impl<T> Into<SmallVec<[T; 1]>> for Antichain<T> {
fn into(self) -> SmallVec<[T; 1]> {
self.elements
}
}
Expand All @@ -345,7 +351,7 @@ impl<T> ::std::ops::Deref for Antichain<T> {

impl<T> ::std::iter::IntoIterator for Antichain<T> {
type Item = T;
type IntoIter = ::std::vec::IntoIter<T>;
type IntoIter = smallvec::IntoIter<[T; 1]>;
fn into_iter(self) -> Self::IntoIter {
self.elements.into_iter()
}
Expand Down Expand Up @@ -520,7 +526,7 @@ impl<T> MutableAntichain<T> {
/// assert!(changes == vec![(1, -1), (2, 1)]);
///```
#[inline]
pub fn update_iter<I>(&mut self, updates: I) -> ::std::vec::Drain<'_, (T, i64)>
pub fn update_iter<I>(&mut self, updates: I) -> smallvec::Drain<'_, [(T, i64); 2]>
where
T: Clone + PartialOrder + Ord,
I: IntoIterator<Item = (T, i64)>,
Expand Down Expand Up @@ -622,11 +628,11 @@ pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
///
/// assert!(changes == vec![(1, -1), (2, 1)]);
/// ```
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> ::std::vec::Drain<(T,i64)>;
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]>;
}

impl<T: PartialOrder+Ord+Clone, I: IntoIterator<Item=(T,i64)>> MutableAntichainFilter<T> for I {
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> ::std::vec::Drain<(T,i64)> {
fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]> {
antichain.update_iter(self.into_iter())
}
}
Expand Down Expand Up @@ -700,7 +706,7 @@ impl<'a, T: 'a> AntichainRef<'a, T> {
///```
pub fn to_owned(&self) -> Antichain<T> where T: Clone {
Antichain {
elements: self.frontier.to_vec()
elements: self.frontier.into()
}
}
}
Expand Down
Loading