Skip to content

Commit 2444616

Browse files
devin-petersohn and kunalgosar
authored and committed
Updating implementation (ray-project#2)
1 parent fd9cd40 commit 2444616

File tree

3 files changed

+156
-50
lines changed

3 files changed

+156
-50
lines changed

python/ray/dataframe/dataframe.py

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,30 +64,32 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
6464

6565
pd_df = pd.DataFrame(data=data, index=index, columns=columns,
6666
dtype=dtype, copy=copy)
67-
row_partitions = \
67+
row_partitions, col_partitions = \
6868
_partition_pandas_dataframe(pd_df,
69-
npartitions=get_npartitions())
69+
num_partitions=get_npartitions())
7070
columns = pd_df.columns
7171
index = pd_df.index
7272

73-
if col_partitions is None:
73+
elif col_partitions is None:
7474
col_partitions = _rebuild_cols.remote(row_partitions,
7575
index,
7676
columns)
77-
if row_partitions is None:
77+
78+
elif row_partitions is None:
7879
row_partitions = _rebuild_rows.remote(col_partitions,
7980
index,
8081
columns)
8182

82-
self._col_partitions = col_partitions
83-
self._row_partitions = row_partitions
8483

8584
# this _index object is a pd.DataFrame
8685
# and we use that DataFrame's Index to index the rows.
8786
self._row_lengths, self._row_index = \
88-
_compute_length_and_index.remote(self._row_partitions)
87+
_compute_length_and_index.remote(row_partitions, index)
8988
self._col_lengths, self._col_index = \
90-
_compute_width_and_index.remote(self._col_partitions)
89+
_compute_width_and_index.remote(col_partitions, columns)
90+
91+
self._col_partitions = col_partitions
92+
self._row_partitions = row_partitions
9193

9294
# TODO: Remove testing print code comments
9395
# print("row partitions")
@@ -100,11 +102,12 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
100102
# print(self._row_lengths, "\n", self._row_index)
101103
# print("col items")
102104
# print(self._col_lengths, "\n", self._col_index)
103-
104-
if index is not None:
105-
self.index = index
106-
107-
self.columns = columns
105+
#
106+
# if index is not None:
107+
# self.index = index
108+
#
109+
# if columns is not None:
110+
# self.columns = columns
108111

109112
def __str__(self):
110113
return repr(self)
@@ -114,8 +117,68 @@ def __repr__(self):
114117
result = repr(to_pandas(self))
115118
return result
116119

117-
head = repr(to_pandas(self.head(20)))
118-
tail = repr(to_pandas(self.tail(20)))
120+
def head(df, n):
121+
"""Compute the head for this without creating a new DataFrame"""
122+
123+
sizes = df._row_lengths
124+
cumulative = np.cumsum(np.array(sizes))
125+
new_dfs = [df._row_partitions[i]
126+
for i in range(len(cumulative))
127+
if cumulative[i] < n]
128+
129+
last_index = len(new_dfs)
130+
131+
# this happens when we only need from the first partition
132+
if last_index == 0:
133+
num_to_transfer = n
134+
else:
135+
num_to_transfer = n - cumulative[last_index - 1]
136+
137+
new_dfs.append(_deploy_func.remote(lambda df: df.head(num_to_transfer),
138+
df._row_partitions[last_index]))
139+
140+
index = df._row_index.head(n).index
141+
pd_head = pd.concat(ray.get(new_dfs))
142+
pd_head.index = index
143+
pd_head.columns = df.columns
144+
return pd_head
145+
146+
def tail(df, n):
147+
"""Compute the tail for this without creating a new DataFrame"""
148+
149+
sizes = self._row_lengths
150+
151+
if n >= sum(sizes):
152+
return self
153+
154+
cumulative = np.cumsum(np.array(sizes[::-1]))
155+
156+
reverse_dfs = self._row_partitions[::-1]
157+
new_dfs = [reverse_dfs[i]
158+
for i in range(len(cumulative))
159+
if cumulative[i] < n]
160+
161+
last_index = len(new_dfs)
162+
163+
# this happens when we only need from the last partition
164+
if last_index == 0:
165+
num_to_transfer = n
166+
else:
167+
num_to_transfer = n - cumulative[last_index - 1]
168+
169+
new_dfs.append(_deploy_func.remote(lambda df: df.tail(num_to_transfer),
170+
reverse_dfs[last_index]))
171+
172+
new_dfs.reverse()
173+
174+
index = self._row_index.tail(n).index
175+
pd_tail = pd.concat(ray.get(new_dfs))
176+
pd_tail.index = index
177+
pd_tail.columns = df.columns
178+
return pd_tail
179+
180+
head = repr(head(self, 20))
181+
tail = repr(tail(self, 20))
119182

120183
result = head + "\n...\n" + tail
121184

@@ -444,6 +507,7 @@ def _update_inplace(self, row_partitions=None, col_partitions=None,
444507
# Perform rollback if length mismatch, since operations occur inplace here
445508
# Since we want to keep the existing DF in a usable state, we need to carefully catch
446509
# and handle this error
510+
# TODO Make this asynchronous
447511
if index is not None and len(index) != len(new_row_index):
448512
self._row_partitions = old_row_parts
449513
self._col_partitions = old_col_parts
@@ -586,6 +650,12 @@ def sum(self, axis=None, skipna=True, level=None, numeric_only=None):
586650
The sum of the DataFrame.
587651
"""
588652
# TODO: Fix this function - it does not work.
653+
return ray.get(
654+
_map_partitions(lambda df: df.sum(axis=axis,
655+
skipna=skipna,
656+
level=level,
657+
numeric_only=numeric_only),
658+
self._col_partitions))
589659

590660
# # TODO: We don't support `level` right now, so df.sum always returns a pd.Series
591661
# # Generalize when we support MultiIndexes, and distributed Series (?)

python/ray/dataframe/shuffle.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ def assign_and_send_data(self, index, partition_assignments,
6767
A value to be used with ray.get() to ensure proper scheduling
6868
order.
6969
"""
70+
if len(self.partition_data) == 0:
71+
return None
7072

7173
def calc_send(i, indices_to_send, data_to_send):
7274
"""Separates the data to send into a list based on assignments.

python/ray/dataframe/utils.py

Lines changed: 69 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ def assign_partitions(index, num_partitions):
3232
raise TypeError("Unexpected value of type {0} assigned to ShuffleActor"
3333
.format(type(index).__name__))
3434

35-
if len(uniques) % num_partitions == 0:
36-
chunksize = int(len(uniques) / num_partitions)
37-
else:
38-
chunksize = int(len(uniques) / num_partitions) + 1
35+
chunksize = len(uniques) // num_partitions \
36+
if len(uniques) % num_partitions == 0 \
37+
else len(uniques) // num_partitions + 1
3938

4039
assignments = []
4140

@@ -81,55 +80,83 @@ def _get_widths(df):
8180
return 0
8281

8382

84-
def _partition_pandas_dataframe(df, npartitions=None, chunksize=None):
83+
def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None,
84+
col_chunksize=None):
8585
"""Partitions a Pandas DataFrame object.
8686
Args:
8787
df (pandas.DataFrame): The pandas DataFrame to convert.
8888
npartitions (int): The number of partitions to split the DataFrame
8989
into. Has priority over chunksize.
90-
chunksize (int): The number of rows to put in each partition.
90+
row_chunksize (int): The number of rows to put in each partition.
9191
Returns:
9292
[ObjectID]: A list of object IDs corresponding to the dataframe
9393
partitions
9494
"""
95-
if npartitions is not None:
96-
chunksize = len(df) // npartitions + 1
97-
elif chunksize is None:
98-
raise ValueError("The number of partitions or chunksize must be set.")
95+
if num_partitions is not None:
96+
row_chunksize = len(df) // num_partitions \
97+
if len(df) % num_partitions == 0 \
98+
else len(df) // num_partitions + 1
99+
100+
col_chunksize = len(df.columns) // num_partitions \
101+
if len(df.columns) % num_partitions == 0 \
102+
else len(df.columns) // num_partitions + 1
103+
else:
104+
assert row_chunksize is not None and col_chunksize is not None
105+
106+
temp_df = df
107+
108+
row_partitions = []
109+
while len(temp_df) > row_chunksize:
110+
t_df = temp_df[:row_chunksize]
111+
# reset_index here because we want a pd.RangeIndex
112+
# within the partitions. It is smaller and sometimes faster.
113+
t_df.reset_index(drop=True, inplace=True)
114+
t_df.columns = pd.RangeIndex(0, len(t_df.columns))
115+
top = ray.put(t_df)
116+
row_partitions.append(top)
117+
temp_df = temp_df[row_chunksize:]
118+
else:
119+
temp_df.reset_index(drop=True, inplace=True)
120+
temp_df.columns = pd.RangeIndex(0, len(temp_df.columns))
121+
row_partitions.append(ray.put(temp_df))
99122

100123
temp_df = df
101124

102-
dataframes = []
103-
while len(temp_df) > chunksize:
104-
t_df = temp_df[:chunksize]
125+
col_partitions = []
126+
while len(temp_df.columns) > col_chunksize:
127+
t_df = temp_df.iloc[:, 0:col_chunksize]
105128
# reset_index here because we want a pd.RangeIndex
106129
# within the partitions. It is smaller and sometimes faster.
107-
t_df = t_df.reset_index(drop=True)
130+
t_df.reset_index(drop=True, inplace=True)
131+
t_df.columns = pd.RangeIndex(0, len(t_df.columns))
108132
top = ray.put(t_df)
109-
dataframes.append(top)
110-
temp_df = temp_df[chunksize:]
133+
col_partitions.append(top)
134+
temp_df = temp_df.iloc[:, col_chunksize:]
111135
else:
112-
temp_df = temp_df.reset_index(drop=True)
113-
dataframes.append(ray.put(temp_df))
136+
temp_df.reset_index(drop=True, inplace=True)
137+
temp_df.columns = pd.RangeIndex(0, len(temp_df.columns))
138+
col_partitions.append(ray.put(temp_df))
114139

115-
return dataframes
140+
return row_partitions, col_partitions
116141

117142

118-
def from_pandas(df, npartitions=None, chunksize=None):
143+
def from_pandas(df, num_partitions=None, chunksize=None):
119144
"""Converts a pandas DataFrame to a Ray DataFrame.
120145
Args:
121146
df (pandas.DataFrame): The pandas DataFrame to convert.
122-
npartitions (int): The number of partitions to split the DataFrame
147+
num_partitions (int): The number of partitions to split the DataFrame
123148
into. Has priority over chunksize.
124149
chunksize (int): The number of rows to put in each partition.
125150
Returns:
126151
A new Ray DataFrame object.
127152
"""
128153
from .dataframe import DataFrame
129154

130-
dataframes = _partition_pandas_dataframe(df, npartitions, chunksize)
155+
row_partitions, col_partitions = \
156+
_partition_pandas_dataframe(df, num_partitions, chunksize)
131157

132-
return DataFrame(row_partitions=dataframes,
158+
return DataFrame(row_partitions=row_partitions,
159+
col_partitions=col_partitions,
133160
columns=df.columns,
134161
index=df.index)
135162

@@ -158,10 +185,13 @@ def _rebuild_cols(row_partitions, index, columns):
158185
[ObjectID]: List of new column partitions.
159186
"""
160187
# NOTE: Reexamine if this is the correct number of columns solution
161-
n_cols = min(get_npartitions(), len(row_partitions), len(columns))
188+
n_cols = min(max(get_npartitions(), len(row_partitions)), len(columns))
162189
partition_assignments = assign_partitions.remote(columns, n_cols)
163-
shufflers = [ShuffleActor.remote(x, partition_axis=0, shuffle_axis=1)
164-
for x in row_partitions]
190+
shufflers = [ShuffleActor.remote(
191+
row_partitions[i] if i < len(row_partitions) else pd.DataFrame(),
192+
partition_axis=0,
193+
shuffle_axis=1)
194+
for i in range(n_cols)]
165195

166196
shufflers_done = \
167197
[shufflers[i].shuffle.remote(
@@ -176,11 +206,12 @@ def _rebuild_cols(row_partitions, index, columns):
176206

177207
# TODO: Determine if this is the right place to reset the index
178208
def fix_indexes(df):
179-
df.index = index
180-
df.columns = np.arange(len(df.columns))
209+
df.columns = pd.RangeIndex(0, len(df.columns))
210+
df.reset_index(drop=True, inplace=True)
181211
return df
182212

183-
return [shuffler.apply_func.remote(fix_indexes) for shuffler in shufflers[:n_cols]]
213+
return [shuffler.apply_func.remote(fix_indexes)
214+
for shuffler in shufflers[:n_cols]]
184215

185216

186217
@ray.remote
@@ -212,8 +243,9 @@ def _rebuild_rows(col_partitions, index, columns):
212243
# TODO: Determine if this is the right place to reset the index
213244
# TODO: Determine if this needs the same changes as above
214245
def fix_indexes(df):
215-
df.columns = columns
216-
return df.reset_index(drop=True)
246+
df.columns = pd.RangeIndex(0, len(df.columns))
247+
df.reset_index(drop=True, inplace=True)
248+
return df
217249

218250
return [shuffler.apply_func.remote(fix_indexes) for shuffler in shufflers[:n_rows]]
219251

@@ -265,7 +297,7 @@ def _map_partitions(func, partitions, *argslists):
265297

266298

267299
@ray.remote(num_return_vals=2)
268-
def _compute_length_and_index(dfs):
300+
def _compute_length_and_index(dfs, index):
269301
"""Create a default index, which is a RangeIndex
270302
Returns:
271303
The pd.RangeIndex object that represents this DataFrame.
@@ -278,11 +310,12 @@ def _compute_length_and_index(dfs):
278310

279311
idx_df_col_names = ("partition", "index_within_partition")
280312

281-
return lengths, pd.DataFrame(dest_indices, columns=idx_df_col_names)
313+
return lengths, pd.DataFrame(dest_indices, index=index,
314+
columns=idx_df_col_names)
282315

283316

284317
@ray.remote(num_return_vals=2)
285-
def _compute_width_and_index(dfs):
318+
def _compute_width_and_index(dfs, columns):
286319
"""Create a default index, which is a RangeIndex
287320
Returns:
288321
The pd.RangeIndex object that represents this DataFrame.
@@ -295,4 +328,5 @@ def _compute_width_and_index(dfs):
295328

296329
idx_df_col_names = ("partition", "index_within_partition")
297330

298-
return widths, pd.DataFrame(dest_indices, columns=idx_df_col_names)
331+
return widths, pd.DataFrame(dest_indices, index=columns,
332+
columns=idx_df_col_names)

0 commit comments

Comments (0)