From 77a015936fb68761c9d2ded14614cd6aa6de3301 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sun, 14 Aug 2022 15:51:37 +0530 Subject: [PATCH] change stream_name type --- server/src/event.rs | 4 ++-- server/src/handlers/event.rs | 2 +- server/src/handlers/logstream.rs | 14 ++++++-------- server/src/metadata.rs | 24 ++++++++++++------------ 4 files changed, 21 insertions(+), 23 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index ca63f967c..6bdb06678 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -59,7 +59,7 @@ impl Event { &self, storage: &impl ObjectStorage, ) -> Result { - let schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?; + let schema = metadata::STREAM_INFO.schema(&self.stream_name)?; if schema.is_empty() { self.first_event(storage).await } else { @@ -147,7 +147,7 @@ impl Event { ); // validate schema before attempting to append to parquet file - let stream_schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?; + let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?; if stream_schema != event_schema.string_schema { return Err(Error::SchemaMismatch(self.stream_name.clone())); } diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs index 6508529e9..660832b0b 100644 --- a/server/src/handlers/event.rs +++ b/server/src/handlers/event.rs @@ -76,7 +76,7 @@ pub async fn post_event(req: HttpRequest, body: web::Json) -> let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let labels = utils::collect_labels(&req); - if let Err(e) = metadata::STREAM_INFO.schema(stream_name.clone()) { + if let Err(e) = metadata::STREAM_INFO.schema(&stream_name) { // if stream doesn't exist, fail to post data return response::ServerResponse { msg: format!( diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index ced5afa3c..d77e74953 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -57,7 +57,7 @@ pub async fn delete(req: HttpRequest) -> HttpResponse { .to_http(); } - if let Err(e) = metadata::STREAM_INFO.delete_stream(stream_name.to_string()) { + if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) { return response::ServerResponse { msg: format!( "failed to delete log stream {} from metadata due to err: {}", @@ -82,7 +82,7 @@ pub async fn list(_: HttpRequest) -> impl Responder { pub async fn schema(req: HttpRequest) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - match metadata::STREAM_INFO.schema(stream_name.clone()) { + match metadata::STREAM_INFO.schema(&stream_name) { Ok(schema) => response::ServerResponse { msg: schema, code: StatusCode::OK, @@ -116,7 +116,7 @@ pub async fn schema(req: HttpRequest) -> HttpResponse { pub async fn get_alert(req: HttpRequest) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - match metadata::STREAM_INFO.alert(stream_name.clone()) { + match metadata::STREAM_INFO.alert(&stream_name) { Ok(alert) => response::ServerResponse { msg: alert, code: StatusCode::OK, @@ -178,9 +178,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse { // Fail if unable to create log stream on object store backend if let Err(e) = s3.create_stream(&stream_name).await { // delete the stream from metadata because we couldn't create it on object store backend - metadata::STREAM_INFO - .delete_stream(stream_name.to_string()) - .unwrap(); + metadata::STREAM_INFO.delete_stream(&stream_name).unwrap(); return response::ServerResponse { msg: format!( "failed to create log stream {} due to err: {}", @@ -217,8 +215,8 @@ pub async fn put_alert(req: HttpRequest, body: web::Json) -> .await { Ok(_) => { - if let Err(e) = metadata::STREAM_INFO - .set_alert(stream_name.to_string(), alert_config.to_string()) + if let Err(e) = + metadata::STREAM_INFO.set_alert(stream_name.clone(), alert_config.to_string()) { return response::ServerResponse { msg: format!( diff --git a/server/src/metadata.rs b/server/src/metadata.rs index a8f3f77c4..73717ebbc 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -26,14 +26,14 @@ use std::sync::RwLock; use crate::error::Error; use crate::storage::ObjectStorage; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct LogStreamMetadata { pub schema: String, pub alert_config: String, pub stats: Stats, } -#[derive(Debug, Deserialize, Serialize, Default, Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)] pub struct Stats { pub size: u64, pub compressed_size: u64, @@ -67,29 +67,29 @@ lazy_static! { #[allow(clippy::all)] impl STREAM_INFO { pub fn set_schema(&self, stream_name: String, schema: String) -> Result<(), Error> { - let alert_config = self.alert(stream_name.clone())?; + let alert_config = self.alert(&stream_name)?; self.add_stream(stream_name, schema, alert_config) } - pub fn schema(&self, stream_name: String) -> Result { + pub fn schema(&self, stream_name: &str) -> Result { let map = self.read().unwrap(); let meta = map - .get(&stream_name) - .ok_or(Error::StreamMetaNotFound(stream_name))?; + .get(stream_name) + .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))?; Ok(meta.schema.clone()) } pub fn set_alert(&self, stream_name: String, alert_config: String) -> Result<(), Error> { - let schema = self.schema(stream_name.clone())?; + let schema = self.schema(&stream_name)?; self.add_stream(stream_name, schema, alert_config) } - pub fn alert(&self, stream_name: String) -> Result { + pub fn alert(&self, stream_name: &str) -> Result { let map = self.read().unwrap(); let meta = map - .get(&stream_name) - .ok_or(Error::StreamMetaNotFound(stream_name))?; + .get(stream_name) + .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?; Ok(meta.alert_config.clone()) } @@ -112,10 +112,10 @@ impl STREAM_INFO { Ok(()) } - pub fn delete_stream(&self, stream_name: String) -> Result<(), Error> { + pub fn delete_stream(&self, stream_name: &str) -> Result<(), Error> { let mut map = self.write().unwrap(); // TODO: Add check to confirm data deletion - map.remove(&stream_name); + map.remove(stream_name); Ok(()) }