diff --git a/core-api/src/telemetry.rs b/core-api/src/telemetry.rs index 32f179c8b..b11fc1439 100644 --- a/core-api/src/telemetry.rs +++ b/core-api/src/telemetry.rs @@ -29,25 +29,21 @@ pub trait CoreTelemetry { #[derive(Debug, Clone, derive_builder::Builder)] #[non_exhaustive] pub struct TelemetryOptions { - /// Optional trace exporter - set as None to disable. - #[builder(setter(into, strip_option), default)] - pub tracing: Option, /// Optional logger - set as None to disable. #[builder(setter(into, strip_option), default)] pub logging: Option, /// Optional metrics exporter - set as None to disable. #[builder(setter(into, strip_option), default)] pub metrics: Option>, - /// If set true, strip the prefix `temporal_` from metrics, if present. Will be removed - /// eventually as the prefix is consistent with other SDKs. - #[builder(default)] - pub no_temporal_prefix_for_metrics: bool, /// If set true (the default) explicitly attach a `service_name` label to all metrics. Turn this /// off if your collection system supports the `target_info` metric from the OpenMetrics spec. /// For more, see /// [here](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems) #[builder(default = "true")] pub attach_service_name: bool, + /// A prefix to be applied to all core-created metrics. Defaults to "temporal_". + #[builder(default = "METRIC_PREFIX.to_string()")] + pub metric_prefix: String, } /// Options for exporting to an OpenTelemetry Collector @@ -67,9 +63,6 @@ pub struct OtelCollectorOptions { // A map of tags to be applied to all metrics #[builder(default)] pub global_tags: HashMap, - /// A prefix to be applied to all metrics. Defaults to "temporal_". - #[builder(default = "METRIC_PREFIX")] - pub metric_prefix: &'static str, } /// Options for exporting metrics to Prometheus @@ -79,9 +72,6 @@ pub struct PrometheusExporterOptions { // A map of tags to be applied to all metrics #[builder(default)] pub global_tags: HashMap, - /// A prefix to be applied to all metrics. Defaults to "temporal_". - #[builder(default = "METRIC_PREFIX")] - pub metric_prefix: &'static str, /// If set true, all counters will include a "_total" suffix #[builder(default = "false")] pub counters_total_suffix: bool, @@ -91,23 +81,6 @@ pub struct PrometheusExporterOptions { pub unit_suffix: bool, } -/// Configuration for the external export of traces -#[derive(Debug, Clone)] -pub struct TraceExportConfig { - /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string. - pub filter: String, - /// Where they should go - pub exporter: TraceExporter, -} - -/// Control where traces are exported. -#[derive(Debug, Clone)] -pub enum TraceExporter { - // TODO: Remove - /// Export traces to an OpenTelemetry Collector . - Otel(OtelCollectorOptions), -} - /// Control where logs go #[derive(Debug, Clone)] pub enum Logger { diff --git a/core/src/lib.rs b/core/src/lib.rs index 8629c7851..475950cda 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -55,10 +55,7 @@ use std::sync::Arc; use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics}; use temporal_sdk_core_api::{ errors::{CompleteActivityError, PollActivityError, PollWfError}, - telemetry::{ - metrics::{CoreMeter, TemporalMeter}, - TelemetryOptions, - }, + telemetry::TelemetryOptions, Worker as WorkerTrait, }; use temporal_sdk_core_protos::coresdk::ActivityHeartbeat; @@ -265,27 +262,14 @@ impl CoreRuntime { self.runtime_handle.clone() } - /// Returns the metric meter used for recording metrics, if they were enabled. - pub fn metric_meter(&self) -> Option { - self.telemetry.get_metric_meter() - } - - /// Return the trace subscriber associated with the telemetry options/instance. Can be used - /// to manually set the default for a thread or globally using the `tracing` crate, or with - /// [set_trace_subscriber_for_current_thread] - pub fn trace_subscriber(&self) -> Arc { - self.telemetry.trace_subscriber() - } - /// Return a reference to the owned [TelemetryInstance] pub fn telemetry(&self) -> &TelemetryInstance { &self.telemetry } - /// Some metric meters cannot be initialized until after a tokio runtime has started and after - /// other telemetry has initted (ex: prometheus). They can be attached here. - pub fn attach_late_init_metrics(&mut self, meter: Arc) { - self.telemetry.attach_late_init_metrics(meter) + /// Return a mutable reference to the owned [TelemetryInstance] + pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance { + &mut self.telemetry } } diff --git a/core/src/telemetry/metrics.rs b/core/src/telemetry/metrics.rs index e2a9351a6..e224f6f1b 100644 --- a/core/src/telemetry/metrics.rs +++ b/core/src/telemetry/metrics.rs @@ -89,7 +89,7 @@ impl MetricsContext { } pub(crate) fn top_level(namespace: String, tq: String, telemetry: &TelemetryInstance) -> Self { - if let Some(mut meter) = telemetry.get_metric_meter() { + if let Some(mut meter) = telemetry.get_temporal_metric_meter() { meter .default_attribs .attributes @@ -778,7 +778,7 @@ impl Gauge for MemoryGaugeU64 { #[derive(Debug, derive_more::Constructor)] pub(crate) struct PrefixedMetricsMeter { - prefix: &'static str, + prefix: String, meter: CM, } impl CoreMeter for PrefixedMetricsMeter { @@ -787,17 +787,17 @@ impl CoreMeter for PrefixedMetricsMeter { } fn counter(&self, mut params: MetricParameters) -> Arc { - params.name = (self.prefix.to_string() + &*params.name).into(); + params.name = (self.prefix.clone() + &*params.name).into(); self.meter.counter(params) } fn histogram(&self, mut params: MetricParameters) -> Arc { - params.name = (self.prefix.to_string() + &*params.name).into(); + params.name = (self.prefix.clone() + &*params.name).into(); self.meter.histogram(params) } fn gauge(&self, mut params: MetricParameters) -> Arc { - params.name = (self.prefix.to_string() + &*params.name).into(); + params.name = (self.prefix.clone() + &*params.name).into(); self.meter.gauge(params) } } @@ -815,7 +815,7 @@ mod tests { let telem_instance = TelemetryInstance::new( no_op_subscriber, None, - METRIC_PREFIX, + METRIC_PREFIX.to_string(), Some(call_buffer.clone()), true, ); diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index ab36ac6d8..36828614a 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -47,7 +47,7 @@ pub fn construct_filter_string(core_level: Level, other_level: Level) -> String /// Holds initialized tracing/metrics exporters, etc pub struct TelemetryInstance { - metric_prefix: &'static str, + metric_prefix: String, logs_out: Option>, metrics: Option>, trace_subscriber: Arc, @@ -58,7 +58,7 @@ impl TelemetryInstance { fn new( trace_subscriber: Arc, logs_out: Option>, - metric_prefix: &'static str, + metric_prefix: String, metrics: Option>, attach_service_name: bool, ) -> Self { @@ -71,8 +71,9 @@ impl TelemetryInstance { } } - /// Returns a trace subscriber which can be used with the tracing crate, or with our own - /// [set_trace_subscriber_for_current_thread] function. + /// Return the trace subscriber associated with the telemetry options/instance. Can be used + /// to manually set the default for a thread or globally using the `tracing` crate, or with + /// [set_trace_subscriber_for_current_thread] pub fn trace_subscriber(&self) -> Arc { self.trace_subscriber.clone() } @@ -83,21 +84,37 @@ impl TelemetryInstance { self.metrics = Some(meter); } - /// Returns our wrapper for metric meters, can be used to, ex: initialize clients - pub fn get_metric_meter(&self) -> Option { + /// Returns our wrapper for metric meters, including the `metric_prefix` from + /// [TelemetryOptions]. This should be used to initialize clients or for any other + /// temporal-owned metrics. User defined metrics should use [Self::get_metric_meter]. + pub fn get_temporal_metric_meter(&self) -> Option { self.metrics.clone().map(|m| { - let kvs = if self.attach_service_name { - vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)] - } else { - vec![] - }; + let kvs = self.default_kvs(); let attribs = MetricsAttributesOptions::new(kvs); TemporalMeter::new( - Arc::new(PrefixedMetricsMeter::new(self.metric_prefix, m)) as Arc, + Arc::new(PrefixedMetricsMeter::new(self.metric_prefix.clone(), m)) + as Arc, attribs, ) }) } + + /// Returns our wrapper for metric meters, including attaching the service name if enabled. + pub fn get_metric_meter(&self) -> Option { + self.metrics.clone().map(|m| { + let kvs = self.default_kvs(); + let attribs = MetricsAttributesOptions::new(kvs); + TemporalMeter::new(m, attribs) + }) + } + + fn default_kvs(&self) -> Vec { + if self.attach_service_name { + vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)] + } else { + vec![] + } + } } thread_local! { @@ -122,14 +139,6 @@ pub fn remove_trace_subscriber_for_current_thread() { SUB_GUARD.with(|sg| sg.take()); } -fn metric_prefix(opts: &TelemetryOptions) -> &'static str { - if opts.no_temporal_prefix_for_metrics { - "" - } else { - "temporal_" - } -} - impl CoreTelemetry for TelemetryInstance { fn fetch_buffered_logs(&self) -> Vec { if let Some(logs_out) = self.logs_out.as_ref() { @@ -155,7 +164,6 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result Result TelemetryOptions { .url(url) .build() .unwrap(); - ob.tracing(TraceExportConfig { - filter: filter_string.clone(), - exporter: TraceExporter::Otel(opts.clone()), - }); ob.metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap()) as Arc); } if let Some(addr) = env::var(PROM_ENABLE_ENV_VAR) diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index c627bb294..717993ef6 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -3,7 +3,10 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService}; use temporal_sdk_core::{init_worker, telemetry::start_prometheus_metric_exporter, CoreRuntime}; use temporal_sdk_core_api::{ - telemetry::{metrics::CoreMeter, PrometheusExporterOptionsBuilder, TelemetryOptions}, + telemetry::{ + metrics::{CoreMeter, MetricAttributes, MetricParameters}, + PrometheusExporterOptionsBuilder, TelemetryOptions, + }, worker::WorkerConfigBuilder, Worker, }; @@ -69,7 +72,7 @@ async fn prometheus_metrics_exported() { let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let opts = get_integ_server_options(); let mut raw_client = opts - .connect_no_namespace(rt.metric_meter(), None) + .connect_no_namespace(rt.telemetry().get_temporal_metric_meter(), None) .await .unwrap(); assert!(raw_client.get_client().capabilities().is_some()); @@ -88,6 +91,17 @@ async fn prometheus_metrics_exported() { )); // Verify counter names are appropriate (don't end w/ '_total') assert!(body.contains("temporal_request{")); + // Verify non-temporal metrics meter does not prefix + let mm = rt.telemetry().get_metric_meter().unwrap(); + let g = mm.inner.gauge(MetricParameters::from("mygauge")); + g.record( + 42, + &MetricAttributes::OTel { + kvs: Arc::new(vec![]), + }, + ); + let body = get_text(format!("http://{addr}/metrics")).await; + assert!(body.contains("\nmygauge 42")); } #[tokio::test] @@ -434,11 +448,12 @@ fn runtime_new() { let handle = rt.tokio_handle(); let _rt = handle.enter(); let (telemopts, addr, _aborter) = prom_metrics(); - rt.attach_late_init_metrics(telemopts.metrics.unwrap()); + rt.telemetry_mut() + .attach_late_init_metrics(telemopts.metrics.unwrap()); let opts = get_integ_server_options(); handle.block_on(async { let mut raw_client = opts - .connect_no_namespace(rt.metric_meter(), None) + .connect_no_namespace(rt.telemetry().get_temporal_metric_meter(), None) .await .unwrap(); assert!(raw_client.get_client().capabilities().is_some()); diff --git a/tests/main.rs b/tests/main.rs index d01f4f3e4..92654a459 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -37,7 +37,7 @@ mod integ_tests { let opts = get_integ_server_options(); let runtime = CoreRuntime::new_assume_tokio(get_integ_telem_options()).unwrap(); let mut retrying_client = opts - .connect_no_namespace(runtime.metric_meter(), None) + .connect_no_namespace(runtime.telemetry().get_temporal_metric_meter(), None) .await .unwrap();