Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added an `expired` output to the memory enrichment table source, to export items as they expire from the cache.

authors: esensar Quad9DNS
2 changes: 1 addition & 1 deletion lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod service;

use std::ops::{Add, AddAssign};

pub use bytes_received::BytesReceived;
pub use bytes_received::{BytesReceived, BytesReceivedHandle};
pub use bytes_sent::BytesSent;
#[allow(clippy::module_name_repetitions)]
pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache};
Expand Down
53 changes: 47 additions & 6 deletions src/enrichment_tables/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ use vector_lib::{
};
use vrl::{path::OwnedTargetPath, value::Kind};

use super::{internal_events::InternalMetricsConfig, source::MemorySourceConfig};
use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE};
use crate::{
config::{
EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
},
enrichment_tables::memory::Memory,
sinks::Healthcheck,
sources::Source,
};
Expand Down Expand Up @@ -74,6 +73,35 @@ pub struct MemoryConfig {
memory: Arc<Mutex<Option<Box<Memory>>>>,
}

/// Configuration for memory enrichment table source functionality.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct MemorySourceConfig {
    /// Interval (in seconds) for exporting all data from the table when used as a source.
    // NOTE(review): `None` disables interval-based export; the source then only emits
    // expired items when `export_expired_items` is enabled — confirm against `run`.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub export_interval: Option<NonZeroU64>,
    /// Batch size for data exporting. Used to prevent exporting entire table at
    /// once and blocking the system.
    ///
    /// By default, batches are not used and entire table is exported.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub export_batch_size: Option<u64>,
    /// If set to true, all data will be removed from cache after exporting.
    /// Only valid if used as a source and export_interval > 0
    ///
    /// By default, export will not remove data from cache
    #[serde(default = "crate::serde::default_false")]
    pub remove_after_export: bool,
    /// Set to true to export expired items via the `expired` output port.
    /// Expired items ignore other settings and are exported as they are flushed from the table.
    // When enabled, the source registers a second output named `expired` in addition
    // to the default output (see `SourceConfig::outputs` for this table).
    #[serde(default = "crate::serde::default_false")]
    pub export_expired_items: bool,
    /// Key to use for this component when used as a source. This must be different from the
    /// component key.
    pub source_key: String,
}

impl PartialEq for MemoryConfig {
fn eq(&self, other: &Self) -> bool {
self.ttl == other.ttl
Expand Down Expand Up @@ -187,10 +215,23 @@ impl SourceConfig for MemoryConfig {
}
.with_standard_vector_source_metadata();

vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
if self
.source_config
.as_ref()
.map(|c| c.export_expired_items)
.unwrap_or_default()
{
vec![
SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()),
SourceOutput::new_maybe_logs(DataType::Log, schema_definition)
.with_port(EXPIRED_ROUTE),
]
} else {
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}
}

fn can_acknowledge(&self) -> bool {
Expand Down
230 changes: 138 additions & 92 deletions src/enrichment_tables/memory/source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
num::NonZeroU64,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};

use chrono::Utc;
use futures::StreamExt;
Expand All @@ -10,40 +7,22 @@ use tokio_stream::wrappers::IntervalStream;
use vector_lib::{
ByteSizeOf, EstimatedJsonEncodedSizeOf,
config::LogNamespace,
configurable::configurable_component,
event::{Event, EventMetadata, LogEvent},
internal_event::{
ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
ByteSize, BytesReceived, BytesReceivedHandle, CountByteSize, EventsReceived,
EventsReceivedHandle, InternalEventHandle, Protocol,
},
shutdown::ShutdownSignal,
};

use super::{Memory, MemoryConfig};
use crate::{SourceSender, internal_events::StreamClosedError};
use crate::{
SourceSender,
enrichment_tables::memory::{MemoryEntryPair, MemorySourceConfig},
internal_events::StreamClosedError,
};

/// Configuration for memory enrichment table source functionality.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct MemorySourceConfig {
/// Interval for exporting all data from the table when used as a source.
pub export_interval: NonZeroU64,
/// Batch size for data exporting. Used to prevent exporting entire table at
/// once and blocking the system.
///
/// By default, batches are not used and entire table is exported.
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
pub export_batch_size: Option<u64>,
/// If set to true, all data will be removed from cache after exporting.
/// Only valid if used as a source and export_interval > 0
///
/// By default, export will not remove data from cache
#[serde(default = "crate::serde::default_false")]
pub remove_after_export: bool,
/// Key to use for this component when used as a source. This must be different from the
/// component key.
pub source_key: String,
}
/// Name of the named output port on which expired table entries are published.
pub(crate) const EXPIRED_ROUTE: &str = "expired";

/// A struct that represents Memory when used as a source.
pub(crate) struct MemorySource {
Expand All @@ -61,78 +40,145 @@ impl MemorySource {
.memory
.config
.source_config
.as_ref()
.clone()
.expect("Unexpected missing source config in memory table used as a source.");
let mut interval = IntervalStream::new(interval(Duration::from_secs(
source_config.export_interval.into(),
source_config
.export_interval
.map(Into::into)
.unwrap_or(u64::MAX),
)))
.take_until(self.shutdown);
.take_until(self.shutdown.clone());
let mut expired_receiver = self.memory.subscribe_to_expired_items();

while interval.next().await.is_some() {
let mut sent = 0_usize;
loop {
let mut events = Vec::new();
{
let mut writer = self.memory.write_handle.lock().unwrap();
if let Some(reader) = self.memory.get_read_handle().read() {
let now = Instant::now();
let utc_now = Utc::now();
events = reader
.iter()
.skip(if source_config.remove_after_export {
0
} else {
sent
})
.take(if let Some(batch_size) = source_config.export_batch_size {
batch_size as usize
} else {
usize::MAX
})
.filter_map(|(k, v)| {
if source_config.remove_after_export {
writer.write_handle.empty(k.clone());
}
v.get_one().map(|v| (k, v))
})
.filter_map(|(k, v)| {
let mut event = Event::Log(LogEvent::from_map(
v.as_object_map(now, k).ok()?,
EventMetadata::default(),
));
let log = event.as_mut_log();
self.log_namespace.insert_standard_vector_source_metadata(
log,
MemoryConfig::NAME,
utc_now,
);

Some(event)
})
.collect::<Vec<_>>();
if source_config.remove_after_export {
writer.write_handle.refresh();
}
loop {
tokio::select! {
interval_time = interval.next() => {
if interval_time.is_none() {
break;
}
}
let count = events.len();
let byte_size = events.size_of();
let json_size = events.estimated_json_encoded_size_of();
bytes_received.emit(ByteSize(byte_size));
events_received.emit(CountByteSize(count, json_size));
if self.out.send_batch(events).await.is_err() {
emit!(StreamClosedError { count });
}
self.export_table_items(&source_config, &events_received, &bytes_received).await;
},

sent += count;
match source_config.export_batch_size {
None => break,
Some(export_batch_size) if count < export_batch_size as usize => break,
_ => {}
Ok(expired) = expired_receiver.recv() => {
self.export_expired_entries(expired, &events_received, &bytes_received).await;
}
}
}

Ok(())
}

    /// Exports the current contents of the memory table to the source's default
    /// output, optionally in batches, and optionally clearing the table as it goes.
    ///
    /// Called on every tick of the export interval. Loops until either the whole
    /// table has been sent (no batch size configured) or a short batch signals the
    /// end of the data.
    async fn export_table_items(
        &mut self,
        source_config: &MemorySourceConfig,
        events_received: &EventsReceivedHandle,
        bytes_received: &BytesReceivedHandle,
    ) {
        // Number of entries already sent in previous batches of this export pass.
        let mut sent = 0_usize;
        loop {
            let mut events = Vec::new();
            {
                // Hold the write lock for the whole batch so removals ("empty") and
                // the final "refresh" happen atomically with respect to other writers.
                let mut writer = self.memory.write_handle.lock().unwrap();
                if let Some(reader) = self.memory.get_read_handle().read() {
                    let now = Instant::now();
                    let utc_now = Utc::now();
                    events = reader
                        .iter()
                        // When removing after export, the table shrinks as we go,
                        // so each batch restarts from the front; otherwise skip what
                        // was already sent.
                        .skip(if source_config.remove_after_export {
                            0
                        } else {
                            sent
                        })
                        // No configured batch size means "take everything".
                        .take(if let Some(batch_size) = source_config.export_batch_size {
                            batch_size as usize
                        } else {
                            usize::MAX
                        })
                        .filter_map(|(k, v)| {
                            if source_config.remove_after_export {
                                // Queue the removal; it only takes effect on refresh().
                                writer.write_handle.empty(k.clone());
                            }
                            v.get_one().map(|v| (k, v))
                        })
                        .filter_map(|(k, v)| {
                            // Entries that fail to convert to an object map are skipped.
                            let mut event = Event::Log(LogEvent::from_map(
                                v.as_object_map(now, k).ok()?,
                                EventMetadata::default(),
                            ));
                            let log = event.as_mut_log();
                            self.log_namespace.insert_standard_vector_source_metadata(
                                log,
                                MemoryConfig::NAME,
                                utc_now,
                            );

                            Some(event)
                        })
                        .collect::<Vec<_>>();
                    if source_config.remove_after_export {
                        // Publish the queued removals to readers.
                        writer.write_handle.refresh();
                    }
                }
            }
            let count = events.len();
            let byte_size = events.size_of();
            let json_size = events.estimated_json_encoded_size_of();
            bytes_received.emit(ByteSize(byte_size));
            events_received.emit(CountByteSize(count, json_size));
            if self.out.send_batch(events).await.is_err() {
                emit!(StreamClosedError { count });
            }

            sent += count;
            match source_config.export_batch_size {
                // No batching: a single pass exported everything.
                None => break,
                // A short batch means the table is exhausted.
                Some(export_batch_size) if count < export_batch_size as usize => break,
                _ => {}
            }
        }
    }

async fn export_expired_entries(
&mut self,
entries: Vec<MemoryEntryPair>,
events_received: &EventsReceivedHandle,
bytes_received: &BytesReceivedHandle,
) {
let now = Instant::now();
let events = entries
.into_iter()
.filter_map(
|MemoryEntryPair {
key,
entry: expired_event,
}| {
let mut event = Event::Log(LogEvent::from_map(
expired_event.as_object_map(now, &key).ok()?,
EventMetadata::default(),
));
let log = event.as_mut_log();
self.log_namespace.insert_standard_vector_source_metadata(
log,
MemoryConfig::NAME,
Utc::now(),
);
Some(event)
},
)
.collect::<Vec<_>>();
let count = events.len();
let byte_size = events.size_of();
let json_size = events.estimated_json_encoded_size_of();
bytes_received.emit(ByteSize(byte_size));
events_received.emit(CountByteSize(count, json_size));
if self
.out
.send_batch_named(EXPIRED_ROUTE, events)
.await
.is_err()
{
emit!(StreamClosedError { count });
}
}
}
Loading
Loading