Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 68 additions & 43 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ use std::fs::OpenOptions;
use std::io::BufReader;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::sync::RwLock;

use crate::metadata;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::response;
use crate::storage::ObjectStorage;
use crate::Error;

type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;
type LocalWriterGuard<'a> = MutexGuard<'a, Option<StreamWriter<std::fs::File>>>;

lazy_static! {
#[derive(Default)]
Expand All @@ -47,102 +48,126 @@ lazy_static! {

impl STREAM_WRITERS {
// append to a existing stream
fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), ()> {
let hashmap_guard = STREAM_WRITERS.read().unwrap();
fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> {
let hashmap_guard = STREAM_WRITERS
.read()
.map_err(|_| StreamWriterError::RwPoisioned)?;

match hashmap_guard.get(stream) {
Some(localwriter) => {
let mut writer_guard = localwriter.lock().unwrap();
let mut writer_guard = localwriter
.lock()
.map_err(|_| StreamWriterError::MutexPoisioned)?;

// if it's some writer then we write without dropping any lock
// hashmap cannot be brought mutably at any point until this finishes
if let Some(ref mut writer) = *writer_guard {
writer.write(record).map_err(|_| ())?;
writer.write(record).map_err(StreamWriterError::Writer)?;
} else {
drop(writer_guard);
drop(hashmap_guard);
STREAM_WRITERS::set_entry(stream, record).unwrap();
// pass on this mutex to set entry so that it can be reused
// we have a guard for underlying entry thus
// hashmap must not be availible as mutable to any other thread
STREAM_WRITERS::set_entry(writer_guard, stream, record)?;
}
}
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
drop(hashmap_guard);
STREAM_WRITERS::create_entry(stream.to_string(), record).unwrap();
STREAM_WRITERS::create_entry(stream.to_string(), record)?;
}
};
Ok(())
}

// create a new entry with new stream_writer
// todo: error type
// Only create entry for valid streams
fn create_entry(stream: String, record: &RecordBatch) -> Result<(), ()> {
let mut hashmap_guard = STREAM_WRITERS.write().unwrap();

if STREAM_INFO.schema(&stream).is_err() {
return Err(());
}
fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> {
let mut hashmap_guard = STREAM_WRITERS
.write()
.map_err(|_| StreamWriterError::RwPoisioned)?;

let file = OpenOptions::new()
.append(true)
.create_new(true)
.open(data_file_path(&stream))
.map_err(|_| ())?;
.map_err(StreamWriterError::Io)?;

let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
stream_writer.write(record).map_err(|_| ())?;
let mut stream_writer = StreamWriter::try_new(file, &record.schema())
.expect("File and RecordBatch both are checked");

stream_writer
.write(record)
.map_err(StreamWriterError::Writer)?;

hashmap_guard.insert(stream, Mutex::new(Some(stream_writer)));

Ok(())
}

// Deleting a logstream requires that metadata is deleted first
pub fn delete_entry(stream: &str) -> Result<(), ()> {
let mut hashmap_guard = STREAM_WRITERS.write().unwrap();

if STREAM_INFO.schema(stream).is_ok() {
return Err(());
}
pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> {
let mut hashmap_guard = STREAM_WRITERS
.write()
.map_err(|_| StreamWriterError::RwPoisioned)?;

hashmap_guard.remove(stream);

Ok(())
}

fn set_entry(stream: &str, record: &RecordBatch) -> Result<(), ()> {
fn set_entry(
mut writer_guard: LocalWriterGuard,
stream: &str,
record: &RecordBatch,
) -> Result<(), StreamWriterError> {
let file = OpenOptions::new()
.append(true)
.create_new(true)
.open(data_file_path(stream))
.map_err(|_| ())?;
.map_err(StreamWriterError::Io)?;

let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
stream_writer.write(record).map_err(|_| ())?;
let mut stream_writer = StreamWriter::try_new(file, &record.schema())
.expect("File and RecordBatch both are checked");

STREAM_WRITERS
.read()
.expect("Current Thread should not hold any lock")
.get(stream)
.expect("set entry is only called on valid entries")
.lock()
.expect("Poisioning is not handled yet")
.replace(stream_writer); // replace the stream writer behind this mutex
stream_writer
.write(record)
.map_err(StreamWriterError::Writer)?;

writer_guard.replace(stream_writer); // replace the stream writer behind this mutex

Ok(())
}

// Unset the entry so that
pub fn unset_entry(stream: &str) {
let guard = STREAM_WRITERS.read().unwrap();
pub fn unset_entry(stream: &str) -> Result<(), StreamWriterError> {
let guard = STREAM_WRITERS
.read()
.map_err(|_| StreamWriterError::RwPoisioned)?;
let stream_writer = match guard.get(stream) {
Some(writer) => writer,
None => return,
None => return Ok(()),
};
stream_writer
.lock()
.expect("Poisioning is not handled yet")
.map_err(|_| StreamWriterError::MutexPoisioned)?
.take();

Ok(())
}
}

#[derive(Debug, thiserror::Error)]
enum StreamWriterError {}
pub enum StreamWriterError {
#[error("Arrow writer failed: {0}")]
Writer(arrow::error::ArrowError),
#[error("Io Error when creating new file: {0}")]
Io(std::io::Error),
#[error("RwLock was poisioned")]
RwPoisioned,
#[error("Mutex was poisioned")]
MutexPoisioned,
}

fn data_file_path(stream_name: &str) -> String {
format!(
Expand Down Expand Up @@ -253,7 +278,7 @@ impl Event {
let rb = event.next()?.ok_or(Error::MissingRecord)?;
let stream_name = &self.stream_name;

STREAM_WRITERS::append_to_local(stream_name, &rb).map_err(|_| Error::MissingRecord)?;
STREAM_WRITERS::append_to_local(stream_name, &rb).unwrap();

Ok(0)
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl StorageDir {
let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp");
fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)
.map_err(|_| MoveDataError::Rename)?;
event::STREAM_WRITERS::unset_entry(&self.stream_name);
event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap();
let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?;
let reader = StreamReader::try_new(file, None)?;
let schema = reader.schema();
Expand Down