Description
Describe the bug
A compilation error is raised in a situation that I believe is a valid use case of DataFusion.
To Reproduce
Define an `#[async_trait]` trait with an async method that returns a DataFrame. Then implement the trait with a method that creates a context (`ctx`) and returns the result of `ctx.read_csv`:
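```rust
// Imports assumed for a standalone file (DataFusion 5.x-era prelude).
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::prelude::*; // ExecutionContext, CsvReadOptions, DataFrame

#[async_trait]
trait CallReadTrait {
    async fn call(&self) -> Arc<dyn DataFrame>;
}

struct CallRead {}

#[async_trait]
impl CallReadTrait for CallRead {
    async fn call(&self) -> Arc<dyn DataFrame> {
        let mut ctx = ExecutionContext::new();
        ctx.read_csv("dummy.csv", CsvReadOptions::new()).await.unwrap()
    }
}
```

When compiling with the stable 1.55.0 toolchain, the following error is raised: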
```text
error: future cannot be sent between threads safely
--> datafusion/src/execution/context.rs:4151:52
|
4151 | async fn call(&self) -> Arc<dyn DataFrame> {
| ____________________________________________________^
4152 | | let mut ctx = ExecutionContext::new();
4153 | | ctx.read_csv("dummy.csv", CsvReadOptions::new()).await.unwrap()
4154 | | }
| |_________^ future created by async block is not `Send`
|
= help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, context::ExecutionContextState>`
note: future is not `Send` as this value is used across an await
--> datafusion/src/execution/context.rs:341:14
|
341 | &LogicalPlanBuilder::scan_csv(
| ______________^
342 | | object_store,
343 | | path,
344 | | options,
... |
347 | | )
348 | | .await?
| |__________________^ first, await occurs here, with `self.state.lock().unwrap()` maybe used later...
note: `self.state.lock().unwrap()` is later dropped here
--> datafusion/src/execution/context.rs:351:5
|
346 | self.state.lock().unwrap().config.target_partitions,
| -------------------------- has type `std::sync::MutexGuard<'_, context::ExecutionContextState>` which is not `Send`
...
351 | }
| ^
= note: required for the cast to the object type `dyn futures::Future<Output = std::sync::Arc<(dyn dataframe::DataFrame + 'static)>> + std::marker::Send`
```
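The notes above point at a Rust temporary-lifetime rule: a temporary created inside a function-call argument, such as the `MutexGuard` returned by `self.state.lock().unwrap()`, is dropped at the end of the enclosing statement, not right after the field is read. The synchronous, std-only sketch below shows the effect; `State`, `lock_held_during_call`, `inline_read` and `lifted_read` are hypothetical names, not DataFusion APIs.

```rust
use std::sync::Mutex;

// Hypothetical stand-in for ExecutionContextState; only the field read in
// the excerpt is modelled.
struct State {
    target_partitions: usize,
}

// Hypothetical stand-in for a callee such as scan_csv: it reports whether
// the state mutex is still locked while the call is running.
fn lock_held_during_call(state: &Mutex<State>, _target_partitions: usize) -> bool {
    // try_lock does not block; it returns Err while any guard is still alive.
    state.try_lock().is_err()
}

// Inline shape: the guard is a temporary inside the argument list, so it is
// only dropped after the call (at the end of the enclosing statement).
fn inline_read(state: &Mutex<State>) -> bool {
    lock_held_during_call(state, state.lock().unwrap().target_partitions)
}

// Lifted shape: the guard is dropped at the end of the `let` statement,
// before the callee runs.
fn lifted_read(state: &Mutex<State>) -> bool {
    let target_partitions = state.lock().unwrap().target_partitions;
    lock_held_during_call(state, target_partitions)
}
```

With `Mutex::new(State { target_partitions: 8 })`, `inline_read` returns `true` and `lifted_read` returns `false`. In the async code, the `.await` sits inside that same statement, so the guard is still alive at the await point, which is what makes the future non-`Send`.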
Expected behavior
I expect this code to compile and run successfully.
Additional context
This error is not raised when the same code is called from a plain async function that is not part of an `#[async_trait]` implementation.
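A likely explanation, offered tentatively: by default `#[async_trait]` rewrites each `async fn` into a method that returns a boxed `dyn Future + Send`, so the compiler must prove the body's future is `Send` (the "required for the cast to the object type" note in the error). A plain `async fn` carries no such bound. The std-only sketch below hand-writes an approximation of that expansion; `step`, `plain_read`, the `usize` state and the tiny `block_on` executor are hypothetical stand-ins, and the real macro output differs in detail.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical async step standing in for LogicalPlanBuilder::scan_csv.
async fn step(target_partitions: usize) -> usize {
    target_partitions
}

// Plain async fn with the inline shape: the MutexGuard temporary lives
// across the .await, so this future is not Send. Nothing demands Send here,
// so it still compiles and runs.
async fn plain_read(state: &Mutex<usize>) -> usize {
    step(*state.lock().unwrap()).await
}

// Approximation of what #[async_trait] generates for
// `async fn call(&self) -> usize`: a boxed future with a Send bound.
trait CallReadTrait {
    fn call<'a>(&'a self) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>>;
}

struct CallRead {
    state: Mutex<usize>,
}

impl CallReadTrait for CallRead {
    fn call<'a>(&'a self) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>> {
        Box::pin(async move {
            // Writing `step(*self.state.lock().unwrap()).await` here would be
            // rejected: the guard would be held across the await.
            let target_partitions = *self.state.lock().unwrap();
            step(target_partitions).await
        })
    }
}

// Minimal no-op waker and busy-polling executor; enough for futures that
// never return Pending, as in this sketch.
fn noop_clone(_: *const ()) -> RawWaker {
    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
}
fn noop(_: *const ()) {}
static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: every vtable function ignores the (null) data pointer.
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

If the `Send` bound is genuinely not needed, the async-trait crate also accepts `#[async_trait(?Send)]`, which drops it; lifting the value into a local instead keeps the trait usable from multi-threaded executors.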
Here is an excerpt of the current `ctx.read_csv` function:
```rust
Ok(Arc::new(DataFrameImpl::new(
    self.state.clone(),
    &LogicalPlanBuilder::scan_csv(
        object_store,
        path,
        options,
        None,
        self.state.lock().unwrap().config.target_partitions,
    )
    .await?
    .build()?,
)))
```

I don't fully understand the root cause here, but the compilation error goes away if the `self.state.lock().unwrap().config.target_partitions` expression is lifted into a local variable like this:
```rust
let target_partitions = self.state.lock().unwrap().config.target_partitions;
Ok(Arc::new(DataFrameImpl::new(
    self.state.clone(),
    &LogicalPlanBuilder::scan_csv(
        object_store,
        path,
        options,
        None,
        target_partitions,
    )
    .await?
    .build()?,
)))
```

The situation is similar for the `read_parquet` and `read_avro` functions as well.
If this change makes sense, I'm happy to open a PR.