KafkaStreams outer join semantics change in 3.1.0
Before 3.1.0, the Kafka Streams stream-to-stream outer join produced somewhat counterintuitive output.
Given two streams s1 and s2 joined with a 10-second window, s1.leftJoin(s2) produced the following output before 3.1.0:
refs: kstream-kstream-join in 3.0.0
t | s1 | s2 | leftJoin |
---|---|---|---|
0 | {key:A,value:X} | | {left:X,right:null} |
5 | | {key:A,value:Y} | {left:X,right:Y} |
This runs against intuition. A matching record arrives in the right stream within 10 seconds, so we would expect only the "joined" output; yet the "unjoined" output also appears, emitted eagerly as soon as the record is produced to the left stream.
Since 3.1.0, the semantics have changed so that only the second (joined) result in the example above is emitted.
refs: kstream-kstream-join in 3.1.0
t | s1 | s2 | leftJoin |
---|---|---|---|
0 | {key:A,value:X} | | |
5 | | {key:A,value:Y} | {left:X,right:Y} |
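To make the difference concrete, the two behaviors can be mimicked with a small plain-Java toy model. This is only a sketch under stated assumptions, not how Kafka Streams implements the join: the class `LeftJoinSemanticsSketch`, its `Event` type, and the `eager` flag are hypothetical names, timestamps are plain numbers, and records are assumed to arrive in timestamp order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: hypothetical names, not the Kafka Streams implementation.
final class LeftJoinSemanticsSketch {

    // One input record; left == true means s1, false means s2.
    static final class Event {
        final long t;
        final boolean left;
        final String key;
        final String value;

        Event(long t, boolean left, String key, String value) {
            this.t = t;
            this.left = left;
            this.key = key;
            this.value = value;
        }
    }

    static String format(String left, String right) {
        return "{left:" + left + ",right:" + right + "}";
    }

    // The example from the tables above: X on s1 at t=0, Y on s2 at t=5.
    static List<Event> example() {
        return Arrays.asList(new Event(0, true, "A", "X"), new Event(5, false, "A", "Y"));
    }

    // Events must be sorted by timestamp. eager == true mimics the pre-3.1.0 behavior.
    static List<String> leftJoin(List<Event> events, long window, boolean eager) {
        List<String> out = new ArrayList<>();
        List<Event> seen = new ArrayList<>();     // every record processed so far
        List<Event> pending = new ArrayList<>();  // unmatched left records (deferred mode only)
        for (Event e : events) {
            // Deferred mode: release left records whose window closed without a match.
            for (Event p : new ArrayList<>(pending)) {
                if (p.t + window < e.t) {
                    out.add(format(p.value, null));
                    pending.remove(p);
                }
            }
            boolean matched = false;
            for (Event other : seen) {
                boolean sameKeyOtherSide = other.left != e.left && other.key.equals(e.key);
                if (sameKeyOtherSide && Math.abs(other.t - e.t) <= window) {
                    Event l = e.left ? e : other;
                    Event r = e.left ? other : e;
                    out.add(format(l.value, r.value));
                    pending.remove(l);  // the left record is no longer unmatched
                    matched = true;
                }
            }
            if (e.left && !matched) {
                if (eager) {
                    out.add(format(e.value, null));  // emitted right away, possibly spuriously
                } else {
                    pending.add(e);                  // wait until the window closes
                }
            }
            seen.add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(leftJoin(example(), 10, true));   // [{left:X,right:null}, {left:X,right:Y}]
        System.out.println(leftJoin(example(), 10, false));  // [{left:X,right:Y}]
    }
}
```

In this model, a left record that is still pending when the input ends is simply never emitted, which mirrors the testing caveat discussed in the next section.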
Notes when using TopologyTestDriver
When writing tests with TopologyTestDriver, keep in mind that joins make progress only when records are produced. (There are also several other points to check in order to write a correct test.)
Let's say we have the following Kafka Streams test:
- There are two topics, s1 and s2
- The test left-joins s1 with s2 in a 100-millisecond window and outputs the joined String in the form "JOINED({left value}/{right value})"
```java
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

// The statements below are meant to run inside a test method.
Serde<String> serde = Serdes.String();
Serializer<String> ser = serde.serializer();

Properties props = new Properties();
// By default, Kafka Streams uses RocksDB as the window store.
// Setting a unique application.id is important to avoid reusing window data from previous tests,
// which could cause unpredictable test results.
// (RocksDB data is cleaned up every time as long as TopologyTestDriver is closed properly, though.)
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serde.getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde.getClass().getName());
// In an outer join, non-joined records (i.e. no record with the same key appeared in the other stream)
// are emitted only after this interval has elapsed.
// That means we would have to advance the mock time of TopologyTestDriver accordingly, which is a bit bothersome.
// For ease of testing, we set the interval to 0 so that non-joined results are emitted as soon as the window expires.
props.setProperty(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, "0");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> s1 = builder.stream("s1", Consumed.with(serde, serde));
s1.foreach((k, v) -> System.out.printf("produced s1. key: %s\n", k));
KStream<String, String> s2 = builder.stream("s2", Consumed.with(serde, serde));
s2.foreach((k, v) -> System.out.printf("produced s2. key: %s\n", k));

s1.leftJoin(s2,
            (v1, v2) -> String.format("JOINED(%s/%s)", v1, v2),
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
  .foreach((k, v) -> System.out.printf("Joined. key: %s, value: %s\n", k, v));

// As described above, use TopologyTestDriver in try-with-resources to ensure the resources are cleaned up correctly.
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, Instant.ofEpochMilli(0L))) {
    TestInputTopic<String, String> s1Topic =
            driver.createInputTopic("s1", ser, ser, Instant.ofEpochMilli(0L), Duration.ZERO);
    TestInputTopic<String, String> s2Topic =
            driver.createInputTopic("s2", ser, ser, Instant.ofEpochMilli(0L), Duration.ZERO);

    s1Topic.pipeInput("A", "1", 0L);
    s1Topic.pipeInput("B", "2", 50L);
    s2Topic.pipeInput("C", "3", 101L);
    s1Topic.pipeInput("D", "4", 151L);
}
```
This test yields the following output:
```
produced s1. key: A
produced s1. key: B
produced s2. key: C
Joined. key: A, value: JOINED(1/null)
produced s1. key: D
Joined. key: B, value: JOINED(2/null)
```
The result can be interpreted as follows:
t | event | description |
---|---|---|
0 | Produce A:1 to topic s1 | |
50 | Produce B:2 to topic s1 | |
101 | Produce C:3 to topic s2 | The window for A:1 expires because more than 100 millis have elapsed, so the JOINED result for A:1 is emitted |
151 | Produce D:4 to topic s1 | The window for B:2 expires because more than 100 millis have elapsed, so the JOINED result for B:2 is emitted |
D:4 is still pending because no more records are produced after it, so no joined result is emitted for D:4.
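The timeline above can be replayed with a small plain-Java toy model in which unmatched left records are released only when a later record advances the observed time. This is a sketch, not the Kafka Streams implementation: `StreamTimeSketch` and its methods are hypothetical names, key matching is left out because no keys match in this test, and records are assumed to arrive in timestamp order. The final record `E:5` at t=252 is not part of the original test; it is added here only to illustrate that one more record with a later timestamp would release D:4 in this model.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of record-driven progress; hypothetical names, not the Kafka Streams implementation.
final class StreamTimeSketch {

    private static final class Pending {
        final String key;
        final String value;
        final long timestamp;

        Pending(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long windowMs;
    private long streamTime = Long.MIN_VALUE;               // highest timestamp seen on either input
    private final List<Pending> pending = new ArrayList<>(); // unmatched left records

    StreamTimeSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Pipes one record and returns the unjoined results that this record releases.
    List<String> pipe(boolean leftSide, String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        List<String> emitted = new ArrayList<>();
        for (Iterator<Pending> it = pending.iterator(); it.hasNext();) {
            Pending p = it.next();
            if (p.timestamp + windowMs < streamTime) {  // window closed without a match
                emitted.add(p.key + ": JOINED(" + p.value + "/null)");
                it.remove();
            }
        }
        if (leftSide) {
            pending.add(new Pending(key, value, timestamp));
        }
        return emitted;
    }

    // Keys of left records that are still waiting for their window to close.
    List<String> pendingKeys() {
        List<String> keys = new ArrayList<>();
        for (Pending p : pending) {
            keys.add(p.key);
        }
        return keys;
    }

    public static void main(String[] args) {
        StreamTimeSketch join = new StreamTimeSketch(100L);
        System.out.println(join.pipe(true, "A", "1", 0L));     // []
        System.out.println(join.pipe(true, "B", "2", 50L));    // []
        System.out.println(join.pipe(false, "C", "3", 101L));  // [A: JOINED(1/null)]
        System.out.println(join.pipe(true, "D", "4", 151L));   // [B: JOINED(2/null)]
        System.out.println(join.pendingKeys());                // [D]
        // Hypothetical extra record, not in the original test:
        System.out.println(join.pipe(false, "E", "5", 252L));  // [D: JOINED(4/null)]
    }
}
```

In the real test, the analogous step would presumably be one more `pipeInput` call with a timestamp above 251, but that is an assumption carried over from this model rather than something the test above demonstrates.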
Further reading: KafkaStreams test code