diff --git a/Cargo.toml b/Cargo.toml index 1a6966772..5df4399b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,4 +4,7 @@ members = [ "console-subscriber", "console-api" ] -resolver = "2" \ No newline at end of file +resolver = "2" + +[patch.crates-io] +tokio = { git = 'https://github.com/zaharidichev/tokio', branch = 'zd/instrument-sleep' } diff --git a/console-api/build.rs b/console-api/build.rs index e5a9ef830..c160623d4 100644 --- a/console-api/build.rs +++ b/console-api/build.rs @@ -5,6 +5,9 @@ fn main() -> Result<(), Box> { "../proto/trace.proto", "../proto/common.proto", "../proto/tasks.proto", + "../proto/instrument.proto", + "../proto/resources.proto", + "../proto/async_ops.proto", ]; let dirs = &["../proto"]; diff --git a/console-api/src/async_ops.rs b/console-api/src/async_ops.rs new file mode 100644 index 000000000..9a1c57dd0 --- /dev/null +++ b/console-api/src/async_ops.rs @@ -0,0 +1 @@ +tonic::include_proto!("rs.tokio.console.async_ops"); diff --git a/console-api/src/common.rs b/console-api/src/common.rs index 2acd26498..e7ec48601 100644 --- a/console-api/src/common.rs +++ b/console-api/src/common.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::hash::{Hash, Hasher}; tonic::include_proto!("rs.tokio.console.common"); @@ -32,19 +33,11 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata { metadata::Kind::Event }; - let location = Location { - file: meta.file().map(String::from), - module_path: meta.module_path().map(String::from), - line: meta.line(), - column: None, // tracing doesn't support columns yet - }; - let field_names = meta.fields().iter().map(|f| f.name().to_string()).collect(); - Metadata { name: meta.name().to_string(), target: meta.target().to_string(), - location: Some(location), + location: Some(meta.into()), kind: kind as i32, level: metadata::Level::from(*meta.level()) as i32, field_names, @@ -53,6 +46,17 @@ impl<'a> From<&'a tracing_core::Metadata<'a>> for Metadata { } } +impl<'a> From<&'a tracing_core::Metadata<'a>> for Location { 
+ fn from(meta: &'a tracing_core::Metadata<'a>) -> Self { + Location { + file: meta.file().map(String::from), + module_path: meta.module_path().map(String::from), + line: meta.line(), + column: None, // tracing doesn't support columns yet + } + } +} + impl<'a> From<&'a std::panic::Location<'a>> for Location { fn from(loc: &'a std::panic::Location<'a>) -> Self { Location { @@ -185,3 +189,35 @@ impl From<&dyn std::fmt::Debug> for field::Value { field::Value::DebugVal(format!("{:?}", val)) } } + +// Clippy warns when a type derives `PartialEq` but has a manual `Hash` impl, +// or vice versa. However, this is unavoidable here, because `prost` generates +// a struct with `#[derive(PartialEq)]`, but we cannot add`#[derive(Hash)]` to the +// generated code. +#[allow(clippy::derive_hash_xor_eq)] +impl Hash for field::Name { + fn hash(&self, state: &mut H) { + match self { + field::Name::NameIdx(idx) => idx.hash(state), + field::Name::StrName(s) => s.hash(state), + } + } +} + +impl Eq for field::Name {} + +// === IDs === + +impl From for Id { + fn from(id: u64) -> Self { + Id { id } + } +} + +impl From for u64 { + fn from(id: Id) -> Self { + id.id + } +} + +impl Copy for Id {} diff --git a/console-api/src/instrument.rs b/console-api/src/instrument.rs new file mode 100644 index 000000000..bfd2fe0fc --- /dev/null +++ b/console-api/src/instrument.rs @@ -0,0 +1 @@ +tonic::include_proto!("rs.tokio.console.instrument"); diff --git a/console-api/src/lib.rs b/console-api/src/lib.rs index b96974159..059121e01 100644 --- a/console-api/src/lib.rs +++ b/console-api/src/lib.rs @@ -1,4 +1,7 @@ +pub mod async_ops; mod common; +pub mod instrument; +pub mod resources; pub mod tasks; pub mod trace; pub use common::*; diff --git a/console-api/src/resources.rs b/console-api/src/resources.rs new file mode 100644 index 000000000..2fc92b71b --- /dev/null +++ b/console-api/src/resources.rs @@ -0,0 +1 @@ +tonic::include_proto!("rs.tokio.console.resources"); diff --git a/console-api/src/tasks.rs 
b/console-api/src/tasks.rs index 1955ba5a0..3d62eb420 100644 --- a/console-api/src/tasks.rs +++ b/console-api/src/tasks.rs @@ -1,17 +1 @@ tonic::include_proto!("rs.tokio.console.tasks"); - -// === IDs === - -impl From for TaskId { - fn from(id: u64) -> Self { - TaskId { id } - } -} - -impl From for u64 { - fn from(id: TaskId) -> Self { - id.id - } -} - -impl Copy for TaskId {} diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index 820299e0a..595b6b906 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -13,6 +13,7 @@ parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"] tokio = { version = "^1.10", features = ["sync", "time", "macros", "tracing"]} tokio-stream = "0.1" +thread_local = "1.1.3" console-api = { path = "../console-api", features = ["transport"]} tonic = { version = "0.5", features = ["transport"] } tracing-core = "0.1.18" diff --git a/console-subscriber/examples/dump.rs b/console-subscriber/examples/dump.rs index db7e6e7dd..26246f323 100644 --- a/console-subscriber/examples/dump.rs +++ b/console-subscriber/examples/dump.rs @@ -1,4 +1,4 @@ -use console_api::tasks::{tasks_client::TasksClient, TasksRequest}; +use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest}; use futures::stream::StreamExt; #[tokio::main] @@ -11,16 +11,16 @@ async fn main() -> Result<(), Box> { }); eprintln!("CONNECTING: {}", target); - let mut client = TasksClient::connect(target).await?; + let mut client = InstrumentClient::connect(target).await?; - let request = tonic::Request::new(TasksRequest {}); - let mut stream = client.watch_tasks(request).await?.into_inner(); + let request = tonic::Request::new(InstrumentRequest {}); + let mut stream = client.watch_updates(request).await?.into_inner(); let mut i: usize = 0; while let Some(update) = stream.next().await { match update { Ok(update) => { - eprintln!("UPDATE {}: {:#?}\n", i, update); + println!("UPDATE {}: {:#?}\n", i, update); 
i += 1; } Err(e) => { diff --git a/console-subscriber/src/aggregator/id_data.rs b/console-subscriber/src/aggregator/id_data.rs new file mode 100644 index 000000000..a94691295 --- /dev/null +++ b/console-subscriber/src/aggregator/id_data.rs @@ -0,0 +1,150 @@ +use super::{shrink::ShrinkMap, Closable, Id, Ids, ToProto}; +use std::collections::{HashMap, HashSet}; +use std::ops::{Deref, DerefMut}; +use std::time::{Duration, SystemTime}; + +pub(crate) struct IdData { + data: ShrinkMap, +} + +pub(crate) struct Updating<'a, T>(&'a mut (T, bool)); + +pub(crate) enum Include { + All, + UpdatedOnly, +} + +// === impl IdData === + +impl Default for IdData { + fn default() -> Self { + IdData { + data: ShrinkMap::::new(), + } + } +} + +impl IdData { + pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T> + where + T: Default, + { + Updating(self.data.entry(id).or_default()) + } + + pub(crate) fn update(&mut self, id: &Id) -> Option> { + self.data.get_mut(id).map(Updating) + } + + pub(crate) fn insert(&mut self, id: Id, data: T) { + self.data.insert(id, (data, true)); + } + + pub(crate) fn since_last_update(&mut self) -> impl Iterator { + self.data.iter_mut().filter_map(|(id, (data, dirty))| { + if *dirty { + *dirty = false; + Some((id, data)) + } else { + None + } + }) + } + + pub(crate) fn all(&self) -> impl Iterator { + self.data.iter().map(|(id, (data, _))| (id, data)) + } + + pub(crate) fn get(&self, id: &Id) -> Option<&T> { + self.data.get(id).map(|(data, _)| data) + } + + pub(crate) fn as_proto(&mut self, include: Include) -> HashMap + where + T: ToProto, + { + match include { + Include::UpdatedOnly => self + .since_last_update() + .map(|(id, d)| (*id, d.to_proto())) + .collect(), + Include::All => self.all().map(|(id, d)| (*id, d.to_proto())).collect(), + } + } + + pub(crate) fn drop_closed( + &mut self, + stats: &mut IdData, + now: SystemTime, + retention: Duration, + has_watchers: bool, + ids: &mut Ids, + ) { + let _span = tracing::debug_span!( + 
"drop_closed", + entity = %std::any::type_name::(), + stats = %std::any::type_name::(), + ) + .entered(); + + // drop closed entities + tracing::trace!(?retention, has_watchers, "dropping closed"); + + let mut dropped_ids = HashSet::new(); + stats.data.retain_and_shrink(|id, (stats, dirty)| { + if let Some(closed) = stats.closed_at() { + let closed_for = now.duration_since(closed).unwrap_or_default(); + let should_drop = + // if there are any clients watching, retain all dirty tasks regardless of age + (*dirty && has_watchers) + || closed_for > retention; + tracing::trace!( + stats.id = ?id, + stats.closed_at = ?closed, + stats.closed_for = ?closed_for, + stats.dirty = *dirty, + should_drop, + ); + + if should_drop { + dropped_ids.insert(*id); + } + return !should_drop; + } + + true + }); + + // drop closed entities which no longer have stats. + self.data + .retain_and_shrink(|id, (_, _)| stats.data.contains_key(id)); + + if !dropped_ids.is_empty() { + // drop closed entities which no longer have stats. 
+ self.data + .retain_and_shrink(|id, (_, _)| stats.data.contains_key(id)); + ids.remove_all(&dropped_ids); + } + } +} + +// === impl Updating === + +impl<'a, T> Deref for Updating<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 .0 + } +} + +impl<'a, T> DerefMut for Updating<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 .0 + } +} + +impl<'a, T> Drop for Updating<'a, T> { + fn drop(&mut self) { + self.0 .1 = true; + } +} diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 1714b893f..2ce750abd 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -1,12 +1,13 @@ +use super::{AttributeUpdate, AttributeUpdateOp, Command, Event, WakeOp, Watch}; use crate::{record::Recorder, WatchRequest}; - -use super::{Command, Event, WakeOp, Watch}; use console_api as proto; +use proto::resources::resource; +use proto::resources::stats::Attribute; use tokio::sync::{mpsc, Notify}; use futures::FutureExt; use std::{ - collections::hash_map::Entry, + collections::{hash_map::Entry, HashMap, HashSet}, convert::TryInto, sync::{ atomic::{AtomicBool, Ordering::*}, @@ -21,12 +22,12 @@ use hdrhistogram::{ Histogram, }; -pub type TaskId = u64; +pub type Id = u64; +mod id_data; mod shrink; -mod task_data; +use self::id_data::{IdData, Include}; use self::shrink::{ShrinkMap, ShrinkVec}; -use self::task_data::TaskData; pub(crate) struct Aggregator { /// Channel of incoming events emitted by `TaskLayer`s. @@ -45,10 +46,10 @@ pub(crate) struct Aggregator { flush_capacity: Arc, /// Currently active RPCs streaming task events. - watchers: ShrinkVec>, + watchers: ShrinkVec>, /// Currently active RPCs streaming task details events, by task ID. - details_watchers: ShrinkMap>>, + details_watchers: ShrinkMap>>, /// *All* metadata for task spans and user-defined spans that we care about. 
/// @@ -61,12 +62,35 @@ pub(crate) struct Aggregator { new_metadata: Vec, /// Map of task IDs to task static data. - tasks: TaskData, + tasks: IdData, /// Map of task IDs to task stats. - stats: TaskData, + task_stats: IdData, + + /// Map of resource IDs to resource static data. + resources: IdData, + + /// Map of resource IDs to resource stats. + resource_stats: IdData, + + /// Map of AsyncOp IDs to AsyncOp static data. + async_ops: IdData, - task_ids: TaskIds, + /// Map of AsyncOp IDs to AsyncOp stats. + async_op_stats: IdData, + + /// *All* PollOp events for AsyncOps on Resources. + /// + /// This is sent to new clients as part of the initial state. + // TODO: drop the poll ops for async ops that have been dropped + all_poll_ops: ShrinkVec, + + /// *New* PollOp events that whave occured since the last update + /// + /// This is emptied on every state update. + new_poll_ops: Vec, + + ids: Ids, /// A sink to record all events to a file. recorder: Option, @@ -81,21 +105,79 @@ pub(crate) struct Flush { pub(crate) triggered: AtomicBool, } +// An entity that at some point in time can be closed. +// This generally refers to spans that have been closed +// indicating that a task, async op or a resource is not +// in use anymore +pub(crate) trait Closable { + fn closed_at(&self) -> Option; +} + +pub(crate) trait ToProto { + type Output; + fn to_proto(&self) -> Self::Output; +} + +#[derive(Debug, Default)] +pub(crate) struct Ids { + /// A counter for the pretty task IDs. + next: Id, + + /// A table that contains the span ID to pretty ID mappings. 
+ id_mappings: ShrinkMap, +} + #[derive(Debug)] enum Temporality { Live, Paused, } -struct Stats { - // task stats - polls: u64, +struct PollStats { + /// The number of polls in progress current_polls: u64, - created_at: Option, + /// The total number of polls + polls: u64, first_poll: Option, last_poll_started: Option, last_poll_ended: Option, busy_time: Duration, +} + +// Represent static data for resources +struct Resource { + id: Id, + metadata: &'static Metadata<'static>, + concrete_type: String, + kind: resource::Kind, +} + +/// Represents a key for a `proto::field::Name`. Because the +/// proto::field::Name might not be unique we also include the +/// metadata id in this key +#[derive(Hash, PartialEq, Eq)] +struct FieldKey { + meta_id: u64, + field_name: proto::field::Name, +} + +#[derive(Default)] +struct ResourceStats { + created_at: Option, + closed_at: Option, + attributes: HashMap, +} + +/// Represents static data for tasks +struct Task { + id: Id, + metadata: &'static Metadata<'static>, + fields: Vec, +} + +struct TaskStats { + // task stats + created_at: Option, closed_at: Option, // waker stats @@ -105,32 +187,88 @@ struct Stats { last_wake: Option, poll_times_histogram: Histogram, + poll_stats: PollStats, } -struct Task { +struct AsyncOp { + id: Id, metadata: &'static Metadata<'static>, - fields: Vec, + source: String, } -#[derive(Debug, Default)] -struct TaskIds { - /// A counter for the pretty task IDs. 
- next: TaskId, +#[derive(Default)] +struct AsyncOpStats { + created_at: Option, + closed_at: Option, + resource_id: Option, + task_id: Option, + poll_stats: PollStats, +} + +impl Closable for ResourceStats { + fn closed_at(&self) -> Option { + self.closed_at + } +} + +impl Closable for TaskStats { + fn closed_at(&self) -> Option { + self.closed_at + } +} + +impl Closable for AsyncOpStats { + fn closed_at(&self) -> Option { + self.closed_at + } +} + +impl PollStats { + fn update_on_span_enter(&mut self, timestamp: SystemTime) { + if self.current_polls == 0 { + self.last_poll_started = Some(timestamp); + if self.first_poll == None { + self.first_poll = Some(timestamp); + } + self.polls += 1; + } + self.current_polls += 1; + } - /// A table that contains the span ID to pretty task ID mappings. - id_mappings: ShrinkMap, + fn update_on_span_exit(&mut self, timestamp: SystemTime) { + self.current_polls -= 1; + if self.current_polls == 0 { + if let Some(last_poll_started) = self.last_poll_started { + let elapsed = timestamp.duration_since(last_poll_started).unwrap(); + self.last_poll_ended = Some(timestamp); + self.busy_time += elapsed; + } + } + } + + fn since_last_poll(&self, timestamp: SystemTime) -> Option { + self.last_poll_started + .map(|lps| timestamp.duration_since(lps).unwrap()) + } } -impl Default for Stats { +impl Default for PollStats { fn default() -> Self { - Stats { - polls: 0, + PollStats { current_polls: 0, - created_at: None, + polls: 0, first_poll: None, last_poll_started: None, last_poll_ended: None, busy_time: Default::default(), + } + } +} + +impl Default for TaskStats { + fn default() -> Self { + TaskStats { + created_at: None, closed_at: None, wakes: 0, waker_clones: 0, @@ -139,6 +277,7 @@ impl Default for Stats { // significant figures should be in the [0-5] range and memory usage // grows exponentially with higher a sigfig poll_times_histogram: Histogram::::new(2).unwrap(), + poll_stats: PollStats::default(), } } } @@ -162,9 +301,15 @@ impl 
Aggregator { details_watchers: Default::default(), all_metadata: Default::default(), new_metadata: Default::default(), - tasks: TaskData::new(), - stats: TaskData::new(), - task_ids: TaskIds::default(), + tasks: IdData::default(), + task_stats: IdData::default(), + resources: IdData::default(), + resource_stats: IdData::default(), + async_ops: IdData::default(), + async_op_stats: IdData::default(), + all_poll_ops: Default::default(), + new_poll_ops: Default::default(), + ids: Ids::default(), recorder: builder .recording_path .as_ref() @@ -199,8 +344,8 @@ impl Aggregator { // a new command from a client cmd = self.rpcs.recv() => { match cmd { - Some(Command::WatchTasks(subscription)) => { - self.add_task_subscription(subscription); + Some(Command::Instrument(subscription)) => { + self.add_instrument_subscription(subscription); }, Some(Command::WatchTaskDetail(watch_request)) => { self.add_task_detail_subscription(watch_request); @@ -253,36 +398,76 @@ impl Aggregator { if !self.watchers.is_empty() && should_send { self.publish(); } - - // drop any tasks that have completed *and* whose final data has already - // been sent off. - self.drop_closed_tasks(); + self.cleanup_closed(); } } + fn cleanup_closed(&mut self) { + // drop all closed have that has completed *and* whose final data has already + // been sent off. 
+ let now = SystemTime::now(); + let has_watchers = !self.watchers.is_empty(); + self.tasks.drop_closed( + &mut self.task_stats, + now, + self.retention, + has_watchers, + &mut self.ids, + ); + self.resources.drop_closed( + &mut self.resource_stats, + now, + self.retention, + has_watchers, + &mut self.ids, + ); + self.async_ops.drop_closed( + &mut self.async_op_stats, + now, + self.retention, + has_watchers, + &mut self.ids, + ); + } + /// Add the task subscription to the watchers after sending the first update - fn add_task_subscription(&mut self, subscription: Watch) { - tracing::debug!("new tasks subscription"); - let new_tasks = self - .tasks - .all() - .map(|(&id, task)| task.to_proto(id)) - .collect(); + fn add_instrument_subscription(&mut self, subscription: Watch) { + tracing::debug!("new instrument subscription"); let now = SystemTime::now(); - let stats_update = self - .stats - .all() - .map(|(&id, stats)| (id, stats.to_proto())) - .collect(); // Send the initial state --- if this fails, the subscription is already dead - if subscription.update(&proto::tasks::TaskUpdate { + let update = &proto::instrument::Update { + task_update: Some(proto::tasks::TaskUpdate { + new_tasks: self + .tasks + .all() + .map(|(_, value)| value.to_proto()) + .collect(), + stats_update: self.task_stats.as_proto(Include::All), + }), + resource_update: Some(proto::resources::ResourceUpdate { + new_resources: self + .resources + .all() + .map(|(_, value)| value.to_proto()) + .collect(), + stats_update: self.resource_stats.as_proto(Include::All), + new_poll_ops: (*self.all_poll_ops).clone(), + }), + async_op_update: Some(proto::async_ops::AsyncOpUpdate { + new_async_ops: self + .async_ops + .all() + .map(|(_, value)| value.to_proto()) + .collect(), + stats_update: self.async_op_stats.as_proto(Include::All), + }), + now: Some(now.into()), new_metadata: Some(proto::RegisterMetadata { metadata: (*self.all_metadata).clone(), }), - new_tasks, - stats_update, - now: Some(now.into()), - }) 
{ + }; + + if subscription.update(update) { self.watchers.push(subscription) } } @@ -299,7 +484,7 @@ impl Aggregator { buffer, } = watch_request; tracing::debug!(id = ?id, "new task details subscription"); - if let Some(stats) = self.stats.get(&id) { + if let Some(stats) = self.task_stats.get(&id) { let (tx, rx) = mpsc::channel(buffer); let subscription = Watch(tx); let now = SystemTime::now(); @@ -333,28 +518,44 @@ impl Aggregator { } else { None }; - let new_tasks = self - .tasks - .since_last_update() - .map(|(&id, task)| task.to_proto(id)) - .collect(); - let now = SystemTime::now(); - let stats_update = self - .stats - .since_last_update() - .map(|(&id, stats)| (id, stats.to_proto())) - .collect(); - let update = proto::tasks::TaskUpdate { - new_metadata, - new_tasks, - stats_update, + let new_poll_ops = std::mem::take(&mut self.new_poll_ops); + + let now = SystemTime::now(); + let update = proto::instrument::Update { now: Some(now.into()), + new_metadata, + task_update: Some(proto::tasks::TaskUpdate { + new_tasks: self + .tasks + .since_last_update() + .map(|(_, value)| value.to_proto()) + .collect(), + stats_update: self.task_stats.as_proto(Include::UpdatedOnly), + }), + resource_update: Some(proto::resources::ResourceUpdate { + new_resources: self + .resources + .since_last_update() + .map(|(_, value)| value.to_proto()) + .collect(), + stats_update: self.resource_stats.as_proto(Include::UpdatedOnly), + new_poll_ops, + }), + async_op_update: Some(proto::async_ops::AsyncOpUpdate { + new_async_ops: self + .async_ops + .since_last_update() + .map(|(_, value)| value.to_proto()) + .collect(), + stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly), + }), }; + self.watchers - .retain_and_shrink(|watch: &Watch| watch.update(&update)); + .retain_and_shrink(|watch: &Watch| watch.update(&update)); - let stats = &self.stats; + let stats = &self.task_stats; // Assuming there are much fewer task details subscribers than there are // stats updates, iterate over 
`details_watchers` and compact the map. self.details_watchers.retain_and_shrink(|&id, watchers| { @@ -381,78 +582,90 @@ impl Aggregator { self.all_metadata.push(meta.into()); self.new_metadata.push(meta.into()); } + Event::Spawn { id, metadata, at, fields, + .. } => { - let task_id = self.task_ids.id_for(id); + let id = self.ids.id_for(id); self.tasks.insert( - task_id, + id, Task { + id, metadata, fields, // TODO: parents }, ); - self.stats.insert( - task_id, - Stats { - polls: 0, + + self.task_stats.insert( + id, + TaskStats { created_at: Some(at), ..Default::default() }, ); } + Event::Enter { id, at } => { - let task_id = self.task_ids.id_for(id); - let mut stats = self.stats.update_or_default(task_id); - if stats.current_polls == 0 { - stats.last_poll_started = Some(at); - if stats.first_poll == None { - stats.first_poll = Some(at); - } - stats.polls += 1; + let id = self.ids.id_for(id); + if let Some(mut task_stats) = self.task_stats.update(&id) { + task_stats.poll_stats.update_on_span_enter(at); + } + + if let Some(mut async_op_stats) = self.async_op_stats.update(&id) { + async_op_stats.poll_stats.update_on_span_enter(at); } - stats.current_polls += 1; } Event::Exit { id, at } => { - let task_id = self.task_ids.id_for(id); - let mut stats = self.stats.update_or_default(task_id); - stats.current_polls -= 1; - if stats.current_polls == 0 { - if let Some(last_poll_started) = stats.last_poll_started { - let elapsed = at.duration_since(last_poll_started).unwrap(); - stats.last_poll_ended = Some(at); - stats.busy_time += elapsed; - stats + let id = self.ids.id_for(id); + if let Some(mut task_stats) = self.task_stats.update(&id) { + task_stats.poll_stats.update_on_span_exit(at); + if let Some(since_last_poll) = task_stats.poll_stats.since_last_poll(at) { + task_stats .poll_times_histogram - .record(elapsed.as_nanos().try_into().unwrap_or(u64::MAX)) + .record(since_last_poll.as_nanos().try_into().unwrap_or(u64::MAX)) .unwrap(); } } + + if let Some(mut async_op_stats) 
= self.async_op_stats.update(&id) { + async_op_stats.poll_stats.update_on_span_exit(at); + } } Event::Close { id, at } => { - let task_id = self.task_ids.id_for(id); - self.stats.update_or_default(task_id).closed_at = Some(at); + let id = self.ids.id_for(id); + if let Some(mut task_stats) = self.task_stats.update(&id) { + task_stats.closed_at = Some(at); + } + + if let Some(mut resource_stats) = self.resource_stats.update(&id) { + resource_stats.closed_at = Some(at); + } + + if let Some(mut async_op_stats) = self.async_op_stats.update(&id) { + async_op_stats.closed_at = Some(at); + } } Event::Waker { id, op, at } => { - let task_id = self.task_ids.id_for(id); + let id = self.ids.id_for(id); // It's possible for wakers to exist long after a task has // finished. We don't want those cases to create a "new" // task that isn't closed, just to insert some waker stats. // // It may be useful to eventually be able to report about // "wasted" waker ops, but we'll leave that for another time. - if let Some(mut stats) = self.stats.update(&task_id) { + if let Some(mut task_stats) = self.task_stats.update(&id) { match op { WakeOp::Wake | WakeOp::WakeByRef => { - stats.wakes += 1; - stats.last_wake = Some(at); + task_stats.wakes += 1; + task_stats.last_wake = Some(at); // Note: `Waker::wake` does *not* call the `drop` // implementation, so waking by value doesn't @@ -463,64 +676,128 @@ impl Aggregator { // see // https://github.com/rust-lang/rust/blob/673d0db5e393e9c64897005b470bfeb6d5aec61b/library/core/src/task/wake.rs#L211-L212 if let WakeOp::Wake = op { - stats.waker_drops += 1; + task_stats.waker_drops += 1; } } WakeOp::Clone => { - stats.waker_clones += 1; + task_stats.waker_clones += 1; } WakeOp::Drop => { - stats.waker_drops += 1; + task_stats.waker_drops += 1; } } } } - } - } - - fn drop_closed_tasks(&mut self) { - let tasks = &mut self.tasks; - let stats = &mut self.stats; - let task_ids = &mut self.task_ids; - let has_watchers = !self.watchers.is_empty(); - let now = 
SystemTime::now(); - let retention = self.retention; - // drop stats for closed tasks if they have been updated - tracing::trace!( - ?self.retention, - self.has_watchers = has_watchers, - "dropping closed tasks..." - ); + Event::Resource { + at, + id, + metadata, + kind, + concrete_type, + .. + } => { + let id = self.ids.id_for(id); + self.resources.insert( + id, + Resource { + id, + kind, + metadata, + concrete_type, + }, + ); - let mut dropped_stats = false; - stats.retain_and_shrink(|id, stats, dirty| { - if let Some(closed) = stats.closed_at { - let closed_for = now.duration_since(closed).unwrap_or_default(); - let should_drop = - // if there are any clients watching, retain all dirty tasks regardless of age - (dirty && has_watchers) - || closed_for > retention; - tracing::trace!( - stats.id = ?id, - stats.closed_at = ?closed, - stats.closed_for = ?closed_for, - stats.dirty = dirty, - should_drop, + self.resource_stats.insert( + id, + ResourceStats { + created_at: Some(at), + ..Default::default() + }, ); - dropped_stats = should_drop; - return !should_drop; } - true - }); + Event::PollOp { + metadata, + at, + resource_id, + op_name, + async_op_id, + task_id, + readiness, + } => { + let async_op_id = self.ids.id_for(async_op_id); + let resource_id = self.ids.id_for(resource_id); + let task_id = self.ids.id_for(task_id); + + let mut async_op_stats = self.async_op_stats.update_or_default(async_op_id); + async_op_stats.poll_stats.polls += 1; + async_op_stats.task_id.get_or_insert(task_id); + async_op_stats.resource_id.get_or_insert(resource_id); + + if readiness == proto::Readiness::Pending + && async_op_stats.poll_stats.first_poll.is_none() + { + async_op_stats.poll_stats.first_poll = Some(at); + } + + let poll_op = proto::resources::PollOp { + metadata: Some(metadata.into()), + resource_id: Some(resource_id.into()), + name: op_name, + task_id: Some(task_id.into()), + async_op_id: Some(async_op_id.into()), + readiness: readiness as i32, + }; - // If we dropped any 
stats, drop task static data and IDs as - if dropped_stats { - // drop closed tasks which no longer have stats. - tasks.retain_and_shrink(|id, _, _| stats.contains(id)); + self.all_poll_ops.push(poll_op.clone()); + self.new_poll_ops.push(poll_op); + } - task_ids.retain_only(&*tasks); + Event::StateUpdate { + resource_id, + update, + .. + } => { + let resource_id = self.ids.id_for(resource_id); + if let Some(mut stats) = self.resource_stats.update(&resource_id) { + let upd_key = (&update.field).into(); + match stats.attributes.entry(upd_key) { + Entry::Occupied(ref mut attr) => { + update_attribute(attr.get_mut(), update); + } + Entry::Vacant(attr) => { + attr.insert(update.into()); + } + } + } + } + + Event::AsyncResourceOp { + at, + id, + source, + metadata, + .. + } => { + let id = self.ids.id_for(id); + self.async_ops.insert( + id, + AsyncOp { + id, + metadata, + source, + }, + ); + + self.async_op_stats.insert( + id, + AsyncOpStats { + created_at: Some(at), + ..Default::default() + }, + ); + } } } } @@ -552,35 +829,26 @@ impl Watch { } } -impl Stats { - fn total_time(&self) -> Option { - self.closed_at.and_then(|end| { - self.created_at - .and_then(|start| end.duration_since(start).ok()) - }) - } +impl ToProto for PollStats { + type Output = proto::PollStats; - fn to_proto(&self) -> proto::tasks::Stats { - proto::tasks::Stats { + fn to_proto(&self) -> Self::Output { + proto::PollStats { polls: self.polls, - created_at: self.created_at.map(Into::into), first_poll: self.first_poll.map(Into::into), last_poll_started: self.last_poll_started.map(Into::into), last_poll_ended: self.last_poll_ended.map(Into::into), busy_time: Some(self.busy_time.into()), - total_time: self.total_time().map(Into::into), - wakes: self.wakes, - waker_clones: self.waker_clones, - waker_drops: self.waker_drops, - last_wake: self.last_wake.map(Into::into), } } } -impl Task { - fn to_proto(&self, id: u64) -> proto::tasks::Task { +impl ToProto for Task { + type Output = proto::tasks::Task; + + 
fn to_proto(&self) -> Self::Output { proto::tasks::Task { - id: Some(id.into()), + id: Some(self.id.into()), // TODO: more kinds of tasks... kind: proto::tasks::task::Kind::Spawn as i32, metadata: Some(self.metadata.into()), @@ -590,10 +858,103 @@ impl Task { } } -// === impl TaskIds === +impl ToProto for TaskStats { + type Output = proto::tasks::Stats; + + fn to_proto(&self) -> Self::Output { + proto::tasks::Stats { + poll_stats: Some(self.poll_stats.to_proto()), + created_at: self.created_at.map(Into::into), + total_time: total_time(self.created_at, self.closed_at).map(Into::into), + wakes: self.wakes, + waker_clones: self.waker_clones, + waker_drops: self.waker_drops, + last_wake: self.last_wake.map(Into::into), + } + } +} + +impl ToProto for Resource { + type Output = proto::resources::Resource; -impl TaskIds { - fn id_for(&mut self, span_id: span::Id) -> TaskId { + fn to_proto(&self) -> Self::Output { + proto::resources::Resource { + id: Some(self.id.into()), + kind: Some(self.kind.clone()), + metadata: Some(self.metadata.into()), + concrete_type: self.concrete_type.clone(), + } + } +} + +impl ToProto for ResourceStats { + type Output = proto::resources::Stats; + + fn to_proto(&self) -> Self::Output { + let attributes = self.attributes.values().cloned().collect(); + proto::resources::Stats { + created_at: self.created_at.map(Into::into), + total_time: total_time(self.created_at, self.closed_at).map(Into::into), + attributes, + } + } +} + +impl ToProto for AsyncOp { + type Output = proto::async_ops::AsyncOp; + + fn to_proto(&self) -> Self::Output { + proto::async_ops::AsyncOp { + id: Some(self.id.into()), + metadata: Some(self.metadata.into()), + source: self.source.clone(), + } + } +} + +impl ToProto for AsyncOpStats { + type Output = proto::async_ops::Stats; + + fn to_proto(&self) -> Self::Output { + proto::async_ops::Stats { + poll_stats: Some(self.poll_stats.to_proto()), + created_at: self.created_at.map(Into::into), + total_time: 
total_time(self.created_at, self.closed_at).map(Into::into), + + resource_id: self.resource_id.map(Into::into), + task_id: self.task_id.map(Into::into), + } + } +} + +impl From<&proto::Field> for FieldKey { + fn from(field: &proto::Field) -> Self { + let meta_id = field + .metadata_id + .as_ref() + .expect("field misses metadata id") + .id; + let field_name = field.name.clone().expect("field misses name"); + FieldKey { + meta_id, + field_name, + } + } +} + +impl From for Attribute { + fn from(upd: AttributeUpdate) -> Self { + Attribute { + field: Some(upd.field), + unit: upd.unit, + } + } +} + +// === impl Ids === + +impl Ids { + fn id_for(&mut self, span_id: span::Id) -> Id { match self.id_mappings.entry(span_id) { Entry::Occupied(entry) => *entry.get(), Entry::Vacant(entry) => { @@ -606,9 +967,8 @@ impl TaskIds { } #[inline] - fn retain_only(&mut self, tasks: &TaskData) { - self.id_mappings - .retain(|_, task_id| tasks.contains(task_id)); + fn remove_all(&mut self, ids: &HashSet) { + self.id_mappings.retain(|_, id| !ids.contains(id)); } } @@ -618,3 +978,58 @@ fn serialize_histogram(histogram: &Histogram) -> Result, V2Serializ serializer.serialize(histogram, &mut buf)?; Ok(buf) } + +fn total_time(created_at: Option, closed_at: Option) -> Option { + let end = closed_at?; + let start = created_at?; + end.duration_since(start).ok() +} + +fn update_attribute(attribute: &mut Attribute, update: AttributeUpdate) { + use proto::field::Value::*; + let attribute_val = attribute.field.as_mut().and_then(|a| a.value.as_mut()); + let update_val = update.field.value; + let update_name = update.field.name; + + match (attribute_val, update_val) { + (Some(BoolVal(v)), Some(BoolVal(upd))) => *v = upd, + + (Some(StrVal(v)), Some(StrVal(upd))) => *v = upd, + + (Some(DebugVal(v)), Some(DebugVal(upd))) => *v = upd, + + (Some(U64Val(v)), Some(U64Val(upd))) => match update.op { + Some(AttributeUpdateOp::Add) => *v += upd, + + Some(AttributeUpdateOp::Sub) => *v -= upd, + + 
Some(AttributeUpdateOp::Override) => *v = upd, + + None => tracing::warn!( + "numeric attribute update {:?} needs to have an op field", + update_name + ), + }, + + (Some(I64Val(v)), Some(I64Val(upd))) => match update.op { + Some(AttributeUpdateOp::Add) => *v += upd, + + Some(AttributeUpdateOp::Sub) => *v -= upd, + + Some(AttributeUpdateOp::Override) => *v = upd, + + None => tracing::warn!( + "numeric attribute update {:?} needs to have an op field", + update_name + ), + }, + + (val, update) => { + tracing::warn!( + "attribute {:?} cannot be updated by update {:?}", + val, + update + ); + } + } +} diff --git a/console-subscriber/src/aggregator/task_data.rs b/console-subscriber/src/aggregator/task_data.rs deleted file mode 100644 index ddf5498bd..000000000 --- a/console-subscriber/src/aggregator/task_data.rs +++ /dev/null @@ -1,82 +0,0 @@ -use super::{shrink::ShrinkMap, TaskId}; -use std::ops::{Deref, DerefMut}; - -pub(crate) struct TaskData { - data: ShrinkMap, -} - -pub(crate) struct Updating<'a, T>(&'a mut (T, bool)); - -// === impl TaskData === - -impl TaskData { - pub(crate) fn new() -> Self { - Self { - data: ShrinkMap::new(), - } - } - - pub(crate) fn update_or_default(&mut self, id: TaskId) -> Updating<'_, T> - where - T: Default, - { - Updating(self.data.entry(id).or_default()) - } - - pub(crate) fn update(&mut self, id: &TaskId) -> Option> { - self.data.get_mut(id).map(Updating) - } - - pub(crate) fn insert(&mut self, id: TaskId, data: T) { - self.data.insert(id, (data, true)); - } - - pub(crate) fn since_last_update(&mut self) -> impl Iterator { - self.data.iter_mut().filter_map(|(id, (data, dirty))| { - if *dirty { - *dirty = false; - Some((id, data)) - } else { - None - } - }) - } - - pub(crate) fn all(&self) -> impl Iterator { - self.data.iter().map(|(id, (data, _))| (id, data)) - } - - pub(crate) fn get(&self, id: &TaskId) -> Option<&T> { - self.data.get(id).map(|(data, _)| data) - } - - pub(crate) fn contains(&self, id: &TaskId) -> bool { - 
self.data.contains_key(id) - } - - pub(crate) fn retain_and_shrink(&mut self, mut f: impl FnMut(&TaskId, &mut T, bool) -> bool) { - self.data - .retain_and_shrink(|id, (data, dirty)| f(id, data, *dirty)); - } -} - -// === impl Updating === - -impl<'a, T> Deref for Updating<'a, T> { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.0 .0 - } -} - -impl<'a, T> DerefMut for Updating<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 .0 - } -} - -impl<'a, T> Drop for Updating<'a, T> { - fn drop(&mut self) { - self.0 .1 = true; - } -} diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index a24265d78..0253bb19a 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -1,15 +1,16 @@ use console_api as proto; +use proto::resources::resource; use serde::Serialize; -use tokio::sync::{mpsc, oneshot}; - use std::{ + cell::RefCell, fmt, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::Arc, time::{Duration, SystemTime}, }; +use thread_local::ThreadLocal; +use tokio::sync::{mpsc, oneshot}; use tracing_core::{ - field::{self, Visit}, span, subscriber::{self, Subscriber}, Metadata, @@ -21,17 +22,23 @@ mod builder; mod callsites; mod init; mod record; +mod stack; pub(crate) mod sync; +mod visitors; use aggregator::Aggregator; pub use builder::Builder; use callsites::Callsites; +use stack::SpanStack; +use visitors::{AsyncOpVisitor, FieldVisitor, ResourceVisitor, WakerVisitor}; pub use init::{build, init}; -use crate::aggregator::TaskId; +use crate::aggregator::Id; +use crate::visitors::{PollOpVisitor, StateUpdateVisitor}; pub struct TasksLayer { + current_spans: ThreadLocal>, tx: mpsc::Sender, flush: Arc, /// When the channel capacity goes under this number, a flush in the aggregator @@ -51,6 +58,26 @@ pub struct TasksLayer { /// there's only one async runtime library in use, but if there are multiple, /// they might all have their own sets of waker ops. 
waker_callsites: Callsites<16>, + + /// Set of callsites for spans representing resources + /// + /// TODO: Take some time to determine more reasonable numbers + resource_callsites: Callsites<32>, + + /// Set of callsites for spans representing async operations on resources + /// + /// TODO: Take some time to determine more reasonable numbers + async_op_callsites: Callsites<32>, + + /// Set of callsites for events representing poll operation invocations on resources + /// + /// TODO: Take some time to determine more reasonable numbers + poll_op_callsites: Callsites<32>, + + /// Set of callsites for events representing state attribute updates on resources + /// + /// TODO: Take some time to determine more reasonable numbers + state_update_callsites: Callsites<32>, } pub struct Server { @@ -60,31 +87,22 @@ pub struct Server { client_buffer: usize, } -struct FieldVisitor { - fields: Vec, - meta_id: proto::MetaId, -} - -struct WakerVisitor { - id: Option, - op: Option, -} - struct Watch(mpsc::Sender>); enum Command { - WatchTasks(Watch), + Instrument(Watch), WatchTaskDetail(WatchRequest), Pause, Resume, } struct WatchRequest { - id: TaskId, + id: Id, stream_sender: oneshot::Sender>>, buffer: usize, } +#[derive(Debug)] enum Event { Metadata(&'static Metadata<'static>), Spawn { @@ -110,9 +128,51 @@ enum Event { op: WakeOp, at: SystemTime, }, + Resource { + id: span::Id, + metadata: &'static Metadata<'static>, + at: SystemTime, + concrete_type: String, + kind: resource::Kind, + }, + PollOp { + metadata: &'static Metadata<'static>, + at: SystemTime, + resource_id: span::Id, + op_name: String, + async_op_id: span::Id, + task_id: span::Id, + readiness: proto::Readiness, + }, + StateUpdate { + metadata: &'static Metadata<'static>, + at: SystemTime, + resource_id: span::Id, + update: AttributeUpdate, + }, + AsyncResourceOp { + id: span::Id, + metadata: &'static Metadata<'static>, + at: SystemTime, + source: String, + }, } -#[derive(Clone, Copy, Serialize)] +#[derive(Debug, 
Clone)] +struct AttributeUpdate { + field: proto::Field, + op: Option, + unit: Option, +} + +#[derive(Debug, Clone)] +enum AttributeUpdateOp { + Add, + Override, + Sub, +} + +#[derive(Clone, Debug, Copy, Serialize)] enum WakeOp { Wake, WakeByRef, @@ -170,6 +230,11 @@ impl TasksLayer { flush_under_capacity, spawn_callsites: Callsites::default(), waker_callsites: Callsites::default(), + resource_callsites: Callsites::default(), + async_op_callsites: Callsites::default(), + poll_op_callsites: Callsites::default(), + state_update_callsites: Callsites::default(), + current_spans: ThreadLocal::new(), }; (layer, server) } @@ -187,6 +252,14 @@ impl TasksLayer { self.spawn_callsites.contains(meta) } + fn is_resource(&self, meta: &'static Metadata<'static>) -> bool { + self.resource_callsites.contains(meta) + } + + fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool { + self.async_op_callsites.contains(meta) + } + fn is_id_spawned(&self, id: &span::Id, cx: &Context<'_, S>) -> bool where S: Subscriber + for<'a> LookupSpan<'a>, @@ -196,6 +269,44 @@ impl TasksLayer { .unwrap_or(false) } + fn is_id_resource(&self, id: &span::Id, cx: &Context<'_, S>) -> bool + where + S: Subscriber + for<'a> LookupSpan<'a>, + { + cx.span(id) + .map(|span| self.is_resource(span.metadata())) + .unwrap_or(false) + } + + fn is_id_async_op(&self, id: &span::Id, cx: &Context<'_, S>) -> bool + where + S: Subscriber + for<'a> LookupSpan<'a>, + { + cx.span(id) + .map(|span| self.is_async_op(span.metadata())) + .unwrap_or(false) + } + + fn is_id_tracked(&self, id: &span::Id, cx: &Context<'_, S>) -> bool + where + S: Subscriber + for<'a> LookupSpan<'a>, + { + self.is_id_async_op(id, cx) || self.is_id_resource(id, cx) || self.is_id_spawned(id, cx) + } + + fn first_entered

(&self, stack: &SpanStack, p: P) -> Option + where + P: Fn(&span::Id) -> bool, + { + stack + .stack() + .iter() + .rev() + .find(|id| p(id.id())) + .map(|id| id.id()) + .cloned() + } + fn send(&self, event: Event) { use mpsc::error::TrySendError; @@ -224,21 +335,20 @@ where S: Subscriber + for<'a> LookupSpan<'a>, { fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest { - if meta.name() == "runtime.spawn" - // back compat until tokio is updated to use the standardized naming - // scheme - || (meta.name() == "task" && meta.target() == "tokio::task") - { - self.spawn_callsites.insert(meta); - } else if meta.target() == "runtime::waker" - // back compat until tokio is updated to use the standardized naming - // scheme - || meta.target() == "tokio::task::waker" - { - self.waker_callsites.insert(meta); + match (meta.name(), meta.target()) { + ("runtime.spawn", _) | ("task", "tokio::task") => self.spawn_callsites.insert(meta), + (_, "runtime::waker") | (_, "tokio::task::waker") => self.waker_callsites.insert(meta), + (ResourceVisitor::RES_SPAN_NAME, _) => self.resource_callsites.insert(meta), + (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => self.async_op_callsites.insert(meta), + (_, PollOpVisitor::POLL_OP_EVENT_TARGET) | (_, "tokio::resource::poll_op") => { + self.poll_op_callsites.insert(meta) + } + (_, StateUpdateVisitor::STATE_UPDATE_EVENT_TARGET) + | (_, "tokio::resource::state_update") => self.state_update_callsites.insert(meta), + (_, _) => {} } - self.send(Event::Metadata(meta)); + self.send(Event::Metadata(meta)); subscriber::Interest::always() } @@ -246,43 +356,119 @@ where let metadata = attrs.metadata(); if self.is_spawn(metadata) { let at = SystemTime::now(); - let mut fields_collector = FieldVisitor { - fields: Vec::default(), - meta_id: metadata.into(), - }; - attrs.record(&mut fields_collector); - + let mut field_visitor = FieldVisitor::new(metadata.into()); + attrs.record(&mut field_visitor); self.send(Event::Spawn { id: 
id.clone(), at, metadata, - fields: fields_collector.fields, + fields: field_visitor.result(), }); + } else if self.is_resource(metadata) { + let mut resource_visitor = ResourceVisitor::default(); + attrs.record(&mut resource_visitor); + if let Some((concrete_type, kind)) = resource_visitor.result() { + let at = SystemTime::now(); + self.send(Event::Resource { + id: id.clone(), + metadata, + at, + concrete_type, + kind, + }); + } // else unknown resource span format + } else if self.is_async_op(metadata) { + let mut async_op_visitor = AsyncOpVisitor::default(); + attrs.record(&mut async_op_visitor); + if let Some(source) = async_op_visitor.result() { + let at = SystemTime::now(); + self.send(Event::AsyncResourceOp { + id: id.clone(), + at, + metadata, + source, + }); + } + // else async op span needs to have a source field } } - fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) { + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + let metadata = event.metadata(); if self.waker_callsites.contains(event.metadata()) { let at = SystemTime::now(); - let mut visitor = WakerVisitor { id: None, op: None }; + let mut visitor = WakerVisitor::default(); event.record(&mut visitor); - - if let WakerVisitor { - id: Some(id), - op: Some(op), - } = visitor - { + if let Some((id, op)) = visitor.result() { self.send(Event::Waker { id, op, at }); } - // else... - // unknown waker event... what to do? can't trace it from here... + // else unknown waker event... what to do? can't trace it from here... 
+ } else if self.poll_op_callsites.contains(event.metadata()) { + match ctx.current_span().id() { + Some(resource_id) if self.is_id_resource(resource_id, &ctx) => { + let mut poll_op_visitor = PollOpVisitor::default(); + event.record(&mut poll_op_visitor); + if let Some((op_name, readiness)) = poll_op_visitor.result() { + let task_and_async_op_ids = self.current_spans.get().and_then(|stack| { + let stack = stack.borrow(); + let task_id = + self.first_entered(&stack, |id| self.is_id_spawned(id, &ctx))?; + let async_op_id = + self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?; + Some((task_id, async_op_id)) + }); + + if let Some((task_id, async_op_id)) = task_and_async_op_ids { + let at = SystemTime::now(); + self.send(Event::PollOp { + metadata, + at, + resource_id: resource_id.clone(), + op_name, + async_op_id, + task_id, + readiness, + }); + } + // else poll op event should be emitted in the context of an async op and task spans + } + } + _ => {} // poll op event should be emitted in the context of a resource span + } + } else if self.state_update_callsites.contains(event.metadata()) { + match ctx.current_span().id() { + Some(resource_id) if self.is_id_resource(resource_id, &ctx) => { + let meta_id = event.metadata().into(); + let mut state_update_visitor = StateUpdateVisitor::new(meta_id); + event.record(&mut state_update_visitor); + if let Some(update) = state_update_visitor.result() { + let at = SystemTime::now(); + self.send(Event::StateUpdate { + metadata, + at, + resource_id: resource_id.clone(), + update, + }); + } + } + _ => eprintln!( + "state update event should be emitted in the context of a resource span: {:?}", + event + ), + } } } fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) { - if !self.is_id_spawned(id, &cx) { + if !self.is_id_tracked(id, &cx) { return; } + + self.current_spans + .get_or_default() + .borrow_mut() + .push(id.clone()); + self.send(Event::Enter { at: SystemTime::now(), id: id.clone(), @@ -290,9 +476,14 @@ where } 
fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) { - if !self.is_id_spawned(id, &cx) { + if !self.is_id_tracked(id, &cx) { return; } + + if let Some(spans) = self.current_spans.get() { + spans.borrow_mut().pop(id); + } + self.send(Event::Exit { at: SystemTime::now(), id: id.clone(), @@ -300,9 +491,10 @@ where } fn on_close(&self, id: span::Id, cx: Context<'_, S>) { - if !self.is_id_spawned(&id, &cx) { + if !self.is_id_tracked(&id, &cx) { return; } + self.send(Event::Close { at: SystemTime::now(), id, @@ -343,7 +535,9 @@ impl Server { let aggregate = tokio::spawn(aggregate.run()); let addr = self.addr; let res = builder - .add_service(proto::tasks::tasks_server::TasksServer::new(self)) + .add_service(proto::instrument::instrument_server::InstrumentServer::new( + self, + )) .serve(addr) .await; aggregate.abort(); @@ -352,15 +546,15 @@ impl Server { } #[tonic::async_trait] -impl proto::tasks::tasks_server::Tasks for Server { - type WatchTasksStream = - tokio_stream::wrappers::ReceiverStream>; +impl proto::instrument::instrument_server::Instrument for Server { + type WatchUpdatesStream = + tokio_stream::wrappers::ReceiverStream>; type WatchTaskDetailsStream = tokio_stream::wrappers::ReceiverStream>; - async fn watch_tasks( + async fn watch_updates( &self, - req: tonic::Request, - ) -> Result, tonic::Status> { + req: tonic::Request, + ) -> Result, tonic::Status> { match req.remote_addr() { Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"), None => tracing::debug!(client.addr = %"", "starting a new watch"), @@ -369,7 +563,7 @@ impl proto::tasks::tasks_server::Tasks for Server { tonic::Status::internal("cannot start new watch, aggregation task is not running") })?; let (tx, rx) = mpsc::channel(self.client_buffer); - permit.send(Command::WatchTasks(Watch(tx))); + permit.send(Command::Instrument(Watch(tx))); tracing::debug!("watch started"); let stream = tokio_stream::wrappers::ReceiverStream::new(rx); Ok(tonic::Response::new(stream)) @@ -377,7 
+571,7 @@ impl proto::tasks::tasks_server::Tasks for Server { async fn watch_task_details( &self, - req: tonic::Request, + req: tonic::Request, ) -> Result, tonic::Status> { let task_id = req .into_inner() @@ -407,87 +601,21 @@ impl proto::tasks::tasks_server::Tasks for Server { async fn pause( &self, - _req: tonic::Request, - ) -> Result, tonic::Status> { + _req: tonic::Request, + ) -> Result, tonic::Status> { self.subscribe.send(Command::Pause).await.map_err(|_| { tonic::Status::internal("cannot pause, aggregation task is not running") })?; - Ok(tonic::Response::new(proto::tasks::PauseResponse {})) + Ok(tonic::Response::new(proto::instrument::PauseResponse {})) } async fn resume( &self, - _req: tonic::Request, - ) -> Result, tonic::Status> { + _req: tonic::Request, + ) -> Result, tonic::Status> { self.subscribe.send(Command::Resume).await.map_err(|_| { tonic::Status::internal("cannot resume, aggregation task is not running") })?; - Ok(tonic::Response::new(proto::tasks::ResumeResponse {})) - } -} - -impl Visit for FieldVisitor { - fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) { - self.fields.push(proto::Field { - name: Some(field.name().into()), - value: Some(value.into()), - metadata_id: Some(self.meta_id.clone()), - }); - } - - fn record_i64(&mut self, field: &tracing_core::Field, value: i64) { - self.fields.push(proto::Field { - name: Some(field.name().into()), - value: Some(value.into()), - metadata_id: Some(self.meta_id.clone()), - }); - } - - fn record_u64(&mut self, field: &tracing_core::Field, value: u64) { - self.fields.push(proto::Field { - name: Some(field.name().into()), - value: Some(value.into()), - metadata_id: Some(self.meta_id.clone()), - }); - } - - fn record_bool(&mut self, field: &tracing_core::Field, value: bool) { - self.fields.push(proto::Field { - name: Some(field.name().into()), - value: Some(value.into()), - metadata_id: Some(self.meta_id.clone()), - }); - } - - fn record_str(&mut self, field: 
&tracing_core::Field, value: &str) { - self.fields.push(proto::Field { - name: Some(field.name().into()), - value: Some(value.into()), - metadata_id: Some(self.meta_id.clone()), - }); - } -} - -impl Visit for WakerVisitor { - fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) { - // don't care (yet?) - } - - fn record_u64(&mut self, field: &tracing_core::Field, value: u64) { - if field.name() == "task.id" { - self.id = Some(span::Id::from_u64(value)); - } - } - - fn record_str(&mut self, field: &tracing_core::Field, value: &str) { - if field.name() == "op" { - self.op = Some(match value { - "waker.wake" => WakeOp::Wake, - "waker.wake_by_ref" => WakeOp::WakeByRef, - "waker.clone" => WakeOp::Clone, - "waker.drop" => WakeOp::Drop, - _ => return, - }); - } + Ok(tonic::Response::new(proto::instrument::ResumeResponse {})) } } diff --git a/console-subscriber/src/stack.rs b/console-subscriber/src/stack.rs new file mode 100644 index 000000000..a926bf61a --- /dev/null +++ b/console-subscriber/src/stack.rs @@ -0,0 +1,55 @@ +use tracing_core::span::Id; + +// This has been copied from tracing-subscriber. Once the library adds +// the ability to iterate over entered spans, this code will +// no longer be needed here +// +// https://github.com/tokio-rs/tracing/blob/master/tracing-subscriber/src/registry/stack.rs +#[derive(Debug, Clone)] +pub(crate) struct ContextId { + id: Id, + duplicate: bool, +} + +impl ContextId { + pub fn id(&self) -> &Id { + &self.id + } +} + +/// `SpanStack` tracks what spans are currently executing on a thread-local basis. +/// +/// A "separate current span" for each thread is a semantic choice, as each span +/// can be executing in a different thread. 
+#[derive(Debug, Default)] +pub(crate) struct SpanStack { + stack: Vec, +} + +impl SpanStack { + #[inline] + pub(crate) fn push(&mut self, id: Id) -> bool { + let duplicate = self.stack.iter().any(|i| i.id == id); + self.stack.push(ContextId { id, duplicate }); + !duplicate + } + + #[inline] + pub(crate) fn pop(&mut self, expected_id: &Id) -> bool { + if let Some((idx, _)) = self + .stack + .iter() + .enumerate() + .rev() + .find(|(_, ctx_id)| ctx_id.id == *expected_id) + { + let ContextId { id: _, duplicate } = self.stack.remove(idx); + return !duplicate; + } + false + } + + pub(crate) fn stack(&self) -> &Vec { + &self.stack + } +} diff --git a/console-subscriber/src/visitors.rs b/console-subscriber/src/visitors.rs new file mode 100644 index 000000000..18deefedc --- /dev/null +++ b/console-subscriber/src/visitors.rs @@ -0,0 +1,385 @@ +//! These visitors are responsible for extracting the relevant +//! fields from tracing metadata and producing the parts +//! needed to construct `Event` instances. + +use super::{AttributeUpdate, AttributeUpdateOp, WakeOp}; +use console_api as proto; +use proto::resources::resource; +use tracing_core::{ + field::{self, Visit}, + span, +}; + +/// Used to extract the fields needed to construct +/// an Event::Resource from the metadata of a tracing span +/// that has the following shape: +/// +/// tracing::trace_span!( +/// "runtime.resource", +/// concrete_type = "Sleep", +/// kind = "timer", +/// ); +/// +/// Fields: +/// concrete_type - indicates the concrete rust type for this resource +/// kind - indicates the type of resource (i.e.
timer, sync, io ) +#[derive(Default)] +pub(crate) struct ResourceVisitor { + concrete_type: Option, + kind: Option, +} + +/// Used to extract all fields from the metadata +/// of a tracing span +pub(crate) struct FieldVisitor { + fields: Vec, + meta_id: proto::MetaId, +} + +/// Used to extract the fields needed to construct +/// an Event::AsyncResourceOp from the metadata of a tracing span +/// that has the following shape: +/// +/// tracing::trace_span!( +/// "runtime.resource.async_op", +/// source = "Sleep::new_timeout", +/// ); +/// +/// Fields: +/// source - the method which has created an instance of this async operation +#[derive(Default)] +pub(crate) struct AsyncOpVisitor { + source: Option, +} + +/// Used to extract the fields needed to construct +/// an Event::Waker from the metadata of a tracing event +/// that has the following shape: +/// +/// tracing::trace!( +/// target: "tokio::task::waker", +/// op = "waker.clone", +/// task.id = id.into_u64(), +/// ); +/// +/// Fields: +/// task.id - the id of the task this waker will wake +/// op - the operation associated with this waker event +#[derive(Default)] +pub(crate) struct WakerVisitor { + id: Option, + op: Option, +} + +/// Used to extract the fields needed to construct +/// an Event::PollOp from the metadata of a tracing event +/// that has the following shape: +/// +/// tracing::trace!( +/// target: "runtime::resource::poll_op", +/// op_name = "poll_elapsed", +/// readiness = "pending" +/// ); +/// +/// Fields: +/// op_name - the name of this resource poll operation +/// readiness - the result of invoking this poll op, describing its readiness +#[derive(Default)] +pub(crate) struct PollOpVisitor { + op_name: Option, + readiness: Option, +} + +/// Used to extract the fields needed to construct +/// an Event::StateUpdate from the metadata of a tracing event +/// that has the following shape: +/// +/// tracing::trace!( +/// target: "runtime::resource::state_update", +/// duration = duration, +/// duration.unit = 
"ms", +/// duration.op = "override", +/// ); +/// +/// Fields: +/// attribute_name - a field value for a field that has the name of the resource attribute being updated +/// value - the value for this update +/// unit - the unit for the value being updated (e.g. ms, s, bytes) +/// op - the operation that this update performs to the value of the resource attribute (one of: override, sub, add) +pub(crate) struct StateUpdateVisitor { + meta_id: proto::MetaId, + field: Option, + unit: Option, + op: Option, +} + +impl ResourceVisitor { + pub(crate) const RES_SPAN_NAME: &'static str = "runtime.resource"; + const RES_CONCRETE_TYPE_FIELD_NAME: &'static str = "concrete_type"; + const RES_KIND_FIELD_NAME: &'static str = "kind"; + const RES_KIND_TIMER: &'static str = "timer"; + + pub(crate) fn result(self) -> Option<(String, resource::Kind)> { + self.concrete_type.zip(self.kind) + } +} + +impl Visit for ResourceVisitor { + fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {} + + fn record_str(&mut self, field: &tracing_core::Field, value: &str) { + match field.name() { + Self::RES_CONCRETE_TYPE_FIELD_NAME => self.concrete_type = Some(value.to_string()), + Self::RES_KIND_FIELD_NAME => { + let kind = Some(match value { + Self::RES_KIND_TIMER => { + resource::kind::Kind::Known(resource::kind::Known::Timer as i32) + } + other => resource::kind::Kind::Other(other.to_string()), + }); + self.kind = Some(resource::Kind { kind }); + } + _ => {} + } + } +} + +impl FieldVisitor { + pub(crate) fn new(meta_id: proto::MetaId) -> Self { + FieldVisitor { + fields: Vec::default(), + meta_id, + } + } + pub(crate) fn result(self) -> Vec { + self.fields + } +} + +impl Visit for FieldVisitor { + fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) { + self.fields.push(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + + fn record_i64(&mut self, field: &tracing_core::Field, 
value: i64) { + self.fields.push(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + + fn record_u64(&mut self, field: &tracing_core::Field, value: u64) { + self.fields.push(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + + fn record_bool(&mut self, field: &tracing_core::Field, value: bool) { + self.fields.push(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + + fn record_str(&mut self, field: &tracing_core::Field, value: &str) { + self.fields.push(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } +} + +impl AsyncOpVisitor { + pub(crate) const ASYNC_OP_SPAN_NAME: &'static str = "runtime.resource.async_op"; + const ASYNC_OP_SRC_FIELD_NAME: &'static str = "source"; + + pub(crate) fn result(self) -> Option { + self.source + } +} + +impl Visit for AsyncOpVisitor { + fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {} + + fn record_str(&mut self, field: &tracing_core::Field, value: &str) { + if field.name() == Self::ASYNC_OP_SRC_FIELD_NAME { + self.source = Some(value.to_string()); + } + } +} + +impl WakerVisitor { + const WAKE: &'static str = "waker.wake"; + const WAKE_BY_REF: &'static str = "waker.wake_by_ref"; + const CLONE: &'static str = "waker.clone"; + const DROP: &'static str = "waker.drop"; + const TASK_ID_FIELD_NAME: &'static str = "task.id"; + + pub(crate) fn result(self) -> Option<(span::Id, WakeOp)> { + self.id.zip(self.op) + } +} + +impl Visit for WakerVisitor { + fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) { + // don't care (yet?) 
+ } + + fn record_u64(&mut self, field: &tracing_core::Field, value: u64) { + if field.name() == Self::TASK_ID_FIELD_NAME { + self.id = Some(span::Id::from_u64(value)); + } + } + + fn record_str(&mut self, field: &tracing_core::Field, value: &str) { + if field.name() == "op" { + self.op = Some(match value { + Self::WAKE => WakeOp::Wake, + Self::WAKE_BY_REF => WakeOp::WakeByRef, + Self::CLONE => WakeOp::Clone, + Self::DROP => WakeOp::Drop, + _ => return, + }); + } + } +} + +impl PollOpVisitor { + pub(crate) const POLL_OP_EVENT_TARGET: &'static str = "runtime::resource::poll_op"; + const OP_NAME_FIELD_NAME: &'static str = "op_name"; + const OP_READINESS_FIELD_NAME: &'static str = "readiness"; + const OP_READINESS_READY: &'static str = "ready"; + const OP_READINESS_PENDING: &'static str = "pending"; + + pub(crate) fn result(self) -> Option<(String, proto::Readiness)> { + let op_name = self.op_name?; + let readiness = self.readiness?; + Some((op_name, readiness)) + } +} + +impl Visit for PollOpVisitor { + fn record_debug(&mut self, _: &field::Field, _: &dyn std::fmt::Debug) {} + + fn record_str(&mut self, field: &tracing_core::Field, value: &str) { + match field.name() { + Self::OP_NAME_FIELD_NAME => { + self.op_name = Some(value.to_string()); + } + Self::OP_READINESS_FIELD_NAME => { + self.readiness = Some(match value { + Self::OP_READINESS_READY => proto::Readiness::Ready, + Self::OP_READINESS_PENDING => proto::Readiness::Pending, + _ => return, + }); + } + _ => {} + } + } +} + +impl StateUpdateVisitor { + pub(crate) const STATE_UPDATE_EVENT_TARGET: &'static str = "runtime::resource::state_update"; + + const STATE_OP_SUFFIX: &'static str = ".op"; + const STATE_UNIT_SUFFIX: &'static str = ".unit"; + + const OP_ADD: &'static str = "add"; + const OP_SUB: &'static str = "sub"; + const OP_OVERRIDE: &'static str = "override"; + + pub(crate) fn new(meta_id: proto::MetaId) -> Self { + StateUpdateVisitor { + meta_id, + field: None, + unit: None, + op: None, + } + } + + 
pub(crate) fn result(self) -> Option { + Some(AttributeUpdate { + field: self.field?, + op: self.op, + unit: self.unit, + }) + } +} + +impl Visit for StateUpdateVisitor { + fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) { + if !field.name().ends_with(Self::STATE_OP_SUFFIX) + && !field.name().ends_with(Self::STATE_UNIT_SUFFIX) + { + self.field = Some(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + } + + fn record_i64(&mut self, field: &field::Field, value: i64) { + if !field.name().ends_with(Self::STATE_OP_SUFFIX) + && !field.name().ends_with(Self::STATE_UNIT_SUFFIX) + { + self.field = Some(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + } + + fn record_u64(&mut self, field: &field::Field, value: u64) { + if !field.name().ends_with(Self::STATE_OP_SUFFIX) + && !field.name().ends_with(Self::STATE_UNIT_SUFFIX) + { + self.field = Some(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + } + + fn record_bool(&mut self, field: &field::Field, value: bool) { + if !field.name().ends_with(Self::STATE_OP_SUFFIX) + && !field.name().ends_with(Self::STATE_UNIT_SUFFIX) + { + self.field = Some(proto::Field { + name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + } + + fn record_str(&mut self, field: &field::Field, value: &str) { + if field.name().ends_with(Self::STATE_OP_SUFFIX) { + match value { + Self::OP_ADD => self.op = Some(AttributeUpdateOp::Add), + Self::OP_SUB => self.op = Some(AttributeUpdateOp::Sub), + Self::OP_OVERRIDE => self.op = Some(AttributeUpdateOp::Override), + _ => {} + }; + } else if field.name().ends_with(Self::STATE_UNIT_SUFFIX) { + self.unit = Some(value.to_string()); + } else { + self.field = Some(proto::Field { + 
name: Some(field.name().into()), + value: Some(value.into()), + metadata_id: Some(self.meta_id.clone()), + }); + } + } +} diff --git a/console/src/conn.rs b/console/src/conn.rs index e644d0cdb..e74617a29 100644 --- a/console/src/conn.rs +++ b/console/src/conn.rs @@ -1,7 +1,8 @@ -use console_api::tasks::{ - tasks_client::TasksClient, DetailsRequest, PauseRequest, ResumeRequest, TaskDetails, - TaskUpdate, TasksRequest, +use console_api::instrument::{ + instrument_client::InstrumentClient, InstrumentRequest, PauseRequest, ResumeRequest, + TaskDetailsRequest, Update, }; +use console_api::tasks::TaskDetails; use futures::stream::StreamExt; use std::{error::Error, pin::Pin, time::Duration}; use tonic::{transport::Channel, transport::Uri, Streaming}; @@ -15,8 +16,8 @@ pub struct Connection { #[derive(Debug)] enum State { Connected { - client: TasksClient, - stream: Streaming, + client: InstrumentClient, + stream: Streaming, }, Disconnected(Duration), } @@ -71,9 +72,9 @@ impl Connection { tokio::time::sleep(backoff).await; } let try_connect = async { - let mut client = TasksClient::connect(self.target.clone()).await?; - let request = tonic::Request::new(TasksRequest {}); - let stream = client.watch_tasks(request).await?.into_inner(); + let mut client = InstrumentClient::connect(self.target.clone()).await?; + let request = tonic::Request::new(InstrumentRequest {}); + let stream = client.watch_updates(request).await?.into_inner(); Ok::>(State::Connected { client, stream }) }; self.state = match try_connect.await { @@ -90,7 +91,7 @@ impl Connection { } } - pub async fn next_update(&mut self) -> TaskUpdate { + pub async fn next_update(&mut self) -> Update { loop { match self.state { State::Connected { ref mut stream, .. 
} => match Pin::new(stream).next().await { @@ -115,7 +116,7 @@ impl Connection { task_id: u64, ) -> Result, tonic::Status> { with_client!(self, client, { - let request = tonic::Request::new(DetailsRequest { + let request = tonic::Request::new(TaskDetailsRequest { id: Some(task_id.into()), }); client.watch_task_details(request).await diff --git a/console/src/main.rs b/console/src/main.rs index 476d70566..0c973051a 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -4,6 +4,7 @@ use tasks::State; use clap::Clap; use futures::stream::StreamExt; +use std::convert::TryInto; use tokio::sync::{mpsc, watch}; use tui::{ layout::{Constraint, Direction, Layout}, @@ -74,7 +75,10 @@ async fn main() -> color_eyre::Result<()> { Ok(stream) => { tokio::spawn(watch_details_stream(task_id, stream, update_rx.clone(), details_tx.clone())); }, - Err(error) => {tracing::warn!(%error, "error watching task details"); tasks.unset_task_details();} + Err(error) => { + tracing::warn!(%error, "error watching task details"); + tasks.unset_task_details(); + } } }, UpdateKind::ExitTaskView => { @@ -83,7 +87,12 @@ async fn main() -> color_eyre::Result<()> { _ => {} } }, - task_update = conn.next_update() => tasks.update_tasks(&view.styles, task_update), + instrument_update = conn.next_update() => { + let now = instrument_update.now.map(|v| v.try_into().unwrap()); + if let Some(task_update) = instrument_update.task_update { + tasks.update_tasks(&view.styles, task_update, instrument_update.new_metadata, now); + } + } details_update = details_rx.recv() => { if let Some(details_update) = details_update { tasks.update_task_details(details_update); diff --git a/console/src/tasks.rs b/console/src/tasks.rs index 44fed476a..87b9d23e0 100644 --- a/console/src/tasks.rs +++ b/console/src/tasks.rs @@ -130,12 +130,18 @@ impl State { self.new_tasks.drain(..) 
} - pub(crate) fn update_tasks(&mut self, styles: &view::Styles, update: proto::tasks::TaskUpdate) { - if let Some(now) = update.now { - self.last_updated_at = Some(now.try_into().unwrap()); + pub(crate) fn update_tasks( + &mut self, + styles: &view::Styles, + update: proto::tasks::TaskUpdate, + new_metadata: Option, + now: Option, + ) { + if let Some(now) = now { + self.last_updated_at = Some(now); } - if let Some(new_metadata) = update.new_metadata { + if let Some(new_metadata) = new_metadata { let metas = new_metadata.metadata.into_iter().filter_map(|meta| { let id = meta.id?.id; let metadata = meta.metadata?; @@ -390,15 +396,16 @@ impl From for Stats { } let total = pb.total_time.map(pb_duration); - let busy = pb.busy_time.map(pb_duration).unwrap_or_default(); + let poll_stats = pb.poll_stats.expect("task should have poll stats"); + let busy = poll_stats.busy_time.map(pb_duration).unwrap_or_default(); let idle = total.map(|total| total - busy); Self { total, idle, busy, - last_poll_started: pb.last_poll_started.map(|v| v.try_into().unwrap()), - last_poll_ended: pb.last_poll_ended.map(|v| v.try_into().unwrap()), - polls: pb.polls, + last_poll_started: poll_stats.last_poll_started.map(|v| v.try_into().unwrap()), + last_poll_ended: poll_stats.last_poll_ended.map(|v| v.try_into().unwrap()), + polls: poll_stats.polls, created_at: pb .created_at .expect("task span was never created") diff --git a/proto/async_ops.proto b/proto/async_ops.proto new file mode 100644 index 000000000..c8cb26046 --- /dev/null +++ b/proto/async_ops.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package rs.tokio.console.async_ops; + +import "google/protobuf/timestamp/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "common.proto"; + + +// An AsyncOp state update. +// +// An async operation is an operation that is associated with a resource. +// This could, for example, be a read or write on a TCP stream or a receive operation on +// a channel. 
+message AsyncOpUpdate { + // A list of new async operations that were created since the last `AsyncOpUpdate` + // was sent. Note that the fact that an async operation has been created + // does not mean that it has been polled or is being polled. This information + // is reflected in the Stats of the operation. + repeated AsyncOp new_async_ops = 1; + // Any async op stats that have changed since the last update. + map stats_update = 2; +} + +message AsyncOp { + // The async op's ID. + // + // This uniquely identifies this op across all *currently live* + // ones. + common.Id id = 1; + // The numeric ID of the op's `Metadata`. + // + // This identifies the `Metadata` that describes the `tracing` span + // corresponding to this async op. The metadata for this ID will have been sent + // in a prior `RegisterMetadata` message. + common.MetaId metadata = 2; + // The source of this async operation. Most commonly this should be the name + // of the method where the instantiation of this op has happened. + string source = 3; +} + + +message Stats { + // Timestamp of when the async op has been created. + google.protobuf.Timestamp created_at = 1; + // The amount of time this op has *existed*, regardless of whether or not + // it was being polled. + // + // Subtracting `poll_stats.busy_time` from `total_time` calculates the op's idle + // time, the amount of time it has spent *waiting* to be polled. + google.protobuf.Duration total_time = 2; + // The resource Id this `AsyncOp` is associated with. Note that both + // `resource_id` and `task_id` can be None if this async op has not been polled yet + common.Id resource_id = 3; + // The Id of the task that is awaiting on this op. + common.Id task_id = 4; + // Contains the operation poll stats. 
+ common.PollStats poll_stats = 5; +} + diff --git a/proto/common.proto b/proto/common.proto index 416ed1202..aeb9eb433 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -2,6 +2,11 @@ syntax = "proto3"; package rs.tokio.console.common; import "google/protobuf/timestamp/timestamp.proto"; +import "google/protobuf/duration.proto"; + +message Id { + uint64 id = 1; +} /// A Rust source code location. message Location { @@ -82,4 +87,44 @@ message Metadata { DEBUG = 3; TRACE = 4; } +} + +// Contains stats about objects that can be polled. Currently these can be: +// - tasks that have been spawned +// - async operations on resources that are performed within the context of a task +message PollStats { + // The total number of times this object has been polled. + uint64 polls = 1; + // The timestamp of the first time this object was polled. + // + // If this is `None`, the object has not yet been polled. + // + // Subtracting `created_at` from this timestamp can be used to calculate the + // time to first poll for this object, a measurement of executor latency. + optional google.protobuf.Timestamp first_poll = 3; + // The timestamp of the most recent time this object's poll method was invoked. + // + // If this is `None`, the object has not yet been polled. + // + // If the object has only been polled a single time, then this value may be + // equal to the `first_poll` timestamp. + // + optional google.protobuf.Timestamp last_poll_started = 4; + // The timestamp of the most recent time this object's poll method finished execution. + // + // If this is `None`, the object has not yet been polled or is currently being polled. + // + // If the object does not exist anymore, then this is the time the final invocation of + // its poll method has completed. + optional google.protobuf.Timestamp last_poll_ended = 5; + // The total duration this object was being *actively polled*, summed across + // all polls. 
Note that this includes only polls that have completed and is + // not reflecting any in-progress polls. + google.protobuf.Duration busy_time = 6; +} + +// Indicates the readiness of a pollable entity (i.e. task, resource). +enum Readiness { + READY = 0; + PENDING = 1; } \ No newline at end of file diff --git a/proto/instrument.proto b/proto/instrument.proto new file mode 100644 index 000000000..2a72f7e86 --- /dev/null +++ b/proto/instrument.proto @@ -0,0 +1,66 @@ +syntax = "proto3"; + +package rs.tokio.console.instrument; + +import "google/protobuf/timestamp/timestamp.proto"; +import "common.proto"; +import "tasks.proto"; +import "resources.proto"; +import "async_ops.proto"; + +service Instrument { + rpc WatchUpdates(InstrumentRequest) returns (stream Update) {} + rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {} + rpc Pause(PauseRequest) returns (PauseResponse) {} + rpc Resume(ResumeRequest) returns (ResumeResponse) {} +} + +// TODO: In the future allow for the request to specify +// only the data that the caller cares about (i.e. only +// tasks but no resources) +message InstrumentRequest { +} + +message TaskDetailsRequest { + common.Id id = 1; +} + +message PauseRequest { +} + +message ResumeRequest { +} + +// Update carries all information regarding tasks, resources, async operations +// and resource operations in one message. There are a couple of reasons to combine all +// of these into a single message: +// +// - we can use one single timestamp for all the data +// - we can have all the new_metadata in one place +// - things such as async ops and resource ops do not make sense +// on their own as they have relations to tasks and resources +message Update { + // The system time when this update was recorded. + // + // This is the timestamp any durations in the included `Stats` were + // calculated relative to. + google.protobuf.Timestamp now = 1; + + // Task state update. 
+ tasks.TaskUpdate task_update = 2; + + // Resource state update. + resources.ResourceUpdate resource_update = 3; + + // Async operations state update. + async_ops.AsyncOpUpdate async_op_update = 4; + + // Any new span metadata that was registered since the last update. + common.RegisterMetadata new_metadata = 5; +} + +message PauseResponse { +} + +message ResumeResponse { +} diff --git a/proto/resources.proto b/proto/resources.proto new file mode 100644 index 000000000..4993ff74d --- /dev/null +++ b/proto/resources.proto @@ -0,0 +1,91 @@ +syntax = "proto3"; + +package rs.tokio.console.resources; + +import "google/protobuf/timestamp/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "common.proto"; + +// A resource state update. +// +// Each `ResourceUpdate` contains any resource data that has changed since the last +// update. This includes: +// - any new resources that were created since the last update +// - the current stats for any resource whose stats changed since the last update +// - any new poll ops that have been invoked on a resource +message ResourceUpdate { + // A list of new resources that were created since the last `ResourceUpdate` was + // sent. + repeated Resource new_resources = 1; + + // Any resource stats that have changed since the last update. + map stats_update = 2; + + // A list of all new poll ops that have been invoked on resources since the last update. + repeated PollOp new_poll_ops = 3; +} + +// Static data recorded when a new resource is created. +message Resource { + // The resource's ID. + // + // This uniquely identifies this resource across all *currently live* + // resources. This is also the primary way any operations on a resource + // are associated with it + common.Id id = 1; + // The numeric ID of the resource's `Metadata`. + common.MetaId metadata = 2; + // The resource's concrete Rust type. 
+ string concrete_type = 3; + // The kind of resource (e.g. timer, mutex) + Kind kind = 4; + + message Kind { + oneof kind { + Known known = 1; + string other = 2; + } + enum Known { + TIMER = 0; + } + } +} + +// Task runtime stats of a resource. +message Stats { + // Timestamp of when the resource was created. + google.protobuf.Timestamp created_at = 1; + // The amount of time this resource has *existed*, regardless of whether or not + // it was used by a task. + google.protobuf.Duration total_time = 2; + // State attributes of the resource. These are dependent on the type of the resource. + // For example, a timer resource will have a duration while a semaphore resource may + // have permits as an attribute. These values may change over time as the state of + // the resource changes. Therefore, they live in the runtime stats rather than the + // static data describing the resource. + repeated Attribute attributes = 3; + + message Attribute { + common.Field field = 1; + optional string unit = 2; + } +} + +message PollOp { + // The numeric ID of the op's `Metadata`. + // + // This identifies the `Metadata` that describes the `tracing` span + // corresponding to this op. The metadata for this ID will have been sent + // in a prior `RegisterMetadata` message. + common.MetaId metadata = 2; + // The resource's ID. + common.Id resource_id = 3; + // The name of this op (e.g. poll_elapsed, new_timeout, reset, etc.) + string name = 4; + // Identifies the task context that this poll op has been called from. + common.Id task_id = 5; + // Identifies the async op ID that this poll op is part of. + common.Id async_op_id = 6; + // Whether this poll op has returned with ready or pending. 
+ common.Readiness readiness = 7; +} \ No newline at end of file diff --git a/proto/tasks.proto b/proto/tasks.proto index 7b0da04c1..6acddbb1a 100644 --- a/proto/tasks.proto +++ b/proto/tasks.proto @@ -6,46 +6,18 @@ import "google/protobuf/timestamp/timestamp.proto"; import "google/protobuf/duration.proto"; import "common.proto"; -service Tasks { - rpc WatchTasks(TasksRequest) returns (stream TaskUpdate) {} - rpc WatchTaskDetails(DetailsRequest) returns (stream TaskDetails) {} - rpc Pause(PauseRequest) returns (PauseResponse) {} - rpc Resume(ResumeRequest) returns (ResumeResponse) {} -} - -message TaskId { - uint64 id = 1; -} - -message TasksRequest { -} - -message DetailsRequest { - TaskId id = 1; -} - -message PauseRequest { -} - -message ResumeRequest { -} - // A task state update. // // Each `TaskUpdate` contains any task data that has changed since the last // update. This includes: // - any new tasks that were spawned since the last update -// - metadata for any new spans that were registered since the last update // - the current stats for any task whose stats changed since the last update -// - a list of the IDs of any tasks which *completed* since the last update message TaskUpdate { // A list of new tasks that were spawned since the last `TaskUpdate` was // sent. // // If this is empty, no new tasks were spawned. repeated Task new_tasks = 1; - // Any new span metadata that was registered since the last update. - common.RegisterMetadata new_metadata = 2; // Any task stats that have changed since the last update. // // This is a map of task IDs (64-bit unsigned integers) to task stats. If a @@ -54,17 +26,12 @@ message TaskUpdate { // *is* included in this map, the corresponding value represents a complete // snapshot of that task's stats at in the current time window. map stats_update = 3; - // The system time when this update was recorded. - // - // This is the timestamp any durations in the included `Stats` were - // calculated relative to. 
- google.protobuf.Timestamp now = 4; } // A task details update message TaskDetails { // The task's ID which the details belong to. - TaskId task_id = 1; + common.Id task_id = 1; google.protobuf.Timestamp now = 2; @@ -72,12 +39,6 @@ message TaskDetails { optional bytes poll_times_histogram = 3; } -message PauseResponse { -} - -message ResumeResponse { -} - // Data recorded when a new task is spawned. message Task { // The task's ID. @@ -87,7 +48,7 @@ message Task { // identified by this ID; if the client requires additional information // included in the `Task` message, it should store that data and access it // by ID. - TaskId id = 1; + common.Id id = 1; // The numeric ID of the task's `Metadata`. // // This identifies the `Metadata` that describes the `tracing` span @@ -122,50 +83,25 @@ message Task { // Task performance statistics. message Stats { - // The total number of times this task has been polled. - uint64 polls = 1; // Timestamp of when the task was spawned. - google.protobuf.Timestamp created_at = 2; - // The timestamp of the first time this task was polled. - // - // If this is `None`, the task has not yet been polled. - // - // Subtracting this timestamp from `created_at` can be used to calculate the - // time to first poll for this task, a measurement of executor latency. - optional google.protobuf.Timestamp first_poll = 3; - // The timestamp of the most recent time this task's poll method was invoked. - // - // If this is `None`, the task has not yet been polled. - // - // If the task has only been polled a single time, then this value may be - // equal to the `first_poll` timestamp. - // - optional google.protobuf.Timestamp last_poll_started = 4; - // The timestamp of the most recent time this task's poll method finished execution. - // - // If this is `None`, the task has not yet been polled or is currently being polled. - // - // If the task has completed, then this is the time the final invocation of its poll - // method has completed. 
- optional google.protobuf.Timestamp last_poll_ended = 5; - // The total duration this task was being *actively polled*, summed across - // all polls. Note that this includes only polls that have completed and is - // not reflecting any inprogress polls. - google.protobuf.Duration busy_time = 6; + google.protobuf.Timestamp created_at = 1; + // The amount of time this task has *existed*, regardless of whether or not // it was being polled. // // Subtracting `busy_time` from `total_time` calculates the task's idle // time, the amount of time it has spent *waiting* to be polled. - google.protobuf.Duration total_time = 7; + google.protobuf.Duration total_time = 2; // The total number of times this task's waker has been woken. - uint64 wakes = 8; + uint64 wakes = 3; // The total number of times this task's waker has been cloned. - uint64 waker_clones = 9; + uint64 waker_clones = 4; // The total number of times this task's waker has been dropped. - uint64 waker_drops = 10; + uint64 waker_drops = 5; // The timestamp of the most recent time this task has been woken. // // If this is `None`, the task has not yet been woken. - optional google.protobuf.Timestamp last_wake = 11; + optional google.protobuf.Timestamp last_wake = 6; + // Contains task poll statistics. + common.PollStats poll_stats = 7; } diff --git a/proto/trace.proto b/proto/trace.proto index 4e550f871..1391c741d 100644 --- a/proto/trace.proto +++ b/proto/trace.proto @@ -30,7 +30,7 @@ message TraceEvent { message RegisterThreads { map names = 1; } - + message Enter { common.SpanId span_id = 1; uint64 thread_id = 2;