StackOverflowError due to long RDD lineage

In general you can use checkpoints to break long lineages. Something more or less similar to this should work:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

val checkpointInterval: Int = ???

def loadAndFilter(path: String) = sc.textFile(path)
  .filter(_.startsWith("#####"))
  .map((path, _))

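// Union the next RDD into the accumulator; on every `interval`-th step,
// mark the result for local checkpointing to cut its lineage.
def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
  (acc: RDD[T], xi: (RDD[T], Int)): RDD[T] = {
    if (xi._2 % interval == 0 && xi._2 > 0) xi._1.union(acc).localCheckpoint()
    else xi._1.union(acc)
  }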

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]
fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
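Note that localCheckpoint stores data on the executors, so it trades fault tolerance for speed. If that is a concern, a variant using reliable checkpointing could look roughly like the sketch below. The helper name mergeWithCheckpoint is made up for illustration, and the sketch assumes sc.setCheckpointDir has already been called with a directory of your choice. Since checkpoint() only marks the RDD, an action (count here) is used to actually materialize it, which costs an extra pass over the data.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper: same shape as mergeWithLocalCheckpoint, but with
// reliable checkpointing. Assumes sc.setCheckpointDir(...) was called before.
def mergeWithCheckpoint[T: ClassTag](interval: Int)
    (acc: RDD[T], xi: (RDD[T], Int)): RDD[T] = {
  val (rdd, i) = xi
  val merged = rdd.union(acc)
  if (i % interval == 0 && i > 0) {
    merged.checkpoint() // only marks the RDD; nothing is written yet
    merged.count()      // running an action writes the checkpoint
  }
  merged
}
```

It should drop into the same foldLeft call in place of mergeWithLocalCheckpoint(checkpointInterval).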

In this particular situation, a much simpler solution is to use the SparkContext.union method:

val masterRDD = sc.union(
  fileList.map(path => sc.textFile(path)
    .filter(_.startsWith("#####"))
    .map((path, _))) 
)

The difference between these approaches is easy to see in the DAG each one generates. The loop / reduce version builds a deeply nested chain of unions:

[image: DAG generated by loop / reduce]

whereas a single union produces a flat DAG:

[image: DAG generated by a single union]
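If you want to check this without the Spark UI, one option is to walk the dependencies of each RDD and compare how deep the lineage goes. The following is only a rough sketch: lineageDepth and compareLineage are names invented here, and the inputs are small parallelized collections standing in for the real files.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper: length of the longest dependency chain below an RDD.
// Recursive, so only meant for small illustrative lineages.
def lineageDepth(rdd: RDD[_]): Int =
  if (rdd.dependencies.isEmpty) 1
  else 1 + rdd.dependencies.map(d => lineageDepth(d.rdd)).max

// Builds the same n inputs both ways and returns (nestedDepth, flatDepth).
def compareLineage(sc: SparkContext, n: Int): (Int, Int) = {
  val parts: Seq[RDD[Int]] = (0 until n).map(i => sc.parallelize(Seq(i)))
  val nested = parts.reduce(_ union _) // one union stacked on top of another
  val flat = sc.union(parts)           // a single union over all inputs
  (lineageDepth(nested), lineageDepth(flat))
}
```

The depth of the nested version grows with the number of inputs, while the depth of the flat version stays constant, which is why sc.union avoids the stack overflow.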

Of course, if the files are small, you can combine wholeTextFiles with flatMap and read all files at once:

sc.wholeTextFiles(fileList.mkString(","))
  .flatMap{case (path, text) =>  
    text.split("\n").filter(_.startsWith("#####")).map((path, _))}
