Check if schema is compatible in add_files API #907
Changes from 6 commits
```diff
@@ -166,6 +166,8 @@
 ONE_MEGABYTE = 1024 * 1024
 BUFFER_SIZE = "buffer-size"
+DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
+
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
```
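The new constant is the config key that the compatibility check reads. A minimal sketch of how the flag resolves, assuming PyIceberg's `Config` helper behaves as it does in the diff below (the `or False` default is taken from there):

```python
from pyiceberg.utils.config import Config

DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"

# get_bool returns None when the key is not set in ~/.pyiceberg.yaml or the
# environment, so `or False` keeps the schema check strict by default.
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
print(downcast_ns_timestamp_to_us)  # False unless the option has been enabled
```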
```diff
@@ -1934,7 +1936,7 @@ def data_file_statistics_from_parquet_metadata(


 def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties
+    from pyiceberg.table import PropertyUtil, TableProperties

     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
```
```diff
@@ -2015,6 +2017,50 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
     return bin_packed_record_batches


+def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema) -> None:
+    """
+    Check if the `table_schema` is compatible with `other_schema`.
+
+    Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.
+
+    Raises:
+        ValueError: If the schemas are not compatible.
+    """
+    name_mapping = table_schema.name_mapping
+    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+    try:
+        task_schema = pyarrow_to_schema(
+            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+    except ValueError as e:
+        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        additional_names = set(other_schema.column_names) - set(table_schema.column_names)
+        raise ValueError(
+            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
+        ) from e
+
+    if table_schema.as_struct() != task_schema.as_struct():
+        from rich.console import Console
+        from rich.table import Table as RichTable
+
+        console = Console(record=True)
+
+        rich_table = RichTable(show_header=True, header_style="bold")
+        rich_table.add_column("")
+        rich_table.add_column("Table field")
+        rich_table.add_column("Dataframe field")
+
+        for lhs in table_schema.fields:
+            try:
+                rhs = task_schema.find_field(lhs.field_id)
+                rich_table.add_row("✅" if lhs == rhs else "❌", str(lhs), str(rhs))
+            except ValueError:
+                rich_table.add_row("❌", str(lhs), "Missing")
+
+        console.print(rich_table)
+        raise ValueError(f"Mismatch in fields:\n{console.export_text()}")
+
+
 def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
     for file_path in file_paths:
         input_file = io.new_input(file_path)
```
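To make the behaviour concrete, here is a hedged usage sketch of the new helper. It assumes the function lives in `pyiceberg.io.pyarrow` (the module this diff appears to touch) and that PyIceberg with this change and PyArrow are installed; the schemas are made up for illustration.

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import _check_schema_compatible  # private helper added in this PR
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

table_schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=False),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)

# Same column names and compatible types: the check passes silently.
matching = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
_check_schema_compatible(table_schema, matching)

# An extra column the table does not know about hits the except branch and
# raises "PyArrow table contains more columns: age ...".
extra = matching.append(pa.field("age", pa.int32()))
try:
    _check_schema_compatible(table_schema, extra)
except ValueError as err:
    print(err)

# A type mismatch on an existing column raises the "Mismatch in fields" error,
# with the rich table comparing table and dataframe fields side by side.
mismatched = pa.schema([pa.field("id", pa.string()), pa.field("name", pa.string())])
try:
    _check_schema_compatible(table_schema, mismatched)
except ValueError as err:
    print(err)
```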
```diff
@@ -2026,6 +2072,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_
                 f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
             )
         schema = table_metadata.schema()
+        _check_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())
+
```
Review comments on the `_check_schema_compatible` call above:

> My understanding is that now if we enable …

> Thank you for the catch @HonahX - I think you are right. Adding a nanosecond timestamp file doesn't correctly allow Spark Iceberg to read the file, and instead results in exceptions like: … I will make `downcast_ns_timestamp_to_us_on_write` an input argument to `_check_schema_compatible`, so that we can prevent nanosecond timestamp types from being added through `add_files`, but can continue to support them being downcast in `overwrite`/`append`.

> In general, I think it is okay to be able to read more broadly. We do need tests to ensure that it works correctly. Looking at Arrow, there are already some physical types that we don't support (…
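A hedged sketch of the follow-up described in that comment, not the code in this commit: the flag becomes an explicit parameter so `add_files` can opt out of the downcast while `append`/`overwrite` keep honouring the config. The abbreviated body and the default value here are assumptions.

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema


def _check_schema_compatible(
    table_schema: Schema,
    other_schema: pa.Schema,
    downcast_ns_timestamp_to_us: bool = False,
) -> None:
    # The flag is passed in by the caller instead of being read from Config(),
    # so add_files can call this with False and reject ns timestamps, while
    # append/overwrite can keep passing the configured value.
    task_schema = pyarrow_to_schema(
        other_schema,
        name_mapping=table_schema.name_mapping,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
    )
    if table_schema.as_struct() != task_schema.as_struct():
        raise ValueError("Mismatch in fields")  # full rich-table rendering elided
```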
```diff
         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=parquet_metadata,
             stats_columns=compute_statistics_plan(schema, table_metadata.properties),
```
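Finally, a hedged end-to-end sketch of what the check protects in practice; the catalog name, table identifier, and file path are placeholders.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")         # assumes a configured catalog
table = catalog.load_table("db.events")   # assumes an existing Iceberg table

# Each referenced Parquet file's schema is now validated against the table
# schema before the file is registered; an incompatible file raises ValueError
# with the field-by-field comparison instead of failing later at read time.
table.add_files(file_paths=["s3://warehouse/db/events/data/part-00001.parquet"])
```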