Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/otlp_encoding.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added `use_otlp_encoding` option to the `opentelemetry` sink.
When set to `true`, the sink assumes that incoming Vector events are already structured according to the OTLP specification.

authors: pront
14 changes: 11 additions & 3 deletions lib/codecs/src/encoding/format/protobuf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;

use crate::encoding::BuildError;
use bytes::BytesMut;
use prost_reflect::{MessageDescriptor, prost::Message as _};
use tokio_util::codec::Encoder;
Expand All @@ -9,9 +10,10 @@ use vector_core::{
event::{Event, Value},
schema,
};
use vrl::protobuf::{descriptor::get_message_descriptor, encode::encode_message};

use crate::encoding::BuildError;
use vrl::protobuf::{
descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
encode::encode_message,
};

/// Config used to build a `ProtobufSerializer`.
#[configurable_component]
Expand Down Expand Up @@ -72,6 +74,12 @@ impl ProtobufSerializer {
Self { message_descriptor }
}

/// Creates a new serializer instance from an encoded file-descriptor-set byte
/// slice, resolving `message_type` as the target protobuf message.
///
/// Returns an error when the descriptor bytes cannot be decoded or when the
/// message type is not present in the descriptor set.
pub fn new_from_bytes(desc_bytes: &[u8], message_type: &str) -> vector_common::Result<Self> {
    Ok(Self {
        message_descriptor: get_message_descriptor_from_bytes(desc_bytes, message_type)?,
    })
}

/// Get a description of the message type used in serialization.
pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
self.message_descriptor.descriptor_proto()
Expand Down
6 changes: 6 additions & 0 deletions lib/codecs/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ pub enum SerializerConfig {
Text(TextSerializerConfig),
}

impl Default for SerializerConfig {
    // JSON is the default codec when none is configured explicitly.
    fn default() -> Self {
        Self::Json(JsonSerializerConfig::default())
    }
}

impl From<AvroSerializerConfig> for SerializerConfig {
fn from(config: AvroSerializerConfig) -> Self {
Self::Avro { avro: config.avro }
Expand Down
7 changes: 7 additions & 0 deletions lib/opentelemetry-proto/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/// Fully-qualified protobuf message type of an OTLP logs export request.
pub const LOGS_REQUEST_MESSAGE_TYPE: &str =
    "opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest";
/// Fully-qualified protobuf message type of an OTLP traces export request.
pub const TRACES_REQUEST_MESSAGE_TYPE: &str =
    "opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest";
/// Fully-qualified protobuf message type of an OTLP metrics export request.
pub const METRICS_REQUEST_MESSAGE_TYPE: &str =
    "opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest";

/// Service stub and clients.
pub mod collector {
pub mod trace {
Expand Down
4 changes: 2 additions & 2 deletions src/codecs/encoding/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::codecs::Transformer;

/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
/// Configures how events are encoded into raw bytes.
/// The selected encoding also determines which input types (logs, metrics, traces) are supported.
pub struct EncodingConfig {
Expand Down Expand Up @@ -60,7 +60,7 @@ where

/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
#[configurable(derived)]
Expand Down
150 changes: 80 additions & 70 deletions src/sinks/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct HttpSinkConfig {
#[serde(default)]
pub compression: Compression,

/// If not specified, `encoding.codec` will default to `json`.
/// If `encoding.framing` is not specified, it will be deduced from `encoding.codec`.
#[serde(flatten)]
pub encoding: EncodingConfigWithFraming,

Expand Down Expand Up @@ -170,79 +172,15 @@ impl HttpSinkConfig {
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
Ok(Encoder::<Framer>::new(framer, serializer))
}
}

impl GenerateConfig for HttpSinkConfig {
    fn generate_config() -> toml::Value {
        // Minimal example configuration surfaced by `vector generate` and the docs.
        toml::from_str(
            r#"uri = "https://10.22.212.22:9000/endpoint"
encoding.codec = "json""#,
        )
        .unwrap()
    }
}

/// Performs the sink healthcheck: sends a HEAD request to the configured URI
/// and succeeds only on a `200 OK` response.
async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
    // Reconcile sink-level auth with credentials embedded in the URI;
    // presumably errors when both are set — see `choose_one` for the contract.
    let auth = auth.choose_one(&uri.auth)?;
    let uri = uri.with_default_parts();
    let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap();

    if let Some(auth) = auth {
        auth.apply(&mut request);
    }

    let response = client.send(request).await?;

    match response.status() {
        StatusCode::OK => Ok(()),
        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
    }
}

/// Validates user-configured header names/values, additionally rejecting an
/// explicit `Authorization` header when the sink also configures auth options.
pub(super) fn validate_headers(
    headers: &BTreeMap<String, String>,
    configures_auth: bool,
) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
    // Delegate syntactic validation (parseable names/values) to the shared helper.
    let headers = crate::sinks::util::http::validate_headers(headers)?;

    for name in headers.keys() {
        if configures_auth && name.inner() == AUTHORIZATION {
            return Err("Authorization header can not be used with defined auth options".into());
        }
    }

    Ok(headers)
}

/// Validates that the configured payload prefix/suffix wrapper produces valid
/// JSON when the encoder emits comma-delimited JSON (e.g. `[` … `]` around an
/// array of events). All other encoder/framer combinations pass through.
pub(super) fn validate_payload_wrapper(
    payload_prefix: &str,
    payload_suffix: &str,
    encoder: &Encoder<Framer>,
) -> crate::Result<(String, String)> {
    // Probe with an empty JSON object standing in for a single encoded event.
    let payload = [payload_prefix, "{}", payload_suffix].join("");
    match (
        encoder.serializer(),
        encoder.framer(),
        serde_json::from_str::<serde_json::Value>(&payload),
    ) {
        // Only the JSON + comma-delimited combination requires the wrapped
        // payload to parse as a JSON value.
        (
            Serializer::Json(_),
            Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
            Err(_),
        ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()),
        _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())),
    }
}

#[async_trait]
#[typetag::serde(name = "http")]
impl SinkConfig for HttpSinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
pub(crate) async fn build_with_encoder(
&self,
cx: SinkContext,
encoder: Encoder<Framer>,
transformer: Transformer,
) -> crate::Result<(VectorSink, Healthcheck)> {
let batch_settings = self.batch.validate()?.into_batcher_settings()?;

let encoder = self.build_encoder()?;
let transformer = self.encoding.transformer();

let mut request = self.request.clone();
request.add_old_option(self.headers.clone());

Expand Down Expand Up @@ -350,6 +288,78 @@ impl SinkConfig for HttpSinkConfig {

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}
}

impl GenerateConfig for HttpSinkConfig {
    fn generate_config() -> toml::Value {
        // Minimal example configuration surfaced by `vector generate` and the docs.
        toml::from_str(
            r#"uri = "https://10.22.212.22:9000/endpoint"
encoding.codec = "json""#,
        )
        .unwrap()
    }
}

/// Performs the sink healthcheck: sends a HEAD request to the configured URI
/// and succeeds only on a `200 OK` response.
async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
    // Reconcile sink-level auth with credentials embedded in the URI;
    // presumably errors when both are set — see `choose_one` for the contract.
    let auth = auth.choose_one(&uri.auth)?;
    let uri = uri.with_default_parts();
    let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap();

    if let Some(auth) = auth {
        auth.apply(&mut request);
    }

    let response = client.send(request).await?;

    match response.status() {
        StatusCode::OK => Ok(()),
        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
    }
}

/// Validates user-configured header names/values, additionally rejecting an
/// explicit `Authorization` header when the sink also configures auth options.
pub(super) fn validate_headers(
    headers: &BTreeMap<String, String>,
    configures_auth: bool,
) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
    // Delegate syntactic validation (parseable names/values) to the shared helper.
    let headers = crate::sinks::util::http::validate_headers(headers)?;

    for name in headers.keys() {
        if configures_auth && name.inner() == AUTHORIZATION {
            return Err("Authorization header can not be used with defined auth options".into());
        }
    }

    Ok(headers)
}

/// Validates that the configured payload prefix/suffix wrapper produces valid
/// JSON when the encoder emits comma-delimited JSON (e.g. `[` … `]` around an
/// array of events). All other encoder/framer combinations pass through.
pub(super) fn validate_payload_wrapper(
    payload_prefix: &str,
    payload_suffix: &str,
    encoder: &Encoder<Framer>,
) -> crate::Result<(String, String)> {
    // Probe with an empty JSON object standing in for a single encoded event.
    let payload = [payload_prefix, "{}", payload_suffix].join("");
    match (
        encoder.serializer(),
        encoder.framer(),
        serde_json::from_str::<serde_json::Value>(&payload),
    ) {
        // Only the JSON + comma-delimited combination requires the wrapped
        // payload to parse as a JSON value.
        (
            Serializer::Json(_),
            Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
            Err(_),
        ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()),
        _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())),
    }
}

#[async_trait]
#[typetag::serde(name = "http")]
impl SinkConfig for HttpSinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let encoder = self.build_encoder()?;
self.build_with_encoder(cx, encoder, self.encoding.transformer())
.await
}

fn input(&self) -> Input {
Input::new(self.encoding.config().1.input_type())
Expand Down
66 changes: 56 additions & 10 deletions src/sinks/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
use crate::codecs::Encoder;
use crate::{
codecs::{EncodingConfigWithFraming, Transformer},
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
sinks::{
Healthcheck, VectorSink,
http::config::{HttpMethod, HttpSinkConfig},
},
};
use indoc::indoc;
use vector_config::component::GenerateConfig;
use vector_lib::codecs::encoding::{Framer, ProtobufSerializer, Serializer};
use vector_lib::opentelemetry::proto::{
LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE, TRACES_REQUEST_MESSAGE_TYPE,
};
use vector_lib::{
codecs::{
JsonSerializerConfig,
Expand All @@ -8,22 +21,26 @@ use vector_lib::{
configurable::configurable_component,
};

use crate::{
codecs::{EncodingConfigWithFraming, Transformer},
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
sinks::{
Healthcheck, VectorSink,
http::config::{HttpMethod, HttpSinkConfig},
},
};

/// Configuration for the `OpenTelemetry` sink.
#[configurable_component(sink("opentelemetry", "Deliver OTLP data over HTTP."))]
#[derive(Clone, Debug, Default)]
pub struct OpenTelemetryConfig {
    /// Protocol configuration
    #[configurable(derived)]
    protocol: Protocol,

    /// Setting this field to `true` overrides all encoding settings; requests are then encoded
    /// according to the
    /// [OpenTelemetry protocol](https://opentelemetry.io/docs/specs/otel/protocol/).
    ///
    /// The endpoint is used to determine the data type:
    /// * v1/logs → OTLP Logs
    /// * v1/traces → OTLP Traces
    /// * v1/metrics → OTLP Metrics
    ///
    /// More information available [here](https://opentelemetry.io/docs/specs/otlp/#otlphttp-request).
    #[configurable(derived)]
    #[serde(default)]
    pub use_otlp_encoding: bool,
}

/// The protocol used to send data to OpenTelemetry.
Expand Down Expand Up @@ -78,7 +95,23 @@ impl GenerateConfig for OpenTelemetryConfig {
impl SinkConfig for OpenTelemetryConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
    match &self.protocol {
        Protocol::Http(config) => {
            if !self.use_otlp_encoding {
                // Honor whatever encoding the user configured on the HTTP sink.
                return config.build(cx).await;
            }

            // Derive the OTLP request message type (logs/traces/metrics) from
            // the endpoint path, then serialize with the bundled OpenTelemetry
            // protobuf descriptors, framed as raw bytes.
            let message_type = to_message_type(&config.uri.to_string())?;
            let serializer = ProtobufSerializer::new_from_bytes(
                vector_lib::opentelemetry::proto::DESCRIPTOR_BYTES,
                message_type,
            )?;
            let encoder = Encoder::<Framer>::new(
                FramingConfig::Bytes.build(),
                Serializer::Protobuf(serializer),
            );
            config
                .build_with_encoder(cx, encoder, config.encoding.transformer())
                .await
        }
    }
}

Expand All @@ -95,6 +128,19 @@ impl SinkConfig for OpenTelemetryConfig {
}
}

/// Checks if an endpoint ends with a known OTEL proto request.
pub fn to_message_type(endpoint: &str) -> crate::Result<&'static str> {
if endpoint.ends_with("v1/logs") {
Ok(LOGS_REQUEST_MESSAGE_TYPE)
} else if endpoint.ends_with("v1/traces") {
Ok(TRACES_REQUEST_MESSAGE_TYPE)
} else if endpoint.ends_with("v1/metrics") {
Ok(METRICS_REQUEST_MESSAGE_TYPE)
} else {
Err(format!("Endpoint {endpoint} not supported, should end with 'v1/logs', 'v1/metrics' or 'v1/traces'.").into())
}
}

#[cfg(test)]
mod test {
#[test]
Expand Down
Loading
Loading