How to aggregate values into collection after groupBy?

Spark >= 2.4

You can replace flatten udf with built-in flatten function

import org.apache.spark.sql.functions.flatten

leaving the rest as-is.

Spark >= 2.0, < 2.4

It is possible but quite expensive. Using data you’ve provided:

case class Record(
    visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b",  "777c"), Array.empty[String])).toDF

and a helper function:

import org.apache.spark.sql.functions.udf

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)

we can fill the blanks with placeholders:

import org.apache.spark.sql.functions.{array, lit, when}

val dfWithPlaceholders = df.withColumn(
  "emailIds", 
  when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))

collect_lists and flatten:

import org.apache.spark.sql.functions.{array, collect_list}

val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")

df
  .groupBy($"visitorId")
  .agg(trackingIds, emailIds)

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

With statically typed Dataset:

df.as[Record]
  .groupByKey(_.visitorId)
  .mapGroups { case (key, vs) => 
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) => 
        Record(key, trackingIds.flatten, emailIds.flatten)
  }}

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

Spark 1.x

You can convert to RDD and group

import org.apache.spark.sql.Row

dfWithPlaceholders.rdd
  .map {
     case Row(id: String, 
       trcks: Seq[String @ unchecked],
       emails: Seq[String @ unchecked]) => (id, (trcks, emails))
  }
  .groupByKey
  .map {case (key, vs) => vs.toArray.unzip match {
    case (trackingIds, emailIds) => 
      Record(key, trackingIds.flatten, emailIds.flatten)
  }}
  .toDF

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// |     a158|[666b, 666b, 777c]|  [12, ]|
// +---------+------------------+--------+

Leave a Comment