Processing multiple files as independent RDD’s in parallel

It is more an idea than a full solution and I haven’t tested it yet.

You can start with extracting your data processing pipeline into a function.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

If your files are small you can adjust n parameter to use as small number of partitions as possible to fit data from a single file and avoid shuffling. It means you are limiting concurrency but we’ll get back to this issue later.

val n: Int = ??? 

Next you have to obtain a list of input files. This step depends on a data source but most of the time it is more or less straightforward:

val files: Array[String] = ???

Next you can map above list using pipeline function:

val rdds = files.map(f => pipeline(f, n))

Since we limit concurrency at the level of the single file we want to compensate by submitting multiple jobs. Lets add a simple helper which forces evaluation and wraps it with Future

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

Finally we can use above helper on the rdds:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

Depending on your requirements you can add onComplete callbacks or use reactive streams to collect the results.

Leave a Comment