Skip to content

Commit f4de1d1

Browse files
hussein-awalasungwy
authored and committed
Implement create_table_if_not_exists (apache#415)
* Feat: Add fail_if_exists param to create_table
* create create_table_if_not_exists method
* fix reset test
* fix mypy check
1 parent 4c55d74 commit f4de1d1

File tree

6 files changed

+135
-2
lines changed

6 files changed

+135
-2
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 29 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@
3636
cast,
3737
)
3838

39-
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, NotInstalledError
39+
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, NotInstalledError, TableAlreadyExistsError
4040
from pyiceberg.io import FileIO, load_file_io
4141
from pyiceberg.manifest import ManifestFile
4242
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
@@ -315,6 +315,34 @@ def create_table(
315315
TableAlreadyExistsError: If a table with the name already exists.
316316
"""
317317

318+
def create_table_if_not_exists(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    location: Optional[str] = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Return the table for ``identifier``, creating it first when it is absent.

    Creation is attempted through ``create_table``. If that raises
    ``TableAlreadyExistsError``, the existing table is loaded with ``load_table``
    instead; in that case only ``identifier`` is used and the remaining
    arguments are not applied to the existing table.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): Schema to use when the table is created.
        location (str | None): Location for the table. Optional Argument.
        partition_spec (PartitionSpec): PartitionSpec to use when the table is created.
        sort_order (SortOrder): SortOrder to use when the table is created.
        properties (Properties): Table properties that can be a string based dictionary.

    Returns:
        Table: the newly created table, or the already existing table instance.
    """
    try:
        table = self.create_table(identifier, schema, location, partition_spec, sort_order, properties)
    except TableAlreadyExistsError:
        # The table is already registered: fall back to loading it as-is.
        table = self.load_table(identifier)
    return table
345+
318346
@abstractmethod
319347
def load_table(self, identifier: Union[str, Identifier]) -> Table:
320348
"""Load the table's metadata and returns the table instance.

tests/catalog/integration_test_dynamodb.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Sch
9696
test_catalog.create_table((database_name, table_name), table_schema_nested)
9797

9898

99+
def test_create_table_if_not_exists_duplicated_table(
    test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Calling create_table_if_not_exists on an existing table yields the same identifier."""
    identifier = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(identifier, table_schema_nested)
    # The second call hits the already-created table and must not raise.
    existing = test_catalog.create_table_if_not_exists(identifier, table_schema_nested)
    assert created.identifier == existing.identifier
106+
107+
99108
def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
100109
identifier = (database_name, table_name)
101110
test_catalog.create_namespace(database_name)

tests/catalog/integration_test_glue.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,15 @@ def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Sch
200200
test_catalog.create_table((database_name, table_name), table_schema_nested)
201201

202202

203+
def test_create_table_if_not_exists_duplicated_table(
    test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    """Calling create_table_if_not_exists on an existing table yields the same identifier."""
    identifier = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(identifier, table_schema_nested)
    # The second call hits the already-created table and must not raise.
    existing = test_catalog.create_table_if_not_exists(identifier, table_schema_nested)
    assert created.identifier == existing.identifier
210+
211+
203212
def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
204213
identifier = (database_name, table_name)
205214
test_catalog.create_namespace(database_name)

tests/catalog/test_dynamodb.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,18 @@ def test_create_duplicated_table(
176176
test_catalog.create_table(identifier, table_schema_nested)
177177

178178

179+
@mock_aws
def test_create_table_if_not_exists_duplicated_table(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Calling create_table_if_not_exists on an existing table yields the same identifier."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    test_catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    identifier = (database_name, table_name)
    test_catalog.create_namespace(namespace=database_name)
    created = test_catalog.create_table(identifier, table_schema_nested)
    # The second call hits the already-created table and must not raise.
    existing = test_catalog.create_table_if_not_exists(identifier, table_schema_nested)
    assert created.identifier == existing.identifier
189+
190+
179191
@mock_aws
180192
def test_load_table(
181193
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str

tests/catalog/test_rest.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
# pylint: disable=redefined-outer-name,unused-argument
1818
import os
19-
from typing import Any, Dict, cast
19+
from typing import Any, Callable, Dict, cast
2020
from unittest import mock
2121

2222
import pytest
@@ -560,6 +560,64 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non
560560
assert "Table already exists" in str(e.value)
561561

562562

563+
def test_create_table_if_not_exists_200(
    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """create_table_if_not_exists returns the existing table when the create request is answered with 409.

    The mocked POST endpoint answers 200 with the table metadata on its first
    call and 409 on every later call; the mocked GET endpoint always returns
    the same table metadata.
    """

    def json_callback() -> Callable[[Any, Any], Dict[str, Any]]:
        # Number of POST requests served so far by the callback below.
        requests_seen = 0

        def callback(request: Any, context: Any) -> Dict[str, Any]:
            nonlocal requests_seen
            requests_seen += 1

            if requests_seen > 1:
                # Every request after the first one reports a conflict.
                context.status_code = 409
                return {
                    "error": {
                        "message": "Table already exists: fokko.already_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
                        "type": "AlreadyExistsException",
                        "code": 409,
                    }
                }

            # First request: the table is created successfully.
            context.status_code = 200
            return example_table_metadata_no_snapshot_v1_rest_json

        return callback

    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables",
        json=json_callback(),
        request_headers=TEST_HEADERS,
    )
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
        json=example_table_metadata_no_snapshot_v1_rest_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    identifier = ("fokko", "fokko2")
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    created = catalog.create_table(
        identifier=identifier,
        schema=table_schema_simple,
        location=None,
        partition_spec=PartitionSpec(
            PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id"), spec_id=1
        ),
        sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )
    # This second POST is answered with 409, so the table comes from the mocked GET.
    existing = catalog.create_table_if_not_exists(
        identifier=identifier,
        schema=table_schema_simple,
        location=None,
        partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id")),
        sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )
    assert created == existing
619+
620+
563621
def test_create_table_419(rest_mock: Mocker, table_schema_simple: Schema) -> None:
564622
rest_mock.post(
565623
f"{TEST_URI}v1/namespaces/fokko/tables",

tests/catalog/test_sql.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,23 @@ def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schem
247247
catalog.create_table(random_identifier, table_schema_nested)
248248

249249

250+
@pytest.mark.parametrize(
    'catalog',
    [
        lazy_fixture('catalog_memory'),
        lazy_fixture('catalog_sqlite'),
    ],
)
def test_create_table_if_not_exists_duplicated_table(
    catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
) -> None:
    """Calling create_table_if_not_exists on an existing table yields the same identifier."""
    # Only the namespace part is needed to prepare the catalog.
    namespace, _ = random_identifier
    catalog.create_namespace(namespace)
    created = catalog.create_table(random_identifier, table_schema_nested)
    # The second call hits the already-created table and must not raise.
    existing = catalog.create_table_if_not_exists(random_identifier, table_schema_nested)
    assert created.identifier == existing.identifier
265+
266+
250267
@pytest.mark.parametrize(
251268
'catalog',
252269
[

0 commit comments

Comments
 (0)