Skip to content

Commit 769924d

Browse files
Merge pull request parseablehq#9 from de-sh/multiple-log-sources
suggestions
2 parents 01d7e8b + bf2366e commit 769924d

File tree

4 files changed

+15
-26
lines changed

4 files changed

+15
-26
lines changed

src/event/format/mod.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use arrow_array::RecordBatch;
2828
use arrow_schema::{DataType, Field, Schema, TimeUnit};
2929
use chrono::{DateTime, Utc};
3030
use serde::{Deserialize, Serialize};
31-
use serde_json::{json, Value};
31+
use serde_json::Value;
3232

3333
use crate::{
3434
metadata::SchemaVersion,
@@ -92,31 +92,21 @@ impl Display for LogSource {
9292
}
9393
}
9494

95+
/// Contains the format name and a list of known field names that are associated with the said format.
96+
/// Stored on disk as part of `ObjectStoreFormat` in stream.json
9597
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
9698
pub struct LogSourceEntry {
9799
pub log_source_format: LogSource,
98100
pub fields: HashSet<String>,
99101
}
100102

101103
impl LogSourceEntry {
102-
pub fn new(log_source_format: &LogSource, fields: HashSet<String>) -> Self {
104+
pub fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
103105
LogSourceEntry {
104-
log_source_format: log_source_format.clone(),
106+
log_source_format,
105107
fields,
106108
}
107109
}
108-
109-
pub fn add_log_source(&mut self, log_source_format: LogSource, fields: HashSet<String>) {
110-
self.log_source_format = log_source_format;
111-
self.fields = fields;
112-
}
113-
114-
pub fn to_value(&self) -> Value {
115-
json!([{
116-
"log_source_format": self.log_source_format,
117-
"fields": self.fields,
118-
}])
119-
}
120110
}
121111

122112
// Global Trait for event format

src/handlers/http/ingest.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7272
return Err(PostError::OtelNotSupported);
7373
}
7474

75-
let log_source_entry = LogSourceEntry::new(&log_source, HashSet::new());
75+
let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
7676
PARSEABLE
7777
.create_stream_if_not_exists(
7878
&stream_name,
@@ -130,7 +130,7 @@ pub async fn handle_otel_logs_ingestion(
130130
let stream_name = stream_name.to_str().unwrap().to_owned();
131131

132132
let log_source_entry = LogSourceEntry::new(
133-
&log_source,
133+
log_source.clone(),
134134
OTEL_LOG_KNOWN_FIELD_LIST
135135
.iter()
136136
.map(|&s| s.to_string())
@@ -168,7 +168,7 @@ pub async fn handle_otel_metrics_ingestion(
168168
}
169169
let stream_name = stream_name.to_str().unwrap().to_owned();
170170
let log_source_entry = LogSourceEntry::new(
171-
&log_source,
171+
log_source.clone(),
172172
OTEL_METRICS_KNOWN_FIELD_LIST
173173
.iter()
174174
.map(|&s| s.to_string())
@@ -207,7 +207,7 @@ pub async fn handle_otel_traces_ingestion(
207207
}
208208
let stream_name = stream_name.to_str().unwrap().to_owned();
209209
let log_source_entry = LogSourceEntry::new(
210-
&log_source,
210+
log_source.clone(),
211211
OTEL_TRACES_KNOWN_FIELD_LIST
212212
.iter()
213213
.map(|&s| s.to_string())

src/migration/stream_metadata_migration.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,13 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value {
191191
"version".to_owned(),
192192
Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
193193
);
194-
let log_source = stream_metadata_map.get("log_source");
195194
let mut log_source_entry = LogSourceEntry::default();
196-
if log_source.is_some() {
197-
if let Ok(log_source) = serde_json::from_value::<LogSource>(log_source.unwrap().clone()) {
198-
log_source_entry.add_log_source(log_source, HashSet::new());
195+
if let Some(log_source) = stream_metadata_map.get("log_source") {
196+
if let Ok(log_source) = serde_json::from_value::<LogSource>(log_source.clone()) {
197+
log_source_entry = LogSourceEntry::new(log_source, HashSet::new());
199198
}
200199
}
201-
stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
200+
stream_metadata_map.insert("log_source".to_owned(), json!(log_source_entry));
202201
stream_metadata
203202
}
204203

src/parseable/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ impl Parseable {
352352
}
353353

354354
pub async fn create_internal_stream_if_not_exists(&self) -> Result<(), StreamError> {
355-
let log_source_entry = LogSourceEntry::new(&LogSource::Pmeta, HashSet::new());
355+
let log_source_entry = LogSourceEntry::new(LogSource::Pmeta, HashSet::new());
356356
match self
357357
.create_stream_if_not_exists(
358358
INTERNAL_STREAM_NAME,
@@ -532,7 +532,7 @@ impl Parseable {
532532
custom_partition.as_ref(),
533533
static_schema_flag,
534534
)?;
535-
let log_source_entry = LogSourceEntry::new(&log_source, HashSet::new());
535+
let log_source_entry = LogSourceEntry::new(log_source, HashSet::new());
536536
self.create_stream(
537537
stream_name.to_string(),
538538
&time_partition,

0 commit comments

Comments (0)