From bacebda467d016540cec10bb71f1929bdc8163e0 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 7 Apr 2023 13:06:16 +0530 Subject: [PATCH 1/6] Use object_store instead of aws_sdk --- Cargo.lock | 225 ++++++++----------------- server/Cargo.toml | 5 +- server/src/storage.rs | 1 + server/src/storage/s3.rs | 348 +++++++++++++++------------------------ 4 files changed, 205 insertions(+), 374 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 25fa460d8..5535f3f10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,17 +591,6 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" -[[package]] -name = "assert-json-diff" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4259cbe96513d2f1073027a259fc2ca917feb3026a5a8d984e3628e490255cc0" -dependencies = [ - "extend", - "serde", - "serde_json", -] - [[package]] name = "async-compression" version = "0.3.15" @@ -646,6 +635,35 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-config" +version = "0.54.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3d1e2a1f1ab3ac6c4b884e37413eaa03eb9d901e4fc68ee8f5c1d49721680e" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-sdk-sso", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "hex", + "http", + "hyper", + "ring", + "time 0.3.17", + "tokio", + "tower", + "tracing", + "zeroize", +] + [[package]] name = "aws-credential-types" version = "0.54.1" @@ -693,38 +711,53 @@ dependencies = [ ] [[package]] -name = "aws-sdk-s3" +name = "aws-sdk-sso" version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1533be023eeac69668eb718b1c48af7bd5e26305ed770553d2877ab1f7507b68" +checksum = "ca0119bacf0c42f587506769390983223ba834e605f049babe514b2bd646dbb2" dependencies = [ "aws-credential-types", "aws-endpoint", "aws-http", "aws-sig-auth", - "aws-sigv4", "aws-smithy-async", - "aws-smithy-checksums", "aws-smithy-client", - "aws-smithy-eventstream", "aws-smithy-http", "aws-smithy-http-tower", "aws-smithy-json", "aws-smithy-types", - "aws-smithy-xml", "aws-types", "bytes", - "bytes-utils", - "fastrand", "http", - "http-body", - "once_cell", - "percent-encoding", "regex", "tokio-stream", "tower", +] + +[[package]] +name = "aws-sdk-sts" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "270b6a33969ebfcb193512fbd5e8ee5306888ad6c6d5d775cdbfb2d50d94de26" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "regex", + "tower", "tracing", - "url", ] [[package]] @@ -735,7 +768,6 @@ checksum = "660a02a98ab1af83bd8d714afbab2d502ba9b18c49e7e4cddd6bf8837ff778cb" dependencies = [ "aws-credential-types", "aws-sigv4", - "aws-smithy-eventstream", "aws-smithy-http", "aws-types", "http", @@ -748,9 +780,7 @@ version = "0.54.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdaf11005b7444e6cd66f600d09861a3aeb6eb89a0f003c7c9820dbab2d15297" dependencies = [ - "aws-smithy-eventstream", "aws-smithy-http", - "bytes", "form_urlencoded", "hex", "hmac", @@ -775,27 +805,6 @@ dependencies = [ "tokio-stream", ] -[[package]] -name = "aws-smithy-checksums" -version = "0.54.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55fe82d7463becdd632f8c6446cbdb2cbe34ad42a7d92c480d8fca08749d07a4" -dependencies = [ - "aws-smithy-http", - "aws-smithy-types", - "bytes", - "crc32c", - "crc32fast", - "hex", - "http", - "http-body", - "md-5", - "pin-project-lite", - "sha1", - "sha2", - "tracing", -] - [[package]] name = "aws-smithy-client" version = "0.54.1" @@ -805,7 +814,6 @@ dependencies = [ "aws-smithy-async", "aws-smithy-http", "aws-smithy-http-tower", - "aws-smithy-protocol-test", "aws-smithy-types", "bytes", "fastrand", @@ -815,30 +823,17 @@ dependencies = [ "hyper-rustls", "lazy_static", "pin-project-lite", - "serde", "tokio", "tower", "tracing", ] -[[package]] -name = "aws-smithy-eventstream" -version = "0.54.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "652a99272024770cbe33579dc0016914a09922b27f9a4d12f37472aacbbe71c1" -dependencies = [ - "aws-smithy-types", - "bytes", - "crc32fast", -] - [[package]] name = "aws-smithy-http" version = "0.54.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5bd86f48d7e36fb24ee922d04d79c8353e01724b1c38757ed92593179223aa7" dependencies = [ - "aws-smithy-eventstream", "aws-smithy-types", "bytes", "bytes-utils", @@ -850,8 +845,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "pin-utils", - "tokio", - "tokio-util", "tracing", ] @@ -881,18 +874,13 @@ dependencies = [ ] [[package]] -name = "aws-smithy-protocol-test" +name = "aws-smithy-query" version = "0.54.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b72e9ac0818d0016ced540ba0d06975299d27684ff514173b21c9976fd72062b" +checksum = "2881effde104a2b0619badaad9f30ae67805e86fbbdb99e5fcc176e8bfbc1a85" dependencies = [ - "assert-json-diff", - "http", - "pretty_assertions", - "regex", - "roxmltree", - "serde_json", - "thiserror", + "aws-smithy-types", + "urlencoding", ] [[package]] @@ -1342,15 +1330,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc32c" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e" -dependencies = [ - "rustc_version", -] - [[package]] name = "crc32fast" version = "1.3.2" @@ -1466,16 +1445,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "cxx" version = "1.0.90" @@ -1702,12 +1671,6 @@ dependencies = [ "syn", ] -[[package]] -name = "diff" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" - [[package]] name = "digest" version = "0.10.6" @@ -1794,18 +1757,6 @@ dependencies = [ "libc", ] -[[package]] -name = "extend" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "fastrand" version = "1.8.0" @@ -2826,11 +2777,14 @@ dependencies = [ [[package]] name = "object_store" -version = "0.5.4" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f344e51ec9584d2f51199c0c29c6f73dddd04ade986497875bf8fa2f178caf0" +checksum = "ec9cd6ca25e796a49fa242876d1c4de36a24a6da5258e9f0bc062dbf5e81c53b" dependencies = [ "async-trait", + "aws-config", + "aws-credential-types", + "aws-types", "base64 0.21.0", "bytes", "chrono", @@ -2917,15 +2871,6 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" -[[package]] -name = "output_vt100" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628223faebab4e3e40667ee0b2336d34a5b960ff60ea743ddfdbcf7770bcfb66" -dependencies = [ - "winapi", -] - [[package]] name = "outref" version = "0.1.0" @@ -3007,8 +2952,6 @@ dependencies = [ "anyhow", "arrow-schema", "async-trait", - "aws-sdk-s3", - "aws-smithy-async", "base64 0.21.0", "bytes", "bzip2", @@ -3031,7 +2974,6 @@ dependencies = [ "lazy_static", "log", "maplit", - "md-5", "num_cpus", "object_store", "once_cell", @@ -3155,18 +3097,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pretty_assertions" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a25e9bcb20aa780fd0bb16b72403a9064d6b3f22f026946029acb941a50af755" -dependencies = [ - "ctor", - "diff", - "output_vt100", - "yansi", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3284,9 +3214,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.27.1" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41" +checksum = "e5c1a97b1bc42b1d550bfb48d4262153fe400a12bab1511821736f7eac76d7e2" dependencies = [ "memchr", "serde", @@ -3472,15 +3402,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "roxmltree" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "921904a62e410e37e215c40381b7117f830d9d89ba60ab5236170541dd25646b" -dependencies = [ - "xmlparser", -] - [[package]] name = "rstest" version = "0.16.0" @@ -4394,6 +4315,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9" + [[package]] name = "uuid" version = "1.3.0" @@ -4765,12 +4692,6 @@ dependencies = [ "lzma-sys", ] -[[package]] -name = "yansi" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" - [[package]] name = "zeroize" version = "1.5.7" diff --git a/server/Cargo.toml b/server/Cargo.toml index ffee1de57..0ecab746e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,8 +16,6 @@ prometheus = { version = "0.13", features = ["process"] } anyhow = { version = "1.0", features = ["backtrace"] } arrow-schema = { version = "31.0", features = ["serde"] } async-trait = "0.1" -aws-sdk-s3 = "0.24" -aws-smithy-async = { version = "0.54", features = ["rt-tokio"] } base64 = "0.21" bytes = "1.4" chrono = "0.4" @@ -33,7 +31,7 @@ clap = { version = "4.1", default-features = false, features = [ ] } crossterm = "0.26" datafusion = "17" -object_store = { version = "0.5", features = ["aws"] } +object_store = { version = "0.5.6", features = ["aws", "aws_profile"] } derive_more = "0.99" env_logger = "0.10" futures = "0.3" @@ -43,7 +41,6 @@ humantime-serde = "1.1" lazy_static = "1.4" log = "0.4" num_cpus = "1.15" -md-5 = "0.10" sysinfo = "0.28.4" hostname = "0.3" rand = "0.8" diff --git a/server/src/storage.rs b/server/src/storage.rs index 6eaf3af11..94b4b468d 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -345,6 +345,7 @@ pub enum ObjectStorageError { #[error("Unhandled Error: {0}")] UnhandledError(Box), + #[allow(dead_code)] #[error("Authentication Error: {0}")] AuthenticationError(Box), } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index d44a1a436..e4a0de43e 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -17,15 +17,6 @@ */ use async_trait::async_trait; -use aws_sdk_s3::config::retry::RetryConfig; -use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind}; -use aws_sdk_s3::model::{CommonPrefix, Delete, ObjectIdentifier}; -use aws_sdk_s3::types::{ByteStream, SdkError}; -use aws_sdk_s3::Error as AwsSdkError; -use aws_sdk_s3::{Client, Credentials, Region}; -use aws_smithy_async::rt::sleep::default_async_sleep; -use base64::engine::general_purpose::STANDARD as BASE64; -use base64::engine::Engine as _; use bytes::Bytes; use datafusion::arrow::datatypes::Schema; @@ -38,14 +29,16 @@ use datafusion::error::DataFusionError; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; -use itertools::Itertools; -use md5::{Digest, Md5}; -use object_store::aws::AmazonS3Builder; +use object_store::aws::{AmazonS3, AmazonS3Builder, Checksum}; use object_store::limit::LimitStore; +use object_store::path::Path as StorePath; +use object_store::ObjectStore; use relative_path::RelativePath; +use tokio::fs::OpenOptions; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::iter::Iterator; -use std::path::Path; +use std::path::Path as StdPath; use std::sync::Arc; use std::time::Instant; @@ -54,6 +47,9 @@ use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; use super::{object_storage, ObjectStorageProvider}; +// in bytes +const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; + #[derive(Debug, Clone, clap::Args)] #[command( name = "S3 config", @@ -97,11 +93,11 @@ pub struct S3Config { /// Set client to send content_md5 header on every put request #[arg( long, - env = "P_S3_SET_CONTENT_MD5", + env = "P_S3_CHECKSUM", value_name = "bool", default_value = "false" )] - pub content_md5: bool, + pub set_checksum: bool, } impl ObjectStorageProvider for S3Config { @@ -132,25 +128,27 @@ impl ObjectStorageProvider for S3Config { } fn get_object_store(&self) -> Arc { - let uri: String = self.endpoint_url.parse().unwrap(); - let region = Region::new(self.region.clone()); - let creds = Credentials::new(&self.access_key_id, &self.secret_key, None, None, ""); + let mut s3 = AmazonS3Builder::new() + .with_region(&self.region) + .with_endpoint(&self.endpoint_url) + .with_bucket_name(&self.bucket_name) + .with_access_key_id(&self.access_key_id) + .with_secret_access_key(&self.secret_key) + // allow http for local instances + .with_allow_http(true); + + if self.set_checksum { + s3 = s3.with_checksum_algorithm(Checksum::SHA256); + } - let config = aws_sdk_s3::Config::builder() - .region(region) - .endpoint_url(uri) - .force_path_style(true) - .credentials_provider(creds) - .retry_config(RetryConfig::standard().with_max_attempts(5)) - .sleep_impl(default_async_sleep().expect("sleep impl is provided for tokio rt")) - .build(); + let s3 = s3.build().unwrap(); - let client = Client::from_conf(config); + // limit objectstore to a concurrent request limit + let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); Arc::new(S3 { - client, + client: s3, bucket: self.bucket_name.clone(), - set_content_md5: self.content_md5, }) } @@ -163,23 +161,20 @@ impl ObjectStorageProvider for S3Config { } } +fn to_path(path: &RelativePath) -> StorePath { + StorePath::from(path.as_str()) +} + pub struct S3 { - client: aws_sdk_s3::Client, + client: LimitStore, bucket: String, - set_content_md5: bool, } impl S3 { - async fn _get_object(&self, path: &RelativePath) -> Result { + async fn _get_object(&self, path: &RelativePath) -> Result { let instant = Instant::now(); - let resp = self - .client - .get_object() - .bucket(&self.bucket) - .key(path.as_str()) - .send() - .await; + let resp = self.client.get(&to_path(path)).await; match resp { Ok(resp) => { @@ -187,7 +182,7 @@ impl S3 { REQUEST_RESPONSE_TIME .with_label_values(&["GET", "200"]) .observe(time); - let body = resp.body.collect().await.unwrap().into_bytes(); + let body = resp.bytes().await.unwrap(); Ok(body) } Err(err) => { @@ -204,18 +199,9 @@ impl S3 { &self, path: &RelativePath, resource: Bytes, - md5: Option, - ) -> Result<(), AwsSdkError> { + ) -> Result<(), ObjectStorageError> { let time = Instant::now(); - let resp = self - .client - .put_object() - .bucket(&self.bucket) - .key(path.as_str()) - .body(resource.into()) - .set_content_md5(md5) - .send() - .await; + let resp = self.client.put(&to_path(path), resource).await; let status = if resp.is_ok() { "200" } else { "400" }; let time = time.elapsed().as_secs_f64(); REQUEST_RESPONSE_TIME @@ -225,164 +211,132 @@ impl S3 { resp.map(|_| ()).map_err(|err| err.into()) } - async fn _delete_stream(&self, stream_name: &str) -> Result<(), AwsSdkError> { - let mut pages = self - .client - .list_objects_v2() - .bucket(&self.bucket) - .prefix(format!("{stream_name}/")) - .into_paginator() - .send(); - - let mut delete_objects: Vec = vec![]; - while let Some(page) = pages.next().await { - let page = page?; - for obj in page.contents.unwrap() { - let obj_id = ObjectIdentifier::builder().set_key(obj.key).build(); - delete_objects.push(obj_id); - } - } - - let delete = Delete::builder().set_objects(Some(delete_objects)).build(); - - self.client - .delete_objects() - .bucket(&self.bucket) - .delete(delete) - .send() - .await?; - - Ok(()) - } - - async fn _delete_prefix(&self, path: &RelativePath) -> Result<(), AwsSdkError> { - let mut pages = self - .client - .list_objects_v2() - .bucket(&self.bucket) - .prefix(path.as_str()) - .into_paginator() - .send(); - - let mut delete_objects: Vec = vec![]; - while let Some(page) = pages.next().await { - let page = page?; - for obj in page.contents.unwrap() { - let obj_id = ObjectIdentifier::builder().set_key(obj.key).build(); - delete_objects.push(obj_id); - } - } - - let delete = Delete::builder().set_objects(Some(delete_objects)).build(); + async fn _delete_prefix(&self, key: &str) -> Result<(), ObjectStorageError> { + let object_stream = self.client.list(Some(&(key.into()))).await?; - self.client - .delete_objects() - .bucket(&self.bucket) - .delete(delete) - .send() - .await?; + object_stream + .for_each_concurrent(None, |x| async { + match x { + Ok(obj) => { + if (self.client.delete(&obj.location).await).is_err() { + log::error!("Failed to fetch object during delete stream"); + } + } + Err(_) => { + log::error!("Failed to fetch object during delete stream"); + } + }; + }) + .await; Ok(()) } - async fn _list_streams(&self) -> Result, AwsSdkError> { - let resp = self - .client - .list_objects_v2() - .bucket(&self.bucket) - .delimiter('/') - .send() - .await?; + async fn _list_streams(&self) -> Result, ObjectStorageError> { + let resp = self.client.list_with_delimiter(None).await?; - let common_prefixes = resp.common_prefixes().unwrap_or_default(); + let common_prefixes = resp.common_prefixes; // return prefixes at the root level let dirs: Vec<_> = common_prefixes .iter() - .filter_map(CommonPrefix::prefix) - .filter_map(|name| name.strip_suffix('/')) - .map(String::from) + .filter_map(|path| path.parts().next()) + .map(|name| name.as_ref().to_string()) .collect(); let stream_json_check = FuturesUnordered::new(); for dir in &dirs { let key = format!("{}/{}", dir, object_storage::STREAM_METADATA_FILE_NAME); - let task = async move { - self.client - .head_object() - .bucket(&self.bucket) - .key(key) - .send() - .await - .map(|_| ()) - }; - + let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; stream_json_check.push(task); } stream_json_check.try_collect().await?; - Ok(dirs - .into_iter() - .map(|name| LogStream { name }) - .collect_vec()) + Ok(dirs.into_iter().map(|name| LogStream { name }).collect()) } - async fn _list_dates(&self, stream: &str) -> Result, AwsSdkError> { - let prefix = format!("{stream}/"); + async fn _list_dates(&self, stream: &str) -> Result, ObjectStorageError> { let resp = self .client - .list_objects_v2() - .bucket(&self.bucket) - .prefix(&prefix) - .delimiter('/') - .send() + .list_with_delimiter(Some(&(stream.into()))) .await?; - let common_prefixes = resp.common_prefixes().unwrap_or_default(); + let common_prefixes = resp.common_prefixes; // return prefixes at the root level let dates: Vec<_> = common_prefixes .iter() - .filter_map(CommonPrefix::prefix) - .filter_map(|name| { - name.strip_suffix('/') - .and_then(|name| name.strip_prefix(&prefix)) - }) + .filter_map(|path| path.as_ref().strip_prefix(&format!("{stream}/"))) .map(String::from) .collect(); Ok(dates) } - async fn _upload_file( - &self, - key: &str, - path: &Path, - md5: Option, - ) -> Result<(), AwsSdkError> { - let body = ByteStream::from_path(&path).await.unwrap(); - + async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { let instant = Instant::now(); - let resp = self - .client - .put_object() - .bucket(&self.bucket) - .key(key) - .body(body) - .set_content_md5(md5) - .send() - .await; - let status = if resp.is_ok() { "200" } else { "400" }; + let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64; + + let res = if should_multipart { + self._upload_multipart(key, path).await + } else { + let bytes = tokio::fs::read(path).await?; + self.client + .put(&key.into(), bytes.into()) + .await + .map_err(|err| err.into()) + }; + + let status = if res.is_ok() { "200" } else { "400" }; let time = instant.elapsed().as_secs_f64(); REQUEST_RESPONSE_TIME .with_label_values(&["UPLOAD_PARQUET", status]) .observe(time); - log::trace!("{:?}", resp); - resp.map(|_| ()).map_err(|err| err.into()) + res + } + + async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { + let mut buf = [0u8; MULTIPART_UPLOAD_SIZE / 2]; + let mut file = OpenOptions::new().read(true).open(path).await?; + + let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?; + + let close_multipart = |err| async move { + log::error!("multipart upload failed. {:?}", err); + self.client + .abort_multipart(&key.into(), &multipart_id) + .await + }; + + loop { + match file.read(&mut buf).await { + Ok(len) => { + if len == 0 { + break; + } + if let Err(err) = async_writer.write_all(&buf[0..len]).await { + close_multipart(err).await?; + break; + } + if let Err(err) = async_writer.flush().await { + close_multipart(err).await?; + break; + } + } + Err(err) => { + close_multipart(err).await?; + break; + } + } + } + + async_writer.shutdown().await?; + + Ok(()) } } @@ -397,13 +351,7 @@ impl ObjectStorage for S3 { path: &RelativePath, resource: Bytes, ) -> Result<(), ObjectStorageError> { - let hash = self.set_content_md5.then(|| { - let mut hash = Md5::new(); - hash.update(&resource); - BASE64.encode(hash.finalize()) - }); - - self._put_object(path, resource, hash) + self._put_object(path, resource) .await .map_err(|err| ObjectStorageError::ConnectionError(Box::new(err)))?; @@ -411,23 +359,17 @@ impl ObjectStorage for S3 { } async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { - self._delete_prefix(path).await?; + self._delete_prefix(path.as_ref()).await?; Ok(()) } async fn check(&self) -> Result<(), ObjectStorageError> { - self.client - .head_bucket() - .bucket(&self.bucket) - .send() - .await - .map(|_| ()) - .map_err(|err| err.into()) + Ok(self.client.head(&"".into()).await.map(|_| ())?) } async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { - self._delete_stream(stream_name).await?; + self._delete_prefix(stream_name).await?; Ok(()) } @@ -444,17 +386,8 @@ impl ObjectStorage for S3 { Ok(streams) } - async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { - let hash = if self.set_content_md5 { - let mut file = std::fs::File::open(path)?; - let mut digest = Md5::new(); - std::io::copy(&mut file, &mut digest)?; - Some(BASE64.encode(digest.finalize())) - } else { - None - }; - - self._upload_file(key, path, hash).await?; + async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { + self._upload_file(key, path).await?; Ok(()) } @@ -496,34 +429,13 @@ impl ObjectStorage for S3 { } } -impl From for ObjectStorageError { - fn from(error: AwsSdkError) -> Self { - match error { - AwsSdkError::NotFound(_) | AwsSdkError::NoSuchKey(_) => { - ObjectStorageError::NoSuchKey("".to_string()) - } - e => ObjectStorageError::UnhandledError(Box::new(e)), - } - } -} - -// TODO: Needs to be adjusted https://github.com/awslabs/aws-sdk-rust/issues/657#issue-1436568853 -impl From> for ObjectStorageError { - fn from(error: SdkError) -> Self { +impl From for ObjectStorageError { + fn from(error: object_store::Error) -> Self { match error { - SdkError::ServiceError(err) => { - let err = err.into_err(); - let err_kind = &err.kind; - match err_kind { - HeadBucketErrorKind::Unhandled(_) => { - ObjectStorageError::AuthenticationError(err.into()) - } - _ => ObjectStorageError::UnhandledError(err.into()), - } - } - err @ SdkError::DispatchFailure(_) | err @ SdkError::TimeoutError(_) => { - ObjectStorageError::ConnectionError(err.into()) + object_store::Error::Generic { source, .. } => { + ObjectStorageError::UnhandledError(source) } + object_store::Error::NotFound { path, .. } => ObjectStorageError::NoSuchKey(path), err => ObjectStorageError::UnhandledError(Box::new(err)), } } From d260ec4b7e8b5e79260a586aeb00040a42320ca9 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 7 Apr 2023 13:16:58 +0530 Subject: [PATCH 2/6] Remove Unused Dep --- Cargo.lock | 36 ------------------------------------ server/Cargo.toml | 1 - 2 files changed, 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5535f3f10..d2798b8e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,29 +34,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "actix-files" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d832782fac6ca7369a70c9ee9a20554623c5e51c76e190ad151780ebea1cf689" -dependencies = [ - "actix-http", - "actix-service", - "actix-utils", - "actix-web", - "askama_escape", - "bitflags", - "bytes", - "derive_more", - "futures-core", - "http-range", - "log", - "mime", - "mime_guess", - "percent-encoding", - "pin-project-lite", -] - [[package]] name = "actix-http" version = "3.3.0" @@ -585,12 +562,6 @@ dependencies = [ "regex-syntax", ] -[[package]] -name = "askama_escape" -version = "0.10.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" - [[package]] name = "async-compression" version = "0.3.15" @@ -2124,12 +2095,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-range" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21dec9db110f5f872ed9699c3ecf50cf16f423502706ba5c72462e28d3157573" - [[package]] name = "httparse" version = "1.8.0" @@ -2944,7 +2909,6 @@ name = "parseable" version = "0.4.0" dependencies = [ "actix-cors", - "actix-files", "actix-web", "actix-web-httpauth", "actix-web-prometheus", diff --git a/server/Cargo.toml b/server/Cargo.toml index 0ecab746e..9a7c6cd8b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,6 @@ categories = ["logging", "observability", "log analytics"] actix-web-httpauth = "0.8" actix-web = { version = "4.3", features = ["rustls"] } actix-cors = "0.6" -actix-files = "0.6" actix-web-prometheus = { version = "0.1" } prometheus = { version = "0.13", features = ["process"] } anyhow = { version = "1.0", features = ["backtrace"] } From 24839ab68e83526ee5e05596761586d46a1745c4 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 7 Apr 2023 14:34:41 +0530 Subject: [PATCH 3/6] Add env var to configure client --- server/src/storage/s3.rs | 72 ++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index e4a0de43e..6694eadcb 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -32,7 +32,7 @@ use futures::{StreamExt, TryStreamExt}; use object_store::aws::{AmazonS3, AmazonS3Builder, Checksum}; use object_store::limit::LimitStore; use object_store::path::Path as StorePath; -use object_store::ObjectStore; +use object_store::{ClientOptions, ObjectStore}; use relative_path::RelativePath; use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -40,7 +40,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::iter::Iterator; use std::path::Path as StdPath; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; @@ -49,6 +49,7 @@ use super::{object_storage, ObjectStorageProvider}; // in bytes const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; +const CONNECT_TIMEOUT_SECS: u64 = 5; #[derive(Debug, Clone, clap::Args)] #[command( @@ -90,7 +91,7 @@ pub struct S3Config { #[arg(long, env = "P_S3_BUCKET", value_name = "bucket-name", required = true)] pub bucket_name: String, - /// Set client to send content_md5 header on every put request + /// Set client to send checksum header on every put request #[arg( long, env = "P_S3_CHECKSUM", @@ -98,20 +99,56 @@ pub struct S3Config { default_value = "false" )] pub set_checksum: bool, + + /// Set client to use virtual hosted style acess + #[arg( + long, + env = "P_S3_PATH_STYLE", + value_name = "bool", + default_value = "false" + )] + pub use_path_style: bool, + + /// Set client to skip tls verification + #[arg( + long, + env = "P_S3_TLS_SKIP_VERIFY", + value_name = "bool", + default_value = "false" + )] + pub skip_tls: bool, } -impl ObjectStorageProvider for S3Config { - fn get_datafusion_runtime(&self) -> Arc { - let s3 = AmazonS3Builder::new() +impl S3Config { + fn get_default_builder(&self) -> AmazonS3Builder { + let mut client_options = ClientOptions::default() + .with_allow_http(true) + .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS)); + + if self.skip_tls { + client_options = client_options.with_allow_invalid_certificates(true) + } + + let mut builder = AmazonS3Builder::new() .with_region(&self.region) .with_endpoint(&self.endpoint_url) .with_bucket_name(&self.bucket_name) .with_access_key_id(&self.access_key_id) .with_secret_access_key(&self.secret_key) - // allow http for local instances - .with_allow_http(true) - .build() - .unwrap(); + .with_virtual_hosted_style_request(!self.use_path_style) + .with_allow_http(true); + + if self.set_checksum { + builder = builder.with_checksum_algorithm(Checksum::SHA256) + } + + builder.with_client_options(client_options) + } +} + +impl ObjectStorageProvider for S3Config { + fn get_datafusion_runtime(&self) -> Arc { + let s3 = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); @@ -128,20 +165,7 @@ impl ObjectStorageProvider for S3Config { } fn get_object_store(&self) -> Arc { - let mut s3 = AmazonS3Builder::new() - .with_region(&self.region) - .with_endpoint(&self.endpoint_url) - .with_bucket_name(&self.bucket_name) - .with_access_key_id(&self.access_key_id) - .with_secret_access_key(&self.secret_key) - // allow http for local instances - .with_allow_http(true); - - if self.set_checksum { - s3 = s3.with_checksum_algorithm(Checksum::SHA256); - } - - let s3 = s3.build().unwrap(); + let s3 = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); From ad37192489c48b08d3c975dbf2c2a383c930ec98 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 7 Apr 2023 19:14:16 +0530 Subject: [PATCH 4/6] Fix --- server/src/main.rs | 1 - server/src/option.rs | 22 +--------------------- server/src/storage/s3.rs | 6 +++++- 3 files changed, 6 insertions(+), 23 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 112c3c4af..dde50ac7c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -60,7 +60,6 @@ async fn main() -> anyhow::Result<()> { CONFIG.validate(); let storage = CONFIG.storage().get_object_store(); CONFIG.validate_staging()?; - CONFIG.validate_storage(&*storage).await; let metadata = storage::resolve_parseable_metadata().await?; metadata.set_global(); banner::print(&CONFIG, storage::StorageMetadata::global()).await; diff --git a/server/src/option.rs b/server/src/option.rs index ab1687059..644b69f51 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -22,10 +22,7 @@ use clap::{command, value_parser, Arg, Args, Command, FromArgMatches}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::storage::{ - FSConfig, ObjectStorage, ObjectStorageError, ObjectStorageProvider, S3Config, - LOCAL_SYNC_INTERVAL, -}; +use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL}; use crate::utils::validate_path_is_writeable; lazy_static::lazy_static! { @@ -95,23 +92,6 @@ impl Config { } } - pub async fn validate_storage(&self, storage: &(impl ObjectStorage + ?Sized)) { - match storage.check().await { - Ok(_) => (), - Err(ObjectStorageError::ConnectionError(inner)) => panic!( - "Failed to connect to the Object Storage Service on {url}\nCaused by: {cause}", - url = self.storage().get_endpoint(), - cause = inner - ), - Err(ObjectStorageError::AuthenticationError(inner)) => panic!( - "Failed to authenticate. Please ensure credentials are valid\n Caused by: {inner}" - ), - Err(error) => { - panic!("{error}") - } - } - } - pub fn validate_staging(&self) -> anyhow::Result<()> { let staging_path = self.staging_dir(); validate_path_is_writeable(staging_path) diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 6694eadcb..19ed27e3c 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -389,7 +389,11 @@ impl ObjectStorage for S3 { } async fn check(&self) -> Result<(), ObjectStorageError> { - Ok(self.client.head(&"".into()).await.map(|_| ())?) + Ok(self + .client + .head(&object_storage::PARSEABLE_METADATA_FILE_NAME.into()) + .await + .map(|_| ())?) } async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { From 72c2a3ad0d37c1371273c3cdff837ac95a66f658 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 8 Apr 2023 12:52:17 +0530 Subject: [PATCH 5/6] Fix --- server/src/storage/s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 19ed27e3c..d36248885 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -324,7 +324,7 @@ impl S3 { } async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> { - let mut buf = [0u8; MULTIPART_UPLOAD_SIZE / 2]; + let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2]; let mut file = OpenOptions::new().read(true).open(path).await?; let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?; From 5de61c65e23a12a5b0dcb6e3f7b39f171785ad51 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sat, 8 Apr 2023 12:54:03 +0530 Subject: [PATCH 6/6] Remove Default --- server/src/option.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 644b69f51..20e61589f 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -122,12 +122,6 @@ impl Config { } } -impl Default for Config { - fn default() -> Self { - Self::new() - } -} - fn parseable_cli_command() -> Command { let local = Server::get_clap_command("local-store"); let local = ::augment_args_for_update(local);