9 changes: 9 additions & 0 deletions conftest.py
@@ -1,3 +1,12 @@
import pytest

pytest_plugins = ["distributed.utils_test", "tests.integration.fixtures"]


def pytest_addoption(parser):
    parser.addoption("--rungpu", action="store_true", help="run tests meant for GPU")


def pytest_runtest_setup(item):
if "gpu" in item.keywords and not item.config.getoption("--rungpu"):
pytest.skip("need --rungpu option to run")
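
Together, these two hooks wire a custom --rungpu flag into pytest: any test carrying the gpu marker is collected as usual but skipped unless the flag is passed. A minimal sketch of a test that opts in (the test name and body are hypothetical and not part of this diff; it assumes a RAPIDS environment with cudf installed):

import pytest


@pytest.mark.gpu
def test_gpu_example():
    # Executed only when pytest is invoked with --rungpu;
    # otherwise pytest_runtest_setup() above skips it.
    import cudf  # hypothetical: assumes a RAPIDS/cudf environment

    s = cudf.Series([1, 2, 3])
    assert int(s.sum()) == 6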
13 changes: 13 additions & 0 deletions continuous_integration/gpuci/axis.yaml
@@ -0,0 +1,13 @@
PYTHON_VER:
- "3.8"

CUDA_VER:
- "11.2"

LINUX_VER:
- ubuntu18.04

RAPIDS_VER:
- "21.12"

excludes:
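
This axis file pins a single gpuCI matrix entry (Python 3.8, CUDA 11.2, Ubuntu 18.04, RAPIDS 21.12); the trailing excludes: key is left empty, so no combinations are filtered out and exactly one job configuration is built.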
62 changes: 62 additions & 0 deletions continuous_integration/gpuci/build.sh
@@ -0,0 +1,62 @@
##################################################
# dask-sql GPU build and test script for CI      #
##################################################
set -e
NUMARGS=$#
ARGS=$*

# Arg parsing function
function hasArg {
    (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ")
}

# Set path and build parallel level
export PATH=/opt/conda/bin:/usr/local/cuda/bin:$PATH
export PARALLEL_LEVEL=${PARALLEL_LEVEL:-4}

# Set home to the job's workspace
export HOME="$WORKSPACE"

# specify maven options
export MAVEN_OPTS="-Dmaven.repo.local=${WORKSPACE}/.m2/repository"

# Switch to project root; also root of repo checkout
cd "$WORKSPACE"

# Determine CUDA release version
export CUDA_REL=${CUDA_VERSION%.*}

################################################################################
# SETUP - Check environment
################################################################################

gpuci_logger "Check environment variables"
env

gpuci_logger "Check GPU usage"
nvidia-smi

gpuci_logger "Activate conda env"
. /opt/conda/etc/profile.d/conda.sh
conda activate dask_sql

gpuci_logger "Install dask"
python -m pip install git+https://github.com/dask/dask

gpuci_logger "Install distributed"
python -m pip install git+https://github.com/dask/distributed

gpuci_logger "Install dask-sql"
pip install -e ".[dev]"
python setup.py java

gpuci_logger "Check Python version"
python --version

gpuci_logger "Check conda environment"
conda info
conda config --show-sources
conda list --show-channel-urls

gpuci_logger "Python py.test for dask-sql"
py.test $WORKSPACE -n 4 -v -m gpu --rungpu --junitxml="$WORKSPACE/junit-dask-sql.xml" --cov-config="$WORKSPACE/.coveragerc" --cov=dask_sql --cov-report=xml:"$WORKSPACE/dask-sql-coverage.xml" --cov-report term
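
The final py.test invocation ties the pieces together: -m gpu selects only tests carrying the gpu marker registered in pytest.ini (below), --rungpu is the opt-in flag added in conftest.py, and -n 4 distributes them across four pytest-xdist workers while writing JUnit and coverage reports into the workspace.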
2 changes: 2 additions & 0 deletions pytest.ini
@@ -5,3 +5,5 @@ addopts =
    --cov-report=term-missing
testpaths =
    tests
markers =
    gpu: marks tests that require GPUs (skipped by default, run with '--rungpu')
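
Registering the marker here keeps pytest from warning about the unknown @pytest.mark.gpu mark and documents the convention that gpu tests are skipped by default, matching the -m gpu --rungpu invocation in build.sh above.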
89 changes: 69 additions & 20 deletions tests/integration/test_create.py
@@ -8,7 +8,8 @@


@skip_if_external_scheduler
-def test_create_from_csv(c, df, temporary_data_file):
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_create_from_csv(c, df, temporary_data_file, gpu):
    df.to_csv(temporary_data_file, index=False)

    c.sql(
@@ -17,7 +18,8 @@ def test_create_from_csv(c, df, temporary_data_file):
        new_table
    WITH (
        location = '{temporary_data_file}',
-        format = 'csv'
+        format = 'csv',
+        gpu = {gpu}
    )
    """
    )
@@ -28,10 +30,28 @@ def test_create_from_csv(c, df, temporary_data_file):
    """
    ).compute()

+    if gpu:
+        result_df = result_df.to_pandas()
+
    assert_frame_equal(result_df, df)


-def test_cluster_memory(client, c, df):
+@pytest.mark.parametrize(
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=[
+                pytest.mark.gpu,
+                pytest.mark.xfail(
+                    reason="dataframes on memory currently aren't being converted to dask-cudf"
+                ),
+            ],
+        ),
+    ],
+)
+def test_cluster_memory(client, c, df, gpu):
    client.publish_dataset(df=dd.from_pandas(df, npartitions=1))

    c.sql(
@@ -40,7 +60,8 @@ def test_cluster_memory(client, c, df):
        new_table
    WITH (
        location = 'df',
-        format = 'memory'
+        format = 'memory',
+        gpu = {gpu}
    )
    """
    )
@@ -51,11 +72,15 @@ def test_cluster_memory(client, c, df):
    """
    ).compute()

+    if gpu:
+        return_df = return_df.to_pandas()
+
    assert_frame_equal(df, return_df)


@skip_if_external_scheduler
-def test_create_from_csv_persist(c, df, temporary_data_file):
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_create_from_csv_persist(c, df, temporary_data_file, gpu):
    df.to_csv(temporary_data_file, index=False)

    c.sql(
@@ -65,7 +90,8 @@ def test_create_from_csv_persist(c, df, temporary_data_file):
    WITH (
        location = '{temporary_data_file}',
        format = 'csv',
-        persist = True
+        persist = True,
+        gpu = {gpu}
    )
    """
    )
@@ -76,6 +102,9 @@ def test_create_from_csv_persist(c, df, temporary_data_file):
    """
    ).compute()

+    if gpu:
+        return_df = return_df.to_pandas()
+
    assert_frame_equal(df, return_df)


@@ -143,15 +172,29 @@ def test_create_from_query(c, df):


@skip_if_external_scheduler
-def test_view_table_persist(c, temporary_data_file, df):
+@pytest.mark.parametrize(
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=(
+                pytest.mark.gpu,
+                pytest.mark.xfail(reason="to_pandas() changes int precision"),
+            ),
+        ),
+    ],
+)
+def test_view_table_persist(c, temporary_data_file, df, gpu):
    df.to_csv(temporary_data_file, index=False)
    c.sql(
        f"""
    CREATE TABLE
        new_table
    WITH (
        location = '{temporary_data_file}',
-        format = 'csv'
+        format = 'csv',
+        gpu = {gpu}
    )
    """
    )
@@ -177,21 +220,27 @@ def test_view_table_persist(c, temporary_data_file, df):
    """
    )

-    assert_frame_equal(
-        c.sql("SELECT c FROM count_view").compute(), pd.DataFrame({"c": [700]})
-    )
-    assert_frame_equal(
-        c.sql("SELECT c FROM count_table").compute(), pd.DataFrame({"c": [700]})
-    )
+    from_view = c.sql("SELECT c FROM count_view").compute()
+    from_table = c.sql("SELECT c FROM count_table").compute()
+
+    if gpu:
+        from_view = from_view.to_pandas()
+        from_table = from_table.to_pandas()
+
+    assert_frame_equal(from_view, pd.DataFrame({"c": [700]}))
+    assert_frame_equal(from_table, pd.DataFrame({"c": [700]}))

    df.iloc[:10].to_csv(temporary_data_file, index=False)

-    assert_frame_equal(
-        c.sql("SELECT c FROM count_view").compute(), pd.DataFrame({"c": [10]})
-    )
-    assert_frame_equal(
-        c.sql("SELECT c FROM count_table").compute(), pd.DataFrame({"c": [700]})
-    )
+    from_view = c.sql("SELECT c FROM count_view").compute()
+    from_table = c.sql("SELECT c FROM count_table").compute()
+
+    if gpu:
+        from_view = from_view.to_pandas()
+        from_table = from_table.to_pandas()
+
+    assert_frame_equal(from_view, pd.DataFrame({"c": [10]}))
+    assert_frame_equal(from_table, pd.DataFrame({"c": [700]}))


def test_replace_and_error(c, temporary_data_file, df):