Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 49 additions & 48 deletions python/ray/dataframe/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,23 @@ def _map_partitions(self, func):

@property
def ngroups(self):
raise NotImplementedError("Not Yet implemented.")
return len(self._keys_and_values)

@property
def skew(self):
raise NotImplementedError("Not Yet implemented.")

def ffill(self, limit=None):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.ffill(limit=limit))

def sem(self, ddof=1):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.sem(ddof=ddof))

def mean(self, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.mean(*args, **kwargs))

@property
def any(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.any())

@property
def plot(self):
Expand All @@ -114,11 +113,10 @@ def groups(self):
raise NotImplementedError("Not Yet implemented.")

def min(self, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.min())

@property
def idxmax(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.idxmax())

@property
def ndim(self):
Expand All @@ -131,21 +129,20 @@ def nth(self, n, dropna=None):
raise NotImplementedError("Not Yet implemented.")

def cumsum(self, axis=0, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.cumsum())

@property
def indices(self):
raise NotImplementedError("Not Yet implemented.")

@property
def pct_change(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.pct_change())

def filter(self, func, dropna=True, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")

def cummax(self, axis=0, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.cummax(axis=axis, **kwargs))

def apply(self, func, *args, **kwargs):
return self._map_partitions(func)
Expand All @@ -155,44 +152,43 @@ def rolling(self, *args, **kwargs):

@property
def dtypes(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.dtypes)

def first(self, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.first(offset=0, **kwargs))

def backfill(self, limit=None):
raise NotImplementedError("Not Yet implemented.")

def __getitem__(self, key):
# This operation requires a SeriesGroupBy Object
raise NotImplementedError("Not Yet implemented.")

def cummin(self, axis=0, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.cummin(axis=axis, **kwargs))

def bfill(self, limit=None):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.bfill(limit=limit))

@property
def idxmin(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.idxmin())

def prod(self, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.prod(**kwargs))

def std(self, ddof=1, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.std(ddof=ddof,
*args, **kwargs))

def aggregate(self, arg, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")

def last(self, **kwargs):
raise NotImplementedError("Not Yet implemented.")

@property
def mad(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.mad())

@property
def rank(self):
raise NotImplementedError("Not Yet implemented.")

Expand All @@ -204,36 +200,26 @@ def pad(self, limit=None):
raise NotImplementedError("Not Yet implemented.")

def max(self, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.max())

def var(self, ddof=1, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.var())

def get_group(self, name, obj=None):
raise NotImplementedError("Not Yet implemented.")

def __len__(self):
raise NotImplementedError("Not Yet implemented.")

@property
def all(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.all())

def size(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.size)

def sum(self, **kwargs):
sums_list = [pd.DataFrame(v.sum(axis=self._axis)).T
for k, v in self._iter]
new_df = pd.concat(sums_list)
if self._axis == 0:
new_df.columns = self._columns
new_df.index = [k for k, v in self._iter]
else:
new_df = new_df.T
new_df.columns = [k for k, v in self._iter]
new_df.index = self._index
return new_df
return self._apply_function(lambda df:
df.sum(axis=self._axis, **kwargs))

def __unicode__(self):
raise NotImplementedError("Not Yet implemented.")
Expand All @@ -249,13 +235,13 @@ def ngroup(self, ascending=True):
raise NotImplementedError("Not Yet implemented.")

def nunique(self, dropna=True):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.nunique())

def resample(self, rule, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")

def median(self, **kwargs):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.median().astype(int))

def head(self, n=5):
raise NotImplementedError("Not Yet implemented.")
Expand Down Expand Up @@ -286,23 +272,20 @@ def agg_help(df):
columns=[k for k, v in self._iter],
index=self._index)

@property
def cov(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.cov())

def transform(self, func, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")

@property
def corr(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.corr())

@property
def fillna(self):
raise NotImplementedError("Not Yet implemented.")

def count(self):
raise NotImplementedError("Not Yet implemented.")
return self._apply_function(lambda df: df.count())

def pipe(self, func, *args, **kwargs):
raise NotImplementedError("Not Yet implemented.")
Expand Down Expand Up @@ -332,6 +315,24 @@ def diff(self):
def take(self):
raise NotImplementedError("Not Yet implemented.")

def _apply_function(self, f):
if not callable(f):
raise ValueError(
"\'{0}\' object is not callable".format(type(f)))

result = [pd.DataFrame(f(v)).T
for k, v in self._iter]

new_df = pd.concat(result)
if self._axis == 0:
new_df.columns = self._columns
new_df.index = [k for k, v in self._iter]
else:
new_df = new_df.T
new_df.columns = [k for k, v in self._iter]
new_df.index = self._index
return new_df


@ray.remote
def groupby(df, by=None, axis=0, level=None, as_index=True, sort=True,
Expand Down