Integrating Spark Structured Streaming with the Confluent Schema Registry

It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization; you must deserialize the data manually. In Spark, create the Confluent REST service object to get the schema, then convert the schema string in the response object into an Avro schema … Read more
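For reference, a minimal PySpark sketch of the same idea on Spark 3.x, assuming the spark-avro package is on the classpath and using placeholder registry, topic, and broker addresses: fetch the writer schema over the Schema Registry REST API, strip the 5-byte Confluent wire-format header (magic byte plus 4-byte schema id), and decode the binary payload with from_avro.

```python
# Sketch: decoding Confluent-Avro Kafka records in Structured Streaming (Spark 3.x).
# Assumes spark-avro is available, e.g. --packages org.apache.spark:spark-avro_2.12:<version>,
# and that the registry URL, subject, topic, and broker below are placeholders.
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.avro.functions import from_avro

spark = SparkSession.builder.appName("confluent-avro-demo").getOrCreate()

# Fetch the latest value schema for the topic from the Schema Registry REST API
# (subject name assumes the default TopicNameStrategy: "<topic>-value").
registry = "http://schema-registry:8081"
subject = "my-topic-value"
schema_json = requests.get(
    f"{registry}/subjects/{subject}/versions/latest"
).json()["schema"]

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")
       .option("subscribe", "my-topic")
       .load())

# Confluent's wire format prepends 1 magic byte + a 4-byte schema id;
# drop those 5 bytes before handing the payload to from_avro.
decoded = raw.select(
    from_avro(expr("substring(value, 6, length(value) - 5)"), schema_json).alias("record")
)

query = decoded.writeStream.format("console").start()
query.awaitTermination()
```

This sketch assumes a single fixed schema for the topic; handling per-record schema ids and schema evolution still requires a custom deserializer along the lines of the original approach.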

How to stop INFO messages from displaying on the Spark console?

Edit your conf/log4j.properties file and change the following line: log4j.rootCategory=INFO, console to log4j.rootCategory=ERROR, console Another approach would be to: start spark-shell and type in the following: import org.apache.log4j.Logger import org.apache.log4j.Level Logger.getLogger("org").setLevel(Level.OFF) Logger.getLogger("akka").setLevel(Level.OFF) You won't see any logs after that. Other options for Level include: all, debug, error, fatal, info, off, trace, trace_int, warn Details … Read more
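If the goal is just to quiet a PySpark application, the log level can also be lowered programmatically on the SparkContext, without touching log4j.properties; a minimal sketch:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("quiet-logs").getOrCreate()

# Suppress INFO/WARN output for this application only; valid levels include
# ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN.
spark.sparkContext.setLogLevel("ERROR")
```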

Unpivot in spark-sql/pyspark

You can use the built-in stack function, for example in Scala: scala> val df = Seq(("G",Some(4),2,None),("H",None,4,Some(5))).toDF("A","X","Y","Z") df: org.apache.spark.sql.DataFrame = [A: string, X: int … 2 more fields] scala> df.show +---+----+---+----+ | A| X| Y| Z| +---+----+---+----+ | G| 4| 2|null| | H|null| 4| 5| +---+----+---+----+ scala> df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', … Read more
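The same stack-based unpivot can be written as a self-contained PySpark sketch; the data mirrors the excerpt's example and the key/value names are illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("G", 4, 2, None), ("H", None, 4, 5)], ["A", "X", "Y", "Z"]
)

# stack(n, label1, value1, ..., labelN, valueN) emits one row per (label, value)
# pair; all value columns must share a common type.
long_df = df.selectExpr(
    "A", "stack(3, 'X', X, 'Y', Y, 'Z', Z) as (key, value)"
)
long_df.show()
```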

How to connect Spark SQL to remote Hive metastore (via thrift protocol) with no hive-site.xml?

For Spark 1.x, you can set it with: System.setProperty("hive.metastore.uris", "thrift://METASTORE:9083"); final SparkConf conf = new SparkConf(); SparkContext sc = new SparkContext(conf); HiveContext hiveContext = new HiveContext(sc); Or: final SparkConf conf = new SparkConf(); SparkContext sc = new SparkContext(conf); HiveContext hiveContext = new HiveContext(sc); hiveContext.setConf("hive.metastore.uris", "thrift://METASTORE:9083"); Update: If your Hive is Kerberized, try setting these … Read more
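For Spark 2.x and later, the equivalent configuration goes through the SparkSession builder rather than HiveContext; a sketch, with the metastore URI as a placeholder:

```python
from pyspark.sql import SparkSession

# Point Spark SQL at a remote Hive metastore without a hive-site.xml;
# "thrift://METASTORE:9083" is a placeholder for the real metastore URI.
spark = (SparkSession.builder
         .appName("remote-metastore")
         .config("hive.metastore.uris", "thrift://METASTORE:9083")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SHOW DATABASES").show()
```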

Spark SQL: apply aggregate functions to a list of columns

There are multiple ways of applying aggregate functions to multiple columns. The GroupedData class provides a number of methods for the most common functions, including count, max, min, mean and sum, which can be used directly as follows: Python: df = sqlContext.createDataFrame( [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)], ("col1", "col2", … Read more
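A short PySpark sketch of two such styles, using the excerpt's sample data (the third column name is assumed to be col3):

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)],
    ("col1", "col2", "col3"),
)

# Dictionary form: one aggregate per column.
df.groupBy("col1").agg({"col2": "mean", "col3": "sum"}).show()

# Programmatic form: apply several aggregates to every non-grouping column.
exprs = [f(F.col(c)).alias(f"{f.__name__}_{c}")
         for c in ("col2", "col3")
         for f in (F.min, F.max, F.mean)]
df.groupBy("col1").agg(*exprs).show()
```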

Find maximum row per group in Spark DataFrame

Using a join (it will return more than one row per group in case of ties): import pyspark.sql.functions as F from pyspark.sql.functions import count, col cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") cnts.join(maxs, (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) ).select(col("cnts.id_sa"), col("cnts.id_sb")) Using window functions (will drop ties): from pyspark.sql.functions import row_number from pyspark.sql.window import … Read more
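The window-function variant, completed as a self-contained PySpark sketch on illustrative data:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Illustrative data: pairs of (id_sa, id_sb) events.
df = spark.createDataFrame(
    [("a1", "b1"), ("a1", "b1"), ("a1", "b2"), ("a2", "b3")],
    ["id_sa", "id_sb"],
)

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt"))

# row_number keeps exactly one id_sb per id_sa (ties broken arbitrarily);
# use rank() instead if all tied rows should be kept.
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc())
best = (cnts.withColumn("rn", row_number().over(w))
            .where(col("rn") == 1)
            .select("id_sa", "id_sb"))
best.show()
```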

Using a column value as a parameter to a spark DataFrame function

One option is to use pyspark.sql.functions.expr, which allows you to use column values as inputs to Spark SQL functions. Based on @user8371915's comment I have found that the following works: from pyspark.sql.functions import expr df.select( '*', expr('posexplode(split(repeat(",", rpt), ","))').alias("index", "col") ).where('index > 0').drop("col").sort('letter', 'index').show() #+------+---+-----+ #|letter|rpt|index| #+------+---+-----+ #| X| 3| 1| #| X| 3| 2| #| … Read more
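Reconstructed as a self-contained PySpark sketch (the letter/rpt rows are illustrative, chosen to match the shape of the excerpt's output):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# rpt drives how many rows each letter is expanded into.
df = spark.createDataFrame([("X", 3), ("Y", 1), ("Z", 2)], ["letter", "rpt"])

# repeat/split build an array whose length depends on the rpt column;
# posexplode turns it into (index, col) rows, one per repetition.
result = (df.select("*",
                    expr('posexplode(split(repeat(",", rpt), ","))')
                    .alias("index", "col"))
            .where("index > 0")
            .drop("col")
            .sort("letter", "index"))
result.show()
```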