Difference between two rows in Spark dataframe

If the calculation follows a fixed pattern, such as the difference from the previous month or from two months back, you can use a window function. The lag and lead functions, applied over a Window, give each row access to the rows before and after it.

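First, though, the Date column has to be converted to a real date (Date_Converted below) so that the rows can be ordered chronologically. Strings such as "1-jul" and "1-dec" do not sort in calendar order.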

+-------+------+--------------+------+
|Column1|Date  |Date_Converted|Amount|
+-------+------+--------------+------+
|A      |1-jul |2017-07-01    |1000  |
|A      |1-june|2017-06-01    |2000  |
|A      |1-May |2017-05-01    |2000  |
|A      |1-dec |2017-12-01    |3000  |
|A      |1-Nov |2017-11-01    |2000  |
|B      |1-jul |2017-07-01    |100   |
|B      |1-june|2017-06-01    |300   |
|B      |1-May |2017-05-01    |400   |
|B      |1-dec |2017-12-01    |300   |
+-------+------+--------------+------+
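One way to derive Date_Converted from strings like "1-jul" or "1-june" is sketched below. It is a minimal example, not the only approach. It assumes every row belongs to 2017 (as in the table above), that the session locale uses English month names, and that trimming the month to its first three letters gives a valid abbreviation. The names raw, parts, month and converted are made up for this illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("date-convert").getOrCreate()
import spark.implicits._

// A few rows shaped like the table above.
val raw = Seq(
  ("A", "1-jul", 1000),
  ("A", "1-june", 2000),
  ("B", "1-May", 400),
  ("B", "1-dec", 300)
).toDF("Column1", "Date", "Amount")

// Split "1-june" into day ("1") and month ("june").
val parts = split($"Date", "-")

// Normalise the month to a capitalised three-letter abbreviation: "june" -> "Jun".
val month = initcap(substring(parts.getItem(1), 1, 3))

// Assumption: the year is always 2017, as in the table above.
val converted = raw.withColumn(
  "Date_Converted",
  to_date(concat_ws("-", parts.getItem(0), month, lit("2017")), "d-MMM-yyyy"))

converted.show(false)
```

If the data spans several years, the year has to come from another column; it cannot be recovered from the Date string alone.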

You can then find the difference between the current month and the previous one with

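import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// The $"..." syntax needs spark.implicits._ (already imported in spark-shell).

// One window per Column1 value, rows ordered chronologically.
val windowSpec = Window.partitionBy("Column1").orderBy("Date_Converted")

// Amount minus the previous row's Amount; the first row of each group has no previous row, so 0 is subtracted.
df.withColumn("diff_Amt_With_Prev_Month", $"Amount" - when((lag("Amount", 1).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 1).over(windowSpec)))
   .show(false)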

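This should give the output below. Note that lag counts rows within the window, not calendar months: where a month is missing (B has no November row), the December row is compared with the previous row present, which is July.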

+-------+------+--------------+------+------------------------+
|Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_Month|
+-------+------+--------------+------+------------------------+
|B      |1-May |2017-05-01    |400   |400.0                   |
|B      |1-june|2017-06-01    |300   |-100.0                  |
|B      |1-jul |2017-07-01    |100   |-200.0                  |
|B      |1-dec |2017-12-01    |300   |200.0                   |
|A      |1-May |2017-05-01    |2000  |2000.0                  |
|A      |1-june|2017-06-01    |2000  |0.0                     |
|A      |1-jul |2017-07-01    |1000  |-1000.0                 |
|A      |1-Nov |2017-11-01    |2000  |1000.0                  |
|A      |1-dec |2017-12-01    |3000  |1000.0                  |
+-------+------+--------------+------+------------------------+
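The when/otherwise null check can also be expressed with the optional third argument of lag, which supplies a default value when there is no row at the requested offset. The sketch below uses group B from the table and assumes Amount is an integer column; sample, byMonth and withDiff are names chosen for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("lag-default").getOrCreate()
import spark.implicits._

// Group B from the table above; Amount as Int is an assumption of this sketch.
val sample = Seq(
  ("B", "2017-05-01", 400),
  ("B", "2017-06-01", 300),
  ("B", "2017-07-01", 100),
  ("B", "2017-12-01", 300)
).toDF("Column1", "Date_Converted", "Amount")
  .withColumn("Date_Converted", to_date($"Date_Converted"))

val byMonth = Window.partitionBy("Column1").orderBy("Date_Converted")

// lag(column, offset, default): the default (0) is used where no previous row exists,
// so no separate isNull check is needed for the first row of each group.
val withDiff = sample.withColumn(
  "diff_Amt_With_Prev_Month",
  $"Amount" - lag("Amount", 1, 0).over(byMonth))

withDiff.orderBy("Column1", "Date_Converted").show(false)
```

One difference from the when/otherwise form: the default only covers the case where the previous row does not exist. If the previous row exists but its Amount is null, lag still returns null and the difference is null as well.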

To compare each row with the one two positions back, increase the offset passed to lag to 2:

df.withColumn("diff_Amt_With_Prev_two_Month", $"Amount" - when((lag("Amount", 2).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 2).over(windowSpec)))
  .show(false)

which will give you

+-------+------+--------------+------+----------------------------+
|Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_two_Month|
+-------+------+--------------+------+----------------------------+
|B      |1-May |2017-05-01    |400   |400.0                       |
|B      |1-june|2017-06-01    |300   |300.0                       |
|B      |1-jul |2017-07-01    |100   |-300.0                      |
|B      |1-dec |2017-12-01    |300   |0.0                         |
|A      |1-May |2017-05-01    |2000  |2000.0                      |
|A      |1-june|2017-06-01    |2000  |2000.0                      |
|A      |1-jul |2017-07-01    |1000  |-1000.0                     |
|A      |1-Nov |2017-11-01    |2000  |0.0                         |
|A      |1-dec |2017-12-01    |3000  |2000.0                      |
+-------+------+--------------+------+----------------------------+
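Since only the offset changes between the two calculations, they can be folded into a small helper. The sketch below is one possible shape, using group A from the table with Amount assumed to be an integer column. The helper withLagDiff and the column names diff_Amt_lag_1 and diff_Amt_lag_2 are hypothetical, not part of Spark.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("lag-helper").getOrCreate()
import spark.implicits._

// Group A from the table above; Amount as Int is an assumption of this sketch.
val sample = Seq(
  ("A", "2017-05-01", 2000),
  ("A", "2017-06-01", 2000),
  ("A", "2017-07-01", 1000),
  ("A", "2017-11-01", 2000),
  ("A", "2017-12-01", 3000)
).toDF("Column1", "Date_Converted", "Amount")
  .withColumn("Date_Converted", to_date($"Date_Converted"))

val byMonth = Window.partitionBy("Column1").orderBy("Date_Converted")

// Hypothetical helper: adds diff_Amt_lag_<n> = Amount minus the Amount n rows earlier,
// treating a missing or null earlier value as 0 (same behaviour as the when/otherwise form).
def withLagDiff(df: DataFrame, n: Int): DataFrame =
  df.withColumn(
    s"diff_Amt_lag_$n",
    col("Amount") - coalesce(lag("Amount", n).over(byMonth), lit(0)))

// Add the one-row and two-row differences in a single pass over the offsets.
val result = Seq(1, 2).foldLeft(sample)(withLagDiff)

result.orderBy("Column1", "Date_Converted").show(false)
```

Any list of offsets can be passed in place of Seq(1, 2); each one adds its own column.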

I hope the answer is helpful
