Filling gaps in timeseries Spark

If input DataFrame has following structure:

root
 |-- periodstart: timestamp (nullable = true)
 |-- usage: long (nullable = true)

Scala

Determine min / max:

val (minp, maxp) = df
  .select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint")))
  .as[(Long, Long)]
  .first

Set step, for example for 15 minutes:

val step: Long = 15 * 60

Generate reference range:

val reference = spark
  .range((minp / step) * step, ((maxp / step) + 1) * step, step)
  .select($"id".cast("timestamp").alias("periodstart"))

Join and fill the gaps:

reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))

Python

Similarly in PySpark:

from pyspark.sql.functions import col, min as min_, max as max_

step = 15 * 60

minp, maxp = df.select(
    min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()

reference = spark.range(
    (minp / step) * step, ((maxp / step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))

reference.join(df, ["periodstart"], "leftouter")

Leave a Comment