You have a pretty nice outline here. To answer your questions:
- A separate task does need to be launched for each partition of data for each stage. Consider that each partition will likely reside in a distinct physical location – e.g. blocks in HDFS or directories/volumes for a local file system.
Note that the submission of Stages is driven by the DAG Scheduler. This means that stages that are not interdependent may be submitted to the cluster for execution in parallel, which maximizes the parallelization capability of the cluster. So if operations in our dataflow can happen simultaneously, we should expect to see multiple stages launched.
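That scheduling idea can be sketched in plain Scala (a minimal toy model, not Spark's actual DAGScheduler): stages whose dependencies are all complete form the next "wave" of parallel submissions. The stage ids and dependency map below are illustrative assumptions.

```scala
// Toy model of DAG scheduling: stage id -> set of stage ids it depends on.
val deps: Map[Int, Set[Int]] = Map(
  0 -> Set.empty, // load datasource A
  1 -> Set.empty, // load datasource B
  2 -> Set(0, 1)  // join: depends on both loads
)

// Each "wave" is the set of stages whose dependencies are all done;
// stages within a wave can be submitted to the cluster in parallel.
def waves(remaining: Map[Int, Set[Int]], done: Set[Int] = Set.empty): List[Set[Int]] =
  if (remaining.isEmpty) Nil
  else {
    val ready = remaining.collect { case (s, d) if d.subsetOf(done) => s }.toSet
    ready :: waves(remaining -- ready, done ++ ready)
  }

val schedule = waves(deps)
// schedule == List(Set(0, 1), Set(2)): the two loads run simultaneously,
// and the join stage waits for both.
```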
We can see that in action in the following toy example, which performs these types of operations:
- load two datasources
- perform some map operation on both of the data sources separately
- join them
- perform some map and filter operations on the result
- save the result
So then how many stages will we end up with?
- 1 stage each for loading the two datasources in parallel = 2 stages
- A third stage representing the join, which is dependent on the other two stages. Note: all of the follow-on operations working on the joined data may be performed in the same stage because they must happen sequentially; there is no benefit to launching additional stages, since they cannot start work until the prior operations have completed.
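The tally above can be written out as trivial Scala arithmetic (the stage labels are illustrative, not Spark's own naming):

```scala
// Two independent load stages plus one stage for the joined, pipelined tail.
val loadStages  = Seq("load textFile", "load parallelize") // run in parallel
val joinedTail  = Seq("join + map + filter + save")        // pipelined into one stage
val totalStages = loadStages.size + joinedTail.size        // 3 stages in total
```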
Here is that toy program:
// load the first datasource and map each line to (n, n*n)
val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
// build the second datasource in memory and map each element to (n, n*n+1)
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
// join the two datasources on key
val spj = sfi.join(sp)
// map over the joined values
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
// filter the joined result
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
// save the result: this action triggers the job
sf.saveAsTextFile("/data/blah/out")
And here is the DAG of the result:
Now: how many tasks? The number of tasks should be equal to

Sum over all Stages of (#Partitions in the Stage)
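That sum can be sketched directly in Scala. The partition counts below are assumed for illustration, not taken from the toy job's actual configuration:

```scala
// Hypothetical per-stage partition counts for a 3-stage job like the one above.
val partitionsPerStage = Seq(
  "load sfi"          -> 2,
  "load sp"           -> 4,
  "join + map + save" -> 4
)

// Total tasks = sum over stages of the partitions in each stage.
val totalTasks = partitionsPerStage.map(_._2).sum // 10 tasks for these assumed counts
```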