Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 3 additions & 30 deletions core-api/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,21 @@ pub trait CoreTelemetry {
#[derive(Debug, Clone, derive_builder::Builder)]
#[non_exhaustive]
pub struct TelemetryOptions {
/// Optional trace exporter - set as None to disable.
#[builder(setter(into, strip_option), default)]
pub tracing: Option<TraceExportConfig>,
/// Optional logger - set as None to disable.
#[builder(setter(into, strip_option), default)]
pub logging: Option<Logger>,
/// Optional metrics exporter - set as None to disable.
#[builder(setter(into, strip_option), default)]
pub metrics: Option<Arc<dyn CoreMeter>>,
/// If set true, strip the prefix `temporal_` from metrics, if present. Will be removed
/// eventually as the prefix is consistent with other SDKs.
#[builder(default)]
pub no_temporal_prefix_for_metrics: bool,
/// If set true (the default) explicitly attach a `service_name` label to all metrics. Turn this
/// off if your collection system supports the `target_info` metric from the OpenMetrics spec.
/// For more, see
/// [here](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
#[builder(default = "true")]
pub attach_service_name: bool,
/// A prefix to be applied to all core-created metrics. Defaults to "temporal_".
#[builder(default = "METRIC_PREFIX.to_string()")]
pub metric_prefix: String,
}

/// Options for exporting to an OpenTelemetry Collector
Expand All @@ -67,9 +63,6 @@ pub struct OtelCollectorOptions {
/// A map of tags to be applied to all metrics
#[builder(default)]
pub global_tags: HashMap<String, String>,
/// A prefix to be applied to all metrics. Defaults to "temporal_".
#[builder(default = "METRIC_PREFIX")]
pub metric_prefix: &'static str,
}

/// Options for exporting metrics to Prometheus
Expand All @@ -79,9 +72,6 @@ pub struct PrometheusExporterOptions {
/// A map of tags to be applied to all metrics
#[builder(default)]
pub global_tags: HashMap<String, String>,
/// A prefix to be applied to all metrics. Defaults to "temporal_".
#[builder(default = "METRIC_PREFIX")]
pub metric_prefix: &'static str,
/// If set true, all counters will include a "_total" suffix
#[builder(default = "false")]
pub counters_total_suffix: bool,
Expand All @@ -91,23 +81,6 @@ pub struct PrometheusExporterOptions {
pub unit_suffix: bool,
}

/// Configuration for the external export of traces
// Pairs an `EnvFilter`-style filter string with the [TraceExporter] that selects the export
// destination.
#[derive(Debug, Clone)]
pub struct TraceExportConfig {
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html)
/// filter string.
pub filter: String,
/// The destination the traces are exported to. See [TraceExporter].
pub exporter: TraceExporter,
}

/// Control where traces are exported.
// Only one destination exists here: an OpenTelemetry Collector, configured through
// [OtelCollectorOptions].
#[derive(Debug, Clone)]
pub enum TraceExporter {
// TODO: Remove
// NOTE(review): the TODO presumably refers to this variant / trace export as a whole — confirm.
/// Export traces to an OpenTelemetry Collector <https://opentelemetry.io/docs/collector/>.
Otel(OtelCollectorOptions),
}

/// Control where logs go
#[derive(Debug, Clone)]
pub enum Logger {
Expand Down
24 changes: 4 additions & 20 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ use std::sync::Arc;
use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics};
use temporal_sdk_core_api::{
errors::{CompleteActivityError, PollActivityError, PollWfError},
telemetry::{
metrics::{CoreMeter, TemporalMeter},
TelemetryOptions,
},
telemetry::TelemetryOptions,
Worker as WorkerTrait,
};
use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
Expand Down Expand Up @@ -265,27 +262,14 @@ impl CoreRuntime {
self.runtime_handle.clone()
}

/// Returns the metric meter used for recording metrics, or `None` when metrics were not enabled.
///
/// This is a thin convenience wrapper that forwards to `get_metric_meter` on the owned
/// telemetry instance.
pub fn metric_meter(&self) -> Option<TemporalMeter> {
self.telemetry.get_metric_meter()
}

/// Returns the trace subscriber associated with this runtime's telemetry options/instance.
///
/// The returned subscriber can be used to manually set the default for a thread or globally
/// using the `tracing` crate, or with [set_trace_subscriber_for_current_thread]. The call is
/// forwarded to the owned telemetry instance.
pub fn trace_subscriber(&self) -> Arc<dyn tracing::Subscriber + Send + Sync> {
self.telemetry.trace_subscriber()
}

/// Returns a shared reference to the [TelemetryInstance] owned by this runtime.
pub fn telemetry(&self) -> &TelemetryInstance {
&self.telemetry
}

/// Some metric meters cannot be initialized until after a tokio runtime has started and after
/// other telemetry has been initialized (e.g. Prometheus). They can be attached here.
pub fn attach_late_init_metrics(&mut self, meter: Arc<dyn CoreMeter + 'static>) {
self.telemetry.attach_late_init_metrics(meter)
/// Return a mutable reference to the owned [TelemetryInstance]
pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
&mut self.telemetry
}
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl MetricsContext {
}

pub(crate) fn top_level(namespace: String, tq: String, telemetry: &TelemetryInstance) -> Self {
if let Some(mut meter) = telemetry.get_metric_meter() {
if let Some(mut meter) = telemetry.get_temporal_metric_meter() {
meter
.default_attribs
.attributes
Expand Down Expand Up @@ -778,7 +778,7 @@ impl Gauge for MemoryGaugeU64 {

#[derive(Debug, derive_more::Constructor)]
pub(crate) struct PrefixedMetricsMeter<CM> {
prefix: &'static str,
prefix: String,
meter: CM,
}
impl<CM: CoreMeter> CoreMeter for PrefixedMetricsMeter<CM> {
Expand All @@ -787,17 +787,17 @@ impl<CM: CoreMeter> CoreMeter for PrefixedMetricsMeter<CM> {
}

fn counter(&self, mut params: MetricParameters) -> Arc<dyn Counter> {
params.name = (self.prefix.to_string() + &*params.name).into();
params.name = (self.prefix.clone() + &*params.name).into();
self.meter.counter(params)
}

fn histogram(&self, mut params: MetricParameters) -> Arc<dyn Histogram> {
params.name = (self.prefix.to_string() + &*params.name).into();
params.name = (self.prefix.clone() + &*params.name).into();
self.meter.histogram(params)
}

fn gauge(&self, mut params: MetricParameters) -> Arc<dyn Gauge> {
params.name = (self.prefix.to_string() + &*params.name).into();
params.name = (self.prefix.clone() + &*params.name).into();
self.meter.gauge(params)
}
}
Expand All @@ -815,7 +815,7 @@ mod tests {
let telem_instance = TelemetryInstance::new(
no_op_subscriber,
None,
METRIC_PREFIX,
METRIC_PREFIX.to_string(),
Some(call_buffer.clone()),
true,
);
Expand Down
52 changes: 30 additions & 22 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub fn construct_filter_string(core_level: Level, other_level: Level) -> String

/// Holds initialized tracing/metrics exporters, etc
pub struct TelemetryInstance {
metric_prefix: &'static str,
metric_prefix: String,
logs_out: Option<Mutex<CoreLogsOut>>,
metrics: Option<Arc<dyn CoreMeter + 'static>>,
trace_subscriber: Arc<dyn Subscriber + Send + Sync>,
Expand All @@ -58,7 +58,7 @@ impl TelemetryInstance {
fn new(
trace_subscriber: Arc<dyn Subscriber + Send + Sync>,
logs_out: Option<Mutex<CoreLogsOut>>,
metric_prefix: &'static str,
metric_prefix: String,
metrics: Option<Arc<dyn CoreMeter + 'static>>,
attach_service_name: bool,
) -> Self {
Expand All @@ -71,8 +71,9 @@ impl TelemetryInstance {
}
}

/// Returns a trace subscriber which can be used with the tracing crate, or with our own
/// [set_trace_subscriber_for_current_thread] function.
/// Return the trace subscriber associated with the telemetry options/instance. Can be used
/// to manually set the default for a thread or globally using the `tracing` crate, or with
/// [set_trace_subscriber_for_current_thread]
pub fn trace_subscriber(&self) -> Arc<dyn Subscriber + Send + Sync> {
self.trace_subscriber.clone()
}
Expand All @@ -83,21 +84,37 @@ impl TelemetryInstance {
self.metrics = Some(meter);
}

/// Returns our wrapper for metric meters, can be used to, ex: initialize clients
pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
/// Returns our wrapper for metric meters, including the `metric_prefix` from
/// [TelemetryOptions]. This should be used to initialize clients or for any other
/// temporal-owned metrics. User defined metrics should use [Self::get_metric_meter].
pub fn get_temporal_metric_meter(&self) -> Option<TemporalMeter> {
self.metrics.clone().map(|m| {
let kvs = if self.attach_service_name {
vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)]
} else {
vec![]
};
let kvs = self.default_kvs();
let attribs = MetricsAttributesOptions::new(kvs);
TemporalMeter::new(
Arc::new(PrefixedMetricsMeter::new(self.metric_prefix, m)) as Arc<dyn CoreMeter>,
Arc::new(PrefixedMetricsMeter::new(self.metric_prefix.clone(), m))
as Arc<dyn CoreMeter>,
attribs,
)
})
}

/// Builds a [TemporalMeter] directly around the configured meter, without wrapping it in a
/// prefixing meter.
///
/// Yields `None` when no meter has been configured. The default attributes carry the
/// `service_name` label when attaching the service name is enabled.
pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
    // Bail out before computing attributes if there is no meter to wrap.
    let meter = self.metrics.clone()?;
    let default_attribs = MetricsAttributesOptions::new(self.default_kvs());
    Some(TemporalMeter::new(meter, default_attribs))
}

/// Default key/values for meters created by this instance: a single `service_name` entry when
/// `attach_service_name` is set, otherwise an empty list.
fn default_kvs(&self) -> Vec<MetricKeyValue> {
    self.attach_service_name
        .then(|| MetricKeyValue::new("service_name", TELEM_SERVICE_NAME))
        .into_iter()
        .collect()
}
}

thread_local! {
Expand All @@ -122,14 +139,6 @@ pub fn remove_trace_subscriber_for_current_thread() {
SUB_GUARD.with(|sg| sg.take());
}

/// Picks the metric name prefix from the options: empty when
/// `no_temporal_prefix_for_metrics` is set, `"temporal_"` otherwise.
fn metric_prefix(opts: &TelemetryOptions) -> &'static str {
    let strip_prefix = opts.no_temporal_prefix_for_metrics;
    if !strip_prefix {
        return "temporal_";
    }
    ""
}

impl CoreTelemetry for TelemetryInstance {
fn fetch_buffered_logs(&self) -> Vec<CoreLog> {
if let Some(logs_out) = self.logs_out.as_ref() {
Expand All @@ -155,7 +164,6 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
// way which is nice.
// Parts of telem data ====
let mut logs_out = None;
let metric_prefix = metric_prefix(&opts);
// =======================

// Tracing subscriber layers =========
Expand Down Expand Up @@ -211,7 +219,7 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
Ok(TelemetryInstance::new(
Arc::new(reg),
logs_out,
metric_prefix,
opts.metric_prefix,
opts.metrics,
opts.attach_service_name,
))
Expand Down
8 changes: 2 additions & 6 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use temporal_sdk_core_api::{
errors::{PollActivityError, PollWfError},
telemetry::{
metrics::CoreMeter, Logger, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
TelemetryOptions, TelemetryOptionsBuilder, TraceExportConfig, TraceExporter,
TelemetryOptions, TelemetryOptionsBuilder,
},
Worker as CoreWorker,
};
Expand Down Expand Up @@ -146,7 +146,7 @@ pub fn init_integ_telem() {
let telemetry_options = get_integ_telem_options();
let rt =
CoreRuntime::new_assume_tokio(telemetry_options).expect("Core runtime inits cleanly");
let _ = tracing::subscriber::set_global_default(rt.trace_subscriber());
let _ = tracing::subscriber::set_global_default(rt.telemetry().trace_subscriber());
rt
});
}
Expand Down Expand Up @@ -590,10 +590,6 @@ pub fn get_integ_telem_options() -> TelemetryOptions {
.url(url)
.build()
.unwrap();
ob.tracing(TraceExportConfig {
filter: filter_string.clone(),
exporter: TraceExporter::Otel(opts.clone()),
});
ob.metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap()) as Arc<dyn CoreMeter>);
}
if let Some(addr) = env::var(PROM_ENABLE_ENV_VAR)
Expand Down
23 changes: 19 additions & 4 deletions tests/integ_tests/metrics_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService};
use temporal_sdk_core::{init_worker, telemetry::start_prometheus_metric_exporter, CoreRuntime};
use temporal_sdk_core_api::{
telemetry::{metrics::CoreMeter, PrometheusExporterOptionsBuilder, TelemetryOptions},
telemetry::{
metrics::{CoreMeter, MetricAttributes, MetricParameters},
PrometheusExporterOptionsBuilder, TelemetryOptions,
},
worker::WorkerConfigBuilder,
Worker,
};
Expand Down Expand Up @@ -69,7 +72,7 @@ async fn prometheus_metrics_exported() {
let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
let opts = get_integ_server_options();
let mut raw_client = opts
.connect_no_namespace(rt.metric_meter(), None)
.connect_no_namespace(rt.telemetry().get_temporal_metric_meter(), None)
.await
.unwrap();
assert!(raw_client.get_client().capabilities().is_some());
Expand All @@ -88,6 +91,17 @@ async fn prometheus_metrics_exported() {
));
// Verify counter names are appropriate (don't end w/ '_total')
assert!(body.contains("temporal_request{"));
// Verify non-temporal metrics meter does not prefix
let mm = rt.telemetry().get_metric_meter().unwrap();
let g = mm.inner.gauge(MetricParameters::from("mygauge"));
g.record(
42,
&MetricAttributes::OTel {
kvs: Arc::new(vec![]),
},
);
let body = get_text(format!("http://{addr}/metrics")).await;
assert!(body.contains("\nmygauge 42"));
}

#[tokio::test]
Expand Down Expand Up @@ -434,11 +448,12 @@ fn runtime_new() {
let handle = rt.tokio_handle();
let _rt = handle.enter();
let (telemopts, addr, _aborter) = prom_metrics();
rt.attach_late_init_metrics(telemopts.metrics.unwrap());
rt.telemetry_mut()
.attach_late_init_metrics(telemopts.metrics.unwrap());
let opts = get_integ_server_options();
handle.block_on(async {
let mut raw_client = opts
.connect_no_namespace(rt.metric_meter(), None)
.connect_no_namespace(rt.telemetry().get_temporal_metric_meter(), None)
.await
.unwrap();
assert!(raw_client.get_client().capabilities().is_some());
Expand Down
2 changes: 1 addition & 1 deletion tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod integ_tests {
let opts = get_integ_server_options();
let runtime = CoreRuntime::new_assume_tokio(get_integ_telem_options()).unwrap();
let mut retrying_client = opts
.connect_no_namespace(runtime.metric_meter(), None)
.connect_no_namespace(runtime.telemetry().get_temporal_metric_meter(), None)
.await
.unwrap();

Expand Down