Write single CSV file using spark-csv

It is creating a folder with multiple files, because each partition is saved individually. If you need a single output file (still in a folder) you can repartition (preferred if upstream data is large, but requires a shuffle): df .repartition(1) .write.format(“com.databricks.spark.csv”) .option(“header”, “true”) .save(“mydata.csv”) or coalesce: df .coalesce(1) .write.format(“com.databricks.spark.csv”) .option(“header”, “true”) .save(“mydata.csv”) data frame before … Read more

Spark functions vs UDF performance?

when would a udf be faster If you ask about Python UDF the answer is probably never*. Since SQL functions are relatively simple and are not designed for complex tasks it is pretty much impossible compensate the cost of repeated serialization, deserialization and data movement between Python interpreter and JVM. Does anyone know why this … Read more

How to query JSON data column using Spark DataFrames?

Spark >= 2.4 If needed, schema can be determined using schema_of_json function (please note that this assumes that an arbitrary row is a valid representative of the schema). import org.apache.spark.sql.functions.{lit, schema_of_json, from_json} import collection.JavaConverters._ val schema = schema_of_json(lit(df.select($”jsonData”).as[String].first)) df.withColumn(“jsonData”, from_json($”jsonData”, schema, Map[String, String]().asJava)) Spark >= 2.1 You can use from_json function: import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types._ … Read more

How to store custom objects in Dataset?

Update This answer is still valid and informative, although things are now better since 2.2/2.3, which adds built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If you stick to making types with only case classes and the usual Scala types, you should be fine with just the implicit in SQLImplicits. Unfortunately, virtually … Read more

Querying Spark SQL DataFrame with complex types

It depends on a type of the column. Lets start with some dummy data: import org.apache.spark.sql.functions.{udf, lit} import scala.util.Try case class SubRecord(x: Int) case class ArrayElement(foo: String, bar: Int, vals: Array[Double]) case class Record( an_array: Array[Int], a_map: Map[String, String], a_struct: SubRecord, an_array_of_structs: Array[ArrayElement]) val df = sc.parallelize(Seq( Record(Array(1, 2, 3), Map(“foo” -> “bar”), SubRecord(1), Array( … Read more

Resolving dependency problems in Apache Spark

Apache Spark’s classpath is built dynamically (to accommodate per-application user code) which makes it vulnerable to such issues. @user7337271‘s answer is correct, but there are some more concerns, depending on the cluster manager (“master”) you’re using. First, a Spark application consists of these components (each one is a separate JVM, therefore potentially contains different classes … Read more

How to pivot Spark DataFrame?

As mentioned by David Anderson Spark provides pivot function since version 1.6. General syntax looks as follows: df .groupBy(grouping_columns) .pivot(pivot_column, [values]) .agg(aggregate_expressions) Usage examples using nycflights13 and csv format: Python: from pyspark.sql.functions import avg flights = (sqlContext .read .format(“csv”) .options(inferSchema=”true”, header=”true”) .load(“flights.csv”) .na.drop()) flights.registerTempTable(“flights”) sqlContext.cacheTable(“flights”) gexprs = (“origin”, “dest”, “carrier”) aggexpr = avg(“arr_delay”) flights.count() ## … Read more

How to select the first row of each group?

Window functions: Something like this should do the trick: import org.apache.spark.sql.functions.{row_number, max, broadcast} import org.apache.spark.sql.expressions.Window val df = sc.parallelize(Seq( (0,”cat26″,30.9), (0,”cat13″,22.1), (0,”cat95″,19.6), (0,”cat105″,1.3), (1,”cat67″,28.5), (1,”cat4″,26.8), (1,”cat13″,12.6), (1,”cat23″,5.3), (2,”cat56″,39.6), (2,”cat40″,29.7), (2,”cat187″,27.9), (2,”cat68″,9.8), (3,”cat8″,35.6))).toDF(“Hour”, “Category”, “TotalValue”) val w = Window.partitionBy($”hour”).orderBy($”TotalValue”.desc) val dfTop = df.withColumn(“rn”, row_number.over(w)).where($”rn” === 1).drop(“rn”) dfTop.show // +—-+——–+———-+ // |Hour|Category|TotalValue| // +—-+——–+———-+ // | 0| … Read more