diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 108252af0..732fae020 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -23,9 +23,11 @@ use crate::handlers::http::cluster::utils::{ }; use crate::handlers::http::ingest::{ingest_internal_stream, PostError}; use crate::handlers::http::logstream::error::StreamError; +use crate::handlers::http::role::RoleError; use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; +use crate::rbac::role::model::DefaultPrivilege; use crate::rbac::user::User; use crate::stats::Stats; use crate::storage::object_storage::ingestor_metadata_path; @@ -364,6 +366,63 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), Ok(()) } +// forward the put role request to all ingestors to keep them in sync +pub async fn sync_role_update_with_ingestors( + name: String, + body: Vec, +) -> Result<(), RoleError> { + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + RoleError::Anyhow(err) + })?; + + let roles = to_vec(&body).map_err(|err| { + log::error!("Fatal: failed to serialize roles: {:?}", err); + RoleError::SerdeError(err) + })?; + let roles = Bytes::from(roles); + let client = reqwest::Client::new(); + + for ingestor in ingestor_infos.iter() { + if !utils::check_liveness(&ingestor.domain_name).await { + log::warn!("Ingestor {} is not live", ingestor.domain_name); + continue; + } + let url = format!( + "{}{}/role/{}", + ingestor.domain_name, + base_path_without_preceding_slash(), + name + ); + + let res = client + .put(url) + .header(header::AUTHORIZATION, &ingestor.token) + .header(header::CONTENT_TYPE, "application/json") + .body(roles.clone()) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + RoleError::Network(err) + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, + res.text().await + ); + } + } + + Ok(()) +} + pub async fn fetch_daily_stats_from_ingestors( stream_name: &str, date: &str, diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 43491e0d2..9e6203018 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -186,6 +186,12 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result Result { let creds = extract_session_key(req); - let stream = req.match_info().get("logstream"); + let mut stream = req.match_info().get("logstream"); + if stream.is_none() { + if let Some((_, stream_name)) = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + { + stream = Some(stream_name.to_str().unwrap()); + } + } creds.map(|key| Users.authorize(key, action, stream, None)) } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index aec5bd612..895fb8b8a 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -185,6 +185,7 @@ impl IngestServer { .service(Self::analytics_factory()) .service(Server::get_liveness_factory()) .service(Self::get_user_webscope()) + .service(Server::get_user_role_webscope()) .service(Server::get_metrics_webscope()) .service(Server::get_readiness_factory()), ) diff --git a/server/src/handlers/http/role.rs b/server/src/handlers/http/role.rs index 56759fec5..22f61ba05 100644 --- a/server/src/handlers/http/role.rs +++ b/server/src/handlers/http/role.rs @@ -17,10 +17,11 @@ */ use actix_web::{http::header::ContentType, web, HttpResponse, Responder}; +use bytes::Bytes; use http::StatusCode; use crate::{ - option::CONFIG, + option::{Mode, CONFIG}, rbac::{ map::{mut_roles, DEFAULT_ROLE}, role::model::DefaultPrivilege, @@ -28,18 +29,26 @@ use crate::{ storage::{self, ObjectStorageError, StorageMetadata}, }; +use super::cluster::sync_role_update_with_ingestors; + // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one -pub async fn put( - name: web::Path, - body: web::Json>, -) -> Result { +pub async fn put(name: web::Path, body: Bytes) -> Result { let name = name.into_inner(); - let privileges = body.into_inner(); + let privileges = serde_json::from_slice::>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); - put_metadata(&metadata).await?; - mut_roles().insert(name, privileges); + if CONFIG.parseable.mode == Mode::Ingest { + let _ = storage::put_staging_metadata(&metadata); + mut_roles().insert(name.clone(), privileges.clone()); + } else { + put_metadata(&metadata).await?; + mut_roles().insert(name.clone(), privileges.clone()); + if CONFIG.parseable.mode == Mode::Query { + sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?; + } + } + Ok(HttpResponse::Ok().finish()) } @@ -118,6 +127,12 @@ pub enum RoleError { ObjectStorageError(#[from] ObjectStorageError), #[error("Cannot perform this operation as role is assigned to an existing user.")] RoleInUse, + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), + #[error("{0}")] + SerdeError(#[from] serde_json::Error), + #[error("Network Error: {0}")] + Network(#[from] reqwest::Error), } impl actix_web::ResponseError for RoleError { @@ -125,6 +140,9 @@ impl actix_web::ResponseError for RoleError { match self { Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::RoleInUse => StatusCode::BAD_REQUEST, + Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::SerdeError(_) => StatusCode::BAD_REQUEST, + Self::Network(_) => StatusCode::BAD_GATEWAY, } } diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 0cbd21554..522c5a895 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -234,6 +234,7 @@ pub mod model { Action::GetDashboard, Action::CreateDashboard, Action::DeleteDashboard, + Action::GetUserRoles, ], stream: Some("*".to_string()), tag: None, @@ -269,6 +270,7 @@ pub mod model { Action::ListFilter, Action::CreateFilter, Action::DeleteFilter, + Action::GetUserRoles, ], stream: None, tag: None, @@ -294,6 +296,7 @@ pub mod model { Action::CreateDashboard, Action::DeleteDashboard, Action::GetStreamInfo, + Action::GetUserRoles, ], stream: None, tag: None,