How to decode/deserialize Avro with Python from Kafka

If you use Confluent Schema Registry and want to deserialize Avro messages, add message_bytes.seek(5) to the decode function: Confluent prepends 5 extra bytes (a magic byte plus a 4-byte schema ID) before the typical Avro-formatted data.

    import io

    from avro.io import BinaryDecoder, DatumReader

    # reader is assumed to be a DatumReader built from the writer's schema,
    # e.g. reader = DatumReader(schema)
    def decode(msg_value):
        message_bytes = io.BytesIO(msg_value)
        # Skip the Confluent wire-format header: 1 magic byte + 4-byte schema ID
        message_bytes.seek(5)
        decoder = BinaryDecoder(message_bytes)
        event_dict = reader.read(decoder)
        return event_dict
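For context, here is a minimal consumption sketch around that decode function, assuming the kafka-python client, a hypothetical topic name "events", and a local schema file "event.avsc" (none of these come from the original answer):

    from kafka import KafkaConsumer

    import avro.schema
    from avro.io import DatumReader

    # Hypothetical writer schema loaded from disk; in practice you could
    # instead fetch it from the Schema Registry via the ID in the header.
    schema = avro.schema.parse(open("event.avsc").read())
    reader = DatumReader(schema)

    consumer = KafkaConsumer("events", bootstrap_servers="localhost:9092")
    for msg in consumer:
        print(decode(msg.value))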

PySpark 2.4.0, read Avro from Kafka with readStream – Python

You can include the spark-avro package, for example using --packages (adjust versions to match your Spark installation):

    bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

and provide your own wrappers:

    from pyspark import SparkContext
    from pyspark.sql.column import Column, _to_java_column

    def from_avro(col, jsonFormatSchema):
        sc = SparkContext._active_spark_context
        avro = sc._jvm.org.apache.spark.sql.avro
        f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
        return Column(f(_to_java_column(col), jsonFormatSchema))

    def to_avro(col):
        sc = SparkContext._active_spark_context
        avro = sc._jvm.org.apache.spark.sql.avro
        f = …

Read more
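With those wrappers in place, a sketch of applying from_avro to a Kafka readStream could look like this (the topic name, broker address, and schema file are assumptions, not part of the original answer):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("kafka-avro").getOrCreate()

    # Hypothetical Avro schema (JSON format) for the message value
    json_format_schema = open("event.avsc").read()

    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "events")
          .load())

    # Kafka rows expose a binary `value` column; decode it with the wrapper
    decoded = df.select(from_avro(col("value"), json_format_schema).alias("event"))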

How to fix Expected start-union. Got VALUE_NUMBER_INT when converting JSON to Avro on the command line?

According to the explanation by Doug Cutting, Avro's JSON encoding requires that non-null union values be tagged with their intended type. This is because unions like ["bytes","string"] and ["int","long"] are ambiguous in JSON: the members of the first are both encoded as JSON strings, while the members of the second are both encoded as JSON numbers. http://avro.apache.org/docs/current/spec.html#json_encoding Thus your record must … Read more
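To make the tagging concrete, here is a minimal sketch with a hypothetical record schema whose field "id" has the union type ["int","long"] (the field name and schema are illustrative, not taken from the original question):

    import json

    # Hypothetical schema:
    # {"type": "record", "name": "Rec",
    #  "fields": [{"name": "id", "type": ["int", "long"]}]}

    # Plain JSON is ambiguous between the union branches and is rejected
    # with "Expected start-union. Got VALUE_NUMBER_INT":
    wrong = json.dumps({"id": 42})

    # Avro's JSON encoding tags the value with its union branch instead:
    right = json.dumps({"id": {"int": 42}})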

Integrating Spark Structured Streaming with the Confluent Schema Registry

It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization, so you must deserialize the data manually. In Spark, create the Confluent REST service object to get the schema, then convert the schema string in the response object into an Avro schema … Read more
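As a sketch of those first two steps, you could fetch the schema over the standard Schema Registry REST API (the registry URL and subject name below are assumptions):

    import requests
    import avro.schema

    # Hypothetical Schema Registry location and subject
    REGISTRY_URL = "http://localhost:8081"
    SUBJECT = "events-value"

    # Fetch the latest registered schema for the subject
    resp = requests.get(f"{REGISTRY_URL}/subjects/{SUBJECT}/versions/latest")
    schema_str = resp.json()["schema"]

    # Convert the schema string from the response into an Avro schema object
    schema = avro.schema.parse(schema_str)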