168 changes: 80 additions & 88 deletions server/src/event.rs
@@ -26,7 +26,7 @@ use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::properties::WriterProperties;
use parquet::file::reader::SerializedFileReader;
use std::fs;
use std::io::{BufReader, Cursor, Seek, SeekFrom, Write};
use std::io::BufReader;
use std::sync::Arc;

use crate::metadata;
@@ -59,49 +59,68 @@ impl Event {
&self,
storage: &impl ObjectStorage,
) -> Result<response::EventResponse, Error> {
let schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
if schema.is_empty() {
self.first_event(storage).await
let Schema {
arrow_schema,
string_schema,
} = self.infer_schema().map_err(|e| {
error!("Failed to infer schema for event. {:?}", e);
e
})?;

let event = self.get_reader(arrow_schema);
let size = self.body_size();

let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
let is_first_event = stream_schema.is_empty();
// if the stream schema is empty then this is the first event
let compressed_size = if is_first_event {
// process the first event and store its schema in the object store
self.process_first_event(event, string_schema.clone(), storage)
.await?
} else {
self.event()
// validate schema before processing the event
if stream_schema != string_schema {
return Err(Error::SchemaMismatch(self.stream_name.clone()));
} else {
self.process_event(event)?
}
};

if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
{
error!("Couldn't update stream stats. {:?}", e);
}

let msg = if is_first_event {
format!(
"Intial Event recieved for log stream {}, schema uploaded successfully",
&self.stream_name,
)
} else {
format!("Event recieved for log stream {}", &self.stream_name)
};

Ok(response::EventResponse { msg })
}

// This is called when the first event of a log stream is received. The first event is
// special because we parse this event to generate the schema for the log stream. This
// schema is then enforced on the rest of the events sent to this log stream.
async fn first_event(
async fn process_first_event<R: std::io::Read>(
&self,
mut event: json::Reader<R>,
string_schema: String,
storage: &impl ObjectStorage,
) -> Result<response::EventResponse, Error> {
let mut c = Cursor::new(Vec::new());
Review comment (Contributor Author):

Underlying memory for this Cursor is a newly allocated Vec. The reader is then written into this Vec, but there is no usage of c beyond these three lines. Is this dead code?
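
For illustration, a minimal standalone sketch of the pattern the comment is asking about (the function name and setup are hypothetical): the Cursor is filled and rewound, but the reader that is actually consumed wraps the original byte slice, so c is never read again.

use std::io::{BufReader, Cursor, Seek, SeekFrom, Write};

fn questioned_pattern(body: &str) -> std::io::Result<()> {
    let mut c = Cursor::new(Vec::new());
    let reader = body.as_bytes();

    // c is written to and rewound here...
    c.write_all(reader)?;
    c.seek(SeekFrom::Start(0))?;

    // ...but the BufReader wraps the original slice, not c, so the
    // Cursor's contents are never consumed afterwards.
    let _buf_reader = BufReader::new(reader);
    Ok(())
}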

let reader = self.body.as_bytes();
let size = reader.len() as u64;

c.write_all(reader)?;
c.seek(SeekFrom::Start(0))?;
let buf_reader = BufReader::new(reader);

let options = json::reader::DecoderOptions::new().with_batch_size(1024);
let mut event = json::Reader::new(
buf_reader,
Arc::new(self.infer_schema()?.arrow_schema),
options,
);
) -> Result<u64, Error> {
let rb = event.next()?.ok_or(Error::MissingRecord)?;

// Store the record batch as a Parquet file in the local cache
let compressed_size = self.convert_arrow_parquet(rb)?;
if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
{
error!("Couldn't update stream stats. {:?}", e);
}

// Put the inferred schema in the object store
let schema = self.infer_schema()?.string_schema;
let stream_name = &self.stream_name;
storage
.put_schema(stream_name.clone(), schema.clone())
.put_schema(stream_name.clone(), string_schema.clone())
.await
.map_err(|e| response::EventError {
msg: format!(
@@ -110,81 +129,42 @@ impl Event {
),
})?;

// set the schema in memory for this stream
metadata::STREAM_INFO
.set_schema(stream_name.to_string(), schema)
.set_schema(self.stream_name.clone(), string_schema)
.map_err(|e| response::EventError {
msg: format!(
"Failed to set schema for log stream {} due to err: {}",
stream_name, e
&self.stream_name, e
),
})?;

Ok(response::EventResponse {
msg: format!(
"Intial Event recieved for log stream {}, schema uploaded successfully",
self.stream_name
),
})
Ok(compressed_size)
}

// process_event handles all events after the first one. It concatenates record batches
// and puts them in the memory store for each event.
fn event(&self) -> Result<response::EventResponse, Error> {
let mut c = Cursor::new(Vec::new());
let reader = self.body.as_bytes();
let size = reader.len() as u64;

c.write_all(reader)?;
c.seek(SeekFrom::Start(0))?;

match self.infer_schema() {
Ok(event_schema) => {
let options = json::reader::DecoderOptions::new().with_batch_size(1024);
let mut event = json::Reader::new(
self.body.as_bytes(),
Arc::new(event_schema.arrow_schema),
options,
);

// validate schema before attempting to append to parquet file
let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
if stream_schema != event_schema.string_schema {
return Err(Error::SchemaMismatch(self.stream_name.clone()));
}

let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?;

let compressed_size = match self.convert_parquet_rb_reader() {
Ok(mut arrow_reader) => {
let mut total_size = 0;
let rb = arrow_reader.get_record_reader(2048).unwrap();
for prev_rb in rb {
let new_rb = RecordBatch::concat(
&std::sync::Arc::new(arrow_reader.get_schema().unwrap()),
&[next_event_rb.clone(), prev_rb.unwrap()],
)?;
total_size += self.convert_arrow_parquet(new_rb)?;
}

total_size
}
Err(_) => self.convert_arrow_parquet(next_event_rb)?,
};
if let Err(e) =
metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
{
error!("Couldn't update stream stats. {:?}", e);
fn process_event<R: std::io::Read>(&self, mut event: json::Reader<R>) -> Result<u64, Error> {
let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?;

let compressed_size = match self.convert_parquet_rb_reader() {
Ok(mut arrow_reader) => {
let mut total_size = 0;
let rb = arrow_reader.get_record_reader(2048).unwrap();
for prev_rb in rb {
let new_rb = RecordBatch::concat(
&std::sync::Arc::new(arrow_reader.get_schema().unwrap()),
&[next_event_rb.clone(), prev_rb.unwrap()],
)?;
total_size += self.convert_arrow_parquet(new_rb)?;
}

Ok(response::EventResponse {
msg: format!("Event recieved for log stream {}", &self.stream_name),
})
}
Err(e) => {
error!("Failed to infer schema for event. {:?}", e);
Err(e)
total_size
}
}
Err(_) => self.convert_arrow_parquet(next_event_rb)?,
};

Ok(compressed_size)
}
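
As context for the merging loop above, a small self-contained sketch of what RecordBatch::concat does (hypothetical schema and data; arrow API of the era this file targets): it stacks batches that share a schema into a single batch.

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn concat_example() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let col_a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let col_b: ArrayRef = Arc::new(Int64Array::from(vec![3]));
    let a = RecordBatch::try_new(schema.clone(), vec![col_a])?;
    let b = RecordBatch::try_new(schema.clone(), vec![col_b])?;

    // concat stacks the rows of batches that share a schema: 2 rows + 1 row = 3 rows.
    let merged = RecordBatch::concat(&schema, &[a, b])?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}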

// infer_schema is a constructor for Schema
@@ -201,6 +181,18 @@ impl Event {
})
}

fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> {
json::Reader::new(
self.body.as_bytes(),
Arc::new(arrow_schema),
json::reader::DecoderOptions::new().with_batch_size(1024),
)
}

fn body_size(&self) -> u64 {
self.body.as_bytes().len() as u64
}

// convert an arrow record batch to parquet
// and write it to the local cache path as a data.parquet file.
fn convert_arrow_parquet(&self, rb: RecordBatch) -> Result<u64, Error> {
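
The body of convert_arrow_parquet is collapsed in this view. Purely as a sketch of what such a conversion might look like, assuming the parquet crate's ArrowWriter API and a caller-supplied path (not necessarily the actual implementation):

use std::fs;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

// Hypothetical standalone version: write one RecordBatch to a Parquet
// file at the given path and report the compressed file size.
fn write_batch_as_parquet(
    rb: arrow::record_batch::RecordBatch,
    path: &str,
) -> Result<u64, Box<dyn std::error::Error>> {
    let file = fs::File::create(path)?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, rb.schema(), Some(props))?;
    writer.write(&rb)?;
    writer.close()?;
    Ok(fs::metadata(path)?.len())
}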