How to change the number of replicas of a Kafka topic?

To increase the number of replicas for a given topic you have to: 1. Specify the extra replicas in a custom reassignment JSON file. For example, you could create increase-replication-factor.json and put this content in it: {"version":1, "partitions":[ {"topic":"signals","partition":0,"replicas":[0,1,2]}, {"topic":"signals","partition":1,"replicas":[0,1,2]}, {"topic":"signals","partition":2,"replicas":[0,1,2]} ]} 2. Use the file with the --execute option of the kafka-reassign-partitions tool [or … Read more
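Cleaned up, the reassignment file described above would look like this (the topic name `signals` and broker IDs 0, 1, 2 are taken from the excerpt; adjust them to your cluster):

```json
{
  "version": 1,
  "partitions": [
    { "topic": "signals", "partition": 0, "replicas": [0, 1, 2] },
    { "topic": "signals", "partition": 1, "replicas": [0, 1, 2] },
    { "topic": "signals", "partition": 2, "replicas": [0, 1, 2] }
  ]
}
```

It would then be applied with something like `bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute` (older releases take `--zookeeper` instead of `--bootstrap-server`).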

Kafka how to read from __consumer_offsets topic

I came across this question when trying to also consume from the __consumer_offsets topic. I managed to figure it out for different Kafka versions and thought I'd share what I'd found. For Kafka 0.8.2.x Note: This uses the Zookeeper connection #Create consumer config echo "exclude.internal.topics=false" > /tmp/consumer.config #Consume all offsets ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" … Read more
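For newer brokers (roughly Kafka 0.11+), the same idea works with the consumer's `--bootstrap-server` option and the newer formatter class. This is a sketch based on the standard tooling, not part of the excerpt above; host and port are placeholders:

```shell
# Allow consuming internal topics
echo "exclude.internal.topics=false" > /tmp/consumer.config

# Consume the internal offsets topic on a newer broker (sketch)
./bin/kafka-console-consumer.sh \
  --consumer.config /tmp/consumer.config \
  --bootstrap-server localhost:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning
```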

Increase the number of messages read by a Kafka consumer in a single poll

You can increase the consumer poll() batch size by increasing max.partition.fetch.bytes, but per the documentation it is still limited by fetch.max.bytes, which also needs to be increased to match the required batch size. The documentation also mentions one other property, message.max.bytes, in the topic config and broker config that restricts the batch size. So one way … Read more
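As a consumer-config sketch of the settings discussed above, plus max.poll.records, which directly caps how many records a single poll() returns (the values shown are illustrative assumptions, not recommendations):

```properties
# Cap on the number of records returned by a single poll() (default 500)
max.poll.records=500
# Maximum bytes fetched per partition per request (default 1 MB)
max.partition.fetch.bytes=10485760
# Maximum bytes per fetch request across all partitions; raise together with the above
fetch.max.bytes=52428800
```

Note that message.max.bytes is a broker/topic setting, so it is configured on the server side rather than in the consumer properties.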

How to commit manually with Kafka Stream?

Commits are handled by Streams internally and fully automatically, so there is usually no reason to commit manually. Note that Streams handles this differently from consumer auto-commit; in fact, auto-commit is disabled for the internally used consumer and Streams manages commits "manually". The reason is that commits can only happen at certain points … Read more
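What you can tune is how often Streams commits, via commit.interval.ms; the Processor API also offers ProcessorContext#commit() to request a commit at the next possible point. A minimal config sketch (the 100 ms value is purely illustrative; the default is 30000 ms):

```properties
# How often Kafka Streams commits processing progress (default 30000 ms)
commit.interval.ms=100
```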

How does an offset expire for an Apache Kafka consumer group?

Update: Since Apache Kafka 2.1, offsets won't be deleted as long as the consumer group is active, regardless of whether the consumers commit offsets or not; i.e., the offsets.retention.minutes clock only starts to tick when the group becomes empty (in older releases, the clock started to tick directly when the commit happened). Cf. Original Answer … Read more
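The broker setting in question, as a config sketch (10080 minutes, i.e. 7 days, is the default since Kafka 2.0):

```properties
# How long committed offsets are retained; since Kafka 2.1 the clock
# only starts once the consumer group becomes empty
offsets.retention.minutes=10080
```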

Is there a way to purge the topic in Kafka?

Temporarily update the retention time on the topic to one second: kafka-topics.sh \ --zookeeper <zkhost>:2181 \ --alter \ --topic <topic name> \ --config retention.ms=1000 And in newer Kafka releases, you can also do it with kafka-configs: kafka-configs.sh \ --zookeeper <zkhost>:2181 \ --entity-type topics \ --alter \ --entity-name <topic name> \ --add-config retention.ms=1000 then … Read more
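Cleaned up, the two commands from the excerpt might look like this; `<zkhost>` and `<topic name>` are the placeholders from the excerpt, and `retention.ms=1000` follows from "retention time … to one second":

```shell
# Older releases: alter the topic config via kafka-topics
./bin/kafka-topics.sh --zookeeper <zkhost>:2181 --alter \
  --topic <topic name> --config retention.ms=1000

# Newer releases: the same via kafka-configs
./bin/kafka-configs.sh --zookeeper <zkhost>:2181 --entity-type topics --alter \
  --entity-name <topic name> --add-config retention.ms=1000
```

Remember to restore the original retention setting once the log segments have been deleted, or the topic will keep discarding messages after one second.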

Difference between session.timeout.ms and max.poll.interval.ms for Kafka >= 0.10.1

Before KIP-62, there is only session.timeout.ms (i.e., Kafka 0.10.0 and earlier). max.poll.interval.ms is introduced via KIP-62 (part of Kafka 0.10.1). KIP-62 decouples heartbeats from calls to poll() via a background heartbeat thread, allowing for a longer processing time (i.e., time between two consecutive poll() calls) than the heartbeat interval. Assume processing a message takes 1 minute. If … Read more
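A consumer-config sketch of the post-KIP-62 split; the values shown are the usual defaults from that era and should be treated as illustrative:

```properties
# Liveness of the consumer process, enforced by the background heartbeat thread
session.timeout.ms=10000
heartbeat.interval.ms=3000
# Upper bound on the time between two consecutive poll() calls,
# i.e. the maximum allowed processing time per batch
max.poll.interval.ms=300000
```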