5 changes: 5 additions & 0 deletions parquet/configs/users.d/users.xml
@@ -88,6 +88,11 @@
<!-- Allow access management -->
<access_management>1</access_management>

<!-- Allow access to named collections -->
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>

<!-- Example of row level security policy. -->
<!-- <databases>
<test>
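
Note: the three settings added above grant the test user control over named collections and let it see collections (including their secret values) that the `named_s3_credentials` step later in this PR installs in server configuration. A rough sketch of the SQL access this profile enables (collection name and values are illustrative, not part of this change):

```sql
-- Visible only when show_named_collections = 1;
-- secret values are masked unless show_named_collections_secrets = 1.
SHOW NAMED COLLECTIONS;
SELECT name, collection FROM system.named_collections;

-- Creating, altering, or dropping collections from SQL requires named_collection_control = 1.
CREATE NAMED COLLECTION s3_credentials AS
    access_key_id = 'example_key',
    secret_access_key = 'example_secret';
```
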
5 changes: 5 additions & 0 deletions parquet/configs/users.d/users_disabled_caching.xml
@@ -87,6 +87,11 @@
<!-- Allow access management -->
<access_management>1</access_management>

<!-- Allow access to named collections -->
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>

<!-- Example of row level security policy. -->
<!-- <databases>
<test>
13 changes: 10 additions & 3 deletions parquet/tests/s3.py
@@ -269,7 +269,7 @@ def insert_into_function(self):

with When("I insert test data into `s3` table function in Parquet format"):
node.query(
f"INSERT INTO FUNCTION s3('{self.context.uri}{file_name}.Parquet', '{self.context.access_key_id}', '{self.context.secret_access_key}', 'Parquet', '{func_def}', '{compression_type.lower()}') VALUES {','.join(total_values)}",
f"INSERT INTO FUNCTION s3(s3_credentials, url='{self.context.uri}{file_name}.Parquet', format='Parquet', structure='{func_def}', compression_method='{compression_type.lower()}') VALUES {','.join(total_values)}",
settings=[("allow_suspicious_low_cardinality_types", 1)],
)

@@ -328,7 +328,7 @@ def select_from_function_manual_cast_types(self):
executor=executor,
)(
sql=f"SELECT {column.name}, toTypeName({column.name}) FROM \
s3('{self.context.uri}{table_name}.Parquet', '{self.context.access_key_id}', '{self.context.secret_access_key}', 'Parquet', '{table_def}')",
s3(s3_credentials, url='{self.context.uri}{table_name}.Parquet', format='Parquet', structure='{table_def}')",
)
join()

@@ -373,7 +373,7 @@ def select_from_function_auto_cast_types(self):
executor=executor,
)(
sql=f"SELECT {column.name}, toTypeName({column.name}) FROM \
s3('{self.context.uri}{table_name}.Parquet', '{self.context.access_key_id}', '{self.context.secret_access_key}', 'Parquet')",
s3(s3_credentials, url='{self.context.uri}{table_name}.Parquet', format='Parquet')",
)
join()

@@ -442,6 +442,13 @@ def outline(self, compression_type):
self.context.compression_type = compression_type
self.context.node = self.context.cluster.node("clickhouse1")

with Given("I add S3 credentials configuration"):
named_s3_credentials(
access_key_id=self.context.access_key_id,
secret_access_key=self.context.secret_access_key,
restart=True,
)

Suite(run=engine)
Suite(run=function)

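
Note: the queries in this file now pass the `s3_credentials` named collection as the first argument of the `s3` table function, with the remaining parameters given as key-value arguments instead of positionally. A rough before/after sketch of the resulting SQL (URL and structure are illustrative):

```sql
-- Before: endpoint, credentials, format, structure, and compression passed positionally.
INSERT INTO FUNCTION s3('http://minio:9000/bucket/data.Parquet',
    'ACCESS_KEY', 'SECRET_KEY', 'Parquet', 'x UInt64', 'gzip')
VALUES (1);

-- After: credentials come from the named collection; the other parameters are named
-- and can still be overridden per query.
INSERT INTO FUNCTION s3(s3_credentials,
    url = 'http://minio:9000/bucket/data.Parquet',
    format = 'Parquet', structure = 'x UInt64',
    compression_method = 'gzip')
VALUES (1);
```
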
5 changes: 5 additions & 0 deletions s3/configs/clickhouse/users.xml
@@ -87,6 +87,11 @@
<!-- Allow access management -->
<access_management>1</access_management>

<!-- Allow access to named collections -->
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>

<!-- Example of row level security policy. -->
<!-- <databases>
<test>
35 changes: 25 additions & 10 deletions s3/tests/common.py
@@ -209,6 +209,25 @@ def s3_storage(
return add_config(config, restart=restart, nodes=nodes, timeout=timeout)


@TestStep(Given)
def named_s3_credentials(
self, access_key_id, secret_access_key, restart=False, nodes=None, timeout=60
):
"""Add S3 connection configuration as a named collection."""
config = create_xml_config_content(
entries={
"named_collections": {
"s3_credentials": {
"access_key_id": access_key_id,
"secret_access_key": secret_access_key,
}
}
},
config_file="s3_credentials.xml",
)
return add_config(config, restart=restart, nodes=nodes, timeout=timeout)


@contextmanager
def s3_endpoints(
endpoints,
@@ -1847,16 +1866,14 @@ def insert_to_s3_function(
):
"""Write a table to a file in s3. File will be overwritten from an empty table during cleanup."""

access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
uri = uri or self.context.uri
node = current().context.node

try:
query = f"INSERT INTO FUNCTION s3('{uri}{filename}', '{access_key_id}','{secret_access_key}', 'CSVWithNames', '{columns}'"
query = f"INSERT INTO FUNCTION s3(s3_credentials, url='{uri}{filename}', format='CSVWithNames', structure='{columns}'"

if compression:
query += f", '{compression}'"
query += f", compression_method='{compression}'"

query += f") SELECT * FROM {table_name}"

@@ -1868,7 +1885,7 @@
yield

finally:
query = f"INSERT INTO FUNCTION s3('{uri}{filename}', '{access_key_id}','{secret_access_key}', 'CSV', '{columns}'"
query = f"INSERT INTO FUNCTION s3(s3_credentials, url='{uri}{filename}', format='CSV', structure='{columns}'"
query += f") SELECT * FROM null('{columns}')"

node.query(query)
@@ -1887,18 +1904,16 @@ def insert_from_s3_function(
no_checks=False,
):
"""Import data from a file in s3 to a table."""
access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
uri = uri or self.context.uri
node = current().context.node

if cluster_name is None:
query = f"INSERT INTO {table_name} SELECT * FROM s3('{uri}{filename}', '{access_key_id}','{secret_access_key}', 'CSVWithNames', '{columns}'"
query = f"INSERT INTO {table_name} SELECT * FROM s3(s3_credentials, url='{uri}{filename}', format='CSVWithNames', structure='{columns}'"
else:
query = f"INSERT INTO {table_name} SELECT * FROM s3Cluster('{cluster_name}', '{uri}{filename}', '{access_key_id}','{secret_access_key}', 'CSVWithNames', '{columns}'"
query = f"INSERT INTO {table_name} SELECT * FROM s3Cluster('{cluster_name}', s3_credentials, url='{uri}{filename}', format='CSVWithNames', structure='{columns}'"

if compression:
query += f", '{compression}'"
query += f", compression_method='{compression}'"

query += ")"

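
Note: the `named_s3_credentials` step added above uses `create_xml_config_content` to install the collection under `<named_collections>` in the server configuration. The generated `s3_credentials.xml` is expected to look roughly like the sketch below (the exact wrapper elements depend on the helper; key values are illustrative):

```xml
<clickhouse>
    <named_collections>
        <s3_credentials>
            <access_key_id>example_key</access_key_id>
            <secret_access_key>example_secret</secret_access_key>
        </s3_credentials>
    </named_collections>
</clickhouse>
```
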
39 changes: 17 additions & 22 deletions s3/tests/table_function.py
@@ -470,24 +470,21 @@ def credentials_s3Cluster(self):
@Requirements(RQ_SRS_015_S3_Settings_PartitionBy("1.0"))
def partition(self):
"""Check that ClickHouse can export partitioned data."""
access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
uri = self.context.uri
node = current().context.node

with When("I export the data to S3 using the table function"):
sql = (
f"INSERT INTO FUNCTION s3('{uri}_partition_export_"
+ "{_partition_id}.csv'"
+ f", '{access_key_id}','{secret_access_key}', 'CSV', 'a String') PARTITION BY a VALUES ('x'),('y'),('z')"
f"INSERT INTO FUNCTION s3(s3_credentials, url='{uri}_partition_export_"
+ "{_partition_id}.csv', format='CSV', structure='a String') PARTITION BY a VALUES ('x'),('y'),('z')"
)
node.query(sql)

for partition_id in ["x", "y", "z"]:
with Then(f"I check the data in the {partition_id} partition"):
output = node.query(
f"""SELECT * FROM
s3('{uri}_partition_export_{partition_id}.csv', '{access_key_id}','{secret_access_key}', 'CSV', 'a String') FORMAT TabSeparated"""
s3(s3_credentials, url='{uri}_partition_export_{partition_id}.csv', format='CSV', structure='a String') FORMAT TabSeparated"""
).output
assert output == partition_id, error()

@@ -496,18 +493,15 @@
@Requirements(RQ_SRS_015_S3_Settings_PartitionBy("1.0"))
def partition_s3Cluster(self):
"""Check that ClickHouse can export partitioned data using s3Cluster table function."""
access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
uri = self.context.uri
node = current().context.node

for cluster_name in self.context.clusters:
with Combination(f"cluster_name = {cluster_name}"):
with When("I export the data to S3 using the table function"):
sql = (
f"INSERT INTO FUNCTION s3('{uri}_partition_export_"
+ "{_partition_id}.csv'"
+ f", '{access_key_id}','{secret_access_key}', 'CSV', 'a String') PARTITION BY a VALUES ('x'),('y'),('z')"
f"INSERT INTO FUNCTION s3(s3_credentials, url='{uri}_partition_export_"
+ "{_partition_id}.csv', format='CSV', structure='a String') PARTITION BY a VALUES ('x'),('y'),('z')"
)
node.query(sql)

@@ -519,7 +513,7 @@ def partition_s3Cluster(self):
self.context.cluster.node("clickhouse1")
.query(
f"""SELECT * FROM
s3Cluster('{cluster_name}', '{uri}_partition_export_{partition_id}.csv', '{access_key_id}','{secret_access_key}', 'CSV', 'a String') FORMAT TabSeparated"""
s3Cluster('{cluster_name}', s3_credentials, url='{uri}_partition_export_{partition_id}.csv', format='CSV', structure='a String') FORMAT TabSeparated"""
)
.output
)
@@ -690,8 +684,6 @@ def remote_host_filter(self):
"""
table1_name = "table_" + getuid()
table2_name = "table_" + getuid()
access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
uri = self.context.uri
node = current().context.node
expected = "427"
@@ -717,7 +709,7 @@ def remote_host_filter(self):
node.query(
f"""
INSERT INTO FUNCTION
s3('{uri}', '{access_key_id}','{secret_access_key}', 'CSVWithNames', 'd UInt64')
s3(s3_credentials, url='{uri}', format='CSVWithNames', structure='d UInt64')
SELECT * FROM {table1_name}""",
message=f'DB::Exception: URL "{uri}" is not allowed',
)
@@ -727,8 +719,6 @@ def remote_host_filter(self):
@Requirements(RQ_SRS_015_S3_TableFunction_MeasureFileSize("1.0"))
def measure_file_size(self):

access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
table1_name = "table_" + getuid()
uri = self.context.uri + "measure-file-size/"
bucket_path = self.context.bucket_path + "/measure-file-size"
@@ -756,7 +746,7 @@ def measure_file_size(self):

with Then("I compare the size that clickhouse reports"):
r = node.query(
f"SELECT sum(_size) FROM s3('{uri}**', '{access_key_id}','{secret_access_key}', 'One') FORMAT TSV"
f"SELECT sum(_size) FROM s3(s3_credentials, url='{uri}**', format='One') FORMAT TSV"
)

for retry in retries(timeout=30, delay=5):
@@ -769,8 +759,6 @@ def measure_file_size(self):
@Requirements(RQ_SRS_015_S3_TableFunction_MeasureFileSize("1.0"))
def measure_file_size_s3Cluster(self):
"""Check that ClickHouse can measure file size using s3Cluster table function."""
access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
table1_name = "table_" + getuid()
uri = self.context.uri + "measure-file-size/"
bucket_path = self.context.bucket_path + "/measure-file-size"
@@ -804,7 +792,7 @@
for attempt in retries(timeout=10, delay=1):
with attempt:
r = node.query(
f"SELECT sum(_size) FROM s3Cluster('{cluster_name}', '{uri}**', '{access_key_id}','{secret_access_key}', 'One') FORMAT TSV"
f"SELECT sum(_size) FROM s3Cluster('{cluster_name}', s3_credentials, url='{uri}**', format='One') FORMAT TSV"
)
size_clickhouse = int(r.output.strip())
debug(
@@ -820,6 +808,13 @@
def outline(self):
"""Test S3 and S3 compatible storage through storage disks."""

with Given("I add S3 credentials configuration"):
named_s3_credentials(
access_key_id=self.context.access_key_id,
secret_access_key=self.context.secret_access_key,
restart=True,
)

for scenario in loads(current_module(), Scenario):
with allow_s3_truncate(self.context.node):
Scenario(run=scenario, flags=TE)
@@ -867,7 +862,7 @@ def ssec_encryption_check(self):
node.query(
f"""
INSERT INTO FUNCTION
s3('{self.context.uri}encrypted.csv', '{self.context.access_key_id}','{self.context.secret_access_key}', 'CSV', 'd UInt64')
s3(s3_credentials, url='{self.context.uri}encrypted.csv', format='CSV', structure='d UInt64')
SELECT * FROM {name}"""
)

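
Note: the `s3Cluster` calls keep the cluster name as the first positional argument and take the named collection in place of the inline URL and credentials. A rough sketch of the rewritten form (cluster name, URL, and format are illustrative):

```sql
SELECT sum(_size)
FROM s3Cluster('replicated_cluster', s3_credentials,
    url = 'http://minio:9000/bucket/path/**',
    format = 'One')
FORMAT TSV;
```
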
31 changes: 13 additions & 18 deletions s3/tests/table_function_invalid.py
@@ -12,22 +12,18 @@ def insert_to_s3_function_invalid(
columns="d UInt64",
compression=None,
file_format="CSVWithNames",
access_key_id=None,
secret_access_key=None,
message=None,
exitcode=None,
timeout=60,
):
"""Write a table to a file in s3. File will be overwritten from an empty table during cleanup."""

access_key_id = access_key_id or self.context.access_key_id
secret_access_key = secret_access_key or self.context.secret_access_key
node = current().context.node

query = f"INSERT INTO FUNCTION s3('{path}', '{access_key_id}','{secret_access_key}', '{file_format}', '{columns}'"
query = f"INSERT INTO FUNCTION s3(s3_credentials, url='{path}', format='{file_format}', structure='{columns}'"

if compression:
query += f", '{compression}'"
query += f", compression_method='{compression}'"

query += f") SELECT * FROM {table_name}"

@@ -48,15 +44,13 @@ def insert_from_s3_function_invalid(
timeout=60,
):
"""Import data from a file in s3 to a table and catch fail."""
access_key_id = self.context.access_key_id
secret_access_key = self.context.secret_access_key
uri = uri or self.context.uri
node = current().context.node

query = f"INSERT INTO {table_name} SELECT * FROM s3('{uri}{filename}', '{access_key_id}','{secret_access_key}', 'CSVWithNames', '{columns}'"
query = f"INSERT INTO {table_name} SELECT * FROM s3(s3_credentials, url='{uri}{filename}', format='CSVWithNames', structure='{columns}'"

if compression:
query += f", '{compression}'"
query += f", compression_method='{compression}'"

query += ")"

@@ -373,14 +367,8 @@ def invalid_credentials(self):
"""I export the data to S3 using the table function with invalid
credentials, expecting failure"""
):
insert_to_s3_function_invalid(
table_name=name_table1,
path=f"{uri}invalid.csv",
access_key_id=access_key_id,
secret_access_key=secret_access_key,
message=expected,
exitcode=243,
)
query = f"INSERT INTO FUNCTION s3('{uri}invalid.csv', '{access_key_id}', '{secret_access_key}', 'CSV', 'd UInt64') SELECT * FROM {name_table1}"
node.query(query, message=expected, exitcode=243)


@TestOutline(Feature)
@@ -390,6 +378,13 @@ def outline(self, uri):

self.context.uri = uri

with Given("I add S3 credentials configuration"):
named_s3_credentials(
access_key_id=self.context.access_key_id,
secret_access_key=self.context.secret_access_key,
restart=True,
)

for scenario in loads(current_module(), Scenario):
with allow_s3_truncate(self.context.node):
scenario()