How do I add a persistent column of row ids to a Spark DataFrame?

Spark 2.0

  • This issue has been resolved in Spark 2.0 with SPARK-14241 (a short Spark 2.x sketch follows this list).

  • A similar issue has been resolved in Spark 2.1 with SPARK-14393.
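
With these fixes in place, the naive pattern simply works. A minimal sketch, assuming a Spark 2.x session exposed as spark:

from pyspark.sql.functions import col, monotonically_increasing_id

# In Spark 2.x the generated ids keep a stable relation with their rows,
# so a subsequent filter no longer causes them to be reassigned
df = spark.range(0, 10).withColumn("P", col("id") % 2 == 0)
df.withColumn("rn", monotonically_increasing_id()).where("P").show()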

Spark 1.x

The problem you experience is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.

It doesn’t take any parameters, so from the optimizer’s perspective it doesn’t matter when it is called, and the call can be pushed after all other operations. Hence the behavior you see.
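
A minimal sketch of the effect, assuming a Spark 1.6 shell with sqlContext available; "P" marks the rows to keep:

from pyspark.sql.functions import col, monotonically_increasing_id

df = sqlContext.range(0, 1000).withColumn("P", col("id") % 2 == 0)

# Because the expression is nondeterministic and parameterless, the
# optimizer may evaluate it after the filter has been pushed down, so
# the surviving rows can receive fresh ids rather than the ones
# "assigned" before filtering
df.withColumn("rn", monotonically_increasing_id()).where("P").show()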

If you take a look at the code, you’ll find that this is explicitly marked by extending the MonotonicallyIncreasingID expression with Nondeterministic.


I don’t think there is any elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value, for example with a UDF like this:

from pyspark.sql.types import LongType
from pyspark.sql.functions import monotonically_increasing_id, udf

# Identity on its second argument; the first argument ("P") only exists
# to create the dependency that keeps the optimizer from reordering
bound = udf(lambda _, v: v, LongType())

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))
  .where("P"))

In general, it could be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame.
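
A minimal sketch of that approach, where with_row_index is a hypothetical helper that rebuilds the schema by hand so the original column types survive the round trip:

from pyspark.sql.types import LongType, StructField, StructType

def with_row_index(df, col_name="rn"):
    # Extend the existing schema with a non-nullable long index field
    schema = StructType(
        df.schema.fields + [StructField(col_name, LongType(), False)]
    )
    # Row is a subclass of tuple, so appending the index yields a plain
    # tuple matching the extended schema
    indexed = df.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],))
    return df.sql_ctx.createDataFrame(indexed, schema)

Unlike monotonically_increasing_id, zipWithIndex assigns consecutive indices from 0 to n - 1, at the cost of triggering an extra Spark job when the RDD has more than one partition.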


* The workaround shown above is no longer a valid solution (nor is it required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.
