How can I update a broadcast variable in spark streaming?

Extending the answer By @Rohan Aletty. Here is a sample code of a BroadcastWrapper that refresh broadcast variable based on some ttl

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper(){}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
            if (var != null)
               var.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            //Your logic to refresh
            ReferenceData data = getRefData();

            var = getSparkContext(sparkContext).broadcast(data);
       }
       return var;
   }
}

Your code would look like :

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

        stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
        // Final processing of filtered objects
        });
        return null;
    });
}

This worked for me on multi-cluster as well.
Hope this helps

Leave a Comment