diff --git a/server/src/event.rs b/server/src/event.rs
index eeb95e0b0..03c4f26e5 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -23,13 +23,13 @@ mod writer;
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;
-
 use std::sync::Arc;
 
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
 use crate::metadata;
 use chrono::NaiveDateTime;
+use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -44,6 +44,7 @@ pub struct Event {
     pub is_first_event: bool,
     pub parsed_timestamp: NaiveDateTime,
     pub time_partition: Option<String>,
+    pub custom_partition_values: HashMap<String, String>,
 }
 
 // Events holds the schema related to a each event for a single log stream
@@ -55,6 +56,14 @@ impl Event {
             key = format!("{key}{parsed_timestamp_to_min}");
         }
 
+        if !self.custom_partition_values.is_empty() {
+            let mut custom_partition_key = String::default();
+            for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
+                custom_partition_key = format!("{custom_partition_key}&{k}={v}");
+            }
+            key = format!("{key}{custom_partition_key}");
+        }
+
         let num_rows = self.rb.num_rows() as u64;
         if self.is_first_event {
             commit_schema(&self.stream_name, self.rb.schema())?;
@@ -65,6 +74,7 @@ impl Event {
             &key,
             self.rb.clone(),
             self.parsed_timestamp,
+            self.custom_partition_values,
         )?;
 
         metadata::STREAM_INFO.update_stats(
@@ -93,8 +103,15 @@ impl Event {
         schema_key: &str,
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
+        custom_partition_values: HashMap<String, String>,
     ) -> Result<(), EventError> {
-        STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
+        STREAM_WRITERS.append_to_local(
+            stream_name,
+            schema_key,
+            rb,
+            parsed_timestamp,
+            custom_partition_values,
+        )?;
         Ok(())
     }
 }
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index b2b9f88c9..ed697af00 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -48,7 +48,7 @@ impl EventFormat for Event {
         static_schema_flag: Option<String>,
         time_partition: Option<String>,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
-        let data = flatten_json_body(self.data, None, None, false)?;
+        let data = flatten_json_body(self.data, None, None, None, false)?;
         let stream_schema = schema;
 
         // incoming event may be a single json or a json array
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 737a0a514..b508197b1 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -49,6 +49,7 @@ impl Writer {
         schema_key: &str,
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
+        custom_partition_values: HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
         let rb = utils::arrow::replace_columns(
             rb.schema(),
@@ -57,8 +58,13 @@ impl Writer {
             &[Arc::new(get_timestamp_array(rb.num_rows()))],
         );
 
-        self.disk
-            .push(stream_name, schema_key, &rb, parsed_timestamp)?;
+        self.disk.push(
+            stream_name,
+            schema_key,
+            &rb,
+            parsed_timestamp,
+            custom_partition_values,
+        )?;
         self.mem.push(schema_key, rb);
         Ok(())
     }
@@ -75,6 +81,7 @@ impl WriterTable {
         schema_key: &str,
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
+        custom_partition_values: HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
         let hashmap_guard = self.read().unwrap();
 
@@ -85,6 +92,7 @@ impl WriterTable {
                     schema_key,
                     record,
                     parsed_timestamp,
+                    custom_partition_values,
                 )?;
             }
             None => {
@@ -98,10 +106,17 @@ impl WriterTable {
                         schema_key,
                         record,
                         parsed_timestamp,
+                        custom_partition_values,
                     )?;
                 } else {
                     let mut writer = Writer::default();
-                    writer.push(stream_name, schema_key, record, parsed_timestamp)?;
+                    writer.push(
+                        stream_name,
+                        schema_key,
+                        record,
+                        parsed_timestamp,
+                        custom_partition_values,
+                    )?;
                     map.insert(stream_name.to_owned(), Mutex::new(writer));
                 }
             }
diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs
index 1b193eb4c..60639015e 100644
--- a/server/src/event/writer/file_writer.rs
+++ b/server/src/event/writer/file_writer.rs
@@ -44,6 +44,7 @@ impl FileWriter {
         schema_key: &str,
         record: &RecordBatch,
         parsed_timestamp: NaiveDateTime,
+        custom_partition_values: HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
         match self.get_mut(schema_key) {
             Some(writer) => {
@@ -55,8 +56,13 @@ impl FileWriter {
             // entry is not present thus we create it
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) =
-                    init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
+                let (path, writer) = init_new_stream_writer_file(
+                    stream_name,
+                    schema_key,
+                    record,
+                    parsed_timestamp,
+                    custom_partition_values,
+                )?;
                 self.insert(
                     schema_key.to_owned(),
                     ArrowWriter {
@@ -82,9 +88,10 @@ fn init_new_stream_writer_file(
     schema_key: &str,
     record: &RecordBatch,
     parsed_timestamp: NaiveDateTime,
+    custom_partition_values: HashMap<String, String>,
 ) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
     let dir = StorageDir::new(stream_name);
-    let path = dir.path_by_current_time(schema_key, parsed_timestamp);
+    let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values);
     std::fs::create_dir_all(dir.data_path)?;
 
     let file = OpenOptions::new().create(true).append(true).open(&path)?;
diff --git a/server/src/handlers.rs b/server/src/handlers.rs
index d610011cf..883eceb65 100644
--- a/server/src/handlers.rs
+++ b/server/src/handlers.rs
@@ -25,6 +25,7 @@ const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
 const LOG_SOURCE_KEY: &str = "x-p-log-source";
 const TIME_PARTITION_KEY: &str = "x-p-time-partition";
 const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
+const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
 const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
 const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 7532ad59e..954fd4a8f 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -35,7 +35,7 @@ use crate::utils::json::convert_array_to_object;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, NaiveDateTime, Utc};
 use http::StatusCode;
 use serde_json::Value;
 use std::collections::{BTreeMap, HashMap};
@@ -106,63 +106,96 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
     let time_partition = object_store_format.time_partition;
     let time_partition_limit = object_store_format.time_partition_limit;
     let static_schema_flag = object_store_format.static_schema_flag;
+    let custom_partition = object_store_format.custom_partition;
     let body_val: Value = serde_json::from_slice(&body)?;
     let size: usize = body.len();
     let mut parsed_timestamp = Utc::now().naive_utc();
     if time_partition.is_none() {
-        let stream = stream_name.clone();
-        let (rb, is_first_event) = get_stream_schema(
-            stream.clone(),
-            req,
-            body_val,
-            static_schema_flag,
+        if custom_partition.is_none() {
+            let size = size as u64;
+            create_process_record_batch(
+                stream_name.clone(),
+                req.clone(),
+                body_val.clone(),
+                static_schema_flag.clone(),
+                None,
+                parsed_timestamp,
+                HashMap::new(),
+                size,
+            )
+            .await?;
+        } else {
+            let data =
+                convert_array_to_object(body_val.clone(), None, None, custom_partition.clone())?;
+            let custom_partition = custom_partition.unwrap();
+            let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
+
+            for value in data {
+                let custom_partition_values =
+                    get_custom_partition_values(&value, &custom_partition_list);
+
+                let size = value.to_string().into_bytes().len() as u64;
+                create_process_record_batch(
+                    stream_name.clone(),
+                    req.clone(),
+                    value.clone(),
+                    static_schema_flag.clone(),
+                    None,
+                    parsed_timestamp,
+                    custom_partition_values.clone(),
+                    size,
+                )
+                .await?;
+            }
+        }
+    } else if custom_partition.is_none() {
+        let data = convert_array_to_object(
+            body_val.clone(),
             time_partition.clone(),
+            time_partition_limit,
+            None,
         )?;
-        event::Event {
-            rb,
-            stream_name: stream,
-            origin_format: "json",
-            origin_size: size as u64,
-            is_first_event,
-            parsed_timestamp,
-            time_partition,
+        for value in data {
+            parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
+                stream_name.clone(),
+                req.clone(),
+                value.clone(),
+                static_schema_flag.clone(),
+                time_partition.clone(),
+                parsed_timestamp,
+                HashMap::new(),
+                size,
+            )
+            .await?;
         }
-        .process()
-        .await?;
     } else {
         let data = convert_array_to_object(
             body_val.clone(),
             time_partition.clone(),
             time_partition_limit,
+            custom_partition.clone(),
         )?;
+        let custom_partition = custom_partition.unwrap();
+        let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
+
         for value in data {
-            let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
-            parsed_timestamp = body_timestamp
-                .unwrap()
-                .to_owned()
-                .as_str()
-                .unwrap()
-                .parse::<DateTime<Utc>>()
-                .unwrap()
-                .naive_utc();
+            let custom_partition_values =
+                get_custom_partition_values(&value, &custom_partition_list);
 
-            let (rb, is_first_event) = get_stream_schema(
+            parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
                 stream_name.clone(),
                 req.clone(),
                 value.clone(),
                 static_schema_flag.clone(),
                 time_partition.clone(),
-            )?;
-            event::Event {
-                rb,
-                stream_name: stream_name.clone(),
-                origin_format: "json",
-                origin_size: value.to_string().into_bytes().len() as u64,
-                is_first_event,
                 parsed_timestamp,
-                time_partition: time_partition.clone(),
-            }
-            .process()
+                custom_partition_values.clone(),
+                size,
+            )
             .await?;
         }
     }
@@ -170,6 +203,73 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
     Ok(())
 }
 
+fn get_parsed_timestamp(body: &Value, time_partition: &Option<String>) -> NaiveDateTime {
+    let body_timestamp = body.get(&time_partition.clone().unwrap().to_string());
+    let parsed_timestamp = body_timestamp
+        .unwrap()
+        .to_owned()
+        .as_str()
+        .unwrap()
+        .parse::<DateTime<Utc>>()
+        .unwrap()
+        .naive_utc();
+    parsed_timestamp
+}
+
+fn get_custom_partition_values(
+    body: &Value,
+    custom_partition_list: &[&str],
+) -> HashMap<String, String> {
+    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
+    for custom_partition_field in custom_partition_list {
+        let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned();
+        let custom_partition_value = match
custom_partition_value.clone() { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + }; + custom_partition_values.insert( + custom_partition_field.trim().to_string(), + custom_partition_value, + ); + } + custom_partition_values +} + +#[allow(clippy::too_many_arguments)] +async fn create_process_record_batch( + stream_name: String, + req: HttpRequest, + value: Value, + static_schema_flag: Option, + time_partition: Option, + parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, + origin_size: u64, +) -> Result<(), PostError> { + let (rb, is_first_event) = get_stream_schema( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + time_partition.clone(), + )?; + event::Event { + rb, + stream_name: stream_name.clone(), + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp, + time_partition: time_partition.clone(), + custom_partition_values: custom_partition_values.clone(), + } + .process() + .await?; + + Ok(()) +} + fn get_stream_schema( stream_name: String, req: HttpRequest, @@ -216,6 +316,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr "", "", "", + "", Arc::new(Schema::empty()), ) .await?; diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 1fa5a10be..5910982f6 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -21,7 +21,9 @@ use super::base_path_without_preceding_slash; use super::cluster::fetch_stats_from_ingestors; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use crate::alerts::Alerts; -use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY}; +use crate::handlers::{ + CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, +}; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; @@ -224,6 +226,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result>(); + if custom_partition_list.len() > 3 { + return Err(StreamError::Custom { + msg: "maximum 3 custom partition keys are supported".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + } let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let mut schema = Arc::new(Schema::empty()); @@ -240,8 +257,11 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result, ) -> Result<(), CreateStreamError> { @@ -561,6 +583,7 @@ pub async fn create_stream( &stream_name, time_partition, time_partition_limit, + custom_partition, static_schema_flag, schema.clone(), ) @@ -591,6 +614,7 @@ pub async fn create_stream( created_at, time_partition.to_string(), time_partition_limit.to_string(), + custom_partition.to_string(), static_schema_flag.to_string(), static_schema, ); @@ -630,6 +654,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, pub time_partition: Option, pub time_partition_limit: Option, + pub custom_partition: Option, pub static_schema_flag: Option, } @@ -100,6 +101,13 @@ impl StreamInfo { .map(|metadata| metadata.time_partition.clone()) } + pub fn get_custom_partition(&self, stream_name: &str) -> Result, MetadataError> { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.custom_partition.clone()) + } 
+ pub fn get_static_schema_flag( &self, stream_name: &str, @@ -160,13 +168,14 @@ impl StreamInfo { metadata.first_event_at = first_event_at; }) } - + #[allow(clippy::too_many_arguments)] pub fn add_stream( &self, stream_name: String, created_at: String, time_partition: String, time_partition_limit: String, + custom_partition: String, static_schema_flag: String, static_schema: HashMap>, ) { @@ -187,6 +196,11 @@ impl StreamInfo { } else { Some(time_partition_limit) }, + custom_partition: if custom_partition.is_empty() { + None + } else { + Some(custom_partition) + }, static_schema_flag: if static_schema_flag != "true" { None } else { @@ -244,6 +258,7 @@ impl StreamInfo { first_event_at: meta.first_event_at, time_partition: meta.time_partition, time_partition_limit: meta.time_partition_limit, + custom_partition: meta.custom_partition, static_schema_flag: meta.static_schema_flag, }; diff --git a/server/src/static_schema.rs b/server/src/static_schema.rs index b7305da88..488f88f88 100644 --- a/server/src/static_schema.rs +++ b/server/src/static_schema.rs @@ -41,17 +41,42 @@ pub struct Metadata {} pub fn convert_static_schema_to_arrow_schema( static_schema: StaticSchema, time_partition: &str, + custom_partition: &str, ) -> Result, AnyError> { let mut parsed_schema = ParsedSchema { fields: Vec::new(), metadata: HashMap::new(), }; let mut time_partition_exists: bool = false; + + if !custom_partition.is_empty() { + let custom_partition_list = custom_partition.split(',').collect::>(); + let mut custom_partition_exists: HashMap = + HashMap::with_capacity(custom_partition_list.len()); + + for partition in &custom_partition_list { + for field in &static_schema.fields { + if &field.name == partition { + custom_partition_exists.insert(partition.to_string(), true); + } + } + } + for partition in custom_partition_list { + if !custom_partition_exists.contains_key(partition) { + return Err(anyhow! 
{ + format!( + "custom partition field {partition} does not exist in the schema for the static schema logstream" + ), + }); + } + } + } for mut field in static_schema.fields { if !time_partition.is_empty() && field.name == time_partition { time_partition_exists = true; field.data_type = "datetime".to_string(); } + let parsed_field = Fields { name: field.name.clone(), diff --git a/server/src/storage.rs b/server/src/storage.rs index 3de62ef4f..d6fb2fb7b 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -95,6 +95,8 @@ pub struct ObjectStoreFormat { #[serde(skip_serializing_if = "Option::is_none")] pub time_partition_limit: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub custom_partition: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, } @@ -112,6 +114,8 @@ pub struct StreamInfo { #[serde(skip_serializing_if = "Option::is_none")] pub time_partition_limit: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub custom_partition: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, } @@ -159,6 +163,7 @@ impl Default for ObjectStoreFormat { retention: None, time_partition: None, time_partition_limit: None, + custom_partition: None, static_schema_flag: None, } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index b0846d0d8..5244feb04 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -27,7 +27,6 @@ use super::{ use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::option::Mode; - use crate::{ alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, @@ -128,6 +127,7 @@ pub trait ObjectStorage: Sync + 'static { stream_name: &str, time_partition: &str, time_partition_limit: &str, + custom_partition: &str, static_schema_flag: &str, schema: Arc, ) -> Result<(), ObjectStorageError> { @@ -145,6 +145,11 @@ pub trait ObjectStorage: Sync + 'static { } else { format.time_partition_limit = Some(time_partition_limit.to_string()); } + if custom_partition.is_empty() { + format.custom_partition = None; + } else { + format.custom_partition = Some(custom_partition.to_string()); + } if static_schema_flag != "true" { format.static_schema_flag = None; } else { @@ -439,9 +444,17 @@ pub trait ObjectStorage: Sync + 'static { let time_partition = STREAM_INFO .get_time_partition(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let dir = StorageDir::new(stream); - let schema = convert_disk_files_to_parquet(stream, &dir, time_partition) + let custom_partition = STREAM_INFO + .get_custom_partition(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let dir = StorageDir::new(stream); + let schema = convert_disk_files_to_parquet( + stream, + &dir, + time_partition, + custom_partition.clone(), + ) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; if let Some(schema) = schema { let static_schema_flag = STREAM_INFO @@ -467,7 +480,16 @@ pub trait ObjectStorage: Sync + 'static { .expect("only parquet files are returned by iterator") .to_str() .expect("filename is valid string"); - let file_suffix = str::replacen(filename, ".", "/", 3); + let mut file_suffix = str::replacen(filename, ".", "/", 3); + + let custom_partition_clone = custom_partition.clone(); + if custom_partition_clone.is_some() { + let custom_partition_fields = custom_partition_clone.unwrap(); + let custom_partition_list = + 
custom_partition_fields.split(',').collect::>(); + file_suffix = + str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + } let stream_relative_path = format!("{stream}/{file_suffix}"); self.upload_file(&stream_relative_path, &file).await?; let absolute_path = self diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index abeac7062..9e6e724a0 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -38,7 +38,8 @@ use crate::{ }; use arrow_schema::{ArrowError, Schema}; use base64::Engine; -use chrono::{NaiveDateTime, Timelike}; +use chrono::{NaiveDateTime, Timelike, Utc}; +use itertools::Itertools; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -64,10 +65,17 @@ impl StorageDir { Self { data_path } } - pub fn file_time_suffix(time: NaiveDateTime, extention: &str) -> String { - let uri = utils::date_to_prefix(time.date()) + pub fn file_time_suffix( + time: NaiveDateTime, + custom_partition_values: HashMap, + extention: &str, + ) -> String { + let mut uri = utils::date_to_prefix(time.date()) + &utils::hour_to_prefix(time.hour()) + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); + if !custom_partition_values.is_empty() { + uri = uri + &utils::custom_partition_to_prefix(custom_partition_values); + } let local_uri = str::replace(&uri, "/", "."); let hostname = hostname_unchecked(); if CONFIG.parseable.mode == Mode::Ingest { @@ -78,27 +86,37 @@ impl StorageDir { } } - fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { + fn filename_by_time( + stream_hash: &str, + time: NaiveDateTime, + custom_partition_values: HashMap, + ) -> String { format!( "{}.{}", stream_hash, - Self::file_time_suffix(time, ARROW_FILE_EXTENSION) + Self::file_time_suffix(time, custom_partition_values, ARROW_FILE_EXTENSION) ) } - fn filename_by_current_time(stream_hash: &str, parsed_timestamp: NaiveDateTime) -> String { - Self::filename_by_time(stream_hash, parsed_timestamp) + fn filename_by_current_time( + stream_hash: &str, + parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, + ) -> String { + Self::filename_by_time(stream_hash, parsed_timestamp, custom_partition_values) } pub fn path_by_current_time( &self, stream_hash: &str, parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, ) -> PathBuf { - self.data_path.join(Self::filename_by_current_time( - stream_hash, - parsed_timestamp, - )) + let server_time_in_min = Utc::now().format("%Y%m%dT%H%M").to_string(); + let mut filename = + Self::filename_by_current_time(stream_hash, parsed_timestamp, custom_partition_values); + filename = format!("{}{}", server_time_in_min, filename); + self.data_path.join(filename) } pub fn arrow_files(&self) -> Vec { @@ -106,10 +124,11 @@ impl StorageDir { return vec![]; }; - let paths: Vec = dir + let paths = dir .flatten() .map(|file| file.path()) .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows"))) + .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); paths @@ -121,7 +140,7 @@ impl StorageDir { let mut grouped_arrow_file: HashMap> = HashMap::new(); let arrow_files = self.arrow_files(); for arrow_file_path in arrow_files { - let key = Self::arrow_path_to_parquet(&arrow_file_path); + let key = Self::arrow_path_to_parquet(&arrow_file_path, String::default()); grouped_arrow_file .entry(key) .or_default() @@ -135,28 +154,25 @@ impl StorageDir { &self, exclude: NaiveDateTime, ) -> HashMap> { - let hot_filename = StorageDir::file_time_suffix(exclude, 
ARROW_FILE_EXTENSION); - // hashmap but exclude where hot filename matches let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); - arrow_files.retain(|path| { !path .file_name() .unwrap() .to_str() .unwrap() - .ends_with(&hot_filename) + .starts_with(&exclude.format("%Y%m%dT%H%M").to_string()) }); - + let random_string = + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); for arrow_file_path in arrow_files { - let key = Self::arrow_path_to_parquet(&arrow_file_path); + let key = Self::arrow_path_to_parquet(&arrow_file_path, random_string.clone()); grouped_arrow_file .entry(key) .or_default() .push(arrow_file_path); } - grouped_arrow_file } @@ -171,22 +187,12 @@ impl StorageDir { .collect() } - fn arrow_path_to_parquet(path: &Path) -> PathBuf { + fn arrow_path_to_parquet(path: &Path, random_string: String) -> PathBuf { let filename = path.file_stem().unwrap().to_str().unwrap(); let (_, filename) = filename.split_once('.').unwrap(); let filename = filename.rsplit_once('.').expect("contains the delim `.`"); let filename = format!("{}.{}", filename.0, filename.1); - let random_string = - rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); let filename_with_random_number = format!("{}.{}.{}", filename, random_string, "arrows"); - /* - let file_stem = path.file_stem().unwrap().to_str().unwrap(); - let random_string = - rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 20); - let (_, filename) = file_stem.split_once('.').unwrap(); - let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows"); - */ - let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); parquet_path.set_extension("parquet"); @@ -197,7 +203,7 @@ impl StorageDir { #[allow(unused)] pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { let data_path = CONFIG.parseable.local_stream_data_path(stream_name); - let dir = StorageDir::file_time_suffix(time, PARQUET_FILE_EXTENSION); + let dir = StorageDir::file_time_suffix(time, HashMap::new(), PARQUET_FILE_EXTENSION); data_path.join(dir) } @@ -206,6 +212,7 @@ pub fn convert_disk_files_to_parquet( stream: &str, dir: &StorageDir, time_partition: Option, + custom_partition: Option, ) -> Result, MoveDataError> { let mut schemas = Vec::new(); @@ -241,14 +248,25 @@ pub fn convert_disk_files_to_parquet( if let Some(time_partition) = time_partition.as_ref() { index_time_partition = merged_schema.index_of(time_partition).unwrap(); } + let mut custom_partition_fields: HashMap = HashMap::new(); + if let Some(custom_partition) = custom_partition.as_ref() { + for custom_partition_field in custom_partition.split(',') { + let index = merged_schema.index_of(custom_partition_field).unwrap(); + custom_partition_fields.insert(custom_partition_field.to_string(), index); + } + } let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - let props = parquet_writer_props(time_partition.clone(), index_time_partition).build(); + let props = parquet_writer_props( + time_partition.clone(), + index_time_partition, + custom_partition_fields, + ) + .build(); schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; - - for ref record in record_reader.merged_iter(schema) { + for ref record in record_reader.merged_iter(schema, time_partition.clone()) { writer.write(record)?; } @@ 
-278,36 +296,41 @@ pub fn convert_disk_files_to_parquet( fn parquet_writer_props( time_partition: Option, index_time_partition: usize, + custom_partition_fields: HashMap, ) -> WriterPropertiesBuilder { let index_time_partition: i32 = index_time_partition as i32; - + let mut time_partition_field = DEFAULT_TIMESTAMP_KEY.to_string(); if let Some(time_partition) = time_partition { - WriterProperties::builder() - .set_max_row_group_size(CONFIG.parseable.row_group_size) - .set_compression(CONFIG.parseable.parquet_compression.into()) - .set_column_encoding( - ColumnPath::new(vec![time_partition]), - Encoding::DELTA_BYTE_ARRAY, - ) - .set_sorting_columns(Some(vec![SortingColumn { - column_idx: index_time_partition, - descending: true, - nulls_first: true, - }])) - } else { - WriterProperties::builder() - .set_max_row_group_size(CONFIG.parseable.row_group_size) - .set_compression(CONFIG.parseable.parquet_compression.into()) - .set_column_encoding( - ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]), - Encoding::DELTA_BINARY_PACKED, - ) - .set_sorting_columns(Some(vec![SortingColumn { - column_idx: index_time_partition, - descending: true, - nulls_first: true, - }])) + time_partition_field = time_partition; } + let mut sorting_column_vec: Vec = Vec::new(); + sorting_column_vec.push(SortingColumn { + column_idx: index_time_partition, + descending: true, + nulls_first: true, + }); + let mut props = WriterProperties::builder() + .set_max_row_group_size(CONFIG.parseable.row_group_size) + .set_compression(CONFIG.parseable.parquet_compression.into()) + .set_column_encoding( + ColumnPath::new(vec![time_partition_field]), + Encoding::DELTA_BINARY_PACKED, + ); + + for (field, index) in custom_partition_fields { + let field = ColumnPath::new(vec![field]); + let encoding = Encoding::DELTA_BYTE_ARRAY; + props = props.set_column_encoding(field, encoding); + let sorting_column = SortingColumn { + column_idx: index as i32, + descending: true, + nulls_first: true, + }; + sorting_column_vec.push(sorting_column); + } + props = props.set_sorting_columns(Some(sorting_column_vec)); + + props } pub fn get_ingestor_info() -> anyhow::Result { diff --git a/server/src/utils.rs b/server/src/utils.rs index ec60f115a..df2efdaa9 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -24,8 +24,9 @@ pub mod uid; pub mod update; use crate::option::CONFIG; use chrono::{DateTime, NaiveDate, Timelike, Utc}; +use itertools::Itertools; use sha2::{Digest, Sha256}; - +use std::collections::HashMap; use std::env; use url::Url; @@ -62,6 +63,14 @@ pub fn date_to_prefix(date: NaiveDate) -> String { date.replace("UTC", "") } +pub fn custom_partition_to_prefix(custom_partition: HashMap) -> String { + let mut prefix = String::default(); + for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) { + prefix.push_str(&format!("{key}={value}/", key = key, value = value)); + } + prefix +} + pub fn hour_to_prefix(hour: u32) -> String { format!("hour={hour:02}/") } diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs index ef76ddf3f..b39349c7c 100644 --- a/server/src/utils/arrow/merged_reader.rs +++ b/server/src/utils/arrow/merged_reader.rs @@ -74,11 +74,16 @@ impl MergedReverseRecordReader { Ok(Self { readers }) } - pub fn merged_iter(self, schema: Arc) -> impl Iterator { + pub fn merged_iter( + self, + schema: Arc, + time_partition: Option, + ) -> impl Iterator { let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten()); - kmerge_by(adapted_readers, |a: 
&RecordBatch, b: &RecordBatch| { - let a_time = get_timestamp_millis(a); - let b_time = get_timestamp_millis(b); + kmerge_by(adapted_readers, move |a: &RecordBatch, b: &RecordBatch| { + // Capture time_partition by value + let a_time = get_timestamp_millis(a, time_partition.clone()); + let b_time = get_timestamp_millis(b, time_partition.clone()); a_time > b_time }) .map(|batch| reverse(&batch)) @@ -95,7 +100,23 @@ impl MergedReverseRecordReader { } } -fn get_timestamp_millis(batch: &RecordBatch) -> i64 { +fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option) -> i64 { + match time_partition { + Some(time_partition) => { + let time_partition = time_partition.as_str(); + match batch.column_by_name(time_partition) { + Some(column) => column + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + None => get_default_timestamp_millis(batch), + } + } + None => get_default_timestamp_millis(batch), + } +} +fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 { match batch .column(0) .as_any() diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs index ad1572c72..526fb532f 100644 --- a/server/src/utils/json.rs +++ b/server/src/utils/json.rs @@ -25,6 +25,7 @@ pub fn flatten_json_body( body: serde_json::Value, time_partition: Option, time_partition_limit: Option, + custom_partition: Option, validation_required: bool, ) -> Result { flatten::flatten( @@ -32,6 +33,7 @@ pub fn flatten_json_body( "_", time_partition, time_partition_limit, + custom_partition, validation_required, ) } @@ -40,8 +42,15 @@ pub fn convert_array_to_object( body: Value, time_partition: Option, time_partition_limit: Option, + custom_partition: Option, ) -> Result, anyhow::Error> { - let data = flatten_json_body(body, time_partition, time_partition_limit, true)?; + let data = flatten_json_body( + body, + time_partition, + time_partition_limit, + custom_partition, + true, + )?; let value_arr = match data { Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], diff --git a/server/src/utils/json/flatten.rs b/server/src/utils/json/flatten.rs index 82f74a532..574671321 100644 --- a/server/src/utils/json/flatten.rs +++ b/server/src/utils/json/flatten.rs @@ -27,20 +27,30 @@ pub fn flatten( separator: &str, time_partition: Option, time_partition_limit: Option, + custom_partition: Option, validation_required: bool, ) -> Result { match nested_value { Value::Object(nested_dict) => { if validation_required { let validate_time_partition_result = validate_time_partition( - Value::Object(nested_dict.clone()), + &Value::Object(nested_dict.clone()), time_partition.clone(), time_partition_limit.clone(), ); + + let validate_custom_partition_result = validate_custom_partition( + &Value::Object(nested_dict.clone()), + custom_partition.clone(), + ); if validate_time_partition_result.is_ok() { - let mut map = Map::new(); - flatten_object(&mut map, None, nested_dict, separator)?; - Ok(Value::Object(map)) + if validate_custom_partition_result.is_ok() { + let mut map = Map::new(); + flatten_object(&mut map, None, nested_dict, separator)?; + Ok(Value::Object(map)) + } else { + Err(anyhow!(validate_custom_partition_result.unwrap_err())) + } } else { Err(anyhow!(validate_time_partition_result.unwrap_err())) } @@ -55,19 +65,24 @@ pub fn flatten( let value: Value = _value.clone(); if validation_required { let validate_time_partition_result = validate_time_partition( - value, + &value, time_partition.clone(), time_partition_limit.clone(), ); - + let validate_custom_partition_result = + 
validate_custom_partition(&value, custom_partition.clone()); if validate_time_partition_result.is_ok() { - let value = std::mem::replace(_value, Value::Null); - let mut map = Map::new(); - let Value::Object(obj) = value else { - return Err(anyhow!("Expected object in array of objects")); - }; - flatten_object(&mut map, None, obj, separator)?; - *_value = Value::Object(map); + if validate_custom_partition_result.is_ok() { + let value = std::mem::replace(_value, Value::Null); + let mut map = Map::new(); + let Value::Object(obj) = value else { + return Err(anyhow!("Expected object in array of objects")); + }; + flatten_object(&mut map, None, obj, separator)?; + *_value = Value::Object(map); + } else { + return Err(anyhow!(validate_custom_partition_result.unwrap_err())); + } } else { return Err(anyhow!(validate_time_partition_result.unwrap_err())); } @@ -87,8 +102,47 @@ pub fn flatten( } } +pub fn validate_custom_partition( + value: &Value, + custom_partition: Option, +) -> Result { + if custom_partition.is_none() { + return Ok(true); + } else { + let custom_partition = custom_partition.unwrap(); + let custom_partition_list = custom_partition.split(',').collect::>(); + for custom_partition_field in &custom_partition_list { + if value.get(custom_partition_field.trim()).is_none() { + return Err(anyhow!(format!( + "ingestion failed as field {} is not part of the log", + custom_partition_field + ))); + } else { + let custom_partition_value = value + .get(custom_partition_field.trim()) + .unwrap() + .to_string(); + if custom_partition_value.is_empty() { + return Err(anyhow!(format!( + "ingestion failed as field {} is empty", + custom_partition_field + ))); + } + if custom_partition_value.contains('.') { + return Err(anyhow!(format!( + "ingestion failed as field {} contains a period", + custom_partition_field + ))); + } + } + } + } + + Ok(true) +} + pub fn validate_time_partition( - value: Value, + value: &Value, time_partition: Option, time_partition_limit: Option, ) -> Result { @@ -258,19 +312,28 @@ mod tests { #[test] fn flatten_single_key_string() { let obj = json!({"key": "value"}); - assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); + assert_eq!( + obj.clone(), + flatten(obj, "_", None, None, None, false).unwrap() + ); } #[test] fn flatten_single_key_int() { let obj = json!({"key": 1}); - assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); + assert_eq!( + obj.clone(), + flatten(obj, "_", None, None, None, false).unwrap() + ); } #[test] fn flatten_multiple_key_value() { let obj = json!({"key1": 1, "key2": "value2"}); - assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); + assert_eq!( + obj.clone(), + flatten(obj, "_", None, None, None, false).unwrap() + ); } #[test] @@ -278,7 +341,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key":"value"}}); assert_eq!( json!({"key": "value", "nested_key.key": "value"}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -287,7 +350,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); assert_eq!( json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -296,7 +359,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); assert_eq!( json!({"key": "value", "nested_key.key1": [1,2,3]}), - flatten(obj, ".", None, 
None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -305,7 +368,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -314,7 +377,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -323,7 +386,7 @@ mod tests { let obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -332,7 +395,7 @@ mod tests { let obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -341,14 +404,14 @@ mod tests { let obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } #[test] fn flatten_mixed_object() { let obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(obj, ".", None, None, false).is_err()); + assert!(flatten(obj, ".", None, None, None, false).is_err()); } #[test]