Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions console-api/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ fn main() -> Result<(), Box<dyn Error>> {
"../proto/trace.proto",
"../proto/common.proto",
"../proto/tasks.proto",
"../proto/instrument.proto",
"../proto/resources.proto",
];
let dirs = &["../proto"];

Expand Down
1 change: 1 addition & 0 deletions console-api/src/instrument.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tonic::include_proto!("rs.tokio.console.instrument");
2 changes: 2 additions & 0 deletions console-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod common;
pub mod instrument;
pub mod resources;
pub mod tasks;
pub mod trace;
pub use common::*;
1 change: 1 addition & 0 deletions console-api/src/resources.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tonic::include_proto!("rs.tokio.console.resources");
8 changes: 4 additions & 4 deletions console-subscriber/examples/dump.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use console_api::tasks::{tasks_client::TasksClient, TasksRequest};
use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest};
use futures::stream::StreamExt;

#[tokio::main]
Expand All @@ -11,10 +11,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

eprintln!("CONNECTING: {}", target);
let mut client = TasksClient::connect(target).await?;
let mut client = InstrumentClient::connect(target).await?;

let request = tonic::Request::new(TasksRequest {});
let mut stream = client.watch_tasks(request).await?.into_inner();
let request = tonic::Request::new(InstrumentRequest {});
let mut stream = client.watch_updates(request).await?.into_inner();

let mut i: usize = 0;
while let Some(update) = stream.next().await {
Expand Down
37 changes: 24 additions & 13 deletions console-subscriber/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,21 @@ impl Aggregator {
}).collect();
// Send the initial state --- if this fails, the subscription is
// already dead.
if subscription.update(&proto::tasks::TaskUpdate {
new_metadata: Some(proto::RegisterMetadata {
metadata: self.all_metadata.clone(),
}),
new_tasks,
stats_update,
now: Some(now.into()),
}) {
if subscription.update(
&proto::instrument::InstrumentUpdate {
task_update: Some(proto::tasks::TaskUpdate {
new_tasks,
stats_update,
}),
resource_update: None,
now: Some(now.into()),
new_metadata: Some(proto::RegisterMetadata {
metadata: self.all_metadata.clone(),
}),
}


) {
self.watchers.push(subscription)
}
} else {
Expand Down Expand Up @@ -210,11 +217,15 @@ impl Aggregator {
.since_last_update()
.map(|(id, stats)| (id.into_u64(), stats.to_proto()))
.collect();
let update = proto::tasks::TaskUpdate {
new_metadata,
new_tasks,
stats_update,

let update = proto::instrument::InstrumentUpdate {
task_update: Some(proto::tasks::TaskUpdate {
new_tasks,
stats_update,
}),
resource_update: None,
now: Some(now.into()),
new_metadata,
};
self.watchers.retain(|watch: &Watch| watch.update(&update));
}
Expand Down Expand Up @@ -406,7 +417,7 @@ impl<'a, T> Drop for Updating<'a, T> {
}

impl Watch {
fn update(&self, update: &proto::tasks::TaskUpdate) -> bool {
fn update(&self, update: &proto::instrument::InstrumentUpdate) -> bool {
if let Ok(reserve) = self.0.try_reserve() {
reserve.send(Ok(update.clone()));
true
Expand Down
19 changes: 11 additions & 8 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct FieldVisitor {
meta_id: proto::MetaId,
}

struct Watch(mpsc::Sender<Result<proto::tasks::TaskUpdate, tonic::Status>>);
struct Watch(mpsc::Sender<Result<proto::instrument::InstrumentUpdate, tonic::Status>>);

enum Event {
Metadata(&'static Metadata<'static>),
Expand Down Expand Up @@ -277,7 +277,9 @@ impl Server {
let aggregate = tokio::spawn(aggregate.run());
let addr = self.addr;
let res = builder
.add_service(proto::tasks::tasks_server::TasksServer::new(self))
.add_service(proto::instrument::instrument_server::InstrumentServer::new(
self,
))
.serve(addr)
.await;
aggregate.abort();
Expand All @@ -286,13 +288,14 @@ impl Server {
}

#[tonic::async_trait]
impl proto::tasks::tasks_server::Tasks for Server {
type WatchTasksStream =
tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskUpdate, tonic::Status>>;
async fn watch_tasks(
impl proto::instrument::instrument_server::Instrument for Server {
type WatchUpdatesStream = tokio_stream::wrappers::ReceiverStream<
Result<proto::instrument::InstrumentUpdate, tonic::Status>,
>;
async fn watch_updates(
&self,
req: tonic::Request<proto::tasks::TasksRequest>,
) -> Result<tonic::Response<Self::WatchTasksStream>, tonic::Status> {
req: tonic::Request<proto::instrument::InstrumentRequest>,
) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
match req.remote_addr() {
Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
Expand Down
20 changes: 12 additions & 8 deletions console/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use color_eyre::{eyre::eyre, Help, SectionExt};
use console_api::tasks::{tasks_client::TasksClient, TasksRequest};
use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest};
use futures::stream::StreamExt;

use tui::{
Expand Down Expand Up @@ -28,9 +28,9 @@ async fn main() -> color_eyre::Result<()> {
let (mut terminal, _cleanup) = term::init_crossterm()?;
terminal.clear()?;

let mut client = TasksClient::connect(target.clone()).await?;
let request = tonic::Request::new(TasksRequest {});
let mut stream = client.watch_tasks(request).await?.into_inner();
let mut client = InstrumentClient::connect(target.clone()).await?;
let request = tonic::Request::new(InstrumentRequest {});
let mut stream = client.watch_updates(request).await?.into_inner();
let mut tasks = tasks::State::default();
let mut input = input::EventStream::new();
let mut view = view::View::default();
Expand All @@ -45,11 +45,15 @@ async fn main() -> color_eyre::Result<()> {
}
view.update_input(input, &mut tasks);
},
task_update = stream.next() => {
let update = task_update
instrument_update = stream.next() => {
let update = instrument_update
.ok_or_else(|| eyre!("data stream closed by server"))
.with_section(|| "in the future, this should be reconnected automatically...".header("Note:"))?;
tasks.update_tasks(update?);
.with_section(|| "in the future, this should be reconnected automatically...".header("Note:"))??;


if let Some(task_update) = update.task_update {
tasks.update_tasks(task_update,update.now );
}
}
}
terminal.draw(|f| {
Expand Down
8 changes: 6 additions & 2 deletions console/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ impl State {
}
}

pub(crate) fn update_tasks(&mut self, update: proto::tasks::TaskUpdate) {
if let Some(now) = update.now {
pub(crate) fn update_tasks(
&mut self,
update: proto::tasks::TaskUpdate,
now: Option<prost_types::Timestamp>,
) {
if let Some(now) = now {
self.last_updated_at = Some(now.into());
}
let mut stats_update = update.stats_update;
Expand Down
33 changes: 33 additions & 0 deletions proto/instrument.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";

package rs.tokio.console.instrument;

import "google/protobuf/timestamp/timestamp.proto";
import "common.proto";
import "tasks.proto";
import "resources.proto";

service Instrument {
rpc WatchUpdates(InstrumentRequest) returns (stream InstrumentUpdate) {}
}

message InstrumentRequest{
}


message InstrumentUpdate {
// The system time when this update was recorded.
//
// This is the timestamp any durations in the included `Stats` were
// calculated relative to.
google.protobuf.Timestamp now = 1;

// Task state update.
tasks.TaskUpdate task_update = 2;

// Resource state update.
resources.ResourceUpdate resource_update = 3;

// Any new span metadata that was registered since the last update.
common.RegisterMetadata new_metadata = 4;
}
86 changes: 86 additions & 0 deletions proto/resources.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
syntax = "proto3";

package rs.tokio.console.resources;

import "google/protobuf/timestamp/timestamp.proto";
import "google/protobuf/duration.proto";
import "common.proto";

// A resource state update.
message ResourceUpdate {
// A list of new resources that were created since the last `ResourceUpdate` was
// sent.
repeated Resource new_resources = 1;

// Any resource stats that have changed since the last update.
map<uint64, Stats> stats_update = 2;

// Any resource operation updates that have been registered.
repeated ResourceOp resource_ops = 3;
}

// Static data recorded when a new resource is created.
message Resource {
// The resources's ID.
//
// This uniquely identifies this resource across all *currently live*
// resources. This is also the primary way `ResourceOp` messages are
// associated with resources.
common.SpanId id = 1;
// The resources's concrete rust type.
string concrete_type= 3;
// The kind of resource (e.g timer, mutex)
Kind kind = 4;
enum Kind {
TIMER = 0;
}
}

// Task runtime statistics.
message Stats {
// Timestamp of when the task was created.
google.protobuf.Timestamp created_at = 1;
// Timestamp of when the task was dropped.
google.protobuf.Timestamp closed_at = 2;
}

// A resource operation message
//
// Each `ResourceOp` message identifies an update to the state of
// an operation that a task is performing on a resource.
message ResourceOp {
// The id of the resource operation update
common.SpanId id = 1;
// The id of the resource this operation is performed on
common.SpanId resource_id = 2;
// The id of the task that this operation is associated with
common.SpanId task_id = 3;
// The timestamp, representing the point in time this operation update
// has been registered
google.protobuf.Timestamp timestamp = 4;

oneof state {
Invoked invoked = 5;
Done done = 6;
}

// Indicates that an operation has been started
// This can be for example emitted in the beginning
// of a poll method invocation
message Invoked {}
// Indicates that this operation has completed and
// the result that the operation returns.
message Done {
Value result = 1;

enum Value {
// The operation has returned an error
ERROR = 0;
// The operation is redy. //TODO: add the result
READY = 1;
// The operation has returned pending and will be
// called again
PENDING = 2;
}
}
}
17 changes: 2 additions & 15 deletions proto/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ import "google/protobuf/timestamp/timestamp.proto";
import "google/protobuf/duration.proto";
import "common.proto";

service Tasks {
rpc WatchTasks(TasksRequest) returns (stream TaskUpdate) {}
}

message TasksRequest {
}

// A task state update.
//
// Each `TaskUpdate` contains any task data that has changed since the last
Expand All @@ -27,21 +20,15 @@ message TaskUpdate {
//
// If this is empty, no new tasks were spawned.
repeated Task new_tasks = 1;
// Any new span metadata that was registered since the last update.
common.RegisterMetadata new_metadata = 2;

// Any task stats that have changed since the last update.
//
// This is a map of task IDs (64-bit unsigned integers) to task stats. If a
// task's ID is not included in this map, then its stats have *not* changed
// since the last `TaskUpdate` in which they were present. If a task's ID
// *is* included in this map, the corresponding value represents a complete
// snapshot of that task's stats at in the current time window.
map<uint64, Stats> stats_update = 3;
// The system time when this update was recorded.
//
// This is the timestamp any durations in the included `Stats` were
// calculated relative to.
google.protobuf.Timestamp now = 4;
map<uint64, Stats> stats_update = 2;
}

// Data recorded when a new task is spawned.
Expand Down