Random number generation in PySpark

The actual problem here is relatively simple: each Python subprocess inherits its random state from its parent.

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1

Since the parent's state has no reason to change in this particular scenario, and the workers have a limited lifespan, the state of every child will be exactly the same on each run.

How to transpose an RDD in Spark

Say you have an N×M matrix. If both N and M are so small that you can hold N×M items in memory, it doesn’t make much sense to use an RDD. But transposing it is easy:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)

If N or … Read more
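If the matrix is too large to collect, one option is to index every cell and regroup by column. This is only a sketch of the general idea, not the continuation of the truncated answer; the variable names are mine, and it does incur a shuffle:

val rows = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))

val transposed = rows
  .zipWithIndex                                   // attach a row index to each row
  .flatMap { case (row, rowIdx) =>
    row.zipWithIndex.map { case (value, colIdx) => (colIdx, (rowIdx, value)) }
  }                                               // one (colIdx, (rowIdx, value)) record per cell
  .groupByKey()                                   // gather all cells belonging to a column
  .sortByKey()                                    // new rows ordered by original column index
  .map { case (_, cells) => cells.toList.sortBy(_._1).map(_._2) }  // order cells within a row

transposed.collect.foreach(println)
// List(1, 4, 7)
// List(2, 5, 8)
// List(3, 6, 9)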

Why is the fold action necessary in Spark?

Empty RDD: fold cannot be replaced by reduce when the RDD is empty:

val rdd = sc.emptyRDD[Int]

rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ …

rdd.fold(0)(_ + _)
// Int = 0

You can of course combine reduce with a condition on isEmpty, but it is rather ugly.

Mutable buffer: another use case for fold is … Read more
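The truncated "mutable buffer" part presumably refers to aggregating into a mutable zero value. A minimal sketch of that pattern, assuming Breeze (bundled with Spark MLlib) is on the classpath; the data is purely illustrative:

import breeze.linalg.DenseVector

val vectors = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

// reduce allocates a fresh vector for every addition:
vectors.reduce(_ + _)
// DenseVector(100)

// fold lets the zero value act as a per-partition mutable accumulator, so the
// RDD's own elements are never modified and far fewer objects are created:
vectors.fold(DenseVector(0))((acc, v) => acc += v)
// DenseVector(100)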

Does a join of co-partitioned RDDs cause a shuffle in Apache Spark?

No. If two RDDs have the same partitioner, the join will not cause a shuffle. You can see this in CoGroupedRDD.scala:

override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency … Read more
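One way to check this yourself (the partitioner size and sample data are arbitrary): pre-partition both sides with the same HashPartitioner, join them, and inspect the lineage.

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(8)

val left  = sc.parallelize(1 to 100).map(k => (k, k * 2)).partitionBy(part)
val right = sc.parallelize(1 to 100).map(k => (k, k.toString)).partitionBy(part)

// Both inputs already carry `part`, so the join reuses it and builds
// one-to-one (narrow) dependencies instead of a new shuffle dependency.
val joined = left.join(right)
println(joined.toDebugString)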

How to sort an RDD in Scala Spark?

If you only need the top 10, use rdd.top(10). It avoids sorting, so it is faster. rdd.top makes one parallel pass through the data, collecting the top N in each partition in a heap, then merges the heaps. It is an O(rdd.count) operation. Sorting would be O(rdd.count log rdd.count), and incur a lot of data … Read more
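A quick illustration of the two approaches (the sample data and ordering key are made up):

val scores = sc.parallelize(Seq(("a", 5), ("b", 42), ("c", 17), ("d", 3), ("e", 99)))

// One parallel pass; each partition keeps a small heap, the driver merges them:
scores.top(3)(Ordering.by[(String, Int), Int](_._2))
// Array((e,99), (b,42), (c,17))

// Full sort, only worth it if you need every element in order:
scores.sortBy(_._2, ascending = false).take(3)
// Array((e,99), (b,42), (c,17))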

Difference between SparkContext, JavaSparkContext, SQLContext, and SparkSession?

sparkContext is the Scala implementation entry point, and JavaSparkContext is a Java wrapper around sparkContext. SQLContext is the entry point of Spark SQL and can be created from a sparkContext. Prior to 2.x, RDD, DataFrame, and Dataset were three different data abstractions. Since Spark 2.x, all three data abstractions are unified, and SparkSession is the unified entry point of Spark. … Read more
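A minimal sketch of the Spark 2.x entry point, and how the older contexts are reachable from it (the app name and master are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("example")
  .master("local[*]")
  .getOrCreate()

// The older entry points are still available through the session:
val sc         = spark.sparkContext   // SparkContext
val sqlContext = spark.sqlContext     // SQLContext, kept for backward compatibility

// DataFrames / Datasets are created through the session itself:
val ds = spark.range(5)               // Dataset[java.lang.Long]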

Reading in multiple files compressed in a tar.gz archive into Spark [duplicate]

A solution is given in Read whole text files from a compression in Spark. Using the code sample provided there, I was able to create a DataFrame from the compressed archive like so:

val jsonRDD = sc.binaryFiles("gzarchive/*")
  .flatMapValues(x => extractFiles(x).toOption)
  .mapValues(_.map(decode()))

val df = sqlContext.read.json(jsonRDD.map(_._2).flatMap(x => x))

This method works fine for tar archives of … Read more
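The snippet relies on two helpers, extractFiles and decode, defined in the linked answer. Roughly what such helpers might look like, assuming Apache Commons Compress is available on the classpath (names, signatures, and the buffer size are reconstructed here, not authoritative):

import java.nio.charset.{Charset, StandardCharsets}
import scala.util.Try
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream

// Unpack a gzip-compressed tar delivered by sc.binaryFiles and return the raw
// bytes of every regular file it contains.
def extractFiles(ps: PortableDataStream, bufferSize: Int = 1024): Try[Array[Array[Byte]]] = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Iterator
    .continually(Option(tar.getNextTarEntry))
    .takeWhile(_.isDefined)          // getNextTarEntry returns null at the end of the archive
    .flatten
    .filter(!_.isDirectory)
    .map { _ =>
      // Read the current entry in bufferSize chunks until it is exhausted.
      Iterator
        .continually {
          val buffer = new Array[Byte](bufferSize)
          val read = tar.read(buffer, 0, bufferSize)
          (read, buffer.take(math.max(read, 0)))
        }
        .takeWhile(_._1 > 0)
        .flatMap(_._2)
        .toArray
    }
    .toArray
}

// Decode one extracted file to text so sqlContext.read.json can parse it.
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]): String =
  new String(bytes, charset)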