From 1ddf6af2a5d69f7aa6ee857bcbfffb519c55ad3c Mon Sep 17 00:00:00 2001 From: Sergey Yedrikov Date: Mon, 12 Jun 2023 14:26:02 -0400 Subject: [PATCH 01/16] feat: Add syslog codec Original commit from syedriko --- lib/codecs/src/encoding/format/mod.rs | 2 + lib/codecs/src/encoding/format/syslog.rs | 446 +++++++++++++++++++++ lib/codecs/src/encoding/mod.rs | 56 ++- src/codecs/encoding/config.rs | 3 +- src/codecs/encoding/encoder.rs | 3 +- src/components/validation/resources/mod.rs | 1 + src/sinks/websocket/sink.rs | 4 +- 7 files changed, 499 insertions(+), 16 deletions(-) create mode 100644 lib/codecs/src/encoding/format/syslog.rs diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index e61f7cae0bb96..2cb326d25229c 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -14,6 +14,7 @@ mod native_json; mod protobuf; mod raw_message; mod text; +mod syslog; use std::fmt::Debug; @@ -28,6 +29,7 @@ pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig}; pub use text::{TextSerializer, TextSerializerConfig}; +pub use syslog::{SyslogSerializer, SyslogSerializerConfig}; use vector_core::event::Event; /// Serialize a structured event into a byte frame. 
diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs new file mode 100644 index 0000000000000..017e054dcc850 --- /dev/null +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -0,0 +1,446 @@ +use bytes::{BufMut, BytesMut}; +use tokio_util::codec::Encoder; +use vector_core::{config::DataType, event::{Event, LogEvent}, schema}; +use chrono::{DateTime, SecondsFormat, Local}; +use vrl::value::Value; +use serde::{de, Deserialize}; +use vector_config::configurable_component; + +const NIL_VALUE: &'static str = "-"; + +/// Syslog RFC +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum SyslogRFC { + /// RFC 3164 + Rfc3164, + + /// RFC 5424 + Rfc5424 +} + +impl Default for SyslogRFC { + fn default() -> Self { + SyslogRFC::Rfc5424 + } +} + +/// Syslog facility +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +enum Facility { + /// Syslog facility ordinal number + Fixed(u8), + + /// Syslog facility name + Field(String) +} + +impl Default for Facility { + fn default() -> Self { + Facility::Fixed(1) + } +} + +/// Syslog severity +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +enum Severity { + /// Syslog severity ordinal number + Fixed(u8), + + /// Syslog severity name + Field(String) +} + +impl Default for Severity { + fn default() -> Self { + Severity::Fixed(6) + } +} + +/// Config used to build a `SyslogSerializer`. 
+#[configurable_component] +#[derive(Debug, Clone, Default)] +pub struct SyslogSerializerConfig { + /// RFC + #[serde(default)] + rfc: SyslogRFC, + + /// Facility + #[serde(default)] + #[serde(deserialize_with = "deserialize_facility")] + facility: Facility, + + /// Severity + #[serde(default)] + #[serde(deserialize_with = "deserialize_severity")] + severity: Severity, + + /// Tag + #[serde(default)] + tag: String, + + /// Trim prefix + trim_prefix: Option, + + /// Payload key + #[serde(default)] + payload_key: String, + + /// Add log source + #[serde(default)] + add_log_source: bool, + + /// App Name, RFC 5424 only + #[serde(default = "default_app_name")] + app_name: String, + + /// Proc ID, RFC 5424 only + #[serde(default = "default_nil_value")] + proc_id: String, + + /// Msg ID, RFC 5424 only + #[serde(default = "default_nil_value")] + msg_id: String +} + +impl SyslogSerializerConfig { + /// Build the `SyslogSerializer` from this configuration. + pub fn build(&self) -> SyslogSerializer { + SyslogSerializer::new(&self) + } + + /// The data type of events that are accepted by `SyslogSerializer`. + pub fn input_type(&self) -> DataType { + DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + schema::Requirement::empty() + } +} + +/// Serializer that converts an `Event` to bytes using the Syslog format. +#[derive(Debug, Clone)] +pub struct SyslogSerializer { + config: SyslogSerializerConfig +} + +impl SyslogSerializer { + /// Creates a new `SyslogSerializer`. 
+ pub fn new(conf: &SyslogSerializerConfig) -> Self { + Self { config: conf.clone() } + } +} + +impl Encoder for SyslogSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { + match event { + Event::Log(log) => { + let mut buf = String::from("<"); + let pri = get_num_facility(&self.config.facility, &log) * 8 + get_num_severity(&self.config.severity, &log); + buf.push_str(&pri.to_string()); + buf.push_str(">"); + match self.config.rfc { + SyslogRFC::Rfc3164 => { + let timestamp = get_timestamp(&log); + let formatted_timestamp = format!(" {} ", timestamp.format("%b %e %H:%M:%S")); + buf.push_str(&formatted_timestamp); + buf.push_str(&get_field("hostname", &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&self.config.tag, &log)); + buf.push_str(": "); + if self.config.add_log_source { + add_log_source(&log, &mut buf); + } + }, + SyslogRFC::Rfc5424 => { + buf.push_str("1 "); + let timestamp = get_timestamp(&log); + buf.push_str(×tamp.to_rfc3339_opts(SecondsFormat::Millis, true)); + buf.push(' '); + buf.push_str(&get_field("hostname", &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&&self.config.app_name, &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&&self.config.proc_id, &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&&self.config.msg_id, &log)); + buf.push_str(" - "); // no structured data + if self.config.add_log_source { + add_log_source(&log, &mut buf); + } + } + } + let mut payload = if self.config.payload_key.is_empty() { + serde_json::to_vec(&log).unwrap_or_default() + } else { + get_field(&&self.config.payload_key, &log).as_bytes().to_vec() + }; + let mut vec = buf.as_bytes().to_vec(); + vec.append(&mut payload); + buffer.put_slice(&vec); + }, + _ => {} + } + Ok(()) + } +} + +fn deserialize_facility<'de, D>(d: D) -> Result + where D: de::Deserializer<'de> +{ + let value: String = String::deserialize(d)?; + let num_value = 
value.parse::(); + match num_value { + Ok(num) => { + if num > 23 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"facility number too large")); + } else { + return Ok(Facility::Fixed(num)); + } + } + Err(_) => { + if let Some(field_name) = value.strip_prefix("$.message.") { + return Ok(Facility::Field(field_name.to_string())); + } else { + let num = match value.to_uppercase().as_str() { + "KERN" => 0, + "USER" => 1, + "MAIL" => 2, + "DAEMON" => 3, + "AUTH" => 4, + "SYSLOG" => 5, + "LPR" => 6, + "NEWS" => 7, + "UUCP" => 8, + "CRON" => 9, + "AUTHPRIV" => 10, + "FTP" => 11, + "NTP" => 12, + "SECURITY" => 13, + "CONSOLE" => 14, + "SOLARIS-CRON" => 15, + "LOCAL0" => 16, + "LOCAL1" => 17, + "LOCAL2" => 18, + "LOCAL3" => 19, + "LOCAL4" => 20, + "LOCAL5" => 21, + "LOCAL6" => 22, + "LOCAL7" => 23, + _ => 24, + }; + if num > 23 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown facility")); + } else { + return Ok(Facility::Fixed(num)) + } + } + } + } +} + +fn deserialize_severity<'de, D>(d: D) -> Result + where D: de::Deserializer<'de> +{ + let value: String = String::deserialize(d)?; + let num_value = value.parse::(); + match num_value { + Ok(num) => { + if num > 7 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"severity number too large")) + } else { + return Ok(Severity::Fixed(num)) + } + } + Err(_) => { + if let Some(field_name) = value.strip_prefix("$.message.") { + return Ok(Severity::Field(field_name.to_string())); + } else { + let num = match value.to_uppercase().as_str() { + "EMERGENCY" => 0, + "ALERT" => 1, + "CRITICAL" => 2, + "ERROR" => 3, + "WARNING" => 4, + "NOTICE" => 5, + "INFORMATIONAL" => 6, + "DEBUG" => 7, + _ => 8, + }; + if num > 7 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown severity")) + } else { + return Ok(Severity::Fixed(num)) + } + } + } + } +} + +fn default_app_name() -> String { + String::from("vector") 
+} + +fn default_nil_value() -> String { + String::from(NIL_VALUE) +} + +fn add_log_source(log: &LogEvent, buf: &mut String) { + buf.push_str("namespace_name="); + buf.push_str(&String::from_utf8( + log + .get("kubernetes.namespace_name") + .map(|h| h.coerce_to_bytes()) + .unwrap_or_default().to_vec() + ).unwrap()); + buf.push_str(", container_name="); + buf.push_str(&String::from_utf8( + log + .get("kubernetes.container_name") + .map(|h| h.coerce_to_bytes()) + .unwrap_or_default().to_vec() + ).unwrap()); + buf.push_str(", pod_name="); + buf.push_str(&String::from_utf8( + log + .get("kubernetes.pod_name") + .map(|h| h.coerce_to_bytes()) + .unwrap_or_default().to_vec() + ).unwrap()); + buf.push_str(", message="); +} + +fn get_num_facility(config_facility: &Facility, log: &LogEvent) -> u8 { + match config_facility { + Facility::Fixed(num) => return *num, + Facility::Field(field_name) => { + if let Some(field_value) = log.get(field_name.as_str()) { + let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); + let num_value = field_value_string.parse::(); + match num_value { + Ok(num) => { + if num > 23 { + return 1 // USER + } else { + return num + } + } + Err(_) => { + let num = match field_value_string.to_uppercase().as_str() { + "KERN" => 0, + "USER" => 1, + "MAIL" => 2, + "DAEMON" => 3, + "AUTH" => 4, + "SYSLOG" => 5, + "LPR" => 6, + "NEWS" => 7, + "UUCP" => 8, + "CRON" => 9, + "AUTHPRIV" => 10, + "FTP" => 11, + "NTP" => 12, + "SECURITY" => 13, + "CONSOLE" => 14, + "SOLARIS-CRON" => 15, + "LOCAL0" => 16, + "LOCAL1" => 17, + "LOCAL2" => 18, + "LOCAL3" => 19, + "LOCAL4" => 20, + "LOCAL5" => 21, + "LOCAL6" => 22, + "LOCAL7" => 23, + _ => 24, + }; + if num > 23 { + return 1 // USER + } else { + return num + } + } + } + } else { + return 1 // USER + } + } + } +} + +fn get_num_severity(config_severity: &Severity, log: &LogEvent) -> u8 { + match config_severity { + Severity::Fixed(num) => return *num, + 
Severity::Field(field_name) => { + if let Some(field_value) = log.get(field_name.as_str()) { + let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); + let num_value = field_value_string.parse::(); + match num_value { + Ok(num) => { + if num > 7 { + return 6 // INFORMATIONAL + } else { + return num + } + } + Err(_) => { + let num = match field_value_string.to_uppercase().as_str() { + "EMERGENCY" => 0, + "ALERT" => 1, + "CRITICAL" => 2, + "ERROR" => 3, + "WARNING" => 4, + "NOTICE" => 5, + "INFORMATIONAL" => 6, + "DEBUG" => 7, + _ => 8, + }; + if num > 7 { + return 6 // INFORMATIONAL + } else { + return num + } + } + } + } else { + return 6 // INFORMATIONAL + } + } + } +} + +fn get_field_or_config(config_name: &String, log: &LogEvent) -> String { + if let Some(field_name) = config_name.strip_prefix("$.message.") { + return get_field(field_name, log) + } else { + return config_name.clone() + } +} + +fn get_field(field_name: &str, log: &LogEvent) -> String { + if let Some(field_value) = log.get(field_name) { + return String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); + } else { + return NIL_VALUE.to_string() + } +} + +fn get_timestamp(log: &LogEvent) -> DateTime:: { + match log.get("@timestamp") { + Some(value) => { + if let Value::Timestamp(timestamp) = value { + DateTime::::from(*timestamp) + } else { + Local::now() + } + }, + _ => Local::now() + } +} + diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 2f28c27ec7bf8..7e969ae7b015c 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -8,17 +8,24 @@ use std::fmt::Debug; use bytes::BytesMut; pub use format::{ - AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CsvSerializer, - CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, - JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, - NativeJsonSerializerConfig, 
NativeSerializer, NativeSerializerConfig, ProtobufSerializer, - ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, - RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, + AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, + CsvSerializer, CsvSerializerConfig, + GelfSerializer, GelfSerializerConfig, + JsonSerializer, JsonSerializerConfig, + LogfmtSerializer, LogfmtSerializerConfig, + NativeJsonSerializer, NativeJsonSerializerConfig, + NativeSerializer, NativeSerializerConfig, + ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, + RawMessageSerializer, RawMessageSerializerConfig, + TextSerializer, TextSerializerConfig, + SyslogSerializer, SyslogSerializerConfig, }; pub use framing::{ - BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder, - CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder, - LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, + BoxedFramer, BoxedFramingError, + BytesEncoder, BytesEncoderConfig, + CharacterDelimitedEncoder, CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, + LengthDelimitedEncoder, LengthDelimitedEncoderConfig, + NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, }; use vector_config::configurable_component; use vector_core::{config::DataType, event::Event, schema}; @@ -259,6 +266,10 @@ pub enum SerializerConfig { /// transform) and removing the message field while doing additional parsing on it, as this /// could lead to the encoding emitting empty strings for the given event. 
Text(TextSerializerConfig), + + /// Syslog encoding + /// RFC 3164 and 5424 are supported + Syslog (SyslogSerializerConfig), } impl From for SerializerConfig { @@ -321,6 +332,12 @@ impl From for SerializerConfig { } } +impl From for SerializerConfig { + fn from(config: SyslogSerializerConfig) -> Self { + Self::Syslog(config) + } +} + impl SerializerConfig { /// Build the `Serializer` from this configuration. pub fn build(&self) -> Result> { @@ -341,6 +358,7 @@ impl SerializerConfig { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())), + SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())), } } @@ -367,7 +385,8 @@ impl SerializerConfig { | SerializerConfig::Logfmt | SerializerConfig::NativeJson | SerializerConfig::RawMessage - | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited, + | SerializerConfig::Text(_) + | SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited, } } @@ -386,6 +405,7 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), + SerializerConfig::Syslog(config) => config.input_type(), } } @@ -404,6 +424,7 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), + SerializerConfig::Syslog(config) => config.schema_requirement(), } } } @@ -431,6 +452,8 @@ pub enum Serializer { RawMessage(RawMessageSerializer), /// Uses a `TextSerializer` for serialization. Text(TextSerializer), + /// Uses a `SyslogSerializer` for serialization. 
+ Syslog(SyslogSerializer), } impl Serializer { @@ -444,7 +467,8 @@ impl Serializer { | Serializer::Text(_) | Serializer::Native(_) | Serializer::Protobuf(_) - | Serializer::RawMessage(_) => false, + | Serializer::RawMessage(_) + | Serializer::Syslog(_) => false, } } @@ -465,7 +489,8 @@ impl Serializer { | Serializer::Text(_) | Serializer::Native(_) | Serializer::Protobuf(_) - | Serializer::RawMessage(_) => { + | Serializer::RawMessage(_) + | Serializer::Syslog(_) => { panic!("Serializer does not support JSON") } } @@ -532,6 +557,12 @@ impl From for Serializer { } } +impl From for Serializer { + fn from(serializer: SyslogSerializer) -> Self { + Self::Syslog(serializer) + } +} + impl tokio_util::codec::Encoder for Serializer { type Error = vector_common::Error; @@ -547,6 +578,7 @@ impl tokio_util::codec::Encoder for Serializer { Serializer::Protobuf(serializer) => serializer.encode(event, buffer), Serializer::RawMessage(serializer) => serializer.encode(event, buffer), Serializer::Text(serializer) => serializer.encode(event, buffer), + Serializer::Syslog(serializer) => serializer.encode(event, buffer), } } } diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index d16ec78b627e4..c742f14f84f41 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -123,7 +123,8 @@ impl EncodingConfigWithFraming { | Serializer::Logfmt(_) | Serializer::NativeJson(_) | Serializer::RawMessage(_) - | Serializer::Text(_), + | Serializer::Text(_) + | Serializer::Syslog(_), ) => NewlineDelimitedEncoder::new().into(), }; diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index d12f2ab85cb78..5bc74dcb258a2 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -122,7 +122,8 @@ impl Encoder { | Serializer::Logfmt(_) | Serializer::NativeJson(_) | Serializer::RawMessage(_) - | Serializer::Text(_), + | Serializer::Text(_) + | Serializer::Syslog(_), _, ) => "text/plain", } diff --git 
a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index a9b39a560988c..4b5dc66cad0eb 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -212,6 +212,7 @@ fn serializer_config_to_deserializer( }) } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, + SerializerConfig::Syslog(_) => todo!(), }; deserializer_config.build() diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 7c59253c28c6f..a87dfe296e8c3 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -236,12 +236,12 @@ impl WebSocketSink { const fn should_encode_as_binary(&self) -> bool { use vector_lib::codecs::encoding::Serializer::{ - Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, Protobuf, RawMessage, Text, + Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, Protobuf, RawMessage, Text, Syslog, }; match self.encoder.serializer() { RawMessage(_) | Avro(_) | Native(_) | Protobuf(_) => true, - Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false, + Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) | Syslog(_) => false, } } From 7407f7bc316344096a357aecb762338d252a4968 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Fri, 15 Mar 2024 17:09:52 +1300 Subject: [PATCH 02/16] chore: Split syslog encoder into separate files This is only a temporary change to make the diffs for future commits easier to follow. 
--- lib/codecs/src/encoding/format/syslog.rs | 439 ------------------ .../format/syslog/facility_severity.rs | 223 +++++++++ .../src/encoding/format/syslog/serializer.rs | 122 +++++ .../format/syslog/serializer_config.rs | 90 ++++ 4 files changed, 435 insertions(+), 439 deletions(-) create mode 100644 lib/codecs/src/encoding/format/syslog/facility_severity.rs create mode 100644 lib/codecs/src/encoding/format/syslog/serializer.rs create mode 100644 lib/codecs/src/encoding/format/syslog/serializer_config.rs diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 017e054dcc850..9077c806bd698 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -5,442 +5,3 @@ use chrono::{DateTime, SecondsFormat, Local}; use vrl::value::Value; use serde::{de, Deserialize}; use vector_config::configurable_component; - -const NIL_VALUE: &'static str = "-"; - -/// Syslog RFC -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -#[serde(rename_all = "snake_case")] -pub enum SyslogRFC { - /// RFC 3164 - Rfc3164, - - /// RFC 5424 - Rfc5424 -} - -impl Default for SyslogRFC { - fn default() -> Self { - SyslogRFC::Rfc5424 - } -} - -/// Syslog facility -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -enum Facility { - /// Syslog facility ordinal number - Fixed(u8), - - /// Syslog facility name - Field(String) -} - -impl Default for Facility { - fn default() -> Self { - Facility::Fixed(1) - } -} - -/// Syslog severity -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -enum Severity { - /// Syslog severity ordinal number - Fixed(u8), - - /// Syslog severity name - Field(String) -} - -impl Default for Severity { - fn default() -> Self { - Severity::Fixed(6) - } -} - -/// Config used to build a `SyslogSerializer`. 
-#[configurable_component] -#[derive(Debug, Clone, Default)] -pub struct SyslogSerializerConfig { - /// RFC - #[serde(default)] - rfc: SyslogRFC, - - /// Facility - #[serde(default)] - #[serde(deserialize_with = "deserialize_facility")] - facility: Facility, - - /// Severity - #[serde(default)] - #[serde(deserialize_with = "deserialize_severity")] - severity: Severity, - - /// Tag - #[serde(default)] - tag: String, - - /// Trim prefix - trim_prefix: Option, - - /// Payload key - #[serde(default)] - payload_key: String, - - /// Add log source - #[serde(default)] - add_log_source: bool, - - /// App Name, RFC 5424 only - #[serde(default = "default_app_name")] - app_name: String, - - /// Proc ID, RFC 5424 only - #[serde(default = "default_nil_value")] - proc_id: String, - - /// Msg ID, RFC 5424 only - #[serde(default = "default_nil_value")] - msg_id: String -} - -impl SyslogSerializerConfig { - /// Build the `SyslogSerializer` from this configuration. - pub fn build(&self) -> SyslogSerializer { - SyslogSerializer::new(&self) - } - - /// The data type of events that are accepted by `SyslogSerializer`. - pub fn input_type(&self) -> DataType { - DataType::Log - } - - /// The schema required by the serializer. - pub fn schema_requirement(&self) -> schema::Requirement { - schema::Requirement::empty() - } -} - -/// Serializer that converts an `Event` to bytes using the Syslog format. -#[derive(Debug, Clone)] -pub struct SyslogSerializer { - config: SyslogSerializerConfig -} - -impl SyslogSerializer { - /// Creates a new `SyslogSerializer`. 
- pub fn new(conf: &SyslogSerializerConfig) -> Self { - Self { config: conf.clone() } - } -} - -impl Encoder for SyslogSerializer { - type Error = vector_common::Error; - - fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { - match event { - Event::Log(log) => { - let mut buf = String::from("<"); - let pri = get_num_facility(&self.config.facility, &log) * 8 + get_num_severity(&self.config.severity, &log); - buf.push_str(&pri.to_string()); - buf.push_str(">"); - match self.config.rfc { - SyslogRFC::Rfc3164 => { - let timestamp = get_timestamp(&log); - let formatted_timestamp = format!(" {} ", timestamp.format("%b %e %H:%M:%S")); - buf.push_str(&formatted_timestamp); - buf.push_str(&get_field("hostname", &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&self.config.tag, &log)); - buf.push_str(": "); - if self.config.add_log_source { - add_log_source(&log, &mut buf); - } - }, - SyslogRFC::Rfc5424 => { - buf.push_str("1 "); - let timestamp = get_timestamp(&log); - buf.push_str(×tamp.to_rfc3339_opts(SecondsFormat::Millis, true)); - buf.push(' '); - buf.push_str(&get_field("hostname", &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&&self.config.app_name, &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&&self.config.proc_id, &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&&self.config.msg_id, &log)); - buf.push_str(" - "); // no structured data - if self.config.add_log_source { - add_log_source(&log, &mut buf); - } - } - } - let mut payload = if self.config.payload_key.is_empty() { - serde_json::to_vec(&log).unwrap_or_default() - } else { - get_field(&&self.config.payload_key, &log).as_bytes().to_vec() - }; - let mut vec = buf.as_bytes().to_vec(); - vec.append(&mut payload); - buffer.put_slice(&vec); - }, - _ => {} - } - Ok(()) - } -} - -fn deserialize_facility<'de, D>(d: D) -> Result - where D: de::Deserializer<'de> -{ - let value: String = String::deserialize(d)?; - let num_value = 
value.parse::(); - match num_value { - Ok(num) => { - if num > 23 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"facility number too large")); - } else { - return Ok(Facility::Fixed(num)); - } - } - Err(_) => { - if let Some(field_name) = value.strip_prefix("$.message.") { - return Ok(Facility::Field(field_name.to_string())); - } else { - let num = match value.to_uppercase().as_str() { - "KERN" => 0, - "USER" => 1, - "MAIL" => 2, - "DAEMON" => 3, - "AUTH" => 4, - "SYSLOG" => 5, - "LPR" => 6, - "NEWS" => 7, - "UUCP" => 8, - "CRON" => 9, - "AUTHPRIV" => 10, - "FTP" => 11, - "NTP" => 12, - "SECURITY" => 13, - "CONSOLE" => 14, - "SOLARIS-CRON" => 15, - "LOCAL0" => 16, - "LOCAL1" => 17, - "LOCAL2" => 18, - "LOCAL3" => 19, - "LOCAL4" => 20, - "LOCAL5" => 21, - "LOCAL6" => 22, - "LOCAL7" => 23, - _ => 24, - }; - if num > 23 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown facility")); - } else { - return Ok(Facility::Fixed(num)) - } - } - } - } -} - -fn deserialize_severity<'de, D>(d: D) -> Result - where D: de::Deserializer<'de> -{ - let value: String = String::deserialize(d)?; - let num_value = value.parse::(); - match num_value { - Ok(num) => { - if num > 7 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"severity number too large")) - } else { - return Ok(Severity::Fixed(num)) - } - } - Err(_) => { - if let Some(field_name) = value.strip_prefix("$.message.") { - return Ok(Severity::Field(field_name.to_string())); - } else { - let num = match value.to_uppercase().as_str() { - "EMERGENCY" => 0, - "ALERT" => 1, - "CRITICAL" => 2, - "ERROR" => 3, - "WARNING" => 4, - "NOTICE" => 5, - "INFORMATIONAL" => 6, - "DEBUG" => 7, - _ => 8, - }; - if num > 7 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown severity")) - } else { - return Ok(Severity::Fixed(num)) - } - } - } - } -} - -fn default_app_name() -> String { - String::from("vector") 
-} - -fn default_nil_value() -> String { - String::from(NIL_VALUE) -} - -fn add_log_source(log: &LogEvent, buf: &mut String) { - buf.push_str("namespace_name="); - buf.push_str(&String::from_utf8( - log - .get("kubernetes.namespace_name") - .map(|h| h.coerce_to_bytes()) - .unwrap_or_default().to_vec() - ).unwrap()); - buf.push_str(", container_name="); - buf.push_str(&String::from_utf8( - log - .get("kubernetes.container_name") - .map(|h| h.coerce_to_bytes()) - .unwrap_or_default().to_vec() - ).unwrap()); - buf.push_str(", pod_name="); - buf.push_str(&String::from_utf8( - log - .get("kubernetes.pod_name") - .map(|h| h.coerce_to_bytes()) - .unwrap_or_default().to_vec() - ).unwrap()); - buf.push_str(", message="); -} - -fn get_num_facility(config_facility: &Facility, log: &LogEvent) -> u8 { - match config_facility { - Facility::Fixed(num) => return *num, - Facility::Field(field_name) => { - if let Some(field_value) = log.get(field_name.as_str()) { - let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); - let num_value = field_value_string.parse::(); - match num_value { - Ok(num) => { - if num > 23 { - return 1 // USER - } else { - return num - } - } - Err(_) => { - let num = match field_value_string.to_uppercase().as_str() { - "KERN" => 0, - "USER" => 1, - "MAIL" => 2, - "DAEMON" => 3, - "AUTH" => 4, - "SYSLOG" => 5, - "LPR" => 6, - "NEWS" => 7, - "UUCP" => 8, - "CRON" => 9, - "AUTHPRIV" => 10, - "FTP" => 11, - "NTP" => 12, - "SECURITY" => 13, - "CONSOLE" => 14, - "SOLARIS-CRON" => 15, - "LOCAL0" => 16, - "LOCAL1" => 17, - "LOCAL2" => 18, - "LOCAL3" => 19, - "LOCAL4" => 20, - "LOCAL5" => 21, - "LOCAL6" => 22, - "LOCAL7" => 23, - _ => 24, - }; - if num > 23 { - return 1 // USER - } else { - return num - } - } - } - } else { - return 1 // USER - } - } - } -} - -fn get_num_severity(config_severity: &Severity, log: &LogEvent) -> u8 { - match config_severity { - Severity::Fixed(num) => return *num, - 
Severity::Field(field_name) => { - if let Some(field_value) = log.get(field_name.as_str()) { - let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); - let num_value = field_value_string.parse::(); - match num_value { - Ok(num) => { - if num > 7 { - return 6 // INFORMATIONAL - } else { - return num - } - } - Err(_) => { - let num = match field_value_string.to_uppercase().as_str() { - "EMERGENCY" => 0, - "ALERT" => 1, - "CRITICAL" => 2, - "ERROR" => 3, - "WARNING" => 4, - "NOTICE" => 5, - "INFORMATIONAL" => 6, - "DEBUG" => 7, - _ => 8, - }; - if num > 7 { - return 6 // INFORMATIONAL - } else { - return num - } - } - } - } else { - return 6 // INFORMATIONAL - } - } - } -} - -fn get_field_or_config(config_name: &String, log: &LogEvent) -> String { - if let Some(field_name) = config_name.strip_prefix("$.message.") { - return get_field(field_name, log) - } else { - return config_name.clone() - } -} - -fn get_field(field_name: &str, log: &LogEvent) -> String { - if let Some(field_value) = log.get(field_name) { - return String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); - } else { - return NIL_VALUE.to_string() - } -} - -fn get_timestamp(log: &LogEvent) -> DateTime:: { - match log.get("@timestamp") { - Some(value) => { - if let Value::Timestamp(timestamp) = value { - DateTime::::from(*timestamp) - } else { - Local::now() - } - }, - _ => Local::now() - } -} - diff --git a/lib/codecs/src/encoding/format/syslog/facility_severity.rs b/lib/codecs/src/encoding/format/syslog/facility_severity.rs new file mode 100644 index 0000000000000..8117d62709484 --- /dev/null +++ b/lib/codecs/src/encoding/format/syslog/facility_severity.rs @@ -0,0 +1,223 @@ +/// Syslog facility +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +enum Facility { + /// Syslog facility ordinal number + Fixed(u8), + + /// Syslog facility name + Field(String) +} + +impl Default for Facility { + fn default() -> Self { + 
Facility::Fixed(1) + } +} + +/// Syslog severity +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +enum Severity { + /// Syslog severity ordinal number + Fixed(u8), + + /// Syslog severity name + Field(String) +} + +impl Default for Severity { + fn default() -> Self { + Severity::Fixed(6) + } +} + +fn deserialize_facility<'de, D>(d: D) -> Result + where D: de::Deserializer<'de> +{ + let value: String = String::deserialize(d)?; + let num_value = value.parse::(); + match num_value { + Ok(num) => { + if num > 23 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"facility number too large")); + } else { + return Ok(Facility::Fixed(num)); + } + } + Err(_) => { + if let Some(field_name) = value.strip_prefix("$.message.") { + return Ok(Facility::Field(field_name.to_string())); + } else { + let num = match value.to_uppercase().as_str() { + "KERN" => 0, + "USER" => 1, + "MAIL" => 2, + "DAEMON" => 3, + "AUTH" => 4, + "SYSLOG" => 5, + "LPR" => 6, + "NEWS" => 7, + "UUCP" => 8, + "CRON" => 9, + "AUTHPRIV" => 10, + "FTP" => 11, + "NTP" => 12, + "SECURITY" => 13, + "CONSOLE" => 14, + "SOLARIS-CRON" => 15, + "LOCAL0" => 16, + "LOCAL1" => 17, + "LOCAL2" => 18, + "LOCAL3" => 19, + "LOCAL4" => 20, + "LOCAL5" => 21, + "LOCAL6" => 22, + "LOCAL7" => 23, + _ => 24, + }; + if num > 23 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown facility")); + } else { + return Ok(Facility::Fixed(num)) + } + } + } + } +} + +fn deserialize_severity<'de, D>(d: D) -> Result + where D: de::Deserializer<'de> +{ + let value: String = String::deserialize(d)?; + let num_value = value.parse::(); + match num_value { + Ok(num) => { + if num > 7 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"severity number too large")) + } else { + return Ok(Severity::Fixed(num)) + } + } + Err(_) => { + if let Some(field_name) = value.strip_prefix("$.message.") { + return 
Ok(Severity::Field(field_name.to_string())); + } else { + let num = match value.to_uppercase().as_str() { + "EMERGENCY" => 0, + "ALERT" => 1, + "CRITICAL" => 2, + "ERROR" => 3, + "WARNING" => 4, + "NOTICE" => 5, + "INFORMATIONAL" => 6, + "DEBUG" => 7, + _ => 8, + }; + if num > 7 { + return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown severity")) + } else { + return Ok(Severity::Fixed(num)) + } + } + } + } +} + +fn get_num_facility(config_facility: &Facility, log: &LogEvent) -> u8 { + match config_facility { + Facility::Fixed(num) => return *num, + Facility::Field(field_name) => { + if let Some(field_value) = log.get(field_name.as_str()) { + let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); + let num_value = field_value_string.parse::(); + match num_value { + Ok(num) => { + if num > 23 { + return 1 // USER + } else { + return num + } + } + Err(_) => { + let num = match field_value_string.to_uppercase().as_str() { + "KERN" => 0, + "USER" => 1, + "MAIL" => 2, + "DAEMON" => 3, + "AUTH" => 4, + "SYSLOG" => 5, + "LPR" => 6, + "NEWS" => 7, + "UUCP" => 8, + "CRON" => 9, + "AUTHPRIV" => 10, + "FTP" => 11, + "NTP" => 12, + "SECURITY" => 13, + "CONSOLE" => 14, + "SOLARIS-CRON" => 15, + "LOCAL0" => 16, + "LOCAL1" => 17, + "LOCAL2" => 18, + "LOCAL3" => 19, + "LOCAL4" => 20, + "LOCAL5" => 21, + "LOCAL6" => 22, + "LOCAL7" => 23, + _ => 24, + }; + if num > 23 { + return 1 // USER + } else { + return num + } + } + } + } else { + return 1 // USER + } + } + } +} + +fn get_num_severity(config_severity: &Severity, log: &LogEvent) -> u8 { + match config_severity { + Severity::Fixed(num) => return *num, + Severity::Field(field_name) => { + if let Some(field_value) = log.get(field_name.as_str()) { + let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); + let num_value = field_value_string.parse::(); + match num_value { + Ok(num) => { + if num > 7 { + 
return 6 // INFORMATIONAL + } else { + return num + } + } + Err(_) => { + let num = match field_value_string.to_uppercase().as_str() { + "EMERGENCY" => 0, + "ALERT" => 1, + "CRITICAL" => 2, + "ERROR" => 3, + "WARNING" => 4, + "NOTICE" => 5, + "INFORMATIONAL" => 6, + "DEBUG" => 7, + _ => 8, + }; + if num > 7 { + return 6 // INFORMATIONAL + } else { + return num + } + } + } + } else { + return 6 // INFORMATIONAL + } + } + } +} diff --git a/lib/codecs/src/encoding/format/syslog/serializer.rs b/lib/codecs/src/encoding/format/syslog/serializer.rs new file mode 100644 index 0000000000000..6921e6fe0c3db --- /dev/null +++ b/lib/codecs/src/encoding/format/syslog/serializer.rs @@ -0,0 +1,122 @@ +/// Serializer that converts an `Event` to bytes using the Syslog format. +#[derive(Debug, Clone)] +pub struct SyslogSerializer { + config: SyslogSerializerConfig +} + +impl SyslogSerializer { + /// Creates a new `SyslogSerializer`. + pub fn new(conf: &SyslogSerializerConfig) -> Self { + Self { config: conf.clone() } + } +} + +impl Encoder for SyslogSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { + match event { + Event::Log(log) => { + let mut buf = String::from("<"); + let pri = get_num_facility(&self.config.facility, &log) * 8 + get_num_severity(&self.config.severity, &log); + buf.push_str(&pri.to_string()); + buf.push_str(">"); + match self.config.rfc { + SyslogRFC::Rfc3164 => { + let timestamp = get_timestamp(&log); + let formatted_timestamp = format!(" {} ", timestamp.format("%b %e %H:%M:%S")); + buf.push_str(&formatted_timestamp); + buf.push_str(&get_field("hostname", &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&self.config.tag, &log)); + buf.push_str(": "); + if self.config.add_log_source { + add_log_source(&log, &mut buf); + } + }, + SyslogRFC::Rfc5424 => { + buf.push_str("1 "); + let timestamp = get_timestamp(&log); + 
buf.push_str(×tamp.to_rfc3339_opts(SecondsFormat::Millis, true)); + buf.push(' '); + buf.push_str(&get_field("hostname", &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&&self.config.app_name, &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&&self.config.proc_id, &log)); + buf.push(' '); + buf.push_str(&get_field_or_config(&&self.config.msg_id, &log)); + buf.push_str(" - "); // no structured data + if self.config.add_log_source { + add_log_source(&log, &mut buf); + } + } + } + let mut payload = if self.config.payload_key.is_empty() { + serde_json::to_vec(&log).unwrap_or_default() + } else { + get_field(&&self.config.payload_key, &log).as_bytes().to_vec() + }; + let mut vec = buf.as_bytes().to_vec(); + vec.append(&mut payload); + buffer.put_slice(&vec); + }, + _ => {} + } + Ok(()) + } +} + +fn get_field_or_config(config_name: &String, log: &LogEvent) -> String { + if let Some(field_name) = config_name.strip_prefix("$.message.") { + return get_field(field_name, log) + } else { + return config_name.clone() + } +} + +fn get_field(field_name: &str, log: &LogEvent) -> String { + if let Some(field_value) = log.get(field_name) { + return String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); + } else { + return NIL_VALUE.to_string() + } +} + +fn get_timestamp(log: &LogEvent) -> DateTime:: { + match log.get("@timestamp") { + Some(value) => { + if let Value::Timestamp(timestamp) = value { + DateTime::::from(*timestamp) + } else { + Local::now() + } + }, + _ => Local::now() + } +} + +fn add_log_source(log: &LogEvent, buf: &mut String) { + buf.push_str("namespace_name="); + buf.push_str(&String::from_utf8( + log + .get("kubernetes.namespace_name") + .map(|h| h.coerce_to_bytes()) + .unwrap_or_default().to_vec() + ).unwrap()); + buf.push_str(", container_name="); + buf.push_str(&String::from_utf8( + log + .get("kubernetes.container_name") + .map(|h| h.coerce_to_bytes()) + .unwrap_or_default().to_vec() + ).unwrap()); + 
buf.push_str(", pod_name="); + buf.push_str(&String::from_utf8( + log + .get("kubernetes.pod_name") + .map(|h| h.coerce_to_bytes()) + .unwrap_or_default().to_vec() + ).unwrap()); + buf.push_str(", message="); +} diff --git a/lib/codecs/src/encoding/format/syslog/serializer_config.rs b/lib/codecs/src/encoding/format/syslog/serializer_config.rs new file mode 100644 index 0000000000000..f0aae986c5c1d --- /dev/null +++ b/lib/codecs/src/encoding/format/syslog/serializer_config.rs @@ -0,0 +1,90 @@ +const NIL_VALUE: &'static str = "-"; + +/// Syslog RFC +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum SyslogRFC { + /// RFC 3164 + Rfc3164, + + /// RFC 5424 + Rfc5424 +} + +impl Default for SyslogRFC { + fn default() -> Self { + SyslogRFC::Rfc5424 + } +} + +/// Config used to build a `SyslogSerializer`. +#[configurable_component] +#[derive(Debug, Clone, Default)] +pub struct SyslogSerializerConfig { + /// RFC + #[serde(default)] + rfc: SyslogRFC, + + /// Facility + #[serde(default)] + #[serde(deserialize_with = "deserialize_facility")] + facility: Facility, + + /// Severity + #[serde(default)] + #[serde(deserialize_with = "deserialize_severity")] + severity: Severity, + + /// Tag + #[serde(default)] + tag: String, + + /// Trim prefix + trim_prefix: Option, + + /// Payload key + #[serde(default)] + payload_key: String, + + /// Add log source + #[serde(default)] + add_log_source: bool, + + /// App Name, RFC 5424 only + #[serde(default = "default_app_name")] + app_name: String, + + /// Proc ID, RFC 5424 only + #[serde(default = "default_nil_value")] + proc_id: String, + + /// Msg ID, RFC 5424 only + #[serde(default = "default_nil_value")] + msg_id: String +} + +impl SyslogSerializerConfig { + /// Build the `SyslogSerializer` from this configuration. + pub fn build(&self) -> SyslogSerializer { + SyslogSerializer::new(&self) + } + + /// The data type of events that are accepted by `SyslogSerializer`. 
+ pub fn input_type(&self) -> DataType { + DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + schema::Requirement::empty() + } +} + +fn default_app_name() -> String { + String::from("vector") +} + +fn default_nil_value() -> String { + String::from(NIL_VALUE) +} From c9aacd9fe9320844385ccba521454d53d32432ad Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Fri, 15 Mar 2024 17:10:00 +1300 Subject: [PATCH 03/16] refactor: Syslog facility and severity - Introduce a `Pri` struct with fields for severity and facility as enum values. - `Pri` uses `strum` crate to parse string values into their appropriate enum variant. - Handles the responsibility of encoding the two enums' ordinal values into the `PRIVAL` value for the encoder. - As `Facility` and `Severity` enums better represent their ordinal mapping directly - The `Fixed` + `Field` subtyping with custom deserializer isn't necessary. Parsing a string that represents the enum by name or its ordinal representation is much simpler. - Likewise this removes the need for the get methods as the enum can provide either the `String` or `u8` representation as needed.
--- lib/codecs/Cargo.toml | 3 +- .../format/syslog/facility_severity.rs | 293 ++++++------------ 2 files changed, 90 insertions(+), 206 deletions(-) diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index f47ce0fc2d60d..d4d12e966994b 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -27,6 +27,7 @@ serde.workspace = true serde_json.workspace = true smallvec = { version = "1", default-features = false, features = ["union"] } snafu = { version = "0.7.5", default-features = false, features = ["futures"] } +strum = { version = "0.26", features = ["derive"], optional = true } syslog_loose = { version = "0.21", default-features = false, optional = true } tokio-util = { version = "0.7", default-features = false, features = ["codec"] } tracing = { version = "0.1", default-features = false } @@ -48,4 +49,4 @@ rstest = "0.18.2" vrl.workspace = true [features] -syslog = ["dep:syslog_loose"] +syslog = ["dep:syslog_loose", "dep:strum"] diff --git a/lib/codecs/src/encoding/format/syslog/facility_severity.rs b/lib/codecs/src/encoding/format/syslog/facility_severity.rs index 8117d62709484..786fe61cd0ead 100644 --- a/lib/codecs/src/encoding/format/syslog/facility_severity.rs +++ b/lib/codecs/src/encoding/format/syslog/facility_severity.rs @@ -1,223 +1,106 @@ -/// Syslog facility -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -enum Facility { - /// Syslog facility ordinal number - Fixed(u8), +use std::str::FromStr; +use strum::{FromRepr, EnumString}; - /// Syslog facility name - Field(String) +#[derive(Default, Debug)] +struct Pri { + facility: Facility, + severity: Severity, } -impl Default for Facility { - fn default() -> Self { - Facility::Fixed(1) +impl Pri { + fn from_str_variants(facility_variant: &str, severity_variant: &str) -> Self { + // The original PR had `deserialize_*()` methods parsed a value to a `u8` or stored a field key as a `String` + // Later the equivalent `get_num_*()` method would retrieve the `u8` value or lookup 
the field key for the actual value, + // otherwise it'd fallback to the default Facility/Severity value. + // This approach instead parses a string of the name or ordinal representation, + // any reference via field key lookup should have already happened by this point. + let facility = Facility::into_variant(&facility_variant).unwrap_or(Facility::User); + let severity = Severity::into_variant(&severity_variant).unwrap_or(Severity::Informational); + + Self { + facility, + severity, + } + } + + // The last paragraph describes how to compose the enums into `PRIVAL`: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 + fn encode(&self) -> String { + let prival = (self.facility as u8 * 8) + self.severity as u8; + ["<", &prival.to_string(), ">"].concat() } } -/// Syslog severity -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -enum Severity { - /// Syslog severity ordinal number - Fixed(u8), +// Facility + Severity mapping from Name => Ordinal number: - /// Syslog severity name - Field(String) +/// Syslog facility +#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] +#[strum(serialize_all = "kebab-case")] +enum Facility { + Kern = 0, + #[default] + User = 1, + Mail = 2, + Daemon = 3, + Auth = 4, + Syslog = 5, + LPR = 6, + News = 7, + UUCP = 8, + Cron = 9, + AuthPriv = 10, + FTP = 11, + NTP = 12, + Security = 13, + Console = 14, + SolarisCron = 15, + Local0 = 16, + Local1 = 17, + Local2 = 18, + Local3 = 19, + Local4 = 20, + Local5 = 21, + Local6 = 22, + Local7 = 23, } -impl Default for Severity { - fn default() -> Self { - Severity::Fixed(6) - } +/// Syslog severity +#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] +#[strum(serialize_all = "kebab-case")] +enum Severity { + Emergency = 0, + Alert = 1, + Critical = 2, + Error = 3, + Warning = 4, + Notice = 5, + #[default] + Informational = 6, + Debug = 7, } -fn deserialize_facility<'de, D>(d: D) -> Result - where D: de::Deserializer<'de> -{ - let value: String = 
String::deserialize(d)?; - let num_value = value.parse::(); - match num_value { - Ok(num) => { - if num > 23 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"facility number too large")); - } else { - return Ok(Facility::Fixed(num)); - } - } - Err(_) => { - if let Some(field_name) = value.strip_prefix("$.message.") { - return Ok(Facility::Field(field_name.to_string())); - } else { - let num = match value.to_uppercase().as_str() { - "KERN" => 0, - "USER" => 1, - "MAIL" => 2, - "DAEMON" => 3, - "AUTH" => 4, - "SYSLOG" => 5, - "LPR" => 6, - "NEWS" => 7, - "UUCP" => 8, - "CRON" => 9, - "AUTHPRIV" => 10, - "FTP" => 11, - "NTP" => 12, - "SECURITY" => 13, - "CONSOLE" => 14, - "SOLARIS-CRON" => 15, - "LOCAL0" => 16, - "LOCAL1" => 17, - "LOCAL2" => 18, - "LOCAL3" => 19, - "LOCAL4" => 20, - "LOCAL5" => 21, - "LOCAL6" => 22, - "LOCAL7" => 23, - _ => 24, - }; - if num > 23 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown facility")); - } else { - return Ok(Facility::Fixed(num)) - } - } - } - } -} +// Additionally support variants from string-based integers: +// Parse a string name, with fallback for parsing a string ordinal number. 
+impl Facility { + fn into_variant(variant_name: &str) -> Option { + let s = variant_name.to_ascii_lowercase(); -fn deserialize_severity<'de, D>(d: D) -> Result - where D: de::Deserializer<'de> -{ - let value: String = String::deserialize(d)?; - let num_value = value.parse::(); - match num_value { - Ok(num) => { - if num > 7 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"severity number too large")) - } else { - return Ok(Severity::Fixed(num)) - } - } - Err(_) => { - if let Some(field_name) = value.strip_prefix("$.message.") { - return Ok(Severity::Field(field_name.to_string())); - } else { - let num = match value.to_uppercase().as_str() { - "EMERGENCY" => 0, - "ALERT" => 1, - "CRITICAL" => 2, - "ERROR" => 3, - "WARNING" => 4, - "NOTICE" => 5, - "INFORMATIONAL" => 6, - "DEBUG" => 7, - _ => 8, - }; - if num > 7 { - return Err(de::Error::invalid_value(de::Unexpected::Unsigned(num as u64), &"unknown severity")) - } else { - return Ok(Severity::Fixed(num)) - } - } - } + s.parse::().map_or_else( + |_| Self::from_str(&s).ok(), + |num| Self::from_repr(num), + ) } } -fn get_num_facility(config_facility: &Facility, log: &LogEvent) -> u8 { - match config_facility { - Facility::Fixed(num) => return *num, - Facility::Field(field_name) => { - if let Some(field_value) = log.get(field_name.as_str()) { - let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); - let num_value = field_value_string.parse::(); - match num_value { - Ok(num) => { - if num > 23 { - return 1 // USER - } else { - return num - } - } - Err(_) => { - let num = match field_value_string.to_uppercase().as_str() { - "KERN" => 0, - "USER" => 1, - "MAIL" => 2, - "DAEMON" => 3, - "AUTH" => 4, - "SYSLOG" => 5, - "LPR" => 6, - "NEWS" => 7, - "UUCP" => 8, - "CRON" => 9, - "AUTHPRIV" => 10, - "FTP" => 11, - "NTP" => 12, - "SECURITY" => 13, - "CONSOLE" => 14, - "SOLARIS-CRON" => 15, - "LOCAL0" => 16, - "LOCAL1" => 17, - "LOCAL2" => 18, - 
"LOCAL3" => 19, - "LOCAL4" => 20, - "LOCAL5" => 21, - "LOCAL6" => 22, - "LOCAL7" => 23, - _ => 24, - }; - if num > 23 { - return 1 // USER - } else { - return num - } - } - } - } else { - return 1 // USER - } - } - } -} +// NOTE: The `strum` crate does not provide traits, +// requiring copy/paste of the prior impl instead. +impl Severity { + fn into_variant(variant_name: &str) -> Option { + let s = variant_name.to_ascii_lowercase(); -fn get_num_severity(config_severity: &Severity, log: &LogEvent) -> u8 { - match config_severity { - Severity::Fixed(num) => return *num, - Severity::Field(field_name) => { - if let Some(field_value) = log.get(field_name.as_str()) { - let field_value_string = String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); - let num_value = field_value_string.parse::(); - match num_value { - Ok(num) => { - if num > 7 { - return 6 // INFORMATIONAL - } else { - return num - } - } - Err(_) => { - let num = match field_value_string.to_uppercase().as_str() { - "EMERGENCY" => 0, - "ALERT" => 1, - "CRITICAL" => 2, - "ERROR" => 3, - "WARNING" => 4, - "NOTICE" => 5, - "INFORMATIONAL" => 6, - "DEBUG" => 7, - _ => 8, - }; - if num > 7 { - return 6 // INFORMATIONAL - } else { - return num - } - } - } - } else { - return 6 // INFORMATIONAL - } - } + s.parse::().map_or_else( + |_| Self::from_str(&s).ok(), + |num| Self::from_repr(num), + ) } } From d03c87f6d6dd714d6421be3597440ba8220e1fad Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Fri, 15 Mar 2024 13:02:38 +1300 Subject: [PATCH 04/16] refactor: `SyslogSerializer` `SyslogSerializer::encode()` has been simplified. - Only matching `Event::Log` is relevant, an `if let` bind instead of `match` helps remove a redundant level of nesting. - This method only focuses on boilerplate now, delegating the rest to `ConfigDecanter` (_adapt `LogEvent` + encoder config_) and `SyslogMessage` (_encode into syslog message string_). 
- This removes some complexity during actual encoding logic, which should only be concerned about directly encoding from one representation to another, not complementary features related to Vector config or its type system. The new `ConfigDecanter` is where many of the original helper methods that were used by `SyslogSerializer::encode()` now reside. This change better communicates the scope of their usage. - Any interaction with `LogEvent` is now contained within the methods of this new struct. Likewise for the consumption of the encoder configuration (instead of queries to config throughout encoding). - The `decant_config()` method better illustrates an overview of the data we're encoding and where that's being sourced from via the new `SyslogMessage` struct, which splits off the actual encoding responsibility (see next commit). --- .../src/encoding/format/syslog/serializer.rs | 219 ++++++++++-------- 1 file changed, 126 insertions(+), 93 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog/serializer.rs b/lib/codecs/src/encoding/format/syslog/serializer.rs index 6921e6fe0c3db..064e4bc96577e 100644 --- a/lib/codecs/src/encoding/format/syslog/serializer.rs +++ b/lib/codecs/src/encoding/format/syslog/serializer.rs @@ -15,108 +15,141 @@ impl Encoder for SyslogSerializer { type Error = vector_common::Error; fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { - match event { - Event::Log(log) => { - let mut buf = String::from("<"); - let pri = get_num_facility(&self.config.facility, &log) * 8 + get_num_severity(&self.config.severity, &log); - buf.push_str(&pri.to_string()); - buf.push_str(">"); - match self.config.rfc { - SyslogRFC::Rfc3164 => { - let timestamp = get_timestamp(&log); - let formatted_timestamp = format!(" {} ", timestamp.format("%b %e %H:%M:%S")); - buf.push_str(&formatted_timestamp); - buf.push_str(&get_field("hostname", &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&self.config.tag,
&log)); - buf.push_str(": "); - if self.config.add_log_source { - add_log_source(&log, &mut buf); - } - }, - SyslogRFC::Rfc5424 => { - buf.push_str("1 "); - let timestamp = get_timestamp(&log); - buf.push_str(×tamp.to_rfc3339_opts(SecondsFormat::Millis, true)); - buf.push(' '); - buf.push_str(&get_field("hostname", &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&&self.config.app_name, &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&&self.config.proc_id, &log)); - buf.push(' '); - buf.push_str(&get_field_or_config(&&self.config.msg_id, &log)); - buf.push_str(" - "); // no structured data - if self.config.add_log_source { - add_log_source(&log, &mut buf); - } - } - } - let mut payload = if self.config.payload_key.is_empty() { - serde_json::to_vec(&log).unwrap_or_default() - } else { - get_field(&&self.config.payload_key, &log).as_bytes().to_vec() - }; - let mut vec = buf.as_bytes().to_vec(); - vec.append(&mut payload); - buffer.put_slice(&vec); - }, - _ => {} + if let Event::Log(log_event) = event { + let syslog_message = ConfigDecanter::new(log_event).decant_config(&self.config); + + let vec = syslog_message + .encode(&self.config.rfc) + .as_bytes() + .to_vec(); + buffer.put_slice(&vec); } + Ok(()) } } -fn get_field_or_config(config_name: &String, log: &LogEvent) -> String { - if let Some(field_name) = config_name.strip_prefix("$.message.") { - return get_field(field_name, log) - } else { - return config_name.clone() - } +// Adapts a `LogEvent` into a `SyslogMessage` based on config from `SyslogSerializerConfig`: +// - Splits off the responsibility of encoding logic to `SyslogMessage` (which is not dependent upon Vector types). +// - Majority of methods are only needed to support the `decant_config()` operation. 
+struct ConfigDecanter { + log: LogEvent, } -fn get_field(field_name: &str, log: &LogEvent) -> String { - if let Some(field_value) = log.get(field_name) { - return String::from_utf8(field_value.coerce_to_bytes().to_vec()).unwrap_or_default(); - } else { - return NIL_VALUE.to_string() +impl ConfigDecanter { + fn new(log: LogEvent) -> Self { + Self { + log, + } } -} -fn get_timestamp(log: &LogEvent) -> DateTime:: { - match log.get("@timestamp") { - Some(value) => { - if let Value::Timestamp(timestamp) = value { - DateTime::::from(*timestamp) - } else { - Local::now() - } - }, - _ => Local::now() + fn decant_config(&self, config: &SyslogSerializerConfig) -> SyslogMessage { + let x = |v| self.replace_if_proxied(v).unwrap_or_default(); + let facility = x(&config.facility); + let severity = x(&config.severity); + + let y = |v| self.replace_if_proxied_opt(v); + let app_name = y(&config.app_name).unwrap_or("vector".to_owned()); + let proc_id = y(&config.proc_id); + let msg_id = y(&config.msg_id); + + SyslogMessage { + pri: Pri::from_str_variants(&facility, &severity), + timestamp: self.get_timestamp(), + hostname: self.value_by_key("hostname"), + tag: Tag { + app_name, + proc_id, + msg_id, + }, + structured_data: None, + message: self.get_message(&config), + } } -} -fn add_log_source(log: &LogEvent, buf: &mut String) { - buf.push_str("namespace_name="); - buf.push_str(&String::from_utf8( - log - .get("kubernetes.namespace_name") - .map(|h| h.coerce_to_bytes()) - .unwrap_or_default().to_vec() - ).unwrap()); - buf.push_str(", container_name="); - buf.push_str(&String::from_utf8( - log - .get("kubernetes.container_name") - .map(|h| h.coerce_to_bytes()) - .unwrap_or_default().to_vec() - ).unwrap()); - buf.push_str(", pod_name="); - buf.push_str(&String::from_utf8( - log - .get("kubernetes.pod_name") - .map(|h| h.coerce_to_bytes()) - .unwrap_or_default().to_vec() - ).unwrap()); - buf.push_str(", message="); + fn replace_if_proxied_opt(&self, value: &Option) -> Option { + 
value.as_ref().and_then(|v| self.replace_if_proxied(v)) + } + + // When the value has the expected prefix, perform a lookup for a field key without that prefix part. + // A failed lookup returns `None`, while a value without the prefix uses the config value as-is. + // + // Q: Why `$.message.` as the prefix? (Appears to be JSONPath syntax?) + // NOTE: Originally named in PR as: `get_field_or_config()` + fn replace_if_proxied(&self, value: &str) -> Option { + value + .strip_prefix("$.message.") + .map_or( + Some(value.to_owned()), + |field_key| self.value_by_key(field_key), + ) + } + + // NOTE: Originally named in PR as: `get_field()` + // Now returns a `None` directly instead of converting to either `"-"` or `""` + fn value_by_key(&self, field_key: &str) -> Option { + self.log.get(field_key).and_then(|field_value| { + let bytes = field_value.coerce_to_bytes(); + String::from_utf8(bytes.to_vec()).ok() + }) + } + + fn get_timestamp(&self) -> DateTime:: { + // Q: Was this Timestamp key hard-coded to the needs of the original PR author? + // + // Key `@timestamp` depends on input: + // https://vector.dev/guides/level-up/managing-schemas/#example-custom-timestamp-field + // https://vector.dev/docs/about/under-the-hood/architecture/data-model/log/#timestamps + // NOTE: Log schema key renaming is unavailable when Log namespacing is enabled: + // https://vector.dev/docs/reference/configuration/global-options/#log_schema + // + // NOTE: Log namespacing has metadata `%vector.ingest_timestamp` from a source (file/demo_logs) instead of `timestamp`. + // As a `payload_key` it will not respect config `encoding.timestamp_format`, but does when + // using the parent object (`%vector`). Inputs without namespacing respect that config setting. + if let Some(Value::Timestamp(timestamp)) = self.log.get("@timestamp") { + // Q: Utc type returned is changed to Local? + // - Could otherwise return `*timestamp` as-is? Why is Local conversion necessary? 
+ DateTime::::from(*timestamp) + } else { + // NOTE: Local time is encouraged by RFC 5424 when creating a fallback timestamp for RFC 3164 + Local::now() + } + } + + fn get_message(&self, config: &SyslogSerializerConfig) -> String { + let mut message = String::new(); + + if config.add_log_source { + message.push_str(self.add_log_source().as_str()); + } + + // `payload_key` configures where to source the value for the syslog `message`: + // - Field key (Valid) => Get value by lookup (value_by_key) + // - Field key (Invalid) => Empty string (unwrap_or_default) + // - Not configured => JSON encoded `LogEvent` (fallback?) + // + // Q: Was the JSON fallback intended by the original PR author only for debugging? + // Roughly equivalent to using `payload_key: .` (in YAML config)? + let payload = if config.payload_key.is_empty() { + serde_json::to_string(&self.log).ok() + } else { + self.value_by_key(&config.payload_key) + }; + + message.push_str(&payload.unwrap_or_default()); + message + } + + // NOTE: This is a third-party addition from the original PR author (it is not relevant to the syslog spec): + // TODO: Remove, as this type of additional data is better supported via VRL remap + `StructuredData`? + fn add_log_source(&self) -> String { + let get_value = |s| self.value_by_key(s).unwrap_or_default(); + + [ + "namespace_name=", get_value("kubernetes.namespace_name").as_str(), + ", container_name=", get_value("kubernetes.container_name").as_str(), + ", pod_name=", get_value("kubernetes.pod_name").as_str(), + ", message=" + ].concat() + } } From 0288a80add0022448a1fc2e5016dcefca0b7be5f Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Thu, 14 Mar 2024 17:51:43 +1300 Subject: [PATCH 05/16] refactor: `SyslogSerializerConfig` `SyslogSerializerConfig` has been simplified. - Facility / Severity deserializer methods aren't needed, as per their prior refactor with `strum`. 
- The `app_name` default is set via `decant_config()` when not configured explicitly. - The other two fields calling a `default_nil_value()` method instead use an option value which encodes `None` into the expected `-` value. - Everything else does not need a serde attribute to apply a default, the `Default` trait on the struct is sufficient. - `trim_prefix` was removed as it didn't seem relevant. `tag` was also removed as it's represented by several subfields in RFC 5424 which RFC 3164 can also use. `SyslogMessage::encode()` refactors the original PR encoding logic: - Syslog Header fields focused, the PRI and final message value have already been prepared prior. They are only referenced at the end of `encode()` to combine into the final string output. - While less efficient than `push_str()`, each match variant has a clear structure returned via the array `join(" ")` which minimizes the noise of `SP` from the original PR. Value preparation prior to this is clear and better documented. - `Tag` is a child struct to keep the main logic easy to grok. `StructuredData` is a similar case. --- .../format/syslog/serializer_config.rs | 173 +++++++++++++----- 1 file changed, 132 insertions(+), 41 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog/serializer_config.rs b/lib/codecs/src/encoding/format/syslog/serializer_config.rs index f0aae986c5c1d..57bf85aca6995 100644 --- a/lib/codecs/src/encoding/format/syslog/serializer_config.rs +++ b/lib/codecs/src/encoding/format/syslog/serializer_config.rs @@ -1,67 +1,49 @@ const NIL_VALUE: &'static str = "-"; +const SYSLOG_V1: &'static str = "1"; /// Syslog RFC #[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Default)] #[serde(rename_all = "snake_case")] pub enum SyslogRFC { /// RFC 3164 Rfc3164, + #[default] /// RFC 5424 Rfc5424 } -impl Default for SyslogRFC { - fn default() -> Self { - SyslogRFC::Rfc5424 - } -} - /// Config used to build a `SyslogSerializer`. 
#[configurable_component] -#[derive(Debug, Clone, Default)] +// Serde default makes all config keys optional. +// Each field assigns either a fixed value, or field name (lookup field key to retrieve dynamic value per `LogEvent`). +#[serde(default)] +#[derive(Clone, Debug, Default)] pub struct SyslogSerializerConfig { /// RFC - #[serde(default)] rfc: SyslogRFC, - /// Facility - #[serde(default)] - #[serde(deserialize_with = "deserialize_facility")] - facility: Facility, - + facility: String, /// Severity - #[serde(default)] - #[serde(deserialize_with = "deserialize_severity")] - severity: Severity, - - /// Tag - #[serde(default)] - tag: String, + severity: String, - /// Trim prefix - trim_prefix: Option, + /// App Name + app_name: Option, + /// Proc ID + proc_id: Option, + /// Msg ID + msg_id: Option, /// Payload key - #[serde(default)] payload_key: String, - /// Add log source - #[serde(default)] add_log_source: bool, - /// App Name, RFC 5424 only - #[serde(default = "default_app_name")] - app_name: String, - - /// Proc ID, RFC 5424 only - #[serde(default = "default_nil_value")] - proc_id: String, - - /// Msg ID, RFC 5424 only - #[serde(default = "default_nil_value")] - msg_id: String + // NOTE: The `tag` field was removed, it is better represented by the equivalents in RFC 5424. + // Q: The majority of the fields above pragmatically only make sense as config for keys to query? + // Q: What was `trim_prefix` for? It is not used in file, nor in Vector source tree. + // Q: `add_log_source` doesn't belong here? Better handled by the `remap` transform with structured data? 
} impl SyslogSerializerConfig { @@ -81,10 +63,119 @@ impl SyslogSerializerConfig { } } -fn default_app_name() -> String { - String::from("vector") +// ABNF definition: +// https://datatracker.ietf.org/doc/html/rfc5424#section-6 +// https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 +#[derive(Default, Debug)] +struct SyslogMessage { + pri: Pri, + timestamp: DateTime::, + hostname: Option, + tag: Tag, + structured_data: Option, + message: String, +} + +impl SyslogMessage { + fn encode(&self, rfc: &SyslogRFC) -> String { + // Q: NIL_VALUE is unlikely? Technically invalid for RFC 3164: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.4 + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 + let hostname = self.hostname.as_deref().unwrap_or(NIL_VALUE); + let structured_data = self.structured_data.as_ref().map(|sd| sd.encode()); + + let fields_encoded = match rfc { + SyslogRFC::Rfc3164 => { + // TIMESTAMP field format: + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 + let timestamp = self.timestamp.format("%b %e %H:%M:%S").to_string(); + // MSG part begins with TAG field + optional context: + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.3 + let mut msg_start = self.tag.encode_rfc_3164(); + // When RFC 5424 "Structured Data" is available, it can be compatible with RFC 3164 + // by including it in the RFC 3164 `CONTENT` field (part of MSG): + // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 + if let Some(sd) = structured_data.as_deref() { + msg_start = msg_start + " " + sd + } + + [ + timestamp.as_str(), + hostname, + &msg_start, + ].join(" ") + }, + SyslogRFC::Rfc5424 => { + // HEADER part fields: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 + let version = SYSLOG_V1; + let timestamp = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true); + let tag = self.tag.encode_rfc_5424(); + // Structured Data: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.3 + let sd 
= structured_data.as_deref().unwrap_or(NIL_VALUE); + + [ + version, + timestamp.as_str(), + hostname, + &tag, + sd + ].join(" ") + } + }; + + [ + &self.pri.encode(), + &fields_encoded, + " ", + &self.message, + ].concat() + + // Q: RFC 5424 MSG part should technically ensure UTF-8 message begins with BOM? + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.4 + } +} + +#[derive(Default, Debug)] +struct Tag { + app_name: String, + proc_id: Option, + msg_id: Option +} + +// NOTE: `.as_deref()` usage below avoids requiring `self.clone()` +impl Tag { + // Roughly equivalent - RFC 5424 fields can compose the start of + // an RFC 3164 MSG part (TAG + CONTENT fields): + // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 + fn encode_rfc_3164(&self) -> String { + let Self { app_name, proc_id, msg_id } = self; + + match proc_id.as_deref().or(msg_id.as_deref()) { + Some(context) => [&app_name, "[", &context, "]:"].concat(), + None => [&app_name, ":"].concat() + } + } + + // TAG was split into separate fields: APP-NAME, PROCID, MSGID + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.5 + fn encode_rfc_5424(&self) -> String { + let Self { app_name, proc_id, msg_id } = self; + + [ + &app_name, + proc_id.as_deref().unwrap_or(NIL_VALUE), + msg_id.as_deref().unwrap_or(NIL_VALUE), + ].join(" ") + } } -fn default_nil_value() -> String { - String::from(NIL_VALUE) +#[derive(Debug)] +struct StructuredData {} + +impl StructuredData { + fn encode(&self) -> String { + todo!() + } } From 1049ebdc8481ae6bb9465aba581f5618644643ef Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Fri, 15 Mar 2024 17:05:32 +1300 Subject: [PATCH 06/16] chore: Merge back into `syslog.rs` No changes beyond relocating the code into a single file. 
--- lib/codecs/src/encoding/format/syslog.rs | 454 +++++++++++++++++- .../format/syslog/facility_severity.rs | 106 ---- .../src/encoding/format/syslog/serializer.rs | 155 ------ .../format/syslog/serializer_config.rs | 181 ------- 4 files changed, 453 insertions(+), 443 deletions(-) delete mode 100644 lib/codecs/src/encoding/format/syslog/facility_severity.rs delete mode 100644 lib/codecs/src/encoding/format/syslog/serializer.rs delete mode 100644 lib/codecs/src/encoding/format/syslog/serializer_config.rs diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 9077c806bd698..01be0ad01d8c7 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -3,5 +3,457 @@ use tokio_util::codec::Encoder; use vector_core::{config::DataType, event::{Event, LogEvent}, schema}; use chrono::{DateTime, SecondsFormat, Local}; use vrl::value::Value; -use serde::{de, Deserialize}; use vector_config::configurable_component; + +use std::str::FromStr; +use strum::{FromRepr, EnumString}; + +/// Config used to build a `SyslogSerializer`. +#[configurable_component] +// Serde default makes all config keys optional. +// Each field assigns either a fixed value, or field name (lookup field key to retrieve dynamic value per `LogEvent`). +#[serde(default)] +#[derive(Clone, Debug, Default)] +pub struct SyslogSerializerConfig { + /// RFC + rfc: SyslogRFC, + /// Facility + facility: String, + /// Severity + severity: String, + + /// App Name + app_name: Option, + /// Proc ID + proc_id: Option, + /// Msg ID + msg_id: Option, + + /// Payload key + payload_key: String, + /// Add log source + add_log_source: bool, + + // NOTE: The `tag` field was removed, it is better represented by the equivalents in RFC 5424. + // Q: The majority of the fields above pragmatically only make sense as config for keys to query? + // Q: What was `trim_prefix` for? It is not used in file, nor in Vector source tree. 
+ // Q: `add_log_source` doesn't belong here? Better handled by the `remap` transform with structured data? +} + +impl SyslogSerializerConfig { + /// Build the `SyslogSerializer` from this configuration. + pub fn build(&self) -> SyslogSerializer { + SyslogSerializer::new(&self) + } + + /// The data type of events that are accepted by `SyslogSerializer`. + pub fn input_type(&self) -> DataType { + DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + schema::Requirement::empty() + } +} + +/// Serializer that converts an `Event` to bytes using the Syslog format. +#[derive(Debug, Clone)] +pub struct SyslogSerializer { + config: SyslogSerializerConfig +} + +impl SyslogSerializer { + /// Creates a new `SyslogSerializer`. + pub fn new(conf: &SyslogSerializerConfig) -> Self { + Self { config: conf.clone() } + } +} + +impl Encoder for SyslogSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { + if let Event::Log(log_event) = event { + let syslog_message = ConfigDecanter::new(log_event).decant_config(&self.config); + + let vec = syslog_message + .encode(&self.config.rfc) + .as_bytes() + .to_vec(); + buffer.put_slice(&vec); + } + + Ok(()) + } +} + +// Adapts a `LogEvent` into a `SyslogMessage` based on config from `SyslogSerializerConfig`: +// - Splits off the responsibility of encoding logic to `SyslogMessage` (which is not dependent upon Vector types). +// - Majority of methods are only needed to support the `decant_config()` operation. 
+struct ConfigDecanter { + log: LogEvent, +} + +impl ConfigDecanter { + fn new(log: LogEvent) -> Self { + Self { + log, + } + } + + fn decant_config(&self, config: &SyslogSerializerConfig) -> SyslogMessage { + let x = |v| self.replace_if_proxied(v).unwrap_or_default(); + let facility = x(&config.facility); + let severity = x(&config.severity); + + let y = |v| self.replace_if_proxied_opt(v); + let app_name = y(&config.app_name).unwrap_or("vector".to_owned()); + let proc_id = y(&config.proc_id); + let msg_id = y(&config.msg_id); + + SyslogMessage { + pri: Pri::from_str_variants(&facility, &severity), + timestamp: self.get_timestamp(), + hostname: self.value_by_key("hostname"), + tag: Tag { + app_name, + proc_id, + msg_id, + }, + structured_data: None, + message: self.get_message(&config), + } + } + + fn replace_if_proxied_opt(&self, value: &Option) -> Option { + value.as_ref().and_then(|v| self.replace_if_proxied(v)) + } + + // When the value has the expected prefix, perform a lookup for a field key without that prefix part. + // A failed lookup returns `None`, while a value without the prefix uses the config value as-is. + // + // Q: Why `$.message.` as the prefix? (Appears to be JSONPath syntax?) + // NOTE: Originally named in PR as: `get_field_or_config()` + fn replace_if_proxied(&self, value: &str) -> Option { + value + .strip_prefix("$.message.") + .map_or( + Some(value.to_owned()), + |field_key| self.value_by_key(field_key), + ) + } + + // NOTE: Originally named in PR as: `get_field()` + // Now returns a `None` directly instead of converting to either `"-"` or `""` + fn value_by_key(&self, field_key: &str) -> Option { + self.log.get(field_key).and_then(|field_value| { + let bytes = field_value.coerce_to_bytes(); + String::from_utf8(bytes.to_vec()).ok() + }) + } + + fn get_timestamp(&self) -> DateTime:: { + // Q: Was this Timestamp key hard-coded to the needs of the original PR author? 
+ // + // Key `@timestamp` depends on input: + // https://vector.dev/guides/level-up/managing-schemas/#example-custom-timestamp-field + // https://vector.dev/docs/about/under-the-hood/architecture/data-model/log/#timestamps + // NOTE: Log schema key renaming is unavailable when Log namespacing is enabled: + // https://vector.dev/docs/reference/configuration/global-options/#log_schema + // + // NOTE: Log namespacing has metadata `%vector.ingest_timestamp` from a source (file/demo_logs) instead of `timestamp`. + // As a `payload_key` it will not respect config `encoding.timestamp_format`, but does when + // using the parent object (`%vector`). Inputs without namespacing respect that config setting. + if let Some(Value::Timestamp(timestamp)) = self.log.get("@timestamp") { + // Q: Utc type returned is changed to Local? + // - Could otherwise return `*timestamp` as-is? Why is Local conversion necessary? + DateTime::::from(*timestamp) + } else { + // NOTE: Local time is encouraged by RFC 5424 when creating a fallback timestamp for RFC 3164 + Local::now() + } + } + + fn get_message(&self, config: &SyslogSerializerConfig) -> String { + let mut message = String::new(); + + if config.add_log_source { + message.push_str(self.add_log_source().as_str()); + } + + // `payload_key` configures where to source the value for the syslog `message`: + // - Field key (Valid) => Get value by lookup (value_by_key) + // - Field key (Invalid) => Empty string (unwrap_or_default) + // - Not configured => JSON encoded `LogEvent` (fallback?) + // + // Q: Was the JSON fallback intended by the original PR author only for debugging? + // Roughly equivalent to using `payload_key: .` (in YAML config)? 
+ let payload = if config.payload_key.is_empty() { + serde_json::to_string(&self.log).ok() + } else { + self.value_by_key(&config.payload_key) + }; + + message.push_str(&payload.unwrap_or_default()); + message + } + + // NOTE: This is a third-party addition from the original PR author (it is not relevant to the syslog spec): + // TODO: Remove, as this type of additional data is better supported via VRL remap + `StructuredData`? + fn add_log_source(&self) -> String { + let get_value = |s| self.value_by_key(s).unwrap_or_default(); + + [ + "namespace_name=", get_value("kubernetes.namespace_name").as_str(), + ", container_name=", get_value("kubernetes.container_name").as_str(), + ", pod_name=", get_value("kubernetes.pod_name").as_str(), + ", message=" + ].concat() + } +} + +// +// SyslogMessage support +// + +const NIL_VALUE: &'static str = "-"; +const SYSLOG_V1: &'static str = "1"; + +/// Syslog RFC +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[serde(rename_all = "snake_case")] +pub enum SyslogRFC { + /// RFC 3164 + Rfc3164, + + #[default] + /// RFC 5424 + Rfc5424 +} + +// ABNF definition: +// https://datatracker.ietf.org/doc/html/rfc5424#section-6 +// https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 +#[derive(Default, Debug)] +struct SyslogMessage { + pri: Pri, + timestamp: DateTime::, + hostname: Option, + tag: Tag, + structured_data: Option, + message: String, +} + +impl SyslogMessage { + fn encode(&self, rfc: &SyslogRFC) -> String { + // Q: NIL_VALUE is unlikely? 
Technically invalid for RFC 3164: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.4 + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 + let hostname = self.hostname.as_deref().unwrap_or(NIL_VALUE); + let structured_data = self.structured_data.as_ref().map(|sd| sd.encode()); + + let fields_encoded = match rfc { + SyslogRFC::Rfc3164 => { + // TIMESTAMP field format: + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 + let timestamp = self.timestamp.format("%b %e %H:%M:%S").to_string(); + // MSG part begins with TAG field + optional context: + // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.3 + let mut msg_start = self.tag.encode_rfc_3164(); + // When RFC 5424 "Structured Data" is available, it can be compatible with RFC 3164 + // by including it in the RFC 3164 `CONTENT` field (part of MSG): + // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 + if let Some(sd) = structured_data.as_deref() { + msg_start = msg_start + " " + sd + } + + [ + timestamp.as_str(), + hostname, + &msg_start, + ].join(" ") + }, + SyslogRFC::Rfc5424 => { + // HEADER part fields: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 + let version = SYSLOG_V1; + let timestamp = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true); + let tag = self.tag.encode_rfc_5424(); + // Structured Data: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.3 + let sd = structured_data.as_deref().unwrap_or(NIL_VALUE); + + [ + version, + timestamp.as_str(), + hostname, + &tag, + sd + ].join(" ") + } + }; + + [ + &self.pri.encode(), + &fields_encoded, + " ", + &self.message, + ].concat() + + // Q: RFC 5424 MSG part should technically ensure UTF-8 message begins with BOM? 
+ // https://datatracker.ietf.org/doc/html/rfc5424#section-6.4 + } +} + +#[derive(Default, Debug)] +struct Tag { + app_name: String, + proc_id: Option, + msg_id: Option +} + +// NOTE: `.as_deref()` usage below avoids requiring `self.clone()` +impl Tag { + // Roughly equivalent - RFC 5424 fields can compose the start of + // an RFC 3164 MSG part (TAG + CONTENT fields): + // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 + fn encode_rfc_3164(&self) -> String { + let Self { app_name, proc_id, msg_id } = self; + + match proc_id.as_deref().or(msg_id.as_deref()) { + Some(context) => [&app_name, "[", &context, "]:"].concat(), + None => [&app_name, ":"].concat() + } + } + + // TAG was split into separate fields: APP-NAME, PROCID, MSGID + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.5 + fn encode_rfc_5424(&self) -> String { + let Self { app_name, proc_id, msg_id } = self; + + [ + &app_name, + proc_id.as_deref().unwrap_or(NIL_VALUE), + msg_id.as_deref().unwrap_or(NIL_VALUE), + ].join(" ") + } +} + +#[derive(Debug)] +struct StructuredData {} + +impl StructuredData { + fn encode(&self) -> String { + todo!() + } +} + +// +// Facility + Severity support +// + +#[derive(Default, Debug)] +struct Pri { + facility: Facility, + severity: Severity, +} + +impl Pri { + fn from_str_variants(facility_variant: &str, severity_variant: &str) -> Self { + // The original PR had `deserialize_*()` methods parsed a value to a `u8` or stored a field key as a `String` + // Later the equivalent `get_num_*()` method would retrieve the `u8` value or lookup the field key for the actual value, + // otherwise it'd fallback to the default Facility/Severity value. + // This approach instead parses a string of the name or ordinal representation, + // any reference via field key lookup should have already happened by this point. 
+ let facility = Facility::into_variant(&facility_variant).unwrap_or(Facility::User); + let severity = Severity::into_variant(&severity_variant).unwrap_or(Severity::Informational); + + Self { + facility, + severity, + } + } + + // The last paragraph describes how to compose the enums into `PRIVAL`: + // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 + fn encode(&self) -> String { + let prival = (self.facility as u8 * 8) + self.severity as u8; + ["<", &prival.to_string(), ">"].concat() + } +} + +// Facility + Severity mapping from Name => Ordinal number: + +/// Syslog facility +#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] +#[strum(serialize_all = "kebab-case")] +enum Facility { + Kern = 0, + #[default] + User = 1, + Mail = 2, + Daemon = 3, + Auth = 4, + Syslog = 5, + LPR = 6, + News = 7, + UUCP = 8, + Cron = 9, + AuthPriv = 10, + FTP = 11, + NTP = 12, + Security = 13, + Console = 14, + SolarisCron = 15, + Local0 = 16, + Local1 = 17, + Local2 = 18, + Local3 = 19, + Local4 = 20, + Local5 = 21, + Local6 = 22, + Local7 = 23, +} + +/// Syslog severity +#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] +#[strum(serialize_all = "kebab-case")] +enum Severity { + Emergency = 0, + Alert = 1, + Critical = 2, + Error = 3, + Warning = 4, + Notice = 5, + #[default] + Informational = 6, + Debug = 7, +} + +// Additionally support variants from string-based integers: +// Parse a string name, with fallback for parsing a string ordinal number. +impl Facility { + fn into_variant(variant_name: &str) -> Option { + let s = variant_name.to_ascii_lowercase(); + + s.parse::().map_or_else( + |_| Self::from_str(&s).ok(), + |num| Self::from_repr(num), + ) + } +} + +// NOTE: The `strum` crate does not provide traits, +// requiring copy/paste of the prior impl instead. 
+impl Severity { + fn into_variant(variant_name: &str) -> Option { + let s = variant_name.to_ascii_lowercase(); + + s.parse::().map_or_else( + |_| Self::from_str(&s).ok(), + |num| Self::from_repr(num), + ) + } +} diff --git a/lib/codecs/src/encoding/format/syslog/facility_severity.rs b/lib/codecs/src/encoding/format/syslog/facility_severity.rs deleted file mode 100644 index 786fe61cd0ead..0000000000000 --- a/lib/codecs/src/encoding/format/syslog/facility_severity.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::str::FromStr; -use strum::{FromRepr, EnumString}; - -#[derive(Default, Debug)] -struct Pri { - facility: Facility, - severity: Severity, -} - -impl Pri { - fn from_str_variants(facility_variant: &str, severity_variant: &str) -> Self { - // The original PR had `deserialize_*()` methods parsed a value to a `u8` or stored a field key as a `String` - // Later the equivalent `get_num_*()` method would retrieve the `u8` value or lookup the field key for the actual value, - // otherwise it'd fallback to the default Facility/Severity value. - // This approach instead parses a string of the name or ordinal representation, - // any reference via field key lookup should have already happened by this point. 
- let facility = Facility::into_variant(&facility_variant).unwrap_or(Facility::User); - let severity = Severity::into_variant(&severity_variant).unwrap_or(Severity::Informational); - - Self { - facility, - severity, - } - } - - // The last paragraph describes how to compose the enums into `PRIVAL`: - // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 - fn encode(&self) -> String { - let prival = (self.facility as u8 * 8) + self.severity as u8; - ["<", &prival.to_string(), ">"].concat() - } -} - -// Facility + Severity mapping from Name => Ordinal number: - -/// Syslog facility -#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] -#[strum(serialize_all = "kebab-case")] -enum Facility { - Kern = 0, - #[default] - User = 1, - Mail = 2, - Daemon = 3, - Auth = 4, - Syslog = 5, - LPR = 6, - News = 7, - UUCP = 8, - Cron = 9, - AuthPriv = 10, - FTP = 11, - NTP = 12, - Security = 13, - Console = 14, - SolarisCron = 15, - Local0 = 16, - Local1 = 17, - Local2 = 18, - Local3 = 19, - Local4 = 20, - Local5 = 21, - Local6 = 22, - Local7 = 23, -} - -/// Syslog severity -#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] -#[strum(serialize_all = "kebab-case")] -enum Severity { - Emergency = 0, - Alert = 1, - Critical = 2, - Error = 3, - Warning = 4, - Notice = 5, - #[default] - Informational = 6, - Debug = 7, -} - -// Additionally support variants from string-based integers: -// Parse a string name, with fallback for parsing a string ordinal number. -impl Facility { - fn into_variant(variant_name: &str) -> Option { - let s = variant_name.to_ascii_lowercase(); - - s.parse::().map_or_else( - |_| Self::from_str(&s).ok(), - |num| Self::from_repr(num), - ) - } -} - -// NOTE: The `strum` crate does not provide traits, -// requiring copy/paste of the prior impl instead. 
-impl Severity { - fn into_variant(variant_name: &str) -> Option { - let s = variant_name.to_ascii_lowercase(); - - s.parse::().map_or_else( - |_| Self::from_str(&s).ok(), - |num| Self::from_repr(num), - ) - } -} diff --git a/lib/codecs/src/encoding/format/syslog/serializer.rs b/lib/codecs/src/encoding/format/syslog/serializer.rs deleted file mode 100644 index 064e4bc96577e..0000000000000 --- a/lib/codecs/src/encoding/format/syslog/serializer.rs +++ /dev/null @@ -1,155 +0,0 @@ -/// Serializer that converts an `Event` to bytes using the Syslog format. -#[derive(Debug, Clone)] -pub struct SyslogSerializer { - config: SyslogSerializerConfig -} - -impl SyslogSerializer { - /// Creates a new `SyslogSerializer`. - pub fn new(conf: &SyslogSerializerConfig) -> Self { - Self { config: conf.clone() } - } -} - -impl Encoder for SyslogSerializer { - type Error = vector_common::Error; - - fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { - if let Event::Log(log_event) = event { - let syslog_message = ConfigDecanter::new(log_event).decant_config(&self.config); - - let vec = syslog_message - .encode(&self.config.rfc) - .as_bytes() - .to_vec(); - buffer.put_slice(&vec); - } - - Ok(()) - } -} - -// Adapts a `LogEvent` into a `SyslogMessage` based on config from `SyslogSerializerConfig`: -// - Splits off the responsibility of encoding logic to `SyslogMessage` (which is not dependent upon Vector types). -// - Majority of methods are only needed to support the `decant_config()` operation. 
-struct ConfigDecanter { - log: LogEvent, -} - -impl ConfigDecanter { - fn new(log: LogEvent) -> Self { - Self { - log, - } - } - - fn decant_config(&self, config: &SyslogSerializerConfig) -> SyslogMessage { - let x = |v| self.replace_if_proxied(v).unwrap_or_default(); - let facility = x(&config.facility); - let severity = x(&config.severity); - - let y = |v| self.replace_if_proxied_opt(v); - let app_name = y(&config.app_name).unwrap_or("vector".to_owned()); - let proc_id = y(&config.proc_id); - let msg_id = y(&config.msg_id); - - SyslogMessage { - pri: Pri::from_str_variants(&facility, &severity), - timestamp: self.get_timestamp(), - hostname: self.value_by_key("hostname"), - tag: Tag { - app_name, - proc_id, - msg_id, - }, - structured_data: None, - message: self.get_message(&config), - } - } - - fn replace_if_proxied_opt(&self, value: &Option) -> Option { - value.as_ref().and_then(|v| self.replace_if_proxied(v)) - } - - // When the value has the expected prefix, perform a lookup for a field key without that prefix part. - // A failed lookup returns `None`, while a value without the prefix uses the config value as-is. - // - // Q: Why `$.message.` as the prefix? (Appears to be JSONPath syntax?) - // NOTE: Originally named in PR as: `get_field_or_config()` - fn replace_if_proxied(&self, value: &str) -> Option { - value - .strip_prefix("$.message.") - .map_or( - Some(value.to_owned()), - |field_key| self.value_by_key(field_key), - ) - } - - // NOTE: Originally named in PR as: `get_field()` - // Now returns a `None` directly instead of converting to either `"-"` or `""` - fn value_by_key(&self, field_key: &str) -> Option { - self.log.get(field_key).and_then(|field_value| { - let bytes = field_value.coerce_to_bytes(); - String::from_utf8(bytes.to_vec()).ok() - }) - } - - fn get_timestamp(&self) -> DateTime:: { - // Q: Was this Timestamp key hard-coded to the needs of the original PR author? 
- // - // Key `@timestamp` depends on input: - // https://vector.dev/guides/level-up/managing-schemas/#example-custom-timestamp-field - // https://vector.dev/docs/about/under-the-hood/architecture/data-model/log/#timestamps - // NOTE: Log schema key renaming is unavailable when Log namespacing is enabled: - // https://vector.dev/docs/reference/configuration/global-options/#log_schema - // - // NOTE: Log namespacing has metadata `%vector.ingest_timestamp` from a source (file/demo_logs) instead of `timestamp`. - // As a `payload_key` it will not respect config `encoding.timestamp_format`, but does when - // using the parent object (`%vector`). Inputs without namespacing respect that config setting. - if let Some(Value::Timestamp(timestamp)) = self.log.get("@timestamp") { - // Q: Utc type returned is changed to Local? - // - Could otherwise return `*timestamp` as-is? Why is Local conversion necessary? - DateTime::::from(*timestamp) - } else { - // NOTE: Local time is encouraged by RFC 5424 when creating a fallback timestamp for RFC 3164 - Local::now() - } - } - - fn get_message(&self, config: &SyslogSerializerConfig) -> String { - let mut message = String::new(); - - if config.add_log_source { - message.push_str(self.add_log_source().as_str()); - } - - // `payload_key` configures where to source the value for the syslog `message`: - // - Field key (Valid) => Get value by lookup (value_by_key) - // - Field key (Invalid) => Empty string (unwrap_or_default) - // - Not configured => JSON encoded `LogEvent` (fallback?) - // - // Q: Was the JSON fallback intended by the original PR author only for debugging? - // Roughly equivalent to using `payload_key: .` (in YAML config)? 
- let payload = if config.payload_key.is_empty() { - serde_json::to_string(&self.log).ok() - } else { - self.value_by_key(&config.payload_key) - }; - - message.push_str(&payload.unwrap_or_default()); - message - } - - // NOTE: This is a third-party addition from the original PR author (it is not relevant to the syslog spec): - // TODO: Remove, as this type of additional data is better supported via VRL remap + `StructuredData`? - fn add_log_source(&self) -> String { - let get_value = |s| self.value_by_key(s).unwrap_or_default(); - - [ - "namespace_name=", get_value("kubernetes.namespace_name").as_str(), - ", container_name=", get_value("kubernetes.container_name").as_str(), - ", pod_name=", get_value("kubernetes.pod_name").as_str(), - ", message=" - ].concat() - } -} diff --git a/lib/codecs/src/encoding/format/syslog/serializer_config.rs b/lib/codecs/src/encoding/format/syslog/serializer_config.rs deleted file mode 100644 index 57bf85aca6995..0000000000000 --- a/lib/codecs/src/encoding/format/syslog/serializer_config.rs +++ /dev/null @@ -1,181 +0,0 @@ -const NIL_VALUE: &'static str = "-"; -const SYSLOG_V1: &'static str = "1"; - -/// Syslog RFC -#[configurable_component] -#[derive(Clone, Debug, Default)] -#[serde(rename_all = "snake_case")] -pub enum SyslogRFC { - /// RFC 3164 - Rfc3164, - - #[default] - /// RFC 5424 - Rfc5424 -} - -/// Config used to build a `SyslogSerializer`. -#[configurable_component] -// Serde default makes all config keys optional. -// Each field assigns either a fixed value, or field name (lookup field key to retrieve dynamic value per `LogEvent`). 
-#[serde(default)] -#[derive(Clone, Debug, Default)] -pub struct SyslogSerializerConfig { - /// RFC - rfc: SyslogRFC, - /// Facility - facility: String, - /// Severity - severity: String, - - /// App Name - app_name: Option, - /// Proc ID - proc_id: Option, - /// Msg ID - msg_id: Option, - - /// Payload key - payload_key: String, - /// Add log source - add_log_source: bool, - - // NOTE: The `tag` field was removed, it is better represented by the equivalents in RFC 5424. - // Q: The majority of the fields above pragmatically only make sense as config for keys to query? - // Q: What was `trim_prefix` for? It is not used in file, nor in Vector source tree. - // Q: `add_log_source` doesn't belong here? Better handled by the `remap` transform with structured data? -} - -impl SyslogSerializerConfig { - /// Build the `SyslogSerializer` from this configuration. - pub fn build(&self) -> SyslogSerializer { - SyslogSerializer::new(&self) - } - - /// The data type of events that are accepted by `SyslogSerializer`. - pub fn input_type(&self) -> DataType { - DataType::Log - } - - /// The schema required by the serializer. - pub fn schema_requirement(&self) -> schema::Requirement { - schema::Requirement::empty() - } -} - -// ABNF definition: -// https://datatracker.ietf.org/doc/html/rfc5424#section-6 -// https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 -#[derive(Default, Debug)] -struct SyslogMessage { - pri: Pri, - timestamp: DateTime::, - hostname: Option, - tag: Tag, - structured_data: Option, - message: String, -} - -impl SyslogMessage { - fn encode(&self, rfc: &SyslogRFC) -> String { - // Q: NIL_VALUE is unlikely? 
Technically invalid for RFC 3164: - // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.4 - // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 - let hostname = self.hostname.as_deref().unwrap_or(NIL_VALUE); - let structured_data = self.structured_data.as_ref().map(|sd| sd.encode()); - - let fields_encoded = match rfc { - SyslogRFC::Rfc3164 => { - // TIMESTAMP field format: - // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 - let timestamp = self.timestamp.format("%b %e %H:%M:%S").to_string(); - // MSG part begins with TAG field + optional context: - // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.3 - let mut msg_start = self.tag.encode_rfc_3164(); - // When RFC 5424 "Structured Data" is available, it can be compatible with RFC 3164 - // by including it in the RFC 3164 `CONTENT` field (part of MSG): - // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 - if let Some(sd) = structured_data.as_deref() { - msg_start = msg_start + " " + sd - } - - [ - timestamp.as_str(), - hostname, - &msg_start, - ].join(" ") - }, - SyslogRFC::Rfc5424 => { - // HEADER part fields: - // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 - let version = SYSLOG_V1; - let timestamp = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true); - let tag = self.tag.encode_rfc_5424(); - // Structured Data: - // https://datatracker.ietf.org/doc/html/rfc5424#section-6.3 - let sd = structured_data.as_deref().unwrap_or(NIL_VALUE); - - [ - version, - timestamp.as_str(), - hostname, - &tag, - sd - ].join(" ") - } - }; - - [ - &self.pri.encode(), - &fields_encoded, - " ", - &self.message, - ].concat() - - // Q: RFC 5424 MSG part should technically ensure UTF-8 message begins with BOM? 
- // https://datatracker.ietf.org/doc/html/rfc5424#section-6.4 - } -} - -#[derive(Default, Debug)] -struct Tag { - app_name: String, - proc_id: Option, - msg_id: Option -} - -// NOTE: `.as_deref()` usage below avoids requiring `self.clone()` -impl Tag { - // Roughly equivalent - RFC 5424 fields can compose the start of - // an RFC 3164 MSG part (TAG + CONTENT fields): - // https://datatracker.ietf.org/doc/html/rfc5424#appendix-A.1 - fn encode_rfc_3164(&self) -> String { - let Self { app_name, proc_id, msg_id } = self; - - match proc_id.as_deref().or(msg_id.as_deref()) { - Some(context) => [&app_name, "[", &context, "]:"].concat(), - None => [&app_name, ":"].concat() - } - } - - // TAG was split into separate fields: APP-NAME, PROCID, MSGID - // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.5 - fn encode_rfc_5424(&self) -> String { - let Self { app_name, proc_id, msg_id } = self; - - [ - &app_name, - proc_id.as_deref().unwrap_or(NIL_VALUE), - msg_id.as_deref().unwrap_or(NIL_VALUE), - ].join(" ") - } -} - -#[derive(Debug)] -struct StructuredData {} - -impl StructuredData { - fn encode(&self) -> String { - todo!() - } -} From 3cdc1b46c4493cb607e0f3acb56741fd01c0bfaa Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Fri, 15 Mar 2024 17:06:05 +1300 Subject: [PATCH 07/16] feat: Add StructuredData support to Syslog encoder --- lib/codecs/src/encoding/format/syslog.rs | 86 ++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 7 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 01be0ad01d8c7..606a62dab55bc 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -2,9 +2,10 @@ use bytes::{BufMut, BytesMut}; use tokio_util::codec::Encoder; use vector_core::{config::DataType, event::{Event, LogEvent}, schema}; use chrono::{DateTime, SecondsFormat, Local}; -use vrl::value::Value; +use 
vrl::value::{ObjectMap, Value}; use vector_config::configurable_component; +use std::collections::HashMap; use std::str::FromStr; use strum::{FromRepr, EnumString}; @@ -121,7 +122,7 @@ impl ConfigDecanter { proc_id, msg_id, }, - structured_data: None, + structured_data: self.get_structured_data(), message: self.get_message(&config), } } @@ -153,6 +154,12 @@ impl ConfigDecanter { }) } + fn get_structured_data(&self) -> Option { + self.log.get("structured_data") + .and_then(|v| v.clone().into_object()) + .map(StructuredData::from) + } + fn get_timestamp(&self) -> DateTime:: { // Q: Was this Timestamp key hard-coded to the needs of the original PR author? // @@ -281,8 +288,6 @@ impl SyslogMessage { let version = SYSLOG_V1; let timestamp = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true); let tag = self.tag.encode_rfc_5424(); - // Structured Data: - // https://datatracker.ietf.org/doc/html/rfc5424#section-6.3 let sd = structured_data.as_deref().unwrap_or(NIL_VALUE); [ @@ -341,12 +346,79 @@ impl Tag { } } -#[derive(Debug)] -struct StructuredData {} +// Structured Data: +// https://datatracker.ietf.org/doc/html/rfc5424#section-6.3 +// An SD-ELEMENT consists of a name (SD-ID) + parameter key-value pairs (SD-PARAM) +type StructuredDataMap = HashMap>; +#[derive(Debug, Default)] +struct StructuredData { + elements: StructuredDataMap +} + +// Used by `SyslogMessage::encode()` +/* + Adapted `format_structured_data_rfc5424` method from: + https://github.com/vectordotdev/vector/blob/fafe8c50a4721fa3ddbea34e0641d3c145f14388/src/sources/syslog.rs#L1548-L1563 + No notable change in logic, uses `NIL_VALUE` constant, and adapts method to struct instead of free-standing. 
+*/ impl StructuredData { fn encode(&self) -> String { - todo!() + if self.elements.is_empty() { + NIL_VALUE.to_string() + } else { + let mut s = String::new(); + + for (sd_id, sd_params) in &self.elements { + s = s + "[" + sd_id; + for (key, value) in sd_params { + s = s + " " + key + "=\"" + value + "\""; + } + s += "]"; + } + + s + } + } +} + +// Used by `ConfigDecanter::decant_config()` +/* + Adapted `structured_data_from_fields()` method from: + https://github.com/vectordotdev/vector/blob/fafe8c50a4721fa3ddbea34e0641d3c145f14388/src/sources/syslog.rs#L1439-L1454 + + Refactored to `impl From` that uses `flat_map()` instead to collect K/V tuples into a `HashMap`. +*/ +impl From for StructuredData { + fn from(fields: ObjectMap) -> Self { + let elements = fields.into_iter().flat_map(|(sd_id, value)| { + let sd_params = value + .into_object()? + .into_iter() + .map(|(k, v)| (k.into(), value_to_string(v))) + .collect(); + + Some((sd_id.into(), sd_params)) + }).collect::(); + + Self { elements } + } +} + +// Only used as helper to support `StructuredData::from()` +/* + Adapted `value_to_string()` method from: + https://github.com/vectordotdev/vector/blob/fafe8c50a4721fa3ddbea34e0641d3c145f14388/src/sources/syslog.rs#L1569-L1579 + https://github.com/vectordotdev/vrl/blob/main/src/value/value/convert.rs + https://github.com/vectordotdev/vrl/blob/main/src/value/value/display.rs + + Simplified via `match` expression which seems better suited for this logic. 
+*/ +fn value_to_string(v: Value) -> String { + match v { + Value::Bytes(bytes) => String::from_utf8_lossy(&bytes).to_string(), + Value::Timestamp(timestamp) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true), + _ => v.to_string() } } From 3001b679089027f3e22d3edc2c7cc0567c50a4e6 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Fri, 15 Mar 2024 17:41:09 +1300 Subject: [PATCH 08/16] chore: Housekeeping - Drop notes referring to original PR differences + StructuredData adaption references. None of it should be relevant going forward. - Revise some other notes. - Drop `add_log_source` method (introduced from the original PR author) in favor of using `StructuredData` support instead. --- lib/codecs/src/encoding/format/syslog.rs | 53 ++---------------------- 1 file changed, 3 insertions(+), 50 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 606a62dab55bc..cc8a7dbb72a33 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -32,13 +32,8 @@ pub struct SyslogSerializerConfig { /// Payload key payload_key: String, - /// Add log source - add_log_source: bool, - // NOTE: The `tag` field was removed, it is better represented by the equivalents in RFC 5424. // Q: The majority of the fields above pragmatically only make sense as config for keys to query? - // Q: What was `trim_prefix` for? It is not used in file, nor in Vector source tree. - // Q: `add_log_source` doesn't belong here? Better handled by the `remap` transform with structured data? 
} impl SyslogSerializerConfig { @@ -145,8 +140,6 @@ impl ConfigDecanter { ) } - // NOTE: Originally named in PR as: `get_field()` - // Now returns a `None` directly instead of converting to either `"-"` or `""` fn value_by_key(&self, field_key: &str) -> Option { self.log.get(field_key).and_then(|field_value| { let bytes = field_value.coerce_to_bytes(); @@ -185,10 +178,6 @@ impl ConfigDecanter { fn get_message(&self, config: &SyslogSerializerConfig) -> String { let mut message = String::new(); - if config.add_log_source { - message.push_str(self.add_log_source().as_str()); - } - // `payload_key` configures where to source the value for the syslog `message`: // - Field key (Valid) => Get value by lookup (value_by_key) // - Field key (Invalid) => Empty string (unwrap_or_default) @@ -205,19 +194,6 @@ impl ConfigDecanter { message.push_str(&payload.unwrap_or_default()); message } - - // NOTE: This is a third-party addition from the original PR author (it is not relevant to the syslog spec): - // TODO: Remove, as this type of additional data is better supported via VRL remap + `StructuredData`? 
- fn add_log_source(&self) -> String { - let get_value = |s| self.value_by_key(s).unwrap_or_default(); - - [ - "namespace_name=", get_value("kubernetes.namespace_name").as_str(), - ", container_name=", get_value("kubernetes.container_name").as_str(), - ", pod_name=", get_value("kubernetes.pod_name").as_str(), - ", message=" - ].concat() - } } // @@ -319,7 +295,6 @@ struct Tag { msg_id: Option } -// NOTE: `.as_deref()` usage below avoids requiring `self.clone()` impl Tag { // Roughly equivalent - RFC 5424 fields can compose the start of // an RFC 3164 MSG part (TAG + CONTENT fields): @@ -356,12 +331,6 @@ struct StructuredData { } // Used by `SyslogMessage::encode()` -/* - Adapted `format_structured_data_rfc5424` method from: - https://github.com/vectordotdev/vector/blob/fafe8c50a4721fa3ddbea34e0641d3c145f14388/src/sources/syslog.rs#L1548-L1563 - - No notable change in logic, uses `NIL_VALUE` constant, and adapts method to struct instead of free-standing. -*/ impl StructuredData { fn encode(&self) -> String { if self.elements.is_empty() { @@ -383,12 +352,6 @@ impl StructuredData { } // Used by `ConfigDecanter::decant_config()` -/* - Adapted `structured_data_from_fields()` method from: - https://github.com/vectordotdev/vector/blob/fafe8c50a4721fa3ddbea34e0641d3c145f14388/src/sources/syslog.rs#L1439-L1454 - - Refactored to `impl From` that uses `flat_map()` instead to collect K/V tuples into a `HashMap`. 
-*/ impl From for StructuredData { fn from(fields: ObjectMap) -> Self { let elements = fields.into_iter().flat_map(|(sd_id, value)| { @@ -406,14 +369,6 @@ impl From for StructuredData { } // Only used as helper to support `StructuredData::from()` -/* - Adapted `value_to_string()` method from: - https://github.com/vectordotdev/vector/blob/fafe8c50a4721fa3ddbea34e0641d3c145f14388/src/sources/syslog.rs#L1569-L1579 - https://github.com/vectordotdev/vrl/blob/main/src/value/value/convert.rs - https://github.com/vectordotdev/vrl/blob/main/src/value/value/display.rs - - Simplified via `match` expression which seems better suited for this logic. -*/ fn value_to_string(v: Value) -> String { match v { Value::Bytes(bytes) => String::from_utf8_lossy(&bytes).to_string(), @@ -434,9 +389,6 @@ struct Pri { impl Pri { fn from_str_variants(facility_variant: &str, severity_variant: &str) -> Self { - // The original PR had `deserialize_*()` methods parsed a value to a `u8` or stored a field key as a `String` - // Later the equivalent `get_num_*()` method would retrieve the `u8` value or lookup the field key for the actual value, - // otherwise it'd fallback to the default Facility/Severity value. // This approach instead parses a string of the name or ordinal representation, // any reference via field key lookup should have already happened by this point. let facility = Facility::into_variant(&facility_variant).unwrap_or(Facility::User); @@ -505,7 +457,8 @@ enum Severity { } // Additionally support variants from string-based integers: -// Parse a string name, with fallback for parsing a string ordinal number. +// Attempts to parse a string for ordinal mapping first, otherwise try the variant name. +// NOTE: No error handling in place, invalid config will fallback to default during `decant_config()`. 
impl Facility { fn into_variant(variant_name: &str) -> Option { let s = variant_name.to_ascii_lowercase(); @@ -518,7 +471,7 @@ impl Facility { } // NOTE: The `strum` crate does not provide traits, -// requiring copy/paste of the prior impl instead. +// requiring copy/paste to repeat the previous impl for this enum too. impl Severity { fn into_variant(variant_name: &str) -> Option { let s = variant_name.to_ascii_lowercase(); From f8be8d96bca68e71451004131122329ce54d56bf Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Mon, 18 Mar 2024 17:03:27 +1300 Subject: [PATCH 09/16] chore: DRY `into_variant()` via `akin` crate This should be simple and lightweight enough to justify for the DRY benefit? This way the method doesn't need to be duplicated redundantly. That was required because there is no trait for `FromRepr` provided via `strum`. That would require a similar amount of lines for the small duplication here. The `akin` macro duplicates the `impl` block for each value in the `&enums` array. 
--- lib/codecs/Cargo.toml | 3 ++- lib/codecs/src/encoding/format/syslog.rs | 30 +++++++++--------------- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index d4d12e966994b..cc51ad692ae7b 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -10,6 +10,7 @@ name = "generate-avro-fixtures" path = "tests/bin/generate-avro-fixtures.rs" [dependencies] +akin = { version = "0.4", optional = true } apache-avro = { version = "0.16.0", default-features = false } bytes = { version = "1", default-features = false } chrono.workspace = true @@ -49,4 +50,4 @@ rstest = "0.18.2" vrl.workspace = true [features] -syslog = ["dep:syslog_loose", "dep:strum"] +syslog = ["dep:syslog_loose", "dep:strum", "dep:akin"] diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index cc8a7dbb72a33..1f0450cb69e31 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -8,6 +8,7 @@ use vector_config::configurable_component; use std::collections::HashMap; use std::str::FromStr; use strum::{FromRepr, EnumString}; +use akin::akin; /// Config used to build a `SyslogSerializer`. #[configurable_component] @@ -459,26 +460,17 @@ enum Severity { // Additionally support variants from string-based integers: // Attempts to parse a string for ordinal mapping first, otherwise try the variant name. // NOTE: No error handling in place, invalid config will fallback to default during `decant_config()`. -impl Facility { - fn into_variant(variant_name: &str) -> Option { - let s = variant_name.to_ascii_lowercase(); - - s.parse::().map_or_else( - |_| Self::from_str(&s).ok(), - |num| Self::from_repr(num), - ) - } -} +akin! { + let &enums = [Facility, Severity]; -// NOTE: The `strum` crate does not provide traits, -// requiring copy/paste to repeat the previous impl for this enum too. 
-impl Severity { - fn into_variant(variant_name: &str) -> Option { - let s = variant_name.to_ascii_lowercase(); + impl *enums { + fn into_variant(variant_name: &str) -> Option { + let s = variant_name.to_ascii_lowercase(); - s.parse::().map_or_else( - |_| Self::from_str(&s).ok(), - |num| Self::from_repr(num), - ) + s.parse::().map_or_else( + |_| Self::from_str(&s).ok(), + |num| Self::from_repr(num), + ) + } } } From 536028731b2767148e85e77e14092037272984a6 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:44:54 +1300 Subject: [PATCH 10/16] chore: Minor revisions - `ConfigDecanter::get_message()` replaces the fallback method in favor of `to_string_lossy()` (a dedicated equivalent for converting `Value` type to a String type (_technically it is a CoW str, hence the follow-up with `to_string()`_)). - This also encodes the value better, especially for the default `log_namespace: false` as the message value (when `String`) is not quote wrapped, which matches the behaviour of the `text` encoder output. - Additionally uses the `LogEvent` method `get_message()` directly from `lib/vector-core/src/event /log_event.rs`. This can better retrieve the log message regardless of the `log_namespace` setting. - Encoding of RFC 5424 fields has changed to inline the `version` constant directly, instead of via a redundant variable. If there's ever multiple versions that need to be supported, it could be addressed then. - The RFC 5424 timestamp has a max precision of microseconds, thus this should be rounded and `AutoSi` can be used (_or `Micros` if it should have fixed padding instead of truncating trailing `000`_). 
--- lib/codecs/src/encoding/format/syslog.rs | 28 +++++++++++++----------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 1f0450cb69e31..23b11acb74c30 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -1,7 +1,7 @@ use bytes::{BufMut, BytesMut}; use tokio_util::codec::Encoder; use vector_core::{config::DataType, event::{Event, LogEvent}, schema}; -use chrono::{DateTime, SecondsFormat, Local}; +use chrono::{DateTime, SecondsFormat, Local, SubsecRound}; use vrl::value::{ObjectMap, Value}; use vector_config::configurable_component; @@ -177,23 +177,23 @@ impl ConfigDecanter { } fn get_message(&self, config: &SyslogSerializerConfig) -> String { - let mut message = String::new(); - // `payload_key` configures where to source the value for the syslog `message`: + // - Not configured => Encodes the default log message. // - Field key (Valid) => Get value by lookup (value_by_key) // - Field key (Invalid) => Empty string (unwrap_or_default) - // - Not configured => JSON encoded `LogEvent` (fallback?) - // - // Q: Was the JSON fallback intended by the original PR author only for debugging? - // Roughly equivalent to using `payload_key: .` (in YAML config)? 
+ + // Ref: + // `log.get_message()`: + // https://github.com/vectordotdev/vector/blob/ad6a48efc0f79b2c18a5c1394e5d8603fdfd1bab/lib/vector-core/src/event/log_event.rs#L532-L541 + // `v.to_string_lossy()`: + // https://github.com/vectordotdev/vrl/blob/f2d71cd26cb8270230f531945d7dee4929235905/src/value/value/serde.rs#L34-L55 let payload = if config.payload_key.is_empty() { - serde_json::to_string(&self.log).ok() + self.log.get_message().map(|v| v.to_string_lossy().to_string() ) } else { self.value_by_key(&config.payload_key) }; - message.push_str(&payload.unwrap_or_default()); - message + payload.unwrap_or_default() } } @@ -242,6 +242,7 @@ impl SyslogMessage { SyslogRFC::Rfc3164 => { // TIMESTAMP field format: // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 + // https://docs.rs/chrono/latest/chrono/format/strftime/index.html let timestamp = self.timestamp.format("%b %e %H:%M:%S").to_string(); // MSG part begins with TAG field + optional context: // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.3 @@ -262,13 +263,14 @@ impl SyslogMessage { SyslogRFC::Rfc5424 => { // HEADER part fields: // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 - let version = SYSLOG_V1; - let timestamp = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true); + // TIME-FRAC max length is 6 digits (microseconds): + // https://datatracker.ietf.org/doc/html/rfc5424#section-6 + let timestamp = self.timestamp.round_subsecs(6).to_rfc3339_opts(SecondsFormat::AutoSi, true); let tag = self.tag.encode_rfc_5424(); let sd = structured_data.as_deref().unwrap_or(NIL_VALUE); [ - version, + SYSLOG_V1, timestamp.as_str(), hostname, &tag, From 38d0d611eb42eb9ae22cbeb0383482b4dd2150c5 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:25:46 +1300 Subject: [PATCH 11/16] chore: Switch from `DateTime<Local>` to `DateTime<Utc>` - The original PR author appears to have relied on a hard-coded timestamp key here.
- `DateTime<Local>` would render the timestamp field with the local timezone offset, but other than that `DateTime<Utc>` would seem more consistent with usage in Vector, especially since any original TZ context is lost by this point? - Notes adjusted accordingly, with added TODO query for each encoding mode to potentially support configurable timezone. --- lib/codecs/src/encoding/format/syslog.rs | 53 ++++++++++++++++-------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 23b11acb74c30..3588e99e02bf3 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -1,7 +1,7 @@ use bytes::{BufMut, BytesMut}; use tokio_util::codec::Encoder; use vector_core::{config::DataType, event::{Event, LogEvent}, schema}; -use chrono::{DateTime, SecondsFormat, Local, SubsecRound}; +use chrono::{DateTime, SecondsFormat, SubsecRound, Utc}; use vrl::value::{ObjectMap, Value}; use vector_config::configurable_component; @@ -154,25 +154,37 @@ impl ConfigDecanter { .map(StructuredData::from) } - fn get_timestamp(&self) -> DateTime::<Local> { - // Q: Was this Timestamp key hard-coded to the needs of the original PR author? + fn get_timestamp(&self) -> DateTime::<Utc> { + // Q: Should the timestamp source be configurable? (eg: Select a field from the `remap` transform) // - // Key `@timestamp` depends on input: - // https://vector.dev/guides/level-up/managing-schemas/#example-custom-timestamp-field - // https://vector.dev/docs/about/under-the-hood/architecture/data-model/log/#timestamps - // NOTE: Log schema key renaming is unavailable when Log namespacing is enabled: - // https://vector.dev/docs/reference/configuration/global-options/#log_schema + // Concerns: + // - A source with `log_namespace: true` seems to cause `get_timstamp()` to return `None`? + // Does not seem to retrieve `%vector.ingest_timestamp`?
+ // - A sink type `console` with `timestamp_format: unix_ms` converts a `Value::Timestamp` prior to the encoder logic + // to `Value::Integer(i64)` instead, which won't match this condition. // - // NOTE: Log namespacing has metadata `%vector.ingest_timestamp` from a source (file/demo_logs) instead of `timestamp`. - // As a `payload_key` it will not respect config `encoding.timestamp_format`, but does when - // using the parent object (`%vector`). Inputs without namespacing respect that config setting. - if let Some(Value::Timestamp(timestamp)) = self.log.get("@timestamp") { - // Q: Utc type returned is changed to Local? - // - Could otherwise return `*timestamp` as-is? Why is Local conversion necessary? - DateTime::::from(*timestamp) + // NOTE: + // Vector always manages `Value::Timestamp` as `DateTime`, any prior TZ information context is always dropped. + // If restoring the TZ for a log is important, it could be handled via a remap transform? + // + // Ref: + // `log.get_timestamp()`: + // https://github.com/vectordotdev/vector/blob/ad6a48efc0f79b2c18a5c1394e5d8603fdfd1bab/lib/vector-core/src/event/log_event.rs#L543-L552 + if let Some(Value::Timestamp(timestamp)) = self.log.get_timestamp() { + *timestamp } else { - // NOTE: Local time is encouraged by RFC 5424 when creating a fallback timestamp for RFC 3164 - Local::now() + // NOTE: + // When timezone information is missing Vector handles conversion to UTC by assuming the local TZ: + // https://vector.dev/docs/about/under-the-hood/architecture/data-model/log/#time-zones + // There is a global option for which TZ to assume (where the default is local TZ): + // https://vector.dev/docs/reference/configuration/global-options/#timezone + // https://github.com/vectordotdev/vector/blob/58a4a2ef52e606c0f9b9fa975cf114b661300584/lib/vector-core/src/config/global_options.rs#L233-L236 + // https://github.com/vectordotdev/vrl/blob/c010300710a00191cd406e57cd0f3e001923d598/src/compiler/datetime.rs#L88-L95 + // VRL remap can 
also override that: + // https://vector.dev/docs/reference/configuration/transforms/remap/#timezone + // Vector's `syslog` source type also uses `Utc::now()` internally as a fallback: + // https://github.com/vectordotdev/vector/blob/58a4a2ef52e606c0f9b9fa975cf114b661300584/src/sources/syslog.rs#L430-L438 + Utc::now() } } @@ -223,7 +235,7 @@ pub enum SyslogRFC { #[derive(Default, Debug)] struct SyslogMessage { pri: Pri, - timestamp: DateTime::, + timestamp: DateTime::, hostname: Option, tag: Tag, structured_data: Option, @@ -243,6 +255,10 @@ impl SyslogMessage { // TIMESTAMP field format: // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.2 // https://docs.rs/chrono/latest/chrono/format/strftime/index.html + // + // TODO: Should this remain as UTC or adjust to the local TZ of the environment (or Vector config)? + // RFC 5424 suggests (when adapting for RFC 3164) to present a timestamp with the local TZ of the log source: + // https://www.rfc-editor.org/rfc/rfc5424#appendix-A.1 let timestamp = self.timestamp.format("%b %e %H:%M:%S").to_string(); // MSG part begins with TAG field + optional context: // https://datatracker.ietf.org/doc/html/rfc3164#section-4.1.3 @@ -265,6 +281,7 @@ impl SyslogMessage { // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2 // TIME-FRAC max length is 6 digits (microseconds): // https://datatracker.ietf.org/doc/html/rfc5424#section-6 + // TODO: Likewise for RFC 5424, as UTC the offset will always render as `Z` if not configurable. 
let timestamp = self.timestamp.round_subsecs(6).to_rfc3339_opts(SecondsFormat::AutoSi, true); let tag = self.tag.encode_rfc_5424(); let sd = structured_data.as_deref().unwrap_or(NIL_VALUE); From 7ef97fb85832936fabe45ec43f6b35ab58414402 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Wed, 20 Mar 2024 19:45:15 +1300 Subject: [PATCH 12/16] chore: Adopt a separate options config struct + minor revisions - Move encoder config settings under a single `syslog` config field. This better mirrors configuration options for existing encoders like Avro and CSV. - `ConfigDecanter::value_by_key()` appears to accomplish roughly the same as the existing helper method `to_string_lossy()`. Prefer that instead. This also makes the `StructuredData` helper `value_to_string()` redundant too at a glance? - Added some reference for the priority value `PRIVAL`. - `Pri::from_str_variants()` uses the existing defaults for fallback, communicate that more clearly. Contextual note is no longer useful, removed. --- lib/codecs/src/encoding/format/syslog.rs | 78 ++++++++++++++---------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 3588e99e02bf3..d27f47025b8f2 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -12,11 +12,37 @@ use akin::akin; /// Config used to build a `SyslogSerializer`. #[configurable_component] +#[derive(Clone, Debug, Default)] +#[serde(default)] +pub struct SyslogSerializerConfig { + /// Options for the Syslog serializer. + pub syslog: SyslogSerializerOptions +} + +impl SyslogSerializerConfig { + /// Build the `SyslogSerializer` from this configuration. + pub fn build(&self) -> SyslogSerializer { + SyslogSerializer::new(&self) + } + + /// The data type of events that are accepted by `SyslogSerializer`. 
+ pub fn input_type(&self) -> DataType { + DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + schema::Requirement::empty() + } +} + +/// Syslog serializer options. +#[configurable_component] +#[derive(Clone, Debug, Default)] // Serde default makes all config keys optional. // Each field assigns either a fixed value, or field name (lookup field key to retrieve dynamic value per `LogEvent`). #[serde(default)] -#[derive(Clone, Debug, Default)] -pub struct SyslogSerializerConfig { +pub struct SyslogSerializerOptions { /// RFC rfc: SyslogRFC, /// Facility @@ -37,23 +63,6 @@ pub struct SyslogSerializerConfig { // Q: The majority of the fields above pragmatically only make sense as config for keys to query? } -impl SyslogSerializerConfig { - /// Build the `SyslogSerializer` from this configuration. - pub fn build(&self) -> SyslogSerializer { - SyslogSerializer::new(&self) - } - - /// The data type of events that are accepted by `SyslogSerializer`. - pub fn input_type(&self) -> DataType { - DataType::Log - } - - /// The schema required by the serializer. - pub fn schema_requirement(&self) -> schema::Requirement { - schema::Requirement::empty() - } -} - /// Serializer that converts an `Event` to bytes using the Syslog format. 
#[derive(Debug, Clone)] pub struct SyslogSerializer { @@ -72,10 +81,10 @@ impl Encoder for SyslogSerializer { fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { if let Event::Log(log_event) = event { - let syslog_message = ConfigDecanter::new(log_event).decant_config(&self.config); + let syslog_message = ConfigDecanter::new(log_event).decant_config(&self.config.syslog); let vec = syslog_message - .encode(&self.config.rfc) + .encode(&self.config.syslog.rfc) .as_bytes() .to_vec(); buffer.put_slice(&vec); @@ -85,7 +94,7 @@ impl Encoder for SyslogSerializer { } } -// Adapts a `LogEvent` into a `SyslogMessage` based on config from `SyslogSerializerConfig`: +// Adapts a `LogEvent` into a `SyslogMessage` based on config from `SyslogSerializerOptions`: // - Splits off the responsibility of encoding logic to `SyslogMessage` (which is not dependent upon Vector types). // - Majority of methods are only needed to support the `decant_config()` operation. struct ConfigDecanter { @@ -99,7 +108,7 @@ impl ConfigDecanter { } } - fn decant_config(&self, config: &SyslogSerializerConfig) -> SyslogMessage { + fn decant_config(&self, config: &SyslogSerializerOptions) -> SyslogMessage { let x = |v| self.replace_if_proxied(v).unwrap_or_default(); let facility = x(&config.facility); let severity = x(&config.severity); @@ -142,9 +151,8 @@ impl ConfigDecanter { } fn value_by_key(&self, field_key: &str) -> Option { - self.log.get(field_key).and_then(|field_value| { - let bytes = field_value.coerce_to_bytes(); - String::from_utf8(bytes.to_vec()).ok() + self.log.get(field_key).map(|field_value| { + field_value.to_string_lossy().to_string() }) } @@ -188,7 +196,7 @@ impl ConfigDecanter { } } - fn get_message(&self, config: &SyslogSerializerConfig) -> String { + fn get_message(&self, config: &SyslogSerializerOptions) -> String { // `payload_key` configures where to source the value for the syslog `message`: // - Not configured => Encodes the default log message. 
// - Field key (Valid) => Get value by lookup (value_by_key) @@ -389,6 +397,12 @@ impl From for StructuredData { } // Only used as helper to support `StructuredData::from()` +// +// TODO: +// This method could be replaced in favor of calling `v.to_string_lossy().to_string()`? (As was done elsewhere in this file): +// https://github.com/vectordotdev/vrl/blob/f2d71cd26cb8270230f531945d7dee4929235905/src/value/value/serde.rs#L34-L55 +// Timestamp value is handled the same way via `timestamp_to_string()`: +// https://github.com/vectordotdev/vrl/blob/f2d71cd26cb8270230f531945d7dee4929235905/src/value/value.rs#L175-L179 fn value_to_string(v: Value) -> String { match v { Value::Bytes(bytes) => String::from_utf8_lossy(&bytes).to_string(), @@ -401,6 +415,10 @@ fn value_to_string(v: Value) -> String { // Facility + Severity support // +// PRIVAL: +// https://www.rfc-editor.org/rfc/rfc5424#section-6.2.1 +// > The number contained within these angle brackets is known as the Priority value (PRIVAL) +// and represents both the Facility and Severity. #[derive(Default, Debug)] struct Pri { facility: Facility, @@ -409,10 +427,8 @@ struct Pri { impl Pri { fn from_str_variants(facility_variant: &str, severity_variant: &str) -> Self { - // This approach instead parses a string of the name or ordinal representation, - // any reference via field key lookup should have already happened by this point. 
- let facility = Facility::into_variant(&facility_variant).unwrap_or(Facility::User); - let severity = Severity::into_variant(&severity_variant).unwrap_or(Severity::Informational); + let facility = Facility::into_variant(&facility_variant).unwrap_or_default(); + let severity = Severity::into_variant(&severity_variant).unwrap_or_default(); Self { facility, From 34e735d7ff4443807868ddaacb58ffd0204aaff0 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:40:14 +1300 Subject: [PATCH 13/16] chore: Switch from `String` to deserialize `Facility` + `Severity` enums To better communicate the allowed values, these two config fields can change from the `String` type to their appropriate enum type. - This relies on serde to deserialize the config value to the enum which adds a bit more noise to grok. - It does make `Pri::from_str_variants()` redundant, while the `into_variant()` methods are refactored to `deserialize()` with a proper error message emitted to match what serde would normally emit for failed enum variant deserialization. - A drawback of this change is that these two config fields lost the ability to reference a different value path in the `LogEvent`. That'll be addressed in a future commit.
--- lib/codecs/src/encoding/format/syslog.rs | 66 ++++++++++++++---------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index d27f47025b8f2..00918a6ce2139 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -6,9 +6,14 @@ use vrl::value::{ObjectMap, Value}; use vector_config::configurable_component; use std::collections::HashMap; + +// All of this block is to support the Facility + Severity enums with convenience of string or ordinal config value: use std::str::FromStr; -use strum::{FromRepr, EnumString}; +use strum::{FromRepr, EnumString, VariantNames}; +// `akin` macro for DRY impl to share with both enums due to lack of a `FromRepr` trait: use akin::akin; +// Custom deserialization with serde needed: +use serde::{Deserializer, de::Error}; /// Config used to build a `SyslogSerializer`. #[configurable_component] @@ -46,9 +51,11 @@ pub struct SyslogSerializerOptions { /// RFC rfc: SyslogRFC, /// Facility - facility: String, + #[serde(deserialize_with = "Facility::deserialize")] + facility: Facility, /// Severity - severity: String, + #[serde(deserialize_with = "Severity::deserialize")] + severity: Severity, /// App Name app_name: Option, @@ -109,17 +116,16 @@ impl ConfigDecanter { } fn decant_config(&self, config: &SyslogSerializerOptions) -> SyslogMessage { - let x = |v| self.replace_if_proxied(v).unwrap_or_default(); - let facility = x(&config.facility); - let severity = x(&config.severity); - let y = |v| self.replace_if_proxied_opt(v); let app_name = y(&config.app_name).unwrap_or("vector".to_owned()); let proc_id = y(&config.proc_id); let msg_id = y(&config.msg_id); SyslogMessage { - pri: Pri::from_str_variants(&facility, &severity), + pri: Pri { + facility: config.facility, + severity: config.severity, + }, timestamp: self.get_timestamp(), hostname: self.value_by_key("hostname"), tag: Tag { @@ -426,16 +432,6 @@ 
struct Pri { } impl Pri { - fn from_str_variants(facility_variant: &str, severity_variant: &str) -> Self { - let facility = Facility::into_variant(&facility_variant).unwrap_or_default(); - let severity = Severity::into_variant(&severity_variant).unwrap_or_default(); - - Self { - facility, - severity, - } - } - // The last paragraph describes how to compose the enums into `PRIVAL`: // https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 fn encode(&self) -> String { @@ -445,10 +441,15 @@ impl Pri { } // Facility + Severity mapping from Name => Ordinal number: +// NOTE: +// - `configurable_component(no_deser)` is used to match the existing functionality to support deserializing config with ordinal mapping. +// - `EnumString` with `strum(serialize_all = "kebab-case")` provides the `FromStr` support, while `FromRepr` handles ordinal support. +// - `VariantNames` assists with generating the equivalent `de::Error::unknown_variant` serde error message. /// Syslog facility -#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] +#[derive(Default, Debug, EnumString, FromRepr, VariantNames, Copy, Clone)] #[strum(serialize_all = "kebab-case")] +#[configurable_component(no_deser)] enum Facility { Kern = 0, #[default] @@ -478,8 +479,9 @@ enum Facility { } /// Syslog severity -#[derive(Default, Debug, EnumString, FromRepr, Copy, Clone)] +#[derive(Default, Debug, EnumString, FromRepr, VariantNames, Copy, Clone)] #[strum(serialize_all = "kebab-case")] +#[configurable_component(no_deser)] enum Severity { Emergency = 0, Alert = 1, @@ -494,18 +496,28 @@ enum Severity { // Additionally support variants from string-based integers: // Attempts to parse a string for ordinal mapping first, otherwise try the variant name. -// NOTE: No error handling in place, invalid config will fallback to default during `decant_config()`. +// NOTE: +// - While `serde(rename_all = "kebab-case")` attribute would deserialize like `FromStr` + `EnumString`, config input must strictly match. 
+// - To retain support for ordinal config input, a custom deserialize method is needed (as `derive(Deserialize)` is too basic): +// - Error message should roughly match `de::Error::unknown_variant` + akin! { let &enums = [Facility, Severity]; impl *enums { - fn into_variant(variant_name: &str) -> Option { - let s = variant_name.to_ascii_lowercase(); - - s.parse::().map_or_else( - |_| Self::from_str(&s).ok(), + fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = String::deserialize(deserializer)?; + + value.parse::().map_or_else( + |_| Self::from_str(&value.to_ascii_lowercase()).ok(), |num| Self::from_repr(num), - ) + ).ok_or(format!( + "Unknown variant `{value}`, expected one of `{variants}`", + variants=Self::VARIANTS.join("`, `") + )).map_err(D::Error::custom) } } } From 2ac3da2db28f3da1df8d088de6b2d44dc7567416 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Mon, 1 Apr 2024 15:04:52 +1300 Subject: [PATCH 14/16] fix: Support deserializing config value that is a number type In a YAML config a string can optionally be wrapped with quotes, while a number that isn't quote wrapped will be treated as a number type. The current support was only for string numbers, this change now supports flexibility for config using ordinal values in YAML regardless of quote usage. The previous `Self::into_variant(&s)` logic could have been used instead of bringing in `serde-aux`, but the external helper attribute approach seems easier to grok/follow as the intermediary container still seems required for a terse implementation. The match statement uses a reference (_which requires a deref for `from_repr`_) to appease the borrow checker for the later borrow needed by `value` in the error message. 
--- lib/codecs/Cargo.toml | 4 +++- lib/codecs/src/encoding/format/syslog.rs | 26 ++++++++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index cc51ad692ae7b..aab13651415a9 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -16,6 +16,7 @@ bytes = { version = "1", default-features = false } chrono.workspace = true csv-core = { version = "0.1.10", default-features = false } derivative = { version = "2", default-features = false } +derive_more = { version = "0.99", optional = true } dyn-clone = { version = "1", default-features = false } lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] } memchr = { version = "2", default-features = false } @@ -25,6 +26,7 @@ prost = { version = "0.12.3", default-features = false, features = ["std"] } prost-reflect = { version = "0.13", default-features = false, features = ["serde"] } regex = { version = "1.10.3", default-features = false, features = ["std", "perf"] } serde.workspace = true +serde-aux = { version = "4.5", optional = true } serde_json.workspace = true smallvec = { version = "1", default-features = false, features = ["union"] } snafu = { version = "0.7.5", default-features = false, features = ["futures"] } @@ -50,4 +52,4 @@ rstest = "0.18.2" vrl.workspace = true [features] -syslog = ["dep:syslog_loose", "dep:strum", "dep:akin"] +syslog = ["dep:syslog_loose", "dep:strum", "dep:akin", "dep:derive_more", "dep:serde-aux"] diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 00918a6ce2139..fdfd026f24712 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -13,7 +13,8 @@ use strum::{FromRepr, EnumString, VariantNames}; // `akin` macro for DRY impl to share with both enums due to lack of a `FromRepr` trait: use akin::akin; // Custom deserialization with serde needed: -use 
serde::{Deserializer, de::Error}; +use serde::{Deserialize, Deserializer, de::Error}; +use serde_aux::field_attributes::deserialize_number_from_string; /// Config used to build a `SyslogSerializer`. #[configurable_component] @@ -509,15 +510,28 @@ akin! { where D: Deserializer<'de>, { - let value = String::deserialize(deserializer)?; + let value = NumberOrString::deserialize(deserializer)?; + let variant: Option = match &value { + NumberOrString::Number(num) => Self::from_repr(*num), + NumberOrString::String(s) => Self::from_str(&s.to_ascii_lowercase()).ok(), + }; - value.parse::().map_or_else( - |_| Self::from_str(&value.to_ascii_lowercase()).ok(), - |num| Self::from_repr(num), - ).ok_or(format!( + variant.ok_or_else(|| format!( "Unknown variant `{value}`, expected one of `{variants}`", variants=Self::VARIANTS.join("`, `") )).map_err(D::Error::custom) } } } + +// An intermediary container to deserialize config value into. +// Ensures that a string number is properly deserialized to the `usize` variant. +#[derive(derive_more::Display, Deserialize)] +#[serde(untagged)] +enum NumberOrString { + Number( + #[serde(deserialize_with = "deserialize_number_from_string")] + usize + ), + String(String) +} From 7ba64bee325bd2869b4535abb8bd94b4ad0819cf Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Mon, 1 Apr 2024 15:10:06 +1300 Subject: [PATCH 15/16] chore: Add doc comments for enum variants to appease Vector requirement This seems redundant given the context? Mostly adds unnecessary noise. Could probably `impl Configurable` or similar to try to work around the requirement. The metadata description could generate the variant list, similar to how it's been handled for the error message?
--- lib/codecs/src/encoding/format/syslog.rs | 33 ++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index fdfd026f24712..03d841d1cb23e 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -443,6 +443,7 @@ impl Pri { // Facility + Severity mapping from Name => Ordinal number: // NOTE: +// - Vector component enforces variant doc-comments, even though it's pointless for these enums? // - `configurable_component(no_deser)` is used to match the existing functionality to support deserializing config with ordinal mapping. // - `EnumString` with `strum(serialize_all = "kebab-case")` provides the `FromStr` support, while `FromRepr` handles ordinal support. // - `VariantNames` assists with generating the equivalent `de::Error::unknown_variant` serde error message. @@ -452,30 +453,54 @@ impl Pri { #[strum(serialize_all = "kebab-case")] #[configurable_component(no_deser)] enum Facility { + /// Kern Kern = 0, + /// User #[default] User = 1, + /// Mail Mail = 2, + /// Daemon Daemon = 3, + /// Auth Auth = 4, + /// Syslog Syslog = 5, + /// LPR LPR = 6, + /// News News = 7, + /// UUCP UUCP = 8, + /// Cron Cron = 9, + /// AuthPriv AuthPriv = 10, + /// FTP FTP = 11, + /// NTP NTP = 12, + /// Security Security = 13, + /// Console Console = 14, + /// SolarisCron SolarisCron = 15, + /// Local0 Local0 = 16, + /// Local1 Local1 = 17, + /// Local2 Local2 = 18, + /// Local3 Local3 = 19, + /// Local4 Local4 = 20, + /// Local5 Local5 = 21, + /// Local6 Local6 = 22, + /// Local7 Local7 = 23, } @@ -484,14 +509,22 @@ enum Facility { #[strum(serialize_all = "kebab-case")] #[configurable_component(no_deser)] enum Severity { + /// Emergency Emergency = 0, + /// Alert Alert = 1, + /// Critical Critical = 2, + /// Error Error = 3, + /// Warning Warning = 4, + /// Notice Notice = 5, + /// Informational #[default] Informational = 6, + /// Debug 
Debug = 7, } From ed202bbd71d8ce4e632c97c451ec30b4d0c5dc13 Mon Sep 17 00:00:00 2001 From: polarathene <5098581+polarathene@users.noreply.github.com> Date: Mon, 1 Apr 2024 18:44:19 +1300 Subject: [PATCH 16/16] chore: Use `snafu` for error message Not sure if this is worthwhile, but it adopts the error message convention I've seen elsewhere by managing the messages via Snafu. --- lib/codecs/src/encoding/format/syslog.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/codecs/src/encoding/format/syslog.rs b/lib/codecs/src/encoding/format/syslog.rs index 03d841d1cb23e..64ed40eb08377 100644 --- a/lib/codecs/src/encoding/format/syslog.rs +++ b/lib/codecs/src/encoding/format/syslog.rs @@ -15,6 +15,7 @@ use akin::akin; // Custom deserialization with serde needed: use serde::{Deserialize, Deserializer, de::Error}; use serde_aux::field_attributes::deserialize_number_from_string; +use snafu::{Snafu, OptionExt}; /// Config used to build a `SyslogSerializer`. #[configurable_component] @@ -549,10 +550,10 @@ akin! { NumberOrString::String(s) => Self::from_str(&s.to_ascii_lowercase()).ok(), }; - variant.ok_or_else(|| format!( - "Unknown variant `{value}`, expected one of `{variants}`", - variants=Self::VARIANTS.join("`, `") - )).map_err(D::Error::custom) + variant.with_context(|| InvalidVariantSnafu { + input: value.to_string(), + variants: Self::VARIANTS.join("`, `"), + }).map_err(D::Error::custom) } } } @@ -568,3 +569,9 @@ enum NumberOrString { ), String(String) } + +#[derive(Debug, Snafu)] +enum StrumDeserializeError { + #[snafu(display("Unknown variant `{}`, expected one of `{}`", input, variants))] + InvalidVariant { input: String, variants: String }, +}