Parsing multiline records in Scala

By default Spark creates a single element per line. This means that in your case every record is spread over multiple elements which, as stated by Daniel Darabos in the comments, can be processed by different workers.
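
For comparison, a minimal sketch of that default behaviour (path stands for the same input location used in the snippets below):

// Default reading: one RDD element per physical line, so a multi-line
// record ends up scattered across several elements (and possibly partitions).
val lines = sc.textFile(path)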

Since your data looks relatively regular, with records separated by an empty line, you should be able to use newAPIHadoopFile with a custom record delimiter:

import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val path: String = ???

// Treat an empty line (two consecutive newlines) as the record separator.
val conf = org.apache.hadoop.mapreduce.Job.getInstance().getConfiguration
conf.set("textinputformat.record.delimiter", "\n\n")

// Read (byte offset, record text) pairs and keep only the record text.
val usgRDD = sc.newAPIHadoopFile(
    path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map { case (_, v) => v.toString }

// Split each record on newlines: the first line becomes the key,
// the remaining lines the values.
val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("\n") match {
  case Array(x, xs @ _*) => (x, xs)
})
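
As a quick sanity check (just a sketch, assuming each record starts with a header line followed by detail lines), you can peek at a few parsed pairs:

// Print a couple of parsed records: each element is (first line, remaining lines).
usgPairRDD.take(2).foreach { case (header, body) =>
  println(header)
  body.foreach(line => println("  " + line))
}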

In Spark 2.4 or later the data loading part can also be done with the Dataset API:

import org.apache.spark.sql.Dataset
val ds: Dataset[String] = spark.read.option("lineSep", "\n\n").textFile(path)
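
If you need the same (first line, remaining lines) shape as in the RDD version, the split can be applied to the Dataset as well; a minimal sketch, assuming a SparkSession named spark is in scope:

import spark.implicits._

// Same split as above: first line as the key, remaining lines as the values.
val pairs = ds.map { record =>
  val parts = record.split("\n")
  (parts.head, parts.tail.toSeq)
}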
