Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/23748_add_indexer_ack_compression.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds proper support for compression of HEC indexer ack queries, using the sink's configured `compression` setting.

authors: sbalmos
24 changes: 13 additions & 11 deletions src/sinks/splunk_hec/common/acknowledgements.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
use hyper::Body;
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::{
collections::HashMap,
num::{NonZeroU8, NonZeroU64},
sync::Arc,
time::Duration,
};

use hyper::Body;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc::Receiver, oneshot::Sender};
use vector_lib::{configurable::configurable_component, event::EventStatus};

use super::service::{HttpRequestBuilder, MetadataFields};
use crate::sinks::util::Compressor;
use crate::{
config::AcknowledgementsConfig,
http::HttpClient,
internal_events::{
SplunkIndexerAcknowledgementAPIError, SplunkIndexerAcknowledgementAckAdded,
SplunkIndexerAcknowledgementAcksRemoved,
},
sinks::util::Compression,
};

/// Splunk HEC acknowledgement configuration.
Expand Down Expand Up @@ -96,12 +96,6 @@ impl HecAckClient {
client: HttpClient,
http_request_builder: Arc<HttpRequestBuilder>,
) -> Self {
// Reimplement with compression support, see https://github.com/vectordotdev/vector/issues/23748
let http_request_builder = Arc::new(HttpRequestBuilder {
compression: Compression::None,
..(*http_request_builder).clone()
});

Self {
acks: HashMap::new(),
retry_limit,
Expand Down Expand Up @@ -222,10 +216,18 @@ impl HecAckClient {
let request_body_bytes = crate::serde::json::to_bytes(request_body)
.map_err(|_| HecAckApiError::ClientBuildRequest)?
.freeze();
let mut compressor = Compressor::from(self.http_request_builder.compression);
compressor
.write_all(request_body_bytes.as_ref())
.map_err(|_| HecAckApiError::ClientBuildRequest)?;
let payload = compressor
.finish()
.map_err(|_| HecAckApiError::ClientBuildRequest)?
.freeze();
let request = self
.http_request_builder
.build_request(
request_body_bytes,
payload,
"/services/collector/ack",
None,
MetadataFields::default(),
Expand Down
Loading