Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ script:

# ray dataframe tests
- python -m pytest python/ray/dataframe/test/test_dataframe.py
- python -m pytest python/ray/dataframe/test/test_series.py
- python -m pytest python/ray/dataframe/test/test_concat.py
- python -m pytest python/ray/dataframe/test/test_io.py

Expand Down
10 changes: 6 additions & 4 deletions python/ray/dataframe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from __future__ import print_function

import pandas as pd
from pandas import (eval, unique, value_counts, Panel, date_range, MultiIndex)
from pandas import (eval, MultiIndex, CategoricalIndex, Series, Panel, unique,
value_counts, date_range, Timedelta, Timestamp,
to_datetime, NaT)
import threading

pd_version = pd.__version__
Expand All @@ -29,7 +31,6 @@ def get_npartitions():
# We import these files after the above two functions
# because they depend on npartitions.
from .dataframe import DataFrame # noqa: 402
from .series import Series # noqa: 402
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do this, you need to remove test_series.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I just move the entire file?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it just needs to be removed from the .travis.yml file

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to run those tests? Or no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While Series is unimplemented, we are importing it directly from pandas. We are skipping these tests for now because they expect a NotImplementedError to be raised.

from .io import (read_csv, read_parquet, read_json, read_html, # noqa: 402
read_clipboard, read_excel, read_hdf, read_feather, # noqa: 402
read_msgpack, read_stata, read_sas, read_pickle, # noqa: 402
Expand All @@ -40,8 +41,9 @@ def get_npartitions():

__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
"unique", "value_counts", "to_datetime", "get_dummies", "Panel",
"date_range", "MultiIndex"
"Panel", "unique", "get_dummies", "value_counts", "MultiIndex",
"CategoricalIndex", "Timedelta", "Timestamp", "date_range", "to_datetime",
"NaT"
]

try:
Expand Down
95 changes: 70 additions & 25 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def _arithmetic_helper(self, remote_func, axis, level=None):
# We use the index to get the internal index.
oid_series = [(oid_series[i], i) for i in range(len(oid_series))]

if len(oid_series) > 1:
if len(oid_series) > 0:
for df, partition in oid_series:
this_partition = \
self._col_metadata.partition_series(partition)
Expand Down Expand Up @@ -3103,35 +3103,80 @@ def quantile(self, q=0.5, axis=0, numeric_only=True,
are the quantiles.
"""

def quantile_helper(df, q, axis, numeric_only, interpolation):
try:
def check_bad_dtype(t):
return t == np.dtype('O') or is_timedelta64_dtype(t)

if not numeric_only:
# check if there are any object columns
if all(check_bad_dtype(t) for t in self.dtypes):
raise TypeError("can't multiply sequence by non-int of type "
"'float'")
else:
if next((True for t in self.dtypes if check_bad_dtype(t)),
False):
dtype = next(t for t in self.dtypes if check_bad_dtype(t))
raise ValueError("Cannot compare type '{}' with type '{}'"
.format(type(dtype), float))
else:
# Normally pandas raises this error near the end of the quantile
# computation, but we can't afford the overhead of running the entire
# operation before we raise it.
if all(check_bad_dtype(t) for t in self.dtypes):
raise ValueError("need at least one array to concatenate")

# check that all qs are between 0 and 1
pd.DataFrame()._check_percentile(q)

def quantile_helper(df, base_object):
"""Quantile to be run inside each partition.

Args:
df: The DataFrame composing the partition.
base_object: An empty pd.Series or pd.DataFrame depending on q.

Returns:
A new Series or DataFrame depending on q.
"""
# This check prevents ValueErrors on partitions that contain only
# object (or timedelta) columns
if (numeric_only and
all([dtype == np.dtype('O') or
is_timedelta64_dtype(dtype)
for dtype in df.dtypes])):
return base_object
else:
return df.quantile(q=q, axis=axis, numeric_only=numeric_only,
interpolation=interpolation)
except ValueError:
return pd.Series()

axis = pd.DataFrame()._get_axis_number(axis)

if isinstance(q, (pd.Series, np.ndarray, pd.Index, list)):
# In the case of a list, we build it one at a time.
# TODO Revisit for performance
quantiles = []
for q_i in q:
def remote_func(df):
return quantile_helper(df, q=q_i, axis=axis,
numeric_only=numeric_only,
interpolation=interpolation)

result = self._arithmetic_helper(remote_func, axis)
result.name = q_i
quantiles.append(result)

return pd.concat(quantiles, axis=1).T
else:
def remote_func(df):
return quantile_helper(df, q=q, axis=axis,
numeric_only=numeric_only,
interpolation=interpolation)

result = self._arithmetic_helper(remote_func, axis)
q_index = pd.Float64Index(q)

if axis == 0:
new_partitions = _map_partitions(
lambda df: quantile_helper(df, pd.DataFrame()),
self._col_partitions)

# select only correct dtype columns
new_columns = self.dtypes[self.dtypes.apply(
lambda x: is_numeric_dtype(x))].index

else:
new_partitions = _map_partitions(
lambda df: quantile_helper(df, pd.DataFrame()),
self._row_partitions)
new_columns = self.index

return DataFrame(col_partitions=new_partitions,
index=q_index,
columns=new_columns)

else:
# When q is a single float, we return a Series, so using
# arithmetic_helper works well here.
result = self._arithmetic_helper(
lambda df: quantile_helper(df, pd.Series()), axis)
result.name = q
return result

Expand Down
2 changes: 1 addition & 1 deletion python/ray/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _map_partitions(func, partitions, *argslists):
partitions ([ObjectID]): The list of partitions to map func on.

Returns:
A new Dataframe containing the result of the function
A list of partitions ([ObjectID]) with the result of the function
"""
if partitions is None:
return None
Expand Down