How are stages split into tasks in Spark?

You have a pretty nice outline here. To answer your questions:

  • A separate task does need to be launched for each partition of data for each stage. Consider that each partition will likely reside in a distinct physical location – e.g. a block in HDFS or a directory/volume on a local file system (see the sketch just below).

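A quick way to see this task-per-partition relationship is to ask an RDD for its partition count: when a stage over that RDD runs, Spark launches one task per partition. A minimal spark-shell sketch (the input path and the repartition count of 8 are assumptions for illustration only):

// `sc` is the SparkContext provided by spark-shell; the input path is hypothetical
val rdd = sc.textFile("/data/blah/input")
// the stage that scans this RDD will launch one task per partition
println(s"partitions = ${rdd.getNumPartitions}")
// repartitioning changes the partition count, and hence the task count, of later stages
println(s"after repartition = ${rdd.repartition(8).getNumPartitions}")
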
Note that the submission of stages is driven by the DAG Scheduler. This means that stages which are not interdependent may be submitted to the cluster for execution in parallel, which maximizes the parallelism available on the cluster. So if operations in our dataflow can happen simultaneously, we should expect to see multiple stages launched.

We can see this in action in the following toy example, in which we perform these types of operations:

  • load two data sources
  • perform a map operation on each of the data sources separately
  • join them
  • perform some map and filter operations on the result
  • save the result

So then how many stages will we end up with?

  • 1 stage each for loading the two data sources in parallel = 2 stages
  • A third stage representing the join, which is dependent on the other two stages
  • Note: all of the follow-on operations working on the joined data may be performed in the same stage because they must happen sequentially. There is no benefit to launching additional stages because they cannot start work until the prior operation has completed.

Here is that toy program:

// Stage 1: load the text file and map each line to a (k, k*k) pair
val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi, xi*xi) }
// Stage 2: build a second pair RDD from a local collection
val sp  = sc.parallelize{ (0 until 1000).map{ x => (x, x*x + 1) } }
// Stage 3: the join forces a shuffle; the follow-on map/filter/save are pipelined into it
val spj = sfi.join(sp)
val sm  = spj.mapPartitions{ iter => iter.map{ case (k, (v1, v2)) => (k, v1 + v2) } }
val sf  = sm.filter{ case (k, v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

And here is the DAG of the result:

[image: Spark UI DAG visualization of the resulting stages]
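
If the UI is not handy, RDD.toDebugString gives a rough text view of the same lineage; the indentation levels mark the shuffle boundaries where the DAG Scheduler cuts the job into stages (the exact output varies by environment, so treat this as a sketch):

// print the lineage of the final RDD before triggering the action
println(sf.toDebugString)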

Now: how many tasks? The total number of tasks should be equal to

Sum over all stages of (#partitions in that stage)
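
As a back-of-the-envelope check for the toy program above, suppose (purely as an assumption for illustration) that /data/blah/input spans 4 HDFS blocks and that both the parallelize and the join end up with 8 partitions:

// hypothetical partition counts, for illustration only:
//   stage loading/mapping the text file:           4 partitions -> 4 tasks
//   stage building the parallelized collection:    8 partitions -> 8 tasks
//   stage running the join + map + filter + save:  8 partitions -> 8 tasks
// total tasks = 4 + 8 + 8 = 20

// at runtime the real counts can be read off the RDDs that head each stage
println(sfi.getNumPartitions)
println(sp.getNumPartitions)
println(spj.getNumPartitions)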
