How to optimize partitioning when migrating data from JDBC source?

Determine how many partitions you need given the amount of input data and your cluster resources. As a rule of thumb it is better to keep partition input under 1GB unless strictly necessary. and strictly smaller than the block size limit. You’ve previously stated that you migrate 1TB of data values you use in different … Read more

Avoid performance impact of a single partition mode in Spark window functions

In practice performance impact will be almost the same as if you omitted partitionBy clause at all. All records will be shuffled to a single partition, sorted locally and iterated sequentially one by one. The difference is only in the number of partitions created in total. Let’s illustrate that with an example using simple dataset … Read more

LINQ Partition List into Lists of 8 members [duplicate]

Use the following extension method to break the input into subsets public static class IEnumerableExtensions { public static IEnumerable<List<T>> InSetsOf<T>(this IEnumerable<T> source, int max) { List<T> toReturn = new List<T>(max); foreach(var item in source) { toReturn.Add(item); if (toReturn.Count == max) { yield return toReturn; toReturn = new List<T>(max); } } if (toReturn.Any()) { yield return … Read more

How to control partition size in Spark SQL

Spark < 2.0: You can use Hadoop configuration options: mapred.min.split.size. mapred.max.split.size as well as HDFS block size to control partition size for filesystem based formats*. val minSplit: Int = ??? val maxSplit: Int = ??? sc.hadoopConfiguration.setInt(“mapred.min.split.size”, minSplit) sc.hadoopConfiguration.setInt(“mapred.max.split.size”, maxSplit) Spark 2.0+: You can use spark.sql.files.maxPartitionBytes configuration: spark.conf.set(“spark.sql.files.maxPartitionBytes”, maxSplit) In both cases these values may not … Read more

How to find all partitions of a set

I’ve found a straightforward recursive solution. First, let’s solve a simpler problem: how to find all partitions consisting of exactly two parts. For an n-element set, we can count an int from 0 to (2^n)-1. This creates every n-bit pattern, with each bit corresponding to one input element. If the bit is 0, we place … Read more

Partitioning in spark while reading from RDBMS via JDBC

If you don’t specify either {partitionColumn, lowerBound, upperBound, numPartitions} or {predicates} Spark will use a single executor and create a single non-empty partition. All data will be processed using a single transaction and reads will be neither distributed nor parallelized. See also: How to optimize partitioning when migrating data from JDBC source? How to improve … Read more

How to define partitioning of DataFrame?

Spark >= 2.3.0 SPARK-22614 exposes range partitioning. val partitionedByRange = df.repartitionByRange(42, $”k”) partitionedByRange.explain // == Parsed Logical Plan == // ‘RepartitionByExpression [‘k ASC NULLS FIRST], 42 // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6] // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 … Read more

How does HashPartitioner work?

Well, lets make your dataset marginally more interesting: val rdd = sc.parallelize(for { x <- 1 to 3 y <- 1 to 2 } yield (x, None), 8) We have six elements: rdd.count Long = 6 no partitioner: rdd.partitioner Option[org.apache.spark.Partitioner] = None and eight partitions: rdd.partitions.length Int = 8 Now lets define small helper to … Read more