How to improve performance for slow Spark jobs using DataFrame and JDBC connection?

All aggregation operations are performed only after the whole dataset has been retrieved into memory as a DataFrame, so doing the count in Spark will never be as efficient as doing it directly in Teradata. Sometimes it is worth pushing some computation into the database by creating views and then mapping those views through the JDBC API.
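If creating a view is not possible, a similar effect can usually be had by passing a parenthesised subquery where the table name is expected, since the JDBC source accepts anything that is valid in a FROM clause. The sketch below is only an illustration: `PushdownSketch`, `asDerivedTable` and the alias `t` are hypothetical names, not part of Spark, and the SQL shown is a placeholder.

```scala
object PushdownSketch {
  // Wraps a SELECT statement in parentheses and gives it an alias so that it
  // can be passed to the JDBC reader in place of a plain table name.
  // Hypothetical helper for illustration; not part of the Spark API.
  def asDerivedTable(query: String, alias: String): String =
    s"(${query.trim.stripSuffix(";")}) $alias"
}

// Assumed usage: the database computes the count and Spark only reads the
// single resulting row.
// val counts = sqlctx.read.jdbc(
//   "<URL>",
//   PushdownSketch.asDerivedTable("select count(*) as cnt from <TABLE>", "t"),
//   new java.util.Properties()
// )
```

Whether the alias needs an `as` keyword depends on the database dialect, so treat the exact form as an assumption to check against your own system.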

Every time you use the JDBC driver to access a large table, you should specify a partitioning strategy; otherwise you will create a DataFrame/RDD with a single partition and overload the single JDBC connection.

Instead, try the following API (available since Spark 1.4.0):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", 
  lowerBound = minValue,
  upperBound = maxValue,
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)
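To make the role of lowerBound, upperBound and numPartitions more concrete, here is a simplified sketch of how a numeric range could be cut into one WHERE clause per partition. It is not Spark's actual implementation; `PartitionSketch` and `partitionClauses` are hypothetical names. The point it illustrates is that the bounds only decide the stride: the first and last clauses are left open-ended, so rows outside the bounds are still read rather than filtered out.

```scala
object PartitionSketch {
  // Splits [lower, upper) into numPartitions ranges on `column` and returns
  // one WHERE clause per partition. Simplified illustration only.
  def partitionClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 1, "numPartitions must be at least 1")
    require(lower < upper, "lower must be smaller than upper")
    // Width of each range; at least 1 so that the ranges always advance.
    val stride = math.max((upper - lower) / numPartitions, 1L)
    (0 until numPartitions).map { i =>
      val start = lower + i * stride
      val end = start + stride
      if (numPartitions == 1) "1 = 1" // a single partition reads everything
      else if (i == 0) s"$column < $end OR $column IS NULL" // open below, also takes NULLs
      else if (i == numPartitions - 1) s"$column >= $start" // open above
      else s"$column >= $start AND $column < $end"
    }
  }
}

PartitionSketch.partitionClauses("id", 0L, 100L, 4).foreach(println)
// id < 25 OR id IS NULL
// id >= 25 AND id < 50
// id >= 50 AND id < 75
// id >= 75
```

If the values in the column are heavily skewed, equal-width ranges like these produce partitions of very different sizes, which is the case the custom predicates are meant for.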

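Spark can also push some filtering down to the database, so that simple filter conditions on the DataFrame are evaluated by the source instead of after the rows have been loaded.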

If you don’t have a uniformly distributed integral column, you can create custom partitions by specifying custom predicates (WHERE clauses). For example, suppose you have a timestamp column and want to partition by date ranges:

val predicates =
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  ).map {
    case (start, end) =>
      s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
  }

predicates.foreach(println)

// The generated predicates are:
// cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
// cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
// cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
// cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'


sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

This generates a DataFrame in which each partition contains the records returned by the subquery for the corresponding predicate.
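Writing the date ranges by hand gets tedious for longer periods. As a rough sketch, the predicates could be generated from a start date, an end date and a range width using `java.time.LocalDate`. The names `DatePredicates` and `dateRangePredicates` are hypothetical, and the predicate text assumes the same `cast(... as date)` syntax as the example above.

```scala
import java.time.LocalDate

object DatePredicates {
  // Builds contiguous, non-overlapping predicates of `daysPerPartition` days
  // each, covering the inclusive range [first, last]. The final range is
  // shortened so that it never extends past `last`.
  def dateRangePredicates(column: String, first: LocalDate, last: LocalDate,
                          daysPerPartition: Int): Array[String] = {
    require(daysPerPartition >= 1, "daysPerPartition must be at least 1")
    require(!last.isBefore(first), "last must not be before first")
    Iterator.iterate(first)(_.plusDays(daysPerPartition))
      .takeWhile(!_.isAfter(last))
      .map { start =>
        val candidateEnd = start.plusDays(daysPerPartition - 1)
        val end = if (candidateEnd.isAfter(last)) last else candidateEnd
        // LocalDate.toString yields ISO-8601 dates such as 2015-07-01
        s"cast($column as date) >= date '$start' AND cast($column as date) <= date '$end'"
      }
      .toArray
  }
}

val generated = DatePredicates.dateRangePredicates(
  "DAT_TME", LocalDate.of(2015, 7, 1), LocalDate.of(2015, 7, 25), 10)
generated.foreach(println)
// cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
// cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
// cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-25'
```

Keep in mind that each predicate is an independent WHERE clause: rows that match none of them (for example a NULL timestamp or a date outside the covered range) are not loaded, and overlapping predicates would load the same rows twice.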

Check the source code at DataFrameReader.scala
