From 69644bbf092a3fa40033cee709ef2a689931b4a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 19 Sep 2025 15:58:32 +0200 Subject: [PATCH 1/9] feat(enrichment tables): add expired items output to memory enrichment table Adds a new output to memory enrichment table source that exports items as they are expired out of this cache. Related: #23784 --- src/enrichment_tables/memory/config.rs | 28 +++- src/enrichment_tables/memory/source.rs | 157 +++++++++++------- src/enrichment_tables/memory/table.rs | 54 +++++- .../cue/reference/generated/configuration.cue | 10 +- 4 files changed, 181 insertions(+), 68 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 265792888b5be..b1c51a31049d7 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -14,12 +14,15 @@ use vector_lib::{ }; use vrl::{path::OwnedTargetPath, value::Kind}; -use super::{internal_events::InternalMetricsConfig, source::MemorySourceConfig}; +use super::{ + Memory, + internal_events::InternalMetricsConfig, + source::{EXPIRED_ROUTE, MemorySourceConfig}, +}; use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput, }, - enrichment_tables::memory::Memory, sinks::Healthcheck, sources::Source, }; @@ -187,10 +190,23 @@ impl SourceConfig for MemoryConfig { } .with_standard_vector_source_metadata(); - vec![SourceOutput::new_maybe_logs( - DataType::Log, - schema_definition, - )] + if self + .source_config + .as_ref() + .map(|c| c.export_expired_items) + .unwrap_or_default() + { + vec![ + SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()), + SourceOutput::new_maybe_logs(DataType::Log, schema_definition) + .with_port(EXPIRED_ROUTE), + ] + } else { + vec![SourceOutput::new_maybe_logs( + DataType::Log, + schema_definition, + )] + } } fn can_acknowledge(&self) -> bool { diff --git a/src/enrichment_tables/memory/source.rs b/src/enrichment_tables/memory/source.rs index c31bee239154b..8515ef0204e98 100644 --- a/src/enrichment_tables/memory/source.rs +++ b/src/enrichment_tables/memory/source.rs @@ -21,13 +21,16 @@ use vector_lib::{ use super::{Memory, MemoryConfig}; use crate::{SourceSender, internal_events::StreamClosedError}; +pub(crate) const EXPIRED_ROUTE: &str = "expired"; + /// Configuration for memory enrichment table source functionality. #[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] #[serde(deny_unknown_fields)] pub struct MemorySourceConfig { /// Interval for exporting all data from the table when used as a source. - pub export_interval: NonZeroU64, + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_interval: Option, /// Batch size for data exporting. Used to prevent exporting entire table at /// once and blocking the system. /// @@ -40,6 +43,10 @@ pub struct MemorySourceConfig { /// By default, export will not remove data from cache #[serde(default = "crate::serde::default_false")] pub remove_after_export: bool, + /// Set to true to add a source output for expired items (named 'expired'). + /// Expired items ignore other settings and are exported as they are flushed from the table. + #[serde(default = "crate::serde::default_false")] + pub export_expired_items: bool, /// Key to use for this component when used as a source. This must be different from the /// component key. pub source_key: String, @@ -64,71 +71,107 @@ impl MemorySource { .as_ref() .expect("Unexpected missing source config in memory table used as a source."); let mut interval = IntervalStream::new(interval(Duration::from_secs( - source_config.export_interval.into(), + source_config + .export_interval + .map(Into::into) + .unwrap_or(u64::MAX), ))) .take_until(self.shutdown); + let mut expired_receiver = self.memory.subscribe_to_expired_items(); + + loop { + tokio::select! { + interal_time = interval.next() => { + if interal_time.is_none() { + break; + } + let mut sent = 0_usize; + loop { + let mut events = Vec::new(); + { + let mut writer = self.memory.write_handle.lock().unwrap(); + if let Some(reader) = self.memory.get_read_handle().read() { + let now = Instant::now(); + let utc_now = Utc::now(); + events = reader + .iter() + .skip(if source_config.remove_after_export { + 0 + } else { + sent + }) + .take(if let Some(batch_size) = source_config.export_batch_size { + batch_size as usize + } else { + usize::MAX + }) + .filter_map(|(k, v)| { + if source_config.remove_after_export { + writer.write_handle.empty(k.clone()); + } + v.get_one().map(|v| (k, v)) + }) + .filter_map(|(k, v)| { + let mut event = Event::Log(LogEvent::from_map( + v.as_object_map(now, k).ok()?, + EventMetadata::default(), + )); + let log = event.as_mut_log(); + self.log_namespace.insert_standard_vector_source_metadata( + log, + MemoryConfig::NAME, + utc_now, + ); - while interval.next().await.is_some() { - let mut sent = 0_usize; - loop { - let mut events = Vec::new(); - { - let mut writer = self.memory.write_handle.lock().unwrap(); - if let Some(reader) = self.memory.get_read_handle().read() { - let now = Instant::now(); - let utc_now = Utc::now(); - events = reader - .iter() - .skip(if source_config.remove_after_export { - 0 - } else { - sent - }) - .take(if let Some(batch_size) = source_config.export_batch_size { - batch_size as usize - } else { - usize::MAX - }) - .filter_map(|(k, v)| { + Some(event) + }) + .collect::>(); if source_config.remove_after_export { - writer.write_handle.empty(k.clone()); + writer.write_handle.refresh(); } - v.get_one().map(|v| (k, v)) - }) - .filter_map(|(k, v)| { - let mut event = Event::Log(LogEvent::from_map( - v.as_object_map(now, k).ok()?, - EventMetadata::default(), - )); - let log = event.as_mut_log(); - self.log_namespace.insert_standard_vector_source_metadata( - log, - MemoryConfig::NAME, - utc_now, - ); + } + } + let count = events.len(); + let byte_size = events.size_of(); + let json_size = events.estimated_json_encoded_size_of(); + bytes_received.emit(ByteSize(byte_size)); + events_received.emit(CountByteSize(count, json_size)); + if self.out.send_batch(events).await.is_err() { + emit!(StreamClosedError { count }); + } - Some(event) - }) - .collect::>(); - if source_config.remove_after_export { - writer.write_handle.refresh(); + sent += count; + match source_config.export_batch_size { + None => break, + Some(export_batch_size) if count < export_batch_size as usize => break, + _ => {} } } - } - let count = events.len(); - let byte_size = events.size_of(); - let json_size = events.estimated_json_encoded_size_of(); - bytes_received.emit(ByteSize(byte_size)); - events_received.emit(CountByteSize(count, json_size)); - if self.out.send_batch(events).await.is_err() { - emit!(StreamClosedError { count }); - } + }, - sent += count; - match source_config.export_batch_size { - None => break, - Some(export_batch_size) if count < export_batch_size as usize => break, - _ => {} + Ok(expired) = expired_receiver.recv() => { + let now = Instant::now(); + let events = expired.into_iter().filter_map(|(key, expired_event)| { + let mut event = Event::Log(LogEvent::from_map( + expired_event.as_object_map(now, &key).ok()?, + EventMetadata::default(), + )); + let log = event.as_mut_log(); + self.log_namespace.insert_standard_vector_source_metadata( + log, + MemoryConfig::NAME, + Utc::now(), + ); + Some(event) + }).collect::>(); + let count = events.len(); + let byte_size = events.size_of(); + let json_size = events.estimated_json_encoded_size_of(); + bytes_received.emit(ByteSize(byte_size)); + events_received.emit(CountByteSize(count, json_size)); + if self.out.send_batch_named(EXPIRED_ROUTE, events).await.is_err() { + emit!(StreamClosedError { count }); + } } } } diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 720babdc52366..fd153fe253382 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -14,7 +14,10 @@ use evmap::{ use evmap_derive::ShallowCopy; use futures::{StreamExt, stream::BoxStream}; use thread_local::ThreadLocal; -use tokio::time::interval; +use tokio::{ + sync::broadcast::{Receiver, Sender}, + time::interval, +}; use tokio_stream::wrappers::IntervalStream; use vector_lib::{ ByteSizeOf, EstimatedJsonEncodedSizeOf, @@ -96,16 +99,23 @@ pub(super) struct MemoryWriter { /// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure. pub struct Memory { - pub(super) read_handle_factory: evmap::ReadHandleFactory, - pub(super) read_handle: ThreadLocal>, + read_handle_factory: evmap::ReadHandleFactory, + read_handle: ThreadLocal>, pub(super) write_handle: Arc>, pub(super) config: MemoryConfig, + #[allow(dead_code)] + expired_items_receiver: Receiver>, + expired_items_sender: Sender>, } impl Memory { /// Creates a new [Memory] based on the provided config. pub fn new(config: MemoryConfig) -> Self { let (read_handle, write_handle) = evmap::new(); + // Buffer could only be used if source is stuck exporting available items, but in that case, + // publishing will not happen either, because the lock would be held, so this buffer is not + // that important + let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5); Self { config, read_handle_factory: read_handle.factory(), @@ -114,6 +124,8 @@ impl Memory { write_handle, metadata: MemoryMetadata::default(), })), + expired_items_sender: expired_tx, + expired_items_receiver: expired_rx, } } @@ -122,6 +134,10 @@ impl Memory { .get_or(|| self.read_handle_factory.handle()) } + pub(super) fn subscribe_to_expired_items(&self) -> Receiver> { + self.expired_items_sender.subscribe() + } + fn handle_value(&self, value: ObjectMap) { let mut writer = self.write_handle.lock().expect("mutex poisoned"); let now = Instant::now(); @@ -214,6 +230,33 @@ impl Memory { } fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) { + // First publish items to be removed, if needed + if self + .config + .source_config + .as_ref() + .map(|c| c.export_expired_items) + .unwrap_or_default() + { + let pending_removal = writer + .write_handle + .pending() + .iter() + // We only use empty operation to remove keys + .filter_map(|o| match o { + evmap::Operation::Empty(k) => Some(k), + _ => None, + }) + .filter_map(|key| { + writer + .write_handle + .get_one(key) + .map(|v| (key.to_string(), v.clone())) + }) + .collect::>(); + let _ = self.expired_items_sender.send(pending_removal); + } + writer.write_handle.refresh(); if let Some(reader) = self.get_read_handle().read() { let mut byte_size = 0; @@ -250,6 +293,8 @@ impl Clone for Memory { read_handle: ThreadLocal::new(), write_handle: Arc::clone(&self.write_handle), config: self.config.clone(), + expired_items_sender: self.expired_items_sender.clone(), + expired_items_receiver: self.expired_items_sender.subscribe(), } } } @@ -954,9 +999,10 @@ mod tests { async fn source_spec_compliance() { let mut memory_config = MemoryConfig::default(); memory_config.source_config = Some(MemorySourceConfig { - export_interval: NonZeroU64::try_from(1).unwrap(), + export_interval: Some(NonZeroU64::try_from(1).unwrap()), export_batch_size: None, remove_after_export: false, + export_expired_items: false, source_key: "test".to_string(), }); let memory = memory_config.get_or_build_memory().await; diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 849f1a66abc6b..588f55b977dc5 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -169,10 +169,18 @@ generated: configuration: configuration: { """ required: false } + export_expired_items: { + type: bool: default: false + description: """ + Set to true to add a source output for expired items (named 'expired'). + Expired items ignore other settings and are exported as they are flushed from the table. + """ + required: false + } export_interval: { type: uint: {} description: "Interval for exporting all data from the table when used as a source." - required: true + required: false } remove_after_export: { type: bool: default: false From 78439e89c8eb69adbfc78f17c307bda0a5aa3a5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 19 Sep 2025 16:05:32 +0200 Subject: [PATCH 2/9] Add changelog entry --- changelog.d/23815_memory_enrichment_expired_output.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/23815_memory_enrichment_expired_output.feature.md diff --git a/changelog.d/23815_memory_enrichment_expired_output.feature.md b/changelog.d/23815_memory_enrichment_expired_output.feature.md new file mode 100644 index 0000000000000..6bd44c05abceb --- /dev/null +++ b/changelog.d/23815_memory_enrichment_expired_output.feature.md @@ -0,0 +1,3 @@ +Added `expired` output to the memory enrichment table source, to export items as they expire in the cache. + +authors: esensar Quad9DNS From 69bed7ade8aed8c0f0094f9e4df5d70ecf05a6df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 24 Sep 2025 10:16:05 +0200 Subject: [PATCH 3/9] Move `MemorySourceConfig` to `config.rs` --- src/enrichment_tables/memory/config.rs | 35 ++++++++++++++++++++++---- src/enrichment_tables/memory/source.rs | 29 --------------------- src/enrichment_tables/memory/table.rs | 2 +- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index b1c51a31049d7..56b1ee076a0a6 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -14,11 +14,7 @@ use vector_lib::{ }; use vrl::{path::OwnedTargetPath, value::Kind}; -use super::{ - Memory, - internal_events::InternalMetricsConfig, - source::{EXPIRED_ROUTE, MemorySourceConfig}, -}; +use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE}; use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput, @@ -77,6 +73,35 @@ pub struct MemoryConfig { memory: Arc>>>, } +/// Configuration for memory enrichment table source functionality. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct MemorySourceConfig { + /// Interval for exporting all data from the table when used as a source. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_interval: Option, + /// Batch size for data exporting. Used to prevent exporting entire table at + /// once and blocking the system. + /// + /// By default, batches are not used and entire table is exported. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_batch_size: Option, + /// If set to true, all data will be removed from cache after exporting. + /// Only valid if used as a source and export_interval > 0 + /// + /// By default, export will not remove data from cache + #[serde(default = "crate::serde::default_false")] + pub remove_after_export: bool, + /// Set to true to add a source output for expired items (named 'expired'). + /// Expired items ignore other settings and are exported as they are flushed from the table. + #[serde(default = "crate::serde::default_false")] + pub export_expired_items: bool, + /// Key to use for this component when used as a source. This must be different from the + /// component key. + pub source_key: String, +} + impl PartialEq for MemoryConfig { fn eq(&self, other: &Self) -> bool { self.ttl == other.ttl diff --git a/src/enrichment_tables/memory/source.rs b/src/enrichment_tables/memory/source.rs index 8515ef0204e98..fcf600f4718ab 100644 --- a/src/enrichment_tables/memory/source.rs +++ b/src/enrichment_tables/memory/source.rs @@ -23,35 +23,6 @@ use crate::{SourceSender, internal_events::StreamClosedError}; pub(crate) const EXPIRED_ROUTE: &str = "expired"; -/// Configuration for memory enrichment table source functionality. -#[configurable_component] -#[derive(Clone, Debug, PartialEq, Eq)] -#[serde(deny_unknown_fields)] -pub struct MemorySourceConfig { - /// Interval for exporting all data from the table when used as a source. - #[serde(skip_serializing_if = "vector_lib::serde::is_default")] - pub export_interval: Option, - /// Batch size for data exporting. Used to prevent exporting entire table at - /// once and blocking the system. - /// - /// By default, batches are not used and entire table is exported. - #[serde(skip_serializing_if = "vector_lib::serde::is_default")] - pub export_batch_size: Option, - /// If set to true, all data will be removed from cache after exporting. - /// Only valid if used as a source and export_interval > 0 - /// - /// By default, export will not remove data from cache - #[serde(default = "crate::serde::default_false")] - pub remove_after_export: bool, - /// Set to true to add a source output for expired items (named 'expired'). - /// Expired items ignore other settings and are exported as they are flushed from the table. - #[serde(default = "crate::serde::default_false")] - pub export_expired_items: bool, - /// Key to use for this component when used as a source. This must be different from the - /// component key. - pub source_key: String, -} - /// A struct that represents Memory when used as a source. pub(crate) struct MemorySource { pub(super) memory: Memory, diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index fd153fe253382..ca46a78cfa71d 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -448,7 +448,7 @@ mod tests { use super::*; use crate::{ enrichment_tables::memory::{ - internal_events::InternalMetricsConfig, source::MemorySourceConfig, + config::MemorySourceConfig, internal_events::InternalMetricsConfig, }, event::{Event, LogEvent}, test_util::components::{ From 58896ee53b1f63a26be58f0116cecdaa9236ac92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 24 Sep 2025 10:35:22 +0200 Subject: [PATCH 4/9] Add `MemoryEntryPair` struct --- src/enrichment_tables/memory/source.rs | 12 +++++------- src/enrichment_tables/memory/table.rs | 23 ++++++++++++++++------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/enrichment_tables/memory/source.rs b/src/enrichment_tables/memory/source.rs index fcf600f4718ab..7d9c6c3a2585d 100644 --- a/src/enrichment_tables/memory/source.rs +++ b/src/enrichment_tables/memory/source.rs @@ -1,7 +1,4 @@ -use std::{ - num::NonZeroU64, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use chrono::Utc; use futures::StreamExt; @@ -10,7 +7,6 @@ use tokio_stream::wrappers::IntervalStream; use vector_lib::{ ByteSizeOf, EstimatedJsonEncodedSizeOf, config::LogNamespace, - configurable::configurable_component, event::{Event, EventMetadata, LogEvent}, internal_event::{ ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol, @@ -19,7 +15,9 @@ use vector_lib::{ }; use super::{Memory, MemoryConfig}; -use crate::{SourceSender, internal_events::StreamClosedError}; +use crate::{ + SourceSender, enrichment_tables::memory::MemoryEntryPair, internal_events::StreamClosedError, +}; pub(crate) const EXPIRED_ROUTE: &str = "expired"; @@ -122,7 +120,7 @@ impl MemorySource { Ok(expired) = expired_receiver.recv() => { let now = Instant::now(); - let events = expired.into_iter().filter_map(|(key, expired_event)| { + let events = expired.into_iter().filter_map(|MemoryEntryPair { key, entry: expired_event }| { let mut event = Event::Log(LogEvent::from_map( expired_event.as_object_map(now, &key).ok()?, EventMetadata::default(), diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index ca46a78cfa71d..ed9af7a4aca4f 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -91,6 +91,15 @@ struct MemoryMetadata { byte_size: u64, } +/// [`MemoryEntry`] combined with its key +#[derive(Clone)] +pub(super) struct MemoryEntryPair { + /// Key of this entry + pub(super) key: String, + /// The value of this entry + pub(super) entry: MemoryEntry, +} + // Used to ensure that these 2 are locked together pub(super) struct MemoryWriter { pub(super) write_handle: evmap::WriteHandle, @@ -104,8 +113,8 @@ pub struct Memory { pub(super) write_handle: Arc>, pub(super) config: MemoryConfig, #[allow(dead_code)] - expired_items_receiver: Receiver>, - expired_items_sender: Sender>, + expired_items_receiver: Receiver>, + expired_items_sender: Sender>, } impl Memory { @@ -134,7 +143,7 @@ impl Memory { .get_or(|| self.read_handle_factory.handle()) } - pub(super) fn subscribe_to_expired_items(&self) -> Receiver> { + pub(super) fn subscribe_to_expired_items(&self) -> Receiver> { self.expired_items_sender.subscribe() } @@ -248,10 +257,10 @@ impl Memory { _ => None, }) .filter_map(|key| { - writer - .write_handle - .get_one(key) - .map(|v| (key.to_string(), v.clone())) + writer.write_handle.get_one(key).map(|v| MemoryEntryPair { + key: key.to_string(), + entry: v.clone(), + }) }) .collect::>(); let _ = self.expired_items_sender.send(pending_removal); From e948ab102170ea8a050e104a42bc6d9ea929e3a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 24 Sep 2025 10:53:57 +0200 Subject: [PATCH 5/9] Extract out select branches into separate functions --- lib/vector-common/src/internal_event/mod.rs | 2 +- src/enrichment_tables/memory/source.rs | 210 ++++++++++++-------- 2 files changed, 123 insertions(+), 89 deletions(-) diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index bc8a71f54f1d1..83c06260c0d5e 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -10,7 +10,7 @@ pub mod service; use std::ops::{Add, AddAssign}; -pub use bytes_received::BytesReceived; +pub use bytes_received::{BytesReceived, BytesReceivedHandle}; pub use bytes_sent::BytesSent; #[allow(clippy::module_name_repetitions)] pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache}; diff --git a/src/enrichment_tables/memory/source.rs b/src/enrichment_tables/memory/source.rs index 7d9c6c3a2585d..74a5487efc3e5 100644 --- a/src/enrichment_tables/memory/source.rs +++ b/src/enrichment_tables/memory/source.rs @@ -9,14 +9,17 @@ use vector_lib::{ config::LogNamespace, event::{Event, EventMetadata, LogEvent}, internal_event::{ - ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol, + ByteSize, BytesReceived, BytesReceivedHandle, CountByteSize, EventsReceived, + EventsReceivedHandle, InternalEventHandle, Protocol, }, shutdown::ShutdownSignal, }; use super::{Memory, MemoryConfig}; use crate::{ - SourceSender, enrichment_tables::memory::MemoryEntryPair, internal_events::StreamClosedError, + SourceSender, + enrichment_tables::memory::{MemoryEntryPair, MemorySourceConfig}, + internal_events::StreamClosedError, }; pub(crate) const EXPIRED_ROUTE: &str = "expired"; @@ -37,7 +40,7 @@ impl MemorySource { .memory .config .source_config - .as_ref() + .clone() .expect("Unexpected missing source config in memory table used as a source."); let mut interval = IntervalStream::new(interval(Duration::from_secs( source_config @@ -45,106 +48,137 @@ impl MemorySource { .map(Into::into) .unwrap_or(u64::MAX), ))) - .take_until(self.shutdown); + .take_until(self.shutdown.clone()); let mut expired_receiver = self.memory.subscribe_to_expired_items(); loop { tokio::select! { - interal_time = interval.next() => { - if interal_time.is_none() { + interval_time = interval.next() => { + if interval_time.is_none() { break; } - let mut sent = 0_usize; - loop { - let mut events = Vec::new(); - { - let mut writer = self.memory.write_handle.lock().unwrap(); - if let Some(reader) = self.memory.get_read_handle().read() { - let now = Instant::now(); - let utc_now = Utc::now(); - events = reader - .iter() - .skip(if source_config.remove_after_export { - 0 - } else { - sent - }) - .take(if let Some(batch_size) = source_config.export_batch_size { - batch_size as usize - } else { - usize::MAX - }) - .filter_map(|(k, v)| { - if source_config.remove_after_export { - writer.write_handle.empty(k.clone()); - } - v.get_one().map(|v| (k, v)) - }) - .filter_map(|(k, v)| { - let mut event = Event::Log(LogEvent::from_map( - v.as_object_map(now, k).ok()?, - EventMetadata::default(), - )); - let log = event.as_mut_log(); - self.log_namespace.insert_standard_vector_source_metadata( - log, - MemoryConfig::NAME, - utc_now, - ); - - Some(event) - }) - .collect::>(); - if source_config.remove_after_export { - writer.write_handle.refresh(); - } - } - } - let count = events.len(); - let byte_size = events.size_of(); - let json_size = events.estimated_json_encoded_size_of(); - bytes_received.emit(ByteSize(byte_size)); - events_received.emit(CountByteSize(count, json_size)); - if self.out.send_batch(events).await.is_err() { - emit!(StreamClosedError { count }); - } - - sent += count; - match source_config.export_batch_size { - None => break, - Some(export_batch_size) if count < export_batch_size as usize => break, - _ => {} - } - } + self.export_table_items(&source_config, &events_received, &bytes_received).await; }, Ok(expired) = expired_receiver.recv() => { + self.export_expired_entries(expired, &events_received, &bytes_received).await; + } + } + } + + Ok(()) + } + + async fn export_table_items( + &mut self, + source_config: &MemorySourceConfig, + events_received: &EventsReceivedHandle, + bytes_received: &BytesReceivedHandle, + ) { + let mut sent = 0_usize; + loop { + let mut events = Vec::new(); + { + let mut writer = self.memory.write_handle.lock().unwrap(); + if let Some(reader) = self.memory.get_read_handle().read() { let now = Instant::now(); - let events = expired.into_iter().filter_map(|MemoryEntryPair { key, entry: expired_event }| { - let mut event = Event::Log(LogEvent::from_map( - expired_event.as_object_map(now, &key).ok()?, + let utc_now = Utc::now(); + events = reader + .iter() + .skip(if source_config.remove_after_export { + 0 + } else { + sent + }) + .take(if let Some(batch_size) = source_config.export_batch_size { + batch_size as usize + } else { + usize::MAX + }) + .filter_map(|(k, v)| { + if source_config.remove_after_export { + writer.write_handle.empty(k.clone()); + } + v.get_one().map(|v| (k, v)) + }) + .filter_map(|(k, v)| { + let mut event = Event::Log(LogEvent::from_map( + v.as_object_map(now, k).ok()?, EventMetadata::default(), - )); - let log = event.as_mut_log(); - self.log_namespace.insert_standard_vector_source_metadata( - log, - MemoryConfig::NAME, - Utc::now(), - ); - Some(event) - }).collect::>(); - let count = events.len(); - let byte_size = events.size_of(); - let json_size = events.estimated_json_encoded_size_of(); - bytes_received.emit(ByteSize(byte_size)); - events_received.emit(CountByteSize(count, json_size)); - if self.out.send_batch_named(EXPIRED_ROUTE, events).await.is_err() { - emit!(StreamClosedError { count }); + )); + let log = event.as_mut_log(); + self.log_namespace.insert_standard_vector_source_metadata( + log, + MemoryConfig::NAME, + utc_now, + ); + + Some(event) + }) + .collect::>(); + if source_config.remove_after_export { + writer.write_handle.refresh(); } } } + let count = events.len(); + let byte_size = events.size_of(); + let json_size = events.estimated_json_encoded_size_of(); + bytes_received.emit(ByteSize(byte_size)); + events_received.emit(CountByteSize(count, json_size)); + if self.out.send_batch(events).await.is_err() { + emit!(StreamClosedError { count }); + } + + sent += count; + match source_config.export_batch_size { + None => break, + Some(export_batch_size) if count < export_batch_size as usize => break, + _ => {} + } } + } - Ok(()) + async fn export_expired_entries( + &mut self, + entries: Vec, + events_received: &EventsReceivedHandle, + bytes_received: &BytesReceivedHandle, + ) { + let now = Instant::now(); + let events = entries + .into_iter() + .filter_map( + |MemoryEntryPair { + key, + entry: expired_event, + }| { + let mut event = Event::Log(LogEvent::from_map( + expired_event.as_object_map(now, &key).ok()?, + EventMetadata::default(), + )); + let log = event.as_mut_log(); + self.log_namespace.insert_standard_vector_source_metadata( + log, + MemoryConfig::NAME, + Utc::now(), + ); + Some(event) + }, + ) + .collect::>(); + let count = events.len(); + let byte_size = events.size_of(); + let json_size = events.estimated_json_encoded_size_of(); + bytes_received.emit(ByteSize(byte_size)); + events_received.emit(CountByteSize(count, json_size)); + if self + .out + .send_batch_named(EXPIRED_ROUTE, events) + .await + .is_err() + { + emit!(StreamClosedError { count }); + } } } From 1614d32c827ff7f5ed5c8e7c1a089cb4e114378b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 24 Sep 2025 15:25:03 +0200 Subject: [PATCH 6/9] Add output section --- src/enrichment_tables/memory/config.rs | 2 +- website/cue/reference/configuration.cue | 25 +++++++++++++++++-- .../cue/reference/generated/configuration.cue | 2 +- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 56b1ee076a0a6..9b17ce29750c3 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -93,7 +93,7 @@ pub struct MemorySourceConfig { /// By default, export will not remove data from cache #[serde(default = "crate::serde::default_false")] pub remove_after_export: bool, - /// Set to true to add a source output for expired items (named 'expired'). + /// Set to true to export expired items via the `expired` output port. /// Expired items ignore other settings and are exported as they are flushed from the table. #[serde(default = "crate::serde::default_false")] pub export_expired_items: bool, diff --git a/website/cue/reference/configuration.cue b/website/cue/reference/configuration.cue index 2170e9837afee..2321cea179a15 100644 --- a/website/cue/reference/configuration.cue +++ b/website/cue/reference/configuration.cue @@ -1,8 +1,12 @@ package metadata configuration: { - configuration: #Schema - how_it_works: #HowItWorks + configuration: #Schema | { + enrichment_tables: #SchemaField | { + outputs: [components.#Output, ...components.#Output] + } + } + how_it_works: #HowItWorks } configuration: { @@ -44,5 +48,22 @@ configuration: { } } } + + enrichment_tables: { + outputs: [ + { + name: components._default_output.name + description: """ + Default output stream. Only applies to memory enrichment table. Only active if `source_config.export_interval` is defined. Use `` as an input to downstream transforms and sinks. + """ + }, + { + name: "expired" + description: """ + Output stream of expired items. Only applies to memory enrichment table. Only active if `source_config.export_expired_items` is enabled. Use `.expired` as an input to downstream transforms and sinks. + """ + }, + ] + } } } diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 588f55b977dc5..01d10cc5aee6d 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -172,7 +172,7 @@ generated: configuration: configuration: { export_expired_items: { type: bool: default: false description: """ - Set to true to add a source output for expired items (named 'expired'). + Set to true to export expired items via the `expired` output port. Expired items ignore other settings and are exported as they are flushed from the table. """ required: false From a69070849ec46371ad811beed323980bce4eca64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 24 Sep 2025 15:33:43 +0200 Subject: [PATCH 7/9] Log errors for failed expired items export --- src/enrichment_tables/memory/table.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index ed9af7a4aca4f..c287a273cc6a2 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -263,7 +263,12 @@ impl Memory { }) }) .collect::>(); - let _ = self.expired_items_sender.send(pending_removal); + if let Err(error) = self.expired_items_sender.send(pending_removal) { + error!( + message = "Error exporting expired items from memory enrichment table.", + %error + ); + } } writer.write_handle.refresh(); From 531f349e1cb380561ea6542e1f76c05173c545b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 25 Sep 2025 10:31:41 +0200 Subject: [PATCH 8/9] Add `internal_log_rate_limit` to expired items export error message --- src/enrichment_tables/memory/table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index c287a273cc6a2..8ee65a4850ac2 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -266,7 +266,8 @@ impl Memory { if let Err(error) = self.expired_items_sender.send(pending_removal) { error!( message = "Error exporting expired items from memory enrichment table.", - %error + error = %error + internal_log_rate_limit = true, ); } } From 27af2591a769e26e60c24b5284399e4ce5208c20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 25 Sep 2025 16:00:56 +0200 Subject: [PATCH 9/9] Add missing comma --- src/enrichment_tables/memory/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 8ee65a4850ac2..10df50b43d842 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -266,7 +266,7 @@ impl Memory { if let Err(error) = self.expired_items_sender.send(pending_removal) { error!( message = "Error exporting expired items from memory enrichment table.", - error = %error + error = %error, internal_log_rate_limit = true, ); }