From bec4ccf3d0aa0abbd98b1d415c05ea0ff1f254f9 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Tue, 12 Dec 2023 16:43:37 +0800 Subject: [PATCH 1/4] type cast --- datafusion/core/src/dataframe/mod.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index c40dd522a457..39fd962dd59b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -60,6 +60,8 @@ use datafusion_expr::{ }; use async_trait::async_trait; +use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; +use datafusion_optimizer::analyzer::AnalyzerRule; /// Contains options that control how data is /// written out from a DataFrame @@ -1271,8 +1273,13 @@ impl DataFrame { /// ``` pub async fn cache(self) -> Result { let context = SessionContext::new_with_state(self.session_state.clone()); + // type cast + // such as, DataType::Null is normally resolved (via Coercion) to an actual type of the target schema + let analyzed_plan = TypeCoercion::new() + .analyze(self.plan.clone(), self.session_state.config_options())?; + let transform = analyzed_plan.schema().clone(); let mem_table = MemTable::try_new( - SchemaRef::from(self.schema().clone()), + SchemaRef::from(transform.as_ref().clone()), self.collect_partitioned().await?, )?; From 54c70de9f6a2374dd949906a98b6d909d0b20874 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Tue, 12 Dec 2023 17:05:46 +0800 Subject: [PATCH 2/4] add test --- datafusion/core/src/dataframe/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 39fd962dd59b..a7c99073e0b4 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2640,6 +2640,17 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_cache_mismatch() -> Result<()> { + let ctx = SessionContext::new(); + let df = ctx + .sql("SELECT CASE WHEN true THEN NULL ELSE 1 END") + .await?; + let cache_df = df.cache().await; + assert!(cache_df.is_ok()); + Ok(()) + } + #[tokio::test] async fn cache_test() -> Result<()> { let df = test_table() From 72c498c164ed68e47e1518d3e08fb5c69713ff21 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 13 Dec 2023 10:28:39 +0800 Subject: [PATCH 3/4] use physical plan --- datafusion/core/src/dataframe/mod.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index a7c99073e0b4..9a0724df1466 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -60,8 +60,6 @@ use datafusion_expr::{ }; use async_trait::async_trait; -use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; -use datafusion_optimizer::analyzer::AnalyzerRule; /// Contains options that control how data is /// written out from a DataFrame @@ -1273,15 +1271,10 @@ impl DataFrame { /// ``` pub async fn cache(self) -> Result { let context = SessionContext::new_with_state(self.session_state.clone()); - // type cast - // such as, DataType::Null is normally resolved (via Coercion) to an actual type of the target schema - let analyzed_plan = TypeCoercion::new() - .analyze(self.plan.clone(), self.session_state.config_options())?; - let transform = analyzed_plan.schema().clone(); - let mem_table = MemTable::try_new( - SchemaRef::from(transform.as_ref().clone()), - self.collect_partitioned().await?, - )?; + // The schema is consistent with the output + let physical_plan = self.clone().create_physical_plan().await?; + let mem_table = + MemTable::try_new(physical_plan.schema(), self.collect_partitioned().await?)?; context.read_table(Arc::new(mem_table)) } From 67a84d7759e26c921cb6028c0037c02eb4b637ad Mon Sep 17 00:00:00 2001 From: asura7969 <1402357969@qq.com> Date: Wed, 13 Dec 2023 22:56:23 +0800 Subject: [PATCH 4/4] logic optimization --- datafusion/core/src/dataframe/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 9a0724df1466..0fa6d6cbde06 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1272,10 +1272,11 @@ impl DataFrame { pub async fn cache(self) -> Result { let context = SessionContext::new_with_state(self.session_state.clone()); // The schema is consistent with the output - let physical_plan = self.clone().create_physical_plan().await?; - let mem_table = - MemTable::try_new(physical_plan.schema(), self.collect_partitioned().await?)?; - + let plan = self.clone().create_physical_plan().await?; + let schema = plan.schema(); + let task_ctx = Arc::new(self.task_ctx()); + let partitions = collect_partitioned(plan, task_ctx).await?; + let mem_table = MemTable::try_new(schema, partitions)?; context.read_table(Arc::new(mem_table)) } }