20 changes: 15 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,6 +33,7 @@ members = [
     "common/streams",
     "common/tracing",
     "common/users",
+    "common/storage",
 
     # Query
     "query",
23 changes: 6 additions & 17 deletions common/ast/src/ast/statements/copy.rs
@@ -101,17 +101,12 @@ pub enum CopyUnit<'a> {
     },
     /// UriLocation (a.k.a external location) can be used in `INTO` or `FROM`.
     ///
-    /// For examples: `'s3://example/path/to/dir' CREDENTIALS = (AWS_ACCESS_ID="admin" AWS_SECRET_KEY="admin")`
-    ///
-    /// TODO(xuanwo): Add endpoint_url support.
-    /// TODO(xuanwo): We can check if we support this protocol during parsing.
-    /// TODO(xuanwo): Maybe we can introduce more strict (friendly) report for credentials and encryption, like parsed into StorageConfig?
+    /// For examples: `'s3://example/path/to/dir' CONNECTION = (AWS_ACCESS_ID="admin" AWS_SECRET_KEY="admin")`
     UriLocation {
         protocol: String,
         name: String,
         path: String,
-        credentials: BTreeMap<String, String>,
-        encryption: BTreeMap<String, String>,
+        connection: BTreeMap<String, String>,
     },
     /// Query can only be used as `FROM`.
     ///
@@ -157,18 +152,12 @@ impl Display for CopyUnit<'_> {
                 protocol,
                 name,
                 path,
-                credentials,
-                encryption,
+                connection,
             } => {
                 write!(f, "'{protocol}://{name}{path}'")?;
-                if !credentials.is_empty() {
-                    write!(f, " CREDENTIALS = ( ")?;
-                    write_space_seperated_map(f, credentials)?;
-                    write!(f, " )")?;
-                }
-                if !encryption.is_empty() {
-                    write!(f, " ENCRYPTION = ( ")?;
-                    write_space_seperated_map(f, encryption)?;
+                if !connection.is_empty() {
+                    write!(f, " CONNECTION = ( ")?;
+                    write_space_seperated_map(f, connection)?;
                     write!(f, " )")?;
                 }
                 Ok(())
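Reviewer sketch (not part of the diff): the reworked `Display` impl now emits a single `CONNECTION = ( ... )` clause. A minimal illustration, assuming the enum is reachable as `common_ast::ast::CopyUnit` (import path assumed) and using values borrowed from the new test fixture:

```rust
use std::collections::BTreeMap;

use common_ast::ast::CopyUnit; // import path assumed, not shown in this diff

fn main() {
    // Hypothetical location, mirroring the new test fixture below.
    let loc = CopyUnit::UriLocation {
        protocol: "s3".to_string(),
        name: "mybucket".to_string(),
        path: "/data.csv".to_string(),
        connection: BTreeMap::from([(
            "endpoint_url".to_string(),
            "http://127.0.0.1:9900".to_string(),
        )]),
    };

    // CONNECTION is only printed when the merged map is non-empty.
    assert_eq!(
        loc.to_string(),
        "'s3://mybucket/data.csv' CONNECTION = ( endpoint_url='http://127.0.0.1:9900' )"
    );
}
```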
13 changes: 9 additions & 4 deletions common/ast/src/parser/statement.rs
@@ -1118,13 +1118,19 @@ pub fn copy_target(i: Input) -> IResult<CopyUnit> {
     map_res(
         rule! {
             #literal_string
+            ~ (CONNECTION ~ "=" ~ #options)?
             ~ (CREDENTIALS ~ "=" ~ #options)?
             ~ (ENCRYPTION ~ "=" ~ #options)?
         },
-        |(location, credentials_opt, encryption_opt)| {
+        |(location, connection_opt, credentials_opt, encryption_opt)| {
             let parsed =
                 Url::parse(&location).map_err(|_| ErrorKind::Other("invalid uri location"))?;
 
+            // TODO: We will use `CONNECTION` to replace `CREDENTIALS` and `ENCRYPTION`.
+            let mut conn = connection_opt.map(|v| v.2).unwrap_or_default();
+            conn.extend(credentials_opt.map(|v| v.2).unwrap_or_default());
+            conn.extend(encryption_opt.map(|v| v.2).unwrap_or_default());
+
             Ok(CopyUnit::UriLocation {
                 protocol: parsed.scheme().to_string(),
                 name: parsed
@@ -1136,16 +1142,15 @@ pub fn copy_target(i: Input) -> IResult<CopyUnit> {
                 } else {
                     parsed.path().to_string()
                 },
-                credentials: credentials_opt.map(|v| v.2).unwrap_or_default(),
-                encryption: encryption_opt.map(|v| v.2).unwrap_or_default(),
+                connection: conn,
             })
         },
     )(i)
 };
 
 rule!(
     #stage_location: "@<stage_name> { <path> }"
-    | #uri_location: "'<protocol>://<name> {<path>} { CREDENTIALS = ({ AWS_ACCESS_KEY = 'aws_access_key' }) } '"
+    | #uri_location: "'<protocol>://<name> {<path>} { CONNECTION = ({ AWS_ACCESS_KEY = 'aws_access_key' }) } '"
     | #table: "{ { <catalog>. } <database>. }<table>"
     | #query: "( <query> )"
 )(i)
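Reviewer note: `BTreeMap::extend` overwrites keys that already exist, so if the same option is given under both `CONNECTION` and a legacy clause, the legacy `CREDENTIALS`/`ENCRYPTION` value wins. A standalone sketch of that merge (std only; all keys and values illustrative) — worth confirming this precedence is intended for the deprecation path:

```rust
use std::collections::BTreeMap;

fn main() {
    // Options parsed from CONNECTION = ( ... ).
    let mut conn = BTreeMap::from([
        ("endpoint_url".to_string(), "http://127.0.0.1:9900".to_string()),
        ("aws_key_id".to_string(), "from_connection".to_string()),
    ]);
    // Options parsed from the legacy CREDENTIALS / ENCRYPTION clauses.
    let credentials =
        BTreeMap::from([("aws_key_id".to_string(), "from_credentials".to_string())]);
    let encryption =
        BTreeMap::from([("master_key".to_string(), "master_key".to_string())]);

    // extend() inserts every pair, replacing keys already present,
    // so the legacy clauses take precedence on a key collision.
    conn.extend(credentials);
    conn.extend(encryption);

    assert_eq!(conn["aws_key_id"], "from_credentials");
    assert_eq!(conn.len(), 3);
}
```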
12 changes: 12 additions & 0 deletions common/ast/tests/it/parser.rs
@@ -170,6 +170,18 @@ fn test_statement() {
             skip_header = 1
         )
         size_limit=10;"#,
+        r#"COPY INTO mytable
+        FROM 's3://mybucket/data.csv'
+        CONNECTION = (
+            ENDPOINT_URL = 'http://127.0.0.1:9900'
+        )
+        FILE_FORMAT = (
+            type = 'CSV'
+            field_delimiter = ','
+            record_delimiter = '\n'
+            skip_header = 1
+        )
+        size_limit=10;"#,
         r#"COPY INTO mytable
         FROM @my_stage
         FILE_FORMAT = (
2 changes: 1 addition & 1 deletion common/ast/tests/it/testdata/statement-error.txt
@@ -267,7 +267,7 @@ error:
   --> SQL:1:38
   |
 1 | COPY INTO mytable FROM 's3://bucket' CREDENTIAL = ();
-  |                                      ^^^^^^^^^^ expected `CREDENTIALS`, `ENCRYPTION`, `FILES`, `PATTERN`, `FILE_FORMAT`, `VALIDATION_MODE`, or 3 more ...
+  |                                      ^^^^^^^^^^ expected `CONNECTION`, `CREDENTIALS`, `ENCRYPTION`, `FILES`, `PATTERN`, `FILE_FORMAT`, or 4 more ...
 
 
 ---------- Input ----------
62 changes: 54 additions & 8 deletions common/ast/tests/it/testdata/statement.txt
@@ -5304,8 +5304,57 @@ Copy(
             protocol: "s3",
             name: "mybucket",
             path: "/data.csv",
-            credentials: {},
-            encryption: {},
+            connection: {},
+        },
+        dst: Table {
+            catalog: None,
+            database: None,
+            table: Identifier {
+                name: "mytable",
+                quote: None,
+                span: Ident(10..17),
+            },
+        },
+        files: [],
+        pattern: "",
+        file_format: {
+            "field_delimiter": ",",
+            "record_delimiter": "\n",
+            "skip_header": "1",
+            "type": "CSV",
+        },
+        validation_mode: "",
+        size_limit: 10,
+    },
+)
+
+
+---------- Input ----------
+COPY INTO mytable
+    FROM 's3://mybucket/data.csv'
+    CONNECTION = (
+        ENDPOINT_URL = 'http://127.0.0.1:9900'
+    )
+    FILE_FORMAT = (
+        type = 'CSV'
+        field_delimiter = ','
+        record_delimiter = '\n'
+        skip_header = 1
+    )
+    size_limit=10;
+---------- Output ---------
+COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url='http://127.0.0.1:9900' ) FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
+' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10
+---------- AST ------------
+Copy(
+    CopyStmt {
+        src: UriLocation {
+            protocol: "s3",
+            name: "mybucket",
+            path: "/data.csv",
+            connection: {
+                "endpoint_url": "http://127.0.0.1:9900",
+            },
         },
         dst: Table {
             catalog: None,
Expand Down Expand Up @@ -5402,8 +5451,7 @@ Copy(
protocol: "s3",
name: "mybucket",
path: "/data.csv",
credentials: {},
encryption: {},
connection: {},
},
files: [],
pattern: "",
@@ -5480,7 +5528,7 @@ COPY INTO mytable
     )
     size_limit=10;
 ---------- Output ---------
-COPY INTO mytable FROM 's3://mybucket/data.csv' CREDENTIALS = ( aws_key_id='access_key' aws_secret_key='secret_key' ) ENCRYPTION = ( master_key='master_key' ) FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
+COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( aws_key_id='access_key' aws_secret_key='secret_key' master_key='master_key' ) FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
 ' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10
 ---------- AST ------------
 Copy(
@@ -5489,11 +5537,9 @@ Copy(
             protocol: "s3",
             name: "mybucket",
             path: "/data.csv",
-            credentials: {
+            connection: {
                 "aws_key_id": "access_key",
                 "aws_secret_key": "secret_key",
-            },
-            encryption: {
                 "master_key": "master_key",
             },
         },
2 changes: 1 addition & 1 deletion common/contexts/Cargo.toml
@@ -12,5 +12,5 @@ test = false
 common-base = { path = "../base" }
 
 async-trait = "0.1.56"
-opendal = { version = "0.10.0", features = ["retry"] }
+opendal = { version = "0.11.0", features = ["retry"] }
 time = "0.3.10"
1 change: 1 addition & 0 deletions common/exception/src/exception_code.rs
@@ -241,6 +241,7 @@ build_exceptions! {
     StoragePermissionDenied(3002),
     StorageUnavailable(3901),
     StorageUnsupported(3902),
+    StorageInsecure(3903),
     StorageOther(4000),
 }
 
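Reviewer sketch: assuming `build_exceptions!` generates a constructor taking a display message, like the neighboring codes (an assumption — the macro body isn't in this diff), the new code would be raised along these lines:

```rust
use common_exception::ErrorCode;
use common_exception::Result;

// Hypothetical call site; the function name and message are illustrative only.
fn require_secret_key(secret_key: &str) -> Result<()> {
    if secret_key.is_empty() {
        // Maps to the new error code 3903.
        return Err(ErrorCode::StorageInsecure(
            "secret key must be set for this storage endpoint",
        ));
    }
    Ok(())
}
```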
4 changes: 0 additions & 4 deletions common/io/Cargo.toml
@@ -12,9 +12,6 @@ edition = "2021"
 doctest = false
 test = false
 
-[features]
-storage-hdfs = ["opendal/services-hdfs"]
-
 [dependencies]
 # Workspace dependencies
 common-exception = { path = "../exception" }
@@ -29,7 +26,6 @@ chrono-tz = "0.6.1"
 futures = "0.3.21"
 lexical-core = "0.8.5"
 micromarshal = "0.1.0"
-opendal = { version = "0.10.0", features = ["retry"] }
 serde = { version = "1.0.137", features = ["derive"] }
 time = "0.3.10"
 
2 changes: 0 additions & 2 deletions common/io/src/lib.rs
@@ -26,9 +26,7 @@ mod binary_read;
 mod binary_write;
 
 mod buffer;
-mod configs;
 mod format_settings;
-mod operator;
 mod options_deserializer;
 mod position;
 mod stat_buffer;
14 changes: 0 additions & 14 deletions common/io/src/prelude.rs
@@ -28,22 +28,8 @@ pub use crate::buffer::BufferReader;
 pub use crate::buffer::CheckpointRead;
 pub use crate::buffer::MemoryReader;
 pub use crate::buffer::NestedCheckpointReader;
-pub use crate::configs::StorageAzblobConfig;
-pub use crate::configs::StorageConfig;
-pub use crate::configs::StorageFsConfig;
-pub use crate::configs::StorageHdfsConfig;
-pub use crate::configs::StorageParams;
-pub use crate::configs::StorageS3Config;
-pub use crate::configs::AWS_S3_ENDPOINT;
 pub use crate::format_settings::Compression;
 pub use crate::format_settings::FormatSettings;
-pub use crate::operator::init_azblob_operator;
-pub use crate::operator::init_fs_operator;
-#[cfg(feature = "storage-hdfs")]
-pub use crate::operator::init_hdfs_operator;
-pub use crate::operator::init_memory_operator;
-pub use crate::operator::init_operator;
-pub use crate::operator::init_s3_operator;
 pub use crate::options_deserializer::OptionsDeserializer;
 pub use crate::options_deserializer::OptionsDeserializerError;
 pub use crate::position::*;
2 changes: 1 addition & 1 deletion common/management/Cargo.toml
@@ -30,5 +30,5 @@ serde_json = "1.0.81"
 [dev-dependencies]
 common-base = { path = "../base" }
 common-meta-embedded = { path = "../meta/embedded" }
-
+common-storage = { path = "../storage" }
 mockall = "0.11.1"
4 changes: 2 additions & 2 deletions common/management/tests/it/stage.rs
@@ -17,15 +17,15 @@ use std::sync::Arc;
 use common_base::base::tokio;
 use common_exception::ErrorCode;
 use common_exception::Result;
-use common_io::prelude::StorageParams;
-use common_io::prelude::StorageS3Config;
 use common_management::*;
 use common_meta_api::KVApi;
 use common_meta_embedded::MetaEmbedded;
 use common_meta_types::SeqV;
 use common_meta_types::StageFile;
 use common_meta_types::StageParams;
 use common_meta_types::UserStageInfo;
+use common_storage::StorageParams;
+use common_storage::StorageS3Config;
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
 async fn test_add_stage() -> Result<()> {
2 changes: 1 addition & 1 deletion common/meta/types/Cargo.toml
@@ -13,7 +13,7 @@ test = false
 [dependencies]
 common-datavalues = { path = "../../datavalues" }
 common-exception = { path = "../../exception" }
-common-io = { path = "../../io" }
+common-storage = { path = "../../storage" }
 
 openraft = { git = "https://github.com/datafuselabs/openraft", rev = "v0.7.0-alpha.2" }
 sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1", default-features = false }