Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 6 additions & 46 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,46 +59,6 @@ pub const INTERNAL_STREAM_NAME: &str = "pmeta";

const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);

pub async fn sync_cache_with_ingestors(
url: &str,
ingestor: IngestorMetadata,
body: bool,
) -> Result<(), StreamError> {
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let request_body: Bytes = Bytes::from(body.to_string());
let client = reqwest::Client::new();
let resp = client
.put(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
.body(request_body)
.send()
.await
.map_err(|err| {
// log the error and return a custom error
log::error!(
"Fatal: failed to set cache: {}\n Error: {:?}",
ingestor.domain_name,
err
);
StreamError::Network(err)
})?;

// if the response is not successful, log the error and return a custom error
// this could be a bit too much, but we need to be sure it covers all cases
if !resp.status().is_success() {
log::error!(
"failed to set cache: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp.text().await
);
}

Ok(())
}

// forward the create/update stream request to all ingestors to keep them in sync
pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
Expand All @@ -122,7 +82,7 @@ pub async fn sync_streams_with_ingestors(
continue;
}
let url = format!(
"{}{}/logstream/{}",
"{}{}/logstream/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
Expand Down Expand Up @@ -176,7 +136,7 @@ pub async fn sync_users_with_roles_with_ingestors(
continue;
}
let url = format!(
"{}{}/user/{}/role",
"{}{}/user/{}/role/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -224,7 +184,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
continue;
}
let url = format!(
"{}{}/user/{}",
"{}{}/user/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -285,7 +245,7 @@ pub async fn sync_user_creation_with_ingestors(
continue;
}
let url = format!(
"{}{}/user/{}",
"{}{}/user/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -333,7 +293,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
continue;
}
let url = format!(
"{}{}/user/{}/generate-new-password",
"{}{}/user/{}/generate-new-password/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -389,7 +349,7 @@ pub async fn sync_role_update_with_ingestors(
continue;
}
let url = format!(
"{}{}/role/{}",
"{}{}/role/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
name
Expand Down
Loading
Loading