java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$ while running TwitterPopularTags

Thank you for the suggestion. I was able to resolve this issue by using sbt-assembly alone. Here are the details of how I did it:

Spark – already present in the Cloudera VM
Scala – not sure if this is present in the Cloudera VM; if not, we can install it
SBT – this also needs …
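For reference, the NoClassDefFoundError means the spark-streaming-twitter classes are missing from the runtime classpath, and sbt-assembly fixes that by bundling them into a single fat jar. Below is a minimal build.sbt sketch; the Spark, Scala and library version numbers are assumptions and should be matched to whatever the Cloudera VM actually ships.

```scala
// build.sbt – minimal sketch; adjust versions to your Cloudera VM
name := "TwitterPopularTags"
version := "0.1"
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  // "provided": the cluster already has the Spark core/streaming jars
  "org.apache.spark" %% "spark-core"      % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
  // not "provided": this is the jar that was missing at runtime, so it must be bundled
  "org.apache.spark" %% "spark-streaming-twitter" % "1.6.0"
)

// avoid duplicate-file errors when sbt-assembly merges the dependency jars
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```

The sbt-assembly plugin itself is enabled from project/plugins.sbt (for example addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3"), version again an assumption); running sbt assembly then produces the fat jar that gets passed to spark-submit.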

Spark DataFrame: does groupBy after orderBy maintain that order?

groupBy after orderBy doesn’t maintain order, as others have pointed out. What you want is a Window function, partitioned on id and ordered by hours. You can collect_list over this window and then take the max (largest) of the resulting lists, since they grow cumulatively (i.e. the first hour will only have itself …
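A minimal Scala sketch of that idea — the column names (id, hours, value) and the sample data are assumptions, not taken from the original question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, max, struct}

val spark = SparkSession.builder.appName("OrderedCollect").getOrCreate()
import spark.implicits._

// hypothetical input: one row per (id, hour, value)
val df = Seq(
  ("a", 1, 10.0), ("a", 2, 20.0), ("a", 3, 30.0),
  ("b", 1, 5.0),  ("b", 2, 15.0)
).toDF("id", "hours", "value")

// window partitioned by id and ordered by hours: collect_list over it grows
// cumulatively, so each row's list contains all earlier hours plus its own
val w = Window.partitionBy("id").orderBy("hours")
val cumulative = df.withColumn("values_so_far", collect_list("value").over(w))

// the row with the largest hours carries the complete, hour-ordered list,
// so take the max per id (the struct ordering compares hours first)
val ordered = cumulative
  .groupBy("id")
  .agg(max(struct($"hours", $"values_so_far")).as("m"))
  .select($"id", $"m.values_so_far".as("values_in_hour_order"))

ordered.show(truncate = false)
```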

How to write spark streaming DF to Kafka topic

Yes, unfortunately Spark (1.x, 2.x) doesn’t make it straightforward to write to Kafka efficiently. I’d suggest the following approach: use (and re-use) one KafkaProducer instance per executor process/JVM. Here’s the high-level setup for this approach: first, you must “wrap” Kafka’s KafkaProducer because, as you mentioned, it is not serializable. Wrapping it …
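A sketch of that wrapper in Scala (the KafkaSink name, broker address and usage snippet are hypothetical; the pattern is the point): the wrapper only serializes a factory function, and the real KafkaProducer is created lazily once per executor JVM and then re-used for every record sent from that executor.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper: it captures only a factory function. The KafkaProducer itself
// is created lazily on each executor the first time send() is called, then re-used.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = {
    val factory = () => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      // flush and close the producer when the executor JVM shuts down
      sys.addShutdownHook(producer.close())
      producer
    }
    new KafkaSink(factory)
  }
}

// Usage sketch (broker address and stream are hypothetical): broadcast one sink and
// re-use it inside foreachRDD instead of creating a producer per record or partition.
// val kafkaSink = sparkContext.broadcast(KafkaSink("broker1:9092"))
// stream.foreachRDD { rdd =>
//   rdd.foreach(record => kafkaSink.value.send("my-topic", record.key, record.value))
// }
```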

How can I update a broadcast variable in spark streaming?

Extending the answer by @Rohan Aletty, here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on some TTL:

public class BroadcastWrapper {
    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) …
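The same TTL idea, sketched in Scala for anyone not on the Java API — ReferenceData, loadReferenceData and the usage snippet are hypothetical placeholders: once the TTL has expired, the old broadcast is unpersisted and a fresh one is created on the driver, typically at the start of each micro-batch.

```scala
import java.time.Instant
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical reference data; replace the loader with your own lookup/DB call.
case class ReferenceData(values: Map[String, String])

object BroadcastWrapper {
  private var broadcastVar: Broadcast[ReferenceData] = _
  private var lastUpdatedAt: Instant = Instant.MIN
  private val ttlSeconds = 60L

  private def loadReferenceData(): ReferenceData = ReferenceData(Map.empty)

  // Call on the driver at the start of every micro-batch (e.g. inside foreachRDD).
  def getOrRefresh(sc: SparkContext): Broadcast[ReferenceData] = synchronized {
    val expired = Instant.now().isAfter(lastUpdatedAt.plusSeconds(ttlSeconds))
    if (broadcastVar == null || expired) {
      if (broadcastVar != null) broadcastVar.unpersist(blocking = false) // drop the stale copy on the executors
      broadcastVar = sc.broadcast(loadReferenceData())                   // re-broadcast fresh data
      lastUpdatedAt = Instant.now()
    }
    broadcastVar
  }
}

// Usage sketch inside a streaming job (stream and process are hypothetical):
// stream.foreachRDD { rdd =>
//   val ref = BroadcastWrapper.getOrRefresh(rdd.sparkContext)
//   rdd.foreach(record => process(record, ref.value))
// }
```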

Spark Dataframe validating column names for parquet writes

For everyone experiencing this in pyspark: it happened to me even after renaming the columns. One way I got it to work after some iterations is this:

file = "/opt/myfile.parquet"
df = spark.read.parquet(file)
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", ""))
df = spark.read.schema(df.schema).parquet(file)

This reads the file once to discover the original column names, strips the spaces from each name, and then re-reads the same file with the cleaned-up schema applied.