You are branching the query plan: from the same ds1 you are trying to:
ds1.collect.foreach(...)
ds1.writeStream.format(...){...}
But you are only calling .start()
on the second branch, leaving the other dangling without a termination, which in turn throws the exception you are getting back.
The solution is to start both branches and await termination.
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
val query1 = ds1.collect.foreach(println)
.writeStream
.format("console")
.start()
val query2 = ds1.writeStream
.format("console")
.start()
ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()