Apache Spark – foreach vs foreachPartition: When to Use Which?

foreach and foreachPartition are both actions.

foreach(function): Unit

A generic function for invoking operations with side effects. For each
element in the RDD, it invokes the passed function. This is
generally used for manipulating accumulators or writing to external
stores.

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Example:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
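
By contrast, here is a small sketch of the pitfall the note above warns about (adapted from the closures discussion in the Spark docs): a plain local variable is captured into each task's closure, so executors mutate their own copies and the driver's value never changes.

var counter = 0
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => counter += x)
// counter is still 0 on the driver in cluster mode; use a LongAccumulator instead
println(counter)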

foreachPartition(function): Unit

Similar to foreach(), but instead of invoking the function for each
element, it calls it once for each partition. The function should
accept an iterator. This is more efficient than foreach() because
it reduces the number of function calls (just like mapPartitions()).
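
For intuition, a minimal sketch (println stands in for real per-partition setup work):

sc.parallelize(1 to 8, numSlices = 2).foreachPartition { iter =>
  // this line runs once per partition (2 times), not once per element (8 times)
  println("per-partition setup")
  iter.foreach(x => println(x))
}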

Example usages of foreachPartition:


  • Example 1: using one database connection per partition (inside the foreachPartition block). Here is how it can be done in Scala.
import java.sql.{Connection, DriverManager}

/**
  * Insert into a database table using foreachPartition.
  *
  * @param sqlDatabaseConnectionString JDBC connection string
  * @param sqlTableName                target table name
  */
def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

  // dataFrame and numPartitions are assumed to be in scope;
  // numPartitions = the number of simultaneous DB connections you plan to allow.
  // repartition returns a new DataFrame, so keep the result.
  val repartitionedDf = dataFrame.repartition(numPartitions)

  val tableHeader: String = repartitionedDf.columns.mkString(",")
  repartitionedDf.foreachPartition { partition =>
    // Note: one connection per partition (a better approach is a connection pool)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used since some databases (e.g. Azure SQL)
    // cannot handle batches larger than 1000 rows.
    partition.grouped(1000).foreach { group =>
      val insertString = new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        // quote each column value individually, producing ('v1','v2','v3'),
        insertString.append(record.mkString("('", "','", "'),"))
      }

      sqlExecutorConnection.createStatement()
        .executeUpdate(s"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
          + insertString.stripSuffix(","))
    }

    sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
  }
}
  • Example 2: usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // create the producer only once per partition; a Kafka producer
    // is thread-safe and can be safely shared
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

Note: if you want to avoid creating a producer once per partition, a
better way is to broadcast the producer using sparkContext.broadcast,
since the Kafka producer is asynchronous and buffers data heavily
before sending.
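
A KafkaProducer instance itself is not serializable, so it cannot be broadcast directly; the usual workaround is to broadcast a serializable wrapper that creates the producer lazily, once per executor JVM. A minimal sketch follows (the KafkaSink name and the contents of producerConfig are assumptions, not part of the original example):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // lazy, so the producer is built on the executor at first use, not on the driver
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink =
    new KafkaSink(() => new KafkaProducer[String, String](config))
}

// usage: broadcast once, reuse on every executor
val kafkaSink = sparkContext.broadcast(KafkaSink(producerConfig))
dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    kafkaSink.value.send("myTopic", message)
  }
}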


Accumulator sample snippets to play around with, through which
you can test the performance:

     test("Foreach - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
        assert(accum.value == 6L)
      }

      test("Foreach partition - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
        assert(accum.value == 6L)
      }

Conclusion:

foreachPartition operates on whole partitions, so it has an edge over
foreach whenever there is per-element overhead that can instead be
paid once per partition.

Rule of Thumb:

foreachPartition should be used when you are accessing costly
resources such as database connections or Kafka producers, initializing
one per partition rather than one per element (as foreach does). When it
comes to accumulators, you can measure the performance with the test
methods above; foreachPartition should work faster there as well.
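
To put rough numbers on that, here is a timing sketch (wall-clock on a local SparkContext, not a rigorous benchmark; the time helper is an assumption, not a Spark API):

// hypothetical micro-benchmark: run both actions on the same data
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

val data = sc.parallelize(1 to 1000000, 8)

val accumA = sc.longAccumulator
time("foreach")(data.foreach(x => accumA.add(x)))

val accumB = sc.longAccumulator
time("foreachPartition")(data.foreachPartition(_.foreach(x => accumB.add(x))))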

Also see map vs mapPartitions, which follow a similar concept but are transformations.
