Apache Spark: dealing with case statements

These are a few ways to write an If-Else / When-Then-Else / When-Otherwise expression in PySpark. Sample dataframe:

```python
df = spark.createDataFrame([(1, 1), (2, 2), (3, 3)], ['id', 'value'])
df.show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    1|
#|  2|    2|
#|  3|    3|
#+---+-----+

#Desired Output:
#+---+-----+----------+
#| id|value|value_desc|
#+---+-----+----------+
#|  1|    1|       one|
#|  2|    2|       two|
#|  3|    3|     other|
#+---+-----+----------+
```

Option #1: withColumn() … Read more
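A minimal runnable sketch of the when/otherwise variant (the SparkSession is created here only to keep the example self-contained):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 1), (2, 2), (3, 3)], ['id', 'value'])

# Chain when() clauses and close with otherwise() for the default branch.
df.withColumn(
    'value_desc',
    F.when(F.col('value') == 1, 'one')
     .when(F.col('value') == 2, 'two')
     .otherwise('other')
).show()
```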

Applying a Window function to calculate differences in PySpark

You can bring in the previous day's column by using the lag function, then add an additional column that computes the actual day-to-day return from the two columns. You may have to tell Spark how to partition and/or order your data for lag to work, something like this:

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions
```

… Read more
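A self-contained sketch of the idea; the stock/date/price column names and sample values are assumptions, not taken from the question:

```python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as func

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: (stock, date, price) — column names are assumptions.
df = spark.createDataFrame(
    [('AAPL', '2024-01-01', 100.0),
     ('AAPL', '2024-01-02', 102.0),
     ('AAPL', '2024-01-03', 101.0)],
    ['stock', 'date', 'price'],
)

# Partition per stock and order by date so lag() knows what "previous day" means.
w = Window.partitionBy('stock').orderBy('date')

df = (df
      .withColumn('prev_price', func.lag('price').over(w))
      # Day-to-day return computed from the current and lagged columns.
      .withColumn('return',
                  (func.col('price') - func.col('prev_price'))
                  / func.col('prev_price')))
df.show()
```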

TypeError: Column is not iterable – How to iterate over ArrayType()?

In Spark < 2.4 you can use a user-defined function:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

def transform(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _

foo_udf = transform(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
```

+------+-----------------------+
|type … Read more
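For Spark 2.4 and later, the built-in transform higher-order function covers the same element-wise iteration without a Python UDF. A minimal sketch, assuming a dataframe with an array<string> column named names:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# Hypothetical dataframe with an array<string> column named 'names'.
df = spark.createDataFrame([(1, ['alice', 'bob'])], ['id', 'names'])

# transform() is a Spark SQL higher-order function (2.4+); upper() runs
# per array element inside the JVM, so no Python serialization is needed.
df.withColumn('names', expr('transform(names, x -> upper(x))')).show(truncate=False)
```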

Using a column value as a parameter to a Spark DataFrame function

One option is to use pyspark.sql.functions.expr, which allows you to use column values as inputs to Spark SQL functions. Based on @user8371915's comment I have found that the following works:

```python
from pyspark.sql.functions import expr

df.select(
    '*',
    expr('posexplode(split(repeat(",", rpt), ","))').alias("index", "col")
).where('index > 0').drop("col").sort('letter', 'index').show()
#+------+---+-----+
#|letter|rpt|index|
#+------+---+-----+
#|     X|  3|    1|
#|     X|  3|    2|
```

#| … Read more
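A self-contained version of the same idea; the sample letter/rpt rows are assumptions reconstructed from the output shown:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# Made-up sample matching the columns in the output above.
df = spark.createDataFrame([('X', 3), ('Y', 1)], ['letter', 'rpt'])

# repeat(",", rpt) builds a string of rpt commas; split() turns it into an
# array of rpt+1 empty strings; posexplode() emits one row per element along
# with its position, so each input row is repeated rpt times (index 1..rpt).
df.select(
    '*',
    expr('posexplode(split(repeat(",", rpt), ","))').alias('index', 'col')
).where('index > 0').drop('col').sort('letter', 'index').show()
```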