Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 67 additions & 5 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use aggregator::Aggregator;
pub use builder::Builder;
use callsites::Callsites;
use stack::SpanStack;
use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};
use visitors::{
AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, StateAttributeVisitor, TaskVisitor,
WakerVisitor,
};

pub use builder::{init, spawn};

Expand Down Expand Up @@ -324,6 +327,36 @@ impl ConsoleLayer {
};
(layer, server)
}

fn send_attribute_updates(
&self,
update_id: &span::Id,
update_meta: &'static Metadata<'static>,
updates: Vec<AttributeUpdate>,
) {
let update_type = if self.async_op_state_update_callsites.contains(update_meta) {
UpdateType::AsyncOp
} else {
UpdateType::Resource
};

let dropped = if self.async_op_state_update_callsites.contains(update_meta) {
&self.shared.dropped_async_ops
} else {
&self.shared.dropped_resources
};

for update in updates.into_iter() {
self.send(
dropped,
Event::StateUpdate {
update_id: update_id.clone(),
update_type: update_type.clone(),
update,
},
);
}
}
}

impl ConsoleLayer {
Expand Down Expand Up @@ -539,7 +572,7 @@ where
let parent_id = self.current_spans.get().and_then(|stack| {
self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
});
self.send(
let sent = self.send(
&self.shared.dropped_resources,
Event::Resource {
id: id.clone(),
Expand All @@ -552,7 +585,15 @@ where
is_internal,
inherit_child_attrs,
},
)
);

if sent {
let mut attribute_visitor = StateAttributeVisitor::new(metadata.into());
attrs.record(&mut attribute_visitor);
self.send_attribute_updates(id, metadata, attribute_visitor.updates)
}

sent
} else {
// else unknown resource span format
false
Expand All @@ -571,7 +612,7 @@ where
});

if let Some(resource_id) = resource_id {
self.send(
let sent = self.send(
&self.shared.dropped_async_ops,
Event::AsyncResourceOp {
id: id.clone(),
Expand All @@ -582,7 +623,15 @@ where
source,
inherit_child_attrs,
},
)
);

if sent {
let mut attribute_visitor = StateAttributeVisitor::new(metadata.into());
attrs.record(&mut attribute_visitor);
self.send_attribute_updates(id, metadata, attribute_visitor.updates)
}

sent
} else {
false
}
Expand Down Expand Up @@ -715,6 +764,19 @@ where
}
}

fn on_record(&self, id: &span::Id, values: &span::Record<'_>, cx: Context<'_, S>) {
if !self.is_id_tracked(id, &cx) {
return;
}

if let Some(span) = cx.span(id) {
let metadata = span.metadata();
let mut attribute_visitor = StateAttributeVisitor::new(metadata.into());
values.record(&mut attribute_visitor);
self.send_attribute_updates(id, metadata, attribute_visitor.updates)
}
}

fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
if !self.is_id_tracked(id, &cx) {
return;
Expand Down
108 changes: 108 additions & 0 deletions console-subscriber/src/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,3 +533,111 @@ impl Visit for StateUpdateVisitor {
}
}
}

pub(crate) struct StateAttributeVisitor {
meta_id: proto::MetaId,
pub(crate) updates: Vec<AttributeUpdate>,
}

/// Used to extract the data needed to construct
/// an Event::StateUpdate from the the metadata of
/// a span. A span field that represents a state update
/// has the form of `state.name.unit.delta` where
/// the `delta` suffix is optional and indicates that
/// this update represents a positive or negative change
/// in the value of the attribute.
///
/// State updates can be emitted during the
/// creation of a span or the recording of a field's
/// value:
///
/// tracing::trace_span!(
/// "runtime.resource",
/// concrete_type = "Sleep",
/// kind = "timer",
/// state.duration.ms = duration,
/// );
///
/// resource_span.record("state.duration.ms", &duration);
impl StateAttributeVisitor {
const STATE_PREFIX: &'static str = "state.";
const DELTA: &'static str = "delta";

pub(crate) fn new(meta_id: proto::MetaId) -> Self {
Self {
meta_id,
updates: Vec::default(),
}
}

fn extract(&self, field: &field::Field) -> Option<(AttributeUpdate, bool)> {
if field.name().starts_with(Self::STATE_PREFIX) {
let mut parts = field.name().split('.');
parts.next();
if let Some(name) = parts.next() {
let unit = parts.next().filter(|u| *u != Self::DELTA);
let is_delta = field.name().contains(Self::DELTA);
let field = proto::Field {
name: Some(name.into()),
value: None,
metadata_id: Some(self.meta_id.clone()),
};

let upd = AttributeUpdate {
field,
op: Some(AttributeUpdateOp::Override),
unit: unit.map(String::from),
};
return Some((upd, is_delta));
}
}
None
}
}

impl Visit for StateAttributeVisitor {
fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) {
if let Some((mut upd, _)) = self.extract(field) {
upd.field.value = Some(value.into());
self.updates.push(upd);
}
}

fn record_i64(&mut self, field: &field::Field, value: i64) {
if let Some((mut upd, is_delta)) = self.extract(field) {
upd.field.value = Some(value.into());
if is_delta {
if value < 0 {
upd.op = Some(AttributeUpdateOp::Sub)
} else {
upd.op = Some(AttributeUpdateOp::Add)
}
}
self.updates.push(upd);
}
}

fn record_u64(&mut self, field: &field::Field, value: u64) {
if let Some((mut upd, is_delta)) = self.extract(field) {
upd.field.value = Some(value.into());
if is_delta {
upd.op = Some(AttributeUpdateOp::Add)
}
self.updates.push(upd);
}
}

fn record_bool(&mut self, field: &field::Field, value: bool) {
if let Some((mut upd, _)) = self.extract(field) {
upd.field.value = Some(value.into());
self.updates.push(upd);
}
}

fn record_str(&mut self, field: &field::Field, value: &str) {
if let Some((mut upd, _)) = self.extract(field) {
upd.field.value = Some(value.into());
self.updates.push(upd);
}
}
}