Python Spark Cumulative Sum by Group Using DataFrame

This can be done using a combination of a window function and the Window.unboundedPreceding value in the window's range as follows:

from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()

+----+-----+-----+-------+
|time|value|class|cum_sum|
+----+-----+-----+-------+
|   1|    3|    b|      3|
|   2|    3|    b|      6|
| … Read more
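
As a self-contained sketch of the same pattern (the sample rows and the SparkSession setup below are hypothetical additions, not part of the answer above), the running sum per class can be reproduced end to end like this:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data mirroring the time/value/class columns above
df = spark.createDataFrame(
    [(1, 3, 'b'), (2, 3, 'b'), (1, 2, 'a'), (2, 4, 'a')],
    ['time', 'value', 'class'],
)

# Running sum per class, ordered by time, from the first row up to the current one
windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))
df.withColumn('cum_sum', F.sum('value').over(windowval)).show()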

TypeError: Column is not iterable – How to iterate over ArrayType()?

In Spark < 2.4 you can use a user-defined function:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

def transform(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _

foo_udf = transform(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type … Read more
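
For Spark 2.4 and later, a possible alternative (not part of the truncated answer above) is the SQL higher-order function transform, called through expr, which maps over the array without a Python UDF; the column name names is assumed from the question:

from pyspark.sql import functions as F

# Spark 2.4+: apply upper() to every element of the array column without a Python UDF.
# The column name 'names' is assumed from the question above.
df_upper = df.withColumn('names', F.expr("transform(names, x -> upper(x))"))
df_upper.show(truncate=False)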

How to improve performance for slow Spark jobs using DataFrame and JDBC connection?

All of the aggregation operations are performed only after the whole dataset has been retrieved into memory as a DataFrame, so doing the count in Spark will never be as efficient as doing it directly in Teradata. It is sometimes worth pushing some computation into the database by creating views and then mapping those … Read more
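
As a rough sketch of the push-down idea (the connection URL, table name, and credentials below are placeholders, not from the answer), an aggregate can be wrapped in a subquery passed as the dbtable option so the database computes it before Spark ever pulls rows over JDBC:

# Hypothetical JDBC details; replace the URL, table, and credentials with your own.
pushdown_query = "(SELECT region, COUNT(*) AS cnt FROM sales GROUP BY region) AS agg"

df_agg = (spark.read
          .format("jdbc")
          .option("url", "jdbc:teradata://<host>")       # placeholder URL
          .option("dbtable", pushdown_query)             # the database runs the GROUP BY, not Spark
          .option("user", "<user>")
          .option("password", "<password>")
          .load())
df_agg.show()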

How to import multiple csv files in a single load?

Use a wildcard, e.g. replace 2008 with *:

df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("../Downloads/*.csv")   // <-- note the star (*)

Spark 2.0

// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")

Notes:
Replace format("com.databricks.spark.csv") with format("csv"), or use the csv method instead; the com.databricks.spark.csv format has been integrated into Spark 2.0.
Use spark, not sqlContext.
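
PySpark's reader also accepts an explicit list of paths, which helps when the files do not share a glob pattern; here is a small sketch (the file names below are hypothetical):

# Load several CSV files in one call by passing a list of paths (PySpark).
paths = ["../Downloads/2008.csv", "../Downloads/2009.csv", "../Downloads/2010.csv"]
df = spark.read.option("header", "true").csv(paths)
df.printSchema()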

Overwrite specific partitions in spark dataframe write method

Finally! This is now a feature in Spark 2.3.0: SPARK-20236. To use it, you need to set the spark.sql.sources.partitionOverwriteMode setting to dynamic, the dataset needs to be partitioned, and the write mode must be overwrite. Example:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

I recommend doing a repartition based on your partition column before writing, so you won't end up with 400 … Read more
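
Putting those pieces together, a rough end-to-end sketch might look like the following (the partition column date is an assumption for illustration, not taken from the truncated answer):

# Overwrite only the partitions present in `data`, leaving other partitions untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(data
 .repartition("date")               # hypothetical partition column; avoids many small files
 .write
 .mode("overwrite")
 .insertInto("partitioned_table"))  # table must already exist and be partitioned by `date`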

Finding duplicates from large data set using Apache Spark

Load the data into Spark and apply a group by on the email column. After that, check each bag and apply any distance algorithm on the first name and last name columns. This should be pretty straightforward in Spark:

val df = sc.textFile("hdfs path of data")
df.mapToPair("email", <whole_record>)
  .groupBy(// will be done based on key)
  .map(// will … Read more
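
As a concrete illustration of the grouping step only, here is a small PySpark sketch; the column names email, first_name, and last_name are assumptions, and the name-distance check is left out since the answer above is truncated:

from pyspark.sql import functions as F

# Hypothetical input; assumes columns email, first_name, last_name.
df = spark.read.option("header", "true").csv("hdfs:///path/of/data")

# Emails that occur more than once are candidate duplicates.
dupe_emails = (df.groupBy("email")
                 .count()
                 .filter(F.col("count") > 1))

# Pull back the full records for those emails for the later name-similarity check.
candidates = df.join(dupe_emails.select("email"), on="email", how="inner")
candidates.show()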