Skip to content

Commit b0b6e44

Browse files
authored
Introduce INFORMATION_SCHEMA.ROUTINES table (#13255)
* tmp * introduce routines table * add is_deterministic field * cargo fmt * roll back the session_state changes
1 parent ddee471 commit b0b6e44

File tree

8 files changed

+415
-20
lines changed

8 files changed

+415
-20
lines changed

datafusion/core/src/catalog_common/information_schema.rs

Lines changed: 291 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,41 @@
1919
//!
2020
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
2121
22-
use arrow::{
23-
array::{StringBuilder, UInt64Builder},
24-
datatypes::{DataType, Field, Schema, SchemaRef},
25-
record_batch::RecordBatch,
26-
};
27-
use async_trait::async_trait;
28-
use datafusion_common::DataFusionError;
29-
use std::fmt::Debug;
30-
use std::{any::Any, sync::Arc};
31-
3222
use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider};
3323
use crate::datasource::streaming::StreamingTable;
3424
use crate::execution::context::TaskContext;
35-
use crate::logical_expr::TableType;
25+
use crate::logical_expr::{TableType, Volatility};
3626
use crate::physical_plan::stream::RecordBatchStreamAdapter;
3727
use crate::physical_plan::SendableRecordBatchStream;
3828
use crate::{
3929
config::{ConfigEntry, ConfigOptions},
4030
physical_plan::streaming::PartitionStream,
4131
};
32+
use arrow::{
33+
array::{StringBuilder, UInt64Builder},
34+
datatypes::{DataType, Field, Schema, SchemaRef},
35+
record_batch::RecordBatch,
36+
};
37+
use arrow_array::builder::BooleanBuilder;
38+
use async_trait::async_trait;
39+
use datafusion_common::error::Result;
40+
use datafusion_common::DataFusionError;
41+
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, WindowUDF};
42+
use std::collections::{HashMap, HashSet};
43+
use std::fmt::Debug;
44+
use std::{any::Any, sync::Arc};
4245

4346
/// Name of the virtual schema that exposes catalog metadata.
pub(crate) const INFORMATION_SCHEMA: &str = "information_schema";
/// Table names served by the information schema provider.
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
    &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA, ROUTINES];
5357

5458
/// Implements the `information_schema` virtual schema and tables
5559
///
@@ -208,6 +212,151 @@ impl InformationSchemaConfig {
208212
builder.add_setting(entry);
209213
}
210214
}
215+
216+
fn make_routines(
217+
&self,
218+
udfs: &HashMap<String, Arc<ScalarUDF>>,
219+
udafs: &HashMap<String, Arc<AggregateUDF>>,
220+
udwfs: &HashMap<String, Arc<WindowUDF>>,
221+
config_options: &ConfigOptions,
222+
builder: &mut InformationSchemaRoutinesBuilder,
223+
) -> Result<()> {
224+
let catalog_name = &config_options.catalog.default_catalog;
225+
let schema_name = &config_options.catalog.default_schema;
226+
227+
for (name, udf) in udfs {
228+
let return_types = get_udf_args_and_return_types(udf)?
229+
.into_iter()
230+
.map(|(_, return_type)| return_type)
231+
.collect::<HashSet<_>>();
232+
for return_type in return_types {
233+
builder.add_routine(
234+
catalog_name,
235+
schema_name,
236+
name,
237+
"FUNCTION",
238+
Self::is_deterministic(udf.signature()),
239+
return_type,
240+
"SCALAR",
241+
udf.documentation().map(|d| d.description.to_string()),
242+
)
243+
}
244+
}
245+
246+
for (name, udaf) in udafs {
247+
let return_types = get_udaf_args_and_return_types(udaf)?
248+
.into_iter()
249+
.map(|(_, return_type)| return_type)
250+
.collect::<HashSet<_>>();
251+
for return_type in return_types {
252+
builder.add_routine(
253+
catalog_name,
254+
schema_name,
255+
name,
256+
"FUNCTION",
257+
Self::is_deterministic(udaf.signature()),
258+
return_type,
259+
"AGGREGATE",
260+
udaf.documentation().map(|d| d.description.to_string()),
261+
)
262+
}
263+
}
264+
265+
for (name, udwf) in udwfs {
266+
let return_types = get_udwf_args_and_return_types(udwf)?
267+
.into_iter()
268+
.map(|(_, return_type)| return_type)
269+
.collect::<HashSet<_>>();
270+
for return_type in return_types {
271+
builder.add_routine(
272+
catalog_name,
273+
schema_name,
274+
name,
275+
"FUNCTION",
276+
Self::is_deterministic(udwf.signature()),
277+
return_type,
278+
"WINDOW",
279+
udwf.documentation().map(|d| d.description.to_string()),
280+
)
281+
}
282+
}
283+
Ok(())
284+
}
285+
286+
fn is_deterministic(signature: &Signature) -> bool {
287+
signature.volatility == Volatility::Immutable
288+
}
289+
}
290+
291+
/// get the arguments and return types of a UDF
292+
/// returns a tuple of (arg_types, return_type)
293+
/// Get the argument types and return types of a scalar UDF.
///
/// Returns one `(arg_types, return_type)` tuple per possible signature.
/// When the signature exposes no concrete types, a single entry with no
/// arguments and an unknown return type is produced.
fn get_udf_args_and_return_types(
    udf: &Arc<ScalarUDF>,
) -> Result<Vec<(Vec<String>, Option<String>)>> {
    let arg_types = udf.signature().type_signature.get_possible_types();
    if arg_types.is_empty() {
        return Ok(vec![(vec![], None)]);
    }
    let routines = arg_types
        .into_iter()
        .map(|arg_types| {
            // The return type is only resolvable for functions that implement
            // [`ScalarUDFImpl::return_type`]; otherwise it stays `None`.
            let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string());
            let arg_types: Vec<String> =
                arg_types.into_iter().map(|t| t.to_string()).collect();
            (arg_types, return_type)
        })
        .collect();
    Ok(routines)
}
315+
316+
/// Get the argument types and return types of an aggregate UDF.
///
/// Mirrors [`get_udf_args_and_return_types`]: one `(arg_types, return_type)`
/// tuple per possible signature, or a single untyped entry when the signature
/// exposes no concrete types.
fn get_udaf_args_and_return_types(
    udaf: &Arc<AggregateUDF>,
) -> Result<Vec<(Vec<String>, Option<String>)>> {
    let arg_types = udaf.signature().type_signature.get_possible_types();
    if arg_types.is_empty() {
        return Ok(vec![(vec![], None)]);
    }
    let routines = arg_types
        .into_iter()
        .map(|arg_types| {
            // Only resolvable when the aggregate implements `return_type`
            // for this argument list; otherwise reported as NULL.
            let return_type = udaf.return_type(&arg_types).ok().map(|t| t.to_string());
            let arg_types: Vec<String> =
                arg_types.into_iter().map(|t| t.to_string()).collect();
            (arg_types, return_type)
        })
        .collect();
    Ok(routines)
}
339+
340+
fn get_udwf_args_and_return_types(
341+
udwf: &Arc<WindowUDF>,
342+
) -> Result<Vec<(Vec<String>, Option<String>)>> {
343+
let signature = udwf.signature();
344+
let arg_types = signature.type_signature.get_possible_types();
345+
if arg_types.is_empty() {
346+
Ok(vec![(vec![], None)])
347+
} else {
348+
Ok(arg_types
349+
.into_iter()
350+
.map(|arg_types| {
351+
// only handle the function which implemented [`ScalarUDFImpl::return_type`] method
352+
let arg_types = arg_types
353+
.into_iter()
354+
.map(|t| t.to_string())
355+
.collect::<Vec<_>>();
356+
(arg_types, None)
357+
})
358+
.collect::<Vec<_>>())
359+
}
211360
}
212361

213362
#[async_trait]
@@ -234,6 +383,7 @@ impl SchemaProvider for InformationSchemaProvider {
234383
VIEWS => Arc::new(InformationSchemaViews::new(config)),
235384
DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
236385
SCHEMATA => Arc::new(InformationSchemata::new(config)),
386+
ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
237387
_ => return Ok(None),
238388
};
239389

@@ -819,3 +969,132 @@ impl InformationSchemaDfSettingsBuilder {
819969
.unwrap()
820970
}
821971
}
972+
973+
#[derive(Debug)]
974+
struct InformationSchemaRoutines {
975+
schema: SchemaRef,
976+
config: InformationSchemaConfig,
977+
}
978+
979+
impl InformationSchemaRoutines {
980+
fn new(config: InformationSchemaConfig) -> Self {
981+
let schema = Arc::new(Schema::new(vec![
982+
Field::new("specific_catalog", DataType::Utf8, false),
983+
Field::new("specific_schema", DataType::Utf8, false),
984+
Field::new("specific_name", DataType::Utf8, false),
985+
Field::new("routine_catalog", DataType::Utf8, false),
986+
Field::new("routine_schema", DataType::Utf8, false),
987+
Field::new("routine_name", DataType::Utf8, false),
988+
Field::new("routine_type", DataType::Utf8, false),
989+
Field::new("is_deterministic", DataType::Boolean, true),
990+
Field::new("data_type", DataType::Utf8, true),
991+
Field::new("function_type", DataType::Utf8, true),
992+
Field::new("description", DataType::Utf8, true),
993+
]));
994+
995+
Self { schema, config }
996+
}
997+
998+
fn builder(&self) -> InformationSchemaRoutinesBuilder {
999+
InformationSchemaRoutinesBuilder {
1000+
schema: self.schema.clone(),
1001+
specific_catalog: StringBuilder::new(),
1002+
specific_schema: StringBuilder::new(),
1003+
specific_name: StringBuilder::new(),
1004+
routine_catalog: StringBuilder::new(),
1005+
routine_schema: StringBuilder::new(),
1006+
routine_name: StringBuilder::new(),
1007+
routine_type: StringBuilder::new(),
1008+
is_deterministic: BooleanBuilder::new(),
1009+
data_type: StringBuilder::new(),
1010+
function_type: StringBuilder::new(),
1011+
description: StringBuilder::new(),
1012+
}
1013+
}
1014+
}
1015+
1016+
struct InformationSchemaRoutinesBuilder {
1017+
schema: SchemaRef,
1018+
specific_catalog: StringBuilder,
1019+
specific_schema: StringBuilder,
1020+
specific_name: StringBuilder,
1021+
routine_catalog: StringBuilder,
1022+
routine_schema: StringBuilder,
1023+
routine_name: StringBuilder,
1024+
routine_type: StringBuilder,
1025+
is_deterministic: BooleanBuilder,
1026+
data_type: StringBuilder,
1027+
function_type: StringBuilder,
1028+
description: StringBuilder,
1029+
}
1030+
1031+
impl InformationSchemaRoutinesBuilder {
1032+
#[allow(clippy::too_many_arguments)]
1033+
fn add_routine(
1034+
&mut self,
1035+
catalog_name: impl AsRef<str>,
1036+
schema_name: impl AsRef<str>,
1037+
routine_name: impl AsRef<str>,
1038+
routine_type: impl AsRef<str>,
1039+
is_deterministic: bool,
1040+
data_type: Option<impl AsRef<str>>,
1041+
function_type: impl AsRef<str>,
1042+
description: Option<impl AsRef<str>>,
1043+
) {
1044+
self.specific_catalog.append_value(catalog_name.as_ref());
1045+
self.specific_schema.append_value(schema_name.as_ref());
1046+
self.specific_name.append_value(routine_name.as_ref());
1047+
self.routine_catalog.append_value(catalog_name.as_ref());
1048+
self.routine_schema.append_value(schema_name.as_ref());
1049+
self.routine_name.append_value(routine_name.as_ref());
1050+
self.routine_type.append_value(routine_type.as_ref());
1051+
self.is_deterministic.append_value(is_deterministic);
1052+
self.data_type.append_option(data_type.as_ref());
1053+
self.function_type.append_value(function_type.as_ref());
1054+
self.description.append_option(description);
1055+
}
1056+
1057+
fn finish(&mut self) -> RecordBatch {
1058+
RecordBatch::try_new(
1059+
self.schema.clone(),
1060+
vec![
1061+
Arc::new(self.specific_catalog.finish()),
1062+
Arc::new(self.specific_schema.finish()),
1063+
Arc::new(self.specific_name.finish()),
1064+
Arc::new(self.routine_catalog.finish()),
1065+
Arc::new(self.routine_schema.finish()),
1066+
Arc::new(self.routine_name.finish()),
1067+
Arc::new(self.routine_type.finish()),
1068+
Arc::new(self.is_deterministic.finish()),
1069+
Arc::new(self.data_type.finish()),
1070+
Arc::new(self.function_type.finish()),
1071+
Arc::new(self.description.finish()),
1072+
],
1073+
)
1074+
.unwrap()
1075+
}
1076+
}
1077+
1078+
impl PartitionStream for InformationSchemaRoutines {
1079+
fn schema(&self) -> &SchemaRef {
1080+
&self.schema
1081+
}
1082+
1083+
fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1084+
let config = self.config.clone();
1085+
let mut builder = self.builder();
1086+
Box::pin(RecordBatchStreamAdapter::new(
1087+
self.schema.clone(),
1088+
futures::stream::once(async move {
1089+
config.make_routines(
1090+
ctx.scalar_functions(),
1091+
ctx.aggregate_functions(),
1092+
ctx.window_functions(),
1093+
ctx.session_config().options(),
1094+
&mut builder,
1095+
)?;
1096+
Ok(builder.finish())
1097+
}),
1098+
))
1099+
}
1100+
}

datafusion/core/src/execution/session_state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,15 +231,15 @@ impl Session for SessionState {
231231
}
232232

233233
fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
234-
self.scalar_functions()
234+
&self.scalar_functions
235235
}
236236

237237
fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
238-
self.aggregate_functions()
238+
&self.aggregate_functions
239239
}
240240

241241
fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
242-
self.window_functions()
242+
&self.window_functions
243243
}
244244

245245
fn runtime_env(&self) -> &Arc<RuntimeEnv> {

0 commit comments

Comments
 (0)