PySpark: compute the row maximum of a subset of columns and add it to an existing DataFrame

Let’s start with a couple of imports:

from pyspark.sql.functions import col, lit, coalesce, greatest

Next, define a minus-infinity literal:

minf = lit(float("-inf"))

Map over the columns, replacing nulls with minf via coalesce, and pass the result to greatest:

rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])

Finally, use withColumn:

df1.withColumn("rowmax", rowmax)

with the result:

+---+---+---+----+------+
| v1| v2| v3|  v4|rowmax|
+---+---+---+----+------+
|foo|1.0|3.0|null|   3.0|
|bar|2.0|2.0| -10|   2.0|
|baz|3.3|1.2|null|   3.3|
+---+---+---+----+------+
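
For reference, a DataFrame like the df1 assumed above can be built along these lines (a minimal sketch assuming an existing SparkSession named spark; the column types are inferred from the displayed output):

df1 = spark.createDataFrame(
    [("foo", 1.0, 3.0, None), ("bar", 2.0, 2.0, -10), ("baz", 3.3, 1.2, None)],
    ("v1", "v2", "v3", "v4")
)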

You can use the same pattern for other row-wise operations by replacing minf with the appropriate neutral element. For example:

rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])
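
This works because Column overloads +, so Python’s built-in sum can fold the coalesced columns. Attach it the same way (illustrative usage with the same df1):

df1.withColumn("rowsum", rowsum)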

or:

from operator import mul
from functools import reduce

rowproduct = reduce(
  mul, 
  [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']]
)
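
Attach it as before (illustrative usage):

df1.withColumn("rowproduct", rowproduct)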

Your own code could be significantly simplified with a udf:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def get_max_row_with_None_(*cols):
    # Ignore nulls and take the max of the remaining values
    # (assumes at least one non-null value per row).
    return float(max(x for x in cols if x is not None))

get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))

Replace minf with lit(float("inf")) and greatest with least to get the smallest value per row.
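
For example (a sketch following the same pattern; pinf and rowmin are just illustrative names):

from pyspark.sql.functions import least

pinf = lit(float("inf"))
rowmin = least(*[coalesce(col(x), pinf) for x in ['v2', 'v3', 'v4']])
df1.withColumn("rowmin", rowmin)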
