How to transpose an RDD in Spark

Say you have an N×M matrix.

If both N and M are so small that you can hold N×M items in memory, it doesn’t make much sense to use an RDD. But transposing it is easy:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)

If N or M is so large that you cannot hold N or M entries in memory, then you cannot have an RDD line of this size. Either the original or the transposed matrix is impossible to represent in this case.

N and M may be of a medium size: you can hold N or M entries in memory, but you cannot hold N×M entries. In this case you have to blow up the matrix and put it together again:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
  case (row, rowIndex) => row.zipWithIndex.map {
    case (number, columnIndex) => columnIndex -> (rowIndex, number)
  }
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
  indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}

Leave a Comment