How to get Kafka offsets for structured query for manual and reliable offset management?

Spark 2.2 introduced a Kafka source for Structured Streaming. As I understand, it relies on the HDFS checkpoint directory to store offsets and guarantee “exactly-once” message delivery. Correct. On every trigger, Spark Structured Streaming saves offsets to the offset directory in the checkpoint location (defined using the checkpointLocation option or the spark.sql.streaming.checkpointLocation Spark property, or randomly assigned) that is …

Why does starting a streaming query lead to “ExitCodeException exitCode=-1073741515”?

Actually, I had the same problem while running Spark unit tests on my local machine. It was caused by a failing winutils.exe in the %HADOOP_HOME% folder:
Input: %HADOOP_HOME%\bin\winutils.exe chmod 777 %SOME_TEMP_DIRECTORY%
Output: winutils.exe – System Error. The code execution cannot proceed because MSVCR100.dll was not found. Reinstalling the program may fix this problem.
After some surfing …

Why does format(“kafka”) fail with “Failed to find data source: kafka.” (even with uber-jar)?

The kafka data source is an external module and is not available to Spark applications by default. You have to define it as a dependency in your pom.xml (as you have done), but that’s just the very first step to have it in your Spark application.
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
With that dependency you have …

How to manually set group.id and commit Kafka offsets in Spark Structured Streaming?

tl;dr: It is not possible to commit any messages to Kafka. Starting with Spark version 3.x you can define the name of the Kafka consumer group; however, this still does not allow you to commit any messages. Since Spark 3.0.0: According to the Structured Kafka Integration Guide you can provide the ConsumerGroup as an …

How to read records in JSON format from Kafka using Structured Streaming?

From Spark’s perspective, value is just a byte sequence. Spark has no knowledge of the serialization format or content. To be able to extract a field, you have to parse the value first. If the data is serialized as a JSON string, you have two options. You can cast value to StringType and use from_json and …
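As a rough illustration of the first option, the two steps (turn the bytes into a string, then parse the JSON against a list of expected fields) can be sketched in plain Python. This is not Spark code: parse_value and the sample payload are hypothetical names made up for this sketch, and in Spark the corresponding steps would be the cast to StringType followed by from_json.

```python
import json


def parse_value(value: bytes, fields):
    """Decode raw message bytes and extract the requested JSON fields.

    Hypothetical helper for illustration only; it is not part of Spark.
    """
    # Step 1: the value arrives as opaque bytes, so decode it to a string first
    # (the plain-Python analogue of casting value to StringType).
    text = value.decode("utf-8")
    # Step 2: parse the string as JSON (the analogue of from_json).
    record = json.loads(text)
    # Keep only the expected fields; absent fields become None.
    return {name: record.get(name) for name in fields}


# Assumed sample payload, standing in for one Kafka record value.
raw = b'{"id": 7, "name": "sensor-a", "extra": true}'
parsed = parse_value(raw, ["id", "name", "missing"])
print(parsed)
```

Fields that are requested but absent from the payload come back as None here, which is why the list of expected fields has to be stated up front before anything can be extracted.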

Spark Structured Streaming automatically converts timestamp to local time

For me it worked to use: spark.conf.set("spark.sql.session.timeZone", "UTC") It tells Spark SQL to use UTC as the default time zone for timestamps. I used it in Spark SQL, for example: select *, cast('2017-01-01 10:10:10' as timestamp) from someTable I know it does not work in 2.0.1, but it works in Spark 2.2. I used it in SQLTransformer …
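To see why the default time zone matters for a timestamp string like the one above, here is a small plain-Python sketch (not Spark code) that interprets the same string under two zones. The helper to_epoch_seconds and the UTC+2 offset are assumptions chosen purely for illustration.

```python
from datetime import datetime, timedelta, timezone


def to_epoch_seconds(text, tz):
    """Interpret a zone-less timestamp string as wall-clock time in tz.

    Hypothetical helper for illustration only.
    """
    naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    # Attaching a zone decides which instant the string refers to.
    return int(naive.replace(tzinfo=tz).timestamp())


text = "2017-01-01 10:10:10"
plus_two = timezone(timedelta(hours=2))  # assumed "local" zone, UTC+2

epoch_utc = to_epoch_seconds(text, timezone.utc)
epoch_plus_two = to_epoch_seconds(text, plus_two)

# The same string maps to two instants that are two hours apart.
print(epoch_utc - epoch_plus_two)
```

The string itself carries no zone, so whichever default is in effect decides the instant it denotes; fixing that default to UTC removes the dependence on the machine's local settings.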

Integrating Spark Structured Streaming with the Confluent Schema Registry

It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization. You must manually deserialize the data. In Spark, create the Confluent REST service object to get the schema. Convert the schema string in the response object into an Avro schema …