diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index f47ce0fc2d60d..aab13651415a9 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -10,11 +10,13 @@ name = "generate-avro-fixtures" path = "tests/bin/generate-avro-fixtures.rs" [dependencies] +akin = { version = "0.4", optional = true } apache-avro = { version = "0.16.0", default-features = false } bytes = { version = "1", default-features = false } chrono.workspace = true csv-core = { version = "0.1.10", default-features = false } derivative = { version = "2", default-features = false } +derive_more = { version = "0.99", optional = true } dyn-clone = { version = "1", default-features = false } lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] } memchr = { version = "2", default-features = false } @@ -24,9 +26,11 @@ prost = { version = "0.12.3", default-features = false, features = ["std"] } prost-reflect = { version = "0.13", default-features = false, features = ["serde"] } regex = { version = "1.10.3", default-features = false, features = ["std", "perf"] } serde.workspace = true +serde-aux = { version = "4.5", optional = true } serde_json.workspace = true smallvec = { version = "1", default-features = false, features = ["union"] } snafu = { version = "0.7.5", default-features = false, features = ["futures"] } +strum = { version = "0.26", features = ["derive"], optional = true } syslog_loose = { version = "0.21", default-features = false, optional = true } tokio-util = { version = "0.7", default-features = false, features = ["codec"] } tracing = { version = "0.1", default-features = false } @@ -48,4 +52,4 @@ rstest = "0.18.2" vrl.workspace = true [features] -syslog = ["dep:syslog_loose"] +syslog = ["dep:syslog_loose", "dep:strum", "dep:akin", "dep:derive_more", "dep:serde-aux"] diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index e61f7cae0bb96..2cb326d25229c 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -14,6 +14,7 @@ mod native_json; mod protobuf; mod raw_message; mod text; +mod syslog; use std::fmt::Debug; @@ -28,6 +29,7 @@ pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig}; pub use text::{TextSerializer, TextSerializerConfig}; +pub use syslog::{SyslogSerializer, SyslogSerializerConfig}; use vector_core::event::Event; /// Serialize a structured event into a byte frame. diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs new file mode 100644 index 0000000000000..64ed40eb08377 --- /dev/null +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -0,0 +1,577 @@ +use bytes::{BufMut, BytesMut}; +use tokio_util::codec::Encoder; +use vector_core::{config::DataType, event::{Event, LogEvent}, schema}; +use chrono::{DateTime, SecondsFormat, SubsecRound, Utc}; +use vrl::value::{ObjectMap, Value}; +use vector_config::configurable_component; + +use std::collections::HashMap; + +// All of this block is to support the Facility + Severity enums with convenience of string or ordinal config value: +use std::str::FromStr; +use strum::{FromRepr, EnumString, VariantNames}; +// `akin` macro for DRY impl to share with both enums due to lack of a `FromRepr` trait: +use akin::akin; +// Custom deserialization with serde needed: +use serde::{Deserialize, Deserializer, de::Error}; +use serde_aux::field_attributes::deserialize_number_from_string; +use snafu::{Snafu, OptionExt}; + +/// Config used to build a `SyslogSerializer`. +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[serde(default)] +pub struct SyslogSerializerConfig { + /// Options for the Syslog serializer. + pub syslog: SyslogSerializerOptions +} + +impl SyslogSerializerConfig { + /// Build the `SyslogSerializer` from this configuration. + pub fn build(&self) -> SyslogSerializer { + SyslogSerializer::new(&self) + } + + /// The data type of events that are accepted by `SyslogSerializer`. + pub fn input_type(&self) -> DataType { + DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + schema::Requirement::empty() + } +} + +/// Syslog serializer options. +#[configurable_component] +#[derive(Clone, Debug, Default)] +// Serde default makes all config keys optional. +// Each field assigns either a fixed value, or field name (lookup field key to retrieve dynamic value per `LogEvent`). +#[serde(default)] +pub struct SyslogSerializerOptions { + /// RFC + rfc: SyslogRFC, + /// Facility + #[serde(deserialize_with = "Facility::deserialize")] + facility: Facility, + /// Severity + #[serde(deserialize_with = "Severity::deserialize")] + severity: Severity, + + /// App Name + app_name: Option, + /// Proc ID + proc_id: Option, + /// Msg ID + msg_id: Option, + + /// Payload key + payload_key: String, + + // Q: The majority of the fields above pragmatically only make sense as config for keys to query? +} + +/// Serializer that converts an `Event` to bytes using the Syslog format. +#[derive(Debug, Clone)] +pub struct SyslogSerializer { + config: SyslogSerializerConfig +} + +impl SyslogSerializer { + /// Creates a new `SyslogSerializer`. + pub fn new(conf: &SyslogSerializerConfig) -> Self { + Self { config: conf.clone() } + } +} + +impl Encoder for SyslogSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { + if let Event::Log(log_event) = event { + let syslog_message = ConfigDecanter::new(log_event).decant_config(&self.config.syslog); + + let vec = syslog_message + .encode(&self.config.syslog.rfc) + .as_bytes() + .to_vec(); + buffer.put_slice(&vec); + } + + Ok(()) + } +} + +// Adapts a `LogEvent` into a `SyslogMessage` based on config from `SyslogSerializerOptions`: +// - Splits off the responsibility of encoding logic to `SyslogMessage` (which is not dependent upon Vector types). +// - Majority of methods are only needed to support the `decant_config()` operation. +struct ConfigDecanter { + log: LogEvent, +} + +impl ConfigDecanter { + fn new(log: LogEvent) -> Self { + Self { + log, + } + } + + fn decant_config(&self, config: &SyslogSerializerOptions) -> SyslogMessage { + let y = |v| self.replace_if_proxied_opt(v); + let app_name = y(&config.app_name).unwrap_or("vector".to_owned()); + let proc_id = y(&config.proc_id); + let msg_id = y(&config.msg_id); + + SyslogMessage { + pri: Pri { + facility: config.facility, + severity: config.severity, + }, + timestamp: self.get_timestamp(), + hostname: self.value_by_key("hostname"), + tag: Tag { + app_name, + proc_id, + msg_id, + }, + structured_data: self.get_structured_data(), + message: self.get_message(&config), + } + } + + fn replace_if_proxied_opt(&self, value: &Option) -> Option { + value.as_ref().and_then(|v| self.replace_if_proxied(v)) + } + + // When the value has the expected prefix, perform a lookup for a field key without that prefix part. + // A failed lookup returns `None`, while a value without the prefix uses the config value as-is. + // + // Q: Why `$.message.` as the prefix? (Appears to be JSONPath syntax?) + // NOTE: Originally named in PR as: `get_field_or_config()` + fn replace_if_proxied(&self, value: &str) -> Option { + value + .strip_prefix("$.message.") + .map_or( + Some(value.to_owned()), + |field_key| self.value_by_key(field_key), + ) + } + + fn value_by_key(&self, field_key: &str) -> Option { + self.log.get(field_key).map(|field_value| { + field_value.to_string_lossy().to_string() + }) + } + + fn get_structured_data(&self) -> Option { + self.log.get("structured_data") + .and_then(|v| v.clone().into_object()) + .map(StructuredData::from) + } + + fn get_timestamp(&self) -> DateTime:: { + // Q: Should the timestamp source be configurable? (eg: Select a field from the `remap` transform) + // + // Concerns: + // - A source with `log_namespace: true` seems to cause `get_timstamp()` to return `None`? + // Does not seem to retrieve `%vector.ingest_timestamp`? + // - A sink type `console` with `timestamp_format: unix_ms` converts a `Value::Timestamp` prior to the encoder logic + // to `Value::Integer(i64)` instead, which won't match this condition. + // + // NOTE: + // Vector always manages `Value::Timestamp` as `DateTime`, any prior TZ information context is always dropped. + // If restoring the TZ for a log is important, it could be handled via a remap transform? + // + // Ref: + // `log.get_timestamp()`: + // https://github.com/vectordotdev/vector/blob/ad6a48efc0f79b2c18a5c1394e5d8603fdfd1bab/lib/vector-core/src/event/log_event.rs#L543-L552 + if let Some(Value::Timestamp(timestamp)) = self.log.get_timestamp() { + *timestamp + } else { + // NOTE: + // When timezone information is missing Vector handles conversion to UTC by assuming the local TZ: + // https://vector.dev/docs/about/under-the-hood/architecture/data-model/log/#time-zones + // There is a global option for which TZ to assume (where the default is local TZ): + // https://vector.dev/docs/reference/configuration/global-options/#timezone + // https://github.com/vectordotdev/vector/blob/58a4a2ef52e606c0f9b9fa975cf114b661300584/lib/vector-core/src/config/global_options.rs#L233-L236 + // https://github.com/vectordotdev/vrl/blob/c010300710a00191cd406e57cd0f3e001923d598/src/compiler/datetime.rs#L88-L95 + // VRL remap can also override that: + // https://vector.dev/docs/reference/configuration/transforms/remap/#timezone + // Vector's `syslog` source type also uses `Utc::now()` internally as a fallback: + // https://github.com/vectordotdev/vector/blob/58a4a2ef52e606c0f9b9fa975cf114b661300584/src/sources/syslog.rs#L430-L438 + Utc::now() + } + } + + fn get_message(&self, config: &SyslogSerializerOptions) -> String { + // `payload_key` configures where to source the value for the syslog `message`: + // - Not configured => Encodes the default log message. + // - Field key (Valid) => Get value by lookup (value_by_key) + // - Field key (Invalid) => Empty string (unwrap_or_default) + + // Ref: + // `log.get_message()`: + // https://github.com/vectordotdev/vector/blob/ad6a48efc0f79b2c18a5c1394e5d8603fdfd1bab/lib/vector-core/src/event/log_event.rs#L532-L541 + // `v.to_string_lossy()`: + // https://github.com/vectordotdev/vrl/blob/f2d71cd26cb8270230f531945d7dee4929235905/src/value/value/serde.rs#L34-L55 + let payload = if config.payload_key.is_empty() { + self.log.get_message().map(|v| v.to_string_lossy().to_string() ) + } else { + self.value_by_key(&config.payload_key) + }; + + payload.unwrap_or_default() + } +} + +// +// SyslogMessage support +// + +const NIL_VALUE: &'static str = "-"; +const SYSLOG_V1: &'static str = "1"; + +/// Syslog RFC +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[serde(rename_all = "snake_case")] +pub enum SyslogRFC { + /// RFC 3164 + Rfc3164, + + #[default] + /// RFC 5424 + Rfc5424 +} + +// ABNF definition: +// https://datatracker.ietf.org/doc/html/rfc5424#section-6 +// https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 +#[derive(Default, Debug)] +struct SyslogMessage { + pri: Pri, + timestamp: DateTime::, + hostname: Option, + tag: Tag, + structured_data: Option, + message: String, +} + +impl SyslogMessage { + fn encode(&self, rfc: &SyslogRFC) -> String { + // Q: NIL_VALUE is unlikely? Technically invalid for RFC 3164: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.4 + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 + let hostname = self.hostname.as_deref().unwrap_or(NIL_VALUE); + let structured_data = self.structured_data.as_ref().map(|sd| sd.encode()); + + let fields_encoded = match rfc { + SyslogRFC::Rfc3164 => { + // TIMESTAMP field format: + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 + // https://docs.rs/chrono/latest/chrono/format/strftime/index.html + // + // TODO: Should this remain as UTC or adjust to the local TZ of the environment (or Vector config)? + // RFC 5424 suggests (when adapting for RFC 3164) to present a timestamp with the local TZ of the log source: + // https://www.rfc-editor.org/rfc/rfc5424#appendix-A.1 + let timestamp = self.timestamp.format("%b %e %H:%M:%S").to_string(); + // MSG part begins with TAG field + optional context: + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.3 + let mut msg_start = self.tag.encode_rfc_3164(); + // When RFC 5424 "Structured Data" is available, it can be compatible with RFC 3164 + // by including it in the RFC 3164 `CONTENT` field (part of MSG): + // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 + if let Some(sd) = structured_data.as_deref() { + msg_start = msg_start + " " + sd + } + + [ + timestamp.as_str(), + hostname, + &msg_start, + ].join(" ") + }, + SyslogRFC::Rfc5424 => { + // HEADER part fields: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 + // TIME-FRAC max length is 6 digits (microseconds): + // https://datatracker.ietf.org/doc/html/rfc5424#section-6 + // TODO: Likewise for RFC 5424, as UTC the offset will always render as `Z` if not configurable. + let timestamp = self.timestamp.round_subsecs(6).to_rfc3339_opts(SecondsFormat::AutoSi, true); + let tag = self.tag.encode_rfc_5424(); + let sd = structured_data.as_deref().unwrap_or(NIL_VALUE); + + [ + SYSLOG_V1, + timestamp.as_str(), + hostname, + &tag, + sd + ].join(" ") + } + }; + + [ + &self.pri.encode(), + &fields_encoded, + " ", + &self.message, + ].concat() + + // Q: RFC 5424 MSG part should technically ensure UTF-8 message begins with BOM? + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.4 + } +} + +#[derive(Default, Debug)] +struct Tag { + app_name: String, + proc_id: Option, + msg_id: Option +} + +impl Tag { + // Roughly equivalent - RFC 5424 fields can compose the start of + // an RFC 3164 MSG part (TAG + CONTENT fields): + // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 + fn encode_rfc_3164(&self) -> String { + let Self { app_name, proc_id, msg_id } = self; + + match proc_id.as_deref().or(msg_id.as_deref()) { + Some(context) => [&app_name, "[", &context, "]:"].concat(), + None => [&app_name, ":"].concat() + } + } + + // TAG was split into separate fields: APP-NAME, PROCID, MSGID + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.5 + fn encode_rfc_5424(&self) -> String { + let Self { app_name, proc_id, msg_id } = self; + + [ + &app_name, + proc_id.as_deref().unwrap_or(NIL_VALUE), + msg_id.as_deref().unwrap_or(NIL_VALUE), + ].join(" ") + } +} + +// Structured Data: +// https://datatracker.ietf.org/doc/html/rfc5424#section-6.3 +// An SD-ELEMENT consists of a name (SD-ID) + parameter key-value pairs (SD-PARAM) +type StructuredDataMap = HashMap>; +#[derive(Debug, Default)] +struct StructuredData { + elements: StructuredDataMap +} + +// Used by `SyslogMessage::encode()` +impl StructuredData { + fn encode(&self) -> String { + if self.elements.is_empty() { + NIL_VALUE.to_string() + } else { + let mut s = String::new(); + + for (sd_id, sd_params) in &self.elements { + s = s + "[" + sd_id; + for (key, value) in sd_params { + s = s + " " + key + "=\"" + value + "\""; + } + s += "]"; + } + + s + } + } +} + +// Used by `ConfigDecanter::decant_config()` +impl From for StructuredData { + fn from(fields: ObjectMap) -> Self { + let elements = fields.into_iter().flat_map(|(sd_id, value)| { + let sd_params = value + .into_object()? + .into_iter() + .map(|(k, v)| (k.into(), value_to_string(v))) + .collect(); + + Some((sd_id.into(), sd_params)) + }).collect::(); + + Self { elements } + } +} + +// Only used as helper to support `StructuredData::from()` +// +// TODO: +// This method could be replaced in favor of calling `v.to_string_lossy().to_string()`? (As was done elsewhere in this file): +// https://github.com/vectordotdev/vrl/blob/f2d71cd26cb8270230f531945d7dee4929235905/src/value/value/serde.rs#L34-L55 +// Timestamp value is handled the same way via `timestamp_to_string()`: +// https://github.com/vectordotdev/vrl/blob/f2d71cd26cb8270230f531945d7dee4929235905/src/value/value.rs#L175-L179 +fn value_to_string(v: Value) -> String { + match v { + Value::Bytes(bytes) => String::from_utf8_lossy(&bytes).to_string(), + Value::Timestamp(timestamp) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true), + _ => v.to_string() + } +} + +// +// Facility + Severity support +// + +// PRIVAL: +// https://www.rfc-editor.org/rfc/rfc5424#section-6.2.1 +// > The number contained within these angle brackets is known as the Priority value (PRIVAL) +// and represents both the Facility and Severity. +#[derive(Default, Debug)] +struct Pri { + facility: Facility, + severity: Severity, +} + +impl Pri { + // The last paragraph describes how to compose the enums into `PRIVAL`: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 + fn encode(&self) -> String { + let prival = (self.facility as u8 * 8) + self.severity as u8; + ["<", &prival.to_string(), ">"].concat() + } +} + +// Facility + Severity mapping from Name => Ordinal number: +// NOTE: +// - Vector component enforces variant doc-comments, even though it's pointless for these enums? +// - `configurable_component(no_deser)` is used to match the existing functionality to support deserializing config with ordinal mapping. +// - `EnumString` with `strum(serialize_all = "kebab-case")` provides the `FromStr` support, while `FromRepr` handles ordinal support. +// - `VariantNames` assists with generating the equivalent `de::Error::unknown_variant` serde error message. + +/// Syslog facility +#[derive(Default, Debug, EnumString, FromRepr, VariantNames, Copy, Clone)] +#[strum(serialize_all = "kebab-case")] +#[configurable_component(no_deser)] +enum Facility { + /// Kern + Kern = 0, + /// User + #[default] + User = 1, + /// Mail + Mail = 2, + /// Daemon + Daemon = 3, + /// Auth + Auth = 4, + /// Syslog + Syslog = 5, + /// LPR + LPR = 6, + /// News + News = 7, + /// UUCP + UUCP = 8, + /// Cron + Cron = 9, + /// AuthPriv + AuthPriv = 10, + /// FTP + FTP = 11, + /// NTP + NTP = 12, + /// Security + Security = 13, + /// Console + Console = 14, + /// SolarisCron + SolarisCron = 15, + /// Local0 + Local0 = 16, + /// Local1 + Local1 = 17, + /// Local2 + Local2 = 18, + /// Local3 + Local3 = 19, + /// Local4 + Local4 = 20, + /// Local5 + Local5 = 21, + /// Local6 + Local6 = 22, + /// Local7 + Local7 = 23, +} + +/// Syslog severity +#[derive(Default, Debug, EnumString, FromRepr, VariantNames, Copy, Clone)] +#[strum(serialize_all = "kebab-case")] +#[configurable_component(no_deser)] +enum Severity { + /// Emergency + Emergency = 0, + /// Alert + Alert = 1, + /// Critical + Critical = 2, + /// Error + Error = 3, + /// Warning + Warning = 4, + /// Notice + Notice = 5, + /// Informational + #[default] + Informational = 6, + /// Debug + Debug = 7, +} + +// Additionally support variants from string-based integers: +// Attempts to parse a string for ordinal mapping first, otherwise try the variant name. +// NOTE: +// - While `serde(rename_all = "kebab-case")` attribute would deserialize like `FromStr` + `EnumString`, config input must strictly match. +// - To retain support for ordinal config input, a custom deserialize method is needed (as `derive(Deserialize)` is too basic): +// - Error message should roughly match `de::Error::unknown_variant` + +akin! { + let &enums = [Facility, Severity]; + + impl *enums { + fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = NumberOrString::deserialize(deserializer)?; + let variant: Option = match &value { + NumberOrString::Number(num) => Self::from_repr(*num), + NumberOrString::String(s) => Self::from_str(&s.to_ascii_lowercase()).ok(), + }; + + variant.with_context(|| InvalidVariantSnafu { + input: value.to_string(), + variants: Self::VARIANTS.join("`, `"), + }).map_err(D::Error::custom) + } + } +} + +// An intermediary container to deserialize config value into. +// Ensures that a string number is properly deserialized to the `usize` variant. +#[derive(derive_more::Display, Deserialize)] +#[serde(untagged)] +enum NumberOrString { + Number( + #[serde(deserialize_with = "deserialize_number_from_string")] + usize + ), + String(String) +} + +#[derive(Debug, Snafu)] +enum StrumDeserializeError { + #[snafu(display("Unknown variant `{}`, expected one of `{}`", input, variants))] + InvalidVariant { input: String, variants: String }, +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 2f28c27ec7bf8..7e969ae7b015c 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -8,17 +8,24 @@ use std::fmt::Debug; use bytes::BytesMut; pub use format::{ - AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CsvSerializer, - CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, - JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, - NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, ProtobufSerializer, - ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, - RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, + AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, + CsvSerializer, CsvSerializerConfig, + GelfSerializer, GelfSerializerConfig, + JsonSerializer, JsonSerializerConfig, + LogfmtSerializer, LogfmtSerializerConfig, + NativeJsonSerializer, NativeJsonSerializerConfig, + NativeSerializer, NativeSerializerConfig, + ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, + RawMessageSerializer, RawMessageSerializerConfig, + TextSerializer, TextSerializerConfig, + SyslogSerializer, SyslogSerializerConfig, }; pub use framing::{ - BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder, - CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder, - LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, + BoxedFramer, BoxedFramingError, + BytesEncoder, BytesEncoderConfig, + CharacterDelimitedEncoder, CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, + LengthDelimitedEncoder, LengthDelimitedEncoderConfig, + NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, }; use vector_config::configurable_component; use vector_core::{config::DataType, event::Event, schema}; @@ -259,6 +266,10 @@ pub enum SerializerConfig { /// transform) and removing the message field while doing additional parsing on it, as this /// could lead to the encoding emitting empty strings for the given event. Text(TextSerializerConfig), + + /// Syslog encoding + /// RFC 3164 and 5424 are supported + Syslog (SyslogSerializerConfig), } impl From for SerializerConfig { @@ -321,6 +332,12 @@ impl From for SerializerConfig { } } +impl From for SerializerConfig { + fn from(config: SyslogSerializerConfig) -> Self { + Self::Syslog(config) + } +} + impl SerializerConfig { /// Build the `Serializer` from this configuration. pub fn build(&self) -> Result> { @@ -341,6 +358,7 @@ impl SerializerConfig { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())), + SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())), } } @@ -367,7 +385,8 @@ impl SerializerConfig { | SerializerConfig::Logfmt | SerializerConfig::NativeJson | SerializerConfig::RawMessage - | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited, + | SerializerConfig::Text(_) + | SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited, } } @@ -386,6 +405,7 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), + SerializerConfig::Syslog(config) => config.input_type(), } } @@ -404,6 +424,7 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), + SerializerConfig::Syslog(config) => config.schema_requirement(), } } } @@ -431,6 +452,8 @@ pub enum Serializer { RawMessage(RawMessageSerializer), /// Uses a `TextSerializer` for serialization. Text(TextSerializer), + /// Uses a `SyslogSerializer` for serialization. + Syslog(SyslogSerializer), } impl Serializer { @@ -444,7 +467,8 @@ impl Serializer { | Serializer::Text(_) | Serializer::Native(_) | Serializer::Protobuf(_) - | Serializer::RawMessage(_) => false, + | Serializer::RawMessage(_) + | Serializer::Syslog(_) => false, } } @@ -465,7 +489,8 @@ impl Serializer { | Serializer::Text(_) | Serializer::Native(_) | Serializer::Protobuf(_) - | Serializer::RawMessage(_) => { + | Serializer::RawMessage(_) + | Serializer::Syslog(_) => { panic!("Serializer does not support JSON") } } @@ -532,6 +557,12 @@ impl From for Serializer { } } +impl From for Serializer { + fn from(serializer: SyslogSerializer) -> Self { + Self::Syslog(serializer) + } +} + impl tokio_util::codec::Encoder for Serializer { type Error = vector_common::Error; @@ -547,6 +578,7 @@ impl tokio_util::codec::Encoder for Serializer { Serializer::Protobuf(serializer) => serializer.encode(event, buffer), Serializer::RawMessage(serializer) => serializer.encode(event, buffer), Serializer::Text(serializer) => serializer.encode(event, buffer), + Serializer::Syslog(serializer) => serializer.encode(event, buffer), } } } diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index d16ec78b627e4..c742f14f84f41 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -123,7 +123,8 @@ impl EncodingConfigWithFraming { | Serializer::Logfmt(_) | Serializer::NativeJson(_) | Serializer::RawMessage(_) - | Serializer::Text(_), + | Serializer::Text(_) + | Serializer::Syslog(_), ) => NewlineDelimitedEncoder::new().into(), }; diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index d12f2ab85cb78..5bc74dcb258a2 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -122,7 +122,8 @@ impl Encoder { | Serializer::Logfmt(_) | Serializer::NativeJson(_) | Serializer::RawMessage(_) - | Serializer::Text(_), + | Serializer::Text(_) + | Serializer::Syslog(_), _, ) => "text/plain", } diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index a9b39a560988c..4b5dc66cad0eb 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -212,6 +212,7 @@ fn serializer_config_to_deserializer( }) } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, + SerializerConfig::Syslog(_) => todo!(), }; deserializer_config.build() diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 7c59253c28c6f..a87dfe296e8c3 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -236,12 +236,12 @@ impl WebSocketSink { const fn should_encode_as_binary(&self) -> bool { use vector_lib::codecs::encoding::Serializer::{ - Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, Protobuf, RawMessage, Text, + Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, Protobuf, RawMessage, Text, Syslog, }; match self.encoder.serializer() { RawMessage(_) | Avro(_) | Native(_) | Protobuf(_) => true, - Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false, + Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) | Syslog(_) => false, } }