
Commit 82aca7e

aokolnychyi authored and rdblue committed
[SPARK-33779][SQL] DataSource V2: API to request distribution and ordering on write
### What changes were proposed in this pull request?

This PR adds connector interfaces proposed in the [design doc](https://docs.google.com/document/d/1X0NsQSryvNmXBY9kcvfINeYyKC-AahZarUqg3nS1GQs/edit#) for SPARK-23889.

**Note**: This PR contains a subset of the changes discussed in PR apache#29066.

### Why are the changes needed?

Data sources should be able to request a specific distribution and ordering of data on write. In particular, these scenarios are considered useful:
- global sort
- cluster data and sort within partitions
- local sort within partitions
- no sort

Please see the design doc above for a more detailed explanation of the requirements.

### Does this PR introduce _any_ user-facing change?

This PR introduces public changes to the DS V2 API: a logical write abstraction, mirroring the one on the read path, as well as additional interfaces to represent the distribution and ordering of data (please see the doc for more info). The existing `Distribution` interface in the `read` package is read-specific and not flexible enough, as discussed in the design doc. The current proposal is to evolve these interfaces separately until they converge.

### How was this patch tested?

This patch adds only interfaces.

Closes apache#30706 from aokolnychyi/spark-23889-interfaces.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Ryan Blue <[email protected]>
1 parent 839d689 commit 82aca7e
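Since the commit itself only adds interfaces, here is a minimal, non-normative sketch (not part of this commit) of how the four scenarios above could be expressed with the distribution and expression APIs added below; the `ts` and `region` column names are hypothetical.

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

class WriteScenarioSketch {
  // Ascending sort on a hypothetical "ts" column, nulls first.
  static final SortOrder TS_ASC = Expressions.sort(
      Expressions.column("ts"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST);

  // 1. Global sort: order rows across partitions and keep that order within partitions.
  static final Distribution GLOBAL_SORT = Distributions.ordered(new SortOrder[] { TS_ASC });
  static final SortOrder[] GLOBAL_SORT_ORDERING = { TS_ASC };

  // 2. Cluster data (by a hypothetical "region" column) and sort within partitions.
  static final Distribution CLUSTERED =
      Distributions.clustered(new Expression[] { Expressions.column("region") });
  static final SortOrder[] CLUSTERED_ORDERING = { TS_ASC };

  // 3. Local sort within partitions: no distribution requirement, just an ordering.
  static final Distribution LOCAL_SORT = Distributions.unspecified();
  static final SortOrder[] LOCAL_SORT_ORDERING = { TS_ASC };

  // 4. No sort: unspecified distribution and an empty required ordering.
  static final Distribution NO_SORT = Distributions.unspecified();
  static final SortOrder[] NO_SORT_ORDERING = {};
}
```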

14 files changed: +626 -10 lines changed
sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/ClusteredDistribution.java

Lines changed: 35 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;

/**
 * A distribution where tuples that share the same values for clustering expressions are co-located
 * in the same partition.
 *
 * @since 3.2.0
 */
@Experimental
public interface ClusteredDistribution extends Distribution {
  /**
   * Returns clustering expressions.
   */
  Expression[] clustering();
}
sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distribution.java

Lines changed: 28 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

/**
 * An interface that defines how data is distributed across partitions.
 *
 * @since 3.2.0
 */
@Experimental
public interface Distribution {}
sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/Distributions.java

Lines changed: 56 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;

/**
 * Helper methods to create distributions to pass into Spark.
 *
 * @since 3.2.0
 */
@Experimental
public class Distributions {
  private Distributions() {
  }

  /**
   * Creates a distribution where no promises are made about co-location of data.
   */
  public static UnspecifiedDistribution unspecified() {
    return LogicalDistributions.unspecified();
  }

  /**
   * Creates a distribution where tuples that share the same values for clustering expressions are
   * co-located in the same partition.
   */
  public static ClusteredDistribution clustered(Expression[] clustering) {
    return LogicalDistributions.clustered(clustering);
  }

  /**
   * Creates a distribution where tuples have been ordered across partitions according
   * to ordering expressions, but not necessarily within a given partition.
   */
  public static OrderedDistribution ordered(SortOrder[] ordering) {
    return LogicalDistributions.ordered(ordering);
  }
}
sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/OrderedDistribution.java

Lines changed: 35 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.SortOrder;

/**
 * A distribution where tuples have been ordered across partitions according
 * to ordering expressions, but not necessarily within a given partition.
 *
 * @since 3.2.0
 */
@Experimental
public interface OrderedDistribution extends Distribution {
  /**
   * Returns ordering expressions.
   */
  SortOrder[] ordering();
}
sql/catalyst/src/main/java/org/apache/spark/sql/connector/distributions/UnspecifiedDistribution.java

Lines changed: 28 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

/**
 * A distribution where no promises are made about co-location of data.
 *
 * @since 3.2.0
 */
@Experimental
public interface UnspecifiedDistribution extends Distribution {}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expressions.java

Lines changed: 11 additions & 0 deletions
@@ -164,4 +164,15 @@ public static Transform hours(String column) {
     return LogicalExpressions.hours(Expressions.column(column));
   }
 
+  /**
+   * Create a sort expression.
+   *
+   * @param expr an expression to produce values to sort
+   * @param direction direction of the sort
+   * @param nullOrder null order of the sort
+   * @return a SortOrder
+   */
+  public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nullOrder) {
+    return LogicalExpressions.sort(expr, direction, nullOrder);
+  }
 }
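For illustration only (not part of the diff), the new `sort` factory combines with the existing `Expressions.column` helper as follows; the `price` column name is hypothetical.

```java
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

class SortFactorySketch {
  // Descending sort on a hypothetical "price" column, with nulls placed last.
  static final SortOrder PRICE_DESC = Expressions.sort(
      Expressions.column("price"), SortDirection.DESCENDING, NullOrdering.NULLS_LAST);
}
```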
sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/NullOrdering.java

Lines changed: 42 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

/**
 * A null order used in sorting expressions.
 *
 * @since 3.2.0
 */
@Experimental
public enum NullOrdering {
  NULLS_FIRST, NULLS_LAST;

  @Override
  public String toString() {
    switch (this) {
      case NULLS_FIRST:
        return "NULLS FIRST";
      case NULLS_LAST:
        return "NULLS LAST";
      default:
        throw new IllegalArgumentException("Unexpected null order: " + this);
    }
  }
}
sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortDirection.java

Lines changed: 42 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

/**
 * A sort direction used in sorting expressions.
 *
 * @since 3.2.0
 */
@Experimental
public enum SortDirection {
  ASCENDING, DESCENDING;

  @Override
  public String toString() {
    switch (this) {
      case ASCENDING:
        return "ASC";
      case DESCENDING:
        return "DESC";
      default:
        throw new IllegalArgumentException("Unexpected sort direction: " + this);
    }
  }
}
sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/SortOrder.java

Lines changed: 43 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

/**
 * Represents a sort order in the public expression API.
 *
 * @since 3.2.0
 */
@Experimental
public interface SortOrder extends Expression {
  /**
   * Returns the sort expression.
   */
  Expression expression();

  /**
   * Returns the sort direction.
   */
  SortDirection direction();

  /**
   * Returns the null ordering.
   */
  NullOrdering nullOrdering();
}
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java

Lines changed: 57 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
import org.apache.spark.sql.connector.expressions.SortOrder;

/**
 * A write that requires a specific distribution and ordering of data.
 *
 * @since 3.2.0
 */
@Experimental
public interface RequiresDistributionAndOrdering extends Write {
  /**
   * Returns the distribution required by this write.
   * <p>
   * Spark will distribute incoming records across partitions to satisfy the required distribution
   * before passing the records to the data source table on write.
   * <p>
   * Implementations may return {@link UnspecifiedDistribution} if they don't require any specific
   * distribution of data on write.
   *
   * @return the required distribution
   */
  Distribution requiredDistribution();

  /**
   * Returns the ordering required by this write.
   * <p>
   * Spark will order incoming records within partitions to satisfy the required ordering
   * before passing those records to the data source table on write.
   * <p>
   * Implementations may return an empty array if they don't require any specific ordering of data
   * on write.
   *
   * @return the required ordering
   */
  SortOrder[] requiredOrdering();
}
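Pulling the pieces together, here is a hedged sketch (not part of this commit) of a connector `Write` that requests clustering by a hypothetical `bucket` column and a per-partition sort by a hypothetical `ts` column. It assumes the other `Write` methods, defined elsewhere in this PR, have default implementations; otherwise they would also need to be overridden.

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

// Hypothetical connector write: co-locate rows by "bucket" and sort them by "ts" within partitions.
class ClusteredSortedWrite implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // Rows that share the same "bucket" value end up in the same partition.
    return Distributions.clustered(new Expression[] { Expressions.column("bucket") });
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // Within each partition, sort by "ts" ascending with nulls first.
    return new SortOrder[] {
      Expressions.sort(Expressions.column("ts"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)
    };
  }

  // Write's own methods are assumed to have defaults and are not overridden in this sketch.
}
```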
