Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion hyperactor_telemetry/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# @generated by autocargo from //monarch/hyperactor_telemetry:hyperactor_telemetry
# @generated by autocargo from //monarch/hyperactor_telemetry:[hyperactor_telemetry,prometheus_example]

[package]
name = "hyperactor_telemetry"
Expand All @@ -10,14 +10,20 @@ license = "BSD-3-Clause"
[lib]
edition = "2024"

[[bin]]
name = "prometheus_example"
path = "examples/prometheus_example.rs"

[dependencies]
anyhow = "1.0.98"
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
hdrhistogram = "7.5"
hyperactor = { version = "0.0.0", path = "../hyperactor" }
lazy_static = "1.5"
opentelemetry = "0.29"
opentelemetry-otlp = { version = "0.29", features = ["http-proto", "logs", "metrics", "reqwest-blocking-client", "trace"], default-features = false }
opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] }
rand = { version = "0.8", features = ["small_rng"] }
rusqlite = { version = "0.36.0", features = ["backup", "blob", "bundled", "column_decltype", "functions", "limits", "modern_sqlite", "serde_json"] }
Expand Down
159 changes: 159 additions & 0 deletions hyperactor_telemetry/examples/prometheus_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

//! Example showing how to use the Prometheus OTLP backend for telemetry.
//!
//! This example demonstrates the modern approach to Prometheus integration using
//! OpenTelemetry's OTLP HTTP protocol to send metrics directly to Prometheus.
//!
//! ## Setup
//!
//! 1. Start Prometheus with OTLP receiver:
//! ```bash
//! prometheus --web.enable-otlp-receiver
//! ```
//!
//! 2. Run this example:
//! ```bash
//! cd hyperactor_telemetry
//! HYPERACTOR_OTEL_BACKEND=prometheus \
//! OTEL_SERVICE_NAME=prometheus-example \
//! OTEL_RESOURCE_ATTRIBUTES=environment=demo,version=1.0.0 \
//! cargo run --example prometheus_example
//! ```
//!
//! ## Query Examples
//!
//! After running, you can query Prometheus:
//! - Rate of requests: `rate(http_requests_total[2m])`
//! - With resource attributes: `rate(http_requests_total[2m]) * on (job, instance) group_left (environment) target_info`
//! - P95 latency: `histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[2m]))`

use std::time::Duration;

use hyperactor::clock::Clock;
use hyperactor::clock::RealClock;
use hyperactor_telemetry::declare_static_counter;
use hyperactor_telemetry::declare_static_gauge;
use hyperactor_telemetry::declare_static_histogram;
use hyperactor_telemetry::initialize_logging_for_test;
use hyperactor_telemetry::kv_pairs;

// Declare some example metrics
//
// Each macro creates a static handle (first argument) bound to the exported
// metric name (second argument). The names match the ones used in the
// Prometheus query examples in the module documentation above.

// Counter: the example adds 1 to it for every simulated HTTP request.
declare_static_counter!(REQUESTS_TOTAL, "http_requests_total");
// Histogram: the example records simulated request durations, in seconds.
declare_static_histogram!(REQUEST_DURATION, "http_request_duration_seconds");
// Gauge: the example records a simulated number of open connections.
declare_static_gauge!(ACTIVE_CONNECTIONS, "active_connections");

/// Entry point of the Prometheus OTLP example.
///
/// Applies default values for the telemetry-related environment variables
/// (only for variables that are not already set), then starts a
/// multi-threaded Tokio runtime and drives [`run`] to completion.
///
/// The environment is deliberately configured *before* the runtime is built:
/// `std::env::set_var` is only sound while no other thread can access the
/// environment, and a `#[tokio::main]` body already runs on a runtime whose
/// worker threads have been started.
///
/// # Errors
///
/// Returns an error if the Tokio runtime cannot be built, or if [`run`] fails.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Defaults for OTLP mode; a value already present in the environment wins.
    const ENV_DEFAULTS: [(&str, &str); 4] = [
        ("HYPERACTOR_OTEL_BACKEND", "prometheus"),
        ("OTEL_SERVICE_NAME", "prometheus-example"),
        ("OTEL_RESOURCE_ATTRIBUTES", "environment=demo,version=1.0.0"),
        ("OTEL_METRIC_EXPORT_INTERVAL", "5000"), // 5 seconds for demo
    ];

    for (key, value) in ENV_DEFAULTS {
        if std::env::var(key).is_err() {
            // SAFETY: `main` is synchronous and the Tokio runtime is only
            // built further down, so this program has not spawned any thread
            // that could read or write the environment concurrently.
            unsafe {
                std::env::set_var(key, value);
            }
        }
    }

    // Equivalent to what `#[tokio::main]` sets up: a multi-threaded runtime
    // with all drivers enabled.
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(run())
}

/// Initializes telemetry and emits the sample metrics.
///
/// Generates 100 simulated HTTP requests (counter, duration histogram and
/// connection gauge) spaced 50 ms apart, then five further batches spaced
/// 10 seconds apart, printing progress along the way.
async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize telemetry
    initialize_logging_for_test();

    let endpoint = std::env::var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
        .unwrap_or_else(|_| "http://localhost:9090/api/v1/otlp/v1/metrics".to_string());

    println!("🚀 Starting Prometheus OTLP example...");
    println!("📡 Sending metrics directly to Prometheus via OTLP");
    println!("🔗 Endpoint: {}", endpoint);
    println!("ℹ️ Make sure Prometheus is running with --web.enable-otlp-receiver");
    println!("🎯 Query Prometheus at: http://localhost:9090");

    println!("✅ OTLP exporter configured - metrics will be sent automatically!");

    println!("Generating some sample metrics...");

    // Generate some sample metrics using hyperactor telemetry macros
    for i in 0..100 {
        // Simulate HTTP requests: every 10th one fails, every 3rd one is a POST
        let status = if i % 10 == 0 { "500" } else { "200" };
        let method = if i % 3 == 0 { "POST" } else { "GET" };

        REQUESTS_TOTAL.add(
            1,
            kv_pairs!(
                "method" => method,
                "status" => status,
                "endpoint" => "/api/data"
            ),
        );

        // Simulate request durations
        let duration = 0.001 + (i as f64) * 0.001; // 1ms to 100ms
        REQUEST_DURATION.record(
            duration,
            kv_pairs!(
                "method" => method,
                "endpoint" => "/api/data"
            ),
        );

        // Simulate active connections (cycles through 10..=29)
        let connections = 10.0 + (i as f64 % 20.0);
        ACTIVE_CONNECTIONS.record(
            connections,
            kv_pairs!(
                "server" => "primary"
            ),
        );

        // Small delay to spread metrics over time
        RealClock.sleep(Duration::from_millis(50)).await;

        // Report after every 20th completed iteration (20, 40, ..., 100)
        // rather than after the 1st, 21st, ... one.
        if (i + 1) % 20 == 0 {
            println!("Generated {} metrics so far...", i + 1);
        }
    }

    println!("✨ Finished generating metrics!");
    println!("🎯 Check Prometheus for your metrics:");
    println!(" - Prometheus UI: http://localhost:9090");
    println!(" - Example query: rate(http_requests_total[2m])");
    println!(
        " - Resource attributes query: rate(http_requests_total[2m]) * on (job, instance) group_left (environment) target_info"
    );
    println!("📡 Metrics are being sent to Prometheus via OTLP every 5 seconds");

    // Keep generating metrics to show real-time updates
    println!("🔄 Continuing to generate metrics every 10 seconds...");
    for _ in 0..5 {
        // Generate 5 more batches then exit
        RealClock.sleep(Duration::from_secs(10)).await;

        // Generate a few more metrics
        REQUESTS_TOTAL.add(
            1,
            kv_pairs!("method" => "GET", "status" => "200", "endpoint" => "/health"),
        );
        REQUEST_DURATION.record(0.001, kv_pairs!("method" => "GET", "endpoint" => "/health"));
        ACTIVE_CONNECTIONS.record(15.0, kv_pairs!("server" => "primary"));

        println!("📊 Sent batch of metrics to Prometheus");
    }

    println!("🎉 Example completed successfully!");
    Ok(())
}
1 change: 1 addition & 0 deletions hyperactor_telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub mod in_memory_reader;
mod meta;
mod otel;
mod pool;
pub mod prometheus;
pub mod recorder;
mod spool;
pub mod sqlite;
Expand Down
140 changes: 129 additions & 11 deletions hyperactor_telemetry/src/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,142 @@
* LICENSE file in the root directory of this source tree.
*/

// Environment variable to select the OpenTelemetry backend
const OTEL_BACKEND_ENV: &str = "HYPERACTOR_OTEL_BACKEND";

/// Telemetry backend that traces and metrics are exported to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Backend {
    /// Selected by the value `scuba`; also the default when built with
    /// `cfg(fbcode_build)`.
    Scuba,
    /// Selected by the value `prometheus`.
    Prometheus,
    /// Selected by the value `none` or an empty value; also the default when
    /// built without `cfg(fbcode_build)`.
    None,
}

impl Backend {
    /// Reads the backend selection from the `HYPERACTOR_OTEL_BACKEND`
    /// environment variable.
    ///
    /// An unset variable (or one whose value is not valid Unicode) yields the
    /// build default; see [`Backend::from_value`] for how values are matched.
    fn from_env() -> Self {
        Self::from_value(std::env::var(OTEL_BACKEND_ENV).ok().as_deref())
    }

    /// Maps an optional raw setting to a backend.
    ///
    /// Matching ignores surrounding whitespace and ASCII case, so values such
    /// as `Prometheus` or `" scuba "` select the requested backend instead of
    /// silently falling through to the build default. `None` and unrecognised
    /// values yield [`Backend::build_default`].
    fn from_value(value: Option<&str>) -> Self {
        let normalized = value.map(|raw| raw.trim().to_ascii_lowercase());
        match normalized.as_deref() {
            Some("prometheus") => Backend::Prometheus,
            Some("scuba") => Backend::Scuba,
            Some("none") | Some("") => Backend::None,
            _ => Self::build_default(),
        }
    }

    /// Default behavior: use scuba if fbcode_build is enabled, otherwise none.
    fn build_default() -> Self {
        if cfg!(fbcode_build) {
            Backend::Scuba
        } else {
            Backend::None
        }
    }
}

/// Returns the tracing layer for the backend chosen by `Backend::from_env()`,
/// or `None` when that backend provides no layer in this build.
///
/// In the `match`-based body below:
/// - `Backend::Scuba` boxes `crate::meta::tracing_layer()` when compiled with
///   `cfg(fbcode_build)` and yields `None` otherwise.
/// - `Backend::Prometheus` boxes `crate::prometheus::tracing_layer()` when
///   compiled with `cfg(prometheus_build)` and yields `None` otherwise.
/// - `Backend::None` always yields `None`.
// NOTE(review): this span appears to come from a rendered diff whose +/-
// markers were lost, so two versions of the function are interleaved here.
#[allow(dead_code)]
pub fn tracing_layer<
S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
// NOTE(review): the next eight lines (the `impl Layer` return type and the
// cfg-gated body) look like the removed pre-change version — confirm and drop.
>() -> Option<impl tracing_subscriber::Layer<S>> {
#[cfg(fbcode_build)]
{
Some(crate::meta::tracing_layer())
}
#[cfg(not(fbcode_build))]
{
None::<Box<dyn tracing_subscriber::Layer<S> + Send + Sync>>
// NOTE(review): the post-change version appears to resume here; it returns a
// boxed trait object so that every backend arm has the same type.
>() -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync>> {
match Backend::from_env() {
Backend::Scuba => {
// The Scuba layer only exists in fbcode builds.
#[cfg(fbcode_build)]
return Some(Box::new(crate::meta::tracing_layer()));
#[cfg(not(fbcode_build))]
None
}
Backend::Prometheus => {
// The Prometheus layer is only compiled in under `cfg(prometheus_build)`.
#[cfg(prometheus_build)]
return Some(Box::new(crate::prometheus::tracing_layer()));
#[cfg(not(prometheus_build))]
None
}
Backend::None => None,
}
}

/// Sets up metrics export for the backend chosen by `Backend::from_env()`.
///
/// In the `match`-based body below:
/// - `Backend::Scuba` installs `crate::meta::meter_provider()` as the global
///   OpenTelemetry meter provider, but only when compiled with
///   `cfg(fbcode_build)`; otherwise the arm is empty.
/// - `Backend::Prometheus` calls
///   `crate::prometheus::initialize_prometheus_backend()` and logs an error
///   (without propagating it) if that fails.
/// - `Backend::None` only logs a warning.
// NOTE(review): this span appears to come from a rendered diff whose +/-
// markers were lost, so two versions of the body are interleaved here.
#[allow(dead_code)]
pub fn init_metrics() {
// NOTE(review): the next three lines look like the removed pre-change body
// (unconditional Scuba setup under fbcode_build) — confirm and drop.
#[cfg(fbcode_build)]
{
opentelemetry::global::set_meter_provider(crate::meta::meter_provider());
// NOTE(review): the post-change body appears to start here.
match Backend::from_env() {
Backend::Scuba => {
// The Scuba meter provider only exists in fbcode builds.
#[cfg(fbcode_build)]
opentelemetry::global::set_meter_provider(crate::meta::meter_provider());
}
Backend::Prometheus => {
// Initialization failure is logged rather than returned to the caller.
if let Err(e) = crate::prometheus::initialize_prometheus_backend() {
tracing::error!("Failed to initialize Prometheus backend: {}", e);
}
}
Backend::None => {
tracing::warn!("Metrics backend is set to None, no metrics will be collected");
// Nothing is installed for the None backend; the warning above is the only effect
}
}
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// Serializes every test in this module that touches `OTEL_BACKEND_ENV`.
    ///
    /// The test harness runs tests on several threads by default, and all of
    /// these tests mutate the same process-global variable; without the lock
    /// one test could observe the value written by another.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Evaluates `Backend::from_env()` with `OTEL_BACKEND_ENV` set to `value`
    /// (or removed when `value` is `None`), then restores whatever the
    /// variable held before the call.
    fn backend_with_env(value: Option<&str>) -> Backend {
        // A poisoned lock only means another test failed an assertion while
        // holding it; the guarded state is still usable.
        let _guard = ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let previous = std::env::var_os(OTEL_BACKEND_ENV);

        // SAFETY: `ENV_LOCK` is held, so no other test in this module reads or
        // writes the variable concurrently. Threads outside this module could
        // still access the environment; that residual risk is unchanged.
        unsafe {
            match value {
                Some(v) => std::env::set_var(OTEL_BACKEND_ENV, v),
                None => std::env::remove_var(OTEL_BACKEND_ENV),
            }
        }

        let backend = Backend::from_env();

        // SAFETY: same as above; `ENV_LOCK` is still held.
        unsafe {
            match previous {
                Some(v) => std::env::set_var(OTEL_BACKEND_ENV, v),
                None => std::env::remove_var(OTEL_BACKEND_ENV),
            }
        }

        backend
    }

    #[test]
    fn test_backend_from_env_defaults() {
        // Test default behavior when environment variable is not set
        let backend = backend_with_env(None);

        #[cfg(fbcode_build)]
        assert_eq!(backend, Backend::Scuba);

        #[cfg(not(fbcode_build))]
        assert_eq!(backend, Backend::None);
    }

    #[test]
    fn test_backend_from_env_prometheus() {
        // Test that prometheus backend is selected when env var is set to "prometheus"
        assert_eq!(backend_with_env(Some("prometheus")), Backend::Prometheus);
    }

    #[test]
    fn test_backend_from_env_scuba() {
        // Test that scuba backend is selected when env var is set to "scuba"
        assert_eq!(backend_with_env(Some("scuba")), Backend::Scuba);
    }

    #[test]
    fn test_backend_from_env_empty_string() {
        // Test that none backend is selected when env var is set to empty string
        assert_eq!(backend_with_env(Some("")), Backend::None);
    }
}
Loading
Loading