diff --git a/server/src/event.rs b/server/src/event.rs
index 6bdb06678..cb36fa893 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -26,7 +26,7 @@ use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 use parquet::file::properties::WriterProperties;
 use parquet::file::reader::SerializedFileReader;
 use std::fs;
-use std::io::{BufReader, Cursor, Seek, SeekFrom, Write};
+use std::io::BufReader;
 use std::sync::Arc;
 
 use crate::metadata;
@@ -59,49 +59,68 @@ impl Event {
         &self,
         storage: &impl ObjectStorage,
     ) -> Result<response::EventResponse, Error> {
-        let schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-        if schema.is_empty() {
-            self.first_event(storage).await
+        let Schema {
+            arrow_schema,
+            string_schema,
+        } = self.infer_schema().map_err(|e| {
+            error!("Failed to infer schema for event. {:?}", e);
+            e
+        })?;
+
+        let event = self.get_reader(arrow_schema);
+        let size = self.body_size();
+
+        let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
+        let is_first_event = stream_schema.is_empty();
+        // if the stream schema is empty then this is the first event.
+        let compressed_size = if is_first_event {
+            // process the first event and store its schema in the object store
+            self.process_first_event(event, string_schema.clone(), storage)
+                .await?
         } else {
-            self.event()
+            // validate schema before processing the event
+            if stream_schema != string_schema {
+                return Err(Error::SchemaMismatch(self.stream_name.clone()));
+            } else {
+                self.process_event(event)?
+            }
+        };
+
+        if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
+        {
+            error!("Couldn't update stream stats. {:?}", e);
         }
+
+        let msg = if is_first_event {
+            format!(
+                "Initial Event received for log stream {}, schema uploaded successfully",
+                &self.stream_name,
+            )
+        } else {
+            format!("Event received for log stream {}", &self.stream_name)
+        };
+
+        Ok(response::EventResponse { msg })
     }
 
     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
     // schema is then enforced on rest of the events sent to this log stream.
-    async fn first_event(
+    async fn process_first_event(
        &self,
+        mut event: json::Reader<&[u8]>,
+        string_schema: String,
         storage: &impl ObjectStorage,
-    ) -> Result<response::EventResponse, Error> {
-        let mut c = Cursor::new(Vec::new());
-        let reader = self.body.as_bytes();
-        let size = reader.len() as u64;
-
-        c.write_all(reader)?;
-        c.seek(SeekFrom::Start(0))?;
-        let buf_reader = BufReader::new(reader);
-
-        let options = json::reader::DecoderOptions::new().with_batch_size(1024);
-        let mut event = json::Reader::new(
-            buf_reader,
-            Arc::new(self.infer_schema()?.arrow_schema),
-            options,
-        );
+    ) -> Result<u64, Error> {
         let rb = event.next()?.ok_or(Error::MissingRecord)?;
 
         // Store record batch to Parquet file on local cache
         let compressed_size = self.convert_arrow_parquet(rb)?;
-        if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
-        {
-            error!("Couldn't update stream stats. {:?}", e);
{:?}", e); - } // Put the inferred schema to object store - let schema = self.infer_schema()?.string_schema; let stream_name = &self.stream_name; storage - .put_schema(stream_name.clone(), schema.clone()) + .put_schema(stream_name.clone(), string_schema.clone()) .await .map_err(|e| response::EventError { msg: format!( @@ -110,81 +129,42 @@ impl Event { ), })?; + // set the schema in memory for this stream metadata::STREAM_INFO - .set_schema(stream_name.to_string(), schema) + .set_schema(self.stream_name.clone(), string_schema) .map_err(|e| response::EventError { msg: format!( "Failed to set schema for log stream {} due to err: {}", - stream_name, e + &self.stream_name, e ), })?; - Ok(response::EventResponse { - msg: format!( - "Intial Event recieved for log stream {}, schema uploaded successfully", - self.stream_name - ), - }) + Ok(compressed_size) } // event process all events after the 1st event. Concatenates record batches // and puts them in memory store for each event. - fn event(&self) -> Result { - let mut c = Cursor::new(Vec::new()); - let reader = self.body.as_bytes(); - let size = reader.len() as u64; - - c.write_all(reader)?; - c.seek(SeekFrom::Start(0))?; - - match self.infer_schema() { - Ok(event_schema) => { - let options = json::reader::DecoderOptions::new().with_batch_size(1024); - let mut event = json::Reader::new( - self.body.as_bytes(), - Arc::new(event_schema.arrow_schema), - options, - ); - - // validate schema before attempting to append to parquet file - let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?; - if stream_schema != event_schema.string_schema { - return Err(Error::SchemaMismatch(self.stream_name.clone())); - } - - let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?; - - let compressed_size = match self.convert_parquet_rb_reader() { - Ok(mut arrow_reader) => { - let mut total_size = 0; - let rb = arrow_reader.get_record_reader(2048).unwrap(); - for prev_rb in rb { - let new_rb = RecordBatch::concat( - &std::sync::Arc::new(arrow_reader.get_schema().unwrap()), - &[next_event_rb.clone(), prev_rb.unwrap()], - )?; - total_size += self.convert_arrow_parquet(new_rb)?; - } - - total_size - } - Err(_) => self.convert_arrow_parquet(next_event_rb)?, - }; - if let Err(e) = - metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size) - { - error!("Couldn't update stream stats. {:?}", e); + fn process_event(&self, mut event: json::Reader) -> Result { + let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?; + + let compressed_size = match self.convert_parquet_rb_reader() { + Ok(mut arrow_reader) => { + let mut total_size = 0; + let rb = arrow_reader.get_record_reader(2048).unwrap(); + for prev_rb in rb { + let new_rb = RecordBatch::concat( + &std::sync::Arc::new(arrow_reader.get_schema().unwrap()), + &[next_event_rb.clone(), prev_rb.unwrap()], + )?; + total_size += self.convert_arrow_parquet(new_rb)?; } - Ok(response::EventResponse { - msg: format!("Event recieved for log stream {}", &self.stream_name), - }) - } - Err(e) => { - error!("Failed to infer schema for event. 
{:?}", e); - Err(e) + total_size } - } + Err(_) => self.convert_arrow_parquet(next_event_rb)?, + }; + + Ok(compressed_size) } // inferSchema is a constructor to Schema @@ -201,6 +181,18 @@ impl Event { }) } + fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> { + json::Reader::new( + self.body.as_bytes(), + Arc::new(arrow_schema), + json::reader::DecoderOptions::new().with_batch_size(1024), + ) + } + + fn body_size(&self) -> u64 { + self.body.as_bytes().len() as u64 + } + // convert arrow record batch to parquet // and write it to local cache path as a data.parquet file. fn convert_arrow_parquet(&self, rb: RecordBatch) -> Result {