How to read records in JSON format from Kafka using Structured Streaming?

From the Spark perspective value is just a byte sequence. It has no knowledge about the serialization format or content. To be able to extract the filed you have to parse it first.

If data is serialized as a JSON string you have two options. You can cast value to StringType and use from_json and provide a schema:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))

or cast to StringType, extract fields by path using get_json_object:

import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???

val exprs = columns.map(c => get_json_object($"value", s"$$.$c"))

rawKafkaDF.select(exprs: _*)

and cast later to the desired types.

Leave a Comment