Apache Spark — Assign the result of a UDF to multiple DataFrame columns

It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. It requires a UDF with a specified returnType: from pyspark.sql.functions import udf from pyspark.sql.types import StructType, StructField, FloatType schema = StructType([ StructField("foo", FloatType(), False), StructField("bar", FloatType(), False) ]) def udf_test(n): return (n / 2, … Read more
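A minimal, self-contained PySpark sketch of the pattern described above (the foo/bar field names come from the excerpt; everything else is illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, FloatType

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Declare the struct the UDF returns: two non-nullable float fields.
schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False),
])

# The Python function returns a tuple matching the struct's field order.
udf_test = udf(lambda n: (n / 2.0, n * 2.0), schema)

df = spark.createDataFrame([(1.0,), (2.0,)], ["n"])

# The UDF produces a single struct column; expand it into top-level columns.
df.withColumn("res", udf_test(col("n"))) \
  .select("n", "res.foo", "res.bar") \
  .show()
```

Selecting "res.*" instead would pull out every field of the struct at once.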

Spark DataFrames when udf functions do not accept large enough input variables

User-defined functions are defined for up to 22 parameters, but the udf helper is defined for at most 10 arguments. To handle functions with a larger number of parameters you can use org.apache.spark.sql.UDFRegistration. For example: val dummy = (( x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int, x8: … Read more

How to transpose an RDD in Spark

Say you have an N×M matrix. If both N and M are so small that you can hold N×M items in memory, it doesn’t make much sense to use an RDD. But transposing it is easy: val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) val transposed = sc.parallelize(rdd.collect.toSeq.transpose) If N or … Read more
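For comparison, a rough PySpark sketch covering both cases (the blockwise variant is a common approach for a matrix too large to collect, not necessarily the exact continuation of the answer):

```python
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

rdd = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

# Small matrix: collect to the driver and transpose locally.
transposed_small = sc.parallelize(list(zip(*rdd.collect())))

# Larger matrix: tag every value with (column index, (row index, value)),
# group by column index, then rebuild each output row in row order.
transposed_big = (
    rdd.zipWithIndex()
       .flatMap(lambda ri: [(j, (ri[1], v)) for j, v in enumerate(ri[0])])
       .groupByKey()
       .sortByKey()
       .values()
       .map(lambda vals: [v for _, v in sorted(vals)])
)

print(transposed_big.collect())  # [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
```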

Spark UDF with varargs

UDFs don’t support varargs, but you can pass an arbitrary number of columns wrapped using an array function: import org.apache.spark.sql.functions.{udf, array, lit} val myConcatFunc = (xs: Seq[Any], sep: String) => xs.filter(_ != null).mkString(sep) val myConcat = udf(myConcatFunc) Example usage: val df = sc.parallelize(Seq( (null, "a", "b", "c"), ("d", null, null, "e") )).toDF("x1", "x2", "x3", … Read more
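The same trick carries over to PySpark; a sketch (column names and the separator are just examples) where the UDF receives the wrapped columns as one Python list, so any number of columns can be passed:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, array, lit, col
from pyspark.sql.types import StringType

spark = SparkSession.builder.master("local[*]").getOrCreate()

# The UDF sees the array column as a single list argument.
my_concat = udf(
    lambda xs, sep: sep.join([x for x in xs if x is not None]),
    StringType(),
)

df = spark.createDataFrame(
    [(None, "a", "b", "c"), ("d", None, None, "e")],
    ["x1", "x2", "x3", "x4"],
)

# Wrap however many columns you like with array() before calling the UDF.
df.select(
    my_concat(array(*[col(c) for c in df.columns]), lit("-")).alias("joined")
).show()
```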

How do I install pyspark for use in standalone scripts?

From Spark 2.2.0 onwards, use pip install pyspark to install PySpark on your machine. For older versions, follow these steps. Add the PySpark lib to the Python path in your bashrc: export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH and don’t forget to set SPARK_HOME as well. PySpark depends on the py4j Python package, so install it as follows: pip install py4j For more details about … Read more
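With pyspark importable, a standalone script can build its own session; a minimal sketch (the local master is chosen just for illustration):

```python
# standalone_app.py -- run with a plain `python standalone_app.py`
# once pyspark is importable (pip install pyspark, or PYTHONPATH/SPARK_HOME set).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("standalone-example")
    .getOrCreate()
)

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.show()

spark.stop()
```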

Spark – Error “A master URL must be set in your configuration” when submitting an app

The TL;DR: .config("spark.master", "local"). There is a list of the options for spark.master in Spark 2.2.1. I ended up on this page after trying to run a simple Spark SQL Java program in local mode. To do this, I found that I could set spark.master using: SparkSession spark = SparkSession .builder() .appName("Java Spark SQL basic example") .config("spark.master", … Read more
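For reference, the equivalent fix in PySpark (a sketch; the application name is arbitrary):

```python
from pyspark.sql import SparkSession

# Setting spark.master explicitly avoids the "A master URL must be set in
# your configuration" error when the app runs outside spark-submit.
spark = (
    SparkSession.builder
    .appName("basic example")
    .config("spark.master", "local")  # same effect as .master("local")
    .getOrCreate()
)

print(spark.sparkContext.master)  # local
spark.stop()
```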

spark.ml StringIndexer throws ‘Unseen label’ on fit()

Unseen label is a generic message which doesn’t correspond to a specific column. Most likely the problem is with the following stage: StringIndexer(inputCol="lang", outputCol="lang_idx") with pl-PL present in train("lang") and not present in test("lang"). You can correct it using setHandleInvalid with skip: from pyspark.ml.feature import StringIndexer train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"]) test = sc.parallelize([(3, … Read more
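A runnable sketch of that fix, with toy data standing in for the truncated example ("skip" simply drops rows whose label the indexer never saw during fit):

```python
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.master("local[*]").getOrCreate()

train = spark.createDataFrame([(1, "foo"), (2, "bar")], ["k", "v"])
test = spark.createDataFrame([(3, "foo"), (4, "foobar")], ["k", "v"])

# Without setHandleInvalid("skip"), transform(test) would raise the
# 'Unseen label' error, because "foobar" never appeared in train.
indexer = StringIndexer(inputCol="v", outputCol="v_idx").setHandleInvalid("skip")
model = indexer.fit(train)
model.transform(test).show()  # the "foobar" row is filtered out
```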