Commit c5809b6

zhengruifeng authored and HyukjinKwon committed
[SPARK-48647][PYTHON][CONNECT] Refine the error message for YearMonthIntervalType in df.collect
### What changes were proposed in this pull request?

Refine the error message for `YearMonthIntervalType` in `df.collect`.

### Why are the changes needed?

For better understanding.

### Does this PR introduce _any_ user-facing change?

Yes.

Before:

```
In [1]: spark.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS interval").first()
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[1], line 1
----> 1 spark.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS interval").first()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:522, in DataFrame.first(self)
    521 def first(self) -> Optional[Row]:
--> 522     return self.head()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:666, in DataFrame.head(self, n)
    664 def head(self, n: Optional[int] = None) -> Union[Optional[Row], List[Row]]:
    665     if n is None:
--> 666         rs = self.head(1)
    667         return rs[0] if rs else None
    668     return self.take(n)

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:668, in DataFrame.head(self, n)
    666 rs = self.head(1)
    667 return rs[0] if rs else None
--> 668 return self.take(n)

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:671, in DataFrame.take(self, num)
    670 def take(self, num: int) -> List[Row]:
--> 671     return self.limit(num).collect()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1835, in DataFrame.collect(self)
   1831 schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
   1833 assert schema is not None and isinstance(schema, StructType)
-> 1835 return ArrowTableToRowsConversion.convert(table, schema)

File ~/Dev/spark/python/pyspark/sql/connect/conversion.py:542, in ArrowTableToRowsConversion.convert(table, schema)
    536 assert schema is not None and isinstance(schema, StructType)
    538 field_converters = [
    539     ArrowTableToRowsConversion._create_converter(f.dataType) for f in schema.fields
    540 ]
--> 542 columnar_data = [column.to_pylist() for column in table.columns]
    544 rows: List[Row] = []
    545 for i in range(0, table.num_rows):

File ~/.dev/miniconda3/envs/spark_dev_312/lib/python3.12/site-packages/pyarrow/table.pxi:1327, in pyarrow.lib.ChunkedArray.to_pylist()

File ~/.dev/miniconda3/envs/spark_dev_312/lib/python3.12/site-packages/pyarrow/table.pxi:1256, in pyarrow.lib.ChunkedArray.chunk()

File ~/.dev/miniconda3/envs/spark_dev_312/lib/python3.12/site-packages/pyarrow/public-api.pxi:208, in pyarrow.lib.pyarrow_wrap_array()

File ~/.dev/miniconda3/envs/spark_dev_312/lib/python3.12/site-packages/pyarrow/array.pxi:3711, in pyarrow.lib.get_array_class_from_type()

KeyError: 21
```

After:

```
In [2]: spark.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS interval").first()
---------------------------------------------------------------------------
PySparkTypeError                          Traceback (most recent call last)
Cell In[2], line 1
----> 1 spark.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS interval").first()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:522, in DataFrame.first(self)
    521 def first(self) -> Optional[Row]:
--> 522     return self.head()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:666, in DataFrame.head(self, n)
    664 def head(self, n: Optional[int] = None) -> Union[Optional[Row], List[Row]]:
    665     if n is None:
--> 666         rs = self.head(1)
    667         return rs[0] if rs else None
    668     return self.take(n)

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:668, in DataFrame.head(self, n)
    666 rs = self.head(1)
    667 return rs[0] if rs else None
--> 668 return self.take(n)

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:671, in DataFrame.take(self, num)
    670 def take(self, num: int) -> List[Row]:
--> 671     return self.limit(num).collect()

File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1833, in DataFrame.collect(self)
   1829 table, schema = self._to_table()
   1831 # not all datatypes are supported in arrow based collect
   1832 # here always verify the schema by from_arrow_schema
-> 1833 schema2 = from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
   1834 schema = schema or schema2
   1836 assert schema is not None and isinstance(schema, StructType)

File ~/Dev/spark/python/pyspark/sql/pandas/types.py:306, in from_arrow_schema(arrow_schema, prefer_timestamp_ntz)
    300 def from_arrow_schema(arrow_schema: "pa.Schema", prefer_timestamp_ntz: bool = False) -> StructType:
    301     """Convert schema from Arrow to Spark."""
    302     return StructType(
    303         [
    304             StructField(
    305                 field.name,
--> 306                 from_arrow_type(field.type, prefer_timestamp_ntz),
    307                 nullable=field.nullable,
    308             )
    309             for field in arrow_schema
    310         ]
    311     )

File ~/Dev/spark/python/pyspark/sql/pandas/types.py:293, in from_arrow_type(at, prefer_timestamp_ntz)
    291     spark_type = NullType()
    292 else:
--> 293     raise PySparkTypeError(
    294         error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_CONVERSION",
    295         message_parameters={"data_type": str(at)},
    296     )
    297 return spark_type

PySparkTypeError: [UNSUPPORTED_DATA_TYPE_FOR_ARROW_CONVERSION] month_interval is not supported in conversion to Arrow.
```

### How was this patch tested?

Added test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47004 from zhengruifeng/collect_ym_error.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 05c87e5 commit c5809b6
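To see the new behavior end to end, here is a minimal reproduction sketch. It is not part of the patch; the remote URL and session setup are assumptions, adjust them for your environment:

```python
# Minimal reproduction sketch; assumes a Spark Connect server is
# reachable at sc://localhost:15002 (adjust the remote URL as needed).
from pyspark.sql import SparkSession
from pyspark.errors import PySparkTypeError

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

try:
    spark.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS interval").first()
except PySparkTypeError as e:
    # With this patch the failure surfaces as a descriptive PySparkTypeError
    # instead of pyarrow's bare `KeyError: 21`.
    print(e)
```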

File tree

2 files changed, +9 -1 lines changed

python/pyspark/sql/connect/dataframe.py

Lines changed: 4 additions & 1 deletion
```diff
@@ -1828,7 +1828,10 @@ def __dir__(self) -> List[str]:
     def collect(self) -> List[Row]:
         table, schema = self._to_table()
 
-        schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
+        # not all datatypes are supported in arrow based collect
+        # here always verify the schema by from_arrow_schema
+        schema2 = from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
+        schema = schema or schema2
 
         assert schema is not None and isinstance(schema, StructType)
 
```

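For readability, here is the resulting body of `collect` after this change, assembled from the hunk and the tracebacks above (the trailing `return` line comes from the traceback, not from this hunk):

```python
def collect(self) -> List[Row]:
    table, schema = self._to_table()

    # not all datatypes are supported in arrow based collect
    # here always verify the schema by from_arrow_schema
    schema2 = from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
    schema = schema or schema2

    assert schema is not None and isinstance(schema, StructType)

    return ArrowTableToRowsConversion.convert(table, schema)
```

The key design point: `from_arrow_schema` is now called unconditionally, so an unsupported Arrow type (here `month_interval`) fails fast with a `PySparkTypeError` during schema verification instead of deep inside pyarrow's row conversion.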
python/pyspark/sql/tests/connect/test_connect_error.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -225,6 +225,11 @@ def test_select_none(self):
             message_parameters={"arg_name": "columns"},
         )
 
+    def test_ym_interval_in_collect(self):
+        # YearMonthIntervalType is not supported in python side arrow conversion
+        with self.assertRaises(PySparkTypeError):
+            self.connect.sql("SELECT INTERVAL '10-8' YEAR TO MONTH AS interval").first()
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.connect.test_connect_error import *  # noqa: F401
```
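As a practical aside, not part of this patch: while `YearMonthIntervalType` remains unsupported in the Python-side Arrow conversion, casting the column on the server side avoids the error entirely, since Spark SQL supports casting ANSI intervals to `STRING`:

```python
# Workaround sketch (not from the patch): cast the interval before collecting
# so the Arrow conversion only sees supported types.
row = spark.sql(
    "SELECT CAST(INTERVAL '10-8' YEAR TO MONTH AS STRING) AS interval_str"
).first()
print(row.interval_str)
```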
