Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,10 @@ rustc-demangle = { opt-level = 3 }
# it's unlikely we're going to get interactive access to a debugger in
# production installations, but we still want useful crash reports.
debug = 1


[patch."https://github.com/TimelyDataflow/timely-dataflow"]
timely = { git = "https://github.com/petrosagg/timely-dataflow", branch = "safe-capabilityref" }

[patch."https://github.com/TimelyDataflow/differential-dataflow"]
differential-dataflow = { git = "https://github.com/petrosagg/differential-dataflow", branch = "safe-capref" }
2 changes: 2 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ allow-git = [
"https://github.com/MaterializeInc/rust-prometheus.git",
"https://github.com/TimelyDataflow/timely-dataflow",
"https://github.com/TimelyDataflow/differential-dataflow.git",
"https://github.com/petrosagg/timely-dataflow",
"https://github.com/petrosagg/differential-dataflow.git",
"https://github.com/fede1024/rust-rdkafka.git",
# Waiting for a new release that includes the latest dependency bumps.
"https://github.com/tikv/fail-rs.git",
Expand Down
2 changes: 1 addition & 1 deletion src/interchange/src/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ where
let x: Stream<G, ((Option<Row>, Vec<DiffPair<Row>>), G::Timestamp, Diff)> =
batches.unary(Pipeline, "combine_at_timestamp", move |_, _| {
move |input, output| {
while let Some((cap, batches)) = input.next() {
while let Some((cap, batches)) = input.next().with_cap() {
let mut session = output.session(&cap);
batches.swap(&mut rows_buf);
for batch in rows_buf.drain(..) {
Expand Down
4 changes: 1 addition & 3 deletions src/persist/src/operators/async_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,7 @@ mod test {
// Drop initial capabilities
capabilities.clear();
let mut output_handle = output.activate();
while let Some((cap, data)) = input_handle.next() {
let cap = cap.retain();

while let Some((cap, data)) = input_handle.next().with_cap() {
let mut session = output_handle.session(&cap);
for item in data.iter().copied() {
tokio::time::sleep(Duration::from_millis(10)).await;
Expand Down
5 changes: 1 addition & 4 deletions src/persist/src/operators/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,7 @@ where
let mut data_output = data_output.activate();

// Write out everything and forward, keeping the write futures.
while let Some((cap, data)) = input.next() {
// TODO(petrosagg): remove this unconditional retain once this is released:
// https://github.com/TimelyDataflow/timely-dataflow/pull/429
let cap = cap.retain();
while let Some((cap, data)) = input.next().with_cap() {
data.swap(&mut buffer);

let mut session = data_output.session(&cap);
Expand Down
3 changes: 3 additions & 0 deletions src/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", bran
uncased = "0.9.6"
url = "2.2.2"
uuid = { version = "0.8.2", features = ["serde", "v4"] }

[dev-dependencies]
datadriven = "0.6.0"