Why does sortBy transformation trigger a Spark job?

sortBy is implemented using sortByKey, which depends on a RangePartitioner (JVM) or a partitioning function (Python). When you call sortBy / sortByKey, the partitioner (partitioning function) is initialized eagerly and samples the input RDD to compute partition boundaries. The job you see corresponds to this process. The actual sorting is performed only if you execute an action on the newly … Read more
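
A minimal sketch of this behaviour, assuming a running SparkContext sc (as in spark-shell):

// Calling sortBy alone already shows up as a job in the UI, because the
// RangePartitioner samples the RDD to compute partition boundaries.
val rdd = sc.parallelize(Seq(3, 1, 4, 1, 5, 9), numSlices = 2)
val sorted = rdd.sortBy(identity)  // triggers the sampling job, no sorting yet
sorted.collect()                   // the actual sort runs when an action is executed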

Why are Spark Parquet files for an aggregate larger than the original?

In general, columnar storage formats like Parquet are highly sensitive to data distribution (data organization) and the cardinality of individual columns. The more organized the data and the lower the cardinality, the more efficient the storage. An aggregation, like the one you apply, has to shuffle the data. When you check the execution … Read more
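
A hedged illustration of the organization effect, assuming a spark-shell session (dataset, column names, and paths are hypothetical): keeping similar values next to each other lets Parquet's run-length and dictionary encodings work well, whereas the row order left by a shuffle typically compresses worse.

// Same aggregated data written twice: once as the shuffle leaves it,
// once re-organized by sorting on the key within each partition.
val aggregated = spark.range(0, 1000000)
  .withColumn("key", $"id" % 100)
  .groupBy("key").count()

aggregated.write.parquet("/tmp/agg_shuffled")
aggregated.sortWithinPartitions("key").write.parquet("/tmp/agg_sorted")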

How to group by common element in array?

Include graphframes (the latest supported Spark version is 2.1, but it should work with 2.2 as well; if you use a newer release you’ll have to build your own with the 2.3 patch), replacing XXX with your Spark version and YYY with your Scala version: spark.jars.packages graphframes:graphframes:0.5.0-sparkXXX-s_YYY Then explode the keys: import org.apache.spark.sql.functions._ val df = Seq( (Seq("k1", "k2"), "v1"), (Seq("k2"), "v2"), … Read more
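
A hedged sketch of the explode step mentioned above, assuming a spark-shell session (spark.implicits._ in scope) and hypothetical column names keys / value; the graphframes part of the answer is cut off by the excerpt:

import org.apache.spark.sql.functions._

val df = Seq(
  (Seq("k1", "k2"), "v1"),
  (Seq("k2"), "v2")
).toDF("keys", "value")

// One row per (key, value) pair — these pairs can later serve as graph edges.
val edges = df.select(explode($"keys").alias("key"), $"value")
edges.show()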

Spark: how to get the number of written rows?

If you really want to, you can add a custom listener and extract the number of written rows from outputMetrics. A very simple example can look like this: import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} var recordsWrittenCount = 0L sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { synchronized { recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten } } }) sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar") recordsWrittenCount // Long = … Read more
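
For readability, the same listener sketch laid out as a self-contained block (it mirrors the excerpt above and assumes a running SparkContext sc):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulate the recordsWritten output metric from every finished task.
var recordsWrittenCount = 0L
sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    synchronized {
      recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount  // check after the tasks have completed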

Why does Spark think this is a cross / Cartesian join

This happens because you join structures sharing the same lineage, and this leads to a trivially equal condition: res2.explain() == Physical Plan == org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L)) :- Filter isnotnull(idx#204L) : +- LogicalRDD [idx#204L, val#205] +- Filter ((isnotnull(key2#210L) && (key2#210L … Read more
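
A hedged sketch of the underlying trap, assuming a spark-shell session (names are hypothetical, not the asker's code): when one side of the join is derived from the other, the column references in the condition resolve back to the same attribute, the predicate becomes trivially true, and Spark reports a cartesian product.

val df = Seq((1L, "a"), (2L, "b")).toDF("idx", "val")
val derived = df.filter($"idx" > 0)

// Both references resolve to the same attribute (shared lineage), so the
// condition degenerates to idx#N = idx#N and Spark detects a cartesian product:
df.join(derived, df("idx") === derived("idx")).explain()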