How to manually set group.id and commit kafka offsets in spark structured streaming?

tl;dr It is not possible to commit any messages to Kafka. Starting with Spark version 3.x you can define the name of the Kafka consumer group; however, this still does not allow you to commit any messages. Since Spark 3.0.0, according to the Structured Kafka Integration Guide, you can provide the ConsumerGroup as an … Read more
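
As a minimal PySpark sketch of that Spark 3.x option (the broker address, topic name and group name below are placeholders, not taken from the answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-group-id-demo").getOrCreate()

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "events")                         # placeholder topic
    # Names the consumer group (Spark 3.0+); Spark still tracks progress via
    # its own checkpoints and does not commit offsets back to Kafka.
    .option("kafka.group.id", "my-consumer-group")
    .load())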

Use collect_list and collect_set in Spark SQL

Spark 2.0+: SPARK-10605 introduced a native collect_list and collect_set implementation. SparkSession with Hive support or HiveContext is no longer required. Spark 2.0-SNAPSHOT (before 2016-05-03): You have to enable Hive support for a given SparkSession: In Scala: val spark = SparkSession.builder .master("local") .appName("testing") .enableHiveSupport() // <- enable Hive support. .getOrCreate() In Python: spark = (SparkSession.builder .enableHiveSupport() .getOrCreate()) … Read more
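
A short runnable PySpark sketch of the two aggregations (the toy data and column names are illustrative, not from the excerpt):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").appName("collect-demo").getOrCreate()
df = spark.createDataFrame([("a", 1), ("a", 1), ("a", 2), ("b", 3)], ["key", "value"])

df.groupBy("key").agg(
    F.collect_list("value").alias("all_values"),      # keeps duplicates, e.g. [1, 1, 2]
    F.collect_set("value").alias("distinct_values")   # drops duplicates, e.g. [1, 2]
).show()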

Adding a group count column to a PySpark dataframe

When you do a groupBy(), you have to specify the aggregation before you can display the results. For example: import pyspark.sql.functions as f data = [ ('a', 5), ('a', 8), ('a', 7), ('b', 1), ] df = sqlCtx.createDataFrame(data, ["x", "y"]) df.groupBy('x').count().select('x', f.col('count').alias('n')).show() #+---+---+ #| x| n| #+---+---+ #| b| 1| #| a| 3| #+---+---+ Here … Read more
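
The excerpt's example, made self-contained with a SparkSession (sqlCtx in the excerpt is a SQLContext), plus one possible join back to the original rows, which is my addition rather than part of the quoted answer:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master("local").appName("group-count-demo").getOrCreate()
data = [("a", 5), ("a", 8), ("a", 7), ("b", 1)]
df = spark.createDataFrame(data, ["x", "y"])

# Per-group counts, as in the excerpt
counts = df.groupBy("x").count().select("x", f.col("count").alias("n"))
counts.show()

# One way to attach the per-group count to every original row (my addition):
df.join(counts, on="x", how="left").show()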

Spark MLlib LDA, how to infer the topics distribution of a new unseen document?

As of Spark 1.5 this functionality has not been implemented for the DistributedLDAModel. What you're going to need to do is convert your model to a LocalLDAModel using the toLocal method and then call the topicDistributions(documents: RDD[(Long, Vector)]) method, where documents are the new (i.e. out-of-training) documents, something like this: newDocuments: RDD[(Long, Vector)] = … … Read more
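
The answer above targets the RDD-based MLlib API in Scala. Purely as a related illustration, here is a sketch of inferring topic distributions for unseen documents with the newer DataFrame-based pyspark.ml API (Spark 2.0+), where transform() adds a topicDistribution column; the toy vectors are made up:

from pyspark.sql import SparkSession
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.master("local").appName("lda-demo").getOrCreate()

# Tiny made-up term-count vectors, just to have something to fit
train = spark.createDataFrame(
    [(0, Vectors.dense([1.0, 0.0, 3.0])),
     (1, Vectors.dense([0.0, 2.0, 1.0]))],
    ["id", "features"])
model = LDA(k=2, maxIter=10).fit(train)

# An unseen document: transform() yields its per-topic distribution
unseen = spark.createDataFrame([(2, Vectors.dense([2.0, 1.0, 0.0]))], ["id", "features"])
model.transform(unseen).select("id", "topicDistribution").show(truncate=False)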

Pyspark : forward fill with last observation for a DataFrame

Another workaround to get this working is to try something like this: from pyspark.sql import functions as F from pyspark.sql.window import Window window = ( Window .partitionBy('cookie_id') .orderBy('Time') .rowsBetween(Window.unboundedPreceding, Window.currentRow) ) final = ( joined .withColumn('UserIDFilled', F.last('User_ID', ignorenulls=True).over(window)) ) What this does is construct your window based on the partition key … Read more
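
A self-contained sketch of that forward fill; the column names follow the excerpt, while the sample rows are invented:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("ffill-demo").getOrCreate()

# Invented sample data with missing User_ID values to fill forward
joined = spark.createDataFrame(
    [("c1", 1, "u1"), ("c1", 2, None), ("c1", 3, None),
     ("c2", 1, None), ("c2", 2, "u2")],
    ["cookie_id", "Time", "User_ID"])

# Window over each cookie, ordered by time, from the first row up to the current one
window = (Window
    .partitionBy("cookie_id")
    .orderBy("Time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# last(..., ignorenulls=True) carries the most recent non-null User_ID forward
final = joined.withColumn("UserIDFilled", F.last("User_ID", ignorenulls=True).over(window))
final.show()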