diff --git a/console-api/proto/instrument.proto b/console-api/proto/instrument.proto index b2b96c47a..6ef07a8c5 100644 --- a/console-api/proto/instrument.proto +++ b/console-api/proto/instrument.proto @@ -14,6 +14,8 @@ service Instrument { rpc WatchUpdates(InstrumentRequest) returns (stream Update) {} // Produces a stream of updates describing the activity of a specific task. rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {} + // Produces a stream of state of the aggregator. + rpc WatchState(StateRequest) returns (stream State) {} // Registers that the console observer wants to pause the stream. rpc Pause(PauseRequest) returns (PauseResponse) {} // Registers that the console observer wants to resume the stream. @@ -72,6 +74,23 @@ message Update { common.RegisterMetadata new_metadata = 5; } +// StateRequest requests the current state of the aggregator. +message StateRequest { +} + +// State carries the current state of the aggregator. +message State { + Temporality temporality = 1; +} + +// The time "state" of the aggregator. +enum Temporality { + // The aggregator is currently live. + LIVE = 0; + // The aggregator is currently paused. + PAUSED = 1; +} + // `PauseResponse` is the value returned after a pause request. message PauseResponse { } diff --git a/console-api/src/generated/rs.tokio.console.instrument.rs b/console-api/src/generated/rs.tokio.console.instrument.rs index e7cfd0abb..c20821f56 100644 --- a/console-api/src/generated/rs.tokio.console.instrument.rs +++ b/console-api/src/generated/rs.tokio.console.instrument.rs @@ -50,12 +50,50 @@ pub struct Update { #[prost(message, optional, tag = "5")] pub new_metadata: ::core::option::Option, } +/// StateRequest requests the current state of the aggregator. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct StateRequest {} +/// State carries the current state of the aggregator. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct State { + #[prost(enumeration = "Temporality", tag = "1")] + pub temporality: i32, +} /// `PauseResponse` is the value returned after a pause request. #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct PauseResponse {} /// `ResumeResponse` is the value returned after a resume request. #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ResumeResponse {} +/// The time "state" of the aggregator. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum Temporality { + /// The aggregator is currently live. + Live = 0, + /// The aggregator is currently paused. + Paused = 1, +} +impl Temporality { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Live => "LIVE", + Self::Paused => "PAUSED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "LIVE" => Some(Self::Live), + "PAUSED" => Some(Self::Paused), + _ => None, + } + } +} /// Generated client implementations. pub mod instrument_client { #![allow( @@ -208,6 +246,36 @@ pub mod instrument_client { ); self.inner.server_streaming(req, path, codec).await } + /// Produces a stream of state of the aggregator. + pub async fn watch_state( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rs.tokio.console.instrument.Instrument/WatchState", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "rs.tokio.console.instrument.Instrument", + "WatchState", + ), + ); + self.inner.server_streaming(req, path, codec).await + } /// Registers that the console observer wants to pause the stream. pub async fn pause( &mut self, @@ -302,6 +370,17 @@ pub mod instrument_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the WatchState method. + type WatchStateStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// Produces a stream of state of the aggregator. + async fn watch_state( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// Registers that the console observer wants to pause the stream. async fn pause( &self, @@ -482,6 +561,52 @@ pub mod instrument_server { }; Box::pin(fut) } + "/rs.tokio.console.instrument.Instrument/WatchState" => { + #[allow(non_camel_case_types)] + struct WatchStateSvc(pub Arc); + impl< + T: Instrument, + > tonic::server::ServerStreamingService + for WatchStateSvc { + type Response = super::State; + type ResponseStream = T::WatchStateStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::watch_state(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WatchStateSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/rs.tokio.console.instrument.Instrument/Pause" => { #[allow(non_camel_case_types)] struct PauseSvc(pub Arc); diff --git a/console-subscriber/examples/grpc_web/app/src/gen/instrument_connect.ts b/console-subscriber/examples/grpc_web/app/src/gen/instrument_connect.ts index fdc38b673..1c9112185 100644 --- a/console-subscriber/examples/grpc_web/app/src/gen/instrument_connect.ts +++ b/console-subscriber/examples/grpc_web/app/src/gen/instrument_connect.ts @@ -3,7 +3,7 @@ /* eslint-disable */ // @ts-nocheck -import { InstrumentRequest, PauseRequest, PauseResponse, ResumeRequest, ResumeResponse, TaskDetailsRequest, Update } from "./instrument_pb.js"; +import { InstrumentRequest, PauseRequest, PauseResponse, ResumeRequest, ResumeResponse, State, StateRequest, TaskDetailsRequest, Update } from "./instrument_pb.js"; import { MethodKind } from "@bufbuild/protobuf"; import { TaskDetails } from "./tasks_pb.js"; @@ -37,6 +37,17 @@ export const Instrument = { O: TaskDetails, kind: MethodKind.ServerStreaming, }, + /** + * Produces a stream of state of the aggregator. + * + * @generated from rpc rs.tokio.console.instrument.Instrument.WatchState + */ + watchState: { + name: "WatchState", + I: StateRequest, + O: State, + kind: MethodKind.ServerStreaming, + }, /** * Registers that the console observer wants to pause the stream. * diff --git a/console-subscriber/examples/grpc_web/app/src/gen/instrument_pb.ts b/console-subscriber/examples/grpc_web/app/src/gen/instrument_pb.ts index 09ec3b34d..e55291526 100644 --- a/console-subscriber/examples/grpc_web/app/src/gen/instrument_pb.ts +++ b/console-subscriber/examples/grpc_web/app/src/gen/instrument_pb.ts @@ -10,6 +10,32 @@ import { TaskUpdate } from "./tasks_pb.js"; import { ResourceUpdate } from "./resources_pb.js"; import { AsyncOpUpdate } from "./async_ops_pb.js"; +/** + * The time "state" of the aggregator. + * + * @generated from enum rs.tokio.console.instrument.Temporality + */ +export enum Temporality { + /** + * The aggregator is currently live. + * + * @generated from enum value: LIVE = 0; + */ + LIVE = 0, + + /** + * The aggregator is currently paused. + * + * @generated from enum value: PAUSED = 1; + */ + PAUSED = 1, +} +// Retrieve enum metadata with: proto3.getEnumType(Temporality) +proto3.util.setEnumType(Temporality, "rs.tokio.console.instrument.Temporality", [ + { no: 0, name: "LIVE" }, + { no: 1, name: "PAUSED" }, +]); + /** * InstrumentRequest requests the stream of updates * to observe the async runtime state over time. @@ -239,6 +265,78 @@ export class Update extends Message { } } +/** + * StateRequest requests the current state of the aggregator. + * + * @generated from message rs.tokio.console.instrument.StateRequest + */ +export class StateRequest extends Message { + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "rs.tokio.console.instrument.StateRequest"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): StateRequest { + return new StateRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): StateRequest { + return new StateRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): StateRequest { + return new StateRequest().fromJsonString(jsonString, options); + } + + static equals(a: StateRequest | PlainMessage | undefined, b: StateRequest | PlainMessage | undefined): boolean { + return proto3.util.equals(StateRequest, a, b); + } +} + +/** + * State carries the current state of the aggregator. + * + * @generated from message rs.tokio.console.instrument.State + */ +export class State extends Message { + /** + * @generated from field: rs.tokio.console.instrument.Temporality temporality = 1; + */ + temporality = Temporality.LIVE; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "rs.tokio.console.instrument.State"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "temporality", kind: "enum", T: proto3.getEnumType(Temporality) }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): State { + return new State().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): State { + return new State().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): State { + return new State().fromJsonString(jsonString, options); + } + + static equals(a: State | PlainMessage | undefined, b: State | PlainMessage | undefined): boolean { + return proto3.util.equals(State, a, b); + } +} + /** * `PauseResponse` is the value returned after a pause request. * diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index b6846d62e..5d5b37c38 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -49,6 +49,9 @@ pub struct Aggregator { /// buffer is approaching capacity. shared: Arc, + /// Currently active RPCs streaming state events. + state_watchers: ShrinkVec>, + /// Currently active RPCs streaming task events. watchers: ShrinkVec>, @@ -89,7 +92,7 @@ pub struct Aggregator { poll_ops: Vec, /// The time "state" of the aggregator, such as paused or live. - temporality: Temporality, + temporality: proto::instrument::Temporality, /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a /// timestamp that can be sent over the wire. @@ -102,11 +105,6 @@ pub(crate) struct Flush { triggered: AtomicBool, } -#[derive(Debug)] -enum Temporality { - Live, - Paused, -} // Represent static data for resources struct Resource { id: Id, @@ -153,6 +151,7 @@ impl Aggregator { events, watchers: Default::default(), details_watchers: Default::default(), + state_watchers: Default::default(), all_metadata: Default::default(), new_metadata: Default::default(), tasks: IdData::default(), @@ -162,7 +161,7 @@ impl Aggregator { async_ops: IdData::default(), async_op_stats: IdData::default(), poll_ops: Default::default(), - temporality: Temporality::Live, + temporality: proto::instrument::Temporality::Live, base_time, } } @@ -179,8 +178,8 @@ impl Aggregator { // if the flush interval elapses, flush data to the client _ = publish.tick() => { match self.temporality { - Temporality::Live => true, - Temporality::Paused => false, + proto::instrument::Temporality::Live => true, + proto::instrument::Temporality::Paused => false, } } @@ -199,11 +198,14 @@ impl Aggregator { Some(Command::WatchTaskDetail(watch_request)) => { self.add_task_detail_subscription(watch_request); }, + Some(Command::WatchState(subscription)) => { + self.add_state_subscription(subscription); + } Some(Command::Pause) => { - self.temporality = Temporality::Paused; + self.temporality = proto::instrument::Temporality::Paused; } Some(Command::Resume) => { - self.temporality = Temporality::Live; + self.temporality = proto::instrument::Temporality::Live; } None => { tracing::debug!("rpc channel closed, terminating"); @@ -213,7 +215,6 @@ impl Aggregator { false } - }; // drain and aggregate buffered events. @@ -251,6 +252,10 @@ impl Aggregator { "event channel drain loop", ); + if !self.state_watchers.is_empty() { + self.publish_state(); + } + // flush data to clients, if there are any currently subscribed // watchers and we should send a new update. if !self.watchers.is_empty() && should_send { @@ -395,6 +400,20 @@ impl Aggregator { // If the task is not found, drop `stream_sender` which will result in a not found error } + /// Add a state subscription to the watchers. + fn add_state_subscription(&mut self, subscription: Watch) { + self.state_watchers.push(subscription); + } + + /// Publish the current state to all active state watchers. + fn publish_state(&mut self) { + let state = proto::instrument::State { + temporality: self.temporality.into(), + }; + self.state_watchers + .retain_and_shrink(|watch| watch.update(&state)); + } + /// Publish the current state to all active watchers. /// /// This drops any watchers which have closed the RPC, or whose update diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index aa0085796..9707a7632 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -178,6 +178,7 @@ struct Watch(mpsc::Sender>); enum Command { Instrument(Watch), WatchTaskDetail(WatchRequest), + WatchState(Watch), Pause, Resume, } @@ -1190,6 +1191,8 @@ impl proto::instrument::instrument_server::Instrument for Server { tokio_stream::wrappers::ReceiverStream>; type WatchTaskDetailsStream = tokio_stream::wrappers::ReceiverStream>; + type WatchStateStream = + tokio_stream::wrappers::ReceiverStream>; async fn watch_updates( &self, req: tonic::Request, @@ -1245,6 +1248,21 @@ impl proto::instrument::instrument_server::Instrument for Server { Ok(tonic::Response::new(stream)) } + async fn watch_state( + &self, + _req: tonic::Request, + ) -> Result, tonic::Status> { + let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer); + self.subscribe + .send(Command::WatchState(Watch(stream_sender))) + .await + .map_err(|_| { + tonic::Status::internal("cannot get state, aggregation task is not running") + })?; + let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv); + Ok(tonic::Response::new(stream)) + } + async fn pause( &self, _req: tonic::Request, diff --git a/tokio-console/src/conn.rs b/tokio-console/src/conn.rs index 03fb88e30..c27893ffe 100644 --- a/tokio-console/src/conn.rs +++ b/tokio-console/src/conn.rs @@ -1,6 +1,7 @@ +use console_api::instrument::StateRequest; use console_api::instrument::{ instrument_client::InstrumentClient, InstrumentRequest, PauseRequest, ResumeRequest, - TaskDetailsRequest, Update, + State as InstrumentState, TaskDetailsRequest, Update, }; use console_api::tasks::TaskDetails; use futures::stream::StreamExt; @@ -30,11 +31,18 @@ pub struct Connection { enum State { Connected { client: InstrumentClient, - stream: Box>, + update_stream: Box>, + state_stream: Box>, }, Disconnected(Duration), } +#[allow(clippy::large_enum_variant)] +pub(crate) enum Message { + Update(Update), + State(InstrumentState), +} + macro_rules! with_client { ($me:ident, $client:ident, $block:expr) => ({ loop { @@ -110,9 +118,16 @@ impl Connection { } }; let mut client = InstrumentClient::new(channel); - let request = tonic::Request::new(InstrumentRequest {}); - let stream = Box::new(client.watch_updates(request).await?.into_inner()); - Ok::>(State::Connected { client, stream }) + let update_request = tonic::Request::new(InstrumentRequest {}); + let update_stream = + Box::new(client.watch_updates(update_request).await?.into_inner()); + let state_request = tonic::Request::new(StateRequest {}); + let state_stream = Box::new(client.watch_state(state_request).await?.into_inner()); + Ok::>(State::Connected { + client, + update_stream, + state_stream, + }) }; self.state = match try_connect.await { Ok(connected) => { @@ -128,20 +143,39 @@ impl Connection { } } - pub async fn next_update(&mut self) -> Update { + pub async fn next_message(&mut self) -> Message { loop { - match self.state { - State::Connected { ref mut stream, .. } => match stream.next().await { - Some(Ok(update)) => return update, - Some(Err(status)) => { - tracing::warn!(%status, "error from stream"); - self.state = State::Disconnected(Self::BACKOFF); + match &mut self.state { + State::Connected { + update_stream, + state_stream, + .. + } => { + tokio::select! { biased; // Always biased to update stream. + update = update_stream.next() => match update { + Some(Ok(update)) => return Message::Update(update), + Some(Err(status)) => { + tracing::warn!(%status, "error from update stream"); + self.state = State::Disconnected(Self::BACKOFF); + } + None => { + tracing::error!("update stream closed by server"); + self.state = State::Disconnected(Self::BACKOFF); + } + }, + state = state_stream.next() => match state { + Some(Ok(state)) => return Message::State(state), + Some(Err(status)) => { + tracing::warn!(%status, "error from state stream"); + self.state = State::Disconnected(Self::BACKOFF); + } + None => { + tracing::error!("state stream closed by server"); + self.state = State::Disconnected(Self::BACKOFF); + } + }, } - None => { - tracing::error!("stream closed by server"); - self.state = State::Disconnected(Self::BACKOFF); - } - }, + } State::Disconnected(_) => self.connect().await, } } diff --git a/tokio-console/src/main.rs b/tokio-console/src/main.rs index 6b8ed54cf..45b8a19b4 100644 --- a/tokio-console/src/main.rs +++ b/tokio-console/src/main.rs @@ -1,6 +1,6 @@ use color_eyre::{eyre::eyre, Help, SectionExt}; use console_api::tasks::TaskDetails; -use state::State; +use state::{State, Temporality}; use futures::stream::StreamExt; use ratatui::{ @@ -95,10 +95,10 @@ async fn main() -> color_eyre::Result<()> { if input::is_space(&input) { if state.is_paused() { conn.resume().await; - state.resume(); + state.start_unpausing(); } else { conn.pause().await; - state.pause(); + state.start_pausing(); } } @@ -124,8 +124,15 @@ async fn main() -> color_eyre::Result<()> { _ => {} } }, - instrument_update = conn.next_update() => { - state.update(&view.styles, view.current_view(), instrument_update); + instrument_message = conn.next_message() => { + match instrument_message { + conn::Message::Update(update) => { + state.update(&view.styles, view.current_view(), update); + }, + conn::Message::State(state_update) => { + state.update_state(state_update); + } + } } details_update = details_rx.recv() => { if let Some(details_update) = details_update { @@ -148,8 +155,17 @@ async fn main() -> color_eyre::Result<()> { .split(f.size()); let mut header_text = conn.render(&view.styles); - if state.is_paused() { - header_text.push_span(Span::styled(" PAUSED", view.styles.fg(Color::Red))); + match state.temporality() { + Temporality::Paused => { + header_text.push_span(Span::styled(" PAUSED", view.styles.fg(Color::Red))); + } + Temporality::Pausing => { + header_text.push_span(Span::styled(" PAUSING", view.styles.fg(Color::Yellow))); + } + Temporality::Unpausing => { + header_text.push_span(Span::styled(" UNPAUSING", view.styles.fg(Color::Green))); + } + Temporality::Live => {} } let dropped_async_ops_state = state.async_ops_state().dropped_events(); let dropped_tasks_state = state.tasks_state().dropped_events(); diff --git a/tokio-console/src/state/mod.rs b/tokio-console/src/state/mod.rs index 3ff032aca..4693f8cce 100644 --- a/tokio-console/src/state/mod.rs +++ b/tokio-console/src/state/mod.rs @@ -72,11 +72,28 @@ pub(crate) enum FieldValue { } #[derive(Debug)] -enum Temporality { +pub(crate) enum Temporality { + Unpausing, Live, + Pausing, Paused, } +impl Default for Temporality { + fn default() -> Self { + Self::Live + } +} + +impl From for Temporality { + fn from(pb: proto::instrument::Temporality) -> Self { + match pb { + proto::instrument::Temporality::Live => Self::Live, + proto::instrument::Temporality::Paused => Self::Paused, + } + } +} + #[derive(Debug, Eq, PartialEq)] pub(crate) struct Attribute { field: Field, @@ -236,23 +253,26 @@ impl State { } // temporality methods + pub(crate) fn temporality(&self) -> &Temporality { + &self.temporality + } - pub(crate) fn pause(&mut self) { - self.temporality = Temporality::Paused; + pub(crate) fn start_unpausing(&mut self) { + self.temporality = Temporality::Unpausing; } - pub(crate) fn resume(&mut self) { - self.temporality = Temporality::Live; + pub(crate) fn start_pausing(&mut self) { + self.temporality = Temporality::Pausing; } - pub(crate) fn is_paused(&self) -> bool { - matches!(self.temporality, Temporality::Paused) + pub(crate) fn update_state(&mut self, state: proto::instrument::State) { + self.temporality = proto::instrument::Temporality::try_from(state.temporality) + .expect("invalid temporality") + .into(); } -} -impl Default for Temporality { - fn default() -> Self { - Self::Live + pub(crate) fn is_paused(&self) -> bool { + matches!(self.temporality, Temporality::Paused | Temporality::Pausing) } }