Increase the number of messages read by a Kafka consumer in a single poll

You can increase the consumer poll() batch size by increasing max.partition.fetch.bytes. However, according to the documentation, the batch is still limited by fetch.max.bytes, which also needs to be increased to match the required batch size. The documentation also describes another property that restricts the batch size: message.max.bytes in the broker config (max.message.bytes in the topic config). So one way …
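As a rough illustration of how the two consumer settings relate, the sketch below derives a consistent pair of values from a desired per-partition batch size. The helper names (`consumer_fetch_settings`, `to_properties`) and the `partitions` parameter are hypothetical and not part of any Kafka client; the 50 MiB figure is the documented default for fetch.max.bytes. Note that the number of records returned by one poll() is additionally capped by max.poll.records, which this sketch does not touch.

```python
# Documented default of fetch.max.bytes (50 MiB).
DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024


def consumer_fetch_settings(batch_bytes, partitions=1):
    """Return consumer properties sized for about `batch_bytes` per partition.

    Hypothetical helper: it only keeps fetch.max.bytes at least as large as
    the combined per-partition limit, so the overall cap does not undercut
    the per-partition one.
    """
    if batch_bytes <= 0 or partitions <= 0:
        raise ValueError("batch_bytes and partitions must be positive")
    return {
        "max.partition.fetch.bytes": batch_bytes,
        "fetch.max.bytes": max(DEFAULT_FETCH_MAX_BYTES, batch_bytes * partitions),
    }


def to_properties(settings):
    """Render the settings as lines for a consumer .properties file."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))


if __name__ == "__main__":
    # Example: 10 MiB per partition, consumer assigned 8 partitions.
    print(to_properties(consumer_fetch_settings(10 * 1024 * 1024, partitions=8)))
```

The broker-side and topic-side message size limits still apply, so raising only the consumer values may not be enough on its own.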

How to commit manually with Kafka Stream?

Commits are handled by Streams internally and are fully automatic, so there is usually no reason to commit manually. Note that Streams handles this differently from consumer auto-commit: in fact, auto-commit is disabled for the internally used consumer, and Streams manages commits “manually”. The reason is that commits can only happen at certain points …

How does an offset expire for an Apache Kafka consumer group?

Update: Since Apache Kafka 2.1, offsets won’t be deleted as long as the consumer group is active, regardless of whether the consumers commit offsets or not; i.e., the offsets.retention.minutes clock only starts to tick when the group becomes empty (in older releases, the clock started to tick as soon as the commit happened). Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets

Original Answer …
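The difference between the two expiration rules can be modelled in a few lines. This is a simplified sketch of the semantics described in the update, not broker code: the function `offsets_expired` and its parameters are hypothetical, timestamps are plain milliseconds, and the 7-day retention in the example is only an illustrative value.

```python
def offsets_expired(now_ms, last_commit_ms, group_empty_since_ms,
                    retention_ms, kafka_2_1_or_newer=True):
    """Return True if committed offsets would be eligible for deletion.

    group_empty_since_ms is None while the group still has active members.
    """
    if kafka_2_1_or_newer:
        # Since 2.1: the retention clock starts only once the group is empty.
        if group_empty_since_ms is None:
            return False
        return now_ms - group_empty_since_ms >= retention_ms
    # Older releases: the clock starts at the time of the commit itself.
    return now_ms - last_commit_ms >= retention_ms


if __name__ == "__main__":
    DAY_MS = 24 * 60 * 60 * 1000
    retention = 7 * DAY_MS  # illustrative retention of 7 days
    # Active group whose last commit was 10 days ago.
    print(offsets_expired(10 * DAY_MS, 0, None, retention))         # 2.1+ rule
    print(offsets_expired(10 * DAY_MS, 0, None, retention, False))  # old rule
```

Under the newer rule an active group keeps its offsets even without recent commits, whereas the older rule would already have expired them in this example.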

Is there a way to purge the topic in Kafka?

Temporarily update the retention time on the topic to one second:

    kafka-topics.sh \
      --zookeeper <zkhost>:2181 \
      --alter \
      --topic <topic name> \
      --config retention.ms=1000

In newer Kafka releases, you can also do it with kafka-configs --entity-type topics:

    kafka-configs.sh \
      --zookeeper <zkhost>:2181 \
      --entity-type topics \
      --alter \
      --entity-name <topic name> \
      --add-config retention.ms=1000

then …

Difference between session.timeout.ms and max.poll.interval.ms for Kafka >= 0.10.1

Before KIP-62 (i.e., Kafka 0.10.0 and earlier), there was only session.timeout.ms. max.poll.interval.ms was introduced via KIP-62 (part of Kafka 0.10.1). KIP-62 decouples heartbeats from calls to poll() via a background heartbeat thread, allowing for a longer processing time (i.e., the time between two consecutive poll() calls) than the heartbeat interval. Assume processing a message takes 1 minute. If …
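To make the two independent timeouts concrete, here is a small model of the checks after KIP-62. It is a sketch under stated assumptions rather than client or broker code: `consumer_liveness` and its parameters are hypothetical names, and the timeout values in the example (10 s session timeout, 5 min poll interval) are illustrative.

```python
def consumer_liveness(ms_since_last_heartbeat, ms_since_last_poll,
                      session_timeout_ms, max_poll_interval_ms):
    """Classify a consumer under the two post-KIP-62 timeouts.

    Heartbeats come from the background thread, so they keep arriving while
    the application is busy processing; poll() calls come from the
    application thread and stop while it is stuck or slow.
    """
    if ms_since_last_heartbeat > session_timeout_ms:
        return "session timeout"          # process or network failure
    if ms_since_last_poll > max_poll_interval_ms:
        return "poll interval exceeded"   # processing too slow or stuck
    return "alive"


if __name__ == "__main__":
    # Processing one message takes 1 minute, heartbeats arrive every 3 s.
    print(consumer_liveness(3_000, 60_000,
                            session_timeout_ms=10_000,
                            max_poll_interval_ms=300_000))
```

The point of the split is that a short session timeout can detect a crashed process quickly, while a longer poll interval leaves room for slow processing between poll() calls.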