dask_ml/model_selection/_incremental.py (29 changes: 18 additions & 11 deletions)

@@ -28,7 +28,6 @@
 from .._compat import check_is_fitted, dummy_context
 from .._typing import ArrayLike, Int
 from .._utils import LoggingContext
-from ..utils import check_array
 from ..wrappers import ParallelPostFit
 from ._split import train_test_split

@@ -179,6 +178,17 @@ async def _fit(

     # Convert testing data into a single element on the cluster
     # This assumes that it fits into memory on a single worker
+    if isinstance(X_train, (dd.DataFrame, dd.Series)):
+        X_train = X_train.to_dask_array()
+    if isinstance(X_test, (dd.DataFrame, dd.Series)):
+        X_test = X_test.to_dask_array()
+    if isinstance(y_train, dd.Series):
+        y_train = y_train.to_dask_array()
+    if isinstance(y_test, dd.Series):
+        y_test = y_test.to_dask_array()
+
+    X_train, y_train, X_test, y_test = dask.persist(X_train, y_train, X_test, y_test)
+
     if isinstance(X_test, da.Array):
         X_test = client.compute(X_test)
     else:
@@ -189,7 +199,6 @@ async def _fit(
         y_test = await client.scatter(y_test)

     # Convert to batches of delayed objects of numpy arrays
-    X_train, y_train = dask.persist(X_train, y_train)
     X_train = sorted(futures_of(X_train), key=lambda f: f.key)
     y_train = sorted(futures_of(y_train), key=lambda f: f.key)
     assert len(X_train) == len(y_train)
@@ -515,13 +524,11 @@ def _validate_parameters(self, X, y):
         )

         # Make sure dask arrays are passed so error on unknown chunk size is raised
-        if isinstance(X, dd.DataFrame):
-            X = X.to_dask_array()
-        if isinstance(y, (dd.DataFrame, dd.Series)):
-            y = y.to_dask_array()
-        kwargs = dict(accept_unknown_chunks=False, accept_dask_dataframe=False)
-        X = self._check_array(X, **kwargs)
-        y = self._check_array(y, ensure_2d=False, **kwargs)
+        kwargs = dict(accept_unknown_chunks=True, accept_dask_dataframe=True)
+        if not isinstance(X, dd.DataFrame):
+            X = self._check_array(X, **kwargs)
+        if not isinstance(y, dd.Series):
+            y = self._check_array(y, ensure_2d=False, **kwargs)
         scorer = check_scoring(self.estimator, scoring=self.scoring)
         return X, y, scorer

@@ -541,7 +548,6 @@ def _check_array(self, X, **kwargs):
"""
if isinstance(X, np.ndarray):
X = da.from_array(X, X.shape)
X = check_array(X, **kwargs)
return X

def _get_train_test_split(self, X, y, **kwargs):
@@ -560,7 +566,7 @@ def _get_train_test_split(self, X, y, **kwargs):
         else:
             test_size = self.test_size
         X_train, X_test, y_train, y_test = train_test_split(
-            X, y, test_size=test_size, random_state=self.random_state
+            X, y, test_size=test_size, random_state=self.random_state, shuffle=True
         )
         return X_train, X_test, y_train, y_test

@@ -635,6 +641,7 @@ async def _fit(self, X, y, **fit_params):
             context = dummy_context()

         X, y, scorer = self._validate_parameters(X, y)
+
         X_train, X_test, y_train, y_test = self._get_train_test_split(X, y)

         with context:
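Taken together, the new lines in _fit normalize dask DataFrame/Series inputs to dask arrays and persist all four splits in one call. A minimal sketch of what that conversion yields (illustrative only, not part of the patch): to_dask_array() without lengths leaves the row chunk sizes unknown (NaN), which is why the validation code now has to accept unknown chunks.

    import dask
    import dask.dataframe as dd
    import pandas as pd

    pdf = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
    X = dd.from_pandas(pdf, npartitions=2)

    arr = X.to_dask_array()     # row chunk sizes are unknown: ((nan, nan), (2,))
    print(arr.chunks)
    (arr,) = dask.persist(arr)  # materialize, as _fit now does for all four splits
    print(arr.shape)            # (nan, 2): shape[0] stays NaN until computed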
tests/model_selection/test_hyperband.py (15 changes: 14 additions & 1 deletion)

@@ -2,11 +2,11 @@
 import math
 import warnings

-import dask.array as da
 import numpy as np
 import pandas as pd
 import pytest
 import scipy.stats
+from dask import array as da, dataframe as dd
 from distributed.utils_test import (  # noqa: F401
     captured_logger,
     cluster,
@@ -465,3 +465,16 @@ def test_logs_dont_repeat(c, s, a, b):
     # Make sure only one model creation message is printed per bracket
     # (all brackets have unique n_models as asserted above)
     assert len(n_models) == len(set(n_models))
+
+
+@gen_cluster(client=True)
+async def test_dataframe_inputs(c, s, a, b):
+    X = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
+    X = dd.from_pandas(X, npartitions=2)
+    y = pd.Series([False, True, True])
+    y = dd.from_pandas(y, npartitions=2)
+
+    model = ConstantFunction()
+    params = {"value": scipy.stats.uniform(0, 1)}
+    alg = HyperbandSearchCV(model, params, max_iter=9, random_state=42)
+    await alg.fit(X, y)
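For context, a hedged sketch of the user-facing change this test locks in (standalone, outside the test harness): dask dataframes can now go straight into fit, whereas callers previously had to convert them by hand with known partition lengths.

    import dask.dataframe as dd
    import pandas as pd

    X = dd.from_pandas(pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}), npartitions=2)
    y = dd.from_pandas(pd.Series([False, True, True]), npartitions=2)

    # Old workaround (no longer required): compute partition lengths so the
    # resulting dask arrays have known chunk sizes before calling fit.
    X_arr = X.to_dask_array(lengths=True)
    y_arr = y.to_dask_array(lengths=True)
    print(X_arr.chunks)  # ((2, 1), (2,)): concrete lengths, no NaN

    # With this patch, HyperbandSearchCV(...).fit(X, y) accepts X and y as-is.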
tests/model_selection/test_incremental.py (16 changes: 6 additions & 10 deletions)

@@ -2,6 +2,7 @@
 import concurrent.futures
 import itertools
 import logging
+import math
 import random
 import sys

@@ -259,13 +260,6 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b):
         search = InverseDecaySearchCV(model, params, **kwargs)
     else:
         raise ValueError()
-    if memory == "distributed" and input_type == "dataframe":
-        with pytest.raises(TypeError, match=r"to_dask_array\(lengths=True\)"):
-            await search.fit(X, y, classes=[0, 1])
-
-        # Dask-ML raised a type error; let's implement the suggestion
-        X = await c.compute(c.submit(X.to_dask_array, lengths=True))
-        y = await c.compute(c.submit(y.to_dask_array, lengths=True))
     await search.fit(X, y, classes=[0, 1])

     assert search.history_
@@ -321,8 +315,10 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b):

     proba = search.predict_proba(X)
     log_proba = search.predict_log_proba(X)
-    assert proba.shape == (1000, 2)
-    assert log_proba.shape == (1000, 2)
+    assert proba.shape[1] == 2
+    assert proba.shape[0] == 1000 or math.isnan(proba.shape[0])
+    assert log_proba.shape[1] == 2
+    assert log_proba.shape[0] == 1000 or math.isnan(log_proba.shape[0])

     assert isinstance(proba, da.Array)
     assert isinstance(log_proba, da.Array)
@@ -334,7 +330,7 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b):
     da.utils.assert_eq(log_proba, log_proba_)

     decision = search.decision_function(X_)
-    assert decision.shape == (1000,)
+    assert decision.shape == (1000,) or math.isnan(decision.shape[0])
     return True


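Why the reworked assertions hedge with math.isnan: an array derived from a dask dataframe with uncomputed partition lengths reports NaN for its row count, so an exact shape comparison only holds when chunk sizes are known. A small self-contained sketch:

    import math

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd

    df = pd.DataFrame({"x": np.arange(6), "y": np.arange(6)})
    arr = dd.from_pandas(df, npartitions=3).to_dask_array()

    print(arr.shape)                 # (nan, 2)
    assert math.isnan(arr.shape[0])  # unknown row count
    assert arr.shape[1] == 2         # column count is still known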