7 changes: 5 additions & 2 deletions python/ray/dataframe/__init__.py
@@ -3,7 +3,7 @@
from __future__ import print_function

import pandas as pd
from pandas import (eval, Panel, date_range, MultiIndex)
from pandas import (eval, unique, value_counts, Panel, date_range, MultiIndex)
import threading

pd_version = pd.__version__
@@ -35,10 +35,13 @@ def get_npartitions():
read_msgpack, read_stata, read_sas, read_pickle, # noqa: 402
read_sql) # noqa: 402
from .concat import concat # noqa: 402
from .datetimes import to_datetime # noqa: 402
from .reshape import get_dummies # noqa: 402

__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
"Panel", "date_range", "MultiIndex"
"unique", "value_counts", "to_datetime", "get_dummies", "Panel",
"date_range", "MultiIndex"
]

try:
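For orientation, a minimal sketch of the newly exported names. The `rdf` alias is an assumption for illustration, and the inputs are plain Python lists, which exercise the pandas fallback paths rather than the distributed ones:

import ray.dataframe as rdf

# Plain pandas re-exports now available at the top level.
rdf.unique(['a', 'b', 'a'])          # unique values, in order of appearance
rdf.value_counts(['a', 'b', 'a'])    # counts per unique value

# New Ray-aware functions; with non-Ray inputs they fall back to pandas.
rdf.to_datetime(['2018-01-01', '2018-01-02'])   # DatetimeIndex
rdf.get_dummies(['a', 'b', 'a'])                # one-hot encoded DataFrame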
45 changes: 39 additions & 6 deletions python/ray/dataframe/dataframe.py
@@ -2433,9 +2433,44 @@ def mod(self, other, axis='columns', level=None, fill_value=None):
fill_value)

def mode(self, axis=0, numeric_only=False):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Perform mode across the DataFrame.

Args:
axis (int): The axis to take the mode on.
numeric_only (bool): If True, only apply to numeric columns.

Returns:
DataFrame: The mode of the DataFrame.
"""
axis = pd.DataFrame()._get_axis_number(axis)

def mode_helper(df):
mode_df = df.mode(axis=axis, numeric_only=numeric_only)
return mode_df, mode_df.shape[axis]

def fix_length(df, *lengths):
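# lengths[0] is the tuple of per-partition result lengths (passed in as one
# argument); reindex this partition to the longest so the pieces line up.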
max_len = max(lengths[0])
df = df.reindex(pd.RangeIndex(max_len), axis=axis)
return df

parts = self._col_partitions if axis == 0 else self._row_partitions

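# Compute the mode of each partition remotely; each task returns its partial
# mode frame and that frame's length along the axis.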
result = [_deploy_func._submit(args=(lambda df: mode_helper(df),
part), num_return_vals=2)
for part in parts]

parts, lengths = [list(t) for t in zip(*result)]

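# Pad each partial result to the length of the longest one so they can be
# recombined into a single frame.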
parts = [_deploy_func.remote(
lambda df, *l: fix_length(df, l), part, *lengths)
for part in parts]

if axis == 0:
return DataFrame(col_partitions=parts,
columns=self.columns)
else:
return DataFrame(row_partitions=parts,
index=self.index)

def mul(self, other, axis='columns', level=None, fill_value=None):
"""Multiplies this DataFrame against another DataFrame/Series/scalar.
@@ -3734,9 +3769,7 @@ def _getitem_array(self, key):
index=index)
else:
columns = self._col_metadata[key].index

indices_for_rows = [self.columns.index(new_col)
for new_col in columns]
indices_for_rows = [i for i, col in enumerate(self.columns)
                    if col in set(columns)]

new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
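A hedged sketch of the new DataFrame.mode in use, compared against pandas. The data is illustrative and an initialized Ray runtime is assumed; the point is that each partition's mode is computed remotely and shorter results are padded with NaN to the longest one, matching pandas:

import pandas as pd
import ray.dataframe as rdf

data = {'a': [1, 1, 2, 2], 'b': [3, 3, 3, 4]}

# 'a' is bimodal (two output rows), 'b' has a single mode; the shorter
# column is padded with NaN so both frames have the same shape.
print(pd.DataFrame(data).mode())
print(rdf.DataFrame(data).mode())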
64 changes: 64 additions & 0 deletions python/ray/dataframe/datetimes.py
@@ -0,0 +1,64 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pandas
import ray

from .dataframe import DataFrame
from .utils import _map_partitions


def to_datetime(arg, errors='raise', dayfirst=False, yearfirst=False, utc=None,
box=True, format=None, exact=True, unit=None,
infer_datetime_format=False, origin='unix'):
"""Convert the arg to datetime format. If not Ray DataFrame, this falls
back on pandas.

Args:
arg: The object to convert (list-like, Series, DataFrame, or scalar).
errors ('raise' or 'ignore'): If 'ignore', parsing errors are silenced.
dayfirst (bool): Parse dates with the day first.
yearfirst (bool): Parse dates with the year first.
utc (bool): Return a UTC DatetimeIndex if True.
box (bool): If True, return a DatetimeIndex.
format (string): strftime format used to parse the time, e.g. "%d/%m/%Y".
exact (bool): If True, require an exact format match.
unit (string, default 'ns'): Unit of the arg.
infer_datetime_format (bool): Whether or not to infer the format.
origin (string): Reference date for the conversion.

Returns:
Type depends on input:

- list-like: DatetimeIndex
- Series: Series of datetime64 dtype
- scalar: Timestamp
"""
if not isinstance(arg, DataFrame):
return pandas.to_datetime(arg, errors=errors, dayfirst=dayfirst,
yearfirst=yearfirst, utc=utc, box=box,
format=format, exact=exact, unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)
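# Fail fast: with errors='raise', running pandas.to_datetime on an empty
# frame that shares arg's columns surfaces column-assembly errors (e.g. a
# missing 'year', 'month', or 'day') before any remote work is launched.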
if errors == 'raise':
pandas.to_datetime(pandas.DataFrame(columns=arg.columns),
errors=errors, dayfirst=dayfirst,
yearfirst=yearfirst, utc=utc, box=box,
format=format, exact=exact, unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)

def datetime_helper(df, cols):
df.columns = cols
return pandas.to_datetime(df, errors=errors, dayfirst=dayfirst,
yearfirst=yearfirst, utc=utc, box=box,
format=format, exact=exact, unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)

datetime_series = _map_partitions(datetime_helper, arg._row_partitions,
arg.columns)
result = pandas.concat(ray.get(datetime_series), copy=False)
result.index = arg.index

return result
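A short usage sketch of the Ray-aware to_datetime, mirroring the new test in test_dataframe.py below. The data is illustrative and an initialized Ray runtime is assumed; note the fail-fast behavior when the required assembly columns are missing:

import pandas as pd
import ray.dataframe as rdf

ray_df = rdf.DataFrame({'year': [2015, 2016],
                        'month': [2, 3],
                        'day': [4, 5]})

# Each row partition is converted with pandas.to_datetime and the pieces are
# concatenated into a single pandas Series of datetime64 values.
dates = rdf.to_datetime(ray_df)
pd_dates = pd.to_datetime(pd.DataFrame({'year': [2015, 2016],
                                        'month': [2, 3],
                                        'day': [4, 5]}))
print(dates.equals(pd_dates))   # expected: True

# A frame without 'year'/'month'/'day' raises before any remote work starts.
try:
    rdf.to_datetime(rdf.DataFrame({'y': [2015], 'm': [2]}))
except ValueError as exc:
    print('validation error:', exc)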
125 changes: 125 additions & 0 deletions python/ray/dataframe/reshape.py
@@ -0,0 +1,125 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import pandas
import numpy as np

from pandas import compat
from pandas.core.dtypes.common import is_list_like
from itertools import cycle

from .dataframe import DataFrame
from .utils import _deploy_func


def get_dummies(data, prefix=None, prefix_sep='_', dummy_na=False,
columns=None, sparse=False, drop_first=False):
"""Convert categorical variable into indicator variables.

Args:
data (array-like, Series, or DataFrame): data to encode.
prefix (string, [string]): Prefix to apply to each encoded column
label.
prefix_sep (string, [string]): Separator between prefix and value.
dummy_na (bool): Add a column to indicate NaNs.
columns: Which columns to encode.
sparse (bool): Not Implemented: If True, returns SparseDataFrame.
drop_first (bool): Whether to remove the first level of encoded data.

Returns:
DataFrame of one-hot encoded data.
"""
if not isinstance(data, DataFrame):
return pandas.get_dummies(data, prefix=prefix, prefix_sep=prefix_sep,
dummy_na=dummy_na, columns=columns,
sparse=sparse, drop_first=drop_first)

if sparse:
raise NotImplementedError(
"SparseDataFrame is not implemented. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

if columns is None:
columns_to_encode = data.dtypes.isin([np.dtype("O"), 'category'])
columns_to_encode = data.columns[columns_to_encode]
else:
columns_to_encode = columns

def check_len(item, name):
len_msg = ("Length of '{name}' ({len_item}) did not match the "
"length of the columns being encoded ({len_enc}).")

if is_list_like(item):
if not len(item) == len(columns_to_encode):
len_msg = len_msg.format(name=name, len_item=len(item),
len_enc=len(columns_to_encode))
raise ValueError(len_msg)

check_len(prefix, 'prefix')
check_len(prefix_sep, 'prefix_sep')
if isinstance(prefix, compat.string_types):
prefix = cycle([prefix])
prefix = [next(prefix) for i in range(len(columns_to_encode))]
if isinstance(prefix, dict):
prefix = [prefix[col] for col in columns_to_encode]

if prefix is None:
prefix = columns_to_encode

# validate separators
if isinstance(prefix_sep, compat.string_types):
prefix_sep = cycle([prefix_sep])
prefix_sep = [next(prefix_sep) for i in range(len(columns_to_encode))]
elif isinstance(prefix_sep, dict):
prefix_sep = [prefix_sep[col] for col in columns_to_encode]

if set(columns_to_encode) == set(data.columns):
with_dummies = []
dropped_columns = pandas.Index([])
else:
with_dummies = data.drop(columns_to_encode, axis=1)._col_partitions
dropped_columns = data.columns.drop(columns_to_encode)

def get_dummies_remote(df, to_drop, prefix, prefix_sep):
df = df.drop(to_drop, axis=1)

if df.size == 0:
return df, df.columns

df = pandas.get_dummies(df, prefix=prefix, prefix_sep=prefix_sep,
dummy_na=dummy_na, columns=None, sparse=sparse,
drop_first=drop_first)
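# Keep the encoded column labels aside and give the partition positional
# (RangeIndex) labels; the real labels are reattached by the driver below.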
columns = df.columns
df.columns = pandas.RangeIndex(0, len(df.columns))
return df, columns

total = 0
columns = []
for i, part in enumerate(data._col_partitions):
col_index = data._col_metadata.partition_series(i)

# TODO(kunalgosar): Handle the case of duplicate columns here
to_encode = col_index.index.isin(columns_to_encode)

to_encode = col_index[to_encode]
to_drop = col_index.drop(to_encode.index)

result = _deploy_func._submit(
args=(get_dummies_remote, part, to_drop,
prefix[total:total + len(to_encode)],
prefix_sep[total:total + len(to_encode)]),
num_return_vals=2)

with_dummies.append(result[0])
columns.append(result[1])
total += len(to_encode)

columns = ray.get(columns)
dropped_columns = dropped_columns.append(columns)

return DataFrame(col_partitions=with_dummies,
columns=dropped_columns,
index=data.index)
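A matching usage sketch for the distributed get_dummies (the data mirrors the new test below; the `rdf` alias and an initialized Ray runtime are assumed). By default only the object-dtype columns are encoded, and `prefix`/`prefix_sep` may be a string, a list with one entry per encoded column, or a dict keyed by column name:

import pandas as pd
import ray.dataframe as rdf

data = {'A': ['a', 'b', 'a'],
        'B': ['b', 'a', 'c'],
        'C': [1, 2, 3]}

# 'A' and 'B' are encoded; the numeric column 'C' is carried through as-is.
print(pd.get_dummies(pd.DataFrame(data)))
print(rdf.get_dummies(rdf.DataFrame(data)))

# Per-column prefixes, one per encoded column.
encoded = rdf.get_dummies(rdf.DataFrame(data), prefix=['colA', 'colB'])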
32 changes: 27 additions & 5 deletions python/ray/dataframe/test/test_dataframe.py
@@ -2059,11 +2059,11 @@ def test_mod():
test_inter_df_math("mod", simple=False)


def test_mode():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.mode()
@pytest.fixture
def test_mode(ray_df, pandas_df):
assert(ray_series_equals_pandas(ray_df.mode(), pandas_df.mode()))
assert(ray_series_equals_pandas(ray_df.mode(axis=1),
pandas_df.mode(axis=1)))


def test_mul():
@@ -3058,3 +3058,25 @@ def test__doc__():
pd_obj = getattr(pd.DataFrame, attr, None)
if callable(pd_obj) or isinstance(pd_obj, property):
assert obj.__doc__ == pd_obj.__doc__


def test_to_datetime():
ray_df = rdf.DataFrame({'year': [2015, 2016],
'month': [2, 3],
'day': [4, 5]})
pd_df = pd.DataFrame({'year': [2015, 2016],
'month': [2, 3],
'day': [4, 5]})

assert rdf.to_datetime(ray_df).equals(pd.to_datetime(pd_df))


def test_get_dummies():
ray_df = rdf.DataFrame({'A': ['a', 'b', 'a'],
'B': ['b', 'a', 'c'],
'C': [1, 2, 3]})
pd_df = pd.DataFrame({'A': ['a', 'b', 'a'],
'B': ['b', 'a', 'c'],
'C': [1, 2, 3]})

assert ray_df_equals_pandas(rdf.get_dummies(ray_df), pd.get_dummies(pd_df))