Skip to content

Commit 22d4950

Browse files
SaladRaider authored and devin-petersohn committed
[DataFrame] Implements df.pipe (#1999)
* Add empty df test
* Fix flake8 issues
* rebase with master
* reset master tests
* Implement df.pipe
* fix tests
* Use test_pipe as a pytest.fixture
* Add newline at EOF
1 parent a1d7bb3 commit 22d4950

File tree

2 files changed

+42
-7
lines changed

2 files changed

+42
-7
lines changed

python/ray/dataframe/dataframe.py

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2783,9 +2783,17 @@ def pct_change(self, periods=1, fill_method='pad', limit=None, freq=None,
27832783
"github.com/ray-project/ray.")
27842784

27852785
def pipe(self, func, *args, **kwargs):
2786-
raise NotImplementedError(
2787-
"To contribute to Pandas on Ray, please visit "
2788-
"github.com/ray-project/ray.")
2786+
"""Apply func(self, *args, **kwargs)
2787+
2788+
Args:
2789+
func: function to apply to the df.
2790+
args: positional arguments passed into ``func``.
2791+
kwargs: a dictionary of keyword arguments passed into ``func``.
2792+
2793+
Returns:
2794+
object: the return type of ``func``.
2795+
"""
2796+
return com._pipe(self, func, *args, **kwargs)
27892797

27902798
def pivot(self, index=None, columns=None, values=None):
27912799
raise NotImplementedError(

python/ray/dataframe/test/test_dataframe.py

Lines changed: 31 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -262,6 +262,7 @@ def test_int_dataframe():
262262
test_cummin(ray_df, pandas_df)
263263
test_cumprod(ray_df, pandas_df)
264264
test_cumsum(ray_df, pandas_df)
265+
test_pipe(ray_df, pandas_df)
265266

266267
# test_loc(ray_df, pandas_df)
267268
# test_iloc(ray_df, pandas_df)
@@ -405,6 +406,7 @@ def test_float_dataframe():
405406
test_cummin(ray_df, pandas_df)
406407
test_cumprod(ray_df, pandas_df)
407408
test_cumsum(ray_df, pandas_df)
409+
test_pipe(ray_df, pandas_df)
408410

409411
test___len__(ray_df, pandas_df)
410412
test_first_valid_index(ray_df, pandas_df)
@@ -568,6 +570,7 @@ def test_mixed_dtype_dataframe():
568570
test_min(ray_df, pandas_df)
569571
test_notna(ray_df, pandas_df)
570572
test_notnull(ray_df, pandas_df)
573+
test_pipe(ray_df, pandas_df)
571574

572575
# TODO Fix pandas so that the behavior is correct
573576
# We discovered a bug where argmax does not always give the same result
@@ -718,6 +721,7 @@ def test_nan_dataframe():
718721
test_cummin(ray_df, pandas_df)
719722
test_cumprod(ray_df, pandas_df)
720723
test_cumsum(ray_df, pandas_df)
724+
test_pipe(ray_df, pandas_df)
721725

722726
test___len__(ray_df, pandas_df)
723727
test_first_valid_index(ray_df, pandas_df)
@@ -2151,11 +2155,34 @@ def test_pct_change():
21512155
ray_df.pct_change()
21522156

21532157

2154-
def test_pipe():
2155-
ray_df = create_test_dataframe()
2158+
@pytest.fixture
2159+
def test_pipe(ray_df, pandas_df):
2160+
n = len(ray_df.index)
2161+
a, b, c = 2 % n, 0, 3 % n
2162+
col = ray_df.columns[3 % len(ray_df.columns)]
21562163

2157-
with pytest.raises(NotImplementedError):
2158-
ray_df.pipe(None)
2164+
def h(x):
2165+
return x.drop(columns=[col])
2166+
2167+
def g(x, arg1=0):
2168+
for _ in range(arg1):
2169+
x = x.append(x)
2170+
return x
2171+
2172+
def f(x, arg2=0, arg3=0):
2173+
return x.drop([arg2, arg3])
2174+
2175+
assert ray_df_equals(f(g(h(ray_df), arg1=a), arg2=b, arg3=c),
2176+
(ray_df.pipe(h)
2177+
.pipe(g, arg1=a)
2178+
.pipe(f, arg2=b, arg3=c)))
2179+
2180+
assert ray_df_equals_pandas((ray_df.pipe(h)
2181+
.pipe(g, arg1=a)
2182+
.pipe(f, arg2=b, arg3=c)),
2183+
(pandas_df.pipe(h)
2184+
.pipe(g, arg1=a)
2185+
.pipe(f, arg2=b, arg3=c)))
21592186

21602187

21612188
def test_pivot():

0 commit comments

Comments
 (0)