Spark SQL broadcast hash join

You can explicitly mark a DataFrame as small enough to broadcast
using the broadcast function:

Python:

from pyspark.sql.functions import broadcast

small_df = ...
large_df = ...

large_df.join(broadcast(small_df), ["foo"])

or use the broadcast hint (Spark >= 2.2):

large_df.join(small_df.hint("broadcast"), ["foo"])

Scala:

import org.apache.spark.sql.functions.broadcast

val smallDF: DataFrame = ???
val largeDF: DataFrame = ???

largeDF.join(broadcast(smallDF), Seq("foo"))

or use the broadcast hint (Spark >= 2.2):

largeDF.join(smallDF.hint("broadcast"), Seq("foo"))

SQL:

You can use hints (Spark >= 2.2):

SELECT /*+ MAPJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

or

SELECT /*+ BROADCASTJOIN(small) */ *
FROM large JOIN small
ON large.foo = small.foo

or

SELECT /*+ BROADCAST(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

R (SparkR):

With hint (Spark >= 2.2):

join(large, hint(small, "broadcast"), large$foo == small$foo)

With broadcast (Spark >= 2.3):

join(large, broadcast(small), large$foo == small$foo)

Note:

A broadcast join is useful only if one of the joined structures is relatively small, since the broadcast side must fit in the memory of every executor. Broadcasting a large table can be significantly more expensive than a full shuffle.
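Note also that Spark can broadcast automatically, without any explicit hint, whenever it estimates one side to be smaller than the spark.sql.autoBroadcastJoinThreshold configuration value (10 MB by default). A short configuration sketch, assuming an existing SparkSession bound to the name spark; the 50 MB value is just an example:

```python
# Spark auto-broadcasts any join side estimated below this threshold
# (default 10 MB). Raise it to broadcast bigger tables automatically,
# or set it to -1 to disable automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50 MB
```

The explicit broadcast function and hints shown above override this size-based heuristic, which is why they are useful when Spark's size estimate is wrong.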
