How to find mean of grouped Vector columns in Spark SQL?

Spark >= 2.4: you can use Summarizer:

import org.apache.spark.ml.stat.Summarizer

val dfNew = df.as[(Int, org.apache.spark.mllib.linalg.Vector)]
  .map { case (group, v) => (group, v.asML) }
  .toDF("group", "features")

dfNew
  .groupBy($"group")
  .agg(Summarizer.mean($"features").alias("means"))
  .show(false)

+-----+--------------------------------------------------------------------+
|group|means                                                               |
+-----+--------------------------------------------------------------------+
|1    |[8.740630742016827E12,2.6124956666260462E14,3.268714653521495E14]  |
|6    |[2.1153266920139112E15,2.07232483974322592E17,6.2715161747245427E17]|
|3    |[6.3781865566442836E13,8.359124419656149E15,1.865567821598214E14]  |
|5    |[4.270201403521642E13,6.561211706745676E13,8.395448246737938E15]   |
|9    |[3.577032684241448E16,2.5432362841314468E16,2.3744826986293008E17] |
|4    |[2.339253775419023E14,8.517531902022505E13,3.055115780965264E15]   |
|8    |[8.029924756674456E15,7.284873600992855E17,3.08621303029924E15]    |
|7    |[3.2275104122699105E15,7.5472363442090208E16,7.022556624056291E14] |
… Read more
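
For reference, a minimal PySpark sketch of the same grouped-mean aggregation; the toy data and column names below are illustrative, not from the original answer:

from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Summarizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data: an integer group column and an ml Vector column (names are made up).
df = spark.createDataFrame(
    [(1, Vectors.dense([1.0, 2.0, 3.0])),
     (1, Vectors.dense([3.0, 4.0, 5.0])),
     (2, Vectors.dense([10.0, 20.0, 30.0]))],
    ["group", "features"])

# Per-group mean vector, same idea as the Scala snippet above.
(df.groupBy("group")
   .agg(Summarizer.mean(df["features"]).alias("means"))
   .show(truncate=False))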

Efficient pyspark join

You can also use a two-pass approach, in case it suits your requirement. First, re-partition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then, join the sub-partitions serially in a loop, "appending" to the same final result table. This was nicely explained by Sim; see the link below: two pass approach to join big dataframes in pyspark based … Read more
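
A minimal PySpark sketch of that two-pass idea; the table names, join key, bucket column, bucket count and paths are all hypothetical placeholders:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
NUM_BUCKETS = 16  # assumed number of sub-partitions; tune to your data

# Pass 1: derive a common bucket key on both sides and persist each side
# partitioned by that key.
left = (spark.table("big_left")
        .withColumn("bucket", F.abs(F.hash("join_key")) % NUM_BUCKETS))
right = (spark.table("big_right")
         .withColumn("bucket", F.abs(F.hash("join_key")) % NUM_BUCKETS))
left.write.mode("overwrite").partitionBy("bucket").parquet("/tmp/left_parts")
right.write.mode("overwrite").partitionBy("bucket").parquet("/tmp/right_parts")

# Pass 2: join one bucket at a time, appending to the same result location.
for b in range(NUM_BUCKETS):
    l = spark.read.parquet("/tmp/left_parts").where(F.col("bucket") == b)
    r = (spark.read.parquet("/tmp/right_parts")
         .where(F.col("bucket") == b)
         .drop("bucket"))
    (l.join(r, on="join_key")
      .write.mode("append")
      .parquet("/tmp/joined_result"))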

Why does format(“kafka”) fail with “Failed to find data source: kafka.” (even with uber-jar)?

The kafka data source is an external module and is not available to Spark applications by default. You have to define it as a dependency in your pom.xml (as you have done), but that is just the very first step to having it in your Spark application.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

With that dependency you have … Read more
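
For illustration, a hedged PySpark sketch of using the kafka source once the artifact is actually on the runtime classpath (for example, submitted with --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0); the broker address and topic name are placeholders:

# Submit with the matching package so the "kafka" format can be found, e.g.:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 app.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder broker and topic names.
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "some_topic")
          .load())

# Kafka rows carry key/value as binary; cast them to strings to inspect.
query = (stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())
query.awaitTermination()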

Apache Spark: setting executor instances does not change the executors

Increase yarn.nodemanager.resource.memory-mb in yarn-site.xml. With 12g per node you can only launch the driver (3g) and 2 executors (11g).

Node1 – driver 3g (+7% overhead)
Node2 – executor1 11g (+7% overhead)
Node3 – executor2 11g (+7% overhead)

Now you are requesting executor3 of 11g, and no node has 11g of memory available. For the 7% overhead refer to spark.yarn.executor.memoryOverhead and … Read more
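
A rough back-of-the-envelope check of that accounting, assuming 7% overhead with the usual 384 MB floor:

# Rough arithmetic behind the layout above: 12g nodes, a 3g driver and
# 11g executors, each plus ~7% overhead (384 MB minimum assumed).
NODE_MB = 12 * 1024                       # 12288 MB usable per node (assumed)

def container_mb(heap_gb):
    heap_mb = heap_gb * 1024
    return heap_mb + max(384, int(heap_mb * 0.07))

driver_mb = container_mb(3)               # ~3456 MB on Node1
executor_mb = container_mb(11)            # ~12052 MB on Node2 and Node3

print(NODE_MB - driver_mb)                # ~8832 MB free on Node1 -> too small
print(NODE_MB - executor_mb)              # ~236 MB free on Node2/3 -> too small
# A third 11g executor fits nowhere until yarn.nodemanager.resource.memory-mb
# is increased (or the executor memory is lowered).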

Spark parquet partitioning : Large number of files

First, I would really avoid using coalesce, as it is often pushed further up the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: Coalesce reduces parallelism of entire stage (spark)). Writing 1 file per parquet partition is relatively easy (see Spark dataframe write method writing … Read more
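
As a hedged illustration of the usual way to get one file per parquet partition without coalesce (repartition by the partition columns before writing), here is a sketch with placeholder column and path names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/input")     # placeholder input path

# Repartition by the output partition columns instead of coalescing, so each
# partition directory is written by a single task (one file per directory)
# while the upstream stages keep their parallelism.
(df.repartition("year", "month")              # placeholder partition columns
   .write
   .partitionBy("year", "month")
   .mode("overwrite")
   .parquet("/path/to/output"))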