1954

Thoughts, stories and ideas.

A pitfall in Kafka partition splitting when auto.offset.reset = latest

Kafka 2.7.0現在、consumerのauto.offset.reset configはlatestがデフォルトとなっています。

これは、consumer groupがあるpartitionをsubscribeするとき、commit済みoffsetが存在しない場合(consumerの初回デプロイ時や、offsets.retention.minutes以上の期間consumerを起動していなかった場合などが該当します)、consume開始位置をlog end offsetにセットするというものです。

通常は1) consumer 2) producerの手順でデプロイしますので、これで問題ありません。(逆の場合、consumerが起動するまでの間にproduceされたmessageは処理されないことになります)

ここで、サービスを開始してしばらくしてpartitionを途中で増やしたくなったとしましょう。

これによりkeyのpartitionへの割り当てが変わるため、一時的にkey単位のmessage localityが崩れることはよく知られていますが、consumerをauto.offset.reset = latestに設定している場合、さらに一部messageのconsumeが漏れる可能性があるという落とし穴があります。

これは以下のようにして起こります。

  1. topic-Xのpartition countを1 -> 2にsplit
  2. producerがmetadataをrefreshし、partitionが追加されたことを知る。新しく追加されたpartition-1へのproduceを開始
  3. consumerがmetadataをrefreshし、consumer rebalanceが必要であることを知る。新しく追加されたpartition-1をconsume開始
    • partition-1にcommitされたoffsetはまだ存在しないため、開始位置をlog end offsetにセットする。

2と3のどちらが先に発生するかはタイミング依存です。(metadata.max.age.msや、ちょうどmetadataが更新されるような事象(brokerが死ぬとか)が起きたとしてどちらが先に検出するか、など)

もし2 => 3の間にproducerがある程度のmessageをpartition-1にproduceしていた場合、いくつかのmessageはconsumerに届かないことになります。

Demo

さて、実際にpartition splitするタイミングでdelivery lostするかどうか試してみましょう。

github.com

これは、以下のような仕様の小さなproducer/consumer applicationです。

  1. producerは、1秒間隔でmessageをproduceする。
    • messageにはmonotonic integerをidとして与える。produce完了したら、標準出力にidを出す。
  2. consumerは、consumeしたら標準出力にidを出す。

partitionを一つだけ持ったdemo-topicというtopicを作って、アプリケーションを動かしてみます。

% ./gradlew run
[ID=0] sent. tp=demo-topic-0[offset=0]
[ID=0] received.
[ID=1] sent. tp=demo-topic-0[offset=1]
[ID=1] received.
[ID=2] sent. tp=demo-topic-0[offset=2]
[ID=2] received.
[ID=3] sent. tp=demo-topic-0[offset=3]
[ID=3] received.
[ID=4] sent. tp=demo-topic-0[offset=4]
[ID=4] received.
[ID=5] sent. tp=demo-topic-0[offset=5]
[ID=5] received.
...

messageが正しく届いている限り、このようにsentとreceiveがセットで出力されます。

さて、途中でpartitionを1 -> 2に増やしてみましょう。

...
[ID=16] sent. tp=demo-topic-1[offset=0]
[ID=17] sent. tp=demo-topic-0[offset=16]
[ID=17] received.
[ID=18] sent. tp=demo-topic-1[offset=1]
[ID=19] sent. tp=demo-topic-1[offset=2]
[ID=20] sent. tp=demo-topic-0[offset=17]
[ID=20] received.
...

ある時点からproducerはpartition-1へのproduceを開始しましたが、consumerに届いていないことがわかります。 もう少し待つと、

...
[ID=28] sent. tp=demo-topic-1[offset=5]
[ID=28] received.
[ID=29] sent. tp=demo-topic-1[offset=6]
[ID=29] received.
[ID=30] sent. tp=demo-topic-0[offset=23]
[ID=30] received.
...

offset=5からconsumeされるようになりましたが、offset=4まではdelivery lostしてしまいました。

今回の検証コードではconsumerに長いmetadata.max.age.msを設定して容易に事象が発生するようにしたのですが、たとえ同じあるいはconsumerのほうを短くしていたとしても、metadataの更新には様々な契機が存在するため、auto.offset.reset = latestで有る限り同様の事象は起こり得ます。

Conclusion

consumerのauto.offset.resetをlatestにしている場合、partitionをsplitするタイミングでdelivery lostするケースがあることを確認しました。

後からpartitionを増やす可能性があり、かつdelivery lossが許容できないようなサービスの場合はauto.offset.reset = earliestに設定しておく必要がありそうです。