"future cannot be sent between threads safely" when returning ctx.read_csv from async_trait function #1154

@jonmmease

Describe the bug
Compilation fails with "future cannot be sent between threads safely" in what I believe is a valid use of DataFusion.

To Reproduce
Define a trait annotated with #[async_trait] whose async method returns a DataFrame. Then implement the trait with a method that creates an ExecutionContext (ctx) and returns the result of ctx.read_csv.

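// Imports assumed for a standalone reproduction outside the datafusion crate.
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::prelude::*;

#[async_trait]
trait CallReadTrait {
    async fn call(&self) -> Arc<dyn DataFrame>;
}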

struct CallRead {}

#[async_trait]
impl CallReadTrait for CallRead {
    async fn call(&self) -> Arc<dyn DataFrame> {
        let mut ctx = ExecutionContext::new();
        ctx.read_csv("dummy.csv", CsvReadOptions::new()).await.unwrap()
    }
}

Compiling with the stable 1.55.0 toolchain raises the following error:

error: future cannot be sent between threads safely
    --> datafusion/src/execution/context.rs:4151:52
     |
4151 |           async fn call(&self) -> Arc<dyn DataFrame> {
     |  ____________________________________________________^
4152 | |             let mut ctx = ExecutionContext::new();
4153 | |             ctx.read_csv("dummy.csv", CsvReadOptions::new()).await.unwrap()
4154 | |         }
     | |_________^ future created by async block is not `Send`
     |
     = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, context::ExecutionContextState>`
note: future is not `Send` as this value is used across an await
    --> datafusion/src/execution/context.rs:341:14
     |
341  |               &LogicalPlanBuilder::scan_csv(
     |  ______________^
342  | |                 object_store,
343  | |                 path,
344  | |                 options,
...    |
347  | |             )
348  | |             .await?
     | |__________________^ first, await occurs here, with `self.state.lock().unwrap()` maybe used later...
note: `self.state.lock().unwrap()` is later dropped here
    --> datafusion/src/execution/context.rs:351:5
     |
346  |                 self.state.lock().unwrap().config.target_partitions,
     |                 -------------------------- has type `std::sync::MutexGuard<'_, context::ExecutionContextState>` which is not `Send`
...
351  |     }
     |     ^
     = note: required for the cast to the object type `dyn futures::Future<Output = std::sync::Arc<(dyn dataframe::DataFrame + 'static)>> + std::marker::Send`

Expected behavior
I expect this code to compile and run successfully.

Additional context
This error is not raised when calling the code in an async function that is not defined in an #[async_trait] implementation.
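The last note in the error output (the cast to `dyn futures::Future<...> + std::marker::Send`) suggests why the trait matters: `#[async_trait]` boxes the returned future as a `Send` trait object, while a plain `async fn` imposes no such bound. The sketch below is a hand-written approximation of that shape using only the standard library. The `BoxSendFuture` alias, the `rows` field, `tick`, `call_plain`, and the toy `block_on` executor are all hypothetical stand-ins, not DataFusion or async_trait APIs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Roughly the return type `#[async_trait]` generates for
// `async fn call(&self) -> T`: a boxed future with a `Send` bound.
type BoxSendFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait CallReadTrait {
    fn call(&self) -> BoxSendFuture<'_, usize>;
}

struct CallRead {
    rows: usize, // hypothetical stand-in for the DataFrame result
}

impl CallReadTrait for CallRead {
    fn call(&self) -> BoxSendFuture<'_, usize> {
        // The coercion to `dyn Future + Send` happens here. A future that
        // keeps a `MutexGuard` alive across an `.await` would be rejected
        // at this point, which matches the error in the report.
        Box::pin(async move { self.rows })
    }
}

async fn tick() {}

// A plain `async fn` has no `Send` requirement, so holding the guard
// across the `.await` compiles (the future is simply not `Send`).
async fn call_plain(state: &Mutex<usize>) -> usize {
    let guard = state.lock().unwrap();
    tick().await;
    *guard
}

// Toy single-threaded executor so the sketch runs without extra crates.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Both `block_on(CallRead { rows: 3 }.call())` and `block_on(call_plain(&state))` run, but only the first future could be handed to a multi-threaded executor.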

Here is an excerpt of the current ctx.read_csv function:

        Ok(Arc::new(DataFrameImpl::new(
            self.state.clone(),
            &LogicalPlanBuilder::scan_csv(
                object_store,
                path,
                options,
                None,
                self.state.lock().unwrap().config.target_partitions,
            )
            .await?
            .build()?,
        )))

I don't fully understand the root cause here, but the compilation error goes away if the self.state.lock().unwrap().config.target_partitions expression is lifted to a local variable like this:

        let target_partitions = self.state.lock().unwrap().config.target_partitions;
        Ok(Arc::new(DataFrameImpl::new(
            self.state.clone(),
            &LogicalPlanBuilder::scan_csv(
                object_store,
                path,
                options,
                None,
                target_partitions,
            )
            .await?
            .build()?,
        )))

The situation is similar for the read_parquet and read_avro functions as well.
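A likely explanation for why the local variable helps: a temporary such as the `MutexGuard` returned by `lock().unwrap()` lives until the end of the statement that contains it. Inside the argument list, that statement includes the `.await`; in a separate `let`, the guard is dropped at the semicolon, before any `.await`. The following standard-library sketch illustrates this under simplified assumptions. `Config`, `scan`, `read_hoisted`, `assert_send`, and the toy `block_on` are hypothetical names and do not reproduce the DataFusion code.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Config {
    target_partitions: usize,
}

// Hypothetical stand-in for an async call such as `scan_csv`.
async fn scan(target_partitions: usize) -> usize {
    target_partitions
}

async fn read_hoisted(state: Arc<Mutex<Config>>) -> usize {
    // The guard is a temporary of this `let` statement and is dropped at
    // the `;`, so it is not alive across the `.await` below.
    let target_partitions = state.lock().unwrap().target_partitions;
    scan(target_partitions).await
}

// Inline variant, left commented out: the guard temporary would live until
// the end of the whole expression, including the `.await`, so the resulting
// future would not be `Send` and `assert_send` would reject it.
//
// async fn read_inline(state: Arc<Mutex<Config>>) -> usize {
//     scan(state.lock().unwrap().target_partitions).await
// }

// Compile-time check that a value is `Send`.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Toy single-threaded executor so the sketch runs without extra crates.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Passing `read_hoisted(state)` through `assert_send` compiles, which is the property the `#[async_trait]` boxing needs.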

If this change makes sense, I'm happy to open a PR.

Labels: bug