Let’s start with a couple of imports:
from pyspark.sql.functions import col, lit, coalesce, greatest
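For reference, the df1 used below can be recreated from the result table shown further down; a minimal sketch, assuming an active SparkSession bound to spark (the exact column types are a guess based on that output):
df1 = spark.createDataFrame(
    [("foo", 1.0, 3.0, None),
     ("bar", 2.0, 2.0, -10),
     ("baz", 3.3, 1.2, None)],
    ("v1", "v2", "v3", "v4")
)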
Next, define a minus-infinity literal to stand in for nulls (it can never win a max comparison):
minf = lit(float("-inf"))
Map over the columns, replacing nulls with coalesce, and pass the result to greatest:
rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])
Finally, add the new column with withColumn:
df1.withColumn("rowmax", rowmax).show()
with the result:
+---+---+---+----+------+
| v1| v2| v3| v4|rowmax|
+---+---+---+----+------+
|foo|1.0|3.0|null| 3.0|
|bar|2.0|2.0| -10| 2.0|
|baz|3.3|1.2|null| 3.3|
+---+---+---+----+------+
You can use the same pattern for other row-wise operations by replacing minf with the neutral element of the operation (0 for a sum, 1 for a product). For example, using Python's built-in sum:
rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])
or:
from operator import mul
from functools import reduce
rowproduct = reduce(
    mul,
    [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']]
)
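Both expressions are used exactly like rowmax above, for example:
df1.withColumn("rowsum", rowsum).withColumn("rowproduct", rowproduct).show()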
Your own code could be significantly simplified with a udf:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
def get_max_row_with_None_(*cols):
    # Ignore nulls and return the largest remaining value as a float
    return float(max(x for x in cols if x is not None))
get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))
Replace minf with lit(float("inf")) and greatest with least to get the smallest value per row, as in the sketch below.
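Under the same assumptions, a minimal sketch of the per-row minimum:
from pyspark.sql.functions import least

pinf = lit(float("inf"))
rowmin = least(*[coalesce(col(x), pinf) for x in ['v2','v3','v4']])
df1.withColumn("rowmin", rowmin).show()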