@@ -573,7 +573,7 @@ impl<T:Timestamp> Tracker<T> {
573573 . collect :: < Vec < _ > > ( ) ;
574574
575575 if !target_changes. is_empty ( ) {
576- logger. log_target_updates ( Box :: new ( target_changes) ) ;
576+ logger. log_target_pointstamp_updates ( Box :: new ( target_changes) ) ;
577577 }
578578
579579 let source_changes =
@@ -583,7 +583,7 @@ impl<T:Timestamp> Tracker<T> {
583583 . collect :: < Vec < _ > > ( ) ;
584584
585585 if !source_changes. is_empty ( ) {
586- logger. log_source_updates ( Box :: new ( source_changes) ) ;
586+ logger. log_source_pointstamp_updates ( Box :: new ( source_changes) ) ;
587587 }
588588 }
589589
@@ -670,6 +670,7 @@ impl<T:Timestamp> Tracker<T> {
670670 }
671671 self . pushed_changes . update ( ( location, time) , diff) ;
672672 }
673+
673674 }
674675 // Update to an operator output.
675676 // Propagate any changes forward along outgoing edges.
@@ -696,6 +697,30 @@ impl<T:Timestamp> Tracker<T> {
696697 }
697698 }
698699
700+ // Step 3: If logging is enabled, construct and log outbound changes.
701+ if let Some ( logger) = & mut self . logger {
702+ let mut target_changes = Vec :: new ( ) ;
703+ let mut source_changes = Vec :: new ( ) ;
704+
705+ for ( ( location, time) , diff) in self . pushed_changes . iter ( ) {
706+ match location. port {
707+ Port :: Target ( port) => {
708+ target_changes. push ( ( location. node , port, time. clone ( ) , * diff) )
709+ }
710+ Port :: Source ( port) => {
711+ source_changes. push ( ( location. node , port, time. clone ( ) , * diff) )
712+ }
713+ }
714+ }
715+
716+ if !target_changes. is_empty ( ) || !source_changes. is_empty ( ) {
717+ logger. log_frontier_updates (
718+ Box :: new ( target_changes) ,
719+ Box :: new ( source_changes) ,
720+ ) ;
721+ }
722+ }
723+
699724 self . pushed_changes . drain ( )
700725 }
701726
@@ -839,56 +864,113 @@ pub mod logging {
839864 Self { path, logger }
840865 }
841866
842- /// Log source update events with additional identifying information.
843- pub fn log_source_updates ( & mut self , updates : Box < dyn ProgressEventTimestampVec > ) {
867+ /// Log source pointstamp update events with additional identifying information.
868+ pub fn log_source_pointstamp_updates ( & mut self , updates : Box < dyn ProgressEventTimestampVec > ) {
844869 self . logger . log ( {
845- SourceUpdate {
870+ SourcePointstampUpdate {
846871 tracker_id : self . path . clone ( ) ,
847872 updates,
848873 }
849874 } )
850875 }
851- /// Log target update events with additional identifying information.
852- pub fn log_target_updates ( & mut self , updates : Box < dyn ProgressEventTimestampVec > ) {
876+ /// Log target pointstamp update events with additional identifying information.
877+ pub fn log_target_pointstamp_updates ( & mut self , updates : Box < dyn ProgressEventTimestampVec > ) {
853878 self . logger . log ( {
854- TargetUpdate {
879+ TargetPointstampUpdate {
855880 tracker_id : self . path . clone ( ) ,
856881 updates,
857882 }
858883 } )
859884 }
885+
886+ /// Log frontier update events with additional identifying information.
887+ ///
888+ /// We want to log source and target updates at the same time to ensure callers observe
889+ /// consistent frontiers at any point in time.
890+ pub fn log_frontier_updates (
891+ & mut self ,
892+ source_updates : Box < dyn ProgressEventTimestampVec > ,
893+ target_updates : Box < dyn ProgressEventTimestampVec > ,
894+ ) {
895+ let source_event: TrackerEvent = SourceFrontierUpdate {
896+ tracker_id : self . path . clone ( ) ,
897+ updates : source_updates,
898+ } . into ( ) ;
899+ let target_event: TrackerEvent = TargetFrontierUpdate {
900+ tracker_id : self . path . clone ( ) ,
901+ updates : target_updates,
902+ } . into ( ) ;
903+
904+ self . logger . log_many ( [ source_event, target_event] ) ;
905+ }
860906 }
861907
862908 /// Events that the tracker may record.
863909 pub enum TrackerEvent {
864- /// Updates made at a source of data.
865- SourceUpdate ( SourceUpdate ) ,
866- /// Updates made at a target of data.
867- TargetUpdate ( TargetUpdate ) ,
910+ /// Pointstamp updates made at a source of data.
911+ SourcePointstampUpdate ( SourcePointstampUpdate ) ,
912+ /// Pointstamp updates made at a target of data.
913+ TargetPointstampUpdate ( TargetPointstampUpdate ) ,
914+ /// Frontier updates made at a source of data.
915+ SourceFrontierUpdate ( SourceFrontierUpdate ) ,
916+ /// Frontier updates made at a target of data.
917+ TargetFrontierUpdate ( TargetFrontierUpdate ) ,
918+ }
919+
920+ /// A pointstamp update made at a source of data.
921+ pub struct SourcePointstampUpdate {
922+ /// An identifier for the tracker.
923+ pub tracker_id : Vec < usize > ,
924+ /// Updates themselves, as `(node, port, time, diff)`.
925+ pub updates : Box < dyn ProgressEventTimestampVec > ,
926+ }
927+
928+ /// A pointstamp update made at a target of data.
929+ pub struct TargetPointstampUpdate {
930+ /// An identifier for the tracker.
931+ pub tracker_id : Vec < usize > ,
932+ /// Updates themselves, as `(node, port, time, diff)`.
933+ pub updates : Box < dyn ProgressEventTimestampVec > ,
868934 }
869935
870- /// An update made at a source of data.
871- pub struct SourceUpdate {
936+ /// A frontier update at a source of data.
937+ pub struct SourceFrontierUpdate {
872938 /// An identifier for the tracker.
873939 pub tracker_id : Vec < usize > ,
874940 /// Updates themselves, as `(node, port, time, diff)`.
875941 pub updates : Box < dyn ProgressEventTimestampVec > ,
876942 }
877943
878- /// An update made at a target of data.
879- pub struct TargetUpdate {
944+ /// A frontier update at a target of data.
945+ pub struct TargetFrontierUpdate {
880946 /// An identifier for the tracker.
881947 pub tracker_id : Vec < usize > ,
882948 /// Updates themselves, as `(node, port, time, diff)`.
883949 pub updates : Box < dyn ProgressEventTimestampVec > ,
884950 }
885951
886- impl From < SourceUpdate > for TrackerEvent {
887- fn from ( v : SourceUpdate ) -> TrackerEvent { TrackerEvent :: SourceUpdate ( v) }
952+ impl From < SourcePointstampUpdate > for TrackerEvent {
953+ fn from ( v : SourcePointstampUpdate ) -> Self {
954+ Self :: SourcePointstampUpdate ( v)
955+ }
956+ }
957+
958+ impl From < TargetPointstampUpdate > for TrackerEvent {
959+ fn from ( v : TargetPointstampUpdate ) -> Self {
960+ Self :: TargetPointstampUpdate ( v)
961+ }
962+ }
963+
964+ impl From < SourceFrontierUpdate > for TrackerEvent {
965+ fn from ( v : SourceFrontierUpdate ) -> Self {
966+ Self :: SourceFrontierUpdate ( v)
967+ }
888968 }
889969
890- impl From < TargetUpdate > for TrackerEvent {
891- fn from ( v : TargetUpdate ) -> TrackerEvent { TrackerEvent :: TargetUpdate ( v) }
970+ impl From < TargetFrontierUpdate > for TrackerEvent {
971+ fn from ( v : TargetFrontierUpdate ) -> Self {
972+ Self :: TargetFrontierUpdate ( v)
973+ }
892974 }
893975}
894976
@@ -906,32 +988,52 @@ impl<T: Timestamp> Drop for Tracker<T> {
906988 } ;
907989
908990 // Retract pending data that `propagate_all` would normally log.
991+ let mut target_pointstamp_changes = Vec :: new ( ) ;
992+ let mut source_pointstamp_changes = Vec :: new ( ) ;
993+ let mut target_frontier_changes = Vec :: new ( ) ;
994+ let mut source_frontier_changes = Vec :: new ( ) ;
995+
909996 for ( index, per_operator) in self . per_operator . iter_mut ( ) . enumerate ( ) {
910- let target_changes = per_operator. targets
911- . iter_mut ( )
912- . enumerate ( )
913- . flat_map ( |( port , target ) | {
914- target . pointstamps
915- . updates ( )
916- . map ( move | ( time , diff ) | ( index , port , time . clone ( ) , -diff ) )
917- } )
918- . collect :: < Vec < _ > > ( ) ;
919- if !target_changes . is_empty ( ) {
920- logger . log_target_updates ( Box :: new ( target_changes ) ) ;
997+ for ( port , target ) in per_operator. targets . iter_mut ( ) . enumerate ( ) {
998+ let pointstamp_retractions = target . pointstamps
999+ . updates ( )
1000+ . map ( |( time , diff ) | ( index , port , time . clone ( ) , -diff ) ) ;
1001+ target_pointstamp_changes . extend ( pointstamp_retractions ) ;
1002+
1003+ let frontier = target . implications . frontier ( ) . to_owned ( ) ;
1004+ let frontier_retractions = frontier
1005+ . into_iter ( )
1006+ . map ( |time| ( index , port , time , - 1 ) ) ;
1007+ target_frontier_changes . extend ( frontier_retractions ) ;
9211008 }
1009+ }
9221010
923- let source_changes = per_operator. sources
924- . iter_mut ( )
925- . enumerate ( )
926- . flat_map ( |( port, source) | {
927- source. pointstamps
928- . updates ( )
929- . map ( move |( time, diff) | ( index, port, time. clone ( ) , -diff) )
930- } )
931- . collect :: < Vec < _ > > ( ) ;
932- if !source_changes. is_empty ( ) {
933- logger. log_source_updates ( Box :: new ( source_changes) ) ;
1011+ for ( index, per_operator) in self . per_operator . iter_mut ( ) . enumerate ( ) {
1012+ for ( port, source) in per_operator. sources . iter_mut ( ) . enumerate ( ) {
1013+ let pointstamp_retractions = source. pointstamps
1014+ . updates ( )
1015+ . map ( |( time, diff) | ( index, port, time. clone ( ) , -diff) ) ;
1016+ source_pointstamp_changes. extend ( pointstamp_retractions) ;
1017+
1018+ let frontier = source. implications . frontier ( ) . to_owned ( ) ;
1019+ let frontier_retractions = frontier
1020+ . into_iter ( )
1021+ . map ( |time| ( index, port, time, -1 ) ) ;
1022+ source_frontier_changes. extend ( frontier_retractions) ;
9341023 }
9351024 }
1025+
1026+ if !target_pointstamp_changes. is_empty ( ) {
1027+ logger. log_target_pointstamp_updates ( Box :: new ( target_pointstamp_changes) ) ;
1028+ }
1029+ if !source_pointstamp_changes. is_empty ( ) {
1030+ logger. log_source_pointstamp_updates ( Box :: new ( source_pointstamp_changes) ) ;
1031+ }
1032+ if !source_frontier_changes. is_empty ( ) || !target_frontier_changes. is_empty ( ) {
1033+ logger. log_frontier_updates (
1034+ Box :: new ( source_frontier_changes) ,
1035+ Box :: new ( target_frontier_changes) ,
1036+ ) ;
1037+ }
9361038 }
9371039}
0 commit comments