1954

Thoughts, stories and ideas.

KafkaStreams outer join semantics change in 3.1.0

Before 3.1.0, KafkaStream's stream-to-stream outer join used to produce a bit counterintuitive output.

When we have stream s1, s2 with 10 seconds window, s1.leftJoin(s2) produces below output before 3.1.0:

refs: kstream-kstream-join in 3.0.0

t s1 s2 leftJoin
0 {key:A,value:X} {left:X,right:null}
5 {key:A,value:Y} {left:X,right:Y}

Let's see this by example.

Against our intuition, though there's a matching record in right stream within 10 secs so the result would be only "joined" output, "unjoined" output also appears as soon as a record is produced to left-stream "eagerly".

From 3.1.0, the semantic is changed to only output second one in above example.

issues.apache.org

refs: kstream-kstream-join in 3.1.0

t s1 s2 leftJoin
0 {key:A,value:X}
5 {key:A,value:Y} {left:X,right:Y}

Notes when using TopologyTestDriver

You should keep in mind that the joins make progress only by producing records when writing tests using TopologyTestDriver. (Also there are several points you should make sure to write correct test)

Let's say we have below KafkaStreams test:

  • There are topics s1, s2
  • Test left-joining s1 with s2 in 100 millisec window, and output the joined String in the form of "JOINED({left value}/{right value})"
Serde<String> serde = Serdes.String();
Serializer<String> ser = serde.serializer();

Properties props = new Properties();

// By default, KafkaStreams RocksDB as the window store.
// Set unique application.id is important to avoid reusing window data of previous tests, which could cause unpredictable test results.
// (As long as closing TopologyTestDriver properly, RocksDB data are cleaned-up every time though)
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serde.getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde.getClass().getName());
// When we do outer join, non-joined records (i.e. no corresponding key appeared in other stream) only emitted
// after this interval elapsed.
// That means we have to advance mockTime of TestTopologyDriver accordingly, which is bit bothersome.
// For the ease of testing, we just set the interval to 0 to emit non-joined result immediately as soon as the window expires.
props.setProperty(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, "0");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> s1 = builder.stream("s1", Consumed.with(serde, serde));
s1.foreach((k,v) -> System.out.printf("produced s1. key: %s\n", k));
KStream<String, String> s2 = builder.stream("s2", Consumed.with(serde, serde));
s2.foreach((k,v) -> System.out.printf("produced s2. key: %s\n", k));

s1.leftJoin(s2, (v1, v2) -> String.format("JOINED(%s/%s)", v1, v2),
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
  .foreach((k, v) -> System.out.printf("Joined. key: %s, value: %s\n", k, v));

// As described above, we should use TopologyTestDriver in try-with-resource to ensure the resources are cleaned up correctly
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, Instant.ofEpochMilli(0L))) {
    TestInputTopic<String, String> s1Topic =
            driver.createInputTopic("s1", ser, ser, Instant.ofEpochMilli(0L), Duration.ZERO);
    TestInputTopic<String, String> s2Topic =
            driver.createInputTopic("s2", ser, ser, Instant.ofEpochMilli(0L), Duration.ZERO);

    s1Topic.pipeInput("A", "1", 0L);
    s1Topic.pipeInput("B", "2", 50L);
    s2Topic.pipeInput("C", "3", 101L);
    s1Topic.pipeInput("D", "4", 151L);
}

This test yields below output:

produced s1. key: A
produced s1. key: B
produced s2. key: C
Joined. key: A, value: JOINED(1/null)
produced s1. key: D
Joined. key: B, value: JOINED(2/null)

The result can be interpreted like:

t event description
0 Produce A:1 to topic s1
50 Produce B:2 to topic s1
101 Produce C:3 to topic s2 A:1 window expires because 100 millis elapsed. JOINED result for A:1 is emitted
151 Produce D:4 to topic s1 B:2 window expires because 100 millis elapsed. JOINED result for B:2 is emitted

D:4 is still in pending because no more records are produced after that, so joined result is not emitted for D:4

Further reading: KafkaStreams test code

Access raw clipboard PNG image data in Electron (macOS)

Electron provides the API to get clipboard data via clipboard.readImage.

However, I found it doesn't preserve original image info so some information may drop.

Notably, pHYs metadata in PNG.

The problem arises when we take a screenshot in macOS with storing it in clipboard, then trying to get the data in Electron.

The width/height is doubled to the original screenshot.

That's due to "scale-factor" mechanism in macOS.

macOS's native apps like Preview.app uses pHYs information to get the scale-factor, so they can show the image in original size. (See this code for the details)

There's a SO post that discusses same phenomenon.

Workaround

The workaround is to use readBuffer instead of readImage to get the unconverted raw image.

readBuffer takes an argument which specifies the custom format name, and it's passed through to [NSPasteboard dataForType:] under the hood.

NSPasteboardType for the raw PNG data in clipboard is defined as NSPasteboardTypePNG, that is "public.png".

Hence, you can access raw PNG data from Electron by readBuffer("public.png").

Environment

  • Electron: 22.0.3
  • macOS: 12.6 (Monterey)

Incremental Fetch Requests in Kafka

Incremental Fetch Requests introduced in Kafka 1.1.0 is a mechanism to reduce Fetch overhead (especially when a client is interested in many partitions e.g. ReplicaFetchers)

cwiki.apache.org

Before KIP-227, all topic partitions had to be enumerated in every Fetch requests and responses.

Let's say a client is interested in tp-X-0, tp-Y-1, tp-Z-1, and tp-Y, tp-Z's message rate is much fewer than tp-X.

Most of time, Fetch requests are completed by tp-X's message production (by fetch.min.bytes), and no data will be returned for tp-Y, tp-Z, so essentially they are not necessary to be included in Fetch responses.

The idea of Incremental Fetch Requests is to include only necessary topic partitions in each requests/responses.

In a nutshell, it works like:

  1. When a client start fetching, it establishes FetchSession with the broker to cache partition information that the client is interested in
  2. The client will omit including partition info in Fetch requests unless it wants to update the fetch offset (after fetching new data)
  3. Similarly, the broker will omit including partition info in Fetch responses unless there's a data to be returned (after some data is produced)

Let's see this by experiment.

Settings:

component port description
broker-1 9091
broker-2 9092
broker-3 9093 leader of test-topic-0
zk 2181

And test-topic has only one partition.

First, start console consumer

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-topic

Check the consumer's port

$ lsof -p $(jps | grep ConsoleConsumer | cut -d' ' -f1) | grep 9093
java    66011 hokada  106u     IPv6             468231       0t0     TCP lima-default:42364->lima-default:9093 (ESTABLISHED)

Take tcpdump of console-consumer

$ sudo tcpdump src port 42364 or dst port 42364 -i any -w dump.pcap
tcpdump: data link type LINUX_SLL2
tcpdump: listening on any, link-type LINUX_SLL2 (Linux cooked v2), snapshot length 262144 bytes

After a while, in other terminal, produce a message

$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic test-topic

End tcpdump and open the dump in wireshark, with decoding as Kafka protocol

As we can see, partition info is not included in Fetch requests/responses,

And only after producing a message, Fetch response returns partition data:

Fetch requests also include partition info to update the fetch offset:

Use of MappedByteBuffer could cause long STW due to TTSP (time to safepoint)

In JVM, "safepoint" is a point that all threads are suspended so we can get the consistent view of the thread states.

openjdk.org

JVM executes operations that needs to be done inside safepoint (e.g. thread dump) in below procedure:

The duration from 1 to 3 is called "time to safepoint" (TTSP).

refs: krzysztofslusarski.github.io

Since entire application can't make progress after SafepointSynchronize::begin() until the safepoint ends, in the meantime, the application is called in STW (stop the world) which is known to cause latency issues in JVM.

Clearly, if the operation itself takes long time (e.g. full GC) it leads to long STW.

However, it's known that TTSP also could be long sometimes, which also leads to long STW.

If you enable JVM logging with -Xlog:safepoint=info or verbose, you can see the logs like blow:

[info][safepoint] Total time for which application threads were stopped: 0.0001794 seconds, Stopping threads took: 0.0000587 seconds

Total time for which application threads were stopped indicates total STW duration and Stopping threads took is the TTSP duration.

How threads reach safepoint

Threads reach safepoint in several ways depending on the executed code.

There's a very good article on the web about this:

psy-lob-saw.blogspot.com

In short,

  • If a thread is being blocked for lock etc, the thread is assumed at safepoint
  • If a thread is executing native code, the thread is assumed at safepoint
  • If a thread is executing bytecode, threads check polling page at "reasonable interval" and enters safepoint in next check
    • The above article reproduces long TTSP by exploiting this, by doing large iteration loop that no polling page check is inserted (called "counted loop")

Hence, if single, non-native instruction could take long time, it naturally causes long TTSP because the thread can't reach safepoint in the meantime.

File-backed MappedByteBuffer (Memory mapped file) is an example of such case.

With MappedByteBuffer, we can map the file on the (possibly slow) device to the virtual memory. (which uses mmap system call internally)

When we call MappedByteBuffer#get, if the file content is not loaded to physical memory (e.g. because the mapping is newly created or already evicted from page cache), it causes "major page fault" which involves reading the file, possibly slow when the file is located on a spinning drive.

If a drive that hosts the file about to be mapped got broken (it's not uncommon when we run a middleware that needs huge storage space so many HDDs are attached), it easily causes hundreds ~ tens of thousands of milliseconds page fault duration, and if a safepoint is initiated unluckily, the entire application will be stopped in the meantime and it's a serious functionality issue.

Experiment

Let's confirm that the long major page fault actually causes long TTSP by simple Java program, which just instantiates MappedByteBuffer from the given path and tries read from it.

To reproduce faulty disk, we use ddi to inject delays to the device.

Result:

# - Assume vmtouch is installed
# - Assume a file is prepared in /data_xfs/test/test.dat
# - Assume ddi is setup on /data_xfs

# Inject 5 seconds read delay into the device
$ echo 5000 | sudo tee /sys/fs/ddi/252\:18/read_delay

# Evict the file from page cache
$ vmtouch -e /data_xfs/test/test.dat

# Run a program
$ java -Xlog:safepoint=info:file=jvm.log LongTTSP /data_xfs/test/test.dat
[2022-12-25T08:35:34.156032] (main) pid: 33246
[2022-12-25T08:35:34.157010] (main) Sleeping...
[2022-12-25T08:35:34.157051] (reader-thread) Gonna read from mapped file
[2022-12-25T08:35:39.201298] (reader-thread) Finished read in 5043418 us

# In another terminal, take jstack to initiate the safepoint right after the application started
$ jstack 33246

# Check the JVM log
$ grep stopped jvm.log | grep -v 'stopped: 0'
[5.118s][info][safepoint] Total time for which application threads were stopped: 4.0402532 seconds, Stopping threads took: 4.0391063 seconds

As we can see, the application got STW for 4 seconds and the most part was for TTSP.

Conclusion

  • MappedByteBuffer could cause long STW with slow device for long major page fault
    • We have to consider carefully if memory mapped file is actually necessary
    • Or we have to a mechanism to "warm up" the memory map, to ensure the file is loaded before accessing it through MappedByteBuffer

Kafka consumer's cooperative rebalancing explained

Introduction

Kafkaにおいて、同一のgroup.idを持つKafka consumerの集合をconsumer groupと呼び、各groupは特定のtopicをsubscribeする。

group内の各consumer(memberと呼ぶ)にはtopicのpartitionが分散して割り当てられ、それぞれ割り当てられたpartitionのmessageのみを処理することで、分散処理を実現する。

またgroup内のあるconsumerが故障した際は、そのconsumerの受け持っていたpartitionは他のconsumerにreassignされる。つまり、groupはconsumerの冗長化を成す単位でもある。

このような、group内のmembershipに変更に伴うpartitionのreassignをrebalanceと呼ぶ。

この記事で解説するcooperative rebalancingは、このrebalanceについてよく知られた非効率性の改善を目的としてKafka 2.4で導入されたものである。

cooperative rebalancingについてはこちらも参考にされたい:

www.confluent.io

How eager rebalancing works

cooperative rebalancingに対して、従来のrebalanceの仕組みをeager rebalancingと呼ぶ。

まずは、以下のシナリオを通してeager rebalancingがどのように動作するかを見ていく。

  • 初期状態は以下
    • topic-fooは3つのpartition p0,p1,p2を持つ
    • group.id = gのgroupに、2つのconsumer c0, c1が属している
    • gtopic-fooをsubscribeしており、partition assignmentは c0[p0,p1], c1[p2]となっている
  • gに新たなconsumer c2を追加する

eager, cooperativeともに、rebalanceは主にJoinGroup/SyncGroupという二つのKafka requestを通じて進行する。

eagerにおけるrebalanceは以下のようなフローとなる。

  1. c2を起動すると、c2はgroup coordinator(memberの死活監視やJoinGroup/SyncGroupのhandlingといった、consumer group管理を担うKafka broker。group.idのhash値により決まる)に対してJoinGroupを送る
  2. これを受けて、gはrebalance中(正確にはPreparingRebalance state)に移行する
  3. c0,c1はHeartBeatを通してgがrebalance中であることを知り、自身の受け持っているpartitionを引数としてonPartitionsRevokedを実行し、partitionをいったん手放す
  4. c0,c1がJoinGroupを送る
  5. group coordinatorは、すべてのmemberからのJoinGroupが到着したことをもって、JoinGroup responseを返す
  6. JoinGroup responseを受け取った各memberは次に、SyncGroupをgroup coordinatorに送る
    • この中でleaderと呼ばれるmemberは、JoinGroup responseに含まれた最終的なmember listをもとに各memberに対するassignmentを決定し、SyncGroupに含める
  7. group coordinatorはSyncGroup responseを返す。この時点でcoordinatorから見たgのrebalanceは完了する。
  8. SyncGroup responseを通して各memberはpartition assignmentを受け取り、onPartitionsAssignedを実行し、処理を開始する。

問題はstep 3で、全てのmemberが自身の持っているpartitionを手放すことにある。

このとき実行されるonPartitionRevoked callbackでは、典型的にはconsumerが持っているstateを永続化したり、offsetをcommitしたりといったclean up処理を行う。KafkaStreamsも同様であり、特にstatefulなアプリケーションではclean upに長時間かかることがある。

そしてstep 3からstep 8の間は新規のmessageを処理できないため、realtime性が重要なアプリケーションでは致命的な影響となる。

How cooperative rebalancing works

先のeager rebalancingではstep 3において全てのmemberが全てのpartitionを手放していたが、これを最適化できないか考えてみる。

今回のシナリオではpartitionが3つあり最終的にはconsumerが3台となるため、分散の観点からいえば、最終的に各consumerに1つのpartitionがassignされればよい。

つまり、partitionの移動を最小化しようと思うと、初期状態からp1(またはp0)のみをc2に移動させればよいはずだ。

つまり理想的にはstep 3でc0p1のみを手放せばよく、その他のpartitionに関しては処理を継続できるはずである。

ここで問題となるのは、step 3の時点ではどのpartitionを手放せばよいか分からない点にある。

上記のeager rebalancingの例で見た通り、最終的なmembershipはJoinGroup responseによって通知され、それをもとにleaderはassignmentを決定し、それがSyncGroup responseによって通知される。

したがって、step 3の時点ではc0p1のみを手放せばよいことは分からない。

これらを、cooperative rebalancingでは以下のように解決する。

  • JoinGroup送信前ではなく、SyncGroup responseを受け取ってassignmentが確定した後に手放すようにする
  • そして手放したあとにもう一度rebalanceをトリガーする

図にするとこのようになる。

つまり今回のシナリオのようなmember追加の場合、rebalanceは2回発生する。(JoinGroup/SyncGroupのラウンドトリップも2回発生する)

しかし2回目のrebalanceはすでに手放した後のpartitionを新規memberにassignするだけなので、十分コストは低いと期待できる。

Conclusion

consumer groupにmemberを追加するシナリオを通して、eagerとcooperative rebalancingの動作の違いを見てきた。

特に以下の点は注意したい。

  • onPartitionsRevokedの実行タイミングがSyncGroup受信後に移った
  • partitionを手放すためのrebalanceと、手放したpartitionをassignするrebalanceの2回のrebalanceが発生する

VRoid studioでpresetの服の色を変える

Environment

  • VRoid Studio version: 1.13.0(正式版)

How to

  • このpresetの服の色を赤っぽく変えたいとします
  • まず、Edit textureでテクスチャの編集画面を開きます
  • まず服のテクスチャ画像をpngにexportします
    • 上記のscreenshotでは「デフォルト画像」を右クリックしてExport
  • Exportしたテクスチャ画像をなんらかの画像編集ソフトで開きます(ここではGimpを使っています)
  • お好みの色で塗りつぶしたレイヤーを上に重ねて、合成モードを「Multiply」にすると、うまいことテクスチャにお好みの色味がかった感じになります
  • 再度pngにExportして、VRoid StudioでLayerとしてimportします

Introducing literate-intellij-plugin

Background

TLA+のツールセットにはSpecをLaTeXドキュメントとして出力する機能があります。

Example:

最近、この機能を拙作のTLA+ intellij pluginにintegrateできないか考えていました。

公式のTLA+ toolboxやVS Code pluginにはintegrateされていてSpecをPDFとして出力できますが、TeX処理系は同梱されていないため、ユーザーは別途TeX Live等のdistributionをインストールすることになります。

TeX distributionは非常にサイズが大きくインストールする心理障壁が高いので、できればIntelliJのみでLaTeXの描画を完結させたいところです。

少し調べた感じでは、jlatexmathなどのLaTeXの数式描画に特化したライブラリはいくつかあるものの、TeXのコマンドを解釈して組版処理が行えるようなものは、Pure Javaのポータブルなものは無さそうです。

他には面白いプロジェクトとしてTeX Liveをemscriptenでwasmにcompileするtexlive.jsがあります(ブラウザで動かせればWebView経由でpluginに組み込めそう)が、そもそもTeXはとても歴史がある組版システムなのに(他のマークアップ言語と違い)様々な言語による再発明があまり無いことに個人的に興味を引かれました。

これをきっかけにまずTeXについて少し調べました。

TeX

よく知られている通り、TeXはDonald Knuthによって開発された組版システムおよびそのための命令を記述するマークアップ言語です。

TeXファイルを読み込み組版を行ってPDFなどの形式に出力するソフトウェアをTeX engineとよびます。

ただしVanilla TeXはプリミティブな命令しか持たないため、マクロ機構を使って高レベルな命令セットを提供する様々なパッケージが作られ、TeXのエコシステムは発展してきました。

中でもLeslie Lamportによって開発されたLaTeXは、科学論文の作成におけるデファクトスタンダードとなっています。

TLA+のツールによりSpecから生成されるTeXファイルも、TeXではなくLaTeXを対象としたものです。

LaTeXドキュメントをPDFとして出力するプロセスは以下のようになります。

  1. latex.ltxTeXのプリミティブな命令を使った種々のLaTeXマクロ定義を含むファイル)をTeX engineで読み込む
    • すでに読み込んだ状態のメモリダンプ(formatとよばれる)の形でdistributionに含まれているのが一般的なので、通常のユーザーはこのステップを明示的には実行しません
  2. 次に、実際のコンテンツを含むLaTeXドキュメントを読み込む
  3. 組版結果をPDFとして出力する

つまりプリミティブな(といっても300種類以上あるのですが...)TeX命令を処理できるポータブルなengineがあればいいわけで、作れたりしないだろうかという気になってきます。

WEB

現在広く使われているTeX engineにLuaTeXやpdfTeX、XeTeXなどがありますが、pdfTeX・XeTeXについてはソースコードKnuthTeXがベースになっています。(いっぽう、後から知ったのですがLuaTeXはCで再実装されているようです)

KnuthTeXに特徴的なのは、彼の提唱する「literate programming(文芸的プログラミング)」のために彼が開発した(現代においては恐ろしく紛らわしい名前の)WEBとよばれる記法で書かれていることです。

WEBでは、プログラムを「モジュール」の集合として記述し、各モジュールは以下の3つのパートで構成されます。

WEBには、開発された当時(1987年)の状況を伺わせる独特な仕様がいくつかあります。

たとえば各モジュールには名前をつけて他のモジュールから参照できるのですが、「モジュール名の初回の登場以降は、そのモジュール名をPrefixで参照できる」という仕様があります。 (foo-bar module というモジュールがあった場合、2回目以降は(他にfooで始まるmoduleがなければ) fooのみで参照できる)

WEBのマニュアルを見る限り、どうやら「何回も長いモジュール名を打つのが面倒だから」というモチベーションのようですが、エディタのサポートを受けるのが当然の現代では逆にError proneです。

github.com

WEB自体は現在普及しているとは言い難いし、上記のようにいまとなっては理にかなっていない仕様もありますが、対してWEBで書かれたソフトウェアがいまも広く利用されているというのは面白さを感じます。

literate-intellij-plugin

最低限のTeX engineを考えたとき何を実装しなければならないのか、TeXのWEBソースおよびKnuthのThe TeXbookを参照しながら構想を練り始めましたが、ここで、TeXのソースが長大である上に、pluginが無いため自分が愛用するIDEIntelliJ platform)を使ってコードブラウジングができないことが障壁になりました。

「文芸的プログラミング」のための記法であるWEBソースは、Weaveと呼ばれるツールに通すことでコードを含んだ綺麗に整形された完全なドキュメントを得ることができます。

とはいえ、シンタックスハイライトや定義ジャンプといったエディタのサポートなしで「文学」として読み通すには、TeXのソースは自分には複雑すぎる規模です。

そこで結局、Yak ShavingではありますがまずWEBのIntelliJ pluginを作成することにしました。

github.com

いまのところ以下の機能が実装されています。

  • シンタックスハイライト
    • TeXパート、マクロ定義パート、Pascalパートが色分けされます
  • Go to declarations
    • Pascalモジュール(@<...@>)参照から、定義元 (@<...@>=) にジャンプできます

Conclusion

WEBのコードリーディングが捗るようになったものの、所感としてはTeX engineの再発明は簡単ではなさそうです。

とはいえ、オリジナルのTeXの複雑さは当時のハードウェアや言語面の制約から来ている面も多い気がしており、最低限のTeX engineは今作ればシンプルにできたりしないだろうとまだ思っています。

今後も(興味が続けば)進めていきます。