Skip to content

Commit 3cd543e

Browse files
committed
add AWS CloudWatch Metrics sink storage_resolution config
1 parent 1ec7191 commit 3cd543e

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

src/sinks/aws_cloudwatch_metrics/mod.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use aws_sdk_cloudwatch::{
1515
use aws_smithy_types::DateTime as AwsDateTime;
1616
use futures::{FutureExt, SinkExt, stream};
1717
use futures_util::{future, future::BoxFuture};
18+
use indexmap::IndexMap;
1819
use tower::Service;
1920
use vector_lib::{
2021
ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, sink::VectorSink,
@@ -113,6 +114,14 @@ pub struct CloudWatchMetricsSinkConfig {
113114
skip_serializing_if = "crate::serde::is_default"
114115
)]
115116
acknowledgements: AcknowledgementsConfig,
117+
118+
/// A map from metric name to AWS storage resolution.
119+
/// Valid values are 1 (high resolution) and 60 (standard resolution).
120+
/// If unset, the AWS SDK default of 60 (standard resolution) is used.
121+
/// See [AWS Metrics Resolution](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Resolution_definition)
122+
/// See [MetricDatum::storage_resolution()](https://docs.rs/aws-sdk-cloudwatch/1.91.0/aws_sdk_cloudwatch/types/struct.MetricDatum.html#structfield.storage_resolution)
123+
#[serde(default)]
124+
pub storage_resolution: IndexMap<String, i32>,
116125
}
117126

118127
impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
@@ -223,6 +232,7 @@ fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
223232
#[derive(Clone)]
224233
pub struct CloudWatchMetricsSvc {
225234
client: CloudwatchClient,
235+
storage_resolution: IndexMap<String, i32>,
226236
}
227237

228238
impl CloudWatchMetricsSvc {
@@ -234,7 +244,10 @@ impl CloudWatchMetricsSvc {
234244
let batch = config.batch.into_batch_settings()?;
235245
let request_settings = config.request.into_settings();
236246

237-
let service = CloudWatchMetricsSvc { client };
247+
let service = CloudWatchMetricsSvc {
248+
client,
249+
storage_resolution: config.storage_resolution,
250+
};
238251
let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
239252
let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();
240253

@@ -263,6 +276,7 @@ impl CloudWatchMetricsSvc {
263276
}
264277

265278
fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
279+
let resolutions = &self.storage_resolution;
266280
events
267281
.into_iter()
268282
.filter_map(|event| {
@@ -271,6 +285,7 @@ impl CloudWatchMetricsSvc {
271285
.timestamp()
272286
.map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
273287
let dimensions = event.tags().map(tags_to_dimensions);
288+
let resolution = resolutions.get(&metric_name).map(|x| *x);
274289
// AwsCloudwatchMetricNormalize converts these to the right MetricKind
275290
match event.value() {
276291
MetricValue::Counter { value } => Some(
@@ -279,6 +294,7 @@ impl CloudWatchMetricsSvc {
279294
.value(*value)
280295
.set_timestamp(timestamp)
281296
.set_dimensions(dimensions)
297+
.set_storage_resolution(resolution)
282298
.build(),
283299
),
284300
MetricValue::Distribution {
@@ -291,6 +307,7 @@ impl CloudWatchMetricsSvc {
291307
.set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
292308
.set_timestamp(timestamp)
293309
.set_dimensions(dimensions)
310+
.set_storage_resolution(resolution)
294311
.build(),
295312
),
296313
MetricValue::Set { values } => Some(
@@ -299,6 +316,7 @@ impl CloudWatchMetricsSvc {
299316
.value(values.len() as f64)
300317
.set_timestamp(timestamp)
301318
.set_dimensions(dimensions)
319+
.set_storage_resolution(resolution)
302320
.build(),
303321
),
304322
MetricValue::Gauge { value } => Some(
@@ -307,6 +325,7 @@ impl CloudWatchMetricsSvc {
307325
.value(*value)
308326
.set_timestamp(timestamp)
309327
.set_dimensions(dimensions)
328+
.set_storage_resolution(resolution)
310329
.build(),
311330
),
312331
_ => None,

src/sinks/aws_cloudwatch_metrics/tests.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ fn config() -> CloudWatchMetricsSinkConfig {
2323
CloudWatchMetricsSinkConfig {
2424
default_namespace: "vector".into(),
2525
region: RegionOrEndpoint::with_region("us-east-1".to_owned()),
26+
storage_resolution: IndexMap::from([("bytes_out".to_owned(), 1)]),
2627
..Default::default()
2728
}
2829
}
@@ -33,7 +34,10 @@ async fn svc() -> CloudWatchMetricsSvc {
3334
.create_client(&ProxyConfig::from_env())
3435
.await
3536
.unwrap();
36-
CloudWatchMetricsSvc { client }
37+
CloudWatchMetricsSvc {
38+
client,
39+
storage_resolution: config.storage_resolution,
40+
}
3741
}
3842

3943
#[tokio::test]
@@ -80,6 +84,7 @@ async fn encode_events_basic_counter() {
8084
.metric_name("bytes_out")
8185
.value(2.5)
8286
.timestamp(timestamp("2018-11-14T08:09:10.123Z"))
87+
.storage_resolution(1)
8388
.build(),
8489
MetricDatum::builder()
8590
.metric_name("healthcheck")

0 commit comments

Comments
 (0)