How to add a SparkListener from pySpark in Python?

It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all the required classes. Directory structure:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── … Read more
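As a rough illustration of the idea (not the actual files from the answer, which is truncated above), the Scala side might define a plain callback trait that the Python side implements through Py4j, plus a SparkListener that forwards events to it; the class and method names below are only placeholders:

package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Implemented on the Python side as a Py4j callback class.
trait Listener {
  def notify(message: String): Unit
}

// Forwards selected Spark events to the Python callback.
class PythonListener(listener: Listener) extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    listener.notify(s"Task ended in stage ${taskEnd.stageId}")
  }
}

On the Python side one would then implement Listener in a Py4j callback class and register the PythonListener via sc._jsc.sc().addSparkListener(...); treat this as a sketch of the wiring rather than the full recipe.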

How to pivot on multiple columns in Spark SQL?

Here's a non-UDF way involving a single pivot (hence, just a single column scan to identify all the unique dates):

dff = mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'), F.first('units').alias('unit'))

Here's the result (apologies for the non-matching ordering and naming):

+---+-------+------+-------+------+-------+------+-------+------+
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100|     23|    10|     45|    11|     67|    12|     78|    13|
|101|     23|    10|     45|    13|     67|    14|     78|    15|
… Read more
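For reference, here is an equivalent sketch in Scala (the toy data and session setup are my own additions, shaped to reproduce the table above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

val spark = SparkSession.builder().master("local[*]").appName("pivot-example").getOrCreate()
import spark.implicits._

// Toy data shaped like the question: one row per (id, day).
val mydf = Seq(
  (100, 1, 23, 10), (100, 2, 45, 11), (100, 3, 67, 12), (100, 4, 78, 13),
  (101, 1, 23, 10), (101, 2, 45, 13), (101, 3, 67, 14), (101, 4, 78, 15)
).toDF("id", "day", "price", "units")

// Single pivot with two aggregations; Spark names the output
// columns <day>_price and <day>_unit, as in the table above.
val dff = mydf
  .groupBy("id")
  .pivot("day")
  .agg(first("price").alias("price"), first("units").alias("unit"))

dff.show()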

Running a Scala code jar throws NoSuchMethodError: scala.Predef$.refArrayOps

Most probably you're compiling your code locally with Scala 2.12, but on the server it runs with Scala 2.13 or 2.11. Try to recompile your code with the Scala version used on the server. Scala 2.11, 2.12 and 2.13 are binary incompatible with each other. The signature of refArrayOps is different (in a binary incompatible way) in Scala 2.13: def … Read more
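As a small illustration of the fix, with sbt you would pin scalaVersion in build.sbt to whatever the server actually runs and rebuild the jar (the 2.13.12 below is only a placeholder version number):

// build.sbt -- match the Scala version of the server, then re-run `sbt package`
scalaVersion := "2.13.12"

// If the server runs Scala 2.12 instead, use e.g. "2.12.18" here.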

How to create a Dataset of Maps?

It is not covered in Spark 2.2, but it can be easily addressed. You can add the required Encoder using ExpressionEncoder, either explicitly:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder

spark
  .createDataset(Seq(Map(1 -> 2)))(ExpressionEncoder(): Encoder[Map[Int, Int]])

or implicitly:

implicit def mapIntIntEncoder: Encoder[Map[Int, Int]] = ExpressionEncoder()

spark.createDataset(Seq(Map(1 -> 2)))
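To see the implicit variant in context, here is a minimal self-contained sketch (the SparkSession setup, object wrapper and sample data are my own additions, not part of the original answer):

import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

object MapDatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("map-dataset").getOrCreate()

    // On Spark 2.2 the map encoder has to be supplied by hand, as in the answer above.
    implicit def mapIntIntEncoder: Encoder[Map[Int, Int]] = ExpressionEncoder()

    val ds = spark.createDataset(Seq(Map(1 -> 2), Map(3 -> 4)))
    ds.show()

    spark.stop()
  }
}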

Spark: disk I/O on stage boundaries explanation

It's a good question: we hear about in-memory Spark vs. Hadoop, so the role of the disk can be a little confusing. The docs are terrible, but I ran a few things and verified my observations by looking around, eventually finding a most excellent source: http://hydronitrogen.com/apache-spark-shuffles-explained-in-depth.html Assuming an Action has been called – so as to avoid the obvious comment if … Read more
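To make the stage boundary concrete, here is a tiny sketch of my own (assuming a SparkContext named sc; the input path is a placeholder): the reduceByKey below splits the job into two stages, and the map-side output of stage 1 is written to the executors' local disks as shuffle files before stage 2 reads it.

// Stage 1: map side -- shuffle files are written to local disk at the stage boundary.
val words = sc.textFile("hdfs:///tmp/input.txt")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))

// Stage 2: reduce side -- reads the shuffle files produced by stage 1.
val counts = words.reduceByKey(_ + _)

counts.count()  // the action that actually triggers both stages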

Why is the fold action necessary in Spark?

Empty RDD

fold cannot be substituted with reduce when the RDD is empty:

val rdd = sc.emptyRDD[Int]

rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ …

rdd.fold(0)(_ + _)
// Int = 0

You can of course combine reduce with a condition on isEmpty, but it is rather ugly.

Mutable buffer

Another use case for fold is … Read more
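For completeness, the isEmpty workaround mentioned above might look like this (my own sketch, using the same rdd):

// Works, but the empty-collection case has to be handled by hand.
val sum = if (rdd.isEmpty()) 0 else rdd.reduce(_ + _)

// fold expresses the same thing in a single call:
val sum2 = rdd.fold(0)(_ + _)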

Joining two dataframes without a common column

Add an index column to both dataframes using the code below:

val df1WithId = df1.withColumn("id1", monotonically_increasing_id())
val df2WithId = df2.withColumn("id2", monotonically_increasing_id())

Then join the two dataframes on the index columns and drop them:

df1WithId.join(df2WithId, col("id1") === col("id2"), "inner")
  .drop("id1", "id2")
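Put together as a runnable sketch (the sample data and column names are mine, purely for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

val spark = SparkSession.builder().master("local[*]").appName("index-join").getOrCreate()
import spark.implicits._

val df1 = Seq("a", "b", "c").toDF("letter")
val df2 = Seq(1, 2, 3).toDF("number")

// Note: the generated ids only line up row-for-row when both
// dataframes are partitioned identically, as they are here.
val joined = df1.withColumn("id1", monotonically_increasing_id())
  .join(df2.withColumn("id2", monotonically_increasing_id()), col("id1") === col("id2"), "inner")
  .drop("id1", "id2")

joined.show()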