Apache Spark: dealing with case statements

These are a few ways to write an If-Else / When-Then-Else / When-Otherwise expression in PySpark.

Sample dataframe:

df = spark.createDataFrame([(1,1),(2,2),(3,3)], ['id','value'])
df.show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    1|
#|  2|    2|
#|  3|    3|
#+---+-----+

#Desired Output:
#+---+-----+----------+
#| id|value|value_desc|
#+---+-----+----------+
#|  1|    1|       one|
#|  2|    2|       two|
#|  3|    3|     other|
#+---+-----+----------+

Option#1: withColumn() … Read more
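For context, a minimal sketch of what a when/otherwise chain producing the desired output could look like (column names mirror the sample dataframe above; the answer's full set of options is behind the link):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 1), (2, 2), (3, 3)], ['id', 'value'])

# when/otherwise chain: map 1 -> 'one', 2 -> 'two', anything else -> 'other'
df_desc = df.withColumn(
    'value_desc',
    F.when(F.col('value') == 1, 'one')
     .when(F.col('value') == 2, 'two')
     .otherwise('other')
)
df_desc.show()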

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Yes, a Spark application has one and only one driver. What is the relationship between numWorkerNodes and numExecutors? A worker can host multiple executors; you can think of the worker as the machine/node of your cluster and the executor as a process (executing on a core) that runs on that worker. So … Read more
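As a rough illustration of the arithmetic involved (not taken from the original answer; the 2-4 tasks-per-core factor is only a commonly quoted rule of thumb), a partition count could be estimated like this:

# Hypothetical cluster figures, purely for illustration
num_worker_nodes = 4          # machines in the cluster
executors_per_worker = 2      # a worker can host several executors
cores_per_executor = 4        # cores available to each executor process

total_cores = num_worker_nodes * executors_per_worker * cores_per_executor

# A commonly quoted guideline is 2-4 tasks per available core
num_partitions = total_cores * 3

# df = df.repartition(num_partitions)   # apply to a DataFrame if desired
print(total_cores, num_partitions)      # 32 96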

PySpark: Pass multiple columns in UDF

If all the columns you want to pass to the UDF have the same data type, you can use an array as the input parameter, for example:

>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| … Read more
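A self-contained sketch of the array approach shown above, with the output (cut off in the excerpt) reproduced as a comment:

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, array

spark = SparkSession.builder.getOrCreate()

# The UDF receives a single array argument built from the columns of interest
sum_cols = udf(lambda arr: sum(arr), IntegerType())

df = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
df.withColumn('Result', sum_cols(array('A', 'B'))).show()
# +---+---+---+------+
# | ID|  A|  B|Result|
# +---+---+---+------+
# |101|  1| 16|    17|
# +---+---+---+------+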

Spark parquet partitioning: Large number of files

First, I would really avoid using coalesce, as this is often pushed further up the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: Coalesce reduces parallelism of entire stage (spark)). Writing 1 file per parquet partition is relatively easy (see Spark dataframe write method writing … Read more
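One way to get a single file per partition directory without coalesce, sketched here with hypothetical year/month partition columns and paths (the linked answer may differ in detail), is to repartition on the same columns used in partitionBy:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/input")   # hypothetical input path

# Hash-partitioning on the partitionBy columns sends all rows for a given
# (year, month) to one task, so each output directory gets one file.
(df.repartition("year", "month")            # hypothetical partition columns
   .write
   .partitionBy("year", "month")
   .mode("overwrite")
   .parquet("/path/to/output"))             # hypothetical output path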

Reading DataFrame from partitioned parquet file

sqlContext.read.parquet can take multiple paths as input. If you want just day=5 and day=6, you can simply add two paths like:

val dataframe = sqlContext
  .read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
                "file:///your/path/data=jDD/year=2015/month=10/day=6/")

If you have folders under day=X, like say country=XX, country will automatically be added as a column in the dataframe.

EDIT: As of Spark 1.6 one needs to … Read more
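A PySpark sketch of the same multi-path read; the basePath option (an assumption here, not part of the quoted excerpt) tells Spark where the partitioned table root is so that partition columns such as year/month/day are kept in the schema when explicit leaf paths are given:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .option("basePath", "file:///your/path/")   # assumed table root
      .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
               "file:///your/path/data=jDD/year=2015/month=10/day=6/"))
df.printSchema()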

Change nullable property of column in Spark DataFrame

With the imports

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

you can use

/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is either nullable or not
 */
… Read more
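For reference, a PySpark sketch of the same idea (the helper name set_nullable is hypothetical, not from the answer): rebuild the schema with the flag flipped and recreate the DataFrame against it.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType

spark = SparkSession.builder.getOrCreate()

def set_nullable(df, cn, nullable):
    # Copy every field, flipping the nullable flag on the requested column
    fields = [StructField(f.name, f.dataType,
                          nullable if f.name == cn else f.nullable)
              for f in df.schema.fields]
    # Rebuild the DataFrame against the adjusted schema
    return spark.createDataFrame(df.rdd, StructType(fields))

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
set_nullable(df, "id", True).printSchema()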

Spark DataFrame: does groupBy after orderBy maintain that order?

groupBy after orderBy doesn’t maintain order, as others have pointed out. What you want to do is use a Window function, partitioned on id and ordered by hours. You can collect_list over this and then take the max (largest) of the resulting lists since they go cumulatively (i.e. the first hour will only have itself … Read more
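A sketch of that window + collect_list + max idea, with made-up column names (id, hour, value); taking the max of the cumulative lists relies on array columns being comparable, which holds in recent Spark versions:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: (id, hour, value)
df = spark.createDataFrame(
    [(1, 1, "a"), (1, 2, "b"), (1, 3, "c"), (2, 1, "x"), (2, 2, "y")],
    ["id", "hour", "value"])

# collect_list over a window ordered by hour grows cumulatively per id,
# so the largest list for each id is the full, correctly ordered sequence.
w = Window.partitionBy("id").orderBy("hour")
result = (df.withColumn("vals", F.collect_list("value").over(w))
            .groupBy("id")
            .agg(F.max("vals").alias("ordered_values")))
result.show(truncate=False)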

Applying a Window function to calculate differences in pySpark

You can bring in the previous day's column by using the lag function, and add an additional column that computes the actual day-to-day return from the two columns, but you may have to tell Spark how to partition your data and/or order it to do lag, something like this:

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions … Read more
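Continuing in the same spirit, a small self-contained sketch (the ticker/date/price column names are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as func

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: (ticker, date, price)
df = spark.createDataFrame(
    [("A", "2024-01-01", 10.0), ("A", "2024-01-02", 10.5), ("A", "2024-01-03", 10.2)],
    ["ticker", "date", "price"])

# lag needs to know how to partition and order the rows
w = Window.partitionBy("ticker").orderBy("date")

result = (df
    .withColumn("prev_price", func.lag("price").over(w))
    .withColumn("daily_return",
                (func.col("price") - func.col("prev_price")) / func.col("prev_price")))
result.show()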