A pitfall in Kafka partition splitting when auto.offset.reset = latest
Kafka 2.7.0現在、consumerのauto.offset.reset configはlatestがデフォルトとなっています。
これは、consumer groupがあるpartitionをsubscribeするとき、commit済みoffsetが存在しない場合(consumerの初回デプロイ時や、offsets.retention.minutes以上の期間consumerを起動していなかった場合などが該当します)、consume開始位置をlog end offsetにセットするというものです。
通常は1) consumer 2) producerの手順でデプロイしますので、これで問題ありません。(逆の場合、consumerが起動するまでの間にproduceされたmessageは処理されないことになります)
ここで、サービスを開始してしばらくしてpartitionを途中で増やしたくなったとしましょう。
これによりkeyのpartitionへの割り当てが変わるため、一時的にkey単位のmessage localityが崩れることはよく知られていますが、consumerをauto.offset.reset = latestに設定している場合、さらに一部messageのconsumeが漏れる可能性があるという落とし穴があります。
これは以下のようにして起こります。
- topic-Xのpartition countを1 -> 2にsplit
- producerがmetadataをrefreshし、partitionが追加されたことを知る。新しく追加されたpartition-1へのproduceを開始
- consumerがmetadataをrefreshし、consumer rebalanceが必要であることを知る。新しく追加されたpartition-1をconsume開始
- partition-1にcommitされたoffsetはまだ存在しないため、開始位置をlog end offsetにセットする。
2と3のどちらが先に発生するかはタイミング依存です。(metadata.max.age.msや、ちょうどmetadataが更新されるような事象(brokerが死ぬとか)が起きたとしてどちらが先に検出するか、など)
もし2 => 3の間にproducerがある程度のmessageをpartition-1にproduceしていた場合、いくつかのmessageはconsumerに届かないことになります。
Demo
さて、実際にpartition splitするタイミングでdelivery lostするかどうか試してみましょう。
これは、以下のような仕様の小さなproducer/consumer applicationです。
- producerは、1秒間隔でmessageをproduceする。
- messageにはmonotonic integerをidとして与える。produce完了したら、標準出力にidを出す。
- consumerは、consumeしたら標準出力にidを出す。
partitionを一つだけ持ったdemo-topic
というtopicを作って、アプリケーションを動かしてみます。
% ./gradlew run [ID=0] sent. tp=demo-topic-0[offset=0] [ID=0] received. [ID=1] sent. tp=demo-topic-0[offset=1] [ID=1] received. [ID=2] sent. tp=demo-topic-0[offset=2] [ID=2] received. [ID=3] sent. tp=demo-topic-0[offset=3] [ID=3] received. [ID=4] sent. tp=demo-topic-0[offset=4] [ID=4] received. [ID=5] sent. tp=demo-topic-0[offset=5] [ID=5] received. ...
messageが正しく届いている限り、このようにsentとreceiveがセットで出力されます。
さて、途中でpartitionを1 -> 2に増やしてみましょう。
... [ID=16] sent. tp=demo-topic-1[offset=0] [ID=17] sent. tp=demo-topic-0[offset=16] [ID=17] received. [ID=18] sent. tp=demo-topic-1[offset=1] [ID=19] sent. tp=demo-topic-1[offset=2] [ID=20] sent. tp=demo-topic-0[offset=17] [ID=20] received. ...
ある時点からproducerはpartition-1へのproduceを開始しましたが、consumerに届いていないことがわかります。 もう少し待つと、
... [ID=28] sent. tp=demo-topic-1[offset=5] [ID=28] received. [ID=29] sent. tp=demo-topic-1[offset=6] [ID=29] received. [ID=30] sent. tp=demo-topic-0[offset=23] [ID=30] received. ...
offset=5からconsumeされるようになりましたが、offset=4まではdelivery lostしてしまいました。
今回の検証コードではconsumerに長いmetadata.max.age.msを設定して容易に事象が発生するようにしたのですが、たとえ同じあるいはconsumerのほうを短くしていたとしても、metadataの更新には様々な契機が存在するため、auto.offset.reset = latestで有る限り同様の事象は起こり得ます。
Conclusion
consumerのauto.offset.resetをlatestにしている場合、partitionをsplitするタイミングでdelivery lostするケースがあることを確認しました。
後からpartitionを増やす可能性があり、かつdelivery lossが許容できないようなサービスの場合はauto.offset.reset = earliestに設定しておく必要がありそうです。