diff --git a/server/src/event.rs b/server/src/event.rs index 80acdf050..23e13fbdf 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -29,16 +29,17 @@ use std::fs::OpenOptions; use std::io::BufReader; use std::sync::Arc; use std::sync::Mutex; +use std::sync::MutexGuard; use std::sync::RwLock; use crate::metadata; -use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::response; use crate::storage::ObjectStorage; use crate::Error; type LocalWriter = Mutex>>; +type LocalWriterGuard<'a> = MutexGuard<'a, Option>>; lazy_static! { #[derive(Default)] @@ -47,45 +48,57 @@ lazy_static! { impl STREAM_WRITERS { // append to a existing stream - fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), ()> { - let hashmap_guard = STREAM_WRITERS.read().unwrap(); + fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> { + let hashmap_guard = STREAM_WRITERS + .read() + .map_err(|_| StreamWriterError::RwPoisioned)?; + match hashmap_guard.get(stream) { Some(localwriter) => { - let mut writer_guard = localwriter.lock().unwrap(); + let mut writer_guard = localwriter + .lock() + .map_err(|_| StreamWriterError::MutexPoisioned)?; + + // if it's some writer then we write without dropping any lock + // hashmap cannot be brought mutably at any point until this finishes if let Some(ref mut writer) = *writer_guard { - writer.write(record).map_err(|_| ())?; + writer.write(record).map_err(StreamWriterError::Writer)?; } else { - drop(writer_guard); - drop(hashmap_guard); - STREAM_WRITERS::set_entry(stream, record).unwrap(); + // pass on this mutex to set entry so that it can be reused + // we have a guard for underlying entry thus + // hashmap must not be availible as mutable to any other thread + STREAM_WRITERS::set_entry(writer_guard, stream, record)?; } } + // entry is not present thus we create it None => { + // this requires mutable borrow of the map so we drop this read lock and wait for write lock drop(hashmap_guard); - STREAM_WRITERS::create_entry(stream.to_string(), record).unwrap(); + STREAM_WRITERS::create_entry(stream.to_string(), record)?; } }; Ok(()) } // create a new entry with new stream_writer - // todo: error type // Only create entry for valid streams - fn create_entry(stream: String, record: &RecordBatch) -> Result<(), ()> { - let mut hashmap_guard = STREAM_WRITERS.write().unwrap(); - - if STREAM_INFO.schema(&stream).is_err() { - return Err(()); - } + fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> { + let mut hashmap_guard = STREAM_WRITERS + .write() + .map_err(|_| StreamWriterError::RwPoisioned)?; let file = OpenOptions::new() .append(true) .create_new(true) .open(data_file_path(&stream)) - .map_err(|_| ())?; + .map_err(StreamWriterError::Io)?; - let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?; - stream_writer.write(record).map_err(|_| ())?; + let mut stream_writer = StreamWriter::try_new(file, &record.schema()) + .expect("File and RecordBatch both are checked"); + + stream_writer + .write(record) + .map_err(StreamWriterError::Writer)?; hashmap_guard.insert(stream, Mutex::new(Some(stream_writer))); @@ -93,56 +106,68 @@ impl STREAM_WRITERS { } // Deleting a logstream requires that metadata is deleted first - pub fn delete_entry(stream: &str) -> Result<(), ()> { - let mut hashmap_guard = STREAM_WRITERS.write().unwrap(); - - if STREAM_INFO.schema(stream).is_ok() { - return Err(()); - } + pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> { + let mut hashmap_guard = STREAM_WRITERS + .write() + .map_err(|_| StreamWriterError::RwPoisioned)?; hashmap_guard.remove(stream); Ok(()) } - fn set_entry(stream: &str, record: &RecordBatch) -> Result<(), ()> { + fn set_entry( + mut writer_guard: LocalWriterGuard, + stream: &str, + record: &RecordBatch, + ) -> Result<(), StreamWriterError> { let file = OpenOptions::new() .append(true) .create_new(true) .open(data_file_path(stream)) - .map_err(|_| ())?; + .map_err(StreamWriterError::Io)?; - let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?; - stream_writer.write(record).map_err(|_| ())?; + let mut stream_writer = StreamWriter::try_new(file, &record.schema()) + .expect("File and RecordBatch both are checked"); - STREAM_WRITERS - .read() - .expect("Current Thread should not hold any lock") - .get(stream) - .expect("set entry is only called on valid entries") - .lock() - .expect("Poisioning is not handled yet") - .replace(stream_writer); // replace the stream writer behind this mutex + stream_writer + .write(record) + .map_err(StreamWriterError::Writer)?; + + writer_guard.replace(stream_writer); // replace the stream writer behind this mutex Ok(()) } // Unset the entry so that - pub fn unset_entry(stream: &str) { - let guard = STREAM_WRITERS.read().unwrap(); + pub fn unset_entry(stream: &str) -> Result<(), StreamWriterError> { + let guard = STREAM_WRITERS + .read() + .map_err(|_| StreamWriterError::RwPoisioned)?; let stream_writer = match guard.get(stream) { Some(writer) => writer, - None => return, + None => return Ok(()), }; stream_writer .lock() - .expect("Poisioning is not handled yet") + .map_err(|_| StreamWriterError::MutexPoisioned)? .take(); + + Ok(()) } } #[derive(Debug, thiserror::Error)] -enum StreamWriterError {} +pub enum StreamWriterError { + #[error("Arrow writer failed: {0}")] + Writer(arrow::error::ArrowError), + #[error("Io Error when creating new file: {0}")] + Io(std::io::Error), + #[error("RwLock was poisioned")] + RwPoisioned, + #[error("Mutex was poisioned")] + MutexPoisioned, +} fn data_file_path(stream_name: &str) -> String { format!( @@ -253,7 +278,7 @@ impl Event { let rb = event.next()?.ok_or(Error::MissingRecord)?; let stream_name = &self.stream_name; - STREAM_WRITERS::append_to_local(stream_name, &rb).map_err(|_| Error::MissingRecord)?; + STREAM_WRITERS::append_to_local(stream_name, &rb).unwrap(); Ok(0) } diff --git a/server/src/storage.rs b/server/src/storage.rs index 103fb8e4a..764b86823 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -199,7 +199,7 @@ impl StorageDir { let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp"); fs::rename(self.data_path.join("data.records"), &record_tmp_file_path) .map_err(|_| MoveDataError::Rename)?; - event::STREAM_WRITERS::unset_entry(&self.stream_name); + event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap(); let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?; let reader = StreamReader::try_new(file, None)?; let schema = reader.schema();