Is it possible to access message headers with Kafka Streams?

Records headers are accessible since versions 2.0.0 (cf. KIP-244 for details).

You can access record metadata via the Processor API (ie, via transform(), transformValues(), or process()), by the given “context” object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).

Update

As of 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor class with process(Record) instead of process(K, V) method. For this case, headers (and record metadata) are accessible via the Record class).

This new feature is not yet available in “PAPI method of the DSL though (eg. KStream#process(), KStream#transform() and siblings).

+++++

Prior to 2.0, the context only exposes topic, partition, offset, and timestamp—but not headers that are in fact dropped by Streams on read in those older versions.

Metadata is not available at DSL level though. However, there is also work in progress to extend the DSL via KIP-159.

Leave a Comment