Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add AWS CloudWatch Metrics sink `storage_resolution` config.

authors: trxcllnt
21 changes: 20 additions & 1 deletion src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use aws_sdk_cloudwatch::{
use aws_smithy_types::DateTime as AwsDateTime;
use futures::{FutureExt, SinkExt, stream};
use futures_util::{future, future::BoxFuture};
use indexmap::IndexMap;
use tower::Service;
use vector_lib::{
ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, sink::VectorSink,
Expand Down Expand Up @@ -113,6 +114,14 @@ pub struct CloudWatchMetricsSinkConfig {
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,

/// A map from metric name to AWS storage resolution.
/// Valid values are 1 (high resolution) and 60 (standard resolution).
/// If unset, the AWS SDK default of 60 (standard resolution) is used.
/// See [AWS Metrics Resolution](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Resolution_definition)
/// See [MetricDatum::storage_resolution()](https://docs.rs/aws-sdk-cloudwatch/1.91.0/aws_sdk_cloudwatch/types/struct.MetricDatum.html#structfield.storage_resolution)
#[serde(default)]
pub storage_resolution: IndexMap<String, i32>,
}

impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
Expand Down Expand Up @@ -223,6 +232,7 @@ fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
#[derive(Clone)]
pub struct CloudWatchMetricsSvc {
client: CloudwatchClient,
storage_resolution: IndexMap<String, i32>,
}

impl CloudWatchMetricsSvc {
Expand All @@ -234,7 +244,10 @@ impl CloudWatchMetricsSvc {
let batch = config.batch.into_batch_settings()?;
let request_settings = config.request.into_settings();

let service = CloudWatchMetricsSvc { client };
let service = CloudWatchMetricsSvc {
client,
storage_resolution: config.storage_resolution,
};
let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();

Expand Down Expand Up @@ -263,6 +276,7 @@ impl CloudWatchMetricsSvc {
}

fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
let resolutions = &self.storage_resolution;
events
.into_iter()
.filter_map(|event| {
Expand All @@ -271,6 +285,7 @@ impl CloudWatchMetricsSvc {
.timestamp()
.map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
let dimensions = event.tags().map(tags_to_dimensions);
let resolution = resolutions.get(&metric_name).map(|x| *x);
// AwsCloudwatchMetricNormalize converts these to the right MetricKind
match event.value() {
MetricValue::Counter { value } => Some(
Expand All @@ -279,6 +294,7 @@ impl CloudWatchMetricsSvc {
.value(*value)
.set_timestamp(timestamp)
.set_dimensions(dimensions)
.set_storage_resolution(resolution)
.build(),
),
MetricValue::Distribution {
Expand All @@ -291,6 +307,7 @@ impl CloudWatchMetricsSvc {
.set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
.set_timestamp(timestamp)
.set_dimensions(dimensions)
.set_storage_resolution(resolution)
.build(),
),
MetricValue::Set { values } => Some(
Expand All @@ -299,6 +316,7 @@ impl CloudWatchMetricsSvc {
.value(values.len() as f64)
.set_timestamp(timestamp)
.set_dimensions(dimensions)
.set_storage_resolution(resolution)
.build(),
),
MetricValue::Gauge { value } => Some(
Expand All @@ -307,6 +325,7 @@ impl CloudWatchMetricsSvc {
.value(*value)
.set_timestamp(timestamp)
.set_dimensions(dimensions)
.set_storage_resolution(resolution)
.build(),
),
_ => None,
Expand Down
7 changes: 6 additions & 1 deletion src/sinks/aws_cloudwatch_metrics/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn config() -> CloudWatchMetricsSinkConfig {
CloudWatchMetricsSinkConfig {
default_namespace: "vector".into(),
region: RegionOrEndpoint::with_region("us-east-1".to_owned()),
storage_resolution: IndexMap::from([("bytes_out".to_owned(), 1)]),
..Default::default()
}
}
Expand All @@ -33,7 +34,10 @@ async fn svc() -> CloudWatchMetricsSvc {
.create_client(&ProxyConfig::from_env())
.await
.unwrap();
CloudWatchMetricsSvc { client }
CloudWatchMetricsSvc {
client,
storage_resolution: config.storage_resolution,
}
}

#[tokio::test]
Expand Down Expand Up @@ -80,6 +84,7 @@ async fn encode_events_basic_counter() {
.metric_name("bytes_out")
.value(2.5)
.timestamp(timestamp("2018-11-14T08:09:10.123Z"))
.storage_resolution(1)
.build(),
MetricDatum::builder()
.metric_name("healthcheck")
Expand Down
Loading