1954

Thoughts, stories and ideas.

KIP-392 Follower fetchingによるfetch requestの増加について

Kafka 2.4.0でfollower fetchingと呼ばれる機能が入りました。

cwiki.apache.org

これにより、consumerはleader replicaのみではなくfollower replicaからデータを読み出すことが可能となります。

Follower fetchingは、たとえばデータセンターをまたいでreplicaを配置している場合に有効で、consumerと同一DCにあるreplicaから読み出すようにすれば高コストなDC間通信を削減することができます。

Consumerに対するreplicaの割当ては、ReplicaSelector interfaceを実装することで行います。

replica.selector.class broker configurationで実装を指定でき、デフォルトはLeaderSelectorです。(無条件にleader replicaを選ぶ。2.4以前の挙動と同一)

Consumerのrack情報を利用してreplicaを決定する、RackAwareReplicaSelectorもビルトインで提供されます。

High watermark propagation

Partitionのhigh watermark(以下HW)とは、全てのin-sync replicaにreplicationが完了している最新のoffsetを指します。

Consumerが読み出すことができるのはHWまでとなります。(Leaderがもっと新しいmessageを受け取っていても、replicationが完了するまでconsumerには見えない)

もし未replicateのmessageを読み出せた場合、以下のような不整合が起こるためです。

- replication factor = 3, isr = [0, 1, 2], leader = 0
- consumer group A, Bがconsumeしている

として、

timestamp event
t0 offset 55301のmessageをproduce
t1 - consumer group Aがoffset 55301のmessageをconsume
- follower replicaおよびconsumer group Bはまだ55301をfetchしていないとする
t2 - broker 0がクラッシュ
- broker 1が新たなleaderになる

=> 55301はlostするにもかかわらずAによってだけ処理された状態となる。

したがってfollower fetchingにおいても同様に、consume可能なmessageをHWまでに制限する必要があります。

ここで、followerへのHWの伝搬はfetch responseを通して行われるため、そのままだとproduceしたmessageをconsumeするまでのlatencyに最大でfetch response latencyが加算されることになります。

どういうことかというと、

- partitionのproduction rateは低い(1 request / 1 sec)
- replication factor = 3, isr = [0, 1, 2], leader = 0
- replica.fetch.max.wait.ms = 500, replica.fetch.min.bytes = 1
- consumerはleader(0)でなく1からconsumeしている

としたとき、

timestamp (millisec) event
- broker 1, 2はleaderにfetch requestを送信済みでpurgatoryに入っており、条件の満足を待っているとする
t0 - offset 55301のmessageをproduce
- fetch条件の満足により、broker 1, 2にfetch responseが返る
- 同時に、HWが55301に更新される
t0 + 1 - broker 1, 2は次のfetch requestを送信するが、この時点では条件を満足しないためpurgatoryに入る
t0 + 501 - max.waitの経過により、fetch responseが返る。これによりHWが伝搬し、broker 1から55301をconsume可能となる

Leaderからconsumeしていたらt0の時点でmessageが取得可能だったのに対し、followerへのHWの伝搬を待つため500 millisecのlatencyが加算されます。

これに対処するため、KIP-392の実装にともない、fetch responseの完了条件に「followerの持っているHW情報がstale」であることを含むようになりました。

github.com

これにより、上記の例でいうとt0 + 1の時点で即座にbroker 1にfetch responseが返り、HWが伝搬するようになります。

KAFKA-9731

問題は、この方法だとfetch requestが倍増してしまうことです。

上記の例で、stale HWをfetch response条件に含めた時に何回のfetch requestが発行されるか数えてみましょう。

timestamp (millisec) event
t0 - offset 55301のmessageをproduce
- fetch条件の満足により、broker 1, 2にfetch responseが返る
- 同時に、HWが55301に更新される
t0 + 1 - broker 1, 2は次のfetch requestを送信する
- HWがstaleであるため即座にresponseが返る
t0 + 2 - broker 1, 2は再度fetch requestを送る
- これは条件を満足しないためblockする

一度のproduce requestに対し、2倍のfetch requestが発生していることがわかります。

issues.apache.org

この問題はKAFKA-9731として報告され、以下のように修正されました。

Conclusion

  • Kafka 2.4.0, 2.4.1では、fetch requestが大幅に増加します。(2.5.0もっぽい?)
  • Kafka 2.6.0で修正されるが、follower fetchを使う際はleaderから読む場合に対してdelivery latencyが増えます