Skip to content

Commit 2bc67ef

Browse files
authored
Fix DataFrame::cache errors with Plan("Mismatch between schema and batches") (#8510)
* type cast * add test * use physical plan * logic optimization
1 parent 4578f3d commit 2bc67ef

File tree

1 file changed

+17
-5
lines changed
  • datafusion/core/src/dataframe

1 file changed

+17
-5
lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,11 +1276,12 @@ impl DataFrame {
12761276
/// ```
12771277
pub async fn cache(self) -> Result<DataFrame> {
12781278
let context = SessionContext::new_with_state(self.session_state.clone());
1279-
let mem_table = MemTable::try_new(
1280-
SchemaRef::from(self.schema().clone()),
1281-
self.collect_partitioned().await?,
1282-
)?;
1283-
1279+
// The schema is consistent with the output
1280+
let plan = self.clone().create_physical_plan().await?;
1281+
let schema = plan.schema();
1282+
let task_ctx = Arc::new(self.task_ctx());
1283+
let partitions = collect_partitioned(plan, task_ctx).await?;
1284+
let mem_table = MemTable::try_new(schema, partitions)?;
12841285
context.read_table(Arc::new(mem_table))
12851286
}
12861287
}
@@ -2638,6 +2639,17 @@ mod tests {
26382639
Ok(())
26392640
}
26402641

2642+
#[tokio::test]
2643+
async fn test_cache_mismatch() -> Result<()> {
2644+
let ctx = SessionContext::new();
2645+
let df = ctx
2646+
.sql("SELECT CASE WHEN true THEN NULL ELSE 1 END")
2647+
.await?;
2648+
let cache_df = df.cache().await;
2649+
assert!(cache_df.is_ok());
2650+
Ok(())
2651+
}
2652+
26412653
#[tokio::test]
26422654
async fn cache_test() -> Result<()> {
26432655
let df = test_table()

0 commit comments

Comments
 (0)