How does HashPartitioner work?

Well, let's make your dataset marginally more interesting:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

We have six elements:

rdd.count
Long = 6

no partitioner:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

and eight partitions:

rdd.partitions.length
Int = 8

Now let's define a small helper to count the number of elements per partition:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Since we don’t have a partitioner, our dataset is distributed uniformly between partitions (Default Partitioning Scheme in Spark):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

[Figure: initial-distribution]

Now let's repartition our dataset:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:

rddOneP.partitions.length
Int = 1

Since we have only one partition, it contains all elements:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

[Figure: hash-partitioner-1]

Note that the order of values after the shuffle is non-deterministic.

In the same way, if we use HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

we’ll get 2 partitions:

rddTwoP.partitions.length
Int = 2

Since rdd is partitioned by key, the data won’t be distributed uniformly anymore:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Just to confirm the above:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

[Figure: hash-partitioner-2]
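The plain `%` used in the check above is only safe because the keys are positive. In Scala, `%` can return a negative remainder, which is not a valid partition index. The following is a minimal sketch of a non-negative modulo in plain Scala (no Spark required). The object name and `partitionFor` are hypothetical helpers, and the body is assumed to behave like Spark's nonNegativeMod rather than copied from it.

```scala
object NonNegativeModSketch {
  // Shift a negative remainder into the range [0, mod).
  // Assumption: this matches the behaviour of Spark's nonNegativeMod.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Hypothetical helper: partition index for a non-null key.
  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)
}
```

For example, `-1 % 7` evaluates to `-1`, while `NonNegativeModSketch.partitionFor(-1, 7)` evaluates to `6`.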

Finally, with HashPartitioner(7) we get seven partitions, three of them non-empty with 2 elements each:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddSevenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

[Figure: hash-partitioner-7]
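The three distributions above can be reproduced without a cluster. This is a rough sketch in plain Scala that rebuilds the same six pairs and counts how many land in each partition index. `PartitionCountsSketch` and `countsFor` are hypothetical names, and `Math.floorMod` is used here only as a stand-in for the partitioner's own modulo logic.

```scala
object PartitionCountsSketch {
  // The same six (key, None) pairs as in the example dataset.
  val pairs: Seq[(Int, None.type)] = for {
    x <- 1 to 3
    y <- 1 to 2
  } yield (x, None)

  // Count the elements that map to each partition index.
  def countsFor(numPartitions: Int): Array[Int] = {
    val counts = Array.fill(numPartitions)(0)
    pairs.foreach { case (k, _) =>
      // floorMod always returns a value in [0, numPartitions).
      counts(Math.floorMod(k.hashCode, numPartitions)) += 1
    }
    counts
  }
}
```

`PartitionCountsSketch.countsFor(2)` gives `Array(2, 4)` and `countsFor(7)` gives `Array(0, 2, 2, 2, 0, 0, 0)`, matching the output collected from the RDDs.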

Summary and Notes

  • HashPartitioner takes a single argument which defines the number of partitions
  • values are assigned to partitions using the hash of their keys. The hash function may differ depending on the language (Scala RDDs may use hashCode, Datasets use MurmurHash 3, PySpark uses portable_hash).

    In a simple case like this, where the key is a small integer, you can assume that the hash is the identity (i = hash(i)).

    The Scala API uses nonNegativeMod to determine the partition based on the computed hash.

  • if the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle

  • keys have to be hashable. You can check my answer for A list as a key for PySpark’s reduceByKey to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation:

    Java arrays have hashCodes that are based on the arrays’ identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.

  • In Python 3 you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?

  • Hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.

  • Please note that currently hash-based methods don’t work in Scala when combined with REPL-defined case classes (Case class equality in Apache Spark).

  • HashPartitioner (or any other Partitioner) shuffles the data. Unless partitioning is reused between multiple operations it doesn’t reduce amount of data to be shuffled.
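The array caveat quoted from the documentation can be seen in plain Scala. This is a small sketch, not taken from the original answer: `ArrayKeySketch` is a hypothetical name, and converting array keys to a `Seq` is one common workaround, not the only one.

```scala
object ArrayKeySketch {
  val a = Array(1, 2, 3)
  val b = Array(1, 2, 3)

  // Arrays compare by reference, so two arrays with equal contents
  // are still treated as different keys.
  val sameReference: Boolean = a == b

  // Content-based alternative 1: convert to a Seq, whose hashCode
  // and equals depend on the elements.
  val seqHashesMatch: Boolean = a.toSeq.hashCode == b.toSeq.hashCode
  val seqsEqual: Boolean = a.toSeq == b.toSeq

  // Content-based alternative 2: hash the contents explicitly.
  val contentHashesMatch: Boolean =
    java.util.Arrays.hashCode(a) == java.util.Arrays.hashCode(b)
}
```

Here `sameReference` is `false`, while the three content-based comparisons are `true`, which is why a key such as `a.toSeq` behaves predictably with a hash-based partitioner and a raw array does not.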
