Skip to content

Commit 2d7b161

Browse files
committed
storage: use CapabilityRefs directly
After TimelyDataflow/timely-dataflow#429 holding onto CapabilityRefs across await points is safe Signed-off-by: Petros Angelatos <[email protected]>
1 parent fcaf3c0 commit 2d7b161

File tree

1 file changed

+2
-9
lines changed

1 file changed

+2
-9
lines changed

src/storage/src/source/persist_source.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,6 @@ where
266266
let (mut update_output, update_output_stream) = fetcher_builder.new_output();
267267
let (mut consumed_part_output, consumed_part_output_stream) = fetcher_builder.new_output();
268268

269-
let update_output_port = update_output_stream.name().port;
270-
let consumed_part_port = consumed_part_output_stream.name().port;
271-
272269
// Re-used state for processing and building rows.
273270
let mut datum_vec = mz_repr::DatumVec::new();
274271
let mut row_builder = Row::default();
@@ -305,12 +302,8 @@ where
305302
// panicking, so swap them to an owned version.
306303
data.swap(&mut buffer);
307304

308-
let update_cap = cap.delayed_for_output(cap.time(), update_output_port);
309-
let mut update_session = output_handle.session(&update_cap);
310-
311-
let consumed_part_cap = cap.delayed_for_output(cap.time(), consumed_part_port);
312-
let mut consumed_part_session =
313-
consumed_part_output_handle.session(&consumed_part_cap);
305+
let mut update_session = output_handle.session(&cap);
306+
let mut consumed_part_session = consumed_part_output_handle.session(&cap);
314307

315308
for (_idx, part) in buffer.drain(..) {
316309
let (consumed_part, fetched_part) =

0 commit comments

Comments
 (0)