-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Description
Is your feature request related to a problem or challenge?
When a plan contains a `SubqueryAlias` relation, datafusion-substrait skips over it. This is harmless for the producer: the generated Substrait plans are correct. However, the DataFusion (DF) plan produced by the consumer is incorrect, because the consumer has no way to distinguish between different relations that read from the same table.
This can be demonstrated in these examples:
#[tokio::test]
async fn roundtrip_non_equi_inner_join_table_reuse() -> Result<()> {
    // Self-join: both join inputs scan the same `data` table, so only the
    // `d1` / `d2` aliases tell the two sides of the non-equi filter apart.
    let sql = "SELECT d1.a FROM data d1 JOIN data d2 ON d1.e <> d2.e";
    roundtrip(sql).await
}
//thread 'tests::roundtrip_non_equi_inner_join_table_reuse' panicked at 'assertion failed: `(left == right)`
// left: `"Projection: d1.a\n Inner Join: Filter: d1.e != d2.e\n SubqueryAlias: d1\n TableScan: data projection=[a, e]\n SubqueryAlias: d2\n TableScan: data projection=[e]"`,
// right: `"Projection: data.a\n Inner Join: \n Projection: data.a\n Filter: data.e != data.e\n TableScan: data projection=[a, e], partial_filters=[data.e != data.e]\n TableScan: data projection=[e]"`', datafusion/substrait/tests/roundtrip_logical_plan.rs:628:9
The original DF plan is:
Projection: d1.a
Inner Join: Filter: d1.e != d2.e
SubqueryAlias: d1
TableScan: data projection=[a, e]
SubqueryAlias: d2
TableScan: data projection=[e]
Once this plan is fed through the producer, we get the correct Substrait plan:
Plan {
version: Some(
Version {
major_number: 0,
minor_number: 31,
patch_number: 0,
git_hash: "",
producer: "datafusion",
},
),
extension_uris: [],
extensions: [
SimpleExtensionDeclaration {
mapping_type: Some(
ExtensionFunction(
ExtensionFunction {
extension_uri_reference: 4294967295,
function_anchor: 0,
name: "not_equal",
},
),
),
},
],
relations: [
PlanRel {
rel_type: Some(
Root(
RelRoot {
input: Some(
Rel {
rel_type: Some(
Project(
ProjectRel {
common: None,
input: Some(
Rel {
rel_type: Some(
Join(
JoinRel {
common: None,
left: Some(
Rel {
rel_type: Some(
Read(
ReadRel {
common: None,
base_schema: Some(
NamedStruct {
names: [
"a",
"b",
"c",
"d",
"e",
"f",
],
r#struct: None,
},
),
filter: None,
best_effort_filter: None,
projection: Some(
MaskExpression {
select: Some(
StructSelect {
struct_items: [
StructItem {
field: 0,
child: None,
},
StructItem {
field: 4,
child: None,
},
],
},
),
maintain_singular_struct: false,
},
),
advanced_extension: None,
read_type: Some(
NamedTable(
NamedTable {
names: [
"data",
],
advanced_extension: None,
},
),
),
},
),
),
},
),
right: Some(
Rel {
rel_type: Some(
Read(
ReadRel {
common: None,
base_schema: Some(
NamedStruct {
names: [
"a",
"b",
"c",
"d",
"e",
"f",
],
r#struct: None,
},
),
filter: None,
best_effort_filter: None,
projection: Some(
MaskExpression {
select: Some(
StructSelect {
struct_items: [
StructItem {
field: 4,
child: None,
},
],
},
),
maintain_singular_struct: false,
},
),
advanced_extension: None,
read_type: Some(
NamedTable(
NamedTable {
names: [
"data",
],
advanced_extension: None,
},
),
),
},
),
),
},
),
expression: None,
post_join_filter: Some(
Expression {
rex_type: Some(
ScalarFunction(
ScalarFunction {
function_reference: 0,
arguments: [
FunctionArgument {
arg_type: Some(
Value(
Expression {
rex_type: Some(
Selection(
FieldReference {
reference_type: Some(
DirectReference(
ReferenceSegment {
reference_type: Some(
StructField(
StructField {
field: 1,
child: None,
},
),
),
},
),
),
root_type: None,
},
),
),
},
),
),
},
FunctionArgument {
arg_type: Some(
Value(
Expression {
rex_type: Some(
Selection(
FieldReference {
reference_type: Some(
DirectReference(
ReferenceSegment {
reference_type: Some(
StructField(
StructField {
field: 2,
child: None,
},
),
),
},
),
),
root_type: None,
},
),
),
},
),
),
},
],
options: [],
output_type: None,
args: [],
},
),
),
},
),
r#type: Inner,
advanced_extension: None,
},
),
),
},
),
expressions: [
Expression {
rex_type: Some(
Selection(
FieldReference {
reference_type: Some(
DirectReference(
ReferenceSegment {
reference_type: Some(
StructField(
StructField {
field: 0,
child: None,
},
),
),
},
),
),
root_type: None,
},
),
),
},
],
advanced_extension: None,
},
),
),
},
),
names: [
"d1.a",
],
},
),
),
},
],
advanced_extensions: None,
expected_type_urls: [],
}
However, if we feed this Substrait plan back through the consumer to get a DF plan, we get:
[Unoptimized plan]
Projection: data.a
Inner Join: Filter: data.e != data.e
TableScan: data projection=[a, e]
TableScan: data projection=[e]
[Optimized plan]
Projection: data.a
Inner Join:
Projection: data.a
Filter: data.e != data.e
TableScan: data projection=[a, e], partial_filters=[data.e != data.e]
TableScan: data projection=[e]
Notice that DF has no way to distinguish the left `data` scan from the right `data` scan, so it treats both as the same TableScan relation. As a result, the join filter `d1.e != d2.e` degenerates into `data.e != data.e`, which the optimizer then pushes down into the left input. Thus, the output DF plan is incorrect.
Describe the solution you'd like
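Preserve table aliases (`SubqueryAlias`) when converting to Substrait, so that the consumer can distinguish relations that read from the same table.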
Describe alternatives you've considered
N/A
Additional context
Additional example:
#[tokio::test]
async fn roundtrip_exists_filter() -> Result<()> {
    // Correlated EXISTS: the outer query (`d1`) and the subquery (`d2`) both
    // scan `data`, and the correlation predicates refer to each side by alias.
    let sql = "SELECT b FROM data d1 WHERE EXISTS (SELECT * FROM data d2 WHERE d2.a = d1.a AND d2.e != d1.e)";
    roundtrip(sql).await
}
// thread 'tests::roundtrip_exists_filter' panicked at 'assertion failed: `(left == right)`
// left: `"Projection: d1.b\n LeftSemi Join: d1.a = __correlated_sq_1.a Filter: __correlated_sq_1.e != d1.e\n SubqueryAlias: d1\n TableScan: data projection=[a, b, e]\n SubqueryAlias: __correlated_sq_1\n SubqueryAlias: d2\n TableScan: data projection=[a, e]"`,
// right: `"Projection: data.b\n LeftSemi Join: data.a = data.a\n Projection: data.a, data.b\n Filter: data.e != data.e\n TableScan: data projection=[a, b, e], partial_filters=[data.e != data.e]\n TableScan: data projection=[a]"`', datafusion/substrait/tests/roundtrip_logical_plan.rs:625:9
[Original plan]
Projection: d1.b
LeftSemi Join: d1.a = __correlated_sq_1.a Filter: __correlated_sq_1.e != d1.e
SubqueryAlias: d1
TableScan: data projection=[a, b, e]
SubqueryAlias: __correlated_sq_1
SubqueryAlias: d2
TableScan: data projection=[a, e]
[Unoptimized plan from consumer]
Projection: data.b
LeftSemi Join: data.a = data.a Filter: data.e != data.e
TableScan: data projection=[a, b, e]
TableScan: data projection=[a, e]
[Optimized plan from consumer (incorrect)]
Projection: data.b
LeftSemi Join: data.a = data.a
Projection: data.a, data.b
Filter: data.e != data.e
TableScan: data projection=[a, b, e], partial_filters=[data.e != data.e]
TableScan: data projection=[a]