From 9f62ac52ca06417fd7fff2f1086c755767b35d86 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 9 Jun 2021 12:25:12 +0000 Subject: [PATCH] add proto schema for resources instrumentation Signed-off-by: Zahari Dichev --- console-api/build.rs | 2 + console-api/src/instrument.rs | 1 + console-api/src/lib.rs | 2 + console-api/src/resources.rs | 1 + console-subscriber/examples/dump.rs | 8 +-- console-subscriber/src/aggregator.rs | 37 +++++++----- console-subscriber/src/lib.rs | 19 +++--- console/src/main.rs | 20 ++++--- console/src/tasks.rs | 8 ++- proto/instrument.proto | 33 +++++++++++ proto/resources.proto | 86 ++++++++++++++++++++++++++++ proto/tasks.proto | 17 +----- 12 files changed, 184 insertions(+), 50 deletions(-) create mode 100644 console-api/src/instrument.rs create mode 100644 console-api/src/resources.rs create mode 100644 proto/instrument.proto create mode 100644 proto/resources.proto diff --git a/console-api/build.rs b/console-api/build.rs index e5a9ef830..6b55965f3 100644 --- a/console-api/build.rs +++ b/console-api/build.rs @@ -5,6 +5,8 @@ fn main() -> Result<(), Box> { "../proto/trace.proto", "../proto/common.proto", "../proto/tasks.proto", + "../proto/instrument.proto", + "../proto/resources.proto", ]; let dirs = &["../proto"]; diff --git a/console-api/src/instrument.rs b/console-api/src/instrument.rs new file mode 100644 index 000000000..bfd2fe0fc --- /dev/null +++ b/console-api/src/instrument.rs @@ -0,0 +1 @@ +tonic::include_proto!("rs.tokio.console.instrument"); diff --git a/console-api/src/lib.rs b/console-api/src/lib.rs index b96974159..35fc2976f 100644 --- a/console-api/src/lib.rs +++ b/console-api/src/lib.rs @@ -1,4 +1,6 @@ mod common; +pub mod instrument; +pub mod resources; pub mod tasks; pub mod trace; pub use common::*; diff --git a/console-api/src/resources.rs b/console-api/src/resources.rs new file mode 100644 index 000000000..2fc92b71b --- /dev/null +++ b/console-api/src/resources.rs @@ -0,0 +1 @@ +tonic::include_proto!("rs.tokio.console.resources"); diff --git a/console-subscriber/examples/dump.rs b/console-subscriber/examples/dump.rs index db7e6e7dd..ed102e881 100644 --- a/console-subscriber/examples/dump.rs +++ b/console-subscriber/examples/dump.rs @@ -1,4 +1,4 @@ -use console_api::tasks::{tasks_client::TasksClient, TasksRequest}; +use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest}; use futures::stream::StreamExt; #[tokio::main] @@ -11,10 +11,10 @@ async fn main() -> Result<(), Box> { }); eprintln!("CONNECTING: {}", target); - let mut client = TasksClient::connect(target).await?; + let mut client = InstrumentClient::connect(target).await?; - let request = tonic::Request::new(TasksRequest {}); - let mut stream = client.watch_tasks(request).await?.into_inner(); + let request = tonic::Request::new(InstrumentRequest {}); + let mut stream = client.watch_updates(request).await?.into_inner(); let mut i: usize = 0; while let Some(update) = stream.next().await { diff --git a/console-subscriber/src/aggregator.rs b/console-subscriber/src/aggregator.rs index 6421b28b2..1d2c4bf3e 100644 --- a/console-subscriber/src/aggregator.rs +++ b/console-subscriber/src/aggregator.rs @@ -135,14 +135,21 @@ impl Aggregator { }).collect(); // Send the initial state --- if this fails, the subscription is // already dead. - if subscription.update(&proto::tasks::TaskUpdate { - new_metadata: Some(proto::RegisterMetadata { - metadata: self.all_metadata.clone(), - }), - new_tasks, - stats_update, - now: Some(now.into()), - }) { + if subscription.update( + &proto::instrument::InstrumentUpdate { + task_update: Some(proto::tasks::TaskUpdate { + new_tasks, + stats_update, + }), + resource_update: None, + now: Some(now.into()), + new_metadata: Some(proto::RegisterMetadata { + metadata: self.all_metadata.clone(), + }), + } + + + ) { self.watchers.push(subscription) } } else { @@ -210,11 +217,15 @@ impl Aggregator { .since_last_update() .map(|(id, stats)| (id.into_u64(), stats.to_proto())) .collect(); - let update = proto::tasks::TaskUpdate { - new_metadata, - new_tasks, - stats_update, + + let update = proto::instrument::InstrumentUpdate { + task_update: Some(proto::tasks::TaskUpdate { + new_tasks, + stats_update, + }), + resource_update: None, now: Some(now.into()), + new_metadata, }; self.watchers.retain(|watch: &Watch| watch.update(&update)); } @@ -406,7 +417,7 @@ impl<'a, T> Drop for Updating<'a, T> { } impl Watch { - fn update(&self, update: &proto::tasks::TaskUpdate) -> bool { + fn update(&self, update: &proto::instrument::InstrumentUpdate) -> bool { if let Ok(reserve) = self.0.try_reserve() { reserve.send(Ok(update.clone())); true diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 4cedea72f..b061a2c68 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -42,7 +42,7 @@ struct FieldVisitor { meta_id: proto::MetaId, } -struct Watch(mpsc::Sender>); +struct Watch(mpsc::Sender>); enum Event { Metadata(&'static Metadata<'static>), @@ -277,7 +277,9 @@ impl Server { let aggregate = tokio::spawn(aggregate.run()); let addr = self.addr; let res = builder - .add_service(proto::tasks::tasks_server::TasksServer::new(self)) + .add_service(proto::instrument::instrument_server::InstrumentServer::new( + self, + )) .serve(addr) .await; aggregate.abort(); @@ -286,13 +288,14 @@ impl Server { } #[tonic::async_trait] -impl proto::tasks::tasks_server::Tasks for Server { - type WatchTasksStream = - tokio_stream::wrappers::ReceiverStream>; - async fn watch_tasks( +impl proto::instrument::instrument_server::Instrument for Server { + type WatchUpdatesStream = tokio_stream::wrappers::ReceiverStream< + Result, + >; + async fn watch_updates( &self, - req: tonic::Request, - ) -> Result, tonic::Status> { + req: tonic::Request, + ) -> Result, tonic::Status> { match req.remote_addr() { Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"), None => tracing::debug!(client.addr = %"", "starting a new watch"), diff --git a/console/src/main.rs b/console/src/main.rs index 5033697a1..d46b287bd 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -1,5 +1,5 @@ use color_eyre::{eyre::eyre, Help, SectionExt}; -use console_api::tasks::{tasks_client::TasksClient, TasksRequest}; +use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest}; use futures::stream::StreamExt; use tui::{ @@ -28,9 +28,9 @@ async fn main() -> color_eyre::Result<()> { let (mut terminal, _cleanup) = term::init_crossterm()?; terminal.clear()?; - let mut client = TasksClient::connect(target.clone()).await?; - let request = tonic::Request::new(TasksRequest {}); - let mut stream = client.watch_tasks(request).await?.into_inner(); + let mut client = InstrumentClient::connect(target.clone()).await?; + let request = tonic::Request::new(InstrumentRequest {}); + let mut stream = client.watch_updates(request).await?.into_inner(); let mut tasks = tasks::State::default(); let mut input = input::EventStream::new(); let mut view = view::View::default(); @@ -45,11 +45,15 @@ async fn main() -> color_eyre::Result<()> { } view.update_input(input, &mut tasks); }, - task_update = stream.next() => { - let update = task_update + instrument_update = stream.next() => { + let update = instrument_update .ok_or_else(|| eyre!("data stream closed by server")) - .with_section(|| "in the future, this should be reconnected automatically...".header("Note:"))?; - tasks.update_tasks(update?); + .with_section(|| "in the future, this should be reconnected automatically...".header("Note:"))??; + + + if let Some(task_update) = update.task_update { + tasks.update_tasks(task_update,update.now ); + } } } terminal.draw(|f| { diff --git a/console/src/tasks.rs b/console/src/tasks.rs index 87cf390fd..e608a01c0 100644 --- a/console/src/tasks.rs +++ b/console/src/tasks.rs @@ -110,8 +110,12 @@ impl State { } } - pub(crate) fn update_tasks(&mut self, update: proto::tasks::TaskUpdate) { - if let Some(now) = update.now { + pub(crate) fn update_tasks( + &mut self, + update: proto::tasks::TaskUpdate, + now: Option, + ) { + if let Some(now) = now { self.last_updated_at = Some(now.into()); } let mut stats_update = update.stats_update; diff --git a/proto/instrument.proto b/proto/instrument.proto new file mode 100644 index 000000000..ca9f4a633 --- /dev/null +++ b/proto/instrument.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package rs.tokio.console.instrument; + +import "google/protobuf/timestamp/timestamp.proto"; +import "common.proto"; +import "tasks.proto"; +import "resources.proto"; + +service Instrument { + rpc WatchUpdates(InstrumentRequest) returns (stream InstrumentUpdate) {} +} + +message InstrumentRequest{ +} + + +message InstrumentUpdate { + // The system time when this update was recorded. + // + // This is the timestamp any durations in the included `Stats` were + // calculated relative to. + google.protobuf.Timestamp now = 1; + + // Task state update. + tasks.TaskUpdate task_update = 2; + + // Resource state update. + resources.ResourceUpdate resource_update = 3; + + // Any new span metadata that was registered since the last update. + common.RegisterMetadata new_metadata = 4; +} \ No newline at end of file diff --git a/proto/resources.proto b/proto/resources.proto new file mode 100644 index 000000000..1b10db9b9 --- /dev/null +++ b/proto/resources.proto @@ -0,0 +1,86 @@ +syntax = "proto3"; + +package rs.tokio.console.resources; + +import "google/protobuf/timestamp/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "common.proto"; + +// A resource state update. +message ResourceUpdate { + // A list of new resources that were created since the last `ResourceUpdate` was + // sent. + repeated Resource new_resources = 1; + + // Any resource stats that have changed since the last update. + map stats_update = 2; + + // Any resource operation updates that have been registered. + repeated ResourceOp resource_ops = 3; +} + +// Static data recorded when a new resource is created. +message Resource { + // The resources's ID. + // + // This uniquely identifies this resource across all *currently live* + // resources. This is also the primary way `ResourceOp` messages are + // associated with resources. + common.SpanId id = 1; + // The resources's concrete rust type. + string concrete_type= 3; + // The kind of resource (e.g timer, mutex) + Kind kind = 4; + enum Kind { + TIMER = 0; + } +} + +// Task runtime statistics. +message Stats { + // Timestamp of when the task was created. + google.protobuf.Timestamp created_at = 1; + // Timestamp of when the task was dropped. + google.protobuf.Timestamp closed_at = 2; +} + +// A resource operation message +// +// Each `ResourceOp` message identifies an update to the state of +// an operation that a task is performing on a resource. +message ResourceOp { + // The id of the resource operation update + common.SpanId id = 1; + // The id of the resource this operation is performed on + common.SpanId resource_id = 2; + // The id of the task that this operation is associated with + common.SpanId task_id = 3; + // The timestamp, representing the point in time this operation update + // has been registered + google.protobuf.Timestamp timestamp = 4; + + oneof state { + Invoked invoked = 5; + Done done = 6; + } + + // Indicates that an operation has been started + // This can be for example emitted in the beginning + // of a poll method invocation + message Invoked {} + // Indicates that this operation has completed and + // the result that the operation returns. + message Done { + Value result = 1; + + enum Value { + // The operation has returned an error + ERROR = 0; + // The operation is redy. //TODO: add the result + READY = 1; + // The operation has returned pending and will be + // called again + PENDING = 2; + } + } +} \ No newline at end of file diff --git a/proto/tasks.proto b/proto/tasks.proto index 0fb2f8d19..be6151060 100644 --- a/proto/tasks.proto +++ b/proto/tasks.proto @@ -6,13 +6,6 @@ import "google/protobuf/timestamp/timestamp.proto"; import "google/protobuf/duration.proto"; import "common.proto"; -service Tasks { - rpc WatchTasks(TasksRequest) returns (stream TaskUpdate) {} -} - -message TasksRequest { -} - // A task state update. // // Each `TaskUpdate` contains any task data that has changed since the last @@ -27,8 +20,7 @@ message TaskUpdate { // // If this is empty, no new tasks were spawned. repeated Task new_tasks = 1; - // Any new span metadata that was registered since the last update. - common.RegisterMetadata new_metadata = 2; + // Any task stats that have changed since the last update. // // This is a map of task IDs (64-bit unsigned integers) to task stats. If a @@ -36,12 +28,7 @@ message TaskUpdate { // since the last `TaskUpdate` in which they were present. If a task's ID // *is* included in this map, the corresponding value represents a complete // snapshot of that task's stats at in the current time window. - map stats_update = 3; - // The system time when this update was recorded. - // - // This is the timestamp any durations in the included `Stats` were - // calculated relative to. - google.protobuf.Timestamp now = 4; + map stats_update = 2; } // Data recorded when a new task is spawned.