`groupBy` after `orderBy` doesn't maintain order, as others have pointed out. What you want to do is use a Window function, partitioned on `id` and ordered by `hour`. You can `collect_list` over this window and then take the max (largest) of the resulting lists, since they grow cumulatively: the first hour's list contains only its own count, the second hour's list has two elements, and so on. Because each list is a prefix of the next, the largest one is the complete, hour-ordered list.
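To see why taking the max works, here is a small plain-Scala model of the same steps on ordinary collections, with no Spark involved. `CumulativeListModel` and `hourlyCounts` are hypothetical names used only for illustration. The model picks the longest prefix by length; that coincides with Spark's array ordering here only because the per-row lists nest inside each other.

```scala
// Plain-Scala model (no Spark) of "cumulative window list, then max".
// Hypothetical helper for illustration only.
object CumulativeListModel {
  // Rows are (id, hour, count), mirroring the DataFrame columns.
  def hourlyCounts(rows: Seq[(String, Int, Int)]): Map[String, String] =
    rows.groupBy(_._1).map { case (id, group) =>
      // Like Window.partitionBy("id").orderBy("hour"): sort each partition by hour.
      val ordered = group.sortBy(_._2).map(_._3)
      // Like collect_list over the default frame (partition start to current row):
      // row i sees the counts of rows 0..i.
      val cumulative = ordered.indices.map(i => ordered.take(i + 1))
      // Like max($"collected"): every list is a prefix of the next,
      // so the longest one is the complete, ordered list.
      val longest = cumulative.maxBy(_.length)
      id -> longest.mkString(":")
    }
}
```

Feeding it the rows in any order gives the same result, because ordering comes from the sort inside each partition, not from the input order.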
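Complete example code:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// Assumes a SparkSession named `spark` is in scope (as in spark-shell).
import spark.implicits._

val data = Seq(
  ("id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)
).toDF("id", "hour", "count")

// Joins the collected counts with ":". `count` is an integer column,
// so the collected list is array<int>; take Seq[Int] to match it.
val mergeList = udf { (counts: Seq[Int]) => counts.mkString(":") }

// In spark-shell, enter this with :paste so the lines starting with "."
// are read as one expression.
data.withColumn(
    "collected",
    // With orderBy and no explicit frame, the window runs from the start of
    // the partition to the current row, so each row gets a cumulative list.
    collect_list($"count").over(
      Window.partitionBy("id").orderBy("hour")
    )
  )
  .groupBy("id")
  // Each list is a prefix of the next, so the max is the complete list.
  .agg(max($"collected").as("collected"))
  .withColumn("hourly_count", mergeList($"collected"))
  .select("id", "hourly_count")
  .show()
```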
This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.
Output:

```
+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
```
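If you would rather drop the UDF entirely, one option (a sketch, not the only way) is the built-in `concat_ws`, which accepts an `array<string>` column; casting the collected `array<int>` to `array<string>` first makes it usable. The object name `HourlyCountNoUdf` and the local `SparkSession` setup are assumptions for a standalone program; in spark-shell you would reuse the existing `spark`. It should produce the same `hourly_count` values as the UDF version.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical standalone variant of the pipeline above, without a UDF.
object HourlyCountNoUdf {
  def hourlyCounts(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val data = Seq(
      ("id1", 0, 12),
      ("id1", 1, 55),
      ("id1", 23, 44),
      ("id2", 0, 12),
      ("id2", 1, 89),
      ("id2", 23, 34)
    ).toDF("id", "hour", "count")

    data
      // Cumulative, hour-ordered list of counts per id (same window as above).
      .withColumn("collected", collect_list($"count").over(Window.partitionBy("id").orderBy("hour")))
      .groupBy("id")
      // The complete list is the max, since each list is a prefix of the next.
      .agg(max($"collected").as("collected"))
      // Built-in replacement for the UDF: cast array<int> to array<string>, then join with ":".
      .withColumn("hourly_count", concat_ws(":", $"collected".cast("array<string>")))
      .select("id", "hourly_count")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: running locally; adjust master/appName for a real cluster.
    val spark = SparkSession.builder().master("local[*]").appName("hourly-count").getOrCreate()
    try hourlyCounts(spark).show()
    finally spark.stop()
  }
}
```

Staying with built-in functions lets Catalyst see the whole expression, whereas a Scala UDF is opaque to the optimizer.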