@@ -60,7 +60,7 @@ impl KvEventPublisher {
6060 }
6161
// Publishes one KV cache event: the event is logged through `tracing` with
// its `Debug` representation and then passed to `self.tx.send`, whose result
// is returned to the caller unchanged.
//
// Diff note: the `63-` line is the removed `info!` log statement and the
// `63+` line is its replacement at `trace!` level; the message text and the
// send call are the same on both sides of the change.
6262 pub fn publish ( & self , event : KvCacheEvent ) -> Result < ( ) , mpsc:: error:: SendError < KvCacheEvent > > {
63- tracing:: info !( "Publish event: {:?}" , event) ;
63+ tracing:: trace !( "Publish event: {:?}" , event) ;
// Tail expression: the send result is the function's return value.
6464 self . tx . send ( event)
6565 }
6666
@@ -90,6 +90,7 @@ fn start_publish_task(
9090
// State held by the ZMQ-backed KV event publisher.
//
// Every field except `kv_block_size` is an `Option`; `new` initialises them
// all to `None`, and `shutdown` `take()`s both task handles before aborting them.
9191pub struct KvEventPublisherFromZmq {
// Block size supplied to `new` and stored as-is.
9292 kv_block_size : usize ,
// Added by this diff (`93+`): receives the `JoinHandle` returned when the
// `start_event_processor` task is spawned, and is aborted in `shutdown`.
93+ processor_handle : Option < tokio:: task:: JoinHandle < ( ) > > ,
// Task handle that `shutdown` takes and aborts.
// NOTE(review): presumably the `start_zmq_listener` task; the assignment
// is outside the visible hunks, so confirm.
9394 zmq_handle : Option < tokio:: task:: JoinHandle < ( ) > > ,
// NOTE(review): presumably the cancellation token shared with the spawned
// tasks; the assignment is outside the visible hunks, so confirm.
9495 zmq_token : Option < dynamo_runtime:: CancellationToken > ,
9596}
@@ -98,6 +99,7 @@ impl KvEventPublisherFromZmq {
9899 pub fn new ( kv_block_size : usize ) -> Self {
99100 Self {
100101 kv_block_size,
102+ processor_handle : None ,
101103 zmq_handle : None ,
102104 zmq_token : None ,
103105 }
@@ -126,20 +128,23 @@ impl KvEventPublisherFromZmq {
126128 zmq_endpoint,
127129 zmq_topic,
128130 raw_tx,
129- zmq_token,
131+ zmq_token. clone ( ) ,
130132 ) ) ,
131133 ) ;
132134
133- component
134- . drt ( )
135- . runtime ( )
136- . secondary ( )
137- . spawn ( start_event_processor (
138- raw_rx,
139- component,
140- worker_id,
141- kv_block_size,
142- ) ) ;
135+ self . processor_handle = Some (
136+ component
137+ . drt ( )
138+ . runtime ( )
139+ . secondary ( )
140+ . spawn ( start_event_processor (
141+ raw_rx,
142+ component,
143+ worker_id,
144+ kv_block_size,
145+ zmq_token,
146+ ) )
147+ ) ;
143148 }
144149
145150 pub fn shutdown ( & mut self ) {
@@ -149,6 +154,9 @@ impl KvEventPublisherFromZmq {
149154 if let Some ( handle) = self . zmq_handle . take ( ) {
150155 handle. abort ( ) ;
151156 }
157+ if let Some ( handle) = self . processor_handle . take ( ) {
158+ handle. abort ( ) ;
159+ }
152160 }
153161}
154162
@@ -157,24 +165,45 @@ async fn start_event_processor<P: EventPublisher>(
157165 component : P ,
158166 worker_id : i64 ,
159167 kv_block_size : usize ,
168+ cancellation_token : dynamo_runtime:: CancellationToken ,
160169) {
161- while let Some ( ( seq, payload) ) = raw_rx. recv ( ) . await {
162- match rmps:: from_slice :: < KvEventBatch > ( & payload) {
163- Ok ( batch) => {
164- for raw_evt in batch. events . into_iter ( ) {
165- if let Some ( event) = convert_event ( raw_evt, seq, kv_block_size) {
166- let router_event = RouterEvent :: new ( worker_id, event) ;
167- if let Err ( e) = component. publish ( KV_EVENT_SUBJECT , & router_event) . await {
168- tracing:: warn!( "Failed to publish router event: {}" , e) ;
170+ loop {
171+ tokio:: select! {
172+ // Check for cancellation
173+ _ = cancellation_token. cancelled( ) => {
174+ tracing:: debug!( "Event processor received cancellation signal" ) ;
175+ break ;
176+ }
177+
178+ // Process incoming messages
179+ msg = raw_rx. recv( ) => {
180+ match msg {
181+ Some ( ( seq, payload) ) => {
182+ match rmps:: from_slice:: <KvEventBatch >( & payload) {
183+ Ok ( batch) => {
184+ for raw_evt in batch. events. into_iter( ) {
185+ if let Some ( event) = convert_event( raw_evt, seq, kv_block_size) {
186+ let router_event = RouterEvent :: new( worker_id, event) ;
187+ if let Err ( e) = component. publish( KV_EVENT_SUBJECT , & router_event) . await {
188+ tracing:: warn!( error=%e, "Failed to publish router event." ) ;
189+ }
190+ }
191+ }
192+ }
193+ Err ( e) => {
194+ tracing:: warn!( error=%e, "Failed to decode KVEventBatch msgpack" ) ;
195+ }
169196 }
170197 }
198+ None => {
199+ tracing:: debug!( "Event processor channel closed" ) ;
200+ break ;
201+ }
171202 }
172203 }
173- Err ( e) => {
174- tracing:: warn!( "Failed to decode KVEventBatch msgpack: {}" , e) ;
175- }
176204 }
177205 }
206+ tracing:: debug!( "Event processor exiting" ) ;
178207}
179208
180209async fn start_zmq_listener (
@@ -183,7 +212,7 @@ async fn start_zmq_listener(
183212 raw_tx : mpsc:: UnboundedSender < ( u64 , Vec < u8 > ) > ,
184213 zmq_token : dynamo_runtime:: CancellationToken ,
185214) {
186- tracing:: info !(
215+ tracing:: debug !(
187216 "KVEventPublisher connecting to ZMQ endpoint {} (topic '{}')" ,
188217 zmq_endpoint,
189218 zmq_topic
@@ -217,34 +246,34 @@ async fn start_zmq_listener(
217246 // We expect multipart frames: [topic, seq, payload]
218247 let mut frames: Vec <Vec <u8 >> = msg. into_vec( ) . into_iter( ) . map( |frame| frame. to_vec( ) ) . collect( ) ;
219248
220- if frames. len( ) == 3 {
221- let payload = frames. remove( 2 ) ;
222- let seq_bytes = frames. remove( 1 ) ;
249+ if frames. len( ) != 3 {
250+ tracing:: warn!( expected=3 , actual=%frames. len( ) , "Received unexpected ZMQ frame count" ) ;
251+ continue ;
252+ }
253+ let payload = frames. remove( 2 ) ;
254+ let seq_bytes = frames. remove( 1 ) ;
223255
224- if seq_bytes. len( ) != 8 {
225- tracing:: warn!( "Invalid sequence number frame len={}" , seq_bytes . len ( ) ) ;
226- continue ;
227- }
256+ if seq_bytes. len( ) != 8 {
257+ tracing:: warn!( expected= 8 , actual=%seq_bytes . len ( ) , "Invalid sequence number byte length" ) ;
258+ continue ;
259+ }
228260
229- let seq = u64 :: from_be_bytes( seq_bytes. try_into( ) . unwrap( ) ) ;
230- if raw_tx. send( ( seq, payload) ) . is_err( ) {
231- tracing:: warn!( "Failed to send message to channel - receiver dropped" ) ;
232- break ;
233- }
234- } else {
235- tracing:: warn!( "Received unexpected ZMQ frame count: {}" , frames. len( ) ) ;
261+ let seq = u64 :: from_be_bytes( seq_bytes. try_into( ) . unwrap( ) ) ;
262+ if raw_tx. send( ( seq, payload) ) . is_err( ) {
263+ tracing:: warn!( "Failed to send message to channel - receiver dropped" ) ;
264+ break ;
236265 }
237266 }
238267 Err ( e) => {
239- tracing:: warn!( "Error reading from ZMQ socket: {}" , e ) ;
268+ tracing:: warn!( error=%e , "Error reading from ZMQ socket" ) ;
240269 // Brief sleep to avoid tight error loop
241270 tokio:: time:: sleep( tokio:: time:: Duration :: from_millis( 10 ) ) . await ;
242271 }
243272 }
244273 }
245274 }
246275 }
247- tracing:: info !( "ZMQ listener exiting" ) ;
276+ tracing:: debug !( "ZMQ listener exiting" ) ;
248277}
249278
250279/// Convert a raw event coming from the ZMQ channel into the internal
@@ -355,17 +384,14 @@ struct KvEventBatch {
// Raw KV event as decoded from (and encodable to) a serialized event batch.
//
// The enum is tagged through a `type` field (`#[serde(tag = "type")]`), so
// the variant is selected by the string stored under that key.
//
// Diff note: the three `NNN-` lines are removed `#[serde(rename = "...")]`
// attributes. Each removed name is spelled exactly like the variant it
// decorated, so dropping them should leave the tag strings unchanged.
// NOTE(review): this assumes no `rename_all` applies to the enum; none is
// visible here.
355384#[ derive( Debug , Deserialize , Serialize ) ]
356385#[ serde( tag = "type" ) ] // msgspec encodes variant tag as a string when `tag=True`
357386enum RawKvEvent {
358- #[ serde( rename = "BlockStored" ) ]
// Struct-like variant carrying block hashes, an optional parent hash, token
// ids, a block size and an optional LoRA id.
359387 BlockStored {
360388 block_hashes : Vec < i64 > ,
361389 parent_block_hash : Option < i64 > ,
362390 token_ids : Vec < u32 > ,
363391 block_size : usize ,
364392 lora_id : Option < u64 > ,
365393 } ,
366- #[ serde( rename = "BlockRemoved" ) ]
// Variant carrying only a list of block hashes.
367394 BlockRemoved { block_hashes : Vec < i64 > } ,
368- #[ serde( rename = "AllBlocksCleared" ) ]
// Unit variant with no payload.
369395 AllBlocksCleared ,
370396}
371397
@@ -620,6 +646,8 @@ mod tests_startup_helpers {
620646 } ;
621647 let payload = rmps:: to_vec ( & batch) . unwrap ( ) ;
622648
649+ let token = dynamo_runtime:: CancellationToken :: new ( ) ;
650+
623651 // 2) channel feeding the processor
624652 let ( tx, rx) = mpsc:: unbounded_channel :: < ( u64 , Vec < u8 > ) > ( ) ;
625653 tx. send ( ( 123 , payload. clone ( ) ) ) . unwrap ( ) ; // seq = 123
@@ -629,7 +657,7 @@ mod tests_startup_helpers {
629657 let ( comp, published) = MockComponent :: new ( ) ;
630658
631659 // 4) run the function under test (let it consume exactly one msg)
632- let handle = tokio:: spawn ( start_event_processor ( rx, comp, worker_id, kv_block_size) ) ;
660+ let handle = tokio:: spawn ( start_event_processor ( rx, comp, worker_id, kv_block_size, token ) ) ;
633661
634662 tokio:: time:: timeout ( std:: time:: Duration :: from_secs ( 1 ) , handle)
635663 . await
0 commit comments