diff --git a/Cargo.lock b/Cargo.lock index b523c8b60..2c5294955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -350,6 +350,12 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "auto_ops" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7460f7dd8e100147b82a63afca1a20eb6c231ee36b90ba7272e14951cb58af59" + [[package]] name = "autocfg" version = "1.4.0" @@ -2857,6 +2863,17 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "openmetrics-parser" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e40a68c62e09c5dfec2f6472af3bd5e8ddf506fcf14c78ece23794ffbb874eca" +dependencies = [ + "auto_ops", + "pest", + "pest_derive", +] + [[package]] name = "openssl" version = "0.10.73" @@ -3006,6 +3023,50 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1db05f56d34358a8b1066f67cbb203ee3e7ed2ba674a6263a1d5ec6db2204323" +dependencies = [ + "memchr", + "thiserror 2.0.12", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb056d9e8ea77922845ec74a1c4e8fb17e7c218cc4fc11a15c5d25e189aa40bc" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e404e638f781eb3202dc82db6760c8ae8a1eeef7fb3fa8264b2ef280504966" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.102", +] + +[[package]] +name = "pest_meta" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edd1101f170f5903fde0914f899bb503d9ff5271d7ba76bbb70bea63690cc0d5" +dependencies = [ + "pest", + "sha2", +] + [[package]] name = "phf" version = "0.11.3" @@ -4813,6 +4874,7 @@ dependencies = [ "chrono", "derive_more", "formatjson", + "openmetrics-parser", "pretty_assertions", "rstest", "serde", @@ -5077,6 +5139,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "ucd-trie" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" + [[package]] name = "uncased" version = "0.9.10" diff --git a/packages/metrics/Cargo.toml b/packages/metrics/Cargo.toml index 0597785f4..07adf1789 100644 --- a/packages/metrics/Cargo.toml +++ b/packages/metrics/Cargo.toml @@ -17,6 +17,7 @@ version.workspace = true [dependencies] chrono = { version = "0", default-features = false, features = ["clock"] } derive_more = { version = "2", features = ["constructor"] } +openmetrics-parser = "0.4.4" serde = { version = "1", features = ["derive"] } serde_json = "1.0.140" thiserror = "2" diff --git a/packages/metrics/src/metric_collection/mod.rs b/packages/metrics/src/metric_collection/mod.rs index e183236aa..3c13a3c86 100644 --- a/packages/metrics/src/metric_collection/mod.rs +++ b/packages/metrics/src/metric_collection/mod.rs @@ -229,8 +229,138 @@ impl MetricCollection { Ok(()) } + + /// Parse a Prometheus exposition text (0.0.4 or OpenMetrics-compatible) into a MetricCollection. + /// This currently supports Counter and Gauge families. Other families will return UnsupportedPrometheusMetricType. + pub fn from_prometheus_text(input: &str, now: DurationSinceUnixEpoch) -> Result { + use openmetrics_parser::prometheus::parse_prometheus; + use openmetrics_parser::{MetricNumber, MetricsExposition, PrometheusType, PrometheusValue}; + + let exposition: MetricsExposition = + parse_prometheus(input).map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?; + + let mut counters: HashMap> = HashMap::new(); + let mut gauges: HashMap> = HashMap::new(); + + for (family_name, family) in exposition.families { + match family.family_type { + PrometheusType::Counter => { + let mut metric = Metric::::new( + MetricName::new(&family_name), + None, + if family.help.is_empty() { None } else { Some(MetricDescription::new(&family.help)) }, + SampleCollection::default(), + ); + for sample in family.into_iter_samples() { + // Build LabelSet from parser's labelset + let parser_label_set = sample + .get_labelset() + .map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?; + let labels: Vec<(String, String)> = parser_label_set + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + let label_set: LabelSet = labels.into(); + + // Timestamp (prefer sample's, fallback to now) + let ts = sample.timestamp.and_then(|t| { + if t.is_finite() && t >= 0.0 { + let secs_f = t.trunc(); + let frac = t - secs_f; + let secs = secs_f as u64; + let nanos = (frac * 1_000_000_000.0).round() as u32; + let (secs, nanos) = if nanos >= 1_000_000_000 { (secs + 1, nanos - 1_000_000_000) } else { (secs, nanos) }; + Some(DurationSinceUnixEpoch::new(secs, nanos)) + } else { + None + } + }).unwrap_or(now); + + // Value + let value = match sample.value { + PrometheusValue::Counter(cv) => match cv.value { + MetricNumber::Int(i) if i >= 0 => i as u64, + MetricNumber::Float(f) if f.is_finite() && f >= 0.0 => f as u64, + _ => 0, + }, + PrometheusValue::Unknown(_) => 0, + _ => { + return Err(Error::UnsupportedPrometheusMetricType { + metric_name: family_name.clone(), + metric_type: "counter(value-mismatch)".to_string(), + }) + } + }; + + metric.absolute(&label_set, value, ts); + } + counters.insert(MetricName::new(&family_name), metric); + } + PrometheusType::Gauge => { + let mut metric = Metric::::new( + MetricName::new(&family_name), + None, + if family.help.is_empty() { None } else { Some(MetricDescription::new(&family.help)) }, + SampleCollection::default(), + ); + for sample in family.into_iter_samples() { + let parser_label_set = sample + .get_labelset() + .map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?; + let labels: Vec<(String, String)> = parser_label_set + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + let label_set: LabelSet = labels.into(); + + // Timestamp (prefer sample's, fallback to now) + let ts = sample.timestamp.and_then(|t| { + if t.is_finite() && t >= 0.0 { + let secs_f = t.trunc(); + let frac = t - secs_f; + let secs = secs_f as u64; + let nanos = (frac * 1_000_000_000.0).round() as u32; + let (secs, nanos) = if nanos >= 1_000_000_000 { (secs + 1, nanos - 1_000_000_000) } else { (secs, nanos) }; + Some(DurationSinceUnixEpoch::new(secs, nanos)) + } else { + None + } + }).unwrap_or(now); + + let value = match sample.value { + PrometheusValue::Gauge(mn) => mn.as_f64(), + PrometheusValue::Unknown(_) => 0.0, + _ => { + return Err(Error::UnsupportedPrometheusMetricType { + metric_name: family_name.clone(), + metric_type: "gauge(value-mismatch)".to_string(), + }) + } + }; + + metric.set(&label_set, value, ts); + } + gauges.insert(MetricName::new(&family_name), metric); + } + other => { + return Err(Error::UnsupportedPrometheusMetricType { + metric_name: family_name.clone(), + metric_type: format!("{other:?}"), + }); + } + } + } + + let counters = MetricKindCollection::new(counters.into_values().collect()) + .map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?; + let gauges = MetricKindCollection::new(gauges.into_values().collect()) + .map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?; + + MetricCollection::new(counters, gauges) + } } + #[derive(thiserror::Error, Debug, Clone)] pub enum Error { #[error("Metric names must be unique across all metrics types.")] @@ -247,6 +377,12 @@ pub enum Error { #[error("Cannot create metric with name '{metric_name}': another metric with this name already exists")] MetricNameCollisionAdding { metric_name: MetricName }, + + #[error("Failed to parse Prometheus text: {message}")] + PrometheusTextParse { message: String }, + + #[error("Unsupported or unknown Prometheus metric type '{metric_type}' for metric '{metric_name}'")] + UnsupportedPrometheusMetricType { metric_name: String, metric_type: String }, } /// Implements serialization for `MetricCollection`. @@ -722,7 +858,21 @@ udp_tracker_server_performance_avg_announce_processing_time_ns{server_binding_ip } #[test] - fn it_should_allow_serializing_to_prometheus_format() { + fn it_should_allow_deserializing_from_prometheus_text() { + // Given the fixture's Prometheus exposition + let (expected_metric_collection, _expected_json, prometheus_text) = MetricCollectionFixture::default().deconstruct(); + let time = DurationSinceUnixEpoch::from_secs(1_743_552_000); + + // Only ensure a trailing newline for the parser + let with_trailing_newline = if prometheus_text.ends_with('\n') { prometheus_text } else { format!("{}\n", prometheus_text) }; + let parsed = MetricCollection::from_prometheus_text(&with_trailing_newline, time).unwrap(); + + // Then it should match the fixture object + assert_eq!(parsed, expected_metric_collection); + } + + #[test] + fn it_should_allow_serializing_to_prometheus_format_again() { let (metric_collection, _expected_json, expected_prometheus) = MetricCollectionFixture::default().deconstruct(); let prometheus_output = metric_collection.to_prometheus();