Commit 0180b84

snowmoon-zhangrxin authored and committed
[SPARK-10577] [PYSPARK] DataFrame hint for broadcast join
https://issues.apache.org/jira/browse/SPARK-10577

Author: Jian Feng <[email protected]>

Closes apache#8801 from Jianfeng-chs/master.
1 parent bf20d6c commit 0180b84

File tree

2 files changed: +27 -0 lines changed


python/pyspark/sql/functions.py

Lines changed: 9 additions & 0 deletions
@@ -29,6 +29,7 @@
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.sql.types import StringType
 from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.dataframe import DataFrame


 def _create_function(name, doc=""):
@@ -189,6 +190,14 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)


+@since(1.6)
+def broadcast(df):
+    """Marks a DataFrame as small enough for use in broadcast joins."""
+
+    sc = SparkContext._active_spark_context
+    return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)
+
+
 @since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.

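For context, a minimal usage sketch of the new hint (Spark 1.6-era API; the master, app name, and sample data below are illustrative, not part of the commit):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast

sc = SparkContext("local[2]", "broadcast-hint-example")
sqlContext = SQLContext(sc)

# A larger fact-like table and a small dimension-like table.
large = sqlContext.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ("key", "value"))
small = sqlContext.createDataFrame([(1, "one"), (2, "two")], ("key", "name"))

# Wrapping the small side in broadcast() marks it for a broadcast (map-side)
# join rather than a shuffle join.
joined = large.join(broadcast(small), "key")
joined.explain()  # the physical plan should show BroadcastHashJoin
joined.show()

sc.stop()
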
python/pyspark/sql/tests.py

Lines changed: 18 additions & 0 deletions
@@ -1075,6 +1075,24 @@ def foo():
 
         self.assertRaises(TypeError, foo)
 
+    # add test for SPARK-10577 (test broadcast join hint)
+    def test_functions_broadcast(self):
+        from pyspark.sql.functions import broadcast
+
+        df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+        df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+
+        # equijoin - should be converted into broadcast join
+        plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
+        self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))
+
+        # no join key -- should not be a broadcast join
+        plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan()
+        self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))
+
+        # planner should not crash without a join
+        broadcast(df1)._jdf.queryExecution().executedPlan()
+
 
 class HiveContextSQLTests(ReusedPySparkTestCase):

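A follow-up sketch on how one might double-check that the hint, rather than size estimation, is what selects the broadcast join: spark.sql.autoBroadcastJoinThreshold is the existing size threshold for automatic broadcasting, and setting it to -1 disables it. The data and names below are illustrative:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast

sc = SparkContext("local[2]", "broadcast-hint-check")
sqlContext = SQLContext(sc)

# Disable size-based automatic broadcasting, so only the explicit hint
# can select a broadcast join.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

df1 = sqlContext.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
df2 = sqlContext.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))

df1.join(df2, "key").explain()             # expected: a shuffle-based join
df1.join(broadcast(df2), "key").explain()  # expected: BroadcastHashJoin

sc.stop()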