diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a12ace73f..b27d7e647 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -16,8 +16,8 @@ repository = "https://github.com/webrtc-rs/examples" [dev-dependencies] webrtc = { path = "../webrtc" } - -tokio = { version = "1.15", features = ["full"] } +console-subscriber = { version = "<=0.1.7" } +tokio = { version = "1.15", features = ["full", "tracing"] } env_logger = "0.9" clap = "3.0" hyper = { version = "0.14", features = ["full"] } diff --git a/examples/examples/reflect/reflect.rs b/examples/examples/reflect/reflect.rs index fcb51c49c..d9f20dfdc 100644 --- a/examples/examples/reflect/reflect.rs +++ b/examples/examples/reflect/reflect.rs @@ -78,10 +78,10 @@ async fn main() -> Result<()> { record.args() ) }) - .filter(None, log::LevelFilter::Trace) + .filter(None, log::LevelFilter::Info) .init(); } - + console_subscriber::init(); // Everything below is the WebRTC-rs API! Thanks for using it ❤️. // Create a MediaEngine object to configure the supported codec diff --git a/webrtc/Cargo.toml b/webrtc/Cargo.toml index 92152684f..ef2dc5367 100644 --- a/webrtc/Cargo.toml +++ b/webrtc/Cargo.toml @@ -44,6 +44,7 @@ ring = "0.16.20" sha2 = "0.10.2" lazy_static = "1.4" hex = "0.4.3" +pin-project-lite = "0.2.9" # [minimal-versions] # fixes "the trait bound `time::Month: From` is not satisfied" diff --git a/webrtc/src/peer_connection/operation/mod.rs b/webrtc/src/peer_connection/operation/mod.rs index 5062f1b79..9006956c5 100644 --- a/webrtc/src/peer_connection/operation/mod.rs +++ b/webrtc/src/peer_connection/operation/mod.rs @@ -1,6 +1,8 @@ #[cfg(test)] mod operation_test; +use pin_project_lite::pin_project; +use std::borrow::Cow; use std::fmt; use std::future::Future; use std::pin::Pin; @@ -11,6 +13,41 @@ use waitgroup::WaitGroup; use crate::error::Result; +pin_project! { + struct PrintingFuture { + name: Cow<'static, str>, + #[pin] + fut: Fut, + } +} + +impl PrintingFuture { + fn new(fut: Fut, name: Cow<'static, str>) -> Self { + Self { fut, name } + } +} + +impl Future for PrintingFuture +where + Fut: Future, +{ + type Output = Fut::Output; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + log::info!("Polled future {}", self.name); + let this = self.project(); + + this.fut.poll(cx) + } +} + +fn trace_future(fut: Fut, name: impl Into>) -> PrintingFuture { + PrintingFuture::new(fut, name.into()) +} + /// Operation is a function pub struct Operation( pub Box Pin + Send + 'static>>) + Send + Sync>, @@ -51,9 +88,10 @@ impl Operations { let l = Arc::clone(&length); let ops_tx = Arc::new(ops_tx); let ops_tx2 = Arc::clone(&ops_tx); - tokio::spawn(async move { - Operations::start(l, ops_tx, ops_rx, close_rx).await; - }); + tokio::spawn(trace_future( + Operations::start(l, ops_tx, ops_rx, close_rx), + "Operations::start", + )); Operations { length, @@ -113,19 +151,22 @@ impl Operations { ) { loop { tokio::select! { - _ = close_rx.recv() => { + _ = trace_future(close_rx.recv(), "close_rx.recv()") => { break; } - result = ops_rx.recv() => { - if let Some(mut f) = result { - length.fetch_sub(1, Ordering::SeqCst); - if f.0().await { - // Requeue this operation - let _ = Operations::enqueue_inner(f, &ops_tx, &length); - } + Some(mut f) = trace_future(ops_rx.recv(), "ops_rx.recv()") => { + length.fetch_sub(1, Ordering::SeqCst); + let op = format!("{:?}", f); + log::info!("Started operation {}", op); + if trace_future(f.0(), op.clone()).await { + // Requeue this operation + log::info!("Re-queued operation {}", op); + let _ = Operations::enqueue_inner(f, &ops_tx, &length); } + log::info!("Done with operation {}", op); } } + log::info!("Operation loop spun"); } }