Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?

That’s the line in your Physical Plan you should remember to know the real difference between Dataset[T] and DataFrame (which is Dataset[Row]).

Filter <function1>.apply

I keep saying that people should stay away from the typed Dataset API and keep using the untyped DataFrame API as the Scala code becomes a black box to the optimizer in too many places. You’ve just hit one of these and think also about the deserialization of all the objects that Spark SQL keeps away from JVM to avoid GCs. Every time you touch the objects you literally ask Spark SQL to deserialize objects and load them onto JVM that puts a lot of pressure on GC (which will get triggered more often with the typed Dataset API as compared to the untyped DataFrame API).

See UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice.


Quoting Reynold Xin after I asked the very same question on [email protected] mailing list:

The UDF is a black box so Spark can’t know what it is dealing with. There
are simple cases in which we can analyze the UDFs byte code and infer what
it is doing, but it is pretty difficult to do in general.

There is a JIRA ticket for such cases SPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressions, but as someone said (I think it was Adam B. on twitter) it’d be a kind of joke to expect it any time soon.

One big advantage of the Dataset API is the type safety, at the cost of performance due to heavy reliance on user-defined closures/lambdas. These closures are typically slower than expressions because we have more flexibility to optimize expressions (known data types, no virtual function calls, etc). In many cases, it’s actually not going to be very difficult to look into the byte code of these closures and figure out what they are trying to do. If we can understand them, then we can turn them directly into Catalyst expressions for more optimized executions.


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()

The above code is equivalent to the following:

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()

someCodeSparkSQLCannotDoMuchOutOfIt is exactly where you put optimizations aside and let Spark Optimizer skip it.

Leave a Comment