@@ -183,11 +183,12 @@ def process_input_socket(self, front_publish_address: str,
183183 # engines are paused, so that we can wake the other
184184 # engines.
185185 engine_to_exclude , wave = msgspec .msgpack .decode (buffer )
186- if wave < self .current_wave :
187- # If the wave number is stale, ensure the message is
188- # handled by all the engines.
189- engine_to_exclude = None
190186 if not self .engines_running :
187+ if wave < self .current_wave :
188+ # If the wave number is stale, ensure the message
189+ # is handled by all the engines.
190+ engine_to_exclude = None
191+
191192 self .engines_running = True
192193 self .stats_changed = True
193194 self ._send_start_wave (publish_back , self .current_wave ,
@@ -203,22 +204,24 @@ def process_input_socket(self, front_publish_address: str,
203204 assert outputs .utility_output is None
204205
205206 eng_index = outputs .engine_index
206- if outputs .scheduler_stats :
207+ scheduler_stats = outputs .scheduler_stats
208+ if scheduler_stats :
207209 # 1. Updated request load stats - update our local
208210 # state with these.
209211 stats = self .engines [eng_index ].request_counts
210- stats [0 ] = outputs . scheduler_stats .num_waiting_reqs
211- stats [1 ] = outputs . scheduler_stats .num_running_reqs
212+ stats [0 ] = scheduler_stats .num_waiting_reqs
213+ stats [1 ] = scheduler_stats .num_running_reqs
212214 self .stats_changed = True
213215
214216 if (wave := outputs .wave_complete ) is not None :
215217 # 2. Notification from rank 0 engine that we've
216218 # moved into the global paused state
217- # (engines_running==False)
219+ # (engines_running==False).
218220 if self .current_wave <= wave :
221+ new_wave = wave + 1
219222 logger .debug ("Moving DP wave from %d to %d." ,
220- self .current_wave , wave )
221- self .current_wave = wave + 1
223+ self .current_wave , new_wave )
224+ self .current_wave = new_wave
222225 self .engines_running = False
223226 self .stats_changed = True
224227 elif (wave := outputs .start_wave ) is not None and (
0 commit comments