Keep in mind that Spark SQL (unlike the RDD API) is not what-you-see-is-what-you-get: the optimizer / planner is free to reorder operations arbitrarily or even repeat stages multiple times. Moreover, Python UDFs are not applied on a row-by-row basis but in batch mode, so `when` is not so much ignored as simply not used to optimize the execution plan:
```
== Physical Plan ==
*Project [col#0, CASE WHEN isnull(col#0) THEN col#0 ELSE pythonUDF0#21 END AS upper#17]
+- BatchEvalPython [<lambda>(col#0)], [col#0, pythonUDF0#21]
   +- Scan ExistingRDD[col#0]
```
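What the plan shows is that `BatchEvalPython` evaluates the UDF for every row in the batch first, and only afterwards does the `Project` apply the `CASE WHEN`. A minimal plain-Python sketch (no Spark required; `batch_eval` is an illustrative stand-in, not a Spark API) of why that order breaks a non-null-safe UDF:

```python
def batch_eval(rows, udf):
    # Mirrors BatchEvalPython: the UDF runs on every row up front,
    # including None rows, before any CASE WHEN can skip them.
    return [udf(x) for x in rows]

rows = ["foo", None, "bar"]
udf = lambda x: x.upper()  # not null-safe

try:
    batch_eval(rows, udf)
except AttributeError as exc:
    # None.upper() fails before the CASE WHEN ever gets a say.
    print("UDF failed on None:", exc)
```
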
Therefore the function used with `udf` has to be robust to `None` inputs, for example:
```python
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

df.withColumn(
    'upper',
    f.udf(
        lambda x: x.upper() if x is not None else None,
        StringType()
    )(f.col('col'))
).show()
```
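If several UDFs need the same guard, the null check can be factored out into a small wrapper. This is a plain-Python sketch; the `null_safe` name is my own, not part of the PySpark API:

```python
def null_safe(func):
    """Return a wrapper that passes None through instead of calling func."""
    def wrapper(x):
        return func(x) if x is not None else None
    return wrapper

# The lambda from the example above, expressed with the helper:
safe_upper = null_safe(str.upper)
```

The wrapped function can then be registered as before, e.g. `f.udf(null_safe(str.upper), StringType())`.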