
UDFs are no longer executed in a thread #7453

@orf

Description

Describe the bug

In DataFusion 28 and below, UDFs were executed in a separate thread when writing to Parquet. The example code below does not fail the assertion in version 28, but it does in version 30 and on git main.

If a UDF expects to run on its own thread, or performs some form of blocking computation, this change means a previously parallel DataFrame plan now runs serially.
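To make the serial-vs-parallel point concrete without relying on DataFusion internals, here is a std-only sketch. It assumes a hypothetical blocking `blocking_udf` that is applied once per partition. When every partition is evaluated on the calling thread (`evaluate_serial`), the blocking calls run one after another on that single thread. When each partition gets its own thread (`evaluate_parallel`, using `std::thread::scope` as a stand-in for separate tasks), the calls can overlap. This is an analogy for the symptom, not a model of how DataFusion actually schedules work.

```rust
use std::collections::HashSet;
use std::thread::{self, ThreadId};
use std::time::Duration;

/// Hypothetical stand-in for a synchronous UDF that blocks (e.g. I/O or heavy CPU work).
/// Returns one boolean per input row plus the id of the thread it ran on.
fn blocking_udf(rows: &[u8]) -> (Vec<bool>, ThreadId) {
    thread::sleep(Duration::from_millis(10)); // simulate blocking work
    (vec![true; rows.len()], thread::current().id())
}

/// Evaluate every partition on the calling thread: the blocking calls run one after another.
fn evaluate_serial(partitions: &[Vec<u8>]) -> Vec<ThreadId> {
    partitions.iter().map(|p| blocking_udf(p).1).collect()
}

/// Evaluate each partition on its own scoped thread: the blocking calls can overlap.
fn evaluate_parallel(partitions: &[Vec<u8>]) -> Vec<ThreadId> {
    thread::scope(|s| {
        let handles: Vec<_> = partitions
            .iter()
            .map(|p| s.spawn(move || blocking_udf(p).1))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let caller = thread::current().id();
    let partitions = vec![vec![0u8; 4]; 4];

    let serial: HashSet<ThreadId> = evaluate_serial(&partitions).into_iter().collect();
    let parallel: HashSet<ThreadId> = evaluate_parallel(&partitions).into_iter().collect();

    // Serial: every call ran on the caller's thread (this is what trips the issue's assert_ne!).
    assert_eq!(serial, HashSet::from([caller]));
    // Parallel: no call ran on the caller's thread, and each partition got its own thread.
    assert!(!parallel.contains(&caller));
    assert_eq!(parallel.len(), partitions.len());
    println!("serial threads: {}, parallel threads: {}", serial.len(), parallel.len());
}
```

Because `ThreadId`s are never reused within a process, counting distinct ids is a reliable way to see how many threads did the work, which avoids flaky timing-based checks.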

I couldn't spot anything in the release notes about this.

To Reproduce

deps:

[dependencies]
tokio = { version = "^1.0", features = ["rt-multi-thread", "full"] }
datafusion = { version = "=30", default-features = false, features = ["encoding__expressions", "zstd"] }

code:

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() {
    use std::sync::Arc;
    use datafusion::arrow::array::{ArrayRef, BooleanArray};
    use datafusion::arrow::datatypes::DataType;
    use datafusion::logical_expr::Volatility;
    use datafusion::parquet::file::properties::WriterProperties;
    use datafusion::physical_plan::functions::make_scalar_function;
    use datafusion::prelude::*;

    let config = SessionConfig::default();
    let ctx = SessionContext::with_config(config);
    let df = ctx
        .read_parquet("input.parquet", ParquetReadOptions::default())
        .await
        .unwrap();

    let main_thread_id = std::thread::current().id();

    let func = create_udf(
        "some_func",
        vec![DataType::Binary],
        Arc::new(DataType::Boolean),
        Volatility::Immutable,
        make_scalar_function(move |args: &[ArrayRef]| {
            // Fails if the UDF is evaluated on the thread that runs `main`.
            let func_thread_id = std::thread::current().id();
            assert_ne!(main_thread_id, func_thread_id);
            Ok(Arc::new(BooleanArray::from(vec![true; args[0].len()])))
        }));

    let df = df
        .select(vec![col("hash"), func.call(vec![col("hash")])])
        .unwrap();

    let props = WriterProperties::builder()
        .build();
    df.write_parquet("some_dir/", Some(props)).await.unwrap();
}

Expected behavior

Synchronous UDFs should be executed on a blocking thread pool rather than on the thread that drives the query.
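As a rough illustration of that expectation, here is a std-only sketch of a small blocking pool: worker threads pull jobs from a shared channel, so a synchronous UDF submitted by the thread driving the query runs elsewhere, and the caller only waits for the result. `BlockingPool`, `submit`, and `some_func` are hypothetical names chosen for this sketch. In a tokio-based engine the analogous mechanism would be `tokio::task::spawn_blocking`; this is not DataFusion's implementation.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle, ThreadId};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool for blocking work.
struct BlockingPool {
    sender: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl BlockingPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver: Arc<Mutex<Receiver<Job>>> = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| {
                let rx = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock guard is dropped at the end of this statement,
                    // so the lock is held only while receiving, not while running the job.
                    let job = rx.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // channel closed: the pool is shutting down
                    }
                })
            })
            .collect();
        BlockingPool { sender: Some(sender), workers }
    }

    /// Run `f` on a pool thread and return a receiver for its result.
    fn submit<T, F>(&self, f: F) -> Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            let _ = tx.send(f());
        });
        self.sender.as_ref().unwrap().send(job).unwrap();
        rx
    }
}

impl Drop for BlockingPool {
    fn drop(&mut self) {
        // Dropping the sender closes the channel so idle workers exit their loop.
        drop(self.sender.take());
        for w in self.workers.drain(..) {
            let _ = w.join();
        }
    }
}

/// Hypothetical synchronous UDF: one `true` per input row, plus the thread it ran on.
fn some_func(rows: Vec<u8>) -> (Vec<bool>, ThreadId) {
    (vec![true; rows.len()], thread::current().id())
}

fn main() {
    let pool = BlockingPool::new(4);
    let caller = thread::current().id();

    // Submit one UDF call per "batch"; the caller only waits on the results.
    let pending: Vec<_> = (0..8).map(|_| pool.submit(|| some_func(vec![0u8; 16]))).collect();
    for rx in pending {
        let (out, ran_on) = rx.recv().unwrap();
        assert_eq!(out.len(), 16);
        assert_ne!(ran_on, caller); // same check as the repro above
    }
    println!("all UDF calls ran off the calling thread");
}
```

Sharing one `Receiver` behind a `Mutex` keeps the sketch dependency-free; a production pool would more likely use a work-stealing queue or tokio's built-in blocking pool.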

Additional context

I thought this might be related to #7205, but that doesn't appear to be the culprit.

Labels: bug (Something isn't working)
