Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 240 additions & 3 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use arrow_array::builder::BooleanBuilder;
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_common::DataFusionError;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, WindowUDF};
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};
Expand All @@ -50,10 +50,18 @@ pub(crate) const COLUMNS: &str = "columns";
/// Name of the `df_settings` table in the `information_schema` schema.
pub(crate) const DF_SETTINGS: &str = "df_settings";
/// Name of the `schemata` table in the `information_schema` schema.
pub(crate) const SCHEMATA: &str = "schemata";
/// Name of the `routines` table in the `information_schema` schema.
pub(crate) const ROUTINES: &str = "routines";
/// Name of the `parameters` table in the `information_schema` schema.
pub(crate) const PARAMETERS: &str = "parameters";

/// Names of all tables exposed by the `information_schema` virtual schema.
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
&[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA, ROUTINES];
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
TABLES,
VIEWS,
COLUMNS,
DF_SETTINGS,
SCHEMATA,
ROUTINES,
PARAMETERS,
];

/// Implements the `information_schema` virtual schema and tables
///
Expand Down Expand Up @@ -286,6 +294,102 @@ impl InformationSchemaConfig {
/// Reports whether a function with this `signature` is deterministic, i.e.
/// whether its volatility is `Volatility::Immutable`.
fn is_deterministic(signature: &Signature) -> bool {
    matches!(signature.volatility, Volatility::Immutable)
}
/// Appends the `information_schema.parameters` rows for every registered
/// scalar, aggregate and window function to `builder`.
///
/// For each function, every `(argument types, return type)` combination
/// reported by the matching `get_*_args_and_return_types` helper is expanded
/// into:
///
/// * one `IN` row per argument type, with a 1-based `ordinal_position` and,
///   when the function documentation lists an argument at that position, the
///   first element of that documented pair as `parameter_name`;
/// * one `OUT` row at position 1 for the return type, when one is reported.
///
/// All rows use the default catalog and schema from `config_options`, and
/// `parameter_default` is always NULL.
///
/// # Errors
///
/// Propagates any error returned by the `get_*_args_and_return_types`
/// helpers.
fn make_parameters(
    &self,
    udfs: &HashMap<String, Arc<ScalarUDF>>,
    udafs: &HashMap<String, Arc<AggregateUDF>>,
    udwfs: &HashMap<String, Arc<WindowUDF>>,
    config_options: &ConfigOptions,
    builder: &mut InformationSchemaParametersBuilder,
) -> Result<()> {
    let catalog_name = &config_options.catalog.default_catalog;
    let schema_name = &config_options.catalog.default_schema;

    // Emits the IN rows (one per argument) followed by the optional OUT row
    // for a single signature combination of `func_name`.
    let mut add_parameters = |func_name: &str,
                              args: Option<&[(String, String)]>,
                              arg_types: Vec<String>,
                              return_type: Option<String>,
                              is_variadic: bool| {
        for (position, type_name) in arg_types.iter().enumerate() {
            let param_name = args
                .and_then(|documented| documented.get(position))
                .map(|arg| arg.0.as_str());
            builder.add_parameter(
                catalog_name,
                schema_name,
                func_name,
                // Ordinal positions are 1-based; widening usize to u64 is lossless.
                position as u64 + 1,
                "IN",
                param_name,
                type_name,
                None::<&str>,
                is_variadic,
            );
        }
        if let Some(return_type) = return_type {
            builder.add_parameter(
                catalog_name,
                schema_name,
                func_name,
                1,
                "OUT",
                None::<&str>,
                return_type.as_str(),
                None::<&str>,
                false,
            );
        }
    };

    for (func_name, udf) in udfs {
        // Borrow the documented arguments instead of cloning them, and
        // compute the (loop-invariant) variadic flag once per function.
        let args = udf.documentation().and_then(|d| d.arguments.as_deref());
        let is_variadic = Self::is_variadic(udf.signature());
        for (arg_types, return_type) in get_udf_args_and_return_types(udf)? {
            add_parameters(func_name, args, arg_types, return_type, is_variadic);
        }
    }

    for (func_name, udaf) in udafs {
        let args = udaf.documentation().and_then(|d| d.arguments.as_deref());
        let is_variadic = Self::is_variadic(udaf.signature());
        for (arg_types, return_type) in get_udaf_args_and_return_types(udaf)? {
            add_parameters(func_name, args, arg_types, return_type, is_variadic);
        }
    }

    for (func_name, udwf) in udwfs {
        let args = udwf.documentation().and_then(|d| d.arguments.as_deref());
        let is_variadic = Self::is_variadic(udwf.signature());
        for (arg_types, return_type) in get_udwf_args_and_return_types(udwf)? {
            add_parameters(func_name, args, arg_types, return_type, is_variadic);
        }
    }

    Ok(())
}

/// Reports whether `signature` accepts a variable number of arguments, i.e.
/// its type signature is `TypeSignature::Variadic(_)` or
/// `TypeSignature::VariadicAny`.
fn is_variadic(signature: &Signature) -> bool {
    let type_signature = &signature.type_signature;
    matches!(
        type_signature,
        TypeSignature::VariadicAny | TypeSignature::Variadic(_)
    )
}
}

/// get the arguments and return types of a UDF
Expand Down Expand Up @@ -384,6 +488,7 @@ impl SchemaProvider for InformationSchemaProvider {
DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
SCHEMATA => Arc::new(InformationSchemata::new(config)),
ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
_ => return Ok(None),
};

Expand Down Expand Up @@ -1098,3 +1203,135 @@ impl PartitionStream for InformationSchemaRoutines {
))
}
}

/// Backing state for the `information_schema.parameters` table.
#[derive(Debug)]
struct InformationSchemaParameters {
/// Arrow schema of the record batches produced for this table.
schema: SchemaRef,
/// Configuration whose `make_parameters` method fills the table's rows.
config: InformationSchemaConfig,
}

impl InformationSchemaParameters {
    /// Builds the table definition, fixing the nine-column Arrow schema of
    /// `information_schema.parameters`.
    ///
    /// Only `parameter_name` and `parameter_default` are nullable.
    fn new(config: InformationSchemaConfig) -> Self {
        // Most columns are strings; this keeps the field list compact.
        let utf8 =
            |name: &str, nullable: bool| Field::new(name, DataType::Utf8, nullable);

        let fields = vec![
            utf8("specific_catalog", false),
            utf8("specific_schema", false),
            utf8("specific_name", false),
            Field::new("ordinal_position", DataType::UInt64, false),
            utf8("parameter_mode", false),
            utf8("parameter_name", true),
            utf8("data_type", false),
            utf8("parameter_default", true),
            Field::new("is_variadic", DataType::Boolean, false),
        ];

        Self {
            schema: Arc::new(Schema::new(fields)),
            config,
        }
    }

    /// Returns an empty row builder that shares this table's schema.
    fn builder(&self) -> InformationSchemaParametersBuilder {
        InformationSchemaParametersBuilder {
            schema: Arc::clone(&self.schema),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            ordinal_position: UInt64Builder::new(),
            parameter_mode: StringBuilder::new(),
            parameter_name: StringBuilder::new(),
            data_type: StringBuilder::new(),
            parameter_default: StringBuilder::new(),
            is_variadic: BooleanBuilder::new(),
            inserted: HashSet::new(),
        }
    }
}

/// Accumulates the rows of the `information_schema.parameters` table, one
/// array builder per column, and turns them into a `RecordBatch`.
struct InformationSchemaParametersBuilder {
/// Schema of the batch produced by `finish`.
schema: SchemaRef,
specific_catalog: StringBuilder,
specific_schema: StringBuilder,
specific_name: StringBuilder,
ordinal_position: UInt64Builder,
parameter_mode: StringBuilder,
parameter_name: StringBuilder,
data_type: StringBuilder,
parameter_default: StringBuilder,
is_variadic: BooleanBuilder,
// Keys of the rows appended so far, used to skip duplicate rows.
// The key is (specific_name, ordinal_position, parameter_mode, data_type).
inserted: HashSet<(String, u64, String, String)>,
}

impl InformationSchemaParametersBuilder {
    /// Appends one parameter row, unless a row with the same
    /// `(specific_name, ordinal_position, parameter_mode, data_type)` key has
    /// already been appended, in which case the call is a no-op.
    ///
    /// `parameter_name` and `parameter_default` are written as NULL when
    /// `None`.
    #[allow(clippy::too_many_arguments)]
    fn add_parameter(
        &mut self,
        specific_catalog: impl AsRef<str>,
        specific_schema: impl AsRef<str>,
        specific_name: impl AsRef<str>,
        ordinal_position: u64,
        parameter_mode: impl AsRef<str>,
        parameter_name: Option<impl AsRef<str>>,
        data_type: impl AsRef<str>,
        parameter_default: Option<impl AsRef<str>>,
        is_variadic: bool,
    ) {
        let name = specific_name.as_ref();
        let mode = parameter_mode.as_ref();
        let type_name = data_type.as_ref();

        // `insert` returns false when the key is already present: skip the row.
        let row_key = (
            name.to_string(),
            ordinal_position,
            mode.to_string(),
            type_name.to_string(),
        );
        if !self.inserted.insert(row_key) {
            return;
        }

        self.specific_catalog
            .append_value(specific_catalog.as_ref());
        self.specific_schema.append_value(specific_schema.as_ref());
        self.specific_name.append_value(name);
        self.ordinal_position.append_value(ordinal_position);
        self.parameter_mode.append_value(mode);
        self.parameter_name.append_option(parameter_name);
        self.data_type.append_value(type_name);
        self.parameter_default.append_option(parameter_default);
        self.is_variadic.append_value(is_variadic);
    }

    /// Finishes every column builder and assembles the columns, in schema
    /// order, into a `RecordBatch`.
    ///
    /// # Panics
    ///
    /// Panics if `RecordBatch::try_new` rejects the finished columns.
    fn finish(&mut self) -> RecordBatch {
        let schema = Arc::clone(&self.schema);
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.ordinal_position.finish()),
                Arc::new(self.parameter_mode.finish()),
                Arc::new(self.parameter_name.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.parameter_default.finish()),
                Arc::new(self.is_variadic.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaParameters {
    /// Returns the Arrow schema of the `parameters` table.
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns a stream that yields a single item: the record batch built
    /// from the functions registered in `ctx`, or the error raised while
    /// collecting them.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let schema = Arc::clone(&self.schema);
        let config = self.config.clone();
        let mut builder = self.builder();

        // The rows are only collected when the stream is first polled.
        let batches = futures::stream::once(async move {
            let populated = config.make_parameters(
                ctx.scalar_functions(),
                ctx.aggregate_functions(),
                ctx.window_functions(),
                ctx.session_config().options(),
                &mut builder,
            );
            populated.map(|()| builder.finish())
        });

        Box::pin(RecordBatchStreamAdapter::new(schema, batches))
    }
}
57 changes: 57 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand Down Expand Up @@ -84,6 +85,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -99,6 +101,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -111,6 +114,7 @@ SELECT * from information_schema.tables WHERE tables.table_schema='information_s
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -121,6 +125,7 @@ SELECT * from information_schema.tables WHERE information_schema.tables.table_sc
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand All @@ -131,6 +136,7 @@ SELECT * from information_schema.tables WHERE datafusion.information_schema.tabl
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand Down Expand Up @@ -454,6 +460,7 @@ SHOW TABLES
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema parameters VIEW
datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
Expand Down Expand Up @@ -636,3 +643,53 @@ query B
select is_deterministic from information_schema.routines where routine_name = 'now';
----
false

# test that every function type is included in the result
query TTTITTTTB rowsort
select * from information_schema.parameters where specific_name = 'date_trunc' OR specific_name = 'string_agg' OR specific_name = 'rank';
----
datafusion public date_trunc 1 IN precision Utf8 NULL false
datafusion public date_trunc 1 IN precision Utf8View NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Microsecond, Some("+TZ")) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Millisecond, Some("+TZ")) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Nanosecond, Some("+TZ")) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Second, None) NULL false
datafusion public date_trunc 1 OUT NULL Timestamp(Second, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Microsecond, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Microsecond, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Millisecond, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Millisecond, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Nanosecond, Some("+TZ")) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Second, None) NULL false
datafusion public date_trunc 2 IN expression Timestamp(Second, Some("+TZ")) NULL false
datafusion public string_agg 1 IN expression LargeUtf8 NULL false
datafusion public string_agg 1 OUT NULL LargeUtf8 NULL false
datafusion public string_agg 2 IN delimiter LargeUtf8 NULL false
datafusion public string_agg 2 IN delimiter Null NULL false
datafusion public string_agg 2 IN delimiter Utf8 NULL false

# test variable length arguments
query TTTB rowsort
select specific_name, data_type, parameter_mode, is_variadic from information_schema.parameters where specific_name = 'concat';
----
concat LargeUtf8 IN true
concat LargeUtf8 OUT false
concat Utf8 IN true
concat Utf8 OUT false
concat Utf8View IN true
concat Utf8View OUT false

# test coercion signature
query TTIT rowsort
select specific_name, data_type, ordinal_position, parameter_mode from information_schema.parameters where specific_name = 'repeat';
----
repeat Int64 2 IN
repeat LargeUtf8 1 IN
repeat LargeUtf8 1 OUT
repeat Utf8 1 IN
repeat Utf8 1 OUT
repeat Utf8View 1 IN
Loading