1954


KafkaStreams outer join semantics change in 3.1.0

Before 3.1.0, KafkaStreams' stream-to-stream outer join produced somewhat counterintuitive output.

Suppose we have streams s1 and s2 joined with a 10-second window. Before 3.1.0, s1.leftJoin(s2) produces the following output:

refs: kstream-kstream-join in 3.0.0

| t | s1 | s2 | leftJoin |
|---|----|----|----------|
| 0 | {key:A,value:X} | | {left:X,right:null} |
| 5 | | {key:A,value:Y} | {left:X,right:Y} |

Counterintuitively, even though a matching record arrives in the right stream within 10 seconds, so that we would expect only the "joined" output, the "unjoined" output is also emitted "eagerly", as soon as the record is produced to the left stream.

As of 3.1.0, the semantics changed so that only the second (joined) result in the example above is emitted.

issues.apache.org

refs: kstream-kstream-join in 3.1.0

| t | s1 | s2 | leftJoin |
|---|----|----|----------|
| 0 | {key:A,value:X} | | |
| 5 | | {key:A,value:Y} | {left:X,right:Y} |
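To make the difference concrete, here is a small toy model in plain Java that replays the two-record example under both behaviours. It is only a sketch: the class `LeftJoinSemanticsSketch`, its `Event` type and the `eager` flag are hypothetical names made up for illustration, and the logic is a simplification (no grace period, no emit interval), not KafkaStreams' actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class LeftJoinSemanticsSketch {

    /** One input record; stream is "s1" (left) or "s2" (right), ts is in seconds. */
    public static final class Event {
        final String stream, key, value;
        final long ts;
        public Event(String stream, String key, String value, long ts) {
            this.stream = stream; this.key = key; this.value = value; this.ts = ts;
        }
    }

    /** Toy left join of s1 with s2. eager=true mimics pre-3.1.0, eager=false mimics 3.1.0+. */
    public static List<String> leftJoin(List<Event> events, long window, boolean eager) {
        List<String> out = new ArrayList<>();
        List<Event> left = new ArrayList<>();
        List<Event> right = new ArrayList<>();
        List<Event> pending = new ArrayList<>(); // unmatched left records (deferred mode only)
        long streamTime = Long.MIN_VALUE;

        for (Event e : events) {
            streamTime = Math.max(streamTime, e.ts);
            // Deferred mode: emit left records whose window closed without a match.
            for (Event p : new ArrayList<>(pending)) {
                if (p.ts + window < streamTime) {
                    out.add("t=" + streamTime + " {left:" + p.value + ",right:null}");
                    pending.remove(p);
                }
            }
            if (e.stream.equals("s1")) {
                boolean matched = false;
                for (Event r : right) {
                    if (r.key.equals(e.key) && Math.abs(r.ts - e.ts) <= window) {
                        out.add("t=" + e.ts + " {left:" + e.value + ",right:" + r.value + "}");
                        matched = true;
                    }
                }
                if (!matched) {
                    if (eager) {
                        // Pre-3.1.0 style: emit the unjoined result immediately.
                        out.add("t=" + e.ts + " {left:" + e.value + ",right:null}");
                    } else {
                        // 3.1.0+ style: hold the record back until its window closes.
                        pending.add(e);
                    }
                }
                left.add(e);
            } else {
                for (Event l : left) {
                    if (l.key.equals(e.key) && Math.abs(l.ts - e.ts) <= window) {
                        out.add("t=" + e.ts + " {left:" + l.value + ",right:" + e.value + "}");
                        pending.remove(l); // matched, so no unjoined result later
                    }
                }
                right.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("s1", "A", "X", 0),
                new Event("s2", "A", "Y", 5));
        System.out.println(leftJoin(events, 10, true));  // [t=0 {left:X,right:null}, t=5 {left:X,right:Y}]
        System.out.println(leftJoin(events, 10, false)); // [t=5 {left:X,right:Y}]
    }
}
```

With `eager` set to true the model reproduces both rows of the 3.0.0 table, and with `eager` set to false only the joined row of the 3.1.0 table.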

Notes when using TopologyTestDriver

When writing tests with TopologyTestDriver, keep in mind that a join makes progress only when records are produced: stream time advances with the timestamps of the piped records, not with wall-clock time. (There are also a few other points to get right in order to write a correct test.)

Let's say we have the following KafkaStreams test:

  • There are two topics, s1 and s2
  • The test left-joins s1 with s2 in a 100-millisecond window and outputs the joined String in the form "JOINED({left value}/{right value})"
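// Imports used by this snippet
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

Serde<String> serde = Serdes.String();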
Serializer<String> ser = serde.serializer();

Properties props = new Properties();

// By default, KafkaStreams uses RocksDB as the window store.
// Setting a unique application.id is important to avoid reusing window data from previous tests, which could cause unpredictable test results.
// (As long as TopologyTestDriver is closed properly, RocksDB data is cleaned up every time, though.)
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serde.getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde.getClass().getName());
// In an outer join, non-joined records (i.e. records whose key did not appear in the other stream) are emitted only
// after this interval has elapsed.
// That means we would have to advance the mock time of TopologyTestDriver accordingly, which is a bit bothersome.
// For ease of testing, we set the interval to 0 so that non-joined results are emitted as soon as the window expires.
props.setProperty(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, "0");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> s1 = builder.stream("s1", Consumed.with(serde, serde));
s1.foreach((k,v) -> System.out.printf("produced s1. key: %s\n", k));
KStream<String, String> s2 = builder.stream("s2", Consumed.with(serde, serde));
s2.foreach((k,v) -> System.out.printf("produced s2. key: %s\n", k));

s1.leftJoin(s2, (v1, v2) -> String.format("JOINED(%s/%s)", v1, v2),
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)))
  .foreach((k, v) -> System.out.printf("Joined. key: %s, value: %s\n", k, v));

// As described above, use TopologyTestDriver in try-with-resources to ensure the resources are cleaned up correctly
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, Instant.ofEpochMilli(0L))) {
    TestInputTopic<String, String> s1Topic =
            driver.createInputTopic("s1", ser, ser, Instant.ofEpochMilli(0L), Duration.ZERO);
    TestInputTopic<String, String> s2Topic =
            driver.createInputTopic("s2", ser, ser, Instant.ofEpochMilli(0L), Duration.ZERO);

    s1Topic.pipeInput("A", "1", 0L);
    s1Topic.pipeInput("B", "2", 50L);
    s2Topic.pipeInput("C", "3", 101L);
    s1Topic.pipeInput("D", "4", 151L);
}

This test produces the following output:

produced s1. key: A
produced s1. key: B
produced s2. key: C
Joined. key: A, value: JOINED(1/null)
produced s1. key: D
Joined. key: B, value: JOINED(2/null)

The result can be interpreted as follows:

| t | event | description |
|---|-------|-------------|
| 0 | Produce A:1 to topic s1 | |
| 50 | Produce B:2 to topic s1 | |
| 101 | Produce C:3 to topic s2 | The A:1 window expires because 100 millis elapsed. The JOINED result for A:1 is emitted |
| 151 | Produce D:4 to topic s1 | The B:2 window expires because 100 millis elapsed. The JOINED result for B:2 is emitted |

D:4 is still pending because no more records are produced after it, so no joined result is emitted for D:4.
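The timeline above can be replayed with a small plain-Java model of stream time. This is a sketch under assumptions, not KafkaStreams code: `StreamTimeSketch`, `pipe` and `pendingLabels` are hypothetical names, the model assumes the emit interval is 0 and that no keys ever match (as in the test), and the fifth record E:5 at t=252 is an extra record added purely for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamTimeSketch {
    private final long windowMs;
    private long streamTime = Long.MIN_VALUE;
    // Unmatched left-side (s1) records, "key:value" -> record timestamp.
    private final Map<String, Long> pending = new LinkedHashMap<>();

    public StreamTimeSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Pipes one record and returns the left records whose window it closed. */
    public List<String> pipe(String topic, String key, String value, long ts) {
        // Stream time only moves forward when a record arrives.
        streamTime = Math.max(streamTime, ts);
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> p = it.next();
            if (p.getValue() + windowMs < streamTime) { // window has closed
                emitted.add(p.getKey());
                it.remove();
            }
        }
        if (topic.equals("s1")) {
            pending.put(key + ":" + value, ts); // only left records wait for a match
        }
        return emitted;
    }

    /** Left records that have not been emitted yet. */
    public List<String> pendingLabels() {
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) {
        StreamTimeSketch model = new StreamTimeSketch(100);
        System.out.println(model.pipe("s1", "A", "1", 0));   // []
        System.out.println(model.pipe("s1", "B", "2", 50));  // []
        System.out.println(model.pipe("s2", "C", "3", 101)); // [A:1]
        System.out.println(model.pipe("s1", "D", "4", 151)); // [B:2]
        System.out.println(model.pendingLabels());           // [D:4]
        // Hypothetical extra record: any timestamp later than 251 closes D:4's window.
        System.out.println(model.pipe("s2", "E", "5", 252)); // [D:4]
    }
}
```

In the model, D:4 stays pending until some later record pushes stream time past 251. By analogy, piping one more record with a sufficiently late timestamp in the real test should flush D:4 as well, though that is an inference from this model rather than something the original test shows.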

Further reading: KafkaStreams test code