diff --git a/.travis.yml b/.travis.yml index ca5650f89735..909c1f9c39c4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -144,7 +144,6 @@ script: # ray dataframe tests - python -m pytest python/ray/dataframe/test/test_dataframe.py - - python -m pytest python/ray/dataframe/test/test_series.py - python -m pytest python/ray/dataframe/test/test_concat.py - python -m pytest python/ray/dataframe/test/test_io.py diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 3aa82b85b57e..5bd75cf516d8 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -3,7 +3,9 @@ from __future__ import print_function import pandas as pd -from pandas import (eval, unique, value_counts, Panel, date_range, MultiIndex) +from pandas import (eval, MultiIndex, CategoricalIndex, Series, Panel, unique, + value_counts, date_range, Timedelta, Timestamp, + to_datetime, NaT) import threading pd_version = pd.__version__ @@ -29,7 +31,6 @@ def get_npartitions(): # We import these file after above two function # because they depend on npartitions. 
from .dataframe import DataFrame # noqa: 402 -from .series import Series # noqa: 402 from .io import (read_csv, read_parquet, read_json, read_html, # noqa: 402 read_clipboard, read_excel, read_hdf, read_feather, # noqa: 402 read_msgpack, read_stata, read_sas, read_pickle, # noqa: 402 @@ -40,8 +41,9 @@ def get_npartitions(): __all__ = [ "DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval", - "unique", "value_counts", "to_datetime", "get_dummies", "Panel", - "date_range", "MultiIndex" + "Panel", "unique", "get_dummies", "value_counts", "MultiIndex", + "CategoricalIndex", "Timedelta", "Timestamp", "date_range", "to_datetime", + "NaT" ] try: diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 5abd98f361bf..e3222b3e0f82 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -363,7 +363,7 @@ def _arithmetic_helper(self, remote_func, axis, level=None): # We use the index to get the internal index. oid_series = [(oid_series[i], i) for i in range(len(oid_series))] - if len(oid_series) > 1: + if len(oid_series) > 0: for df, partition in oid_series: this_partition = \ self._col_metadata.partition_series(partition) @@ -3103,35 +3103,80 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, are the quantiles.
""" - def quantile_helper(df, q, axis, numeric_only, interpolation): - try: + def check_bad_dtype(t): + return t == np.dtype('O') or is_timedelta64_dtype(t) + + if not numeric_only: + # check if there are any object columns + if all(check_bad_dtype(t) for t in self.dtypes): + raise TypeError("can't multiply sequence by non-int of type " + "'float'") + else: + if next((True for t in self.dtypes if check_bad_dtype(t)), + False): + dtype = next(t for t in self.dtypes if check_bad_dtype(t)) + raise ValueError("Cannot compare type '{}' with type '{}'" + .format(type(dtype), float)) + else: + # Normally pandas returns this near the end of the quantile, but we + # can't afford the overhead of running the entire operation before + # we error. + if all(check_bad_dtype(t) for t in self.dtypes): + raise ValueError("need at least one array to concatenate") + + # check that all qs are between 0 and 1 + pd.DataFrame()._check_percentile(q) + + def quantile_helper(df, base_object): + """Quantile to be run inside each partition. + + Args: + df: The DataFrame composing the partition. + base_object: An empty pd.Series or pd.DataFrame depending on q. + + Returns: + A new Series or DataFrame depending on q. + """ + # This if call prevents ValueErrors with object only partitions + if (numeric_only and + all([dtype == np.dtype('O') or + is_timedelta64_dtype(dtype) + for dtype in df.dtypes])): + return base_object + else: return df.quantile(q=q, axis=axis, numeric_only=numeric_only, interpolation=interpolation) - except ValueError: - return pd.Series() + + axis = pd.DataFrame()._get_axis_number(axis) if isinstance(q, (pd.Series, np.ndarray, pd.Index, list)): - # In the case of a list, we build it one at a time.
- # TODO Revisit for performance - quantiles = [] - for q_i in q: - def remote_func(df): - return quantile_helper(df, q=q_i, axis=axis, - numeric_only=numeric_only, - interpolation=interpolation) - - result = self._arithmetic_helper(remote_func, axis) - result.name = q_i - quantiles.append(result) - - return pd.concat(quantiles, axis=1).T - else: - def remote_func(df): - return quantile_helper(df, q=q, axis=axis, - numeric_only=numeric_only, - interpolation=interpolation) - result = self._arithmetic_helper(remote_func, axis) + q_index = pd.Float64Index(q) + + if axis == 0: + new_partitions = _map_partitions( + lambda df: quantile_helper(df, pd.DataFrame()), + self._col_partitions) + + # select only correct dtype columns + new_columns = self.dtypes[self.dtypes.apply( + lambda x: is_numeric_dtype(x))].index + + else: + new_partitions = _map_partitions( + lambda df: quantile_helper(df, pd.DataFrame()), + self._row_partitions) + new_columns = self.index + + return DataFrame(col_partitions=new_partitions, + index=q_index, + columns=new_columns) + + else: + # When q is a single float, we return a Series, so using + # arithmetic_helper works well here. + result = self._arithmetic_helper( + lambda df: quantile_helper(df, pd.Series()), axis) result.name = q return result diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 6b56db87cb84..0ca927f571be 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -140,7 +140,7 @@ def _map_partitions(func, partitions, *argslists): partitions ([ObjectID]): The list of partitions to map func on. Returns: - A new Dataframe containing the result of the function + A list of partitions ([ObjectID]) with the result of the function """ if partitions is None: return None