Skip to content

Commit 0613d20

Browse files
elohmeierpront
andauthored
feat(prometheus_remote_write source): Add optional NaN value filtering (#23774)
* feat(prometheus_remote_write source): Add optional NaN value filtering - Add skip_nan_values configuration option (defaults to false) - Filter NaN samples for counters and gauges individually - Filter entire histograms/summaries if sum or component values contain NaN - Add comprehensive unit and integration tests for NaN filtering - Maintain backward compatibility with default behavior * cargo fmt * cleanup tests * ran cargo fmt * dry * rm unrelated test * make generate-component-docs --------- Co-authored-by: Pavlos Rontidis <[email protected]>
1 parent 7495f1e commit 0613d20

File tree

4 files changed

+366
-7
lines changed

4 files changed

+366
-7
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
The `prometheus_remote_write` source now supports optional NaN value filtering via the `skip_nan_values` configuration option.
2+
3+
When enabled, metric samples with NaN values are discarded during parsing, preventing downstream processing of invalid metrics. For counters and gauges, individual samples with NaN values are filtered. For histograms and summaries, the entire metric is filtered if any component contains NaN values (sum, bucket limits, or quantile values).
4+
5+
This feature defaults to `false` to maintain backward compatibility.
6+
7+
authors: elohmeier

src/sources/prometheus/parser.rs

Lines changed: 157 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ fn utc_timestamp(timestamp: Option<i64>, default: DateTime<Utc>) -> DateTime<Utc
2020
#[cfg(any(test, feature = "sources-prometheus-scrape"))]
2121
pub(super) fn parse_text(packet: &str) -> Result<Vec<Event>, ParserError> {
2222
vector_lib::prometheus::parser::parse_text(packet)
23-
.map(|group| reparse_groups(group, vec![], false))
23+
.map(|group| reparse_groups(group, vec![], false, false))
2424
}
2525

2626
#[cfg(any(test, feature = "sources-prometheus-pushgateway"))]
@@ -30,19 +30,29 @@ pub(super) fn parse_text_with_overrides(
3030
aggregate_metrics: bool,
3131
) -> Result<Vec<Event>, ParserError> {
3232
vector_lib::prometheus::parser::parse_text(packet)
33-
.map(|group| reparse_groups(group, tag_overrides, aggregate_metrics))
33+
.map(|group| reparse_groups(group, tag_overrides, aggregate_metrics, false))
34+
}
35+
36+
#[cfg(test)]
37+
fn parse_text_with_nan_filtering(packet: &str) -> Result<Vec<Event>, ParserError> {
38+
vector_lib::prometheus::parser::parse_text(packet)
39+
.map(|group| reparse_groups(group, vec![], false, true))
3440
}
3541

3642
#[cfg(feature = "sources-prometheus-remote-write")]
37-
pub(super) fn parse_request(request: proto::WriteRequest) -> Result<Vec<Event>, ParserError> {
43+
pub(super) fn parse_request(
44+
request: proto::WriteRequest,
45+
skip_nan_values: bool,
46+
) -> Result<Vec<Event>, ParserError> {
3847
vector_lib::prometheus::parser::parse_request(request)
39-
.map(|group| reparse_groups(group, vec![], false))
48+
.map(|group| reparse_groups(group, vec![], false, skip_nan_values))
4049
}
4150

4251
fn reparse_groups(
4352
groups: Vec<MetricGroup>,
4453
tag_overrides: impl IntoIterator<Item = (String, String)> + Clone,
4554
aggregate_metrics: bool,
55+
skip_nan_values: bool,
4656
) -> Vec<Event> {
4757
let mut result = Vec::new();
4858
let start = Utc::now();
@@ -57,6 +67,10 @@ fn reparse_groups(
5767
match group.metrics {
5868
GroupKind::Counter(metrics) => {
5969
for (key, metric) in metrics {
70+
if skip_nan_values && metric.value.is_nan() {
71+
continue;
72+
}
73+
6074
let tags = combine_tags(key.labels, tag_overrides.clone());
6175

6276
let counter = Metric::new(
@@ -74,6 +88,10 @@ fn reparse_groups(
7488
}
7589
GroupKind::Gauge(metrics) | GroupKind::Untyped(metrics) => {
7690
for (key, metric) in metrics {
91+
if skip_nan_values && metric.value.is_nan() {
92+
continue;
93+
}
94+
7795
let tags = combine_tags(key.labels, tag_overrides.clone());
7896

7997
let gauge = Metric::new(
@@ -92,6 +110,12 @@ fn reparse_groups(
92110
}
93111
GroupKind::Histogram(metrics) => {
94112
for (key, metric) in metrics {
113+
if skip_nan_values
114+
&& (metric.sum.is_nan() || metric.buckets.iter().any(|b| b.bucket.is_nan()))
115+
{
116+
continue;
117+
}
118+
95119
let tags = combine_tags(key.labels, tag_overrides.clone());
96120

97121
let mut buckets = metric.buckets;
@@ -132,6 +156,16 @@ fn reparse_groups(
132156
}
133157
GroupKind::Summary(metrics) => {
134158
for (key, metric) in metrics {
159+
if skip_nan_values
160+
&& (metric.sum.is_nan()
161+
|| metric
162+
.quantiles
163+
.iter()
164+
.any(|q| q.quantile.is_nan() || q.value.is_nan()))
165+
{
166+
continue;
167+
}
168+
135169
let tags = combine_tags(key.labels, tag_overrides.clone());
136170

137171
result.push(
@@ -1353,4 +1387,123 @@ mod test {
13531387
]),
13541388
);
13551389
}
1390+
1391+
#[test]
1392+
fn test_skip_nan_counter_enabled() {
1393+
let exp = r#"
1394+
# TYPE name counter
1395+
name{labelname="val1"} NaN 1612411506789
1396+
name{labelname="val2"} 123.0 1612411506789
1397+
"#;
1398+
1399+
let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1400+
assert_eq!(result.len(), 1);
1401+
assert_eq!(result[0].name(), "name");
1402+
match result[0].value() {
1403+
MetricValue::Counter { value } => {
1404+
assert_eq!(*value, 123.0);
1405+
}
1406+
_ => unreachable!(),
1407+
}
1408+
assert_eq!(result[0].tags().unwrap().get("labelname").unwrap(), "val2");
1409+
}
1410+
1411+
#[test]
1412+
fn test_skip_nan_counter_disabled() {
1413+
let exp = r#"
1414+
# TYPE name counter
1415+
name{labelname="val1"} NaN 1612411506789
1416+
name{labelname="val2"} 123.0 1612411506789
1417+
"#;
1418+
1419+
let result = events_to_metrics(parse_text(exp)).unwrap();
1420+
assert_eq!(result.len(), 2);
1421+
1422+
// Find the NaN metric
1423+
let nan_metric = result
1424+
.iter()
1425+
.find(|m| m.tags().as_ref().and_then(|tags| tags.get("labelname")) == Some("val1"))
1426+
.unwrap();
1427+
1428+
match nan_metric.value() {
1429+
MetricValue::Counter { value } => {
1430+
assert!(value.is_nan());
1431+
}
1432+
_ => unreachable!(),
1433+
}
1434+
}
1435+
1436+
#[test]
1437+
fn test_skip_nan_gauge_enabled() {
1438+
let exp = r#"
1439+
# TYPE name gauge
1440+
name{labelname="val1"} NaN 1612411506789
1441+
name{labelname="val2"} 123.0 1612411506789
1442+
"#;
1443+
1444+
let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1445+
assert_eq!(result.len(), 1);
1446+
assert_eq!(result[0].name(), "name");
1447+
match result[0].value() {
1448+
MetricValue::Gauge { value } => {
1449+
assert_eq!(*value, 123.0);
1450+
}
1451+
_ => unreachable!(),
1452+
}
1453+
}
1454+
1455+
#[test]
1456+
fn test_skip_nan_histogram_bucket_enabled() {
1457+
let exp = r#"
1458+
# TYPE duration histogram
1459+
duration_bucket{le="1"} 133988 1612411506789
1460+
duration_bucket{le="NaN"} 144320 1612411506789
1461+
duration_sum 53423 1612411506789
1462+
duration_count 144320 1612411506789
1463+
"#;
1464+
1465+
let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1466+
assert_eq!(result.len(), 0); // Should skip entire histogram due to NaN bucket le value
1467+
}
1468+
1469+
#[test]
1470+
fn test_skip_nan_histogram_sum_enabled() {
1471+
let exp = r#"
1472+
# TYPE duration histogram
1473+
duration_bucket{le="1"} 133988 1612411506789
1474+
duration_bucket{le="+Inf"} 144320 1612411506789
1475+
duration_sum NaN 1612411506789
1476+
duration_count 144320 1612411506789
1477+
"#;
1478+
1479+
let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1480+
assert_eq!(result.len(), 0); // Should skip entire histogram due to NaN sum
1481+
}
1482+
1483+
#[test]
1484+
fn test_skip_nan_summary_enabled() {
1485+
let exp = r#"
1486+
# TYPE duration summary
1487+
duration{quantile="0.5"} NaN 1612411506789
1488+
duration{quantile="0.99"} 76656 1612411506789
1489+
duration_sum 1.7560473e+07 1612411506789
1490+
duration_count 2693 1612411506789
1491+
"#;
1492+
1493+
let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1494+
assert_eq!(result.len(), 0); // Should skip entire summary due to NaN quantile value
1495+
}
1496+
1497+
#[test]
1498+
fn test_skip_nan_all_valid_values() {
1499+
let exp = r#"
1500+
# TYPE counter_metric counter
1501+
counter_metric 123.0 1612411506789
1502+
# TYPE gauge_metric gauge
1503+
gauge_metric 456.0 1612411506789
1504+
"#;
1505+
1506+
let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1507+
assert_eq!(result.len(), 2); // Both should be preserved
1508+
}
13561509
}

0 commit comments

Comments
 (0)