Kafka Streams – Merging Multiple KStreams
In this post we are going to see how we can merge messages from multiple topics into a single topic using Kafka Streams. In the example scenario, we will use heartbeat data from four sensors and merge it into a single topic, as follows:
We are assuming that you already have all services in the Confluent platform running; we are using the Confluent CLI to run these services.
Creating Sensor data on Individual Topics
Let’s create data on the individual topics. We can do this with simple producers. Let’s create a sample Scala SBT application. Here are some of the details. We are using Scala 2.12.4 for this application.
Let’s add the dependencies needed for producing records. I also needed to update the sbt version in the build.properties file to match the version installed on the system.
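As a sketch, the build.sbt additions might look like the following. The exact versions are assumptions for a Scala 2.12.4 / Confluent 4.x era setup; adjust them to your installation:

```scala
// Hypothetical build.sbt fragment; versions are assumptions, not from the original post
scalaVersion := "2.12.4"

// Confluent's serializers are published to their own Maven repository
resolvers += "Confluent" at "http://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients"            % "1.0.0",
  "org.apache.kafka" % "kafka-streams"            % "1.0.0",
  "io.confluent"     % "kafka-avro-serializer"    % "4.0.0",
  "io.confluent"     % "kafka-streams-avro-serde" % "4.0.0"
)
```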
In order to send messages to Kafka, let’s first create a schema for the records to be pushed to the topic. It is of the simplest kind, with a single field SensorId of type string. We are saving the schema in a Heartbeat.avsc file.
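A minimal Heartbeat.avsc for such a record might look like this (the namespace is an assumption; only the single SensorId string field is from the post):

```json
{
  "namespace": "com.example.sensors",
  "type": "record",
  "name": "Heartbeat",
  "fields": [
    { "name": "SensorId", "type": "string" }
  ]
}
```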
We can use avro-tools to generate types which can be used in a JVM-based application. We can use the generated Java class even in a Scala application. Let’s first download the tool from Apache’s download mirror.
Now let’s use avro-tools to compile the above schema into a Java class. The class is generated in the specified ./src/main/java folder.
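The invocation might look like this (the jar version 1.8.2 and the relative paths are assumptions matching a typical setup):

```shell
# compile Heartbeat.avsc into a Java class under ./src/main/java
java -jar avro-tools-1.8.2.jar compile schema Heartbeat.avsc ./src/main/java
```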
Let’s produce records on four different topics Sensor1-heartbeat, Sensor2-heartbeat, Sensor3-heartbeat, Sensor4-heartbeat. We are pushing five messages to each of these topics. In order to make sure that the messages are successfully pushed, we are calling KafkaProducer.flush().
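A producer along those lines could be sketched as follows. The broker and schema-registry addresses are assumptions for a local Confluent setup, and Heartbeat is the Avro-generated class from the previous step:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Assumed local endpoints; adjust for your environment
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("schema.registry.url", "http://localhost:8081")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

val producer = new KafkaProducer[String, Heartbeat](props)

// Five heartbeats for each of the four sensor topics
for (sensor <- 1 to 4; _ <- 1 to 5) {
  val topic = s"Sensor$sensor-heartbeat"
  val heartbeat = Heartbeat.newBuilder().setSensorId(s"Sensor$sensor").build()
  producer.send(new ProducerRecord(topic, heartbeat.getSensorId.toString, heartbeat))
}

producer.flush() // make sure all records actually reach the broker
producer.close()
```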
We can verify these messages using Landoop’s tools, including kafka-topics-ui and schema-registry-ui. We are using the Docker images for the two tools, and we need to correctly specify the details of the Kafka brokers and the schema-registry URI. We are running these tools on ports 8000 and 8001.
And sure enough we can see messages on the expected topics. Here is the view of Kafka Topics UI.
The schema-registry-ui also shows the automatic registration of schema by the producer when the messages are pushed to the topics.
Using Kafka Streams to Merge Sensors’ Data
Let’s create a simple Scala application to merge the heartbeats of all sensors into a single topic.
Streams Config Properties
First we need to introduce the configuration for the stream. It includes the details of the broker and the schema registry. In addition, we also need to provide an ApplicationId for a streaming app. This is how Kafka Streams maintains multiple instances of the same application: if more than one instance is running, Kafka Streams automatically assigns partitions to the instances, dividing the topic’s data among them.
The ApplicationId is also especially useful when joining two KStreams / KTables whose topics are equally partitioned. In this case, Kafka Streams makes sure that the correct partitions are assigned to a particular instance of the application.
I have deliberately commented out the configuration for the Key and Value serializers here. This is to emphasize that we can skip this configuration and instead specify serdes when building the individual KStreams with the builder.
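A sketch of such a configuration, assuming local endpoints and a hypothetical application id:

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
// ApplicationId: groups instances of this app and also serves as the consumer group id
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sensor-heartbeat-merger")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put("schema.registry.url", "http://localhost:8081")

// Default serdes could be configured here, but we specify them per stream instead:
// props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
```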
The application is technically a consumer of messages from the topics. As messages are pulled from a topic, the offsets are committed. During development and debugging, if you keep committing the offsets (or they are committed automatically), Kafka Streams won’t pull the same messages again, which can be really frustrating. In that case, just change the ApplicationId: the app is then as good as a new consumer and starts pulling from the beginning again.
Serde for Serialization / Deserialization
Another important part is the serializer / deserializer. Kafka Streams introduces the Serde abstraction for this. Default Serde instances for primitive types, including String and Integer, are available in org.apache.kafka.common.serialization.Serdes.
These serdes are used when subscribing via the builder. We can pull messages as SpecificRecord or GenericRecord. For a SpecificRecord, we need to create a SpecificAvroSerde parameterized with the specific type.
A serde can be used for key and / or value serialization. After instantiating it, we need to configure it, specifying via the isKey parameter of configure() whether it is meant for keys or values. We also need to pass a properties map with the details of the schema registry. This requirement can come as a surprise, typically discovered just before you start pulling your hair out.
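As a sketch, configuring a value serde for the generated Heartbeat type might look like this (the SpecificAvroSerde class is from Confluent’s kafka-streams-avro-serde artifact; the registry address is an assumption):

```scala
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import scala.collection.JavaConverters._

val heartbeatSerde = new SpecificAvroSerde[Heartbeat]()

// Without this configure() call the serde does not know where the schema
// registry lives, and deserialization fails at runtime
val serdeConfig = Map("schema.registry.url" -> "http://localhost:8081").asJava
heartbeatSerde.configure(serdeConfig, /* isKey = */ false) // used for values
```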
Other details for Stream configurations can be found here: Confluent Stream Config
Before we pull messages from Kafka, we need to create KStreamBuilder and StreamsConfig. For StreamsConfig, we need to use the properties map discussed earlier in this post.
We can pull messages from Kafka as a KStream or a KTable. A KTable keeps only the most recent message for each key; if another message arrives for an existing key, its value is updated. To delete a record from a KTable, we just push a message with the same key and a null value. This is called tombstoning. In our case, we are pulling the messages as KStreams using the stream method of the builder, with the serdes discussed above. This is how we build up the whole DSL.
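Subscribing to one sensor topic could be sketched as follows, using the older KStreamBuilder API that this post works with (heartbeatSerde is the configured SpecificAvroSerde from the serde discussion):

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}

val builder = new KStreamBuilder()

// Key and value serdes are passed per stream instead of via the global config
val sensor1: KStream[String, Heartbeat] =
  builder.stream(Serdes.String(), heartbeatSerde, "Sensor1-heartbeat")
```

In Kafka 1.0 and later, KStreamBuilder is replaced by StreamsBuilder, and the serdes are passed via Consumed.`with`(...) instead.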
After building the whole DSL, we need to create a KafkaStreams object and call its start() method.
The merged stream is just another part of the DSL. In our example, we can use the above code to create a KStream from the heartbeat topic of each of the four sensors discussed in this post. Here we take all four streams and merge them using the builder’s merge method.
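Putting those pieces together might look like the following sketch. The output topic name is an assumption, props is the configuration built earlier, and heartbeatSerde is the configured Avro serde:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}

val builder = new KStreamBuilder()
val stringSerde = Serdes.String()

// One KStream per sensor topic
val sensorStreams: Seq[KStream[String, Heartbeat]] =
  (1 to 4).map(i => builder.stream(stringSerde, heartbeatSerde, s"Sensor$i-heartbeat"))

// Merge the four streams and publish to a single topic
val merged: KStream[String, Heartbeat] = builder.merge(sensorStreams: _*)
merged.to(stringSerde, heartbeatSerde, "all-sensors-heartbeat")

// Build the topology and start streaming
val streams = new KafkaStreams(builder, new StreamsConfig(props))
streams.start()
```

In newer Kafka Streams versions the merge method lives on KStream itself, so the equivalent would be sensor1.merge(sensor2) and so on.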
Here is the complete example:
And sure enough, we can view the data using Landoop’s Kafka Topics UI; the merged records are available on the published topic.