How to pass whole Row to UDF – Spark DataFrame filter

Use the struct() function to construct the row when you call the function. Follow these steps.

Import Row, plus the column functions and implicits that struct(), callUdf() and $ rely on:

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import sqlContext.implicits._

Define the UDF:

    def myFilterFunction(r: Row) = { r.get(0) == r.get(1) }

Register the UDF:

    sqlContext.udf.register("myFilterFunction", myFilterFunction _)

Create the DataFrame:

    val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

Use the UDF (callUdf is named callUDF from Spark 1.5 onward):

    records.filter(callUdf("myFilterFunction", struct($"text", $"text2"))).show

How does Spark aggregate function – aggregateByKey work?

aggregateByKey() is quite different from reduceByKey(): reduceByKey() is essentially a particular case of aggregateByKey(). aggregateByKey() combines the values for a particular key, and the result of that combination can be any object that you specify. You have to specify how the values are combined ("added") inside one partition (that
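As a rough illustration of the idea, the plain-Python sketch below mimics what aggregateByKey() does, without Spark. The helper name aggregate_by_key and the list-of-lists stand-in for partitions are assumptions made for this example, not Spark APIs. It starts from a zero value, folds values into it inside each partition, and then merges the per-partition results.

```python
import copy


def aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Mimic aggregateByKey semantics on in-memory data (hypothetical helper)."""
    per_partition = []
    for partition in partitions:
        acc = {}
        for key, value in partition:
            # Each key starts from its own copy of the zero value in every partition.
            if key not in acc:
                acc[key] = copy.deepcopy(zero_value)
            # seq_op folds one value into the accumulator inside a partition.
            acc[key] = seq_op(acc[key], value)
        per_partition.append(acc)

    merged = {}
    for acc in per_partition:
        for key, partial in acc.items():
            # comb_op merges accumulators that come from different partitions.
            merged[key] = comb_op(merged[key], partial) if key in merged else partial
    return merged


# Two "partitions" of (key, value) pairs.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 5), ("b", 4)],
]

# The result type (a (sum, count) tuple) differs from the value type (int).
sum_count = aggregate_by_key(
    partitions,
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)

# With the same operation in both roles and a neutral zero value,
# the result matches what a reduceByKey-style sum would give.
totals = aggregate_by_key(partitions, 0, lambda acc, v: acc + v, lambda x, y: x + y)
```

Here sum_count holds a (sum, count) pair per key, something a plain reduce over integers cannot express directly, while totals shows the reduceByKey-like special case.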

Apache Spark dealing with case statements

These are a few ways to write an If-Else / When-Then-Else / When-Otherwise expression in PySpark.

Sample dataframe:

    df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])

    #+---+-----+
    #| id|value|
    #+---+-----+
    #|  1|    1|
    #|  2|    2|
    #|  3|    3|
    #+---+-----+

    #Desired Output:
    #+---+-----+----------+
    #| id|value|value_desc|
    #+---+-----+----------+
    #|  1|    1|       one|
    #|  2|    2|       two|
    #|  3|    3|     other|
    #+---+-----+----------+

Option#1: withColumn()
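One way such an expression might look with withColumn() is sketched below. The function names value_desc and add_value_desc are hypothetical and chosen only for this illustration. It assumes a DataFrame with an integer column named value, like the sample one. The plain-Python function states the intended rule, and the second function expresses the same rule with pyspark.sql.functions.when and Column.otherwise.

```python
def value_desc(value):
    """Plain-Python statement of the rule (hypothetical helper for illustration)."""
    if value == 1:
        return "one"
    elif value == 2:
        return "two"
    return "other"


def add_value_desc(df):
    """Add a value_desc column to a DataFrame that has a 'value' column."""
    # Imported here so that the rule above can be read and checked without Spark.
    from pyspark.sql import functions as F

    return df.withColumn(
        "value_desc",
        F.when(F.col("value") == 1, "one")      # if value == 1
         .when(F.col("value") == 2, "two")      # else if value == 2
         .otherwise("other"),                   # else
    )
```

With an active SparkSession, add_value_desc(df).show() would be the call to try against the sample dataframe. Chained when() calls are evaluated in order, so the first matching condition decides the result.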

Determining optimal number of Spark partitions based on workers, cores and DataFrame size

Yes, a Spark application has one and only one driver. What is the relationship between numWorkerNodes and numExecutors? A worker can host multiple executors: think of the worker as a machine/node of your cluster and of an executor as a process (executing in a core) that runs on that worker. So

Spark-Obtaining file name in RDDs

Since Spark 1.6 you can combine the text data source and the input_file_name function as follows:

Scala:

    import org.apache.spark.sql.functions.input_file_name

    val inputPath: String = ???

    spark.read.text(inputPath)
      .select(input_file_name, $"value")
      .as[(String, String)] // Optionally convert to Dataset
      .rdd // or RDD

Python (versions before 2.x are buggy and may not preserve names when converted to RDD):

    from pyspark.sql.functions import input_file_name

Pyspark: Pass multiple columns in UDF

If all the columns you want to pass to the UDF have the same data type, you can use array as the input parameter, for example:

    >>> from pyspark.sql.types import IntegerType
    >>> from pyspark.sql.functions import udf, array
    >>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
    >>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
    ...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
    +---+---+---+------+
    | …

Save ML model for future usage

Spark 2.0.0+

At first glance all Transformers and Estimators implement MLWritable with the following interface:

    def write: MLWriter
    def save(path: String): Unit

and MLReadable with the following interface:

    def read: MLReader[T]
    def load(path: String): T

This means that you can use the save method to write a model to disk, for example:

    import org.apache.spark.ml.PipelineModel

    val model: PipelineModel …

How to find count of Null and Nan values for each column in a PySpark dataframe efficiently?

You can use the method shown here and replace isNull with isnan:

    from pyspark.sql.functions import isnan, when, count, col

    df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  3|
    +-------+----------+---+

or

    df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  5|
    +-------+----------+---+

Understanding Spark serialization

To serialize an object means to convert its state to a byte stream so that the byte stream can be reverted back into a copy of the object. A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable. A class is never serialized