@@ -13,18 +13,17 @@ use std::rc::Rc;
1313use std:: sync:: Arc ;
1414
1515use differential_dataflow:: { Collection , Hashable } ;
16- use mz_repr:: { Diff , GlobalId , Row , Timestamp } ;
17- use mz_timely_util:: operators_async_ext:: OperatorBuilderExt ;
1816use timely:: dataflow:: channels:: pact:: Exchange ;
19- use timely:: dataflow:: operators:: generic:: builder_rc:: OperatorBuilder ;
2017use timely:: dataflow:: Scope ;
2118use timely:: progress:: frontier:: Antichain ;
2219use timely:: progress:: Timestamp as _;
2320use timely:: PartialOrder ;
2421
25- use crate :: storage_state:: StorageState ;
22+ use mz_repr:: { Diff , GlobalId , Row , Timestamp } ;
23+ use mz_timely_util:: builder_async:: { Event , OperatorBuilder } ;
2624
2725use crate :: controller:: CollectionMetadata ;
26+ use crate :: storage_state:: StorageState ;
2827use crate :: types:: errors:: DataflowError ;
2928use crate :: types:: sources:: SourceData ;
3029
@@ -64,43 +63,39 @@ pub fn render<G>(
6463 let weak_token = Rc :: downgrade ( & token) ;
6564
6665 let persist_clients = Arc :: clone ( & storage_state. persist_clients ) ;
67- persist_op. build_async (
68- scope. clone ( ) ,
69- move |mut capabilities, frontiers, scheduler| async move {
70- capabilities. clear ( ) ;
71- let mut buffer = Vec :: new ( ) ;
72- let mut stashed_batches = HashMap :: new ( ) ;
73-
74- let mut write = persist_clients
75- . lock ( )
76- . await
77- . open ( metadata. persist_location )
78- . await
79- . expect ( "could not open persist client" )
80- . open_writer :: < SourceData , ( ) , Timestamp , Diff > ( metadata. data_shard )
81- . await
82- . expect ( "could not open persist shard" ) ;
83-
84- // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
85- // to. Data from the source not beyond this time will be dropped, as it has already
86- // been persisted.
87- // In the future, sources will avoid passing through data not beyond this upper
88- * current_upper. borrow_mut ( ) = write. upper ( ) . clone ( ) ;
89-
90- while scheduler. notified ( ) . await {
91- let input_upper = frontiers. borrow ( ) [ 0 ] . clone ( ) ;
92-
93- if weak_token. upgrade ( ) . is_none ( ) || current_upper. borrow ( ) . is_empty ( ) {
94- return ;
95- }
96-
97- if !active_write_worker {
98- // We cannot simply return because that would block the
99- // frontier from advancing for the one active write worker.
100- continue ;
101- }
102-
103- while let Some ( ( _cap, data) ) = input. next ( ) {
66+ persist_op. build ( move |_capabilities| async move {
67+ let mut buffer = Vec :: new ( ) ;
68+ let mut stashed_batches = HashMap :: new ( ) ;
69+
70+ let mut write = persist_clients
71+ . lock ( )
72+ . await
73+ . open ( metadata. persist_location )
74+ . await
75+ . expect ( "could not open persist client" )
76+ . open_writer :: < SourceData , ( ) , Timestamp , Diff > ( metadata. data_shard )
77+ . await
78+ . expect ( "could not open persist shard" ) ;
79+
80+ // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
81+ // to. Data from the source not beyond this time will be dropped, as it has already
82+ // been persisted.
83+ // In the future, sources will avoid passing through data not beyond this upper
84+ * current_upper. borrow_mut ( ) = write. upper ( ) . clone ( ) ;
85+
86+ if weak_token. upgrade ( ) . is_none ( ) || current_upper. borrow ( ) . is_empty ( ) {
87+ return ;
88+ }
89+
90+ if !active_write_worker {
91+ // We cannot simply return because that would block the
92+ // frontier from advancing for the one active write worker.
93+ std:: future:: pending :: < ( ) > ( ) . await ;
94+ }
95+
96+ while let Some ( event) = input. next ( ) . await {
97+ match event {
98+ Event :: Data ( _cap, data) => {
10499 data. swap ( & mut buffer) ;
105100
106101 // TODO: come up with a better default batch size here
@@ -122,81 +117,82 @@ pub fn render<G>(
122117 }
123118 }
124119 }
120+ Event :: Progress ( input_upper) => {
121+ // See if any timestamps are done!
122+ // TODO(guswynn/petrosagg): remove this additional allocation
123+ let mut finalized_timestamps: Vec < _ > = stashed_batches
124+ . keys ( )
125+ . filter ( |ts| !input_upper. less_equal ( ts) )
126+ . copied ( )
127+ . collect ( ) ;
128+ finalized_timestamps. sort_unstable ( ) ;
129+
130+ // If the frontier has advanced, we need to finalize data being written to persist
131+ if PartialOrder :: less_than ( & * current_upper. borrow ( ) , & input_upper) {
132+ // We always append, even in case we don't have any updates, because appending
133+ // also advances the frontier.
134+ if finalized_timestamps. is_empty ( ) {
135+ let expected_upper = current_upper. borrow ( ) . clone ( ) ;
136+ write
137+ . append (
138+ Vec :: < ( ( SourceData , ( ) ) , Timestamp , Diff ) > :: new ( ) ,
139+ expected_upper,
140+ input_upper. clone ( ) ,
141+ )
142+ . await
143+ . expect ( "cannot append updates" )
144+ . expect ( "invalid/outdated upper" ) ;
125145
126- // See if any timestamps are done!
127- // TODO(guswynn/petrosagg): remove this additional allocation
128- let mut finalized_timestamps: Vec < _ > = stashed_batches
129- . keys ( )
130- . filter ( |ts| !input_upper. less_equal ( ts) )
131- . copied ( )
132- . collect ( ) ;
133- finalized_timestamps. sort_unstable ( ) ;
134-
135- // If the frontier has advanced, we need to finalize data being written to persist
136- if PartialOrder :: less_than ( & * current_upper. borrow ( ) , & input_upper) {
137- // We always append, even in case we don't have any updates, because appending
138- // also advances the frontier.
139- if finalized_timestamps. is_empty ( ) {
140- let expected_upper = current_upper. borrow ( ) . clone ( ) ;
141- write
142- . append (
143- Vec :: < ( ( SourceData , ( ) ) , Timestamp , Diff ) > :: new ( ) ,
144- expected_upper,
145- input_upper. clone ( ) ,
146- )
147- . await
148- . expect ( "cannot append updates" )
149- . expect ( "invalid/outdated upper" ) ;
146+ // advance our stashed frontier
147+ * current_upper. borrow_mut ( ) = input_upper. clone ( ) ;
148+ // wait for more data or a new input frontier
149+ continue ;
150+ }
150151
152+ // `current_upper` tracks the last known upper
153+ let mut expected_upper = current_upper. borrow ( ) . clone ( ) ;
154+ let finalized_batch_count = finalized_timestamps. len ( ) ;
155+
156+ for ( i, ts) in finalized_timestamps. into_iter ( ) . enumerate ( ) {
157+ // TODO(aljoscha): Figure out how errors from this should be reported.
158+
159+ // Set the upper to the upper of the batch (which is 1 past the ts it
160+ // manages) OR the new frontier if we are appending the final batch
161+ let new_upper = if i == finalized_batch_count - 1 {
162+ input_upper. clone ( )
163+ } else {
164+ Antichain :: from_elem ( ts + 1 )
165+ } ;
166+
167+ let mut batch = stashed_batches
168+ . remove ( & ts)
169+ . expect ( "batch for timestamp to still be there" )
170+ . finish ( new_upper. clone ( ) )
171+ . await
172+ . expect ( "invalid usage" ) ;
173+
174+ write
175+ . compare_and_append_batch (
176+ & mut [ & mut batch] ,
177+ expected_upper,
178+ new_upper. clone ( ) ,
179+ )
180+ . await
181+ . expect ( "cannot append updates" )
182+ . expect ( "cannot append updates" )
183+ . expect ( "invalid/outdated upper" ) ;
184+
185+ // next `expected_upper` is the one we just successfully appended
186+ expected_upper = new_upper;
187+ }
151188 // advance our stashed frontier
152189 * current_upper. borrow_mut ( ) = input_upper. clone ( ) ;
153- // wait for more data or a new input frontier
154- continue ;
155- }
156-
157- // `current_upper` tracks the last known upper
158- let mut expected_upper = current_upper. borrow ( ) . clone ( ) ;
159- let finalized_batch_count = finalized_timestamps. len ( ) ;
160-
161- for ( i, ts) in finalized_timestamps. into_iter ( ) . enumerate ( ) {
162- // TODO(aljoscha): Figure out how errors from this should be reported.
163-
164- // Set the upper to the upper of the batch (which is 1 past the ts it
165- // manages) OR the new frontier if we are appending the final batch
166- let new_upper = if i == finalized_batch_count - 1 {
167- input_upper. clone ( )
168- } else {
169- Antichain :: from_elem ( ts + 1 )
170- } ;
171-
172- let mut batch = stashed_batches
173- . remove ( & ts)
174- . expect ( "batch for timestamp to still be there" )
175- . finish ( new_upper. clone ( ) )
176- . await
177- . expect ( "invalid usage" ) ;
178-
179- write
180- . compare_and_append_batch (
181- & mut [ & mut batch] ,
182- expected_upper,
183- new_upper. clone ( ) ,
184- )
185- . await
186- . expect ( "cannot append updates" )
187- . expect ( "cannot append updates" )
188- . expect ( "invalid/outdated upper" ) ;
189-
190- // next `expected_upper` is the one we just successfully appended
191- expected_upper = new_upper;
190+ } else {
191+ // We cannot have updates without the frontier advancing
192+ assert ! ( finalized_timestamps. is_empty( ) ) ;
192193 }
193- // advance our stashed frontier
194- * current_upper. borrow_mut ( ) = input_upper. clone ( ) ;
195- } else {
196- // We cannot have updates without the frontier advancing
197- assert ! ( finalized_timestamps. is_empty( ) ) ;
198194 }
199195 }
200- } ,
201- )
196+ }
197+ } ) ;
202198}
0 commit comments