Skip to content

Commit a944dbc

Browse files
beneschhawkw
andcommitted
feat(console): add support for Unix domain sockets (#388)
Add support for console connections that use Unix domain sockets rather than TCP. Closes #296. Co-authored-by: Eliza Weisman <[email protected]>
1 parent f5b06d2 commit a944dbc

File tree

9 files changed

+208
-22
lines changed

9 files changed

+208
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

console-subscriber/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ env-filter = ["tracing-subscriber/env-filter"]
3333

3434
crossbeam-utils = "0.8.7"
3535
tokio = { version = "^1.21", features = ["sync", "time", "macros", "tracing"] }
36-
tokio-stream = "0.1"
36+
tokio-stream = { version = "0.1", features = ["net"] }
3737
thread_local = "1.1.3"
3838
console-api = { version = "0.4.0", path = "../console-api", features = ["transport"] }
3939
tonic = { version = "0.8", features = ["transport"] }

console-subscriber/examples/uds.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
//! Demonstrates serving the console API over a [Unix domain socket] (UDS)
2+
//! connection, rather than over TCP.
3+
//!
4+
//! Note that this example only works on Unix operating systems that
5+
//! support UDS, such as Linux, BSDs, and macOS.
6+
//!
7+
//! [Unix domain socket]: https://en.wikipedia.org/wiki/Unix_domain_socket
8+
9+
#[cfg(unix)]
10+
use {
11+
std::time::Duration,
12+
tokio::{fs, task, time},
13+
tracing::info,
14+
};
15+
16+
#[cfg(unix)]
17+
#[tokio::main]
18+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
19+
let cwd = fs::canonicalize(".").await?;
20+
let addr = cwd.join("console-server");
21+
console_subscriber::ConsoleLayer::builder()
22+
.server_addr(&*addr)
23+
.init();
24+
info!(
25+
"listening for console connections at file://localhost{}",
26+
addr.display()
27+
);
28+
task::Builder::default()
29+
.name("sleepy")
30+
.spawn(async move { time::sleep(Duration::from_secs(90)).await })
31+
.unwrap()
32+
.await?;
33+
34+
Ok(())
35+
}
36+
37+
#[cfg(not(unix))]
38+
fn main() {
39+
panic!("only supported on Unix platforms")
40+
}

console-subscriber/src/builder.rs

Lines changed: 104 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use super::{ConsoleLayer, Server};
2+
#[cfg(unix)]
3+
use std::path::Path;
24
use std::{
3-
net::{SocketAddr, ToSocketAddrs},
5+
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
46
path::PathBuf,
57
thread,
68
time::Duration,
@@ -32,7 +34,7 @@ pub struct Builder {
3234
pub(crate) retention: Duration,
3335

3436
/// The address on which to serve the RPC server.
35-
pub(super) server_addr: SocketAddr,
37+
pub(super) server_addr: ServerAddr,
3638

3739
/// If and where to save a recording of the events.
3840
pub(super) recording_path: Option<PathBuf>,
@@ -58,7 +60,7 @@ impl Default for Builder {
5860
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
5961
retention: ConsoleLayer::DEFAULT_RETENTION,
6062
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
61-
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
63+
server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)),
6264
recording_path: None,
6365
filter_env_var: "RUST_LOG".to_string(),
6466
self_trace: false,
@@ -137,8 +139,38 @@ impl Builder {
137139
/// before falling back on constructing a socket address from those
138140
/// defaults.
139141
///
142+
/// The socket address can be either a TCP socket address or a
143+
/// [Unix domain socket] (UDS) address. Unix domain sockets are only
144+
/// supported on Unix-compatible operating systems, such as Linux, BSDs,
145+
/// and macOS.
146+
///
147+
/// Each call to this method will overwrite the previously set value.
148+
///
149+
/// # Examples
150+
///
151+
/// Connect to the TCP address `localhost:1234`:
152+
///
153+
/// ```
154+
/// # use console_subscriber::Builder;
155+
/// use std::net::Ipv4Addr;
156+
/// let builder = Builder::default().server_addr((Ipv4Addr::LOCALHOST, 1234));
157+
/// ```
158+
///
159+
/// Connect to the UDS address `/tmp/tokio-console`:
160+
///
161+
/// ```
162+
/// # use console_subscriber::Builder;
163+
/// # #[cfg(unix)]
164+
/// use std::path::Path;
165+
///
166+
/// // Unix domain sockets are only available on Unix-compatible operating systems.
167+
/// #[cfg(unix)]
168+
/// let builder = Builder::default().server_addr(Path::new("/tmp/tokio-console"));
169+
/// ```
170+
///
140171
/// [environment variable]: `Builder::with_default_env`
141-
pub fn server_addr(self, server_addr: impl Into<SocketAddr>) -> Self {
172+
/// [Unix domain socket]: https://en.wikipedia.org/wiki/Unix_domain_socket
173+
pub fn server_addr(self, server_addr: impl Into<ServerAddr>) -> Self {
142174
Self {
143175
server_addr: server_addr.into(),
144176
..self
@@ -231,11 +263,14 @@ impl Builder {
231263
}
232264

233265
if let Ok(bind) = std::env::var("TOKIO_CONSOLE_BIND") {
234-
self.server_addr = bind
235-
.to_socket_addrs()
236-
.expect("TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321")
237-
.next()
238-
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND");
266+
self.server_addr = ServerAddr::Tcp(
267+
bind.to_socket_addrs()
268+
.expect(
269+
"TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321",
270+
)
271+
.next()
272+
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND"),
273+
);
239274
}
240275

241276
if let Some(interval) = duration_from_env("TOKIO_CONSOLE_PUBLISH_INTERVAL") {
@@ -456,6 +491,66 @@ impl Builder {
456491
}
457492
}
458493

494+
/// Specifies the address on which a [`Server`] should listen.
495+
///
496+
/// This type is passed as an argument to the [`Builder::server_addr`]
497+
/// method, and may be either a TCP socket address, or a [Unix domain socket]
498+
/// (UDS) address. Unix domain sockets are only supported on Unix-compatible
499+
/// operating systems, such as Linux, BSDs, and macOS.
500+
///
501+
/// [`Server`]: crate::Server
502+
/// [Unix domain socket]: https://en.wikipedia.org/wiki/Unix_domain_socket
503+
#[derive(Clone, Debug)]
504+
#[non_exhaustive]
505+
pub enum ServerAddr {
506+
/// A TCP address.
507+
Tcp(SocketAddr),
508+
/// A Unix socket address.
509+
#[cfg(unix)]
510+
Unix(PathBuf),
511+
}
512+
513+
impl From<SocketAddr> for ServerAddr {
514+
fn from(addr: SocketAddr) -> ServerAddr {
515+
ServerAddr::Tcp(addr)
516+
}
517+
}
518+
519+
impl From<SocketAddrV4> for ServerAddr {
520+
fn from(addr: SocketAddrV4) -> ServerAddr {
521+
ServerAddr::Tcp(addr.into())
522+
}
523+
}
524+
525+
impl From<SocketAddrV6> for ServerAddr {
526+
fn from(addr: SocketAddrV6) -> ServerAddr {
527+
ServerAddr::Tcp(addr.into())
528+
}
529+
}
530+
531+
impl<I> From<(I, u16)> for ServerAddr
532+
where
533+
I: Into<IpAddr>,
534+
{
535+
fn from(pieces: (I, u16)) -> ServerAddr {
536+
ServerAddr::Tcp(pieces.into())
537+
}
538+
}
539+
540+
#[cfg(unix)]
541+
impl From<PathBuf> for ServerAddr {
542+
fn from(path: PathBuf) -> ServerAddr {
543+
ServerAddr::Unix(path)
544+
}
545+
}
546+
547+
#[cfg(unix)]
548+
impl<'a> From<&'a Path> for ServerAddr {
549+
fn from(path: &'a Path) -> ServerAddr {
550+
ServerAddr::Unix(path.to_path_buf())
551+
}
552+
}
553+
459554
/// Initializes the console [tracing `Subscriber`][sub] and starts the console
460555
/// subscriber [`Server`] on its own background thread.
461556
///

console-subscriber/src/lib.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,19 @@ use serde::Serialize;
55
use std::{
66
cell::RefCell,
77
fmt,
8-
net::{IpAddr, Ipv4Addr, SocketAddr},
8+
net::{IpAddr, Ipv4Addr},
99
sync::{
1010
atomic::{AtomicUsize, Ordering},
1111
Arc,
1212
},
1313
time::{Duration, Instant},
1414
};
1515
use thread_local::ThreadLocal;
16+
#[cfg(unix)]
17+
use tokio::net::UnixListener;
1618
use tokio::sync::{mpsc, oneshot};
19+
#[cfg(unix)]
20+
use tokio_stream::wrappers::UnixListenerStream;
1721
use tracing_core::{
1822
span::{self, Id},
1923
subscriber::{self, Subscriber},
@@ -36,7 +40,7 @@ pub(crate) mod sync;
3640
mod visitors;
3741

3842
use aggregator::Aggregator;
39-
pub use builder::Builder;
43+
pub use builder::{Builder, ServerAddr};
4044
use callsites::Callsites;
4145
use record::Recorder;
4246
use stack::SpanStack;
@@ -134,7 +138,7 @@ pub struct ConsoleLayer {
134138
/// [cli]: https://crates.io/crates/tokio-console
135139
pub struct Server {
136140
subscribe: mpsc::Sender<Command>,
137-
addr: SocketAddr,
141+
addr: ServerAddr,
138142
aggregator: Option<Aggregator>,
139143
client_buffer: usize,
140144
}
@@ -945,13 +949,22 @@ impl Server {
945949
.take()
946950
.expect("cannot start server multiple times");
947951
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
948-
let addr = self.addr;
949-
let serve = builder
950-
.add_service(proto::instrument::instrument_server::InstrumentServer::new(
951-
self,
952-
))
953-
.serve(addr);
954-
let res = spawn_named(serve, "console::serve").await;
952+
let addr = self.addr.clone();
953+
let router = builder.add_service(
954+
proto::instrument::instrument_server::InstrumentServer::new(self),
955+
);
956+
let res = match addr {
957+
ServerAddr::Tcp(addr) => {
958+
let serve = router.serve(addr);
959+
spawn_named(serve, "console::serve").await
960+
}
961+
#[cfg(unix)]
962+
ServerAddr::Unix(path) => {
963+
let incoming = UnixListener::bind(path)?;
964+
let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
965+
spawn_named(serve, "console::serve").await
966+
}
967+
};
955968
aggregate.abort();
956969
res?.map_err(Into::into)
957970
}

tokio-console/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ tokio = { version = "1", features = ["full", "rt-multi-thread"] }
3434
tonic = { version = "0.8", features = ["transport"] }
3535
futures = "0.3"
3636
tui = { version = "0.16.0", default-features = false, features = ["crossterm"] }
37+
tower = "0.4.12"
3738
tracing = "0.1"
3839
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
3940
tracing-journald = { version = "0.2", optional = true }

tokio-console/args.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ ARGS:
77

88
This may be an IP address and port, or a DNS name.
99

10+
On Unix platforms, this may also be a URI with the `file` scheme that specifies the path
11+
to a Unix domain socket, as in `file://localhost/path/to/socket`.
12+
1013
[default: http://127.0.0.1:6669]
1114

1215
OPTIONS:

tokio-console/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ pub struct Config {
2626
///
2727
/// This may be an IP address and port, or a DNS name.
2828
///
29+
/// On Unix platforms, this may also be a URI with the `file` scheme that
30+
/// specifies the path to a Unix domain socket, as in
31+
/// `file://localhost/path/to/socket`.
32+
///
2933
/// [default: http://127.0.0.1:6669]
3034
#[clap(value_hint = ValueHint::Url)]
3135
pub(crate) target_addr: Option<Uri>,

tokio-console/src/conn.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ use console_api::instrument::{
55
use console_api::tasks::TaskDetails;
66
use futures::stream::StreamExt;
77
use std::{error::Error, pin::Pin, time::Duration};
8-
use tonic::{transport::Channel, transport::Uri, Streaming};
8+
#[cfg(unix)]
9+
use tokio::net::UnixStream;
10+
use tonic::{
11+
transport::{Channel, Endpoint, Uri},
12+
Streaming,
13+
};
914

1015
#[derive(Debug)]
1116
pub struct Connection {
@@ -78,7 +83,31 @@ impl Connection {
7883
tokio::time::sleep(backoff).await;
7984
}
8085
let try_connect = async {
81-
let mut client = InstrumentClient::connect(self.target.clone()).await?;
86+
let channel = match self.target.scheme_str() {
87+
#[cfg(unix)]
88+
Some("file") => {
89+
// Dummy endpoint is ignored by the connector.
90+
let endpoint = Endpoint::from_static("http://localhost");
91+
if !matches!(self.target.host(), None | Some("localhost")) {
92+
return Err("cannot connect to non-localhost unix domain socket".into());
93+
}
94+
let path = self.target.path().to_owned();
95+
endpoint
96+
.connect_with_connector(tower::service_fn(move |_| {
97+
UnixStream::connect(path.clone())
98+
}))
99+
.await?
100+
}
101+
#[cfg(not(unix))]
102+
Some("file") => {
103+
return Err("unix domain sockets are not supported on this platform".into());
104+
}
105+
_ => {
106+
let endpoint = Endpoint::try_from(self.target.clone())?;
107+
endpoint.connect().await?
108+
}
109+
};
110+
let mut client = InstrumentClient::new(channel);
82111
let request = tonic::Request::new(InstrumentRequest {});
83112
let stream = Box::new(client.watch_updates(request).await?.into_inner());
84113
Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected { client, stream })

0 commit comments

Comments
 (0)