How to use JDBC source to write and read data in (Py)Spark?

Writing data

Include the applicable JDBC driver when you submit the application or start the shell. You can, for example, use --packages:

bin/pyspark --packages group:name:version

or combine --driver-class-path and --jars:

bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or using conf/spark-defaults.conf to set spark.jars.packages … Read more
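
A minimal end-to-end sketch of the write/read calls themselves (the JDBC URL, table name, and credentials below are hypothetical placeholders; it assumes the matching driver JAR is already on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical connection details - replace with your own.
jdbc_url = "jdbc:postgresql://localhost:5432/mydb"
props = {"user": "user", "password": "secret", "driver": "org.postgresql.Driver"}

df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "value"])

# Write the DataFrame to a table over JDBC.
df.write.jdbc(url=jdbc_url, table="target_table", mode="append", properties=props)

# Read it back into a new DataFrame.
df2 = spark.read.jdbc(url=jdbc_url, table="target_table", properties=props)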

How to run independent transformations in parallel using PySpark?

Just use threads and make sure that the cluster has enough resources to process both tasks at the same time.

from threading import Thread
import time

def process(rdd, f):
    def delay(x):
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()

rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda … Read more
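
A complete, runnable sketch of the same pattern (using a thread pool instead of raw Thread objects, which is just a matter of taste):

from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(100))

def job(f):
    # Each call triggers its own Spark job; the scheduler runs them
    # concurrently as long as the cluster has free resources.
    return rdd.map(f).sum()

with ThreadPoolExecutor(max_workers=2) as pool:
    fut1 = pool.submit(job, lambda x: x * 2)
    fut2 = pool.submit(job, lambda x: x + 1)
    print(fut1.result(), fut2.result())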

Unpivot in spark-sql/pyspark

You can use the built-in stack function, for example in Scala:

scala> val df = Seq(("G", Some(4), 2, None), ("H", None, 4, Some(5))).toDF("A", "X", "Y", "Z")
df: org.apache.spark.sql.DataFrame = [A: string, X: int ... 2 more fields]

scala> df.show
+---+----+---+----+
|  A|   X|  Y|   Z|
+---+----+---+----+
|  G|   4|  2|null|
|  H|null|  4|   5|
+---+----+---+----+

scala> df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', … Read more
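
A PySpark sketch of the same unpivot, using stack through selectExpr (the toy data mirrors the excerpt above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("G", 4, 2, None), ("H", None, 4, 5)],
    ["A", "X", "Y", "Z"],
)

# stack(3, ...) turns the three columns X, Y, Z into (key, value) rows.
long_df = df.selectExpr(
    "A",
    "stack(3, 'X', X, 'Y', Y, 'Z', Z) as (key, value)",
)
long_df.show()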

How to find median and quantiles using Spark

Ongoing work: SPARK-30569 - Add DSL functions invoking percentile_approx

Spark 2.0+: You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm:

Python: df.approxQuantile("x", [0.5], 0.25)

Scala: df.stat.approxQuantile("x", Array(0.5), 0.25)

where the last parameter is the relative error. The lower the number, the more accurate the results and the more expensive the computation. Since Spark 2.2 (SPARK-14352) it supports estimation … Read more
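
A small self-contained sketch of approxQuantile on a toy column (the column name "x" and the 0.01 relative error are illustrative choices):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(float(i),) for i in range(1, 101)], ["x"])

# [0.25, 0.5, 0.75] -> quartiles; the last argument is the relative error.
quartiles = df.approxQuantile("x", [0.25, 0.5, 0.75], 0.01)
print(quartiles)  # approximate values, e.g. roughly [25.0, 50.0, 75.0]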

How to split Vector into columns – using PySpark

Spark >= 3.0.0

Since Spark 3.0.0 this can be done without using a UDF.

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+

Spark < 3.0.0

One possible approach is to … Read more
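
A self-contained sketch that builds a DataFrame like the one the excerpt assumes (columns "word" and "vector") and applies the same vector_to_array approach:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("assert", Vectors.dense([1.0, 2.0, 3.0])),
     ("require", Vectors.sparse(3, {1: 2.0}))],
    ["word", "vector"],
)

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i] for i in range(3)])
    .show())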

How to add a constant column in a Spark DataFrame?

Spark 2.2+

Spark 2.2 introduces typedLit to support Seq, Map, and Tuples (SPARK-19254), and the following calls should be supported (Scala):

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ (lit), 1.4+ (array, struct), 2.0+ (map):

The second argument for DataFrame.withColumn should be a Column, so … Read more
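
A PySpark sketch of the same idea using lit, array, and create_map (the DataFrame and column names are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, array, create_map

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["id"])

(df
    .withColumn("some_const", lit(10))
    .withColumn("some_array", array(lit(1), lit(2), lit(3)))
    .withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))
    .show(truncate=False))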

Find maximum row per group in Spark DataFrame

Using join (it will result in more than one row per group in case of ties):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs,
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

Using window functions (will drop ties):

from pyspark.sql.functions import row_number
from pyspark.sql.window import … Read more
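
A self-contained sketch of the window-function variant (keeps one row per group, dropping ties), with toy data matching the column names above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, row_number, col
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a1", "b1"), ("a1", "b1"), ("a1", "b2"), ("a2", "b3")],
    ["id_sa", "id_sb"],
)

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt"))
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)     # keep the most frequent id_sb per id_sa
    .select("id_sa", "id_sb")
    .show())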