@@ -228,6 +228,7 @@ pub mod source {
228228 use std:: marker:: { Send , Sync } ;
229229 use std:: sync:: Arc ;
230230 use timely:: dataflow:: { Scope , Stream , operators:: { Capability , CapabilitySet } } ;
231+ use timely:: dataflow:: operators:: generic:: OutputBuilder ;
231232 use timely:: progress:: Timestamp ;
232233 use timely:: scheduling:: SyncActivator ;
233234
@@ -313,8 +314,11 @@ pub mod source {
313314 let activator2 = scope. activator_for ( Rc :: clone ( & address) ) ;
314315 let drop_activator = DropActivator { activator : Arc :: new ( scope. sync_activator_for ( address. to_vec ( ) ) ) } ;
315316 let mut source = source_builder ( activator) ;
316- let ( mut updates_out, updates) = messages_op. new_output ( ) ;
317- let ( mut progress_out, progress) = messages_op. new_output ( ) ;
317+ let ( updates_out, updates) = messages_op. new_output ( ) ;
318+ let mut updates_out = OutputBuilder :: from ( updates_out) ;
319+ let ( progress_out, progress) = messages_op. new_output ( ) ;
320+ let mut progress_out = OutputBuilder :: from ( progress_out) ;
321+
318322 messages_op. build ( |capabilities| {
319323
320324 // A Weak that communicates whether the returned token has been dropped.
@@ -387,8 +391,11 @@ pub mod source {
387391 // Step 2: The UPDATES operator.
388392 let mut updates_op = OperatorBuilder :: new ( "CDCV2_Updates" . to_string ( ) , scope. clone ( ) ) ;
389393 let mut input = updates_op. new_input ( & updates, Exchange :: new ( |x : & ( D , T , R ) | x. hashed ( ) ) ) ;
390- let ( mut changes_out, changes) = updates_op. new_output ( ) ;
391- let ( mut counts_out, counts) = updates_op. new_output ( ) ;
394+ let ( changes_out, changes) = updates_op. new_output ( ) ;
395+ let mut changes_out = OutputBuilder :: from ( changes_out) ;
396+ let ( counts_out, counts) = updates_op. new_output ( ) ;
397+ let mut counts_out = OutputBuilder :: from ( counts_out) ;
398+
392399 updates_op. build ( move |_capability| {
393400 // Deduplicates updates, and ships novel updates and the counts for each time.
394401 // For simplicity, this operator ships updates as they are discovered to be new.
@@ -438,7 +445,8 @@ pub mod source {
438445 ) ;
439446 let mut counts =
440447 progress_op. new_input ( & counts, Exchange :: new ( |x : & ( T , i64 ) | ( x. 0 ) . hashed ( ) ) ) ;
441- let ( mut frontier_out, frontier) = progress_op. new_output ( ) ;
448+ let ( frontier_out, frontier) = progress_op. new_output ( ) ;
449+ let mut frontier_out = OutputBuilder :: from ( frontier_out) ;
442450 progress_op. build ( move |_capability| {
443451 // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.
444452
@@ -554,7 +562,7 @@ pub mod sink {
554562 use timely:: progress:: { Antichain , ChangeBatch , Timestamp } ;
555563 use timely:: dataflow:: { Scope , Stream } ;
556564 use timely:: dataflow:: channels:: pact:: { Exchange , Pipeline } ;
557- use timely:: dataflow:: operators:: generic:: { FrontieredInputHandle , builder_rc:: OperatorBuilder } ;
565+ use timely:: dataflow:: operators:: generic:: { builder_rc:: OperatorBuilder , OutputBuilder } ;
558566
559567 use crate :: { lattice:: Lattice , ExchangeData } ;
560568 use super :: { Writer , Message , Progress } ;
@@ -583,7 +591,8 @@ pub mod sink {
583591 let mut builder = OperatorBuilder :: new ( "UpdatesWriter" . to_owned ( ) , stream. scope ( ) ) ;
584592 let reactivator = stream. scope ( ) . activator_for ( builder. operator_info ( ) . address ) ;
585593 let mut input = builder. new_input ( stream, Pipeline ) ;
586- let ( mut updates_out, updates) = builder. new_output ( ) ;
594+ let ( updates_out, updates) = builder. new_output ( ) ;
595+ let mut updates_out = OutputBuilder :: from ( updates_out) ;
587596
588597 builder. build_reschedule (
589598 move |_capability| {
@@ -650,7 +659,6 @@ pub mod sink {
650659
651660 builder. build_reschedule ( |_capabilities| {
652661 move |frontiers| {
653- let mut input = FrontieredInputHandle :: new ( & mut input, & frontiers[ 0 ] ) ;
654662
655663 // We want to drain inputs no matter what.
656664 // We could do this after the next step, as we are certain these timestamps will
@@ -667,9 +675,9 @@ pub mod sink {
667675 // If our frontier advances strictly, we have the opportunity to issue a progress statement.
668676 if <_ as PartialOrder >:: less_than (
669677 & frontier. borrow ( ) ,
670- & input . frontier . frontier ( ) ,
678+ & frontiers [ 0 ] . frontier ( ) ,
671679 ) {
672- let new_frontier = input . frontier . frontier ( ) ;
680+ let new_frontier = frontiers [ 0 ] . frontier ( ) ;
673681
674682 // Extract the timestamp counts to announce.
675683 let mut announce = Vec :: new ( ) ;
@@ -691,7 +699,7 @@ pub mod sink {
691699 send_queue. push_back ( Message :: Progress ( progress) ) ;
692700
693701 // Advance our frontier to track our progress utterance.
694- frontier = input . frontier . frontier ( ) . to_owned ( ) ;
702+ frontier = frontiers [ 0 ] . frontier ( ) . to_owned ( ) ;
695703
696704 while let Some ( message) = send_queue. front ( ) {
697705 if let Some ( duration) = sink. poll ( message) {
0 commit comments