How do I convert a CSV file to an RDD?

A simple approach is to keep the header around, so that columns can be addressed by name.

Let’s say you have a file.csv like:

user, topic, hits
om,  scala, 120
daniel, spark, 80
3754978, spark, 1

We can define a header class that uses a parsed version of the first row:

class SimpleCSVHeader(header:Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String):String = array(index(key))
}
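
To see what the class does in isolation, here is a minimal sketch that runs without Spark. The `parseLine` helper is a hypothetical name introduced only for this illustration; it splits and trims the same way as the code in this answer.

```scala
// Same class as above, repeated so the snippet runs on its own.
class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index: Map[String, Int] = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}

// Hypothetical helper: split on commas and trim each cell.
def parseLine(line: String): Array[String] = line.split(",").map(_.trim)

val header = new SimpleCSVHeader(parseLine("user, topic, hits"))
val row = parseLine("om,  scala, 120")

val user = header(row, "user")       // column name "user" resolves to index 0
val hits = header(row, "hits").toInt // column name "hits" resolves to index 2
```

Looking up a column name that is not in the header throws a `NoSuchElementException`, since `index(key)` is a plain `Map` lookup.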

We can then use that header to address the data further down the road:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user"))
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...

Note that the header is little more than a map from a column name to its array index. Pretty much all of this could also be done using the ordinal position of the element in the array, e.g. user = row(0).
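
As a rough sketch of that ordinal variant, the snippet below uses a local `Seq` in place of the RDD (an assumption made so it runs without a Spark context). The `map` and `filter` calls are written the same way as on an RDD.

```scala
// A local Seq stands in for sc.textFile("file.csv") in this sketch.
val lines = Seq(
  "user, topic, hits",
  "om,  scala, 120",
  "daniel, spark, 80",
  "3754978, spark, 1"
)

val data = lines.map(line => line.split(",").map(elem => elem.trim))
val rows = data.filter(row => row(0) != "user")           // filter the header out
val usersByHits = rows.map(row => row(0) -> row(2).toInt) // user is column 0, hits is column 2
```

This is shorter, but the column positions are hard-coded, so it breaks silently if the column order in the file changes.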

PS: Welcome to Scala 🙂
