Does a join of co-partitioned RDDs cause a shuffle in Apache Spark?

No. If two RDDs have the same partitioner, the join will not cause a shuffle. You can see this in CoGroupedRDD.scala:

override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency … Read more
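
As a quick illustration (a minimal sketch, assuming sc is an existing SparkContext and the data is made up): pre-partitioning both RDDs with the same HashPartitioner makes the join hit the OneToOneDependency branch above, so the join itself schedules no extra shuffle.

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(8)

// The only shuffles here come from partitionBy; both inputs then report
// the same partitioner.
val left  = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(part)
val right = sc.parallelize(Seq((1, 10), (2, 20), (3, 30))).partitionBy(part)

// rdd.partitioner == Some(part) for both inputs, so CoGroupedRDD adds
// one-to-one dependencies and the join adds no new shuffle stage.
val joined = left.join(right)
println(joined.toDebugString)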

Queries with streaming sources must be executed with writeStream.start();

You are branching the query plan: from the same ds1 you are trying to do both:

ds1.collect.foreach(…)
ds1.writeStream.format(…){…}

But you are only calling .start() on the second branch, leaving the other dangling without termination, which in turn throws the exception you are getting back. The solution is to start both branches and await termination.

val ds1 … Read more
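
A minimal sketch of that shape (a rate source stands in for the original ds1, since the excerpt is truncated): every branch of a streaming Dataset ends in writeStream…start(), and the driver then awaits termination.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("two-branches").getOrCreate()

val ds1 = spark.readStream.format("rate").load()

// Branch 1: instead of ds1.collect.foreach(...), write to a sink (console here).
val q1 = ds1.writeStream.format("console").start()

// Branch 2: the second sink, also started.
val q2 = ds1.writeStream.format("console").queryName("branch2").start()

// Block until one of the queries terminates.
spark.streams.awaitAnyTermination()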

How to calculate the size of dataframe in bytes in Spark?

Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes we can get the size of the actual DataFrame once it is loaded into memory. Check the code below.

scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string … 75 more fields]

scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils

scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709

scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB

… Read more
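
On newer Spark versions (2.3+, where LogicalPlan.stats no longer takes a conf argument) the same estimate can be reached with a shorter call; a sketch, assuming spark is the usual spark-shell session and the same path as above:

import org.apache.commons.io.FileUtils

val df = spark.read.format("orc").load("/tmp/srinivas/")

// Size estimate taken from the optimized logical plan's statistics.
val sizeInBytes = df.queryExecution.optimizedPlan.stats.sizeInBytes
println(FileUtils.byteCountToDisplaySize(sizeInBytes.toLong))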

reading json file in pyspark

First of all, the JSON is invalid: after the header a , is missing. That being said, let's take this JSON:

{"header":{"platform":"atm","version":"2.0"},"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]}

This can be processed by:

>>> df = sqlContext.jsonFile('test.json')
>>> df.first()
Row(details=[Row(abc='3', def='4'), Row(abc='5', def='6'), Row(abc='7', def='8')], header=Row(platform='atm', version='2.0'))
>>> df = df.flatMap(lambda row: row['details'])
PythonRDD[38] at RDD at PythonRDD.scala:43
>>> df.collect()
[Row(abc='3', … Read more
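
With the current DataFrame API the same flattening can be done with explode instead of jsonFile plus flatMap; a sketch (shown in Scala for consistency with the other answers, assuming spark is a SparkSession and test.json holds the corrected JSON, one object per line):

import org.apache.spark.sql.functions.explode
import spark.implicits._

val df = spark.read.json("test.json")

// Turn each element of the details array into its own row, then pull out its fields.
val details = df.select(explode($"details").as("detail"))
  .select("detail.abc", "detail.def")
details.show()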

Spark using python: How to resolve Stage x contains a task of very large size (xxx KB). The maximum recommended task size is 100 KB

The general idea is that PySpark creates as many Java processes as there are executors, and then ships data to each process. If there are too few processes, a memory bottleneck occurs on the Java heap space. In your case, the specific error is that the RDD that you created with sc.parallelize([…]) did not specify … Read more
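
A sketch of the usual remedy (shown in Scala for consistency with the other answers, assuming sc is a SparkContext; PySpark's parallelize takes the same optional number-of-slices argument): asking parallelize for more slices spreads the local collection over more, smaller tasks, which keeps each serialized task well under the recommended size.

// Split the driver-side collection into many partitions instead of a few large ones.
val data = (1 to 1000000).map(i => (i, i * 2))
val rdd = sc.parallelize(data, numSlices = 1000)
println(rdd.getNumPartitions)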

Amazon s3a returns 400 Bad Request with Spark

This message corresponds to something like "bad endpoint" or an unsupported signature version, as seen here. Frankfurt is the only region that does not support Signature Version 2, and it's the one I picked. Of course, after all my research I still can't say exactly what a signature version is; it's not obvious in the documentation. But V2 seems … Read more
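
One common fix, sketched below (the keys are standard s3a/AWS SDK settings, but the bucket and region values are illustrative, and in practice the driver JVM option is usually passed via spark-submit rather than set after the JVM has started): point fs.s3a.endpoint at the region-specific endpoint and enable Signature Version 4 signing in the AWS SDK.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-v4")
  // The AWS SDK needs V4 signing switched on explicitly for V4-only regions.
  .config("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
  .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
  .getOrCreate()

val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com") // Frankfurt
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

val df = spark.read.text("s3a://my-bucket/some/path") // illustrative bucket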

spark broadcast variable Map giving null value

lit() returns a Column, but map.get requires an Int. You can do it this way:

val df: DataFrame = spark.sparkContext.parallelize(Range(0, 10000), 4).toDF("sentiment")

val map = new util.HashMap[Int, Int]()
map.put(1, 1)
map.put(2, 2)
map.put(3, 3)

val bf: Broadcast[util.HashMap[Int, Int]] = spark.sparkContext.broadcast(map)

df.rdd.map(x => {
  val num = x.getInt(0)
  (num, bf.value.get(num))
}).toDF("key", "add_key").show(false)
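
An alternative sketch (assuming the same SparkSession and the column names from the snippet above): broadcasting an immutable Scala Map and looking it up inside a UDF keeps the lookup in the DataFrame API instead of dropping to the RDD API.

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val lookup = spark.sparkContext.broadcast(Map(1 -> 1, 2 -> 2, 3 -> 3))

// The UDF receives a plain Int, so the broadcast Map can be queried directly.
val addKey = udf((sentiment: Int) => lookup.value.getOrElse(sentiment, 0))

val df = spark.sparkContext.parallelize(Range(0, 10000), 4).toDF("sentiment")
df.withColumn("add_key", addKey(col("sentiment"))).show(false)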