Write Spark dataframe as CSV with partitions

Spark 2.0.0+:

Built-in csv format supports partitioning out of the box so you should be able to simply use:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)

without including any additional packages.

Spark < 2.0.0:

At this moment (v1.4.0) spark-csv doesn’t support partitionBy (see databricks/spark-csv#123) but you can adjust built-in sources to achieve what you want.

You can try two different approaches. Assuming your data is relatively simple (no complex strings and need for character escaping) and looks more or less like this:

df = sc.parallelize([
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])

You can manually prepare values for writing:

from pyspark.sql.functions import col, concat_ws

key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])

kvs = df.select(key, values)

and write using text source

kvs.write.partitionBy("k").text("/tmp/foo")

df_foo = (sqlContext.read.format("com.databricks.spark.csv")
    .options(inferSchema="true")
    .load("/tmp/foo/k=foo"))

df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)

In more complex cases you can try to use proper CSV parser to preprocess values in a similar way, either by using UDF or mapping over RDD, but it will be significantly more expensive.

If CSV format is not a hard requirement you can also use JSON writer which supports partitionBy out-of-the-box:

df.write.partitionBy("k").json("/tmp/bar")

as well as partition discovery on read.

Leave a Comment