@@ -209,6 +209,10 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
209209
210210 // --- Unit tests of EnsureRequirements ---------------------------------------------------------
211211
212+ // When it comes to testing whether EnsureRequirements properly ensures distribution requirements,
213+ // there are two dimensions that need to be considered: are the child partitionings compatible and
214+ // do they satisfy the distribution requirements? As a result, we need at least four test cases.
215+
212216 private def assertDistributionRequirementsAreSatisfied (outputPlan : SparkPlan ): Unit = {
213217 if (outputPlan.requiresChildPartitioningsToBeCompatible) {
214218 val childPartitionings = outputPlan.children.map(_.outputPartitioning)
@@ -223,7 +227,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
223227 }
224228 }
225229
226- test(" EnsureRequirements ensures that child partitionings guarantee each other, if required " ) {
230+ test(" EnsureRequirements with incompatible child partitionings which satisfy distribution " ) {
227231 // Consider an operator that requires inputs that are clustered by two expressions (e.g.
228232 // sort merge join where there are multiple columns in the equi-join condition)
229233 val clusteringA = Literal (1 ) :: Nil
@@ -241,8 +245,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
241245 assert(! rightPartitioning.guarantees(leftPartitioning))
242246 val inputPlan = DummyPlan (
243247 children = Seq (
244- DummyPlan (outputPartitioning = HashPartitioning (clusteringA, 1 ) ),
245- DummyPlan (outputPartitioning = HashPartitioning (clusteringB, 1 ) )
248+ DummyPlan (outputPartitioning = leftPartitioning ),
249+ DummyPlan (outputPartitioning = rightPartitioning )
246250 ),
247251 requiresChildPartitioningsToBeCompatible = true ,
248252 requiredChildDistribution = Seq (distribution, distribution),
@@ -251,14 +255,13 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
251255 val outputPlan = EnsureRequirements (sqlContext).apply(inputPlan)
252256 assertDistributionRequirementsAreSatisfied(outputPlan)
253257 if (outputPlan.collect { case Exchange (_, _) => true }.isEmpty) {
254- fail(s " Exchanges should have been added:\n $outputPlan" )
258+ fail(s " Exchange should have been added:\n $outputPlan" )
255259 }
256260 }
257261
258- test(" EnsureRequirements ensures that children produce same number of partitions, if required " ) {
262+ test(" EnsureRequirements with child partitionings with different numbers of output partitions " ) {
259263 // This is similar to the previous test, except it checks that partitionings are not compatible
260- // unless they produce the same number of partitions. This requirement is also enforced via
261- // assertions in Exchange.
264+ // unless they produce the same number of partitions.
262265 val clustering = Literal (1 ) :: Nil
263266 val distribution = ClusteredDistribution (clustering)
264267 val inputPlan = DummyPlan (
@@ -274,6 +277,50 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
274277 assertDistributionRequirementsAreSatisfied(outputPlan)
275278 }
276279
280+ test(" EnsureRequirements with compatible child partitionings that do not satisfy distribution" ) {
281+ val distribution = ClusteredDistribution (Literal (1 ) :: Nil )
282+ // The left and right inputs have compatible partitionings but they do not satisfy the
283+ // distribution because they are clustered on different columns. Thus, we need to shuffle.
284+ val childPartitioning = HashPartitioning (Literal (2 ) :: Nil , 1 )
285+ assert(! childPartitioning.satisfies(distribution))
286+ val inputPlan = DummyPlan (
287+ children = Seq (
288+ DummyPlan (outputPartitioning = childPartitioning),
289+ DummyPlan (outputPartitioning = childPartitioning)
290+ ),
291+ requiresChildPartitioningsToBeCompatible = true ,
292+ requiredChildDistribution = Seq (distribution, distribution),
293+ requiredChildOrdering = Seq (Seq .empty, Seq .empty)
294+ )
295+ val outputPlan = EnsureRequirements (sqlContext).apply(inputPlan)
296+ assertDistributionRequirementsAreSatisfied(outputPlan)
297+ if (outputPlan.collect { case Exchange (_, _) => true }.isEmpty) {
298+ fail(s " Exchange should have been added: \n $outputPlan" )
299+ }
300+ }
301+
302+ test(" EnsureRequirements with compatible child partitionings that satisfy distribution" ) {
303+ // In this case, all requirements are satisfied and no exchange should be added.
304+ val distribution = ClusteredDistribution (Literal (1 ) :: Nil )
305+ val childPartitioning = HashPartitioning (Literal (1 ) :: Nil , 5 )
306+ assert(childPartitioning.satisfies(distribution))
307+ val inputPlan = DummyPlan (
308+ children = Seq (
309+ DummyPlan (outputPartitioning = childPartitioning),
310+ DummyPlan (outputPartitioning = childPartitioning)
311+ ),
312+ requiresChildPartitioningsToBeCompatible = true ,
313+ requiredChildDistribution = Seq (distribution, distribution),
314+ requiredChildOrdering = Seq (Seq .empty, Seq .empty)
315+ )
316+ val outputPlan = EnsureRequirements (sqlContext).apply(inputPlan)
317+ assertDistributionRequirementsAreSatisfied(outputPlan)
318+ if (outputPlan.collect { case Exchange (_, _) => true }.nonEmpty) {
319+ fail(s " Exchange should not have been added: \n $outputPlan" )
320+ }
321+ }
322+
323+ // This is a regression test for SPARK-9703
277324 test(" EnsureRequirements should not repartition if only ordering requirement is unsatisfied" ) {
278325 // Consider an operator that imposes both output distribution and ordering requirements on its
279326 // children, such as sort merge join. If the distribution requirements are satisfied but
0 commit comments