Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ repository = "https://github.com/webrtc-rs/examples"

[dev-dependencies]
webrtc = { path = "../webrtc" }

tokio = { version = "1.15", features = ["full"] }
console-subscriber = { version = "<=0.1.7" }
tokio = { version = "1.15", features = ["full", "tracing"] }
env_logger = "0.9"
clap = "3.0"
hyper = { version = "0.14", features = ["full"] }
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/reflect/reflect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ async fn main() -> Result<()> {
record.args()
)
})
.filter(None, log::LevelFilter::Trace)
.filter(None, log::LevelFilter::Info)
.init();
}

console_subscriber::init();
// Everything below is the WebRTC-rs API! Thanks for using it ❤️.

// Create a MediaEngine object to configure the supported codec
Expand Down
1 change: 1 addition & 0 deletions webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ring = "0.16.20"
sha2 = "0.10.2"
lazy_static = "1.4"
hex = "0.4.3"
pin-project-lite = "0.2.9"

# [minimal-versions]
# fixes "the trait bound `time::Month: From<u8>` is not satisfied"
Expand Down
63 changes: 52 additions & 11 deletions webrtc/src/peer_connection/operation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[cfg(test)]
mod operation_test;

use pin_project_lite::pin_project;
use std::borrow::Cow;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
Expand All @@ -11,6 +13,41 @@ use waitgroup::WaitGroup;

use crate::error::Result;

pin_project! {
struct PrintingFuture<Fut> {
name: Cow<'static, str>,
#[pin]
fut: Fut,
}
}

impl<Fut> PrintingFuture<Fut> {
fn new(fut: Fut, name: Cow<'static, str>) -> Self {
Self { fut, name }
}
}

impl<Fut> Future for PrintingFuture<Fut>
where
Fut: Future,
{
type Output = Fut::Output;

fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
log::info!("Polled future {}", self.name);
let this = self.project();

this.fut.poll(cx)
}
}

fn trace_future<Fut>(fut: Fut, name: impl Into<Cow<'static, str>>) -> PrintingFuture<Fut> {
PrintingFuture::new(fut, name.into())
}

/// Operation is a function
pub struct Operation(
pub Box<dyn (FnMut() -> Pin<Box<dyn Future<Output = bool> + Send + 'static>>) + Send + Sync>,
Expand Down Expand Up @@ -51,9 +88,10 @@ impl Operations {
let l = Arc::clone(&length);
let ops_tx = Arc::new(ops_tx);
let ops_tx2 = Arc::clone(&ops_tx);
tokio::spawn(async move {
Operations::start(l, ops_tx, ops_rx, close_rx).await;
});
tokio::spawn(trace_future(
Operations::start(l, ops_tx, ops_rx, close_rx),
"Operations::start",
));

Operations {
length,
Expand Down Expand Up @@ -113,19 +151,22 @@ impl Operations {
) {
loop {
tokio::select! {
_ = close_rx.recv() => {
_ = trace_future(close_rx.recv(), "close_rx.recv()") => {
break;
}
result = ops_rx.recv() => {
if let Some(mut f) = result {
length.fetch_sub(1, Ordering::SeqCst);
if f.0().await {
// Requeue this operation
let _ = Operations::enqueue_inner(f, &ops_tx, &length);
}
Some(mut f) = trace_future(ops_rx.recv(), "ops_rx.recv()") => {
length.fetch_sub(1, Ordering::SeqCst);
let op = format!("{:?}", f);
log::info!("Started operation {}", op);
if trace_future(f.0(), op.clone()).await {
// Requeue this operation
log::info!("Re-queued operation {}", op);
let _ = Operations::enqueue_inner(f, &ops_tx, &length);
}
log::info!("Done with operation {}", op);
}
}
log::info!("Operation loop spun");
}
}

Expand Down