Commit e5fac1a

Aggregate improvements and SQL compatibility (#134)

* A lot of refactoring of the groupby, mainly to include both distinct and null-grouping
* Test for non-dask aggregations
* All NaN data needs to go into the same partition (otherwise we can not sort)
* Fix compatibility with SQL on null-joins
* Distinct is not needed, as it is optimized away by Calcite
* Implement is not distinct
* Describe new limitations and remove old ones
* Added compatibility test from fugue
* Added a test for sorting with multiple partitions and NaNs
* Stylefix

1 parent 7273c2d commit e5fac1a

File tree

8 files changed: +1173 −126 lines


dask_sql/physical/rel/logical/aggregate.py

Lines changed: 211 additions & 121 deletions
Large diffs are not rendered by default.
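The aggregate diff itself is not rendered here, but the null-grouping behaviour the commit message refers to can be seen in plain pandas (a minimal sketch with made-up data, not the dask-sql code itself):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"k": [1.0, np.nan, 1.0, np.nan], "v": [10, 20, 30, 40]})

# pandas silently drops rows with a NaN group key by default ...
pandas_default = df.groupby("k")["v"].sum()

# ... while SQL keeps all NULL keys together as one group,
# which pandas exposes via dropna=False
sql_like = df.groupby("k", dropna=False)["v"].sum()
```

With the default, only the `1.0` group (sum 40) survives; with `dropna=False` the two NaN rows form their own group (sum 60), matching SQL semantics.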

dask_sql/physical/rel/logical/join.py

Lines changed: 17 additions & 0 deletions
```diff
@@ -100,6 +100,23 @@ def convert(
             f"common_{i}": df_rhs_renamed.iloc[:, index]
             for i, index in enumerate(rhs_on)
         }
+
+        # SQL compatibility: when joining on columns that
+        # contain NULLs, pandas will actually happily
+        # keep those NULLs. That is however not compatible with
+        # SQL, so we get rid of them here
+        if join_type in ["inner", "right"]:
+            df_lhs_filter = reduce(
+                operator.and_,
+                [~df_lhs_renamed.iloc[:, index].isna() for index in lhs_on],
+            )
+            df_lhs_renamed = df_lhs_renamed[df_lhs_filter]
+        if join_type in ["inner", "left"]:
+            df_rhs_filter = reduce(
+                operator.and_,
+                [~df_rhs_renamed.iloc[:, index].isna() for index in rhs_on],
+            )
+            df_rhs_renamed = df_rhs_renamed[df_rhs_filter]
     else:
         # We are in the complex join case
         # where we have no column to merge on
```
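The need for this filter comes from a pandas quirk: `merge` matches NaN join keys with each other, whereas in SQL `NULL = NULL` is never true. A small standalone pandas sketch (toy data, not the dask-sql code) of the same filter pattern:

```python
import operator
from functools import reduce

import numpy as np
import pandas as pd

left = pd.DataFrame({"a": [1.0, np.nan, 3.0], "x": ["l1", "l2", "l3"]})
right = pd.DataFrame({"a": [np.nan, 3.0], "y": ["r1", "r2"]})

# pandas happily matches the NaN keys with each other:
naive = left.merge(right, on="a", how="inner")

# SQL semantics: NULL never equals NULL, so drop NaN keys before merging
keys = ["a"]
mask_l = reduce(operator.and_, [~left[k].isna() for k in keys])
mask_r = reduce(operator.and_, [~right[k].isna() for k in keys])
sql_like = left[mask_l].merge(right[mask_r], on="a", how="inner")
```

`naive` contains a spurious NaN-to-NaN match; `sql_like` keeps only the `a == 3.0` row, which is what a SQL inner join returns.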

dask_sql/physical/rel/logical/sort.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -135,7 +135,7 @@ def _sort_first_column(
         col = df[first_sort_column]
         is_na = col.isna().persist()
         if is_na.any().compute():
-            df_is_na = df[is_na].reset_index(drop=True)
+            df_is_na = df[is_na].reset_index(drop=True).repartition(1)
             df_not_is_na = (
                 df[~is_na]
                 .set_index(first_sort_column, drop=False)
```
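The repartition matters because the NaN rows cannot be ordered by the sort key, so they must all live in one partition to be appended as a block. The same split-and-concat idea in plain pandas (a sketch with invented data, ignoring the dask partitioning machinery):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [2.0, np.nan, 1.0, np.nan], "b": list("wxyz")})

is_na = df["a"].isna()
# collect all NULL rows into one block (dask-sql repartitions
# them into a single partition for the same reason)
df_is_na = df[is_na].reset_index(drop=True)

# sort only the non-NULL rows, then append the NULL block at the end
df_sorted = pd.concat([df[~is_na].sort_values("a"), df_is_na], ignore_index=True)
```

The result is `1.0, 2.0` followed by the two NaN rows, i.e. NULLs sorted last as a contiguous block.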

dask_sql/physical/rex/core/call.py

Lines changed: 17 additions & 0 deletions
```diff
@@ -240,6 +240,21 @@ def null(self, df: SeriesOrScalar,) -> SeriesOrScalar:
         return pd.isna(df) or df is None or np.isnan(df)


+class IsNotDistinctOperation(Operation):
+    """The is not distinct operator"""
+
+    def __init__(self):
+        super().__init__(self.not_distinct)
+
+    def not_distinct(self, lhs: SeriesOrScalar, rhs: SeriesOrScalar) -> SeriesOrScalar:
+        """
+        Returns true where `lhs` is not distinct from `rhs` (or both are null).
+        """
+        is_null = IsNullOperation()
+
+        return (is_null(lhs) & is_null(rhs)) | (lhs == rhs)
+
+
 class RegexOperation(Operation):
     """An abstract regex operation, which transforms the SQL regex into something python can understand"""

@@ -627,6 +642,8 @@ class RexCallPlugin(BaseRexPlugin):
         "-": ReduceOperation(operation=operator.sub, unary_operation=lambda x: -x),
         "/": ReduceOperation(operation=SQLDivisionOperator()),
         "*": ReduceOperation(operation=operator.mul),
+        "is distinct from": NotOperation().of(IsNotDistinctOperation()),
+        "is not distinct from": IsNotDistinctOperation(),
         # special operations
         "cast": lambda x: x,
         "case": CaseOperation(),
```
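The semantics implemented here are the SQL `IS NOT DISTINCT FROM` predicate: null-safe equality, where two NULLs count as equal. Stripped of the dask-sql `Operation` plumbing, the same logic on bare pandas Series (a sketch with made-up inputs):

```python
import numpy as np
import pandas as pd

def is_not_distinct(lhs: pd.Series, rhs: pd.Series) -> pd.Series:
    # NULL IS NOT DISTINCT FROM NULL is true,
    # even though NULL = NULL is not
    both_null = pd.isna(lhs) & pd.isna(rhs)
    return both_null | (lhs == rhs)

lhs = pd.Series([1.0, np.nan, 3.0])
rhs = pd.Series([1.0, np.nan, 4.0])
result = is_not_distinct(lhs, rhs)
```

Note how the middle element is `True` (both NULL) while a plain `lhs == rhs` would yield `False` there, since `NaN == NaN` is false.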

docs/pages/sql.rst

Lines changed: 6 additions & 3 deletions
```diff
@@ -199,14 +199,16 @@ Limitatons

 ``dask-sql`` is still in early development, therefore exist some limitations:

-* Not all operations and aggregations are implemented already, most prominently: ``WINDOW`` is not implemented so far.
-* ``GROUP BY`` aggregations can not use ``DISTINCT``
+Not all operations and aggregations are implemented already, most prominently: ``WINDOW`` is not implemented so far.

 .. note::

     Whenever you find a not already implemented operation, keyword
     or functionality, please raise an issue at our `issue tracker <https://github.com/nils-braun/dask-sql/issues>`_ with your use-case.

+Dask/pandas and SQL treat null-values (or nan) differently on sorting, grouping and joining.
+``dask-sql`` tries to follow the SQL standard as much as possible, so results might be different to what you expect from Dask/pandas.
+
 Apart from those functional limitations, there is a operation which need special care: ``ORDER BY```.
 Normally, ``dask-sql`` calls create a ``dask`` data frame, which gets only computed when you call the ``.compute()`` member.
 Due to internal constraints, this is currently not the case for ``ORDER BY``.
@@ -218,4 +220,5 @@ Including this operation will trigger a calculation of the full data frame alrea
 The data inside ``dask`` is partitioned, to distribute it over the cluster.
 ``head`` will only return the first N elements from the first partition - even if N is larger than the partition size.
 As a benefit, calling ``.head(N)`` is typically faster than calculating the full data sample with ``.compute()``.
-``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to precalculate the first partition to find out, if it needs to have a look into all data or not.
+``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered -
+but will also need to precalculate the first partition to find out, if it needs to have a look into all data or not.
```
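The `head` versus `LIMIT` distinction described in the docs can be modeled with a toy list of partitions (hypothetical data, not how dask stores partitions internally):

```python
# three hypothetical partitions of a partitioned data frame
partitions = [[0, 1, 2], [3, 4], [5, 6, 7]]

def head(n):
    # like .head(N): only looks at the first partition,
    # even when N exceeds the partition size
    return partitions[0][:n]

def limit(n):
    # like SQL LIMIT: walks partitions until N rows are collected
    out = []
    for part in partitions:
        out.extend(part[: n - len(out)])
        if len(out) == n:
            break
    return out
```

Here `head(5)` stops at the first partition and returns only three rows, while `limit(5)` spills into the second partition to deliver all five.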
