spark ssc.textFileStream is not streaming any files from directory
Try it with another directory: start the job first, then copy the files into that directory while the job is running. textFileStream only picks up files that appear in the directory after the stream has started; files that were already there are ignored.
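The behaviour above can be illustrated without Spark at all. The sketch below is a pure-Python analogy (not Spark's actual implementation): a monitor that, like textFileStream, only picks up files whose modification time falls after the stream's start time, which is why pre-existing files are never read.

```python
import os
import time
import tempfile

def new_files_since(directory, start_time):
    """Analogy for textFileStream's rule: only files modified at or
    after the stream's start time are picked up."""
    picked = []
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and os.path.getmtime(path) >= start_time:
            picked.append(name)
    return sorted(picked)

d = tempfile.mkdtemp()

# A file created BEFORE the "stream" starts: ignored.
old_path = os.path.join(d, "old.txt")
with open(old_path, "w") as f:
    f.write("old")
os.utime(old_path, (0, 0))  # force an old modification time

start = time.time() - 1  # the "stream" starts here

# A file copied in AFTER the stream starts: picked up.
with open(os.path.join(d, "new.txt"), "w") as f:
    f.write("new")

print(new_files_since(d, start))
```

Only `new.txt` is reported, mirroring why you must copy files in while the job is running.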
No. If two RDDs have the same partitioner, the join will not cause a shuffle. You can see this in CoGroupedRDD.scala: override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd: RDD[_ <: Product2[K, _]] => if (rdd.partitioner == Some(part)) { logDebug("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logDebug("Adding shuffle dependency … Read more
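The reason no shuffle is needed can be sketched without Spark. The pure-Python illustration below (not Spark's code; the partition function mimics a HashPartitioner) shows that when two datasets are partitioned with the same partitioner, every key lands at the same partition index in both, so the join can run partition-by-partition with no data movement:

```python
def partition(key, num_partitions):
    # Same idea as Spark's HashPartitioner: same key -> same partition index.
    return hash(key) % num_partitions

NUM = 4
rdd_a = [("a", 1), ("b", 2), ("c", 3)]
rdd_b = [("a", 10), ("b", 20), ("c", 30)]

# "Partition" both datasets with the same partitioner.
parts_a = [[kv for kv in rdd_a if partition(kv[0], NUM) == i] for i in range(NUM)]
parts_b = [[kv for kv in rdd_b if partition(kv[0], NUM) == i] for i in range(NUM)]

# Matching keys are guaranteed to sit in the same partition index,
# so each pair of partitions can be joined locally (one-to-one dependency).
joined = []
for pa, pb in zip(parts_a, parts_b):
    lookup = dict(pb)
    joined += [(k, (v, lookup[k])) for k, v in pa if k in lookup]

print(sorted(joined))
```

This is exactly the check `rdd.partitioner == Some(part)` in getDependencies: when it holds, a OneToOneDependency replaces the shuffle dependency.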
You are branching the query plan: from the same ds1 you are trying both ds1.collect.foreach(…) and ds1.writeStream.format(…){…}. But you are only calling .start() on the second branch, leaving the first dangling without a termination, which in turn throws the exception you are getting back. The solution is to start both branches and await termination. val ds1 … Read more
Learning to performance-tune Spark requires quite a bit of investigation and learning. There are a few good resources, including this video. Spark 1.4 has some better diagnostics and visualisation in the interface which can help you. In summary, you spill when the size of the RDD partitions at the end of the stage exceeds the … Read more
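As a starting point for reducing spill, these are the knobs most commonly tuned; the keys below are real Spark configuration properties, but the values are purely illustrative and must be adapted to your workload and Spark version:

```
# spark-defaults.conf — illustrative values only
spark.executor.memory         8g     # more heap per executor
spark.memory.fraction         0.6    # share of heap for execution + storage (Spark 1.6+)
spark.sql.shuffle.partitions  400    # more, smaller partitions spill less
```

Raising the partition count shrinks each partition, which directly targets the "partition size at the end of the stage" condition described above.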
Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes we can get the size of the actual DataFrame once it is loaded into memory. Check the code below. scala> val df = spark.read.format("orc").load("/tmp/srinivas/") df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string … 75 more fields] scala> import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes bytes: BigInt = 763275709 scala> FileUtils.byteCountToDisplaySize(bytes.toLong) res5: String = 727 MB … Read more
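The final conversion step is just integer division by the largest fitting binary unit. A rough pure-Python analog of Commons IO's FileUtils.byteCountToDisplaySize (a sketch, not the library's exact implementation) reproduces the 727 MB shown above:

```python
def byte_count_to_display_size(n):
    """Rough analog of FileUtils.byteCountToDisplaySize: integer-divide
    by the largest binary unit that fits."""
    for unit, size in [("GB", 1024**3), ("MB", 1024**2), ("KB", 1024)]:
        if n >= size:
            return f"{n // size} {unit}"
    return f"{n} bytes"

# The sizeInBytes value from the transcript above.
print(byte_count_to_display_size(763275709))
```

763275709 // 1048576 truncates to 727, matching the `727 MB` result in the REPL session.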
First of all, the JSON is invalid: after the header a , is missing. That being said, let's take this JSON: {"header":{"platform":"atm","version":"2.0"},"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]} This can be processed by: >>> df = sqlContext.jsonFile('test.json') >>> df.first() Row(details=[Row(abc="3", def="4"), Row(abc="5", def="6"), Row(abc="7", def="8")], header=Row(platform='atm', version='2.0')) >>> df = df.flatMap(lambda row: row['details']) PythonRDD[38] at RDD at PythonRDD.scala:43 >>> df.collect() [Row(abc="3", … Read more
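The same flattening can be checked with the standard library alone, which is a quick way to validate the corrected JSON before handing it to Spark. This sketch mirrors what the flatMap over row['details'] produces: one record per entry of the details array.

```python
import json

# The corrected JSON from the answer (comma added after the header object).
raw = ('{"header":{"platform":"atm","version":"2.0"},'
       '"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]}')

record = json.loads(raw)  # raises ValueError if the JSON is still invalid

# Equivalent of df.flatMap(lambda row: row['details']): flatten the array.
details = list(record["details"])
print(details)
```

If `json.loads` succeeds here, sqlContext's JSON reader will accept the file as well.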
The general idea is that PySpark creates as many Java processes as there are executors, and then ships data to each process. If there are too few processes, a memory bottleneck happens on the Java heap space. In your case, the specific error is that the RDD that you created with sc.parallelize([…]) did not specify … Read more
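How the slice count affects per-partition size can be shown with a small sketch. This is not Spark's code, but it mimics the slicing rule sc.parallelize(data, numSlices) uses for a list: partition i covers the index range [i*len//n, (i+1)*len//n).

```python
def slice_like_parallelize(data, num_slices):
    """Sketch of sc.parallelize's slicing of an in-memory list:
    partition i gets data[i*n//num_slices : (i+1)*n//num_slices]."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

# 10 elements over 3 slices -> partitions of 3, 3, and 4 elements.
print(slice_like_parallelize(list(range(10)), 3))
```

With too few slices, each partition holds a large share of the data, and that whole share must fit on one executor's heap; raising numSlices shrinks each partition.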
This message corresponds to something like "bad endpoint" or an unsupported signature version, as seen here: Frankfurt is the only region that does not support Signature Version 2, and it is the one I picked. Of course, after all my research I can't say exactly what a signature version is; it's not obvious in the documentation. But V2 seems … Read more
lit() returns a Column, but map.get requires an Int key. You can do it this way: val df: DataFrame = spark.sparkContext.parallelize(Range(0, 10000), 4).toDF("sentiment") val map = new util.HashMap[Int, Int]() map.put(1, 1) map.put(2, 2) map.put(3, 3) val bf: Broadcast[util.HashMap[Int, Int]] = spark.sparkContext.broadcast(map) df.rdd.map(x => { val num = x.getInt(0) (num, bf.value.get(num)) }).toDF("key", "add_key").show(false)
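The type mismatch is the whole issue: map.get needs a plain value, not a Column expression. This pure-Python sketch (an analogy, not Spark code) shows the working pattern in miniature: a read-only dict standing in for the broadcast map, indexed by the plain int extracted from each row.

```python
# Stand-in for the broadcast HashMap: a read-only dict shipped to workers.
broadcast_map = {1: 1, 2: 2, 3: 3}

# Stand-in for the "sentiment" column values pulled out with x.getInt(0).
sentiments = [1, 2, 3, 4]

# Look up each plain int key; missing keys yield None (like get returning null).
result = [(num, broadcast_map.get(num)) for num in sentiments]
print(result)
```

Passing lit(num) instead of num here would be the same category error as in the question: the map is keyed by values, not by column expressions.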