Incremental Fetch Requests in Kafka
Incremental Fetch Requests, introduced in Kafka 1.1.0 by KIP-227, are a mechanism for reducing Fetch overhead, especially when a client fetches many partitions (e.g. ReplicaFetchers).
Before KIP-227, every topic partition had to be enumerated in every Fetch request and response.
Say a client is interested in tp-X-0, tp-Y-1 and tp-Z-1, and tp-Y and tp-Z receive far fewer messages than tp-X.
Most of the time, a Fetch request is completed by messages produced to tp-X (once fetch.min.bytes is reached), and no data is returned for tp-Y or tp-Z, so there is essentially no need to include them in Fetch responses.
The idea of Incremental Fetch Requests is to include only the necessary topic partitions in each request and response.
In a nutshell, it works as follows:
- When a client starts fetching, it establishes a FetchSession with the broker, which caches information about the partitions the client is interested in
- The client omits a partition from Fetch requests unless it needs to update that partition's fetch offset (e.g. after fetching new data)
- Similarly, the broker omits a partition from Fetch responses unless it has data to return for it (e.g. after new messages are produced)
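The flow above can be illustrated with a toy, in-memory model. This is a minimal sketch under simplifying assumptions, not Kafka's implementation: `BrokerFetchSession`, `Client` and `fetch_once` are hypothetical names, plain Python lists stand in for partition logs, and the broker returns a partition only when it has new records (the real broker also reports partitions whose metadata, such as the high watermark, changed).

```python
from dataclasses import dataclass, field


@dataclass
class BrokerFetchSession:
    # Hypothetical broker-side session: partition -> cached fetch offset.
    fetch_offsets: dict = field(default_factory=dict)

    def handle_fetch(self, request: dict, logs: dict) -> dict:
        # Apply the offset updates carried by the (possibly empty) request.
        self.fetch_offsets.update(request)
        response = {}
        for tp, offset in self.fetch_offsets.items():
            records = logs[tp][offset:]
            # Simplification: omit a partition unless it has new records.
            if records:
                response[tp] = records
        return response


class Client:
    def __init__(self, partitions):
        # Next offset to fetch for each partition.
        self.next_offsets = {tp: 0 for tp in partitions}
        # Offsets the broker-side session already knows about.
        self.sent_offsets = {}

    def build_request(self) -> dict:
        # Send only partitions whose fetch offset the session does not know yet;
        # the first request therefore lists every partition (a full fetch).
        changed = {tp: off for tp, off in self.next_offsets.items()
                   if self.sent_offsets.get(tp) != off}
        self.sent_offsets.update(changed)
        return changed

    def handle_response(self, response: dict) -> None:
        # Advance the fetch offset past the records just received.
        for tp, records in response.items():
            self.next_offsets[tp] += len(records)


def fetch_once(client, broker, logs):
    request = client.build_request()
    response = broker.handle_fetch(request, logs)
    client.handle_response(response)
    return request, response


logs = {"tp-X-0": [], "tp-Y-1": [], "tp-Z-1": []}
broker, client = BrokerFetchSession(), Client(logs)

print(fetch_once(client, broker, logs))  # full request with all three partitions, empty response
print(fetch_once(client, broker, logs))  # empty request, empty response
logs["tp-X-0"].append("m1")              # produce one message to tp-X only
print(fetch_once(client, broker, logs))  # empty request; response carries tp-X-0 only
print(fetch_once(client, broker, logs))  # request carries only tp-X-0's new offset
```

Because each incremental message is only meaningful relative to state both sides have cached, the real protocol also numbers requests within a session with a session epoch, so the broker can detect a client that has fallen out of sync.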
Let's see this in action with an experiment.
Settings:
| component | port | description |
|---|---|---|
| broker-1 | 9091 | |
| broker-2 | 9092 | |
| broker-3 | 9093 | leader of test-topic-0 |
| zk | 2181 | |
test-topic has a single partition, test-topic-0.
First, start a console consumer:
```
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-topic
```
Then find the local port of the consumer's connection to broker-3 (port 9093), the leader of test-topic-0:
```
$ lsof -p $(jps | grep ConsoleConsumer | cut -d' ' -f1) | grep 9093
java 66011 hokada 106u IPv6 468231 0t0 TCP lima-default:42364->lima-default:9093 (ESTABLISHED)
```
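Instead of reading the port off the lsof output by eye, it can be extracted programmatically. This is a minimal sketch, assuming lsof's default output format as in the line above; `local_port_to` is a hypothetical helper, not part of Kafka or lsof.

```python
import re

# Matches the NAME column of an lsof TCP entry, such as
# "TCP lima-default:42364->lima-default:9093 (ESTABLISHED)".
CONN_RE = re.compile(r"TCP \S+:(\d+)->\S+:(\d+) \((\w+)\)")


def local_port_to(lsof_output: str, remote_port: int):
    """Return the local port of the first ESTABLISHED connection to remote_port, or None."""
    for line in lsof_output.splitlines():
        m = CONN_RE.search(line)
        if m and int(m.group(2)) == remote_port and m.group(3) == "ESTABLISHED":
            return int(m.group(1))
    return None


sample = ("java 66011 hokada 106u IPv6 468231 0t0 TCP "
          "lima-default:42364->lima-default:9093 (ESTABLISHED)")
print(local_port_to(sample, 9093))  # 42364
```

In practice the input would be the captured output of the lsof pipeline above rather than a hard-coded sample.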
Capture the console consumer's traffic with tcpdump:
```
$ sudo tcpdump src port 42364 or dst port 42364 -i any -w dump.pcap
tcpdump: data link type LINUX_SLL2
tcpdump: listening on any, link-type LINUX_SLL2 (Linux cooked v2), snapshot length 262144 bytes
```
After a while, produce a message from another terminal:
```
$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic test-topic
```
Stop tcpdump and open dump.pcap in Wireshark, using Decode As to dissect the traffic as the Kafka protocol.
As the capture shows, while nothing is being produced, the Fetch requests and responses carry no partition info.
Only after a message is produced does a Fetch response return partition data.
The next Fetch request then includes partition info again, to update the fetch offset.