21 changes: 19 additions & 2 deletions server/src/event.rs
@@ -23,13 +23,13 @@ mod writer;
use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use itertools::Itertools;

use std::sync::Arc;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use chrono::NaiveDateTime;
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -44,6 +44,7 @@ pub struct Event {
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
pub time_partition: Option<String>,
pub custom_partition_values: HashMap<String, String>,
}

// Events holds the schema related to each event for a single log stream
@@ -55,6 +56,14 @@ impl Event {
key = format!("{key}{parsed_timestamp_to_min}");
}

if !self.custom_partition_values.is_empty() {
let mut custom_partition_key = String::default();
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
custom_partition_key = format!("{custom_partition_key}&{k}={v}");
}
key = format!("{key}{custom_partition_key}");
}

let num_rows = self.rb.num_rows() as u64;
if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
@@ -65,6 +74,7 @@ impl Event {
&key,
self.rb.clone(),
self.parsed_timestamp,
self.custom_partition_values,
)?;

metadata::STREAM_INFO.update_stats(
@@ -93,8 +103,15 @@ impl Event {
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
STREAM_WRITERS.append_to_local(
stream_name,
schema_key,
rb,
parsed_timestamp,
custom_partition_values,
)?;
Ok(())
}
}
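
The key construction above appends the custom partition values, sorted by field name, as `&k=v` pairs after the time-partition suffix, so the same value set always maps to the same writer key. A minimal standalone sketch of that behaviour, with made-up field names and an arbitrary schema hash:

```rust
use itertools::Itertools;
use std::collections::HashMap;

// Rebuilds the in-memory writer key the way Event::process does above:
// sorted `&k=v` pairs are appended so the key is deterministic.
fn partitioned_key(base_key: &str, custom_partition_values: &HashMap<String, String>) -> String {
    let mut key = base_key.to_string();
    if !custom_partition_values.is_empty() {
        let mut custom_partition_key = String::default();
        for (k, v) in custom_partition_values.iter().sorted_by_key(|v| v.0) {
            custom_partition_key = format!("{custom_partition_key}&{k}={v}");
        }
        key = format!("{key}{custom_partition_key}");
    }
    key
}

fn main() {
    // Illustrative values only; real keys start from the schema hash (plus the
    // time-partition minute when one is configured).
    let values = HashMap::from([
        ("device".to_string(), "d1".to_string()),
        ("region".to_string(), "eu".to_string()),
    ]);
    assert_eq!(partitioned_key("abc123", &values), "abc123&device=d1&region=eu");
}
```
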
2 changes: 1 addition & 1 deletion server/src/event/format/json.rs
@@ -48,7 +48,7 @@ impl EventFormat for Event {
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, None, None, false)?;
let data = flatten_json_body(self.data, None, None, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
21 changes: 18 additions & 3 deletions server/src/event/writer.rs
@@ -49,6 +49,7 @@ impl Writer {
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
@@ -57,8 +58,13 @@
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

self.disk
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
self.disk.push(
stream_name,
schema_key,
&rb,
parsed_timestamp,
custom_partition_values,
)?;
self.mem.push(schema_key, rb);
Ok(())
}
@@ -75,6 +81,7 @@ impl WriterTable {
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

@@ -85,6 +92,7 @@
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
}
None => {
@@ -98,10 +106,17 @@
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
} else {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
writer.push(
stream_name,
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
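
WriterTable::append_to_local above takes a read lock for the common case and only falls back to a write lock when the stream has no writer yet, re-checking after the upgrade. A condensed illustration of that pattern with a simplified value type (the real table stores a Mutex<Writer> per stream):

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// Simplified stand-in for WriterTable: read-lock fast path, write-lock fallback,
// with a second lookup after the upgrade in case another thread created the entry.
struct Table(RwLock<HashMap<String, Mutex<Vec<String>>>>);

impl Table {
    fn append(&self, stream: &str, record: String) {
        let guard = self.0.read().unwrap();
        if let Some(writer) = guard.get(stream) {
            writer.lock().unwrap().push(record);
            return;
        }
        // Entry is missing: release the read lock before asking for the write lock.
        drop(guard);
        let mut map = self.0.write().unwrap();
        map.entry(stream.to_owned())
            .or_insert_with(|| Mutex::new(Vec::new()))
            .lock()
            .unwrap()
            .push(record);
    }
}

fn main() {
    let table = Table(RwLock::new(HashMap::new()));
    table.append("backend", "first event".to_string());
    table.append("backend", "second event".to_string());
    assert_eq!(table.0.read().unwrap()["backend"].lock().unwrap().len(), 2);
}
```
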
13 changes: 10 additions & 3 deletions server/src/event/writer/file_writer.rs
@@ -44,6 +44,7 @@ impl FileWriter {
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
@@ -55,8 +56,13 @@
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) =
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
let (path, writer) = init_new_stream_writer_file(
stream_name,
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
@@ -82,9 +88,10 @@ fn init_new_stream_writer_file(
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values);
std::fs::create_dir_all(dir.data_path)?;

let file = OpenOptions::new().create(true).append(true).open(&path)?;
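
The actual directory and file layout is decided by StorageDir::path_by_current_time, which is outside this diff; the change here only threads the custom partition values through to it. Purely as a hypothetical sketch, folding those values into a per-minute file name could look like this (the naming scheme below is an assumption, not the crate's real layout):

```rust
use chrono::NaiveDateTime;
use itertools::Itertools;
use std::collections::HashMap;
use std::path::{Path, PathBuf};

// Hypothetical sketch only: one way custom partition values could become part of
// the on-disk file name, keyed by schema hash and the parsed minute.
fn sketch_arrow_path(
    data_path: &Path,
    schema_key: &str,
    parsed_timestamp: NaiveDateTime,
    custom_partition_values: &HashMap<String, String>,
) -> PathBuf {
    let mut name = format!(
        "{schema_key}.date={}.hour={}.minute={}",
        parsed_timestamp.date(),
        parsed_timestamp.format("%H"),
        parsed_timestamp.format("%M"),
    );
    for (k, v) in custom_partition_values.iter().sorted_by_key(|v| v.0) {
        name = format!("{name}.{k}={v}");
    }
    data_path.join(format!("{name}.data.arrows"))
}

fn main() {
    let ts = "2024-03-01T12:34:00Z"
        .parse::<chrono::DateTime<chrono::Utc>>()
        .unwrap()
        .naive_utc();
    let values = HashMap::from([("device".to_string(), "d1".to_string())]);
    // e.g. /tmp/data/abc123.date=2024-03-01.hour=12.minute=34.device=d1.data.arrows
    println!("{}", sketch_arrow_path(Path::new("/tmp/data"), "abc123", ts, &values).display());
}
```
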
1 change: 1 addition & 0 deletions server/src/handlers.rs
@@ -25,6 +25,7 @@ const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';
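
The new `x-p-custom-partition` header names the JSON fields to partition on; elsewhere in this PR the stored value is split on commas, so it is presumably a comma-separated list of field names. A tiny sketch of that parsing (the sample value is made up):

```rust
const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";

fn main() {
    // Hypothetical header value naming two JSON fields to partition on.
    let custom_partition = "device,region";
    let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
    assert_eq!(custom_partition_list, ["device", "region"]);
    println!("fields carried by `{CUSTOM_PARTITION_KEY}`: {custom_partition_list:?}");
}
```
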
175 changes: 138 additions & 37 deletions server/src/handlers/http/ingest.rs
@@ -35,7 +35,7 @@ use crate::utils::json::convert_array_to_object;
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use chrono::{DateTime, NaiveDateTime, Utc};
use http::StatusCode;
use serde_json::Value;
use std::collections::{BTreeMap, HashMap};
@@ -106,70 +106,170 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
let time_partition = object_store_format.time_partition;
let time_partition_limit = object_store_format.time_partition_limit;
let static_schema_flag = object_store_format.static_schema_flag;
let custom_partition = object_store_format.custom_partition;
let body_val: Value = serde_json::from_slice(&body)?;
let size: usize = body.len();
let mut parsed_timestamp = Utc::now().naive_utc();
if time_partition.is_none() {
let stream = stream_name.clone();
let (rb, is_first_event) = get_stream_schema(
stream.clone(),
req,
body_val,
static_schema_flag,
if custom_partition.is_none() {
let size = size as u64;
create_process_record_batch(
stream_name.clone(),
req.clone(),
body_val.clone(),
static_schema_flag.clone(),
None,
parsed_timestamp,
HashMap::new(),
size,
)
.await?;
} else {
let data =
convert_array_to_object(body_val.clone(), None, None, custom_partition.clone())?;
let custom_partition = custom_partition.unwrap();
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();

for value in data {
let custom_partition_values =
get_custom_partition_values(&value, &custom_partition_list);

let size = value.to_string().into_bytes().len() as u64;
create_process_record_batch(
stream_name.clone(),
req.clone(),
value.clone(),
static_schema_flag.clone(),
None,
parsed_timestamp,
custom_partition_values.clone(),
size,
)
.await?;
}
}
} else if custom_partition.is_none() {
let data = convert_array_to_object(
body_val.clone(),
time_partition.clone(),
time_partition_limit,
None,
)?;
event::Event {
rb,
stream_name: stream,
origin_format: "json",
origin_size: size as u64,
is_first_event,
parsed_timestamp,
time_partition,
for value in data {
parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
let size = value.to_string().into_bytes().len() as u64;
create_process_record_batch(
stream_name.clone(),
req.clone(),
value.clone(),
static_schema_flag.clone(),
time_partition.clone(),
parsed_timestamp,
HashMap::new(),
size,
)
.await?;
}
.process()
.await?;
} else {
let data = convert_array_to_object(
body_val.clone(),
time_partition.clone(),
time_partition_limit,
custom_partition.clone(),
)?;
let custom_partition = custom_partition.unwrap();
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();

for value in data {
let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
parsed_timestamp = body_timestamp
.unwrap()
.to_owned()
.as_str()
.unwrap()
.parse::<DateTime<Utc>>()
.unwrap()
.naive_utc();
let custom_partition_values =
get_custom_partition_values(&value, &custom_partition_list);

let (rb, is_first_event) = get_stream_schema(
parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
let size = value.to_string().into_bytes().len() as u64;
create_process_record_batch(
stream_name.clone(),
req.clone(),
value.clone(),
static_schema_flag.clone(),
time_partition.clone(),
)?;
event::Event {
rb,
stream_name: stream_name.clone(),
origin_format: "json",
origin_size: value.to_string().into_bytes().len() as u64,
is_first_event,
parsed_timestamp,
time_partition: time_partition.clone(),
}
.process()
custom_partition_values.clone(),
size,
)
.await?;
}
}

Ok(())
}
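
The nested branching above boils down to four ingestion paths, depending on whether the stream has a time partition and/or a custom partition. A compact, descriptive-only summary of those paths:

```rust
// Descriptive summary of the four branches in push_logs above (illustration only).
fn ingestion_path(time_partition: Option<&str>, custom_partition: Option<&str>) -> &'static str {
    match (time_partition.is_some(), custom_partition.is_some()) {
        (false, false) => "one record batch for the whole body, server-assigned p_timestamp",
        (false, true) => "one batch per event, keyed by the custom partition field values",
        (true, false) => "one batch per event, timestamp parsed from the time-partition field",
        (true, true) => "one batch per event, parsed timestamp plus custom partition values",
    }
}

fn main() {
    assert_eq!(
        ingestion_path(None, Some("device")),
        "one batch per event, keyed by the custom partition field values"
    );
}
```
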

fn get_parsed_timestamp(body: &Value, time_partition: &Option<String>) -> NaiveDateTime {
let body_timestamp = body.get(&time_partition.clone().unwrap().to_string());
let parsed_timestamp = body_timestamp
.unwrap()
.to_owned()
.as_str()
.unwrap()
.parse::<DateTime<Utc>>()
.unwrap()
.naive_utc();
parsed_timestamp
}
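
get_parsed_timestamp reads the configured time-partition field as an RFC 3339 string and converts it to a naive UTC timestamp; the unwraps assume the field is present and well-formed. A self-contained usage sketch with an invented field name:

```rust
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::{json, Value};

// Same conversion as get_parsed_timestamp above, minus the Option wrapper around
// the field name; panics if the field is missing or not an RFC 3339 string.
fn parsed_timestamp(body: &Value, time_partition: &str) -> NaiveDateTime {
    body.get(time_partition)
        .unwrap()
        .as_str()
        .unwrap()
        .parse::<DateTime<Utc>>()
        .unwrap()
        .naive_utc()
}

fn main() {
    let event = json!({ "created_at": "2024-03-01T12:34:56.000Z", "msg": "hello" });
    assert_eq!(
        parsed_timestamp(&event, "created_at").to_string(),
        "2024-03-01 12:34:56"
    );
}
```
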

fn get_custom_partition_values(
body: &Value,
custom_partition_list: &[&str],
) -> HashMap<String, String> {
let mut custom_partition_values: HashMap<String, String> = HashMap::new();
for custom_partition_field in custom_partition_list {
let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned();
let custom_partition_value = match custom_partition_value.clone() {
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
Value::String(s) => s,
_ => "".to_string(),
};
custom_partition_values.insert(
custom_partition_field.trim().to_string(),
custom_partition_value,
);
}
custom_partition_values
}
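
get_custom_partition_values stringifies numbers and booleans, passes strings through unquoted, and maps any other JSON type to an empty string; a field missing from the event would panic on the unwrap. A short usage sketch with invented field names:

```rust
use serde_json::{json, Value};
use std::collections::HashMap;

// Mirrors get_custom_partition_values above as a standalone demonstration.
fn custom_partition_values(body: &Value, fields: &[&str]) -> HashMap<String, String> {
    let mut values = HashMap::new();
    for field in fields {
        let value = match body.get(field.trim()).unwrap().to_owned() {
            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
            Value::String(s) => s,
            _ => "".to_string(),
        };
        values.insert(field.trim().to_string(), value);
    }
    values
}

fn main() {
    let event = json!({ "device": "d1", "level": 4, "msg": "hello" });
    let values = custom_partition_values(&event, &["device", "level"]);
    assert_eq!(values["device"], "d1");
    assert_eq!(values["level"], "4");
}
```
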

#[allow(clippy::too_many_arguments)]
async fn create_process_record_batch(
stream_name: String,
req: HttpRequest,
value: Value,
static_schema_flag: Option<String>,
time_partition: Option<String>,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
origin_size: u64,
) -> Result<(), PostError> {
let (rb, is_first_event) = get_stream_schema(
stream_name.clone(),
req.clone(),
value.clone(),
static_schema_flag.clone(),
time_partition.clone(),
)?;
event::Event {
rb,
stream_name: stream_name.clone(),
origin_format: "json",
origin_size,
is_first_event,
parsed_timestamp,
time_partition: time_partition.clone(),
custom_partition_values: custom_partition_values.clone(),
}
.process()
.await?;

Ok(())
}

fn get_stream_schema(
stream_name: String,
req: HttpRequest,
@@ -216,6 +316,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
"",
"",
"",
"",
Arc::new(Schema::empty()),
)
.await?;