How to write spark streaming DF to Kafka topic

Yes, unfortunately Spark (1.x, 2.x) doesn’t make it straight-forward how to write to Kafka in an efficient manner.

I’d suggest the following approach:

  • Use (and re-use) one KafkaProducer instance per executor process/JVM.

Here’s the high-level setup for this approach:

  1. First, you must “wrap” Kafka’s KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to “ship” it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don’t need to worry about KafkaProducer not being serializable.
  2. You “ship” the wrapped producer to each executor by using a broadcast variable.
  3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.

The code snippets below work with Spark Streaming as of Spark 2.0.

Step 1: Wrapping KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}

Hope this helps.

Leave a Comment