Skip to content

Commit a4ded4a

Browse files
feat(enrichment tables): add expired items output to memory enrichment table (#23815)
* feat(enrichment tables): add expired items output to memory enrichment table Adds a new output to the memory enrichment table source that exports items as they expire from the cache. Related: #23784 * Add changelog entry * Move `MemorySourceConfig` to `config.rs` * Add `MemoryEntryPair` struct * Extract out select branches into separate functions * Add output section * Log errors for failed expired items export * Add `internal_log_rate_limit` to expired items export error message * Add missing comma --------- Co-authored-by: Thomas <[email protected]>
1 parent a7d91b3 commit a4ded4a

File tree

7 files changed

+287
-107
lines changed

7 files changed

+287
-107
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added `expired` output to the memory enrichment table source, to export items as they expire in the cache.
2+
3+
authors: esensar Quad9DNS

lib/vector-common/src/internal_event/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub mod service;
1010

1111
use std::ops::{Add, AddAssign};
1212

13-
pub use bytes_received::BytesReceived;
13+
pub use bytes_received::{BytesReceived, BytesReceivedHandle};
1414
pub use bytes_sent::BytesSent;
1515
#[allow(clippy::module_name_repetitions)]
1616
pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache};

src/enrichment_tables/memory/config.rs

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@ use vector_lib::{
1414
};
1515
use vrl::{path::OwnedTargetPath, value::Kind};
1616

17-
use super::{internal_events::InternalMetricsConfig, source::MemorySourceConfig};
17+
use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE};
1818
use crate::{
1919
config::{
2020
EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
2121
},
22-
enrichment_tables::memory::Memory,
2322
sinks::Healthcheck,
2423
sources::Source,
2524
};
@@ -74,6 +73,35 @@ pub struct MemoryConfig {
7473
memory: Arc<Mutex<Option<Box<Memory>>>>,
7574
}
7675

76+
/// Configuration for the memory enrichment table when it is also used as a source.
77+
#[configurable_component]
78+
#[derive(Clone, Debug, PartialEq, Eq)]
79+
#[serde(deny_unknown_fields)]
80+
pub struct MemorySourceConfig {
81+
/// Interval, in seconds, for exporting all data from the table when used as a source.
82+
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
83+
pub export_interval: Option<NonZeroU64>,
84+
/// Batch size for data exporting. Used to prevent exporting the entire table at
85+
/// once and blocking the system.
86+
///
87+
/// By default, batches are not used and the entire table is exported.
88+
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
89+
pub export_batch_size: Option<u64>,
90+
/// If set to true, all data will be removed from the cache after exporting.
91+
/// Only valid if used as a source and `export_interval` is set.
92+
///
93+
/// By default, export will not remove data from the cache.
94+
#[serde(default = "crate::serde::default_false")]
95+
pub remove_after_export: bool,
96+
/// Set to true to export expired items via the `expired` output port.
97+
/// Expired items ignore other settings and are exported as they are flushed from the table.
98+
#[serde(default = "crate::serde::default_false")]
99+
pub export_expired_items: bool,
100+
/// Key to use for this component when used as a source. This must be different from the
101+
/// component key.
102+
pub source_key: String,
103+
}
104+
77105
impl PartialEq for MemoryConfig {
78106
fn eq(&self, other: &Self) -> bool {
79107
self.ttl == other.ttl
@@ -187,10 +215,23 @@ impl SourceConfig for MemoryConfig {
187215
}
188216
.with_standard_vector_source_metadata();
189217

190-
vec![SourceOutput::new_maybe_logs(
191-
DataType::Log,
192-
schema_definition,
193-
)]
218+
if self
219+
.source_config
220+
.as_ref()
221+
.map(|c| c.export_expired_items)
222+
.unwrap_or_default()
223+
{
224+
vec![
225+
SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()),
226+
SourceOutput::new_maybe_logs(DataType::Log, schema_definition)
227+
.with_port(EXPIRED_ROUTE),
228+
]
229+
} else {
230+
vec![SourceOutput::new_maybe_logs(
231+
DataType::Log,
232+
schema_definition,
233+
)]
234+
}
194235
}
195236

196237
fn can_acknowledge(&self) -> bool {
Lines changed: 138 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use std::{
2-
num::NonZeroU64,
3-
time::{Duration, Instant},
4-
};
1+
use std::time::{Duration, Instant};
52

63
use chrono::Utc;
74
use futures::StreamExt;
@@ -10,40 +7,22 @@ use tokio_stream::wrappers::IntervalStream;
107
use vector_lib::{
118
ByteSizeOf, EstimatedJsonEncodedSizeOf,
129
config::LogNamespace,
13-
configurable::configurable_component,
1410
event::{Event, EventMetadata, LogEvent},
1511
internal_event::{
16-
ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
12+
ByteSize, BytesReceived, BytesReceivedHandle, CountByteSize, EventsReceived,
13+
EventsReceivedHandle, InternalEventHandle, Protocol,
1714
},
1815
shutdown::ShutdownSignal,
1916
};
2017

2118
use super::{Memory, MemoryConfig};
22-
use crate::{SourceSender, internal_events::StreamClosedError};
19+
use crate::{
20+
SourceSender,
21+
enrichment_tables::memory::{MemoryEntryPair, MemorySourceConfig},
22+
internal_events::StreamClosedError,
23+
};
2324

24-
/// Configuration for memory enrichment table source functionality.
25-
#[configurable_component]
26-
#[derive(Clone, Debug, PartialEq, Eq)]
27-
#[serde(deny_unknown_fields)]
28-
pub struct MemorySourceConfig {
29-
/// Interval for exporting all data from the table when used as a source.
30-
pub export_interval: NonZeroU64,
31-
/// Batch size for data exporting. Used to prevent exporting entire table at
32-
/// once and blocking the system.
33-
///
34-
/// By default, batches are not used and entire table is exported.
35-
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
36-
pub export_batch_size: Option<u64>,
37-
/// If set to true, all data will be removed from cache after exporting.
38-
/// Only valid if used as a source and export_interval > 0
39-
///
40-
/// By default, export will not remove data from cache
41-
#[serde(default = "crate::serde::default_false")]
42-
pub remove_after_export: bool,
43-
/// Key to use for this component when used as a source. This must be different from the
44-
/// component key.
45-
pub source_key: String,
46-
}
25+
pub(crate) const EXPIRED_ROUTE: &str = "expired";
4726

4827
/// A struct that represents Memory when used as a source.
4928
pub(crate) struct MemorySource {
@@ -61,78 +40,145 @@ impl MemorySource {
6140
.memory
6241
.config
6342
.source_config
64-
.as_ref()
43+
.clone()
6544
.expect("Unexpected missing source config in memory table used as a source.");
6645
let mut interval = IntervalStream::new(interval(Duration::from_secs(
67-
source_config.export_interval.into(),
46+
source_config
47+
.export_interval
48+
.map(Into::into)
49+
.unwrap_or(u64::MAX),
6850
)))
69-
.take_until(self.shutdown);
51+
.take_until(self.shutdown.clone());
52+
let mut expired_receiver = self.memory.subscribe_to_expired_items();
7053

71-
while interval.next().await.is_some() {
72-
let mut sent = 0_usize;
73-
loop {
74-
let mut events = Vec::new();
75-
{
76-
let mut writer = self.memory.write_handle.lock().unwrap();
77-
if let Some(reader) = self.memory.get_read_handle().read() {
78-
let now = Instant::now();
79-
let utc_now = Utc::now();
80-
events = reader
81-
.iter()
82-
.skip(if source_config.remove_after_export {
83-
0
84-
} else {
85-
sent
86-
})
87-
.take(if let Some(batch_size) = source_config.export_batch_size {
88-
batch_size as usize
89-
} else {
90-
usize::MAX
91-
})
92-
.filter_map(|(k, v)| {
93-
if source_config.remove_after_export {
94-
writer.write_handle.empty(k.clone());
95-
}
96-
v.get_one().map(|v| (k, v))
97-
})
98-
.filter_map(|(k, v)| {
99-
let mut event = Event::Log(LogEvent::from_map(
100-
v.as_object_map(now, k).ok()?,
101-
EventMetadata::default(),
102-
));
103-
let log = event.as_mut_log();
104-
self.log_namespace.insert_standard_vector_source_metadata(
105-
log,
106-
MemoryConfig::NAME,
107-
utc_now,
108-
);
109-
110-
Some(event)
111-
})
112-
.collect::<Vec<_>>();
113-
if source_config.remove_after_export {
114-
writer.write_handle.refresh();
115-
}
54+
loop {
55+
tokio::select! {
56+
interval_time = interval.next() => {
57+
if interval_time.is_none() {
58+
break;
11659
}
117-
}
118-
let count = events.len();
119-
let byte_size = events.size_of();
120-
let json_size = events.estimated_json_encoded_size_of();
121-
bytes_received.emit(ByteSize(byte_size));
122-
events_received.emit(CountByteSize(count, json_size));
123-
if self.out.send_batch(events).await.is_err() {
124-
emit!(StreamClosedError { count });
125-
}
60+
self.export_table_items(&source_config, &events_received, &bytes_received).await;
61+
},
12662

127-
sent += count;
128-
match source_config.export_batch_size {
129-
None => break,
130-
Some(export_batch_size) if count < export_batch_size as usize => break,
131-
_ => {}
63+
Ok(expired) = expired_receiver.recv() => {
64+
self.export_expired_entries(expired, &events_received, &bytes_received).await;
13265
}
13366
}
13467
}
13568

13669
Ok(())
13770
}
71+
72+
async fn export_table_items(
73+
&mut self,
74+
source_config: &MemorySourceConfig,
75+
events_received: &EventsReceivedHandle,
76+
bytes_received: &BytesReceivedHandle,
77+
) {
78+
let mut sent = 0_usize;
79+
loop {
80+
let mut events = Vec::new();
81+
{
82+
let mut writer = self.memory.write_handle.lock().unwrap();
83+
if let Some(reader) = self.memory.get_read_handle().read() {
84+
let now = Instant::now();
85+
let utc_now = Utc::now();
86+
events = reader
87+
.iter()
88+
.skip(if source_config.remove_after_export {
89+
0
90+
} else {
91+
sent
92+
})
93+
.take(if let Some(batch_size) = source_config.export_batch_size {
94+
batch_size as usize
95+
} else {
96+
usize::MAX
97+
})
98+
.filter_map(|(k, v)| {
99+
if source_config.remove_after_export {
100+
writer.write_handle.empty(k.clone());
101+
}
102+
v.get_one().map(|v| (k, v))
103+
})
104+
.filter_map(|(k, v)| {
105+
let mut event = Event::Log(LogEvent::from_map(
106+
v.as_object_map(now, k).ok()?,
107+
EventMetadata::default(),
108+
));
109+
let log = event.as_mut_log();
110+
self.log_namespace.insert_standard_vector_source_metadata(
111+
log,
112+
MemoryConfig::NAME,
113+
utc_now,
114+
);
115+
116+
Some(event)
117+
})
118+
.collect::<Vec<_>>();
119+
if source_config.remove_after_export {
120+
writer.write_handle.refresh();
121+
}
122+
}
123+
}
124+
let count = events.len();
125+
let byte_size = events.size_of();
126+
let json_size = events.estimated_json_encoded_size_of();
127+
bytes_received.emit(ByteSize(byte_size));
128+
events_received.emit(CountByteSize(count, json_size));
129+
if self.out.send_batch(events).await.is_err() {
130+
emit!(StreamClosedError { count });
131+
}
132+
133+
sent += count;
134+
match source_config.export_batch_size {
135+
None => break,
136+
Some(export_batch_size) if count < export_batch_size as usize => break,
137+
_ => {}
138+
}
139+
}
140+
}
141+
142+
async fn export_expired_entries(
143+
&mut self,
144+
entries: Vec<MemoryEntryPair>,
145+
events_received: &EventsReceivedHandle,
146+
bytes_received: &BytesReceivedHandle,
147+
) {
148+
let now = Instant::now();
149+
let events = entries
150+
.into_iter()
151+
.filter_map(
152+
|MemoryEntryPair {
153+
key,
154+
entry: expired_event,
155+
}| {
156+
let mut event = Event::Log(LogEvent::from_map(
157+
expired_event.as_object_map(now, &key).ok()?,
158+
EventMetadata::default(),
159+
));
160+
let log = event.as_mut_log();
161+
self.log_namespace.insert_standard_vector_source_metadata(
162+
log,
163+
MemoryConfig::NAME,
164+
Utc::now(),
165+
);
166+
Some(event)
167+
},
168+
)
169+
.collect::<Vec<_>>();
170+
let count = events.len();
171+
let byte_size = events.size_of();
172+
let json_size = events.estimated_json_encoded_size_of();
173+
bytes_received.emit(ByteSize(byte_size));
174+
events_received.emit(CountByteSize(count, json_size));
175+
if self
176+
.out
177+
.send_batch_named(EXPIRED_ROUTE, events)
178+
.await
179+
.is_err()
180+
{
181+
emit!(StreamClosedError { count });
182+
}
183+
}
138184
}

0 commit comments

Comments
 (0)