1954


Incremental Fetch Requests in Kafka

Incremental Fetch Requests, introduced in Kafka 1.1.0 by KIP-227, are a mechanism to reduce Fetch overhead, especially when a client is interested in many partitions (e.g. ReplicaFetchers).

Reference: KIP-227 (cwiki.apache.org)

Before KIP-227, all topic partitions had to be enumerated in every Fetch request and response.

Suppose a client is interested in tp-X-0, tp-Y-1 and tp-Z-1, and the message rates of tp-Y and tp-Z are much lower than that of tp-X.

Most of the time, Fetch requests are completed by messages produced to tp-X (once fetch.min.bytes is satisfied), and no data is returned for tp-Y and tp-Z, so there is essentially no need to include them in Fetch responses.
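A toy Python model of one such Fetch round before KIP-227 makes the waste visible. It is a sketch under simplifying assumptions, not Kafka code: `full_fetch` is a hypothetical helper, requests and responses are reduced to (partition, fetch offset) and (partition, new record count) pairs, and tp-X-0 is assumed to have 3 new records while the other two have none.

```python
def full_fetch(fetch_offsets, available):
    """Simulate one pre-KIP-227 Fetch round (toy model, not Kafka code).

    fetch_offsets: partition -> next offset the client wants.
    available: partition -> number of new records on the broker.
    Returns (request, response) as lists of (partition, value) pairs.
    """
    # Every partition is enumerated in every request...
    request = [(tp, off) for tp, off in sorted(fetch_offsets.items())]
    # ...and the response carries an entry per partition, even if empty.
    response = [(tp, available.get(tp, 0)) for tp, _ in request]
    return request, response


offsets = {"tp-X-0": 100, "tp-Y-1": 5, "tp-Z-1": 7}
req, resp = full_fetch(offsets, {"tp-X-0": 3})
print(len(req), len(resp))               # 3 3
print([tp for tp, n in resp if n > 0])   # ['tp-X-0']
```

Only one of the three response entries carries data, yet all three partitions travel in both directions on every round; with thousands of partitions (as for a replica fetcher) that fixed cost dominates.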

The idea of Incremental Fetch Requests is to include only the necessary topic partitions in each request and response.

In a nutshell, it works like this:

  1. When a client starts fetching, it establishes a FetchSession with the broker, which caches information about the partitions the client is interested in.
  2. The client omits partition info from Fetch requests unless it wants to update the fetch offset (after fetching new data).
  3. Similarly, the broker omits partition info from Fetch responses unless there is data to return (after some data is produced).
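The three steps above can be sketched as a toy Python model. Everything here is hypothetical: `ToyFetchSession` and `client_round` are illustrative names, not Kafka's classes, offsets stand in for records, and the model ignores session IDs and epochs, errors, session eviction, and the other reasons a real broker may include a partition in a response (as I understand KIP-227, e.g. a changed high watermark).

```python
from dataclasses import dataclass, field


@dataclass
class ToyFetchSession:
    """Toy broker-side fetch session (not Kafka's real FetchSession).

    Caches each registered partition's fetch offset so that later
    requests only need to mention partitions whose offset changed.
    """
    offsets: dict = field(default_factory=dict)  # partition -> fetch offset

    def handle(self, changed, log_end):
        """changed: partition -> new fetch offset (updated partitions only).
        log_end: partition -> log end offset on the broker.
        Returns partition -> offsets of returned records (non-empty only).
        """
        self.offsets.update(changed)  # step 2: merge the request's deltas
        response = {}
        for tp, off in self.offsets.items():
            end = log_end.get(tp, off)
            if end > off:  # step 3: include only partitions with new data
                response[tp] = list(range(off, end))
        return response


def client_round(session, client_offsets, last_sent, log_end):
    """One fetch round: send only partitions whose offset changed."""
    changed = {tp: off for tp, off in client_offsets.items()
               if last_sent.get(tp) != off}
    last_sent.update(changed)
    data = session.handle(changed, log_end)
    for tp, records in data.items():
        client_offsets[tp] = records[-1] + 1  # next offset to fetch
    return changed, data


session = ToyFetchSession()  # step 1: the broker-side session cache
client_offsets = {"tp-X-0": 0, "tp-Y-1": 0, "tp-Z-1": 0}
last_sent = {}
log_end = {"tp-X-0": 0, "tp-Y-1": 0, "tp-Z-1": 0}
rounds = []
rounds.append(client_round(session, client_offsets, last_sent, log_end))  # all 3 sent once
rounds.append(client_round(session, client_offsets, last_sent, log_end))  # idle: empty both ways
log_end["tp-X-0"] = 2  # two records produced to tp-X-0
rounds.append(client_round(session, client_offsets, last_sent, log_end))  # data for tp-X-0 only
rounds.append(client_round(session, client_offsets, last_sent, log_end))  # offset update for tp-X-0
for changed, data in rounds:
    print("request:", changed, "response:", data)
```

After the first round registers every partition, an idle round is empty in both directions, the produce shows up as a response containing only tp-X-0, and the following request mentions only tp-X-0 to move its fetch offset.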

Let's see this in an experiment.

Settings:

component   port   description
broker-1    9091
broker-2    9092
broker-3    9093   leader of test-topic-0
zk          2181   ZooKeeper

test-topic has only one partition, test-topic-0.

First, start a console consumer:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-topic

Then find the local port the consumer uses to connect to broker-3 (port 9093), the leader it fetches from:

$ lsof -p $(jps | grep ConsoleConsumer | cut -d' ' -f1) | grep 9093
java    66011 hokada  106u     IPv6             468231       0t0     TCP lima-default:42364->lima-default:9093 (ESTABLISHED)
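If you prefer to script this step, the local port can also be pulled out of the lsof NAME column. A minimal Python sketch, assuming the `host:port->host:port` format shown above; `local_port` is a hypothetical helper, and the sample line is the output above.

```python
import re

# Matches the NAME column of an lsof TCP line, e.g.
# "TCP lima-default:42364->lima-default:9093 (ESTABLISHED)".
CONN = re.compile(r"TCP \S+?:(\d+)->\S+?:(\d+)")


def local_port(lsof_line, remote_port):
    """Return the local port of a connection to remote_port, else None."""
    m = CONN.search(lsof_line)
    if m and int(m.group(2)) == remote_port:
        return int(m.group(1))
    return None


line = ("java    66011 hokada  106u     IPv6             468231       0t0"
        "     TCP lima-default:42364->lima-default:9093 (ESTABLISHED)")
print(local_port(line, 9093))  # 42364
```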

Capture the console consumer's traffic with tcpdump:

$ sudo tcpdump src port 42364 or dst port 42364 -i any -w dump.pcap
tcpdump: data link type LINUX_SLL2
tcpdump: listening on any, link-type LINUX_SLL2 (Linux cooked v2), snapshot length 262144 bytes

After a while, produce a message from another terminal:

$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic test-topic

Stop tcpdump and open dump.pcap in Wireshark, decoding the traffic as the Kafka protocol (Analyze > Decode As...).
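Every Kafka request begins with a 4-byte size prefix followed by a request header carrying the API key, which is how a Fetch request (API key 1) can be told apart from other requests in the capture. A minimal Python sketch of decoding those leading fields; `parse_request_header` and the sample values are hypothetical (not taken from the capture), and fields after client_id (including header tagged fields in newer versions) are not parsed.

```python
import struct

FETCH_API_KEY = 1  # API key of Fetch in the Kafka protocol


def parse_request_header(frame):
    """Decode the size prefix and leading request-header fields.

    Big-endian layout: int32 size, int16 api_key, int16 api_version,
    int32 correlation_id, then client_id as a nullable string
    (int16 length, -1 for null). Fields after client_id are not parsed.
    """
    size, api_key, api_version, correlation_id = struct.unpack_from(">ihhi", frame, 0)
    (client_len,) = struct.unpack_from(">h", frame, 12)
    client_id = None if client_len < 0 else frame[14:14 + client_len].decode("utf-8")
    return {"size": size, "api_key": api_key, "api_version": api_version,
            "correlation_id": correlation_id, "client_id": client_id}


# Build a made-up frame (values are illustrative only).
client_id = b"console-consumer"
body = struct.pack(">hhih", FETCH_API_KEY, 7, 42, len(client_id)) + client_id
frame = struct.pack(">i", len(body)) + body
header = parse_request_header(frame)
print(header["api_key"] == FETCH_API_KEY, header["client_id"])
```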

As we can see, partition info is not included in the Fetch requests and responses:

Only after a message is produced does a Fetch response return partition data:

Likewise, a Fetch request includes partition info again when the client needs to update the fetch offset: