Add opt-in GPU test support.

* conftest.py: register a --rungpu command line flag and skip any test that
  carries the "gpu" keyword unless the flag is given.
* pytest.ini: declare the "gpu" marker.
* continuous_integration/gpuci: add the gpuCI build axis and the build/test
  script, which runs only the tests selected with "-m gpu --rungpu".
* tests: parametrize the create-table and Context tests over gpu=False/True;
  GPU variants that do not pass yet are marked xfail with the reason given
  in the mark.

diff --git a/conftest.py b/conftest.py
index 820e89d5f..799e617d1 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,3 +1,12 @@
 import pytest
 
 pytest_plugins = ["distributed.utils_test", "tests.integration.fixtures"]
+
+
+def pytest_addoption(parser):
+    parser.addoption("--rungpu", action="store_true", help="run tests meant for GPU")
+
+
+def pytest_runtest_setup(item):
+    if "gpu" in item.keywords and not item.config.getoption("--rungpu"):
+        pytest.skip("need --rungpu option to run")
diff --git a/continuous_integration/gpuci/axis.yaml b/continuous_integration/gpuci/axis.yaml
new file mode 100644
index 000000000..da4a725bd
--- /dev/null
+++ b/continuous_integration/gpuci/axis.yaml
@@ -0,0 +1,13 @@
+PYTHON_VER:
+- "3.8"
+
+CUDA_VER:
+- "11.2"
+
+LINUX_VER:
+- ubuntu18.04
+
+RAPIDS_VER:
+- "21.12"
+
+excludes:
diff --git a/continuous_integration/gpuci/build.sh b/continuous_integration/gpuci/build.sh
new file mode 100644
index 000000000..f559ce125
--- /dev/null
+++ b/continuous_integration/gpuci/build.sh
@@ -0,0 +1,62 @@
+##################################################
+# dask-sql GPU build and test script for CI      #
+##################################################
+set -e
+NUMARGS=$#
+ARGS=$*
+
+# Arg parsing function
+function hasArg {
+    (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ")
+}
+
+# Set path and build parallel level
+export PATH=/opt/conda/bin:/usr/local/cuda/bin:$PATH
+export PARALLEL_LEVEL=${PARALLEL_LEVEL:-4}
+
+# Set home to the job's workspace
+export HOME="$WORKSPACE"
+
+# specify maven options
+export MAVEN_OPTS="-Dmaven.repo.local=${WORKSPACE}/.m2/repository"
+
+# Switch to project root; also root of repo checkout
+cd "$WORKSPACE"
+
+# Determine CUDA release version
+export CUDA_REL=${CUDA_VERSION%.*}
+
+################################################################################
+# SETUP - Check environment
+################################################################################
+
+gpuci_logger "Check environment variables"
+env
+
+gpuci_logger "Check GPU usage"
+nvidia-smi
+
+gpuci_logger "Activate conda env"
+. /opt/conda/etc/profile.d/conda.sh
+conda activate dask_sql
+
+gpuci_logger "Install dask"
+python -m pip install git+https://github.com/dask/dask
+
+gpuci_logger "Install distributed"
+python -m pip install git+https://github.com/dask/distributed
+
+gpuci_logger "Install dask-sql"
+pip install -e ".[dev]"
+python setup.py java
+
+gpuci_logger "Check Python version"
+python --version
+
+gpuci_logger "Check conda environment"
+conda info
+conda config --show-sources
+conda list --show-channel-urls
+
+gpuci_logger "Python py.test for dask-sql"
+py.test "$WORKSPACE" -n 4 -v -m gpu --rungpu --junitxml="$WORKSPACE/junit-dask-sql.xml" --cov-config="$WORKSPACE/.coveragerc" --cov=dask_sql --cov-report=xml:"$WORKSPACE/dask-sql-coverage.xml" --cov-report term
diff --git a/pytest.ini b/pytest.ini
index 5f56c83ee..86bc68964 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -5,3 +5,5 @@ addopts =
     --cov-report=term-missing
 testpaths =
     tests
+markers =
+    gpu: marks tests that require GPUs (skipped by default, run with '--rungpu')
diff --git a/tests/integration/test_create.py b/tests/integration/test_create.py
index 1da751701..990c3beab 100644
--- a/tests/integration/test_create.py
+++ b/tests/integration/test_create.py
@@ -8,7 +8,8 @@
 
 
 @skip_if_external_scheduler
-def test_create_from_csv(c, df, temporary_data_file):
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_create_from_csv(c, df, temporary_data_file, gpu):
     df.to_csv(temporary_data_file, index=False)
 
     c.sql(
@@ -17,7 +18,8 @@ def test_create_from_csv(c, df, temporary_data_file):
             new_table
         WITH (
             location = '{temporary_data_file}',
-            format = 'csv'
+            format = 'csv',
+            gpu = {gpu}
         )
     """
     )
@@ -28,10 +30,28 @@ def test_create_from_csv(c, df, temporary_data_file):
     """
     ).compute()
 
+    if gpu:
+        result_df = result_df.to_pandas()
+
     assert_frame_equal(result_df, df)
 
 
-def test_cluster_memory(client, c, df):
+@pytest.mark.parametrize(
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=[
+                pytest.mark.gpu,
+                pytest.mark.xfail(
+                    reason="dataframes on memory currently aren't being converted to dask-cudf"
+                ),
+            ],
+        ),
+    ],
+)
+def test_cluster_memory(client, c, df, gpu):
     client.publish_dataset(df=dd.from_pandas(df, npartitions=1))
 
     c.sql(
@@ -40,7 +60,8 @@ def test_cluster_memory(client, c, df):
             new_table
         WITH (
             location = 'df',
-            format = 'memory'
+            format = 'memory',
+            gpu = {gpu}
         )
     """
     )
@@ -51,11 +72,15 @@ def test_cluster_memory(client, c, df):
     """
     ).compute()
 
+    if gpu:
+        return_df = return_df.to_pandas()
+
     assert_frame_equal(df, return_df)
 
 
 @skip_if_external_scheduler
-def test_create_from_csv_persist(c, df, temporary_data_file):
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_create_from_csv_persist(c, df, temporary_data_file, gpu):
     df.to_csv(temporary_data_file, index=False)
 
     c.sql(
@@ -65,7 +90,8 @@ def test_create_from_csv_persist(c, df, temporary_data_file):
         WITH (
             location = '{temporary_data_file}',
             format = 'csv',
-            persist = True
+            persist = True,
+            gpu = {gpu}
         )
     """
     )
@@ -76,6 +102,9 @@ def test_create_from_csv_persist(c, df, temporary_data_file):
     """
     ).compute()
 
+    if gpu:
+        return_df = return_df.to_pandas()
+
     assert_frame_equal(df, return_df)
 
 
@@ -143,7 +172,20 @@ def test_create_from_query(c, df):
 
 
 @skip_if_external_scheduler
-def test_view_table_persist(c, temporary_data_file, df):
+@pytest.mark.parametrize(
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=(
+                pytest.mark.gpu,
+                pytest.mark.xfail(reason="to_pandas() changes int precision"),
+            ),
+        ),
+    ],
+)
+def test_view_table_persist(c, temporary_data_file, df, gpu):
     df.to_csv(temporary_data_file, index=False)
     c.sql(
         f"""
@@ -151,7 +193,8 @@ def test_view_table_persist(c, temporary_data_file, df):
             new_table
         WITH (
             location = '{temporary_data_file}',
-            format = 'csv'
+            format = 'csv',
+            gpu = {gpu}
         )
     """
     )
@@ -177,21 +220,27 @@ def test_view_table_persist(c, temporary_data_file, df):
     """
     )
 
-    assert_frame_equal(
-        c.sql("SELECT c FROM count_view").compute(), pd.DataFrame({"c": [700]})
-    )
-    assert_frame_equal(
-        c.sql("SELECT c FROM count_table").compute(), pd.DataFrame({"c": [700]})
-    )
+    from_view = c.sql("SELECT c FROM count_view").compute()
+    from_table = c.sql("SELECT c FROM count_table").compute()
+
+    if gpu:
+        from_view = from_view.to_pandas()
+        from_table = from_table.to_pandas()
+
+    assert_frame_equal(from_view, pd.DataFrame({"c": [700]}))
+    assert_frame_equal(from_table, pd.DataFrame({"c": [700]}))
 
     df.iloc[:10].to_csv(temporary_data_file, index=False)
 
-    assert_frame_equal(
-        c.sql("SELECT c FROM count_view").compute(), pd.DataFrame({"c": [10]})
-    )
-    assert_frame_equal(
-        c.sql("SELECT c FROM count_table").compute(), pd.DataFrame({"c": [700]})
-    )
+    from_view = c.sql("SELECT c FROM count_view").compute()
+    from_table = c.sql("SELECT c FROM count_table").compute()
+
+    if gpu:
+        from_view = from_view.to_pandas()
+        from_table = from_table.to_pandas()
+
+    assert_frame_equal(from_view, pd.DataFrame({"c": [10]}))
+    assert_frame_equal(from_table, pd.DataFrame({"c": [700]}))
 
 
 def test_replace_and_error(c, temporary_data_file, df):
diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py
index a61b7e838..e7f5e0ef9 100644
--- a/tests/unit/test_context.py
+++ b/tests/unit/test_context.py
@@ -8,13 +8,21 @@
 
 from dask_sql import Context
 
+try:
+    import cudf
+    import dask_cudf
+except ImportError:
+    cudf = None
+    dask_cudf = None
 
-def test_add_remove_tables():
+
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_add_remove_tables(gpu):
     c = Context()
 
     data_frame = dd.from_pandas(pd.DataFrame(), npartitions=1)
 
-    c.create_table("table", data_frame)
+    c.create_table("table", data_frame, gpu=gpu)
     assert "table" in c.schema[c.schema_name].tables
 
     c.drop_table("table")
@@ -23,14 +31,18 @@ def test_add_remove_tables():
     with pytest.raises(KeyError):
         c.drop_table("table")
 
-    c.create_table("table", [data_frame])
+    c.create_table("table", [data_frame], gpu=gpu)
     assert "table" in c.schema[c.schema_name].tables
 
 
-def test_deprecation_warning():
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_deprecation_warning(gpu):
     c = Context()
     data_frame = dd.from_pandas(pd.DataFrame(), npartitions=1)
 
+    if gpu:
+        data_frame = dask_cudf.from_dask_dataframe(data_frame)
+
     with warnings.catch_warnings(record=True) as w:
         warnings.simplefilter("always")
 
@@ -45,11 +57,12 @@ def test_deprecation_warning():
     assert "table" not in c.schema[c.schema_name].tables
 
 
-def test_explain():
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_explain(gpu):
     c = Context()
 
     data_frame = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)
-    c.create_table("df", data_frame)
+    c.create_table("df", data_frame, gpu=gpu)
 
     sql_string = c.explain("SELECT * FROM df")
 
@@ -62,6 +75,9 @@ def test_explain():
 
     data_frame = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)
 
+    if gpu:
+        data_frame = dask_cudf.from_dask_dataframe(data_frame)
+
     sql_string = c.explain(
         "SELECT * FROM other_df", dataframes={"other_df": data_frame}
     )
@@ -72,84 +88,129 @@ def test_explain():
     )
 
 
-def test_sql():
+@pytest.mark.parametrize(
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=(
+                pytest.mark.gpu,
+                pytest.mark.xfail(reason="create_table(gpu=True) doesn't work"),
+            ),
+        ),
+    ],
+)
+def test_sql(gpu):
     c = Context()
 
     data_frame = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)
-    c.create_table("df", data_frame)
+    c.create_table("df", data_frame, gpu=gpu)
 
     result = c.sql("SELECT * FROM df")
-    assert isinstance(result, dd.DataFrame)
-    assert_frame_equal(result.compute(), data_frame.compute())
+    assert isinstance(result, dd.DataFrame if not gpu else dask_cudf.DataFrame)
+    dd.assert_eq(result, data_frame)
 
     result = c.sql("SELECT * FROM df", return_futures=False)
-    assert isinstance(result, pd.DataFrame)
-    assert_frame_equal(result, data_frame.compute())
+    assert isinstance(result, pd.DataFrame if not gpu else cudf.DataFrame)
+    dd.assert_eq(result, data_frame)
 
+    if gpu:
+        data_frame = dask_cudf.from_dask_dataframe(data_frame)
     result = c.sql("SELECT * FROM other_df", dataframes={"other_df": data_frame})
-    assert isinstance(result, dd.DataFrame)
-    assert_frame_equal(result.compute(), data_frame.compute())
-
-
-def test_input_types(temporary_data_file):
+    assert isinstance(result, dd.DataFrame if not gpu else dask_cudf.DataFrame)
+    dd.assert_eq(result, data_frame)
+
+
+@pytest.mark.parametrize(
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=(
+                pytest.mark.gpu,
+                pytest.mark.xfail(reason="create_table(gpu=True) doesn't work"),
+            ),
+        ),
+    ],
+)
+def test_input_types(temporary_data_file, gpu):
     c = Context()
     df = pd.DataFrame({"a": [1, 2, 3]})
 
-    def assert_correct_output():
+    def assert_correct_output(gpu):
         result = c.sql("SELECT * FROM df")
-        assert isinstance(result, dd.DataFrame)
-        assert_frame_equal(result.compute(), df)
+        assert isinstance(result, dd.DataFrame if not gpu else dask_cudf.DataFrame)
+        dd.assert_eq(result, df)
 
-    c.create_table("df", df)
-    assert_correct_output()
+    c.create_table("df", df, gpu=gpu)
+    assert_correct_output(gpu=gpu)
 
-    c.create_table("df", dd.from_pandas(df, npartitions=1))
-    assert_correct_output()
+    c.create_table("df", dd.from_pandas(df, npartitions=1), gpu=gpu)
+    assert_correct_output(gpu=gpu)
 
     df.to_csv(temporary_data_file, index=False)
-    c.create_table("df", temporary_data_file)
-    assert_correct_output()
+    c.create_table("df", temporary_data_file, gpu=gpu)
+    assert_correct_output(gpu=gpu)
 
     df.to_csv(temporary_data_file, index=False)
-    c.create_table("df", temporary_data_file, format="csv")
-    assert_correct_output()
+    c.create_table("df", temporary_data_file, format="csv", gpu=gpu)
+    assert_correct_output(gpu=gpu)
 
     df.to_parquet(temporary_data_file, index=False)
-    c.create_table("df", temporary_data_file, format="parquet")
-    assert_correct_output()
+    c.create_table("df", temporary_data_file, format="parquet", gpu=gpu)
+    assert_correct_output(gpu=gpu)
 
     with pytest.raises(AttributeError):
-        c.create_table("df", temporary_data_file, format="unknown")
+        c.create_table("df", temporary_data_file, format="unknown", gpu=gpu)
 
     strangeThing = object()
 
     with pytest.raises(ValueError):
-        c.create_table("df", strangeThing)
-
-
-def test_tables_from_stack():
+        c.create_table("df", strangeThing, gpu=gpu)
+
+
+@pytest.mark.parametrize(
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=(
+                pytest.mark.gpu,
+                pytest.mark.xfail(
+                    reason="GPU tables aren't picked up by _get_tables_from_stack"
+                ),
+            ),
+        ),
+    ],
+)
+def test_tables_from_stack(gpu):
     c = Context()
 
     assert not c._get_tables_from_stack()
 
-    df = pd.DataFrame()
+    df = pd.DataFrame() if not gpu else cudf.DataFrame()
 
     assert "df" in c._get_tables_from_stack()
 
-    def f():
-        df2 = pd.DataFrame()
+    def f(gpu):
+        df2 = pd.DataFrame() if not gpu else cudf.DataFrame()
 
         assert "df" in c._get_tables_from_stack()
         assert "df2" in c._get_tables_from_stack()
 
-    f()
+    f(gpu=gpu)
 
-    def g():
-        df = pd.DataFrame({"a": [1]})
+    def g(gpu):
+        df = pd.DataFrame({"a": [1]}) if not gpu else cudf.DataFrame({"a": [1]})
 
         assert "df" in c._get_tables_from_stack()
         assert c._get_tables_from_stack()["df"].columns == ["a"]
 
+    g(gpu=gpu)
+
 
 def test_function_adding():
     c = Context()