How to pass whole Row to UDF – Spark DataFrame filter

You have to use the struct() function to construct the row when calling the function. Follow these steps.

Import Row:
    import org.apache.spark.sql._

Define the UDF:
    def myFilterFunction(r: Row) = { r.get(0) == r.get(1) }

Register the UDF:
    sqlContext.udf.register("myFilterFunction", myFilterFunction _)

Create the DataFrame:
    val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

Use the UDF:
    records.filter(callUdf("myFilterFunction", struct($"text", $"text2"))).show

When you want …

How does Spark aggregate function – aggregateByKey work?

aggregateByKey() is quite different from reduceByKey(); in fact, reduceByKey() is a particular case of aggregateByKey(). aggregateByKey() combines the values for a particular key, and the result of that combination can be any object that you specify. You have to specify how the values are combined (“added”) inside one partition (that …
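The combination steps can be sketched in plain Python, without a Spark cluster. This is only a simulation of the semantics, not Spark's implementation: in Spark, aggregateByKey takes a zero value, a function applied to values within each partition, and a function that merges the per-partition results. The helper name aggregate_by_key, the sample partitions, and the (sum, count) accumulator below are hypothetical and chosen for illustration.

```python
def aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Simulate aggregateByKey semantics (hypothetical helper, not a Spark API)."""
    # Step 1: inside each partition, fold every value into a per-key accumulator,
    # starting from zero_value the first time a key is seen in that partition.
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_op(acc.get(key, zero_value), value)
        per_partition.append(acc)

    # Step 2: merge the per-partition accumulators key by key.
    merged = {}
    for acc in per_partition:
        for key, partial in acc.items():
            merged[key] = comb_op(merged[key], partial) if key in merged else partial
    return merged


# Two made-up partitions of (key, value) pairs.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 5), ("b", 4)],
]

# The accumulator is a (sum, count) tuple: a different type from the int values.
sum_count = aggregate_by_key(
    partitions,
    zero_value=(0, 0),
    seq_op=lambda acc, v: (acc[0] + v, acc[1] + 1),
    comb_op=lambda x, y: (x[0] + y[0], x[1] + y[1]),
)

# When both functions are the same and the result has the value type,
# the call behaves like a reduceByKey-style sum.
sums = aggregate_by_key(
    partitions,
    zero_value=0,
    seq_op=lambda acc, v: acc + v,
    comb_op=lambda x, y: x + y,
)

print(sum_count)
print(sums)
```

With a real RDD, the corresponding PySpark call would be rdd.aggregateByKey(zeroValue, seqFunc, combFunc); the simulation only mirrors how the three arguments interact.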

Apache Spark dealing with case statements

These are a few ways to write If-Else / When-Then-Else / When-Otherwise expressions in PySpark.

Sample DataFrame:
    df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])
    df.show()
    #+---+-----+
    #| id|value|
    #+---+-----+
    #|  1|    1|
    #|  2|    2|
    #|  3|    3|
    #+---+-----+

    #Desired Output:
    #+---+-----+----------+
    #| id|value|value_desc|
    #+---+-----+----------+
    #|  1|    1|       one|
    #|  2|    2|       two|
    #|  3|    3|     other|
    #+---+-----+----------+

Option#1: withColumn() …

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Yes, a Spark application has one and only one driver. What is the relationship between numWorkerNodes and numExecutors? A worker can host multiple executors; you can think of the worker as a machine/node of your cluster and the executor as a process (executing in a core) that runs on that worker. So …

Spark-Obtaining file name in RDDs

Since Spark 1.6 you can combine the text data source and the input_file_name function as follows:

Scala:
    import org.apache.spark.sql.functions.input_file_name

    val inputPath: String = ???

    spark.read.text(inputPath)
      .select(input_file_name, $"value")
      .as[(String, String)] // Optionally convert to Dataset
      .rdd // or RDD

Python (versions before 2.x are buggy and may not preserve names when converted to RDD):
    from pyspark.sql.functions import input_file_name
    …

Pyspark: Pass multiple columns in UDF

If all the columns you want to pass to the UDF have the same data type, you can use array as the input parameter, for example:

    >>> from pyspark.sql.types import IntegerType
    >>> from pyspark.sql.functions import udf, array
    >>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
    >>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
    ...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
    +---+---+---+------+
    | …

Save ML model for future usage

Spark 2.0.0+

At first glance all Transformers and Estimators implement MLWritable with the following interface:
    def write: MLWriter
    def save(path: String): Unit

and MLReadable with the following interface:
    def read: MLReader[T]
    def load(path: String): T

This means that you can use the save method to write a model to disk, for example:
    import org.apache.spark.ml.PipelineModel
    val model: PipelineModel …

How to find count of Null and Nan values for each column in a PySpark dataframe efficiently?

You can use the method shown here and replace isNull with isnan:

    from pyspark.sql.functions import isnan, when, count, col

    df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  3|
    +-------+----------+---+

or

    df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  5|
    +-------+----------+---+

Understanding Spark serialization

To serialize an object means to convert its state to a byte stream so that the byte stream can be reverted back into a copy of the object. A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable. A class is never serialized …
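The round trip described above (object state to byte stream and back to a copy) can be illustrated with a minimal sketch. Note the swap: it uses Python's standard pickle module as an analogue, not Java's java.io.Serializable mechanism, and the sample record is made up for illustration.

```python
import pickle

# A made-up object whose state we want to ship as bytes.
original = {"name": "sachin", "scores": [1, 2, 3]}

# Serialize: convert the object's state to a byte stream.
payload = pickle.dumps(original)

# Deserialize: revert the byte stream into a copy of the object.
restored = pickle.loads(payload)

print(type(payload).__name__)
print(restored == original, restored is original)
```

The restored value compares equal to the original but is a separate object, which is the "copy" the definition refers to.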