Spark: Obtaining file name in RDDs

Since Spark 1.6 you can combine the text data source and the input_file_name function as follows:

Scala:

    import org.apache.spark.sql.functions.input_file_name

    val inputPath: String = ???

    spark.read.text(inputPath)
      .select(input_file_name, $"value")
      .as[(String, String)] // Optionally convert to Dataset
      .rdd // or RDD

Python (versions before 2.x are buggy and may not preserve names when converted to RDD):

    from pyspark.sql.functions import input_file_name
    …

PySpark: Pass multiple columns in UDF

If all the columns you want to pass to the UDF have the same data type, you can use array as the input parameter, for example:

    >>> from pyspark.sql.types import IntegerType
    >>> from pyspark.sql.functions import udf, array
    >>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
    >>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
    ...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
    +---+---+---+------+
    | …

Save ML model for future usage

Spark 2.0.0+

At first glance all Transformers and Estimators implement MLWritable with the following interface:

    def write: MLWriter
    def save(path: String): Unit

and MLReadable with the following interface:

    def read: MLReader[T]
    def load(path: String): T

This means that you can use the save method to write a model to disk, for example:

    import org.apache.spark.ml.PipelineModel

    val model: PipelineModel …

How to find count of Null and NaN values for each column in a PySpark dataframe efficiently?

You can use the method shown here and replace isNull with isnan:

    from pyspark.sql.functions import isnan, when, count, col

    df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  3|
    +-------+----------+---+

or, to count both NaN and null values:

    df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
    +-------+----------+---+
    |session|timestamp1|id2|
    +-------+----------+---+
    |      0|         0|  5|
    +-------+----------+---+

Understanding Spark serialization

To serialize an object means to convert its state to a byte stream so that the byte stream can be converted back into a copy of the object. A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable. A class is never serialized …
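The round trip described above can be illustrated in Python, where the standard pickle module plays a role loosely analogous to Java serialization (PySpark relies on pickling to move Python objects between processes). This is only a minimal sketch of the idea, not of Java's mechanism; the record dictionary is a made-up value used for illustration.

```python
import pickle

# Hypothetical record, used only for illustration.
original = {"id": 101, "values": [1, 16]}

# Serialize: convert the object's state to a byte stream.
payload = pickle.dumps(original)

# Deserialize: rebuild a copy of the object from the byte stream.
restored = pickle.loads(payload)

# The copy carries the same state but is a distinct object.
same_state = restored == original
same_object = restored is original
```

Here same_state is true while same_object is false, which is the "copy of the object" behaviour the definition refers to.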

Why does sortBy transformation trigger a Spark job?

sortBy is implemented using sortByKey, which depends on a RangePartitioner (JVM) or a partitioning function (Python). When you call sortBy / sortByKey, the partitioner (partitioning function) is initialized eagerly and samples the input RDD to compute partition boundaries. The job you see corresponds to this process. Actual sorting is performed only if you execute an action on the newly …
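The two phases can be sketched in plain Python. This is a simplified illustration of range partitioning in general, not Spark's actual implementation: the function names sample_boundaries and partition_for, the sample size, and the number of partitions are all assumptions made for the example.

```python
import bisect
import random


def sample_boundaries(keys, num_partitions, sample_size, seed=0):
    """Sample the keys and pick num_partitions - 1 split points."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    # Take evenly spaced elements of the sorted sample as boundaries.
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]


def partition_for(key, boundaries):
    """Return the index of the range partition that key falls into."""
    return bisect.bisect_left(boundaries, key)


keys = list(range(1000))
random.Random(42).shuffle(keys)

# Eager step: computing boundaries requires reading (sampling) the input.
boundaries = sample_boundaries(keys, num_partitions=4, sample_size=100)

# Deferred step: records are routed to partitions and sorted locally.
partitions = [[] for _ in range(4)]
for k in keys:
    partitions[partition_for(k, boundaries)].append(k)
result = [k for p in partitions for k in sorted(p)]
```

Because the boundaries depend on the data, they cannot be known without touching the input, which is why a sampling pass has to run before any sorting is requested.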

Why are Spark Parquet files for an aggregate larger than the original?

In general, columnar storage formats like Parquet are highly sensitive to data distribution (data organization) and to the cardinality of individual columns. The more organized the data and the lower the cardinality, the more efficient the storage. An aggregation such as the one you apply has to shuffle the data. When you check the execution …
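The effect of data organization can be illustrated with a toy run-length encoder in Python. Parquet's real encodings (dictionary, RLE/bit-packing, plus compression) are more elaborate, so treat this as a sketch of the principle only; run_length_encode and the sample column are hypothetical.

```python
import random
from itertools import groupby


def run_length_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(v, sum(1 for _ in group)) for v, group in groupby(values)]


# A low-cardinality column: 3 distinct values, 1000 rows each.
column = ["a"] * 1000 + ["b"] * 1000 + ["c"] * 1000

# Organized (sorted) data: one run per distinct value.
sorted_runs = run_length_encode(column)

# The same values after a shuffle-like reordering: many short runs.
shuffled = column[:]
random.Random(0).shuffle(shuffled)
shuffled_runs = run_length_encode(shuffled)

print(len(sorted_runs), len(shuffled_runs))
```

Both lists describe exactly the same 3000 values, but the reordered column needs far more runs to represent them, which is the kind of growth a shuffle can cause in the written files.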