@@ -49,6 +49,9 @@ pub struct Aggregator {
49
49
/// buffer is approaching capacity.
50
50
shared : Arc < Shared > ,
51
51
52
+ /// Currently active RPCs streaming state events.
53
+ state_watchers : ShrinkVec < Watch < proto:: instrument:: State > > ,
54
+
52
55
/// Currently active RPCs streaming task events.
53
56
watchers : ShrinkVec < Watch < proto:: instrument:: Update > > ,
54
57
@@ -148,6 +151,7 @@ impl Aggregator {
148
151
events,
149
152
watchers : Default :: default ( ) ,
150
153
details_watchers : Default :: default ( ) ,
154
+ state_watchers : Default :: default ( ) ,
151
155
all_metadata : Default :: default ( ) ,
152
156
new_metadata : Default :: default ( ) ,
153
157
tasks : IdData :: default ( ) ,
@@ -195,11 +199,8 @@ impl Aggregator {
195
199
self . add_task_detail_subscription( watch_request) ;
196
200
} ,
197
201
Some ( Command :: WatchState ( subscription) ) => {
198
- let state = proto:: instrument:: State {
199
- temporality: self . temporality. into( ) ,
200
- } ;
201
- subscription. update( & state) ;
202
- } ,
202
+ self . add_state_subscription( subscription) ;
203
+ }
203
204
Some ( Command :: Pause ) => {
204
205
self . temporality = proto:: instrument:: Temporality :: Paused ;
205
206
}
@@ -214,7 +215,6 @@ impl Aggregator {
214
215
215
216
false
216
217
}
217
-
218
218
} ;
219
219
220
220
// drain and aggregate buffered events.
@@ -252,6 +252,10 @@ impl Aggregator {
252
252
"event channel drain loop" ,
253
253
) ;
254
254
255
+ if !self . state_watchers . is_empty ( ) {
256
+ self . publish_state ( ) ;
257
+ }
258
+
255
259
// flush data to clients, if there are any currently subscribed
256
260
// watchers and we should send a new update.
257
261
if !self . watchers . is_empty ( ) && should_send {
@@ -396,6 +400,20 @@ impl Aggregator {
396
400
// If the task is not found, drop `stream_sender` which will result in a not found error
397
401
}
398
402
403
+ /// Add a state subscription to the watchers.
404
+ fn add_state_subscription ( & mut self , subscription : Watch < proto:: instrument:: State > ) {
405
+ self . state_watchers . push ( subscription) ;
406
+ }
407
+
408
+ /// Publish the current state to all active state watchers.
409
+ fn publish_state ( & mut self ) {
410
+ let state = proto:: instrument:: State {
411
+ temporality : self . temporality . into ( ) ,
412
+ } ;
413
+ self . state_watchers
414
+ . retain_and_shrink ( |watch| watch. update ( & state) ) ;
415
+ }
416
+
399
417
/// Publish the current state to all active watchers.
400
418
///
401
419
/// This drops any watchers which have closed the RPC, or whose update
0 commit comments