From e94c87563c34a1b132f1364ba91f5dcc71b23a89 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 18 Jul 2020 15:28:20 -0700 Subject: [PATCH 1/7] Support Dask Dataframes in Hyperband --- dask_ml/model_selection/_incremental.py | 28 +++++++++++++++++-------- tests/model_selection/test_hyperband.py | 14 +++++++++++++ 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index f9cf33286..220b672b5 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -179,6 +179,15 @@ async def _fit( # Convert testing data into a single element on the cluster # This assumes that it fits into memory on a single worker + if isinstance(X_train, (dd.DataFrame, dd.Series)): + X_train = X_train.to_dask_array() + if isinstance(X_test, (dd.DataFrame, dd.Series)): + X_test = X_test.to_dask_array() + if isinstance(y_train, dd.Series): + y_train = y_train.to_dask_array() + if isinstance(y_test, dd.Series): + y_test = y_test.to_dask_array() + if isinstance(X_test, da.Array): X_test = client.compute(X_test) else: @@ -515,13 +524,11 @@ def _validate_parameters(self, X, y): ) # Make sure dask arrays are passed so error on unknown chunk size is raised - if isinstance(X, dd.DataFrame): - X = X.to_dask_array() - if isinstance(y, (dd.DataFrame, dd.Series)): - y = y.to_dask_array() - kwargs = dict(accept_unknown_chunks=False, accept_dask_dataframe=False) - X = self._check_array(X, **kwargs) - y = self._check_array(y, ensure_2d=False, **kwargs) + kwargs = dict(accept_unknown_chunks=True, accept_dask_dataframe=True) + if not isinstance(X, dd.DataFrame): + X = self._check_array(X, **kwargs) + if not isinstance(y, dd.Series): + y = self._check_array(y, ensure_2d=False, **kwargs) scorer = check_scoring(self.estimator, scoring=self.scoring) return X, y, scorer @@ -560,7 +567,8 @@ def _get_train_test_split(self, X, y, **kwargs): else: test_size = self.test_size X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=test_size, random_state=self.random_state + X, y, test_size=test_size, random_state=self.random_state, + shuffle=True ) return X_train, X_test, y_train, y_test @@ -635,7 +643,9 @@ async def _fit(self, X, y, **fit_params): context = dummy_context() X, y, scorer = self._validate_parameters(X, y) - X_train, X_test, y_train, y_test = self._get_train_test_split(X, y) + + X_train, X_test, y_train, y_test = self._get_train_test_split(X, y, + shuffle=True) with context: results = await fit( diff --git a/tests/model_selection/test_hyperband.py b/tests/model_selection/test_hyperband.py index aa4b2d73f..b4d86a963 100644 --- a/tests/model_selection/test_hyperband.py +++ b/tests/model_selection/test_hyperband.py @@ -15,6 +15,7 @@ ) from sklearn.linear_model import SGDClassifier +import dask.dataframe as dd from dask_ml._compat import DISTRIBUTED_2_5_0 from dask_ml.datasets import make_classification from dask_ml.model_selection import ( @@ -465,3 +466,16 @@ def test_logs_dont_repeat(c, s, a, b): # Make sure only one model creation message is printed per bracket # (all brackets have unique n_models as asserted above) assert len(n_models) == len(set(n_models)) + + +@gen_cluster(client=True) +async def test_dataframe_inputs(c, s, a, b): + X = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) + X = dd.from_pandas(X, npartitions=2) + y = pd.Series([False, True, True]) + y = dd.from_pandas(y, npartitions=2) + + model = ConstantFunction() + params = {"value": scipy.stats.uniform(0, 1)} + alg = HyperbandSearchCV(model, params, max_iter=9, random_state=42) + await alg.fit(X, y) From 8800d6149d51498630bcb1ba000b278553032d56 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 18 Jul 2020 16:07:13 -0700 Subject: [PATCH 2/7] co-persist training and testing data --- dask_ml/model_selection/_incremental.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 220b672b5..8ec27beda 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -188,6 +188,8 @@ async def _fit( if isinstance(y_test, dd.Series): y_test = y_test.to_dask_array() + X_train, y_train, X_test, y_test = dask.persist(X_train, y_train, X_test, y_test) + if isinstance(X_test, da.Array): X_test = client.compute(X_test) else: @@ -198,7 +200,6 @@ async def _fit( y_test = await client.scatter(y_test) # Convert to batches of delayed objects of numpy arrays - X_train, y_train = dask.persist(X_train, y_train) X_train = sorted(futures_of(X_train), key=lambda f: f.key) y_train = sorted(futures_of(y_train), key=lambda f: f.key) assert len(X_train) == len(y_train) From 485ff530bca48eb6051591c75dbe6a5965d42333 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 18 Jul 2020 16:26:28 -0700 Subject: [PATCH 3/7] black --- dask_ml/model_selection/_incremental.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 8ec27beda..c099f183f 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -568,8 +568,7 @@ def _get_train_test_split(self, X, y, **kwargs): else: test_size = self.test_size X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=test_size, random_state=self.random_state, - shuffle=True + X, y, test_size=test_size, random_state=self.random_state, shuffle=True ) return X_train, X_test, y_train, y_test @@ -645,8 +644,9 @@ async def _fit(self, X, y, **fit_params): X, y, scorer = self._validate_parameters(X, y) - X_train, X_test, y_train, y_test = self._get_train_test_split(X, y, - shuffle=True) + X_train, X_test, y_train, y_test = self._get_train_test_split( + X, y, shuffle=True + ) with context: results = await fit( From 506be91f96c379abd606b03a02092211ab2e792f Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 18 Jul 2020 19:24:20 -0700 Subject: [PATCH 4/7] Don't check arrays without need --- dask_ml/model_selection/_incremental.py | 2 -- tests/model_selection/test_incremental.py | 16 ++++++---------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index c099f183f..0dca72a87 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -28,7 +28,6 @@ from .._compat import check_is_fitted, dummy_context from .._typing import ArrayLike, Int from .._utils import LoggingContext -from ..utils import check_array from ..wrappers import ParallelPostFit from ._split import train_test_split @@ -549,7 +548,6 @@ def _check_array(self, X, **kwargs): """ if isinstance(X, np.ndarray): X = da.from_array(X, X.shape) - X = check_array(X, **kwargs) return X def _get_train_test_split(self, X, y, **kwargs): diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 31da5b12c..82b381544 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -2,6 +2,7 @@ import concurrent.futures import itertools import logging +import math import random import sys @@ -259,13 +260,6 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b): search = InverseDecaySearchCV(model, params, **kwargs) else: raise ValueError() - if memory == "distributed" and input_type == "dataframe": - with pytest.raises(TypeError, match=r"to_dask_array\(lengths=True\)"): - await search.fit(X, y, classes=[0, 1]) - - # Dask-ML raised a type error; let's implement the suggestion - X = await c.compute(c.submit(X.to_dask_array, lengths=True)) - y = await c.compute(c.submit(y.to_dask_array, lengths=True)) await search.fit(X, y, classes=[0, 1]) assert search.history_ @@ -321,8 +315,10 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b): proba = search.predict_proba(X) log_proba = search.predict_log_proba(X) - assert proba.shape == (1000, 2) - assert log_proba.shape == (1000, 2) + assert proba.shape[1] == 2 + assert proba.shape[0] == 1000 or math.isnan(proba.shape[0]) + assert log_proba.shape[1] == 2 + assert log_proba.shape[0] == 1000 or math.isnan(proba.shape[0]) assert isinstance(proba, da.Array) assert isinstance(log_proba, da.Array) @@ -334,7 +330,7 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b): da.utils.assert_eq(log_proba, log_proba_) decision = search.decision_function(X_) - assert decision.shape == (1000,) + assert decision.shape == (1000,) or math.isnan(decision.shape[0]) return True From da4a9b97a075d43ff5827bc49b78a8994c92ac61 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 18 Jul 2020 20:06:42 -0700 Subject: [PATCH 5/7] isort --- tests/model_selection/test_hyperband.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/model_selection/test_hyperband.py b/tests/model_selection/test_hyperband.py index b4d86a963..0946c9edb 100644 --- a/tests/model_selection/test_hyperband.py +++ b/tests/model_selection/test_hyperband.py @@ -2,11 +2,11 @@ import math import warnings -import dask.array as da import numpy as np import pandas as pd import pytest import scipy.stats +from dask import array as da, dataframe as dd from distributed.utils_test import ( # noqa: F401 captured_logger, cluster, @@ -15,7 +15,6 @@ ) from sklearn.linear_model import SGDClassifier -import dask.dataframe as dd from dask_ml._compat import DISTRIBUTED_2_5_0 from dask_ml.datasets import make_classification from dask_ml.model_selection import ( From 9465a2cbabf8e000b9925baa41db24e3e6ae45da Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 21 Jul 2020 07:35:46 -0700 Subject: [PATCH 6/7] Update dask_ml/model_selection/_incremental.py Co-authored-by: Tom Augspurger --- dask_ml/model_selection/_incremental.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 0dca72a87..f1ec4f585 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -642,9 +642,7 @@ async def _fit(self, X, y, **fit_params): X, y, scorer = self._validate_parameters(X, y) - X_train, X_test, y_train, y_test = self._get_train_test_split( - X, y, shuffle=True - ) + X_train, X_test, y_train, y_test = self._get_train_test_split() with context: results = await fit( From bcc97532dce9bdd1aea682e047f4a5ae02782e18 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 21 Jul 2020 07:50:31 -0700 Subject: [PATCH 7/7] add back in X, y --- dask_ml/model_selection/_incremental.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index f1ec4f585..9e031d2f2 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -642,7 +642,7 @@ async def _fit(self, X, y, **fit_params): X, y, scorer = self._validate_parameters(X, y) - X_train, X_test, y_train, y_test = self._get_train_test_split() + X_train, X_test, y_train, y_test = self._get_train_test_split(X, y) with context: results = await fit(