How to zip two (or more) DataFrames in Spark

An operation like this is not supported by the DataFrame API. It is possible to zip two RDDs, but to make it work you have to match both the number of partitions and the number of elements per partition. Assuming this is the case:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

// Sample inputs: two DataFrames with identical partitioning
// (same number of partitions and elements per partition).
val a: DataFrame = sc.parallelize(
  Seq(("abc", 123), ("cde", 23))
).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Pair up rows from both underlying RDDs and concatenate their fields.
val zippedRows = a.rdd.zip(b.rdd).map { pair =>
  Row.fromSeq(pair._1.toSeq ++ pair._2.toSeq)
}

// Combined schema: columns of `a` followed by columns of `b`.
val combinedSchema = StructType(a.schema.fields ++ b.schema.fields)

// Materialize the zipped DataFrame.
val ab: DataFrame = sqlContext.createDataFrame(zippedRows, combinedSchema)

If the above conditions are not met, the only option that comes to mind is adding an index and joining:

/** Appends a Long "_index" column holding each row's position within `df`. */
def addIndex(df: DataFrame) = {
  // Tag every row with its position in the RDD.
  val indexed = df.rdd.zipWithIndex.map { pair =>
    Row.fromSeq(pair._1.toSeq :+ pair._2)
  }
  // Extend the original schema with the non-nullable index column.
  val schemaWithIndex =
    StructType(df.schema.fields :+ StructField("_index", LongType, false))
  sqlContext.createDataFrame(indexed, schemaWithIndex)
}

// Index both sides, equi-join on the synthetic column, then discard it.
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")

Leave a Comment