Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/topology-metrics.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `internal_metrics` source now exposes Vector's topology graph as Prometheus-compatible metrics via the `component_connections` gauge. Each connection between components is emitted as a gauge with a value of `1` whose tags (labels) identify the source and target component IDs, types, and kinds, plus the source output name (`from_output`) when a named output is used, enabling topology visualization and monitoring.

authors: elohmeier
2 changes: 2 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub mod schema;
mod secret;
mod sink;
mod source;
mod topology_metadata;
mod transform;
pub mod unit_test;
mod validation;
Expand All @@ -62,6 +63,7 @@ pub use provider::ProviderConfig;
pub use secret::SecretBackend;
pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
pub use topology_metadata::{SharedTopologyMetadata, TopologyMetadata};
pub use transform::{
BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
};
Expand Down
9 changes: 8 additions & 1 deletion src/config/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use vector_lib::{
source::Source,
};

use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
use super::{
ComponentKey, ProxyConfig, Resource, SharedTopologyMetadata, dot_graph::GraphConfig, schema,
};
use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};

pub type BoxedSource = Box<dyn SourceConfig>;
Expand Down Expand Up @@ -143,6 +145,9 @@ pub struct SourceContext {
/// Extra context data provided by the running app and shared across all components. This can be
/// used to pass shared settings or other data from outside the components.
pub extra_context: ExtraContext,

/// Optional shared topology metadata, used by internal sources (such as `internal_metrics`) to expose the topology as metrics.
pub topology_metadata: Option<SharedTopologyMetadata>,
}

impl SourceContext {
Expand All @@ -165,6 +170,7 @@ impl SourceContext {
schema_definitions: HashMap::default(),
schema: Default::default(),
extra_context: Default::default(),
topology_metadata: None,
},
shutdown,
)
Expand All @@ -186,6 +192,7 @@ impl SourceContext {
schema_definitions: schema_definitions.unwrap_or_default(),
schema: Default::default(),
extra_context: Default::default(),
topology_metadata: None,
}
}

Expand Down
32 changes: 32 additions & 0 deletions src/config/topology_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

use super::{ComponentKey, OutputId};

/// Snapshot of the topology graph: which outputs feed each component, and
/// what type/kind each component is.
///
/// Consumed by the `internal_metrics` source, which turns every recorded
/// connection into a metric.
#[derive(Clone, Debug, Default)]
pub struct TopologyMetadata {
    /// For each component, the list of upstream outputs it reads from.
    pub inputs: HashMap<ComponentKey, Vec<OutputId>>,
    /// For each component, its `(type, kind)` pair.
    pub component_types: HashMap<ComponentKey, (String, String)>,
}

impl TopologyMetadata {
    /// Returns an empty `TopologyMetadata` (no inputs, no component types).
    pub fn new() -> Self {
        Self {
            inputs: HashMap::new(),
            component_types: HashMap::new(),
        }
    }

    /// Removes every recorded input and component type, leaving the
    /// metadata empty.
    pub fn clear(&mut self) {
        // Exhaustive destructuring: adding a field to the struct forces this
        // method to be revisited.
        let Self {
            inputs,
            component_types,
        } = self;
        inputs.clear();
        component_types.clear();
    }
}

/// Thread-safe, shared handle to a [`TopologyMetadata`].
///
/// This is an `Arc<RwLock<_>>`: cloning the handle yields another reference to
/// the same underlying metadata, and all access goes through the lock's
/// `read()` / `write()` guards.
///
/// NOTE(review): the lock is `std::sync::RwLock`, so acquiring it returns an
/// error if the lock was poisoned by a panicking writer — callers should
/// decide how to handle that rather than assume it cannot happen.
pub type SharedTopologyMetadata = Arc<RwLock<TopologyMetadata>>;
174 changes: 154 additions & 20 deletions src/sources/internal_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

use chrono::Utc;
use futures::StreamExt;
use serde_with::serde_as;
use tokio::time;
Expand All @@ -14,7 +15,8 @@ use vector_lib::{

use crate::{
SourceSender,
config::{SourceConfig, SourceContext, SourceOutput, log_schema},
config::{SharedTopologyMetadata, SourceConfig, SourceContext, SourceOutput, log_schema},
event::{Metric, MetricKind, MetricTags, MetricValue},
internal_events::{EventsReceived, StreamClosedError},
metrics::Controller,
shutdown::ShutdownSignal,
Expand Down Expand Up @@ -122,6 +124,7 @@ impl SourceConfig for InternalMetricsConfig {
interval,
out: cx.out,
shutdown: cx.shutdown,
topology_metadata: cx.topology_metadata.clone(),
}
.run(),
))
Expand All @@ -144,6 +147,7 @@ struct InternalMetrics<'a> {
interval: time::Duration,
out: SourceSender,
shutdown: ShutdownSignal,
topology_metadata: Option<SharedTopologyMetadata>,
}

impl InternalMetrics<'_> {
Expand All @@ -164,25 +168,35 @@ impl InternalMetrics<'_> {
bytes_received.emit(ByteSize(byte_size));
events_received.emit(CountByteSize(count, json_size));

let batch = metrics.into_iter().map(|mut metric| {
// A metric starts out with a default "vector" namespace, but will be overridden
// if an explicit namespace is provided to this source.
if self.namespace != "vector" {
metric = metric.with_namespace(Some(self.namespace.clone()));
}

if let Some(host_key) = &self.host_key.path
&& let Ok(hostname) = &hostname
{
metric.replace_tag(host_key.to_string(), hostname.to_owned());
}
if let Some(pid_key) = &self.pid_key {
metric.replace_tag(pid_key.to_owned(), pid.clone());
}
metric
});

if (self.out.send_batch(batch).await).is_err() {
let mut batch: Vec<Metric> = metrics
.into_iter()
.map(|mut metric| {
// A metric starts out with a default "vector" namespace, but will be overridden
// if an explicit namespace is provided to this source.
if self.namespace != "vector" {
metric = metric.with_namespace(Some(self.namespace.clone()));
}

if let Some(host_key) = &self.host_key.path
&& let Ok(hostname) = &hostname
{
metric.replace_tag(host_key.to_string(), hostname.to_owned());
}
if let Some(pid_key) = &self.pid_key {
metric.replace_tag(pid_key.to_owned(), pid.clone());
}
metric
})
.collect();

// Add topology metrics if available
if let Some(topology_metadata) = &self.topology_metadata {
let topology = topology_metadata.read().unwrap();
let topology_metrics = generate_topology_metrics(&topology, Utc::now());
batch.extend(topology_metrics);
}

if (self.out.send_batch(batch.into_iter()).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
Expand All @@ -192,6 +206,50 @@ impl InternalMetrics<'_> {
}
}

/// Builds the `component_connections` gauges that describe the topology graph.
///
/// One metric is produced for every (input, component) pair recorded in
/// `topology.inputs`. Each metric is an absolute gauge with value `1.0` in the
/// `vector` namespace, stamped with `timestamp`, and tagged with:
///
/// - `from_component_id`, plus `from_component_type` / `from_component_kind`
///   when the upstream component has an entry in `topology.component_types`;
/// - `from_output` when the input refers to a named output port;
/// - `to_component_id`, plus `to_component_type` / `to_component_kind` when
///   the downstream component has an entry in `topology.component_types`.
///
/// Metrics are returned in the iteration order of the `inputs` map.
fn generate_topology_metrics(
    topology: &crate::config::TopologyMetadata,
    timestamp: chrono::DateTime<Utc>,
) -> Vec<Metric> {
    topology
        .inputs
        .iter()
        // Flatten `component -> [inputs]` into individual (upstream, downstream) edges.
        .flat_map(|(downstream, upstreams)| {
            upstreams.iter().map(move |upstream| (upstream, downstream))
        })
        .map(|(upstream, downstream)| {
            let mut labels = MetricTags::default();

            // Producing side of the edge.
            labels.insert(
                "from_component_id".to_string(),
                upstream.component.to_string(),
            );
            if let Some((component_type, component_kind)) =
                topology.component_types.get(&upstream.component)
            {
                labels.insert("from_component_type".to_string(), component_type.clone());
                labels.insert("from_component_kind".to_string(), component_kind.clone());
            }
            if let Some(output_name) = &upstream.port {
                labels.insert("from_output".to_string(), output_name.clone());
            }

            // Consuming side of the edge.
            labels.insert("to_component_id".to_string(), downstream.to_string());
            if let Some((component_type, component_kind)) =
                topology.component_types.get(downstream)
            {
                labels.insert("to_component_type".to_string(), component_type.clone());
                labels.insert("to_component_kind".to_string(), component_kind.clone());
            }

            Metric::new(
                "component_connections",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 1.0 },
            )
            .with_namespace(Some("vector".to_string()))
            .with_tags(Some(labels))
            .with_timestamp(Some(timestamp))
        })
        .collect()
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
Expand All @@ -201,6 +259,7 @@ mod tests {

use super::*;
use crate::{
config::{ComponentKey, OutputId},
event::{
Event,
metric::{Metric, MetricValue},
Expand Down Expand Up @@ -344,4 +403,79 @@ mod tests {

assert_eq!(event.as_metric().namespace(), Some(namespace));
}

#[test]
fn test_topology_metrics_generation() {
    /// Returns the first metric whose `from_component_id` tag equals `from`.
    fn find_by_source<'a>(metrics: &'a [Metric], from: &str) -> Option<&'a Metric> {
        metrics
            .iter()
            .find(|m| m.tags().and_then(|t| t.get("from_component_id")) == Some(from))
    }

    // Topology under test: my_source -> my_transform -(output1)-> my_sink
    let mut topology = crate::config::TopologyMetadata::new();

    // Connections, listed as (downstream, upstream, upstream output port).
    let connections: [(&str, &str, Option<&str>); 2] = [
        ("my_transform", "my_source", None),
        ("my_sink", "my_transform", Some("output1")),
    ];
    for (downstream, upstream, port) in connections {
        topology.inputs.insert(
            ComponentKey::from(downstream),
            vec![OutputId {
                component: ComponentKey::from(upstream),
                port: port.map(String::from),
            }],
        );
    }

    // Component (type, kind) registrations.
    let components: [(&str, &str, &str); 3] = [
        ("my_source", "file", "source"),
        ("my_transform", "remap", "transform"),
        ("my_sink", "console", "sink"),
    ];
    for (id, component_type, component_kind) in components {
        topology.component_types.insert(
            ComponentKey::from(id),
            (component_type.to_string(), component_kind.to_string()),
        );
    }

    let now = Utc::now();
    let generated = generate_topology_metrics(&topology, now);

    // One metric per connection.
    assert_eq!(generated.len(), 2);

    // Source -> transform edge.
    let source_edge =
        find_by_source(&generated, "my_source").expect("Should find source -> transform metric");

    assert_eq!(source_edge.name(), "component_connections");
    assert_eq!(source_edge.namespace(), Some("vector"));
    let MetricValue::Gauge { value } = source_edge.value() else {
        panic!("Expected gauge metric")
    };
    assert_eq!(*value, 1.0);

    let source_tags = source_edge.tags().expect("Should have tags");
    let expected_source_tags: [(&str, &str); 6] = [
        ("from_component_id", "my_source"),
        ("from_component_type", "file"),
        ("from_component_kind", "source"),
        ("to_component_id", "my_transform"),
        ("to_component_type", "remap"),
        ("to_component_kind", "transform"),
    ];
    for (key, expected) in expected_source_tags {
        assert_eq!(source_tags.get(key), Some(expected));
    }

    // Transform -> sink edge, which goes through a named output.
    let sink_edge = find_by_source(&generated, "my_transform")
        .expect("Should find transform -> sink metric");

    let sink_tags = sink_edge.tags().expect("Should have tags");
    let expected_sink_tags: [(&str, &str); 3] = [
        ("from_component_id", "my_transform"),
        ("from_output", "output1"),
        ("to_component_id", "my_sink"),
    ];
    for (key, expected) in expected_sink_tags {
        assert_eq!(sink_tags.get(key), Some(expected));
    }
}
}
1 change: 1 addition & 0 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ mod test {
schema: Default::default(),
schema_definitions: HashMap::default(),
extra_context: Default::default(),
topology_metadata: None,
})
.await
.unwrap();
Expand Down
Loading
Loading