How to create a Dataset of Maps?

It is not covered in 2.2, but can be easily addressed. You can add the required Encoder using ExpressionEncoder, either explicitly:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder

spark
  .createDataset(Seq(Map(1 -> 2)))(ExpressionEncoder(): Encoder[Map[Int, Int]])

or implicitly:

implicit def mapIntIntEncoder: Encoder[Map[Int, Int]] = ExpressionEncoder()

spark.createDataset(Seq(Map(1 -> 2)))

When to use Spark DataFrame/Dataset API and when to use plain RDD?

From the Databricks blog article A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets:

When to use RDDs? Consider these scenarios or common use cases for using RDDs when: you want low-level transformations, actions, and control over your dataset; your data is unstructured, such as media streams or streams of text; you … Read more
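To make the contrast concrete, here is a minimal sketch (assuming an existing SparkSession named spark with spark.implicits._ imported, and a hypothetical text file) that does the same word count with the low-level RDD API and with the DataFrame API:

import org.apache.spark.sql.functions.{explode, split}
import spark.implicits._

// Low-level RDD API: explicit control over every transformation,
// no schema, no Catalyst optimization of the pipeline.
val rddCounts = spark.sparkContext
  .textFile("notes.txt")                      // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// DataFrame API: declarative and schema-aware, so Catalyst can optimize it.
val dfCounts = spark.read.text("notes.txt")   // same hypothetical path
  .select(explode(split($"value", "\\s+")) as "word")
  .groupBy("word")
  .count()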

Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?

That's the line in your physical plan you should remember, because it shows the real difference between Dataset[T] and DataFrame (which is Dataset[Row]):

Filter <function1>.apply

I keep saying that people should stay away from the typed Dataset API and keep using the untyped DataFrame API, as the Scala code becomes a black box to the optimizer … Read more
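A quick way to see this yourself is to compare the two plans. A minimal sketch, assuming a SparkSession named spark and a Parquet file at a hypothetical path:

import spark.implicits._

case class Record(id: Long, value: String)

val ds = spark.read.parquet("/tmp/records.parquet").as[Record]  // hypothetical path

// Typed API: the lambda is an opaque Scala function, so it shows up as
// Filter <function1>.apply in the plan and cannot be pushed down to the source.
ds.filter(_.id == 1).explain()

// Untyped API: the Column expression is visible to Catalyst, so the plan
// should instead show a pushed-down filter on the Parquet scan.
ds.filter($"id" === 1).explain()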

Difference between SparkContext, JavaSparkContext, SQLContext, and SparkSession?

SparkContext is the Scala implementation entry point and JavaSparkContext is a Java wrapper around SparkContext. SQLContext is the entry point of Spark SQL, which can be obtained from a SparkContext. Prior to 2.x, RDD, DataFrame, and Dataset were three different data abstractions. Since Spark 2.x, all three data abstractions are unified and SparkSession is the unified entry point of Spark. … Read more
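As a minimal sketch of how these entry points relate in Spark 2.x (app name and master URL are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.api.java.JavaSparkContext

val spark = SparkSession.builder()
  .appName("entry-points")
  .master("local[*]")
  .getOrCreate()

// The older entry points remain reachable from the unified SparkSession:
val sc  = spark.sparkContext   // org.apache.spark.SparkContext
val sql = spark.sqlContext     // org.apache.spark.sql.SQLContext, kept for backwards compatibility

// JavaSparkContext is just a Java-friendly wrapper around an existing SparkContext:
val jsc = new JavaSparkContext(sc)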

Array Intersection in Spark SQL

Since Spark 2.4, the array_intersect function can be used directly in SQL:

spark.sql(
  "SELECT array_intersect(array(1, 42), array(42, 3)) AS intersection"
).show()

+------------+
|intersection|
+------------+
|        [42]|
+------------+

and in the Dataset API:

import org.apache.spark.sql.functions.array_intersect

Seq((Seq(1, 42), Seq(42, 3)))
  .toDF("a", "b")
  .select(array_intersect($"a", $"b") as "intersection")
  .show()

+------------+
|intersection|
+------------+
|        [42]|
+------------+

Equivalent functions are also present in the … Read more

How to get keys and values from MapType column in SparkSQL DataFrame

Spark >= 2.3

You can simplify the process using the map_keys function:

import org.apache.spark.sql.functions.map_keys

There is also a map_values function, but it won't be directly useful here.

Spark < 2.3

The general method can be expressed in a few steps. First the required imports:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

and example data:

val ds = Seq(
  (1, Map("foo" -> (1, … Read more
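Since the example data above is truncated, here is a self-contained sketch of the Spark >= 2.3 approach on simplified data (the column names are illustrative, not from the original answer):

import org.apache.spark.sql.functions.{map_keys, map_values}
import spark.implicits._

val simple = Seq(
  (1, Map("foo" -> 1, "bar" -> 2))
).toDF("id", "m")

// map_keys returns an array of the map's keys, map_values an array of its values.
simple
  .select(map_keys($"m") as "keys", map_values($"m") as "values")
  .show()
// Expected: one row with keys [foo, bar] and values [1, 2].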

Overwrite only some partitions in a partitioned spark Dataset

Since Spark 2.3.0, this is an option when overwriting a table. To overwrite it, you need to set the new spark.sql.sources.partitionOverwriteMode setting to dynamic, the dataset needs to be partitioned, and the write mode must be overwrite. Example in Scala:

spark.conf.set(
  "spark.sql.sources.partitionOverwriteMode", "dynamic"
)
data.write.mode("overwrite").insertInto("partitioned_table")

I recommend doing a repartition based on your partition column before writing, … Read more
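The recommended repartition before the write could look like this (a sketch assuming the table is partitioned by a hypothetical date column):

import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Repartitioning by the partition column groups each partition's rows into
// the same tasks, so fewer and larger files are written per partition.
data
  .repartition(col("date"))            // hypothetical partition column
  .write
  .mode("overwrite")
  .insertInto("partitioned_table")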

Encode an ADT / sealed trait hierarchy into Spark DataSet column

TL;DR There is no good solution right now, and given the Spark SQL / Dataset implementation, it is unlikely there will be one in the foreseeable future. You can use the generic kryo or java encoder:

val occupation: Seq[Occupation] = Seq(SoftwareEngineer, Wizard(1), Other("foo"))
spark.createDataset(occupation)(org.apache.spark.sql.Encoders.kryo[Occupation])

but it is hardly useful in practice. The UDT API provides another possible approach as … Read more
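The snippet above assumes an ADT roughly like the following (my reconstruction for illustration, not necessarily the exact hierarchy from the original question):

sealed trait Occupation
case object SoftwareEngineer extends Occupation
case class Wizard(level: Int) extends Occupation
case class Other(description: String) extends Occupation

// With Encoders.kryo the whole object is serialized into a single binary column,
// so the resulting Dataset cannot be queried column-wise, which is why it is of limited use.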

Perform a typed join in Scala with Spark Datasets

Observation

Spark SQL can optimize a join only if the join condition is based on the equality operator. This means we can consider equijoins and non-equijoins separately.

Equijoin

An equijoin can be implemented in a type-safe manner by mapping both Datasets to (key, value) tuples, performing the join based on keys, and reshaping the result:

import org.apache.spark.sql.Encoder
import … Read more
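A sketch of the pattern described above (my own reconstruction, not necessarily the code from the original answer):

import org.apache.spark.sql.{Dataset, Encoder}

// Map both sides to (key, value) tuples, join on the key column, then drop the keys again.
def typedEquiJoin[L, R, K](left: Dataset[L], right: Dataset[R])
                          (leftKey: L => K, rightKey: R => K)
                          (implicit kl: Encoder[(K, L)],
                                    kr: Encoder[(K, R)],
                                    out: Encoder[(L, R)]): Dataset[(L, R)] = {
  val l = left.map(x => (leftKey(x), x))
  val r = right.map(x => (rightKey(x), x))
  l.joinWith(r, l("_1") === r("_1"))   // equality-based condition, so it stays optimizable
    .map { case ((_, lv), (_, rv)) => (lv, rv) }
}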