diff --git a/changelog.d/23815_memory_enrichment_expired_output.feature.md b/changelog.d/23815_memory_enrichment_expired_output.feature.md new file mode 100644 index 0000000000000..6bd44c05abceb --- /dev/null +++ b/changelog.d/23815_memory_enrichment_expired_output.feature.md @@ -0,0 +1,3 @@ +Added `expired` output to the memory enrichment table source, to export items as they expire in the cache. + +authors: esensar Quad9DNS diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index bc8a71f54f1d1..83c06260c0d5e 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -10,7 +10,7 @@ pub mod service; use std::ops::{Add, AddAssign}; -pub use bytes_received::BytesReceived; +pub use bytes_received::{BytesReceived, BytesReceivedHandle}; pub use bytes_sent::BytesSent; #[allow(clippy::module_name_repetitions)] pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache}; diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 265792888b5be..9b17ce29750c3 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -14,12 +14,11 @@ use vector_lib::{ }; use vrl::{path::OwnedTargetPath, value::Kind}; -use super::{internal_events::InternalMetricsConfig, source::MemorySourceConfig}; +use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE}; use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput, }, - enrichment_tables::memory::Memory, sinks::Healthcheck, sources::Source, }; @@ -74,6 +73,35 @@ pub struct MemoryConfig { memory: Arc>>>, } +/// Configuration for memory enrichment table source functionality. 
+#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct MemorySourceConfig { + /// Interval for exporting all data from the table when used as a source. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_interval: Option, + /// Batch size for data exporting. Used to prevent exporting entire table at + /// once and blocking the system. + /// + /// By default, batches are not used and entire table is exported. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_batch_size: Option, + /// If set to true, all data will be removed from cache after exporting. + /// Only valid if used as a source and export_interval > 0 + /// + /// By default, export will not remove data from cache + #[serde(default = "crate::serde::default_false")] + pub remove_after_export: bool, + /// Set to true to export expired items via the `expired` output port. + /// Expired items ignore other settings and are exported as they are flushed from the table. + #[serde(default = "crate::serde::default_false")] + pub export_expired_items: bool, + /// Key to use for this component when used as a source. This must be different from the + /// component key. 
+ pub source_key: String, +} + impl PartialEq for MemoryConfig { fn eq(&self, other: &Self) -> bool { self.ttl == other.ttl @@ -187,10 +215,23 @@ impl SourceConfig for MemoryConfig { } .with_standard_vector_source_metadata(); - vec![SourceOutput::new_maybe_logs( - DataType::Log, - schema_definition, - )] + if self + .source_config + .as_ref() + .map(|c| c.export_expired_items) + .unwrap_or_default() + { + vec![ + SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()), + SourceOutput::new_maybe_logs(DataType::Log, schema_definition) + .with_port(EXPIRED_ROUTE), + ] + } else { + vec![SourceOutput::new_maybe_logs( + DataType::Log, + schema_definition, + )] + } } fn can_acknowledge(&self) -> bool { diff --git a/src/enrichment_tables/memory/source.rs b/src/enrichment_tables/memory/source.rs index c31bee239154b..74a5487efc3e5 100644 --- a/src/enrichment_tables/memory/source.rs +++ b/src/enrichment_tables/memory/source.rs @@ -1,7 +1,4 @@ -use std::{ - num::NonZeroU64, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use chrono::Utc; use futures::StreamExt; @@ -10,40 +7,22 @@ use tokio_stream::wrappers::IntervalStream; use vector_lib::{ ByteSizeOf, EstimatedJsonEncodedSizeOf, config::LogNamespace, - configurable::configurable_component, event::{Event, EventMetadata, LogEvent}, internal_event::{ - ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol, + ByteSize, BytesReceived, BytesReceivedHandle, CountByteSize, EventsReceived, + EventsReceivedHandle, InternalEventHandle, Protocol, }, shutdown::ShutdownSignal, }; use super::{Memory, MemoryConfig}; -use crate::{SourceSender, internal_events::StreamClosedError}; +use crate::{ + SourceSender, + enrichment_tables::memory::{MemoryEntryPair, MemorySourceConfig}, + internal_events::StreamClosedError, +}; -/// Configuration for memory enrichment table source functionality. 
-#[configurable_component] -#[derive(Clone, Debug, PartialEq, Eq)] -#[serde(deny_unknown_fields)] -pub struct MemorySourceConfig { - /// Interval for exporting all data from the table when used as a source. - pub export_interval: NonZeroU64, - /// Batch size for data exporting. Used to prevent exporting entire table at - /// once and blocking the system. - /// - /// By default, batches are not used and entire table is exported. - #[serde(skip_serializing_if = "vector_lib::serde::is_default")] - pub export_batch_size: Option, - /// If set to true, all data will be removed from cache after exporting. - /// Only valid if used as a source and export_interval > 0 - /// - /// By default, export will not remove data from cache - #[serde(default = "crate::serde::default_false")] - pub remove_after_export: bool, - /// Key to use for this component when used as a source. This must be different from the - /// component key. - pub source_key: String, -} +pub(crate) const EXPIRED_ROUTE: &str = "expired"; /// A struct that represents Memory when used as a source. 
pub(crate) struct MemorySource { @@ -61,78 +40,145 @@ impl MemorySource { .memory .config .source_config - .as_ref() + .clone() .expect("Unexpected missing source config in memory table used as a source."); let mut interval = IntervalStream::new(interval(Duration::from_secs( - source_config.export_interval.into(), + source_config + .export_interval + .map(Into::into) + .unwrap_or(u64::MAX), ))) - .take_until(self.shutdown); + .take_until(self.shutdown.clone()); + let mut expired_receiver = self.memory.subscribe_to_expired_items(); - while interval.next().await.is_some() { - let mut sent = 0_usize; - loop { - let mut events = Vec::new(); - { - let mut writer = self.memory.write_handle.lock().unwrap(); - if let Some(reader) = self.memory.get_read_handle().read() { - let now = Instant::now(); - let utc_now = Utc::now(); - events = reader - .iter() - .skip(if source_config.remove_after_export { - 0 - } else { - sent - }) - .take(if let Some(batch_size) = source_config.export_batch_size { - batch_size as usize - } else { - usize::MAX - }) - .filter_map(|(k, v)| { - if source_config.remove_after_export { - writer.write_handle.empty(k.clone()); - } - v.get_one().map(|v| (k, v)) - }) - .filter_map(|(k, v)| { - let mut event = Event::Log(LogEvent::from_map( - v.as_object_map(now, k).ok()?, - EventMetadata::default(), - )); - let log = event.as_mut_log(); - self.log_namespace.insert_standard_vector_source_metadata( - log, - MemoryConfig::NAME, - utc_now, - ); - - Some(event) - }) - .collect::>(); - if source_config.remove_after_export { - writer.write_handle.refresh(); - } + loop { + tokio::select! 
{ + interval_time = interval.next() => { + if interval_time.is_none() { + break; } - } - let count = events.len(); - let byte_size = events.size_of(); - let json_size = events.estimated_json_encoded_size_of(); - bytes_received.emit(ByteSize(byte_size)); - events_received.emit(CountByteSize(count, json_size)); - if self.out.send_batch(events).await.is_err() { - emit!(StreamClosedError { count }); - } + self.export_table_items(&source_config, &events_received, &bytes_received).await; + }, - sent += count; - match source_config.export_batch_size { - None => break, - Some(export_batch_size) if count < export_batch_size as usize => break, - _ => {} + Ok(expired) = expired_receiver.recv() => { + self.export_expired_entries(expired, &events_received, &bytes_received).await; } } } Ok(()) } + + async fn export_table_items( + &mut self, + source_config: &MemorySourceConfig, + events_received: &EventsReceivedHandle, + bytes_received: &BytesReceivedHandle, + ) { + let mut sent = 0_usize; + loop { + let mut events = Vec::new(); + { + let mut writer = self.memory.write_handle.lock().unwrap(); + if let Some(reader) = self.memory.get_read_handle().read() { + let now = Instant::now(); + let utc_now = Utc::now(); + events = reader + .iter() + .skip(if source_config.remove_after_export { + 0 + } else { + sent + }) + .take(if let Some(batch_size) = source_config.export_batch_size { + batch_size as usize + } else { + usize::MAX + }) + .filter_map(|(k, v)| { + if source_config.remove_after_export { + writer.write_handle.empty(k.clone()); + } + v.get_one().map(|v| (k, v)) + }) + .filter_map(|(k, v)| { + let mut event = Event::Log(LogEvent::from_map( + v.as_object_map(now, k).ok()?, + EventMetadata::default(), + )); + let log = event.as_mut_log(); + self.log_namespace.insert_standard_vector_source_metadata( + log, + MemoryConfig::NAME, + utc_now, + ); + + Some(event) + }) + .collect::>(); + if source_config.remove_after_export { + writer.write_handle.refresh(); + } + } + } + let count = 
events.len(); + let byte_size = events.size_of(); + let json_size = events.estimated_json_encoded_size_of(); + bytes_received.emit(ByteSize(byte_size)); + events_received.emit(CountByteSize(count, json_size)); + if self.out.send_batch(events).await.is_err() { + emit!(StreamClosedError { count }); + } + + sent += count; + match source_config.export_batch_size { + None => break, + Some(export_batch_size) if count < export_batch_size as usize => break, + _ => {} + } + } + } + + async fn export_expired_entries( + &mut self, + entries: Vec, + events_received: &EventsReceivedHandle, + bytes_received: &BytesReceivedHandle, + ) { + let now = Instant::now(); + let events = entries + .into_iter() + .filter_map( + |MemoryEntryPair { + key, + entry: expired_event, + }| { + let mut event = Event::Log(LogEvent::from_map( + expired_event.as_object_map(now, &key).ok()?, + EventMetadata::default(), + )); + let log = event.as_mut_log(); + self.log_namespace.insert_standard_vector_source_metadata( + log, + MemoryConfig::NAME, + Utc::now(), + ); + Some(event) + }, + ) + .collect::>(); + let count = events.len(); + let byte_size = events.size_of(); + let json_size = events.estimated_json_encoded_size_of(); + bytes_received.emit(ByteSize(byte_size)); + events_received.emit(CountByteSize(count, json_size)); + if self + .out + .send_batch_named(EXPIRED_ROUTE, events) + .await + .is_err() + { + emit!(StreamClosedError { count }); + } + } } diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 720babdc52366..10df50b43d842 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -14,7 +14,10 @@ use evmap::{ use evmap_derive::ShallowCopy; use futures::{StreamExt, stream::BoxStream}; use thread_local::ThreadLocal; -use tokio::time::interval; +use tokio::{ + sync::broadcast::{Receiver, Sender}, + time::interval, +}; use tokio_stream::wrappers::IntervalStream; use vector_lib::{ ByteSizeOf, 
EstimatedJsonEncodedSizeOf, @@ -88,6 +91,15 @@ struct MemoryMetadata { byte_size: u64, } +/// [`MemoryEntry`] combined with its key +#[derive(Clone)] +pub(super) struct MemoryEntryPair { + /// Key of this entry + pub(super) key: String, + /// The value of this entry + pub(super) entry: MemoryEntry, +} + // Used to ensure that these 2 are locked together pub(super) struct MemoryWriter { pub(super) write_handle: evmap::WriteHandle, @@ -96,16 +108,23 @@ pub(super) struct MemoryWriter { /// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure. pub struct Memory { - pub(super) read_handle_factory: evmap::ReadHandleFactory, - pub(super) read_handle: ThreadLocal>, + read_handle_factory: evmap::ReadHandleFactory, + read_handle: ThreadLocal>, pub(super) write_handle: Arc>, pub(super) config: MemoryConfig, + #[allow(dead_code)] + expired_items_receiver: Receiver>, + expired_items_sender: Sender>, } impl Memory { /// Creates a new [Memory] based on the provided config. 
pub fn new(config: MemoryConfig) -> Self { let (read_handle, write_handle) = evmap::new(); + // Buffer could only be used if source is stuck exporting available items, but in that case, + // publishing will not happen either, because the lock would be held, so this buffer is not + // that important + let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5); Self { config, read_handle_factory: read_handle.factory(), @@ -114,6 +133,8 @@ impl Memory { write_handle, metadata: MemoryMetadata::default(), })), + expired_items_sender: expired_tx, + expired_items_receiver: expired_rx, } } @@ -122,6 +143,10 @@ impl Memory { .get_or(|| self.read_handle_factory.handle()) } + pub(super) fn subscribe_to_expired_items(&self) -> Receiver> { + self.expired_items_sender.subscribe() + } + fn handle_value(&self, value: ObjectMap) { let mut writer = self.write_handle.lock().expect("mutex poisoned"); let now = Instant::now(); @@ -214,6 +239,39 @@ impl Memory { } fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) { + // First publish items to be removed, if needed + if self + .config + .source_config + .as_ref() + .map(|c| c.export_expired_items) + .unwrap_or_default() + { + let pending_removal = writer + .write_handle + .pending() + .iter() + // We only use empty operation to remove keys + .filter_map(|o| match o { + evmap::Operation::Empty(k) => Some(k), + _ => None, + }) + .filter_map(|key| { + writer.write_handle.get_one(key).map(|v| MemoryEntryPair { + key: key.to_string(), + entry: v.clone(), + }) + }) + .collect::>(); + if let Err(error) = self.expired_items_sender.send(pending_removal) { + error!( + message = "Error exporting expired items from memory enrichment table.", + error = %error, + internal_log_rate_limit = true, + ); + } + } + writer.write_handle.refresh(); if let Some(reader) = self.get_read_handle().read() { let mut byte_size = 0; @@ -250,6 +308,8 @@ impl Clone for Memory { read_handle: ThreadLocal::new(), write_handle: Arc::clone(&self.write_handle), 
config: self.config.clone(), + expired_items_sender: self.expired_items_sender.clone(), + expired_items_receiver: self.expired_items_sender.subscribe(), } } } @@ -403,7 +463,7 @@ mod tests { use super::*; use crate::{ enrichment_tables::memory::{ - internal_events::InternalMetricsConfig, source::MemorySourceConfig, + config::MemorySourceConfig, internal_events::InternalMetricsConfig, }, event::{Event, LogEvent}, test_util::components::{ @@ -954,9 +1014,10 @@ mod tests { async fn source_spec_compliance() { let mut memory_config = MemoryConfig::default(); memory_config.source_config = Some(MemorySourceConfig { - export_interval: NonZeroU64::try_from(1).unwrap(), + export_interval: Some(NonZeroU64::try_from(1).unwrap()), export_batch_size: None, remove_after_export: false, + export_expired_items: false, source_key: "test".to_string(), }); let memory = memory_config.get_or_build_memory().await; diff --git a/website/cue/reference/configuration.cue b/website/cue/reference/configuration.cue index 2170e9837afee..2321cea179a15 100644 --- a/website/cue/reference/configuration.cue +++ b/website/cue/reference/configuration.cue @@ -1,8 +1,12 @@ package metadata configuration: { - configuration: #Schema - how_it_works: #HowItWorks + configuration: #Schema | { + enrichment_tables: #SchemaField | { + outputs: [components.#Output, ...components.#Output] + } + } + how_it_works: #HowItWorks } configuration: { @@ -44,5 +48,22 @@ configuration: { } } } + + enrichment_tables: { + outputs: [ + { + name: components._default_output.name + description: """ + Default output stream. Only applies to memory enrichment table. Only active if `source_config.export_interval` is defined. Use `<source_key>` as an input to downstream transforms and sinks. + """ + }, + { + name: "expired" + description: """ + Output stream of expired items. Only applies to memory enrichment table. Only active if `source_config.export_expired_items` is enabled. Use `<source_key>.expired` as an input to downstream transforms and sinks. 
+ """ + }, + ] + } } } diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 849f1a66abc6b..01d10cc5aee6d 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -169,10 +169,18 @@ generated: configuration: configuration: { """ required: false } + export_expired_items: { + type: bool: default: false + description: """ + Set to true to export expired items via the `expired` output port. + Expired items ignore other settings and are exported as they are flushed from the table. + """ + required: false + } export_interval: { type: uint: {} description: "Interval for exporting all data from the table when used as a source." - required: true + required: false } remove_after_export: { type: bool: default: false