Spark add new column to dataframe with value from previous row

You can use the lag window function as follows:

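from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

# sc is the SparkContext that the PySpark shell provides
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
# No partitionBy: one global window ordered by id
w = Window.orderBy(col("id"))
# lag("num") is the previous row's num; the first row has none, so na.drop() removes it
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()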

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

But there are some important issues:

  1. If you need a global operation (not partitioned by some other column or columns), it is extremely inefficient, because a window without partitionBy moves all the data to a single partition.
  2. You need a natural way to order your data.

While the second issue is almost never a problem, the first one can be a deal-breaker. If that is the case, you should simply convert your DataFrame to an RDD and compute lag manually.

