Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Event {
&self,
storage: &impl ObjectStorage,
) -> Result<response::EventResponse, Error> {
let schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?;
let schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
if schema.is_empty() {
self.first_event(storage).await
} else {
Expand Down Expand Up @@ -147,7 +147,7 @@ impl Event {
);

// validate schema before attempting to append to parquet file
let stream_schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?;
let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
if stream_schema != event_schema.string_schema {
return Err(Error::SchemaMismatch(self.stream_name.clone()));
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub async fn post_event(req: HttpRequest, body: web::Json<serde_json::Value>) ->
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let labels = utils::collect_labels(&req);

if let Err(e) = metadata::STREAM_INFO.schema(stream_name.clone()) {
if let Err(e) = metadata::STREAM_INFO.schema(&stream_name) {
// if stream doesn't exist, fail to post data
return response::ServerResponse {
msg: format!(
Expand Down
14 changes: 6 additions & 8 deletions server/src/handlers/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
.to_http();
}

if let Err(e) = metadata::STREAM_INFO.delete_stream(stream_name.to_string()) {
if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
return response::ServerResponse {
msg: format!(
"failed to delete log stream {} from metadata due to err: {}",
Expand All @@ -82,7 +82,7 @@ pub async fn list(_: HttpRequest) -> impl Responder {
pub async fn schema(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

match metadata::STREAM_INFO.schema(stream_name.clone()) {
match metadata::STREAM_INFO.schema(&stream_name) {
Ok(schema) => response::ServerResponse {
msg: schema,
code: StatusCode::OK,
Expand Down Expand Up @@ -116,7 +116,7 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
pub async fn get_alert(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

match metadata::STREAM_INFO.alert(stream_name.clone()) {
match metadata::STREAM_INFO.alert(&stream_name) {
Ok(alert) => response::ServerResponse {
msg: alert,
code: StatusCode::OK,
Expand Down Expand Up @@ -178,9 +178,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
// Fail if unable to create log stream on object store backend
if let Err(e) = s3.create_stream(&stream_name).await {
// delete the stream from metadata because we couldn't create it on object store backend
metadata::STREAM_INFO
.delete_stream(stream_name.to_string())
.unwrap();
metadata::STREAM_INFO.delete_stream(&stream_name).unwrap();
return response::ServerResponse {
msg: format!(
"failed to create log stream {} due to err: {}",
Expand Down Expand Up @@ -217,8 +215,8 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
.await
{
Ok(_) => {
if let Err(e) = metadata::STREAM_INFO
.set_alert(stream_name.to_string(), alert_config.to_string())
if let Err(e) =
metadata::STREAM_INFO.set_alert(stream_name.clone(), alert_config.to_string())
{
return response::ServerResponse {
msg: format!(
Expand Down
24 changes: 12 additions & 12 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use std::sync::RwLock;
use crate::error::Error;
use crate::storage::ObjectStorage;

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LogStreamMetadata {
pub schema: String,
pub alert_config: String,
pub stats: Stats,
}

#[derive(Debug, Deserialize, Serialize, Default, Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
pub struct Stats {
pub size: u64,
pub compressed_size: u64,
Expand Down Expand Up @@ -67,29 +67,29 @@ lazy_static! {
#[allow(clippy::all)]
impl STREAM_INFO {
pub fn set_schema(&self, stream_name: String, schema: String) -> Result<(), Error> {
let alert_config = self.alert(stream_name.clone())?;
let alert_config = self.alert(&stream_name)?;
self.add_stream(stream_name, schema, alert_config)
}

pub fn schema(&self, stream_name: String) -> Result<String, Error> {
pub fn schema(&self, stream_name: &str) -> Result<String, Error> {
let map = self.read().unwrap();
let meta = map
.get(&stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name))?;
.get(stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name.to_string()))?;

Ok(meta.schema.clone())
}

pub fn set_alert(&self, stream_name: String, alert_config: String) -> Result<(), Error> {
let schema = self.schema(stream_name.clone())?;
let schema = self.schema(&stream_name)?;
self.add_stream(stream_name, schema, alert_config)
}

pub fn alert(&self, stream_name: String) -> Result<String, Error> {
pub fn alert(&self, stream_name: &str) -> Result<String, Error> {
let map = self.read().unwrap();
let meta = map
.get(&stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name))?;
.get(stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?;

Ok(meta.alert_config.clone())
}
Expand All @@ -112,10 +112,10 @@ impl STREAM_INFO {
Ok(())
}

pub fn delete_stream(&self, stream_name: String) -> Result<(), Error> {
pub fn delete_stream(&self, stream_name: &str) -> Result<(), Error> {
let mut map = self.write().unwrap();
// TODO: Add check to confirm data deletion
map.remove(&stream_name);
map.remove(stream_name);

Ok(())
}
Expand Down