From 0c222d73cf474003321fde58c26a8519c060e1b8 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 15 May 2023 19:45:34 +0530 Subject: [PATCH 01/16] Add multi user support --- Cargo.lock | 29 ++++++++++ server/Cargo.toml | 1 + server/src/handlers/http.rs | 19 ++++-- server/src/handlers/http/rbac.rs | 73 +++++++++++++++++++++++ server/src/main.rs | 4 +- server/src/rbac.rs | 22 +++++++ server/src/rbac/user.rs | 87 ++++++++++++++++++++++++++++ server/src/storage/store_metadata.rs | 29 ++++++---- 8 files changed, 248 insertions(+), 16 deletions(-) create mode 100644 server/src/handlers/http/rbac.rs create mode 100644 server/src/rbac.rs create mode 100644 server/src/rbac/user.rs diff --git a/Cargo.lock b/Cargo.lock index d66dbc4cc..6340b6b17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -341,6 +341,17 @@ dependencies = [ "backtrace", ] +[[package]] +name = "argon2" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95c2fcf79ad1932ac6269a738109997a83c227c09b75842ae564dc8ede6a861c" +dependencies = [ + "base64ct", + "blake2", + "password-hash", +] + [[package]] name = "arrayref" version = "0.3.6" @@ -955,6 +966,12 @@ dependencies = [ "simd-abstraction", ] +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bitflags" version = "1.3.2" @@ -2946,6 +2963,7 @@ dependencies = [ "actix-web-prometheus", "actix-web-static-files", "anyhow", + "argon2", "arrow-array", "arrow-ipc", "arrow-json", @@ -3007,6 +3025,17 @@ dependencies = [ "zip", ] +[[package]] +name = "password-hash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166" +dependencies = [ + "base64ct", + "rand_core", + "subtle", +] + [[package]] name = "paste" version = "1.0.11" diff --git a/server/Cargo.toml b/server/Cargo.toml index 53d11d046..8b821217f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -78,6 +78,7 @@ pyroscope = { version = "0.5.3", optional = true } pyroscope_pprofrs = { version = "0.2", optional = true } uptime_lib = "0.2.2" regex = "1.7.3" +argon2 = "0.5.0" [build-dependencies] static-files = "0.2" diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 982d80e08..ac313c9b5 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -30,11 +30,13 @@ use rustls::{Certificate, PrivateKey, ServerConfig}; use rustls_pemfile::{certs, pkcs8_private_keys}; use crate::option::CONFIG; +use crate::rbac::get_user_map; mod health_check; mod ingest; mod logstream; mod query; +mod rbac; include!(concat!(env!("OUT_DIR"), "/generated.rs")); @@ -63,10 +65,13 @@ async fn validator( req: ServiceRequest, credentials: BasicAuth, ) -> Result { - if credentials.user_id().trim() == CONFIG.parseable.username - && credentials.password().unwrap().trim() == CONFIG.parseable.password - { - return Ok(req); + let username = credentials.user_id().trim(); + let password = credentials.password().unwrap().trim(); + + if let Some(user) = get_user_map().read().unwrap().get(username) { + if user.verify(password) { + return Ok(req); + } } Err((actix_web::error::ErrorUnauthorized("Unauthorized"), req)) @@ -158,6 +163,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { .route(web::get().to(logstream::get_retention)), ); + let user_api = web::scope("/user") + .service(web::resource("/create/{username}").route(web::post().to(rbac::put_user))) + .service(web::resource("/reset/{username}").route(web::put().to(rbac::reset_password))) + .service(web::resource("/delete/{username}").route(web::get().to(rbac::delete_user))); + cfg.service( // Base path "{url}/api/v1" web::scope(&base_path()) @@ -184,6 +194,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { logstream_api, ), ) + .service(user_api) .wrap(HttpAuthentication::basic(validator)), ) // GET "/" ==> Serve the static frontend directory diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs new file mode 100644 index 000000000..8783d0720 --- /dev/null +++ b/server/src/handlers/http/rbac.rs @@ -0,0 +1,73 @@ +use crate::rbac::{ + get_user_map, + user::{PassCode, User}, +}; +use actix_web::{http::header::ContentType, web, Responder}; +use http::StatusCode; + +pub async fn put_user(username: web::Path) -> Result { + let username = username.into_inner(); + let mut user_map = get_user_map().write().unwrap(); + // fail this request if the user is already in the map + // there is an exisiting config for this user + if user_map.contains_key(&username) { + return Err(RBACError::UserExists); + } + // todo: update parseable.json first + let (user, password) = User::create_new(username); + // set this user to user map + user_map.insert(user); + Ok(password) +} + +pub async fn reset_password(username: web::Path) -> Result { + let username = username.into_inner(); + let mut user_map = get_user_map().write().unwrap(); + // fail this request if the user does not exists + let Some(user) = user_map.get_mut(&username) else { + return Err(RBACError::UserDoesNotExist); + }; + // get new password for this user + let PassCode { password, hashcode } = User::gen_new_password(); + // todo: update parseable.json first + // update in mem table + user.password = hashcode; + + Ok(password) +} + +pub async fn delete_user(username: web::Path) -> Result { + let username = username.into_inner(); + let mut user_map = get_user_map().write().unwrap(); + // fail this request if the user does not exists + if !user_map.contains_key(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // todo: delete from parseable.json first + // update in mem table + user_map.remove(&username); + Ok("deleted user") +} + +#[derive(Debug, thiserror::Error)] +pub enum RBACError { + #[error("User exists already")] + UserExists, + #[error("User does not exist")] + UserDoesNotExist, +} + +impl actix_web::ResponseError for RBACError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::UserExists => StatusCode::BAD_REQUEST, + Self::UserDoesNotExist => StatusCode::NOT_FOUND, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 1c2f0321a..c72c82d7b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -41,6 +41,7 @@ mod metrics; mod migration; mod option; mod query; +mod rbac; mod response; mod stats; mod storage; @@ -61,8 +62,9 @@ async fn main() -> anyhow::Result<()> { let storage = CONFIG.storage().get_object_store(); CONFIG.validate_staging()?; let metadata = storage::resolve_parseable_metadata().await?; + banner::print(&CONFIG, &metadata).await; + rbac::set_user_map(Vec::new()); metadata.set_global(); - banner::print(&CONFIG, storage::StorageMetadata::global()).await; let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); diff --git a/server/src/rbac.rs b/server/src/rbac.rs new file mode 100644 index 000000000..f2ba3e782 --- /dev/null +++ b/server/src/rbac.rs @@ -0,0 +1,22 @@ +use std::sync::RwLock; + +use once_cell::sync::OnceCell; + +use self::user::{get_admin_user, User, UserMap}; + +pub mod user; + +pub static USERS: OnceCell> = OnceCell::new(); + +pub fn get_user_map() -> &'static RwLock { + USERS.get().expect("user map is set") +} + +pub fn set_user_map(users: Vec) { + let mut map = UserMap::default(); + for user in users { + map.insert(user) + } + map.insert(get_admin_user()); + USERS.set(RwLock::new(map)).expect("map is only set once") +} diff --git a/server/src/rbac/user.rs b/server/src/rbac/user.rs new file mode 100644 index 000000000..be2b75482 --- /dev/null +++ b/server/src/rbac/user.rs @@ -0,0 +1,87 @@ +use std::collections::HashMap; + +use argon2::{ + password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, + Argon2, PasswordHash, PasswordVerifier, +}; + +use rand::distributions::{Alphanumeric, DistString}; + +use crate::option::CONFIG; + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct User { + pub username: String, + pub password: String, + // fill this + pub role: Vec<()>, +} + +impl User { + // create a new User and return self with password generated for said user. + pub fn create_new(username: String) -> (Self, String) { + let PassCode { password, hashcode } = Self::gen_new_password(); + ( + Self { + username, + password: hashcode, + role: Vec::new(), + }, + password, + ) + } + + // Verification works because the PasswordHash is in PHC format + // $[$v=][$=(,=)*][$[$]] + // ref https://github.com/P-H-C/phc-string-format/blob/master/phc-sf-spec.md#specification + pub fn verify(&self, password: &str) -> bool { + let parsed_hash = PasswordHash::new(&self.password).unwrap(); + Argon2::default() + .verify_password(password.as_bytes(), &parsed_hash) + .is_ok() + } + + // gen new password + pub fn gen_new_password() -> PassCode { + let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + let hashcode = gen_hash(&password); + PassCode { password, hashcode } + } +} + +fn gen_hash(password: &str) -> String { + let salt = SaltString::generate(&mut OsRng); + let argon2 = Argon2::default(); + let hashcode = argon2 + .hash_password(password.as_bytes(), &salt) + .expect("can hash random alphanumeric") + .to_string(); + + hashcode +} + +#[derive(Debug, Default, derive_more::Deref, derive_more::DerefMut)] +pub struct UserMap(HashMap); + +impl UserMap { + pub fn insert(&mut self, user: User) { + self.0.insert(user.username.clone(), user); + } +} + +pub struct PassCode { + pub password: String, + pub hashcode: String, +} + +pub fn get_admin_user() -> User { + let username = CONFIG.parseable.username.clone(); + let password = CONFIG.parseable.password.clone(); + let hashcode = gen_hash(&password); + + User { + username, + password: hashcode, + role: Vec::new(), + } +} diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 498861407..d19981221 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -24,12 +24,21 @@ use std::{ use once_cell::sync::OnceCell; use std::io; -use crate::{option::CONFIG, utils::uid}; +use crate::{option::CONFIG, rbac::user::User, utils::uid}; use super::object_storage::PARSEABLE_METADATA_FILE_NAME; -pub static STORAGE_METADATA: OnceCell = OnceCell::new(); +// Expose some static variables for internal usage +pub static STORAGE_METADATA: OnceCell = OnceCell::new(); +// For use in global static +#[derive(Debug, PartialEq, Eq)] +pub struct StaticStorageMetadata { + pub mode: String, + pub deployment_id: uid::Uid, +} + +// Type for serialization and deserialization #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct StorageMetadata { pub version: String, @@ -42,13 +51,6 @@ pub struct StorageMetadata { pub stream: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct User { - username: String, - password: String, - role: String, -} - impl StorageMetadata { pub fn new() -> Self { Self { @@ -62,14 +64,19 @@ impl StorageMetadata { } } - pub fn global() -> &'static Self { + pub fn global() -> &'static StaticStorageMetadata { STORAGE_METADATA .get() .expect("gloabal static is initialized") } pub fn set_global(self) { - STORAGE_METADATA.set(self).expect("only set once") + let metadata = StaticStorageMetadata { + mode: self.mode, + deployment_id: self.deployment_id, + }; + + STORAGE_METADATA.set(metadata).expect("only set once") } } From 68f4ac6dbf2e8f25d5da27e127a768d91be5bd09 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 16 May 2023 12:26:10 +0530 Subject: [PATCH 02/16] Update metadata on storage --- server/src/handlers/http/rbac.rs | 95 ++++++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 17 deletions(-) diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 8783d0720..056fbfa62 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -1,52 +1,110 @@ -use crate::rbac::{ - get_user_map, - user::{PassCode, User}, +use crate::{ + option::CONFIG, + rbac::{ + get_user_map, + user::{PassCode, User}, + }, + storage::{ObjectStorageError, StorageMetadata}, }; use actix_web::{http::header::ContentType, web, Responder}; use http::StatusCode; +use tokio::sync::Mutex; + +// async aware lock for updating storage metadata and user map atomicically +static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); pub async fn put_user(username: web::Path) -> Result { let username = username.into_inner(); - let mut user_map = get_user_map().write().unwrap(); + // get exclusive lock + let _ = UPDATE_LOCK.lock().await; // fail this request if the user is already in the map // there is an exisiting config for this user - if user_map.contains_key(&username) { + if get_user_map().read().unwrap().contains_key(&username) { + return Err(RBACError::UserExists); + } + + let mut metadata = get_metadata().await?; + if metadata.user.iter().any(|user| user.username == username) { + // should be unreachable given state is always consistent return Err(RBACError::UserExists); } - // todo: update parseable.json first + let (user, password) = User::create_new(username); + metadata.user.push(user.clone()); + put_metadata(&metadata).await?; // set this user to user map - user_map.insert(user); + get_user_map().write().unwrap().insert(user); + Ok(password) } pub async fn reset_password(username: web::Path) -> Result { let username = username.into_inner(); - let mut user_map = get_user_map().write().unwrap(); + // get exclusive lock + let _ = UPDATE_LOCK.lock().await; // fail this request if the user does not exists - let Some(user) = user_map.get_mut(&username) else { + if !get_user_map().read().unwrap().contains_key(&username) { return Err(RBACError::UserDoesNotExist); - }; + } // get new password for this user let PassCode { password, hashcode } = User::gen_new_password(); - // todo: update parseable.json first + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .user + .iter_mut() + .find(|user| user.username == username) + { + user.password = hashcode.clone(); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata).await?; + // update in mem table - user.password = hashcode; + get_user_map() + .write() + .unwrap() + .get_mut(&username) + .expect("checked that user exists in map") + .password = hashcode; Ok(password) } pub async fn delete_user(username: web::Path) -> Result { let username = username.into_inner(); - let mut user_map = get_user_map().write().unwrap(); + let _ = UPDATE_LOCK.lock().await; // fail this request if the user does not exists - if !user_map.contains_key(&username) { + if !get_user_map().read().unwrap().contains_key(&username) { return Err(RBACError::UserDoesNotExist); }; - // todo: delete from parseable.json first + // delete from parseable.json first + let mut metadata = get_metadata().await?; + metadata.user.retain(|user| user.username != username); + put_metadata(&metadata).await?; // update in mem table - user_map.remove(&username); - Ok("deleted user") + get_user_map().write().unwrap().remove(&username); + Ok(format!("deleted user: {}", username)) +} + +async fn get_metadata() -> Result { + let metadata = CONFIG + .storage() + .get_object_store() + .get_metadata() + .await? + .expect("metadata is initialized"); + Ok(metadata) +} + +async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { + CONFIG + .storage() + .get_object_store() + .put_metadata(metadata) + .await } #[derive(Debug, thiserror::Error)] @@ -55,6 +113,8 @@ pub enum RBACError { UserExists, #[error("User does not exist")] UserDoesNotExist, + #[error("Failed to connect to storage: {0}")] + ObjectStorageError(#[from] ObjectStorageError), } impl actix_web::ResponseError for RBACError { @@ -62,6 +122,7 @@ impl actix_web::ResponseError for RBACError { match self { Self::UserExists => StatusCode::BAD_REQUEST, Self::UserDoesNotExist => StatusCode::NOT_FOUND, + Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } From 4c52e643f82d4e14ac276b9033ac83e3a2f1d67f Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 16 May 2023 12:34:49 +0530 Subject: [PATCH 03/16] Set Users on startup --- server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main.rs b/server/src/main.rs index c72c82d7b..a289b883e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -63,7 +63,7 @@ async fn main() -> anyhow::Result<()> { CONFIG.validate_staging()?; let metadata = storage::resolve_parseable_metadata().await?; banner::print(&CONFIG, &metadata).await; - rbac::set_user_map(Vec::new()); + rbac::set_user_map(metadata.user.clone()); metadata.set_global(); let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); From 4623b8402bb8266d6a13b61154ed50e35f2c9626 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 16 May 2023 14:20:28 +0530 Subject: [PATCH 04/16] Refactor metadata check --- server/src/handlers/http/rbac.rs | 18 +++-- server/src/storage.rs | 56 +-------------- server/src/storage/store_metadata.rs | 100 ++++++++++++++++++++++----- server/src/validator.rs | 25 ++++++- 4 files changed, 121 insertions(+), 78 deletions(-) diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 056fbfa62..460a98847 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -4,7 +4,8 @@ use crate::{ get_user_map, user::{PassCode, User}, }, - storage::{ObjectStorageError, StorageMetadata}, + storage::{self, ObjectStorageError, StorageMetadata}, + validator::{self, error::UsernameValidationError}, }; use actix_web::{http::header::ContentType, web, Responder}; use http::StatusCode; @@ -15,6 +16,8 @@ static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); pub async fn put_user(username: web::Path) -> Result { let username = username.into_inner(); + // verify username + validator::verify_username(&username)?; // get exclusive lock let _ = UPDATE_LOCK.lock().await; // fail this request if the user is already in the map @@ -100,11 +103,11 @@ async fn get_metadata() -> Result Result<(), ObjectStorageError> { - CONFIG - .storage() - .get_object_store() - .put_metadata(metadata) - .await + // put to remote + storage::put_remote_metadata(metadata).await?; + // put to staging + storage::put_staging_metadata(metadata)?; + Ok(()) } #[derive(Debug, thiserror::Error)] @@ -115,6 +118,8 @@ pub enum RBACError { UserDoesNotExist, #[error("Failed to connect to storage: {0}")] ObjectStorageError(#[from] ObjectStorageError), + #[error("invalid Username: {0}")] + ValidationError(#[from] UsernameValidationError), } impl actix_web::ResponseError for RBACError { @@ -122,6 +127,7 @@ impl actix_web::ResponseError for RBACError { match self { Self::UserExists => StatusCode::BAD_REQUEST, Self::UserDoesNotExist => StatusCode::NOT_FOUND, + Self::ValidationError(_) => StatusCode::BAD_REQUEST, Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/storage.rs b/server/src/storage.rs index 46f80b979..561e3722a 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,13 +16,11 @@ * */ -use crate::option::CONFIG; use crate::stats::Stats; use chrono::Local; use std::fmt::Debug; -use std::fs::create_dir_all; mod localfs; mod object_storage; @@ -34,10 +32,11 @@ mod store_metadata; pub use localfs::{FSConfig, LocalFS}; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::{S3Config, S3}; -pub use store_metadata::StorageMetadata; +pub use store_metadata::{ + put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata, +}; pub use self::staging::StorageDir; -use self::store_metadata::{put_staging_metadata, EnvChange}; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. @@ -123,55 +122,6 @@ impl ObjectStoreFormat { } } -pub async fn resolve_parseable_metadata() -> Result { - let staging_metadata = store_metadata::get_staging_metadata()?; - let storage = CONFIG.storage().get_object_store(); - let remote_metadata = storage.get_metadata().await?; - - let check = store_metadata::check_metadata_conflict(staging_metadata, remote_metadata); - - const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage"; - let res: Result = match check { - EnvChange::None(metadata) => Ok(metadata), - EnvChange::StagingMismatch => Err(MISMATCH), - EnvChange::StorageMismatch => Err(MISMATCH), - EnvChange::NewRemote => { - Err("Could not start the server because metadata not found in storage") - } - EnvChange::NewStaging(mut metadata) => { - create_dir_all(CONFIG.staging_dir())?; - metadata.staging = CONFIG.staging_dir().canonicalize()?; - create_remote_metadata(&metadata).await?; - put_staging_metadata(&metadata)?; - - Ok(metadata) - } - EnvChange::CreateBoth => { - create_dir_all(CONFIG.staging_dir())?; - let metadata = StorageMetadata::new(); - create_remote_metadata(&metadata).await?; - put_staging_metadata(&metadata)?; - - Ok(metadata) - } - }; - - res.map_err(|err| { - let err = format!( - "{}. {}", - err, - "Join us on Parseable Slack to report this incident : https://launchpass.com/parseable" - ); - let err: Box = err.into(); - ObjectStorageError::UnhandledError(err) - }) -} - -async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { - let client = CONFIG.storage().get_object_store(); - client.put_metadata(metadata).await -} - #[derive(serde::Serialize)] pub struct LogStream { pub name: String, diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index d19981221..cb0f971dd 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -17,14 +17,14 @@ */ use std::{ - fs::{self, OpenOptions}, + fs::{self, create_dir_all, OpenOptions}, path::PathBuf, }; use once_cell::sync::OnceCell; use std::io; -use crate::{option::CONFIG, rbac::user::User, utils::uid}; +use crate::{option::CONFIG, rbac::user::User, storage::ObjectStorageError, utils::uid}; use super::object_storage::PARSEABLE_METADATA_FILE_NAME; @@ -80,34 +80,93 @@ impl StorageMetadata { } } -pub fn check_metadata_conflict( - staging_metadata: Option, - remote_metadata: Option, -) -> EnvChange { - match (staging_metadata, remote_metadata) { - (Some(staging), Some(remote)) if staging.mode == remote.mode => { - if staging.storage != remote.storage { - EnvChange::StorageMismatch - } else if staging.staging != remote.staging { - EnvChange::StagingMismatch +// always returns remote metadata as it is source of truth +// overwrites staging metadata while updating storage info +pub async fn resolve_parseable_metadata() -> Result { + let staging_metadata = get_staging_metadata()?; + let storage = CONFIG.storage().get_object_store(); + let remote_metadata = storage.get_metadata().await?; + + let check = match (staging_metadata, remote_metadata) { + (Some(staging), Some(remote)) => { + if staging.deployment_id == remote.deployment_id { + EnvChange::None(remote) } else { - EnvChange::None(staging) + EnvChange::DeploymentMismatch } } - (Some(staging), Some(remote)) if staging.mode != remote.mode => EnvChange::StorageMismatch, - (None, None) => EnvChange::CreateBoth, (None, Some(remote)) => EnvChange::NewStaging(remote), (Some(_), None) => EnvChange::NewRemote, - _ => unreachable!(), + (None, None) => EnvChange::CreateBoth, + }; + + // flags for if metadata needs to be synced + let mut overwrite_staging = false; + let mut overwrite_remote = false; + + const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage"; + let res = match check { + EnvChange::None(metadata) => { + // overwrite staging anyways so that it matches remote in case of any divergence + overwrite_staging = true; + Ok(metadata) + } + EnvChange::DeploymentMismatch => Err(MISMATCH), + EnvChange::NewRemote => { + Err("Could not start the server because metadata not found in storage") + } + EnvChange::NewStaging(mut metadata) => { + create_dir_all(CONFIG.staging_dir())?; + metadata.staging = CONFIG.staging_dir().canonicalize()?; + // this flag is set to true so that metadata is copied to staging + overwrite_staging = true; + // overwrite remote because staging dir has changed. + overwrite_remote = true; + Ok(metadata) + } + EnvChange::CreateBoth => { + create_dir_all(CONFIG.staging_dir())?; + let metadata = StorageMetadata::new(); + // new metadata needs to be set on both staging and remote + overwrite_remote = true; + overwrite_staging = true; + Ok(metadata) + } + }; + + let metadata = res.map_err(|err| { + let err = format!( + "{}. {}", + err, + "Join us on Parseable Slack to report this incident : https://launchpass.com/parseable" + ); + let err: Box = err.into(); + ObjectStorageError::UnhandledError(err) + })?; + + if overwrite_staging { + put_staging_metadata(&metadata)?; + } + + if overwrite_remote { + put_remote_metadata(&metadata).await?; } + + Ok(metadata) } + +// variant contain remote metadata #[derive(Debug, Clone, PartialEq, Eq)] pub enum EnvChange { + /// No change in env i.e both staging and remote have same id None(StorageMetadata), - StagingMismatch, - StorageMismatch, + /// Mismatch in deployment id. Cannot use this staging for this remote + DeploymentMismatch, + /// Metadata not found in storage. Treated as possible misconfiguration on user side. NewRemote, + /// If a new staging is found then we just copy remote metadata to this staging. NewStaging(StorageMetadata), + /// Fresh remote and staging, hence create a new metadata file on both CreateBoth, } @@ -126,6 +185,11 @@ pub fn get_staging_metadata() -> io::Result> { Ok(Some(meta)) } +pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { + let client = CONFIG.storage().get_object_store(); + client.put_metadata(metadata).await +} + pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> { let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let mut file = OpenOptions::new().create_new(true).write(true).open(path)?; diff --git a/server/src/validator.rs b/server/src/validator.rs index 2b31fa53f..4470af7ff 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -23,7 +23,9 @@ use crate::metadata::STREAM_INFO; use crate::query::Query; use chrono::{DateTime, Utc}; -use self::error::{AlertValidationError, QueryValidationError, StreamNameValidationError}; +use self::error::{ + AlertValidationError, QueryValidationError, StreamNameValidationError, UsernameValidationError, +}; // Add more sql keywords here in lower case const DENIED_NAMES: &[&str] = &[ @@ -112,6 +114,19 @@ pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> { Ok(()) } +pub fn verify_username(username: &str) -> Result<(), UsernameValidationError> { + // Check if the username meets the required criteria + if username.len() <= 3 || username.len() > 64 { + return Err(UsernameValidationError::InvalidLength); + } + // Username should contain only alphanumeric characters or underscores + if !username.chars().all(|c| c.is_alphanumeric() || c == '_') { + return Err(UsernameValidationError::SpecialChar); + } + + Ok(()) +} + pub fn query(query: &str, start_time: &str, end_time: &str) -> Result { if query.is_empty() { return Err(QueryValidationError::EmptyQuery); @@ -233,4 +248,12 @@ pub mod error { #[error("SQL keyword cannot be used as stream name")] SQLKeyword(String), } + + #[derive(Debug, thiserror::Error)] + pub enum UsernameValidationError { + #[error("Username length should be between 3 and 64 chars")] + InvalidLength, + #[error("Username contains invalid characters. Only aplhanumeric and _ is allowed")] + SpecialChar, + } } From 8461f56b3438765ba868007bea45c655b02e8737 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 16 May 2023 19:33:17 +0530 Subject: [PATCH 05/16] Fix validator --- server/src/validator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/validator.rs b/server/src/validator.rs index 4470af7ff..2027c885b 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -116,7 +116,7 @@ pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> { pub fn verify_username(username: &str) -> Result<(), UsernameValidationError> { // Check if the username meets the required criteria - if username.len() <= 3 || username.len() > 64 { + if username.len() < 3 || username.len() > 64 { return Err(UsernameValidationError::InvalidLength); } // Username should contain only alphanumeric characters or underscores From beabeee4b8a63f893b67ae8c8f9a263e0cb3b2ca Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 16 May 2023 19:33:48 +0530 Subject: [PATCH 06/16] Add migration --- server/src/handlers/http/rbac.rs | 8 +- server/src/main.rs | 3 +- server/src/migration.rs | 88 +++++++++++++++++++++- server/src/migration/metadata_migration.rs | 13 ++++ server/src/storage/store_metadata.rs | 16 ++-- 5 files changed, 116 insertions(+), 12 deletions(-) create mode 100644 server/src/migration/metadata_migration.rs diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 460a98847..1227f51b4 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -27,13 +27,13 @@ pub async fn put_user(username: web::Path) -> Result) -> Result) -> Result anyhow::Result<()> { CONFIG.validate(); let storage = CONFIG.storage().get_object_store(); CONFIG.validate_staging()?; + migration::run_metadata_migration(&CONFIG).await?; let metadata = storage::resolve_parseable_metadata().await?; banner::print(&CONFIG, &metadata).await; - rbac::set_user_map(metadata.user.clone()); + rbac::set_user_map(metadata.users.clone()); metadata.set_global(); let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); diff --git a/server/src/migration.rs b/server/src/migration.rs index 5bde8c889..c2eff4ce9 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -17,14 +17,49 @@ * */ +mod metadata_migration; mod schema_migration; mod stream_metadata_migration; +use std::fs::OpenOptions; + use bytes::Bytes; use relative_path::RelativePathBuf; use serde::Serialize; -use crate::{option::Config, storage::ObjectStorage}; +use crate::{ + option::Config, + storage::{ObjectStorage, ObjectStorageError}, +}; + +pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> { + let object_store = config.storage().get_object_store(); + let storage_metadata = get_storage_metadata(&*object_store).await?; + let staging_metadata = get_staging_metadata(config)?; + + fn get_version(metadata: &serde_json::Value) -> Option<&str> { + metadata + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()) + } + + if let Some(storage_metadata) = storage_metadata { + if let Some("v1") = dbg!(get_version(&storage_metadata)) { + let metadata = metadata_migration::v1_v2(storage_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } + } + + if let Some(staging_metadata) = staging_metadata { + if let Some("v1") = get_version(&staging_metadata) { + let metadata = metadata_migration::v1_v2(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + } + + Ok(()) +} pub async fn run_migration(config: &Config) -> anyhow::Result<()> { let storage = config.storage().get_object_store(); @@ -85,3 +120,54 @@ fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { .map(|any| any.into()) .expect("serialize cannot fail") } + +pub fn get_staging_metadata(config: &Config) -> anyhow::Result> { + let path = config.staging_dir().join(".parseable.json"); + let bytes = match std::fs::read(path) { + Ok(bytes) => bytes, + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => return Ok(None), + _ => return Err(err.into()), + }, + }; + let meta: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + Ok(Some(meta)) +} + +async fn get_storage_metadata( + storage: &dyn ObjectStorage, +) -> anyhow::Result> { + let path = RelativePathBuf::from_iter([".parseable.json"]); + match storage.get_object(&path).await { + Ok(bytes) => Ok(Some( + serde_json::from_slice(&bytes).expect("parseable config is valid json"), + )), + Err(err) => { + if matches!(err, ObjectStorageError::NoSuchKey(_)) { + Ok(None) + } else { + Err(err.into()) + } + } + } +} + +pub async fn put_remote_metadata( + storage: &dyn ObjectStorage, + metadata: &serde_json::Value, +) -> anyhow::Result<()> { + let path = RelativePathBuf::from_iter([".parseable.json"]); + let metadata = serde_json::to_vec(metadata)?.into(); + Ok(storage.put_object(&path, metadata).await?) +} + +pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> anyhow::Result<()> { + let path = config.staging_dir().join(".parseable.json"); + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path)?; + serde_json::to_writer(&mut file, metadata)?; + Ok(()) +} diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs new file mode 100644 index 000000000..5f75e9dca --- /dev/null +++ b/server/src/migration/metadata_migration.rs @@ -0,0 +1,13 @@ +use std::vec; + +use serde_json::Value; + +pub fn v1_v2(mut storage_metadata: serde_json::Value) -> Value { + let metadata = storage_metadata.as_object_mut().unwrap(); + *metadata.get_mut("version").unwrap() = Value::String("v2".to_string()); + metadata.remove("user"); + metadata.remove("stream"); + metadata.insert("users".to_string(), Value::Array(vec![])); + metadata.insert("streams".to_string(), Value::Array(vec![])); + storage_metadata +} diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index cb0f971dd..562a4c209 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -47,20 +47,20 @@ pub struct StorageMetadata { pub storage: String, #[serde(default = "crate::utils::uid::gen")] pub deployment_id: uid::Uid, - pub user: Vec, - pub stream: Vec, + pub users: Vec, + pub streams: Vec, } impl StorageMetadata { pub fn new() -> Self { Self { - version: "v1".to_string(), + version: "v2".to_string(), mode: CONFIG.storage_name.to_owned(), staging: CONFIG.staging_dir().canonicalize().unwrap(), storage: CONFIG.storage().get_endpoint(), deployment_id: uid::gen(), - user: Vec::new(), - stream: Vec::new(), + users: Vec::new(), + streams: Vec::new(), } } @@ -192,7 +192,11 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> { let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); - let mut file = OpenOptions::new().create_new(true).write(true).open(path)?; + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path)?; serde_json::to_writer(&mut file, meta)?; Ok(()) } From 73eb40952f0b520a40f1e274117f53a4957e11eb Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 17 May 2023 15:33:56 +0530 Subject: [PATCH 07/16] Add banner --- server/src/handlers/http/rbac.rs | 18 ++++++++++++++++++ server/src/migration/metadata_migration.rs | 18 ++++++++++++++++++ server/src/rbac.rs | 18 ++++++++++++++++++ server/src/rbac/user.rs | 18 ++++++++++++++++++ 4 files changed, 72 insertions(+) diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 1227f51b4..e42937753 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::{ option::CONFIG, rbac::{ diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index 5f75e9dca..b949ab4d2 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::vec; use serde_json::Value; diff --git a/server/src/rbac.rs b/server/src/rbac.rs index f2ba3e782..04b6d14a1 100644 --- a/server/src/rbac.rs +++ b/server/src/rbac.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::sync::RwLock; use once_cell::sync::OnceCell; diff --git a/server/src/rbac/user.rs b/server/src/rbac/user.rs index be2b75482..197493fd5 100644 --- a/server/src/rbac/user.rs +++ b/server/src/rbac/user.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::collections::HashMap; use argon2::{ From ec5577285c7ba4a77b585cf0a04a198354460c0a Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 17 May 2023 15:36:08 +0530 Subject: [PATCH 08/16] Remove comments --- server/src/handlers/http/rbac.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index e42937753..2f2ffd48d 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -34,9 +34,7 @@ static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); pub async fn put_user(username: web::Path) -> Result { let username = username.into_inner(); - // verify username validator::verify_username(&username)?; - // get exclusive lock let _ = UPDATE_LOCK.lock().await; // fail this request if the user is already in the map // there is an exisiting config for this user @@ -61,7 +59,6 @@ pub async fn put_user(username: web::Path) -> Result) -> Result { let username = username.into_inner(); - // get exclusive lock let _ = UPDATE_LOCK.lock().await; // fail this request if the user does not exists if !get_user_map().read().unwrap().contains_key(&username) { @@ -121,9 +118,7 @@ async fn get_metadata() -> Result Result<(), ObjectStorageError> { - // put to remote storage::put_remote_metadata(metadata).await?; - // put to staging storage::put_staging_metadata(metadata)?; Ok(()) } From eb760669065c39d04928f6749921e50dee3f1612 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 17 May 2023 15:45:19 +0530 Subject: [PATCH 09/16] Fix --- server/src/handlers/http/rbac.rs | 6 +++--- server/src/rbac/user.rs | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 2f2ffd48d..b8110e407 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -65,7 +65,7 @@ pub async fn reset_password(username: web::Path) -> Result) -> Result) -> Result, } @@ -38,11 +38,11 @@ pub struct User { impl User { // create a new User and return self with password generated for said user. pub fn create_new(username: String) -> (Self, String) { - let PassCode { password, hashcode } = Self::gen_new_password(); + let PassCode { password, hash } = Self::gen_new_password(); ( Self { username, - password: hashcode, + password_hash: hash, role: Vec::new(), }, password, @@ -53,7 +53,7 @@ impl User { // $[$v=][$=(,=)*][$[$]] // ref https://github.com/P-H-C/phc-string-format/blob/master/phc-sf-spec.md#specification pub fn verify(&self, password: &str) -> bool { - let parsed_hash = PasswordHash::new(&self.password).unwrap(); + let parsed_hash = PasswordHash::new(&self.password_hash).unwrap(); Argon2::default() .verify_password(password.as_bytes(), &parsed_hash) .is_ok() @@ -62,8 +62,8 @@ impl User { // gen new password pub fn gen_new_password() -> PassCode { let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - let hashcode = gen_hash(&password); - PassCode { password, hashcode } + let hash = gen_hash(&password); + PassCode { password, hash } } } @@ -89,7 +89,7 @@ impl UserMap { pub struct PassCode { pub password: String, - pub hashcode: String, + pub hash: String, } pub fn get_admin_user() -> User { @@ -99,7 +99,7 @@ pub fn get_admin_user() -> User { User { username, - password: hashcode, + password_hash: hashcode, role: Vec::new(), } } From 011125a5ba36788c811601a7806734a3cc530ffd Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 17 May 2023 16:06:51 +0530 Subject: [PATCH 10/16] Add comment --- server/src/handlers/http.rs | 8 +++++--- server/src/handlers/http/rbac.rs | 7 +++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index ac313c9b5..f1374ba87 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -162,11 +162,13 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route(web::get().to(logstream::get_retention)), ); - let user_api = web::scope("/user") + // POST /user/create/{username} => Create a new user .service(web::resource("/create/{username}").route(web::post().to(rbac::put_user))) - .service(web::resource("/reset/{username}").route(web::put().to(rbac::reset_password))) - .service(web::resource("/delete/{username}").route(web::get().to(rbac::delete_user))); + // POST /user/reset/{username} => Reset password for a user + .service(web::resource("/reset/{username}").route(web::post().to(rbac::reset_password))) + // DELETE /user/delete/{username} => Delete a user + .service(web::resource("/delete/{username}").route(web::delete().to(rbac::delete_user))); cfg.service( // Base path "{url}/api/v1" diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index b8110e407..0bf9f9199 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -32,6 +32,9 @@ use tokio::sync::Mutex; // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); +// Handler for POST /api/v1/user/create/{username} +// Creates a new user by username +// returns password generated for this user pub async fn put_user(username: web::Path) -> Result { let username = username.into_inner(); validator::verify_username(&username)?; @@ -57,6 +60,9 @@ pub async fn put_user(username: web::Path) -> Result) -> Result { let username = username.into_inner(); let _ = UPDATE_LOCK.lock().await; @@ -91,6 +97,7 @@ pub async fn reset_password(username: web::Path) -> Result) -> Result { let username = username.into_inner(); let _ = UPDATE_LOCK.lock().await; From 0bca6af4ea578ab6fed1fc788680fa192f7b3da8 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 17 May 2023 16:14:56 +0530 Subject: [PATCH 11/16] Fix --- server/src/handlers/http/rbac.rs | 2 +- server/src/migration.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 0bf9f9199..619390e39 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -79,7 +79,7 @@ pub async fn reset_password(username: web::Path) -> Result anyhow::Result<()> { } if let Some(storage_metadata) = storage_metadata { - if let Some("v1") = dbg!(get_version(&storage_metadata)) { + if get_version(&storage_metadata) == Some("v1") { let metadata = metadata_migration::v1_v2(storage_metadata); put_remote_metadata(&*object_store, &metadata).await?; } } if let Some(staging_metadata) = staging_metadata { - if let Some("v1") = get_version(&staging_metadata) { + if get_version(&staging_metadata) == Some("v1") { let metadata = metadata_migration::v1_v2(staging_metadata); put_staging_metadata(config, &metadata)?; } From 2056b3578f76dd6a85fb9f2e083d4759045894cc Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 May 2023 13:41:02 +0530 Subject: [PATCH 12/16] Change user api --- server/src/handlers/http.rs | 25 ++++++++--- server/src/handlers/http/rbac.rs | 75 +++++++++++++++----------------- 2 files changed, 52 insertions(+), 48 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index f1374ba87..e083a8f3d 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -20,7 +20,8 @@ use std::fs::File; use std::io::BufReader; use actix_cors::Cors; -use actix_web::dev::ServiceRequest; +use actix_web::dev::{Service, ServiceRequest}; +use actix_web::error::ErrorBadRequest; use actix_web::{middleware, web, App, HttpServer}; use actix_web_httpauth::extractors::basic::BasicAuth; use actix_web_httpauth::middleware::HttpAuthentication; @@ -163,12 +164,22 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { .route(web::get().to(logstream::get_retention)), ); let user_api = web::scope("/user") - // POST /user/create/{username} => Create a new user - .service(web::resource("/create/{username}").route(web::post().to(rbac::put_user))) - // POST /user/reset/{username} => Reset password for a user - .service(web::resource("/reset/{username}").route(web::post().to(rbac::reset_password))) - // DELETE /user/delete/{username} => Delete a user - .service(web::resource("/delete/{username}").route(web::delete().to(rbac::delete_user))); + // POST /user/{username} => Create a new user + .service(web::resource("/{username}").route(web::put().to(rbac::put_user))) + // DELETE /user/{username} => Delete a user + .service(web::resource("/{username}").route(web::delete().to(rbac::delete_user))) + .wrap_fn(|req, srv| { + // deny request if username is same as username from config + let username = req.match_info().get("username").unwrap_or(""); + let is_root = username == CONFIG.parseable.username; + let call = srv.call(req); + async move { + if is_root { + return Err(ErrorBadRequest("Cannot call this API for root admin user")); + } + call.await + } + }); cfg.service( // Base path "{url}/api/v1" diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 619390e39..c24f86fa3 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -32,44 +32,54 @@ use tokio::sync::Mutex; // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); -// Handler for POST /api/v1/user/create/{username} -// Creates a new user by username +// Handler for PUT /api/v1/user/{username} +// Creates a new user by username if it does not exists +// Otherwise make a call to reset password // returns password generated for this user pub async fn put_user(username: web::Path) -> Result { let username = username.into_inner(); validator::verify_username(&username)?; let _ = UPDATE_LOCK.lock().await; - // fail this request if the user is already in the map - // there is an exisiting config for this user - if get_user_map().read().unwrap().contains_key(&username) { - return Err(RBACError::UserExists); - } - - let mut metadata = get_metadata().await?; - if metadata.users.iter().any(|user| user.username == username) { - // should be unreachable given state is always consistent - return Err(RBACError::UserExists); - } + let user_exists = get_user_map().read().unwrap().contains_key(&username); + if user_exists { + reset_password(username).await + } else { + let mut metadata = get_metadata().await?; + if metadata.users.iter().any(|user| user.username == username) { + // should be unreachable given state is always consistent + return Err(RBACError::UserExists); + } - let (user, password) = User::create_new(username); - metadata.users.push(user.clone()); - put_metadata(&metadata).await?; - // set this user to user map - get_user_map().write().unwrap().insert(user); + let (user, password) = User::create_new(username); + metadata.users.push(user.clone()); + put_metadata(&metadata).await?; + // set this user to user map + get_user_map().write().unwrap().insert(user); - Ok(password) + Ok(password) + } } -// Handler for POST /api/v1/user/reset/{username} -// Reset password for given username -// returns new password generated for this user -pub async fn reset_password(username: web::Path) -> Result { +// Handler for DELETE /api/v1/user/delete/{username} +pub async fn delete_user(username: web::Path) -> Result { let username = username.into_inner(); let _ = UPDATE_LOCK.lock().await; // fail this request if the user does not exists if !get_user_map().read().unwrap().contains_key(&username) { return Err(RBACError::UserDoesNotExist); - } + }; + // delete from parseable.json first + let mut metadata = get_metadata().await?; + metadata.users.retain(|user| user.username != username); + put_metadata(&metadata).await?; + // update in mem table + get_user_map().write().unwrap().remove(&username); + Ok(format!("deleted user: {}", username)) +} + +// Reset password for given username +// returns new password generated for this user +pub async fn reset_password(username: String) -> Result { // get new password for this user let PassCode { password, hash } = User::gen_new_password(); // update parseable.json first @@ -97,23 +107,6 @@ pub async fn reset_password(username: web::Path) -> Result) -> Result { - let username = username.into_inner(); - let _ = UPDATE_LOCK.lock().await; - // fail this request if the user does not exists - if !get_user_map().read().unwrap().contains_key(&username) { - return Err(RBACError::UserDoesNotExist); - }; - // delete from parseable.json first - let mut metadata = get_metadata().await?; - metadata.users.retain(|user| user.username != username); - put_metadata(&metadata).await?; - // update in mem table - get_user_map().write().unwrap().remove(&username); - Ok(format!("deleted user: {}", username)) -} - async fn get_metadata() -> Result { let metadata = CONFIG .storage() From 868d1c7d4fe7a1f42216ff623cc0232dd5bba3d8 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 May 2023 13:54:37 +0530 Subject: [PATCH 13/16] Fix middleware --- server/src/handlers/http.rs | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index e083a8f3d..ae8238684 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -163,23 +163,25 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route(web::get().to(logstream::get_retention)), ); - let user_api = web::scope("/user") - // POST /user/{username} => Create a new user - .service(web::resource("/{username}").route(web::put().to(rbac::put_user))) - // DELETE /user/{username} => Delete a user - .service(web::resource("/{username}").route(web::delete().to(rbac::delete_user))) - .wrap_fn(|req, srv| { - // deny request if username is same as username from config - let username = req.match_info().get("username").unwrap_or(""); - let is_root = username == CONFIG.parseable.username; - let call = srv.call(req); - async move { - if is_root { - return Err(ErrorBadRequest("Cannot call this API for root admin user")); + let user_api = web::scope("/user").service( + web::resource("/{username}") + // POST /user/{username} => Create a new user + .route(web::put().to(rbac::put_user)) + // DELETE /user/{username} => Delete a user + .route(web::delete().to(rbac::delete_user)) + .wrap_fn(|req, srv| { + // deny request if username is same as username from config + let username = req.match_info().get("username").unwrap_or(""); + let is_root = username == CONFIG.parseable.username; + let call = srv.call(req); + async move { + if is_root { + return Err(ErrorBadRequest("Cannot call this API for root admin user")); + } + call.await } - call.await - } - }); + }), + ); cfg.service( // Base path "{url}/api/v1" From b528b89376bb9f84af33fe356a1c40e785b456e7 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 May 2023 14:07:29 +0530 Subject: [PATCH 14/16] Change validation --- server/src/validator.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/validator.rs b/server/src/validator.rs index 2027c885b..902ec2184 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -120,7 +120,10 @@ pub fn verify_username(username: &str) -> Result<(), UsernameValidationError> { return Err(UsernameValidationError::InvalidLength); } // Username should contain only alphanumeric characters or underscores - if !username.chars().all(|c| c.is_alphanumeric() || c == '_') { + if !username + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') + { return Err(UsernameValidationError::SpecialChar); } From bfe58b40fc80f5f636f4d04be53d11e720cd181f Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 May 2023 14:08:58 +0530 Subject: [PATCH 15/16] Fix --- server/src/validator.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/validator.rs b/server/src/validator.rs index 902ec2184..956facfa5 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -256,7 +256,9 @@ pub mod error { pub enum UsernameValidationError { #[error("Username length should be between 3 and 64 chars")] InvalidLength, - #[error("Username contains invalid characters. Only aplhanumeric and _ is allowed")] + #[error( + "Username contains invalid characters. Only lowercase aplhanumeric and _ is allowed" + )] SpecialChar, } } From 8d750ba79c8f9ddb042798eac8c14057920d652f Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 May 2023 16:29:07 +0530 Subject: [PATCH 16/16] Change to roles --- server/src/rbac/user.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/rbac/user.rs b/server/src/rbac/user.rs index 67e4eeafe..e98098ad0 100644 --- a/server/src/rbac/user.rs +++ b/server/src/rbac/user.rs @@ -32,7 +32,7 @@ pub struct User { pub username: String, pub password_hash: String, // fill this - pub role: Vec<()>, + pub roles: Vec<()>, } impl User { @@ -43,7 +43,7 @@ impl User { Self { username, password_hash: hash, - role: Vec::new(), + roles: Vec::new(), }, password, ) @@ -100,6 +100,6 @@ pub fn get_admin_user() -> User { User { username, password_hash: hashcode, - role: Vec::new(), + roles: Vec::new(), } }