1954

Thoughts, stories and ideas.

Kafka consumer's cooperative rebalancing explained

Introduction

Kafkaにおいて、同一のgroup.idを持つKafka consumerの集合をconsumer groupと呼び、各groupは特定のtopicをsubscribeする。

group内の各consumer(memberと呼ぶ)にはtopicのpartitionが分散して割り当てられ、それぞれ割り当てられたpartitionのmessageのみを処理することで、分散処理を実現する。

またgroup内のあるconsumerが故障した際は、そのconsumerの受け持っていたpartitionは他のconsumerにreassignされる。つまり、groupはconsumerの冗長化を成す単位でもある。

このような、group内のmembershipに変更に伴うpartitionのreassignをrebalanceと呼ぶ。

この記事で解説するcooperative rebalancingは、このrebalanceについてよく知られた非効率性の改善を目的としてKafka 2.4で導入されたものである。

cooperative rebalancingについてはこちらも参考にされたい:

www.confluent.io

How eager rebalancing works

cooperative rebalancingに対して、従来のrebalanceの仕組みをeager rebalancingと呼ぶ。

まずは、以下のシナリオを通してeager rebalancingがどのように動作するかを見ていく。

  • 初期状態は以下
    • topic-fooは3つのpartition p0,p1,p2を持つ
    • group.id = gのgroupに、2つのconsumer c0, c1が属している
    • gtopic-fooをsubscribeしており、partition assignmentは c0[p0,p1], c1[p2]となっている
  • gに新たなconsumer c2を追加する

eager, cooperativeともに、rebalanceは主にJoinGroup/SyncGroupという二つのKafka requestを通じて進行する。

eagerにおけるrebalanceは以下のようなフローとなる。

  1. c2を起動すると、c2はgroup coordinator(memberの死活監視やJoinGroup/SyncGroupのhandlingといった、consumer group管理を担うKafka broker。group.idのhash値により決まる)に対してJoinGroupを送る
  2. これを受けて、gはrebalance中(正確にはPreparingRebalance state)に移行する
  3. c0,c1はHeartBeatを通してgがrebalance中であることを知り、自身の受け持っているpartitionを引数としてonPartitionsRevokedを実行し、partitionをいったん手放す
  4. c0,c1がJoinGroupを送る
  5. group coordinatorは、すべてのmemberからのJoinGroupが到着したことをもって、JoinGroup responseを返す
  6. JoinGroup responseを受け取った各memberは次に、SyncGroupをgroup coordinatorに送る
    • この中でleaderと呼ばれるmemberは、JoinGroup responseに含まれた最終的なmember listをもとに各memberに対するassignmentを決定し、SyncGroupに含める
  7. group coordinatorはSyncGroup responseを返す。この時点でcoordinatorから見たgのrebalanceは完了する。
  8. SyncGroup responseを通して各memberはpartition assignmentを受け取り、onPartitionsAssignedを実行し、処理を開始する。

問題はstep 3で、全てのmemberが自身の持っているpartitionを手放すことにある。

このとき実行されるonPartitionRevoked callbackでは、典型的にはconsumerが持っているstateを永続化したり、offsetをcommitしたりといったclean up処理を行う。KafkaStreamsも同様であり、特にstatefulなアプリケーションではclean upに長時間かかることがある。

そしてstep 3からstep 8の間は新規のmessageを処理できないため、realtime性が重要なアプリケーションでは致命的な影響となる。

How cooperative rebalancing works

先のeager rebalancingではstep 3において全てのmemberが全てのpartitionを手放していたが、これを最適化できないか考えてみる。

今回のシナリオではpartitionが3つあり最終的にはconsumerが3台となるため、分散の観点からいえば、最終的に各consumerに1つのpartitionがassignされればよい。

つまり、partitionの移動を最小化しようと思うと、初期状態からp1(またはp0)のみをc2に移動させればよいはずだ。

つまり理想的にはstep 3でc0p1のみを手放せばよく、その他のpartitionに関しては処理を継続できるはずである。

ここで問題となるのは、step 3の時点ではどのpartitionを手放せばよいか分からない点にある。

上記のeager rebalancingの例で見た通り、最終的なmembershipはJoinGroup responseによって通知され、それをもとにleaderはassignmentを決定し、それがSyncGroup responseによって通知される。

したがって、step 3の時点ではc0p1のみを手放せばよいことは分からない。

これらを、cooperative rebalancingでは以下のように解決する。

  • JoinGroup送信前ではなく、SyncGroup responseを受け取ってassignmentが確定した後に手放すようにする
  • そして手放したあとにもう一度rebalanceをトリガーする

図にするとこのようになる。

つまり今回のシナリオのようなmember追加の場合、rebalanceは2回発生する。(JoinGroup/SyncGroupのラウンドトリップも2回発生する)

しかし2回目のrebalanceはすでに手放した後のpartitionを新規memberにassignするだけなので、十分コストは低いと期待できる。

Conclusion

consumer groupにmemberを追加するシナリオを通して、eagerとcooperative rebalancingの動作の違いを見てきた。

特に以下の点は注意したい。

  • onPartitionsRevokedの実行タイミングがSyncGroup受信後に移った
  • partitionを手放すためのrebalanceと、手放したpartitionをassignするrebalanceの2回のrebalanceが発生する