File tree Expand file tree Collapse file tree 6 files changed +63
-107
lines changed
monarch_hyperactor/src/v1 Expand file tree Collapse file tree 6 files changed +63
-107
lines changed Original file line number Diff line number Diff line change @@ -2226,20 +2226,16 @@ mod tests {
22262226 proc. spawn ( "log_client" , ( ) ) . await . unwrap ( ) . bind ( ) ;
22272227 log_client. set_aggregate ( & client, None ) . await . unwrap ( ) ;
22282228
2229- // Spawn the forwarder in this proc (it will serve
2230- // BOOTSTRAP_LOG_CHANNEL).
2229+ // Spawn the forwarder in this proc (it will serve BOOTSTRAP_LOG_CHANNEL).
22312230 let _log_forwarder: ActorRef < LogForwardActor > = proc
22322231 . spawn ( "log_forwarder" , log_client. clone ( ) )
22332232 . await
22342233 . unwrap ( )
22352234 . bind ( ) ;
22362235
2237- // Dial the channel but don't post until we know the forwarder
2238- // is receiving.
2239- let tx = channel:: dial :: < LogMessage > ( log_channel. clone ( ) ) . unwrap ( ) ;
2240-
22412236 // Send a fake log message as if it came from the proc
22422237 // manager's writer.
2238+ let tx = channel:: dial :: < LogMessage > ( log_channel. clone ( ) ) . unwrap ( ) ;
22432239 tx. post ( LogMessage :: Log {
22442240 hostname : "testhost" . into ( ) ,
22452241 pid : 12345 ,
@@ -2248,6 +2244,8 @@ mod tests {
22482244 } ) ;
22492245
22502246 // Assert we see it via the tap.
2247+ // Give it up to 2 seconds to travel through forwarder ->
2248+ // client -> print_log_line -> tap.
22512249 let line = RealClock
22522250 . timeout ( Duration :: from_secs ( 2 ) , tap_rx. recv ( ) )
22532251 . await
Original file line number Diff line number Diff line change @@ -212,6 +212,25 @@ impl ProcMeshAgent {
212212 }
213213
214214 pub ( crate ) async fn boot_v1 ( proc : Proc ) -> Result < ActorHandle < Self > , anyhow:: Error > {
215+ // Spawn a LogClientActor in this proc. It aggregates and
216+ // prints logs coming from our stdout/stderr (via the parent's
217+ // log writers). This is the sink for all forwarded LogMessage
218+ // traffic.
219+ let log_client_ref = proc
220+ . spawn ( "log_client" , ( ) )
221+ . await ?
222+ . bind :: < crate :: logging:: LogClientActor > ( ) ;
223+
224+ // Spawn a LogForwardActor. It serves BOOTSTRAP_LOG_CHANNEL
225+ // (set by the parent ProcManager) and forwards any LogMessage
226+ // it receives there into the above LogClientActor. Together
227+ // these give us structured log forwarding without blocking
228+ // pipes.
229+ let _log_fwd_ref = proc
230+ . spawn ( "log_forwarder" , log_client_ref. clone ( ) )
231+ . await ?
232+ . bind :: < crate :: logging:: LogForwardActor > ( ) ;
233+
215234 let agent = ProcMeshAgent {
216235 proc : proc. clone ( ) ,
217236 remote : Remote :: collect ( ) ,
Original file line number Diff line number Diff line change @@ -17,7 +17,6 @@ use hyperactor_mesh::logging::LogClientMessage;
1717use hyperactor_mesh:: logging:: LogForwardActor ;
1818use hyperactor_mesh:: logging:: LogForwardMessage ;
1919use hyperactor_mesh:: v1:: ActorMesh ;
20- use hyperactor_mesh:: v1:: Name ;
2120use hyperactor_mesh:: v1:: actor_mesh:: ActorMeshRef ;
2221use ndslice:: View ;
2322use pyo3:: Bound ;
@@ -81,10 +80,7 @@ impl LoggingMeshClient {
8180 PyPythonTask :: new ( async move {
8281 let client_actor: ActorHandle < LogClientActor > =
8382 instance_dispatch ! ( instance, async move |cx_instance| {
84- cx_instance
85- . proc( )
86- . spawn( & Name :: new( "log_client" ) . to_string( ) , ( ) )
87- . await
83+ cx_instance. proc( ) . spawn( "log_client" , ( ) ) . await
8884 } ) ?;
8985 let client_actor_ref = client_actor. bind ( ) ;
9086 let forwarder_mesh = instance_dispatch ! ( instance, async |cx_instance| {
Original file line number Diff line number Diff line change @@ -50,8 +50,7 @@ def flush_all_proc_mesh_logs(v1: bool = False) -> None:
5050 from monarch ._src .actor .v1 .proc_mesh import get_active_proc_meshes
5151
5252 for pm in get_active_proc_meshes ():
53- if pm ._logging_manager ._logging_mesh_client is not None :
54- pm ._logging_manager .flush ()
53+ pm ._logging_manager .flush ()
5554
5655
5756class LoggingManager :
Original file line number Diff line number Diff line change @@ -200,7 +200,8 @@ async def task(
200200 ) -> HyProcMesh :
201201 hy_proc_mesh = await hy_proc_mesh_task
202202
203- await pm ._logging_manager .init (hy_proc_mesh , stream_log_to_client )
203+ # FIXME: Fix log forwarding.
204+ # await pm._logging_manager.init(hy_proc_mesh, stream_log_to_client)
204205
205206 if setup_actor is not None :
206207 await setup_actor .setup .call ()
You can’t perform that action at this time.
0 commit comments