From 8881a73e26b0b544a45dcdd28e82b4489442534b Mon Sep 17 00:00:00 2001 From: Enno Richter Date: Wed, 17 Sep 2025 07:00:42 +0200 Subject: [PATCH] feat(internal_metrics source): expose topology connections as metrics Adds the ability to expose Vector's component topology graph as metrics through the internal_metrics source. Each connection between components is represented as a component_connections gauge metric with labels for source and target components. --- changelog.d/topology-metrics.feature.md | 3 + src/config/mod.rs | 2 + src/config/source.rs | 9 +- src/config/topology_metadata.rs | 32 +++++ src/sources/internal_metrics.rs | 174 +++++++++++++++++++++--- src/sources/socket/mod.rs | 1 + src/topology/builder.rs | 71 +++++++++- src/topology/running.rs | 71 +++++++++- 8 files changed, 339 insertions(+), 24 deletions(-) create mode 100644 changelog.d/topology-metrics.feature.md create mode 100644 src/config/topology_metadata.rs diff --git a/changelog.d/topology-metrics.feature.md b/changelog.d/topology-metrics.feature.md new file mode 100644 index 0000000000000..44f921a534479 --- /dev/null +++ b/changelog.d/topology-metrics.feature.md @@ -0,0 +1,3 @@ +The `internal_metrics` source now exposes Vector's topology graph as Prometheus metrics via the `component_connections` gauge. Each connection between components is represented as a metric with labels indicating source and target component IDs, types, and kinds, enabling topology visualization and monitoring. + +authors: elohmeier diff --git a/src/config/mod.rs b/src/config/mod.rs index 71c18a0e339cb..3c1be78786869 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -42,6 +42,7 @@ pub mod schema; mod secret; mod sink; mod source; +mod topology_metadata; mod transform; pub mod unit_test; mod validation; @@ -62,6 +63,7 @@ pub use provider::ProviderConfig; pub use secret::SecretBackend; pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter}; pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter}; +pub use topology_metadata::{SharedTopologyMetadata, TopologyMetadata}; pub use transform::{ BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids, }; diff --git a/src/config/source.rs b/src/config/source.rs index 25fd6ab72915b..4fdc674f4bcff 100644 --- a/src/config/source.rs +++ b/src/config/source.rs @@ -16,7 +16,9 @@ use vector_lib::{ source::Source, }; -use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema}; +use super::{ + ComponentKey, ProxyConfig, Resource, SharedTopologyMetadata, dot_graph::GraphConfig, schema, +}; use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal}; pub type BoxedSource = Box; @@ -143,6 +145,9 @@ pub struct SourceContext { /// Extra context data provided by the running app and shared across all components. This can be /// used to pass shared settings or other data from outside the components. pub extra_context: ExtraContext, + + /// Optional topology metadata for internal sources to expose topology as metrics + pub topology_metadata: Option, } impl SourceContext { @@ -165,6 +170,7 @@ impl SourceContext { schema_definitions: HashMap::default(), schema: Default::default(), extra_context: Default::default(), + topology_metadata: None, }, shutdown, ) @@ -186,6 +192,7 @@ impl SourceContext { schema_definitions: schema_definitions.unwrap_or_default(), schema: Default::default(), extra_context: Default::default(), + topology_metadata: None, } } diff --git a/src/config/topology_metadata.rs b/src/config/topology_metadata.rs new file mode 100644 index 0000000000000..0902bd1acd6ae --- /dev/null +++ b/src/config/topology_metadata.rs @@ -0,0 +1,32 @@ +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use super::{ComponentKey, OutputId}; + +/// Metadata about the topology connections and component types +/// Used by internal_metrics source to expose topology as metrics +#[derive(Clone, Debug, Default)] +pub struct TopologyMetadata { + /// Map of component to its inputs + pub inputs: HashMap>, + /// Map of component to its (type, kind) tuple + pub component_types: HashMap, +} + +impl TopologyMetadata { + /// Create a new TopologyMetadata instance + pub fn new() -> Self { + Self::default() + } + + /// Clear all metadata + pub fn clear(&mut self) { + self.inputs.clear(); + self.component_types.clear(); + } +} + +/// Thread-safe reference to topology metadata +pub type SharedTopologyMetadata = Arc>; diff --git a/src/sources/internal_metrics.rs b/src/sources/internal_metrics.rs index 47b5fc7101902..54a6b9b9790fa 100644 --- a/src/sources/internal_metrics.rs +++ b/src/sources/internal_metrics.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use chrono::Utc; use futures::StreamExt; use serde_with::serde_as; use tokio::time; @@ -14,7 +15,8 @@ use vector_lib::{ use crate::{ SourceSender, - config::{SourceConfig, SourceContext, SourceOutput, log_schema}, + config::{SharedTopologyMetadata, SourceConfig, SourceContext, SourceOutput, log_schema}, + event::{Metric, MetricKind, MetricTags, MetricValue}, internal_events::{EventsReceived, StreamClosedError}, metrics::Controller, shutdown::ShutdownSignal, @@ -122,6 +124,7 @@ impl SourceConfig for InternalMetricsConfig { interval, out: cx.out, shutdown: cx.shutdown, + topology_metadata: cx.topology_metadata.clone(), } .run(), )) @@ -144,6 +147,7 @@ struct InternalMetrics<'a> { interval: time::Duration, out: SourceSender, shutdown: ShutdownSignal, + topology_metadata: Option, } impl InternalMetrics<'_> { @@ -164,25 +168,35 @@ impl InternalMetrics<'_> { bytes_received.emit(ByteSize(byte_size)); events_received.emit(CountByteSize(count, json_size)); - let batch = metrics.into_iter().map(|mut metric| { - // A metric starts out with a default "vector" namespace, but will be overridden - // if an explicit namespace is provided to this source. - if self.namespace != "vector" { - metric = metric.with_namespace(Some(self.namespace.clone())); - } - - if let Some(host_key) = &self.host_key.path - && let Ok(hostname) = &hostname - { - metric.replace_tag(host_key.to_string(), hostname.to_owned()); - } - if let Some(pid_key) = &self.pid_key { - metric.replace_tag(pid_key.to_owned(), pid.clone()); - } - metric - }); - - if (self.out.send_batch(batch).await).is_err() { + let mut batch: Vec = metrics + .into_iter() + .map(|mut metric| { + // A metric starts out with a default "vector" namespace, but will be overridden + // if an explicit namespace is provided to this source. + if self.namespace != "vector" { + metric = metric.with_namespace(Some(self.namespace.clone())); + } + + if let Some(host_key) = &self.host_key.path + && let Ok(hostname) = &hostname + { + metric.replace_tag(host_key.to_string(), hostname.to_owned()); + } + if let Some(pid_key) = &self.pid_key { + metric.replace_tag(pid_key.to_owned(), pid.clone()); + } + metric + }) + .collect(); + + // Add topology metrics if available + if let Some(topology_metadata) = &self.topology_metadata { + let topology = topology_metadata.read().unwrap(); + let topology_metrics = generate_topology_metrics(&topology, Utc::now()); + batch.extend(topology_metrics); + } + + if (self.out.send_batch(batch.into_iter()).await).is_err() { emit!(StreamClosedError { count }); return Err(()); } @@ -192,6 +206,50 @@ impl InternalMetrics<'_> { } } +/// Generate metrics for topology connections +fn generate_topology_metrics( + topology: &crate::config::TopologyMetadata, + timestamp: chrono::DateTime, +) -> Vec { + let mut metrics = Vec::new(); + + for (to_component, inputs) in &topology.inputs { + for input in inputs { + let mut tags = MetricTags::default(); + + // Source component labels + tags.insert("from_component_id".to_string(), input.component.to_string()); + if let Some((type_name, kind)) = topology.component_types.get(&input.component) { + tags.insert("from_component_type".to_string(), type_name.clone()); + tags.insert("from_component_kind".to_string(), kind.clone()); + } + if let Some(port) = &input.port { + tags.insert("from_output".to_string(), port.clone()); + } + + // Target component labels + tags.insert("to_component_id".to_string(), to_component.to_string()); + if let Some((type_name, kind)) = topology.component_types.get(to_component) { + tags.insert("to_component_type".to_string(), type_name.clone()); + tags.insert("to_component_kind".to_string(), kind.clone()); + } + + metrics.push( + Metric::new( + "component_connections", + MetricKind::Absolute, + MetricValue::Gauge { value: 1.0 }, + ) + .with_namespace(Some("vector".to_string())) + .with_tags(Some(tags)) + .with_timestamp(Some(timestamp)), + ); + } + } + + metrics +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -201,6 +259,7 @@ mod tests { use super::*; use crate::{ + config::{ComponentKey, OutputId}, event::{ Event, metric::{Metric, MetricValue}, @@ -344,4 +403,79 @@ mod tests { assert_eq!(event.as_metric().namespace(), Some(namespace)); } + + #[test] + fn test_topology_metrics_generation() { + let mut topology = crate::config::TopologyMetadata::new(); + + // Add a source -> transform connection + topology.inputs.insert( + ComponentKey::from("my_transform"), + vec![OutputId { + component: ComponentKey::from("my_source"), + port: None, + }], + ); + + // Add a transform -> sink connection + topology.inputs.insert( + ComponentKey::from("my_sink"), + vec![OutputId { + component: ComponentKey::from("my_transform"), + port: Some("output1".to_string()), + }], + ); + + // Add component types + topology.component_types.insert( + ComponentKey::from("my_source"), + ("file".to_string(), "source".to_string()), + ); + topology.component_types.insert( + ComponentKey::from("my_transform"), + ("remap".to_string(), "transform".to_string()), + ); + topology.component_types.insert( + ComponentKey::from("my_sink"), + ("console".to_string(), "sink".to_string()), + ); + + let timestamp = Utc::now(); + let metrics = generate_topology_metrics(&topology, timestamp); + + // Should have 2 connection metrics + assert_eq!(metrics.len(), 2); + + // Find the source -> transform connection + let source_to_transform = metrics + .iter() + .find(|m| m.tags().and_then(|t| t.get("from_component_id")) == Some("my_source")) + .expect("Should find source -> transform metric"); + + assert_eq!(source_to_transform.name(), "component_connections"); + assert_eq!(source_to_transform.namespace(), Some("vector")); + match source_to_transform.value() { + MetricValue::Gauge { value } => assert_eq!(*value, 1.0), + _ => panic!("Expected gauge metric"), + } + + let tags1 = source_to_transform.tags().expect("Should have tags"); + assert_eq!(tags1.get("from_component_id"), Some("my_source")); + assert_eq!(tags1.get("from_component_type"), Some("file")); + assert_eq!(tags1.get("from_component_kind"), Some("source")); + assert_eq!(tags1.get("to_component_id"), Some("my_transform")); + assert_eq!(tags1.get("to_component_type"), Some("remap")); + assert_eq!(tags1.get("to_component_kind"), Some("transform")); + + // Find the transform -> sink connection + let transform_to_sink = metrics + .iter() + .find(|m| m.tags().and_then(|t| t.get("from_component_id")) == Some("my_transform")) + .expect("Should find transform -> sink metric"); + + let tags2 = transform_to_sink.tags().expect("Should have tags"); + assert_eq!(tags2.get("from_component_id"), Some("my_transform")); + assert_eq!(tags2.get("from_output"), Some("output1")); + assert_eq!(tags2.get("to_component_id"), Some("my_sink")); + } } diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 9b467426d16ba..83eb2ed13dc67 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -1055,6 +1055,7 @@ mod test { schema: Default::default(), schema_definitions: HashMap::default(), extra_context: Default::default(), + topology_metadata: None, }) .await .unwrap(); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index c38a11c5ceba9..96a92fd76e263 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -2,7 +2,7 @@ use std::{ collections::HashMap, future::ready, num::NonZeroUsize, - sync::{Arc, LazyLock, Mutex}, + sync::{Arc, LazyLock, Mutex, RwLock}, time::Instant, }; @@ -41,7 +41,8 @@ use crate::{ SourceSender, config::{ ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, - ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput, + ProxyConfig, SharedTopologyMetadata, SinkContext, SourceContext, TopologyMetadata, + TransformContext, TransformOuter, TransformOutput, }, event::{EventArray, EventContainer}, extra_context::ExtraContext, @@ -84,6 +85,7 @@ struct Builder<'a> { detach_triggers: HashMap, extra_context: ExtraContext, utilization_emitter: UtilizationEmitter, + topology_metadata: Option, } impl<'a> Builder<'a> { @@ -106,9 +108,64 @@ impl<'a> Builder<'a> { detach_triggers: HashMap::new(), extra_context, utilization_emitter: UtilizationEmitter::new(), + topology_metadata: None, } } + /// Create topology metadata from the current configuration + fn create_topology_metadata(&mut self) -> Option { + let mut metadata = TopologyMetadata::new(); + + // Collect inputs for each component + for (key, transform) in self.config.transforms() { + for input in &transform.inputs { + metadata + .inputs + .entry(key.clone()) + .or_default() + .push(input.clone()); + } + metadata.component_types.insert( + key.clone(), + ( + transform.inner.get_component_name().to_string(), + "transform".to_string(), + ), + ); + } + + for (key, sink) in self.config.sinks() { + for input in &sink.inputs { + metadata + .inputs + .entry(key.clone()) + .or_default() + .push(input.clone()); + } + metadata.component_types.insert( + key.clone(), + ( + sink.inner.get_component_name().to_string(), + "sink".to_string(), + ), + ); + } + + for (key, source) in self.config.sources() { + metadata.component_types.insert( + key.clone(), + ( + source.inner.get_component_name().to_string(), + "source".to_string(), + ), + ); + } + + let shared = Arc::new(RwLock::new(metadata)); + self.topology_metadata = Some(Arc::clone(&shared)); + Some(shared) + } + /// Builds the new pieces of the topology found in `self.diff`. async fn build(mut self) -> Result> { let enrichment_tables = self.load_enrichment_tables().await; @@ -130,6 +187,7 @@ impl<'a> Builder<'a> { shutdown_coordinator: self.shutdown_coordinator, detach_triggers: self.detach_triggers, utilization_emitter: Some(self.utilization_emitter), + topology_metadata: self.topology_metadata, }) } else { Err(self.errors) @@ -337,6 +395,13 @@ impl<'a> Builder<'a> { .shutdown_coordinator .register_source(key, INTERNAL_SOURCES.contains(&typetag)); + // Create topology metadata for internal_metrics source + let topology_metadata = if key.id() == "internal_metrics" { + self.create_topology_metadata() + } else { + None + }; + let context = SourceContext { key: key.clone(), globals: self.config.global.clone(), @@ -348,6 +413,7 @@ impl<'a> Builder<'a> { schema_definitions, schema: self.config.schema, extra_context: self.extra_context.clone(), + topology_metadata, }; let source = source.inner.build(context).await; let server = match source { @@ -769,6 +835,7 @@ pub struct TopologyPieces { pub(crate) shutdown_coordinator: SourceShutdownCoordinator, pub(crate) detach_triggers: HashMap, pub(crate) utilization_emitter: Option, + pub(crate) topology_metadata: Option, } impl TopologyPieces { diff --git a/src/topology/running.rs b/src/topology/running.rs index 91368f6869d88..c997160bd76af 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -28,7 +28,10 @@ use super::{ task::{Task, TaskOutput}, }; use crate::{ - config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource}, + config::{ + ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource, + SharedTopologyMetadata, + }, event::EventArray, extra_context::ExtraContext, shutdown::SourceShutdownCoordinator, @@ -56,6 +59,7 @@ pub struct RunningTopology { utilization_task: Option, utilization_task_shutdown_trigger: Option, pending_reload: Option>, + topology_metadata: Option, } impl RunningTopology { @@ -77,6 +81,7 @@ impl RunningTopology { utilization_task: None, utilization_task_shutdown_trigger: None, pending_reload: None, + topology_metadata: None, } } @@ -789,6 +794,15 @@ impl RunningTopology { // sources/transforms, to ensure we're connecting components in order. self.reattach_severed_inputs(diff); + // Update topology metadata if present (for internal_metrics source) + if let Some(ref metadata) = new_pieces.topology_metadata { + self.topology_metadata = Some(Arc::clone(metadata)); + self.update_topology_metadata(); + } else if let Some(ref _metadata) = self.topology_metadata { + // If we already have topology metadata, update it with current configuration + self.update_topology_metadata(); + } + // Broadcast any topology changes to subscribers. if !self.watch.0.is_closed() { let outputs = self @@ -1203,6 +1217,56 @@ impl RunningTopology { .insert(key.clone(), spawn_named(source_task, task_name.as_ref())); } + /// Update the topology metadata with current configuration + fn update_topology_metadata(&mut self) { + if let Some(ref metadata_ref) = self.topology_metadata { + let mut metadata = metadata_ref.write().unwrap(); + metadata.clear(); + + // Collect inputs for each component + for (key, inputs) in &self.inputs_tap_metadata { + for input in inputs { + metadata + .inputs + .entry(key.clone()) + .or_default() + .push(input.clone()); + } + } + + // Collect component types + for (key, transform) in self.config.transforms() { + metadata.component_types.insert( + key.clone(), + ( + transform.inner.get_component_name().to_string(), + "transform".to_string(), + ), + ); + } + + for (key, sink) in self.config.sinks() { + metadata.component_types.insert( + key.clone(), + ( + sink.inner.get_component_name().to_string(), + "sink".to_string(), + ), + ); + } + + for (key, source) in self.config.sources() { + metadata.component_types.insert( + key.clone(), + ( + source.inner.get_component_name().to_string(), + "source".to_string(), + ), + ); + } + } + } + pub async fn start_init_validated( config: Config, extra_context: ExtraContext, @@ -1276,6 +1340,11 @@ impl RunningTopology { { return None; } + // Transfer topology metadata if present + if let Some(ref metadata) = pieces.topology_metadata { + running_topology.topology_metadata = Some(Arc::clone(metadata)); + } + running_topology.connect_diff(&diff, &mut pieces).await; running_topology.spawn_diff(&diff, pieces);