Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core-api/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub struct TelemetryOptions {
/// Specifies the aggregation temporality for metric export. Defaults to cumulative.
#[builder(default = "MetricTemporality::Cumulative")]
pub metric_temporality: MetricTemporality,

// A map of tags to be applied to all metrics
#[builder(default)]
pub global_tags: HashMap<String, String>,
}

/// Options for exporting to an OpenTelemetry Collector
Expand Down
18 changes: 12 additions & 6 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use opentelemetry_otlp::WithExportConfig;
use parking_lot::Mutex;
use std::{
cell::RefCell,
collections::VecDeque,
collections::{HashMap, VecDeque},
convert::TryInto,
env,
net::SocketAddr,
Expand Down Expand Up @@ -221,11 +221,12 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
let aggregator = SDKAggSelector { metric_prefix };
match metrics {
MetricsExporter::Prometheus(addr) => {
let srv = runtime.block_on(async move {
let srv = runtime.block_on(async {
PromServer::new(
*addr,
aggregator,
metric_temporality_to_selector(opts.metric_temporality),
&opts.global_tags,
)
})?;
prom_binding = Some(srv.bound_addr());
Expand All @@ -245,7 +246,7 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
runtime::Tokio,
)
.with_period(metric_periodicity.unwrap_or_else(|| Duration::from_secs(1)))
.with_resource(default_resource())
.with_resource(default_resource(&opts.global_tags))
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
Expand All @@ -266,7 +267,8 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
match &tracing.exporter {
TraceExporter::Otel(OtelCollectorOptions { url, headers, .. }) => {
runtime.block_on(async {
let tracer_cfg = Config::default().with_resource(default_resource());
let tracer_cfg =
Config::default().with_resource(default_resource(&opts.global_tags));
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
Expand Down Expand Up @@ -338,8 +340,12 @@ fn default_resource_kvs() -> &'static [KeyValue] {
static INSTANCE: OnceCell<[KeyValue; 1]> = OnceCell::new();
INSTANCE.get_or_init(|| [KeyValue::new("service.name", TELEM_SERVICE_NAME)])
}
fn default_resource() -> Resource {
Resource::new(default_resource_kvs().iter().cloned())

fn default_resource(override_values: &HashMap<String, String>) -> Resource {
let override_kvs = override_values
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()));
Resource::new(default_resource_kvs().iter().cloned()).merge(&Resource::new(override_kvs))
}

fn metric_temporality_to_selector(
Expand Down
5 changes: 3 additions & 2 deletions core/src/telemetry/prometheus_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use opentelemetry::sdk::{
};
use opentelemetry_prometheus::{ExporterBuilder, PrometheusExporter};
use prometheus::{Encoder, TextEncoder};
use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashMap, convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};

/// Exposes prometheus metrics for scraping
pub(super) struct PromServer {
Expand All @@ -24,12 +24,13 @@ impl PromServer {
addr: SocketAddr,
aggregation: impl AggregatorSelector + Send + Sync + 'static,
temporality: impl TemporalitySelector + Send + Sync + 'static,
tags: &HashMap<String, String>,
) -> Result<Self, anyhow::Error> {
let controller =
controllers::basic(processors::factory(aggregation, temporality).with_memory(true))
// Because Prom is pull-based, make this always refresh
.with_collect_period(Duration::from_secs(0))
.with_resource(default_resource())
.with_resource(default_resource(tags))
.build();
let exporter = ExporterBuilder::new(controller).try_init()?;
let bound_addr = AddrIncoming::bind(&addr)?;
Expand Down