diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index eb528e0ce..563491f63 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -34,7 +34,10 @@ use aggregator::Aggregator; pub use builder::Builder; use callsites::Callsites; use stack::SpanStack; -use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor}; +use visitors::{ + AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, StateAttributeVisitor, TaskVisitor, + WakerVisitor, +}; pub use builder::{init, spawn}; @@ -324,6 +327,36 @@ impl ConsoleLayer { }; (layer, server) } + + fn send_attribute_updates( + &self, + update_id: &span::Id, + update_meta: &'static Metadata<'static>, + updates: Vec, + ) { + let update_type = if self.async_op_state_update_callsites.contains(update_meta) { + UpdateType::AsyncOp + } else { + UpdateType::Resource + }; + + let dropped = if self.async_op_state_update_callsites.contains(update_meta) { + &self.shared.dropped_async_ops + } else { + &self.shared.dropped_resources + }; + + for update in updates.into_iter() { + self.send( + dropped, + Event::StateUpdate { + update_id: update_id.clone(), + update_type: update_type.clone(), + update, + }, + ); + } + } } impl ConsoleLayer { @@ -539,7 +572,7 @@ where let parent_id = self.current_spans.get().and_then(|stack| { self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx)) }); - self.send( + let sent = self.send( &self.shared.dropped_resources, Event::Resource { id: id.clone(), @@ -552,7 +585,15 @@ where is_internal, inherit_child_attrs, }, - ) + ); + + if sent { + let mut attribute_visitor = StateAttributeVisitor::new(metadata.into()); + attrs.record(&mut attribute_visitor); + self.send_attribute_updates(id, metadata, attribute_visitor.updates) + } + + sent } else { // else unknown resource span format false @@ -571,7 +612,7 @@ where }); if let Some(resource_id) = resource_id { - self.send( + let sent = self.send( &self.shared.dropped_async_ops, Event::AsyncResourceOp { id: id.clone(), @@ -582,7 +623,15 @@ where source, inherit_child_attrs, }, - ) + ); + + if sent { + let mut attribute_visitor = StateAttributeVisitor::new(metadata.into()); + attrs.record(&mut attribute_visitor); + self.send_attribute_updates(id, metadata, attribute_visitor.updates) + } + + sent } else { false } @@ -715,6 +764,19 @@ where } } + fn on_record(&self, id: &span::Id, values: &span::Record<'_>, cx: Context<'_, S>) { + if !self.is_id_tracked(id, &cx) { + return; + } + + if let Some(span) = cx.span(id) { + let metadata = span.metadata(); + let mut attribute_visitor = StateAttributeVisitor::new(metadata.into()); + values.record(&mut attribute_visitor); + self.send_attribute_updates(id, metadata, attribute_visitor.updates) + } + } + fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) { if !self.is_id_tracked(id, &cx) { return; diff --git a/console-subscriber/src/visitors.rs b/console-subscriber/src/visitors.rs index f0027094b..20b1b451d 100644 --- a/console-subscriber/src/visitors.rs +++ b/console-subscriber/src/visitors.rs @@ -533,3 +533,111 @@ impl Visit for StateUpdateVisitor { } } } + +pub(crate) struct StateAttributeVisitor { + meta_id: proto::MetaId, + pub(crate) updates: Vec, +} + +/// Used to extract the data needed to construct +/// an Event::StateUpdate from the the metadata of +/// a span. A span field that represents a state update +/// has the form of `state.name.unit.delta` where +/// the `delta` suffix is optional and indicates that +/// this update represents a positive or negative change +/// in the value of the attribute. +/// +/// State updates can be emitted during the +/// creation of a span or the recording of a field's +/// value: +/// +/// tracing::trace_span!( +/// "runtime.resource", +/// concrete_type = "Sleep", +/// kind = "timer", +/// state.duration.ms = duration, +/// ); +/// +/// resource_span.record("state.duration.ms", &duration); +impl StateAttributeVisitor { + const STATE_PREFIX: &'static str = "state."; + const DELTA: &'static str = "delta"; + + pub(crate) fn new(meta_id: proto::MetaId) -> Self { + Self { + meta_id, + updates: Vec::default(), + } + } + + fn extract(&self, field: &field::Field) -> Option<(AttributeUpdate, bool)> { + if field.name().starts_with(Self::STATE_PREFIX) { + let mut parts = field.name().split('.'); + parts.next(); + if let Some(name) = parts.next() { + let unit = parts.next().filter(|u| *u != Self::DELTA); + let is_delta = field.name().contains(Self::DELTA); + let field = proto::Field { + name: Some(name.into()), + value: None, + metadata_id: Some(self.meta_id.clone()), + }; + + let upd = AttributeUpdate { + field, + op: Some(AttributeUpdateOp::Override), + unit: unit.map(String::from), + }; + return Some((upd, is_delta)); + } + } + None + } +} + +impl Visit for StateAttributeVisitor { + fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) { + if let Some((mut upd, _)) = self.extract(field) { + upd.field.value = Some(value.into()); + self.updates.push(upd); + } + } + + fn record_i64(&mut self, field: &field::Field, value: i64) { + if let Some((mut upd, is_delta)) = self.extract(field) { + upd.field.value = Some(value.into()); + if is_delta { + if value < 0 { + upd.op = Some(AttributeUpdateOp::Sub) + } else { + upd.op = Some(AttributeUpdateOp::Add) + } + } + self.updates.push(upd); + } + } + + fn record_u64(&mut self, field: &field::Field, value: u64) { + if let Some((mut upd, is_delta)) = self.extract(field) { + upd.field.value = Some(value.into()); + if is_delta { + upd.op = Some(AttributeUpdateOp::Add) + } + self.updates.push(upd); + } + } + + fn record_bool(&mut self, field: &field::Field, value: bool) { + if let Some((mut upd, _)) = self.extract(field) { + upd.field.value = Some(value.into()); + self.updates.push(upd); + } + } + + fn record_str(&mut self, field: &field::Field, value: &str) { + if let Some((mut upd, _)) = self.extract(field) { + upd.field.value = Some(value.into()); + self.updates.push(upd); + } + } +}