Stratified sampling with PySpark

The solution I suggested in Stratified sampling in Spark is pretty straightforward to convert from Scala to Python (or even to Java – What's the easiest way to stratify a Spark Dataset?). Nevertheless, I'll rewrite it in Python. Let's start by creating a toy DataFrame:

from pyspark.sql.functions import lit
list = [(2147481832, 23355149, 1), (2147481832, 973010692, 1), (2147481832, 2134870842, 1), (2147481832, 541023347, 1), (2147481832, 1682206630, 1), (2147481832, 1138211459, 1), (2147481832, 852202566, 1), (2147481832, 201375938, 1), (2147481832, 486538879, 1), (2147481832, 919187908, 1), (214748183, 919187908, 1), (214748183, 91187908, 1)]
df … Read more
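In PySpark the usual tool for this is `df.sampleBy(col, fractions, seed)`, which keeps each row with the probability assigned to its stratum. Since a running Spark session isn't assumed here, the same per-stratum fraction logic can be sketched in plain Python (`sample_by`, `rows`, and the fraction values are illustrative, not from the original answer):

```python
import random

def sample_by(rows, key_index, fractions, seed=42):
    # Approximate stratified sampling: keep each row with the
    # probability assigned to its stratum, like DataFrame.sampleBy.
    rng = random.Random(seed)
    return [row for row in rows
            if rng.random() < fractions.get(row[key_index], 0.0)]

rows = [(2147481832, 23355149, 1), (2147481832, 973010692, 1),
        (2147481832, 2134870842, 1), (2147481832, 541023347, 1),
        (214748183, 919187908, 1), (214748183, 91187908, 1)]

# Keep roughly half of the first stratum and all of the second.
sample = sample_by(rows, 0, {2147481832: 0.5, 214748183: 1.0})
```

As with `sampleBy`, the result is approximate per stratum: each row is an independent Bernoulli trial, so the kept fraction only converges to the requested one for large strata.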

Does a join of co-partitioned RDDs cause a shuffle in Apache Spark?

No. If two RDDs have the same partitioner, the join will not cause a shuffle. You can see this in CoGroupedRDD.scala:

override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency … Read more
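The reason no data moves can be sketched without Spark: when both sides were partitioned by the same partitioner, matching keys already live in partitions with the same index, so the join only ever combines partition i with partition i. A minimal plain-Python illustration (the helper names are hypothetical):

```python
def hash_partition(pairs, num_partitions):
    # The same "partitioner" applied to both RDD stand-ins:
    # a key's partition index depends only on its hash.
    parts = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        parts[hash(k) % num_partitions].append((k, v))
    return parts

def local_join(left_part, right_part):
    # Join two partitions entirely locally (no cross-partition access).
    right_by_key = {}
    for k, v in right_part:
        right_by_key.setdefault(k, []).append(v)
    return [(k, (lv, rv)) for k, lv in left_part
            for rv in right_by_key.get(k, [])]

NUM = 4
left = hash_partition([("a", 1), ("b", 2), ("c", 3)], NUM)
right = hash_partition([("a", 10), ("b", 20)], NUM)

# Because both sides share the partitioner, the join is purely
# partition-local: partition i of the left only meets partition i
# of the right -- the one-to-one dependency from the Scala snippet.
joined = [pair for i in range(NUM) for pair in local_join(left[i], right[i])]
```

If the partitioners differed, keys for the same value could sit at different partition indexes on the two sides, which is exactly the case where Spark falls back to a shuffle dependency.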

How does Distinct() function work in Spark?

.distinct() is definitely doing a shuffle across partitions. To see more of what's happening, run a .toDebugString on your RDD.

val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)

which for an RDD example I have (myRDDPreStep is already hash-partitioned by key, persisted by StorageLevel.MEMORY_AND_DISK_SER, … Read more
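The shuffle is inherent to how distinct works: equal elements may start out in different partitions, so they must first be routed (by hash) to the same partition before local deduplication can see them all. A plain-Python sketch of that two-step shape (the function is illustrative, not Spark's implementation):

```python
def distinct(partitions, num_partitions):
    # Sketch of RDD.distinct: route each element to the partition
    # chosen by its hash (the shuffle), then dedupe locally, since
    # equal elements are now guaranteed to share a partition.
    shuffled = [set() for _ in range(num_partitions)]
    for part in partitions:
        for x in part:
            shuffled[hash(x) % num_partitions].add(x)  # the shuffle step
    return shuffled

# Duplicates spread across input partitions still collapse to one copy.
parts = distinct([[1, 2, 2], [2, 3, 1]], 2)
all_values = sorted(x for p in parts for x in p)
```

This is also why pre-partitioning, as in the snippet above, matters: once the data is hash-partitioned, Spark can sometimes avoid a second repartitioning in later stages.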

How to get Kafka offsets for structured query for manual and reliable offset management?

Spark 2.2 introduced a structured streaming source for Kafka. As I understand it, it relies on the HDFS checkpoint directory to store offsets and guarantee "exactly-once" message delivery. Correct. On every trigger, Spark Structured Streaming saves offsets to the offsets directory in the checkpoint location (defined using the checkpointLocation option or the spark.sql.streaming.checkpointLocation Spark property, or randomly assigned) that is … Read more
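For manual offset management, the files under `<checkpointLocation>/offsets/` can be read directly. Under the assumption (based on Spark 2.x behaviour, worth verifying against your version) that each batch file is a version line followed by JSON lines, one per source, mapping topic to partition to next offset, a parsing sketch looks like this; the file contents and topic name below are made up for illustration:

```python
import json

# Hypothetical contents of <checkpointLocation>/offsets/<batchId>.
# Assumed layout: a version line, a batch-metadata JSON line, then
# one JSON line per source with topic -> partition -> next offset.
offset_file = """v1
{"batchWatermarkMs":0,"batchTimestampMs":1502060730000}
{"my-topic":{"0":23,"1":-1,"2":45}}"""

lines = offset_file.splitlines()
version = lines[0]                     # format version marker
batch_metadata = json.loads(lines[1])  # watermark, trigger timestamp
kafka_offsets = json.loads(lines[2])   # per-topic, per-partition offsets
```

Note that partition ids appear as JSON string keys, and an offset of -1 conventionally means no offset has been committed for that partition yet; reading these files is a workaround, so treat the format as internal and subject to change between Spark versions.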