Kafka consumer's cooperative rebalancing explained
Introduction
Kafkaにおいて、同一のgroup.id
を持つKafka consumerの集合をconsumer groupと呼び、各groupは特定のtopicをsubscribeする。
group内の各consumer(memberと呼ぶ)にはtopicのpartitionが分散して割り当てられ、それぞれ割り当てられたpartitionのmessageのみを処理することで、分散処理を実現する。
またgroup内のあるconsumerが故障した際は、そのconsumerの受け持っていたpartitionは他のconsumerにreassignされる。つまり、groupはconsumerの冗長化を成す単位でもある。
このような、group内のmembershipに変更に伴うpartitionのreassignをrebalanceと呼ぶ。
この記事で解説するcooperative rebalancingは、このrebalanceについてよく知られた非効率性の改善を目的としてKafka 2.4で導入されたものである。
cooperative rebalancingについてはこちらも参考にされたい:
How eager rebalancing works
cooperative rebalancingに対して、従来のrebalanceの仕組みをeager rebalancingと呼ぶ。
まずは、以下のシナリオを通してeager rebalancingがどのように動作するかを見ていく。
- 初期状態は以下
topic-foo
は3つのpartitionp0,p1,p2
を持つgroup.id = g
のgroupに、2つのconsumerc0, c1
が属しているg
はtopic-foo
をsubscribeしており、partition assignmentはc0[p0,p1], c1[p2]
となっている
g
に新たなconsumerc2
を追加する
eager, cooperativeともに、rebalanceは主にJoinGroup/SyncGroupという二つのKafka requestを通じて進行する。
eagerにおけるrebalanceは以下のようなフローとなる。
- c2を起動すると、c2はgroup coordinator(memberの死活監視やJoinGroup/SyncGroupのhandlingといった、consumer group管理を担うKafka broker。
group.id
のhash値により決まる)に対してJoinGroupを送る - これを受けて、
g
はrebalance中(正確にはPreparingRebalance
state)に移行する c0,c1
はHeartBeatを通してg
がrebalance中であることを知り、自身の受け持っているpartitionを引数としてonPartitionsRevoked
を実行し、partitionをいったん手放すc0,c1
がJoinGroupを送る- group coordinatorは、すべてのmemberからのJoinGroupが到着したことをもって、JoinGroup responseを返す
- JoinGroup responseを受け取った各memberは次に、SyncGroupをgroup coordinatorに送る
- この中で
leader
と呼ばれるmemberは、JoinGroup responseに含まれた最終的なmember listをもとに各memberに対するassignmentを決定し、SyncGroupに含める
- この中で
- group coordinatorはSyncGroup responseを返す。この時点でcoordinatorから見た
g
のrebalanceは完了する。 - SyncGroup responseを通して各memberはpartition assignmentを受け取り、
onPartitionsAssigned
を実行し、処理を開始する。
問題はstep 3で、全てのmemberが自身の持っているpartitionを手放すことにある。
このとき実行されるonPartitionRevoked
callbackでは、典型的にはconsumerが持っているstateを永続化したり、offsetをcommitしたりといったclean up処理を行う。KafkaStreamsも同様であり、特にstatefulなアプリケーションではclean upに長時間かかることがある。
そしてstep 3からstep 8の間は新規のmessageを処理できないため、realtime性が重要なアプリケーションでは致命的な影響となる。
How cooperative rebalancing works
先のeager rebalancingではstep 3において全てのmemberが全てのpartitionを手放していたが、これを最適化できないか考えてみる。
今回のシナリオではpartitionが3つあり最終的にはconsumerが3台となるため、分散の観点からいえば、最終的に各consumerに1つのpartitionがassignされればよい。
つまり、partitionの移動を最小化しようと思うと、初期状態からp1(またはp0)のみをc2に移動させればよいはずだ。
つまり理想的にはstep 3でc0
がp1
のみを手放せばよく、その他のpartitionに関しては処理を継続できるはずである。
ここで問題となるのは、step 3の時点ではどのpartitionを手放せばよいか分からない点にある。
上記のeager rebalancingの例で見た通り、最終的なmembershipはJoinGroup responseによって通知され、それをもとにleaderはassignmentを決定し、それがSyncGroup responseによって通知される。
したがって、step 3の時点ではc0
がp1
のみを手放せばよいことは分からない。
これらを、cooperative rebalancingでは以下のように解決する。
- JoinGroup送信前ではなく、SyncGroup responseを受け取ってassignmentが確定した後に手放すようにする
- そして手放したあとにもう一度rebalanceをトリガーする
図にするとこのようになる。
つまり今回のシナリオのようなmember追加の場合、rebalanceは2回発生する。(JoinGroup/SyncGroupのラウンドトリップも2回発生する)
しかし2回目のrebalanceはすでに手放した後のpartitionを新規memberにassignするだけなので、十分コストは低いと期待できる。
Conclusion
consumer groupにmemberを追加するシナリオを通して、eagerとcooperative rebalancingの動作の違いを見てきた。
特に以下の点は注意したい。
onPartitionsRevoked
の実行タイミングがSyncGroup受信後に移った- partitionを手放すためのrebalanceと、手放したpartitionをassignするrebalanceの2回のrebalanceが発生する