
Kafka Streams: Converting KStream to KTable

There is no method in KStream to convert it directly into a KTable, but there are workarounds. They are as follows:

Write to a Kafka topic and read back as a KTable

We can write the stream to an intermediate Kafka topic and read it back as a KTable using the KStreamBuilder. Here is an example:
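A minimal sketch of this approach, written in Scala against the classic KStreamBuilder API of the Kafka 1.0 era; the topic names and String Serdes are illustrative assumptions:

import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import org.apache.kafka.common.serialization.Serdes

val builder = new KStreamBuilder()
val stringSerde = Serdes.String()

val stream: KStream[String, String] =
  builder.stream(stringSerde, stringSerde, "input-topic")

// 1. Write the KStream to an intermediate topic ...
stream.to(stringSerde, stringSerde, "intermediate-topic")

// 2. ... and read the same topic back as a KTable
val table: KTable[String, String] =
  builder.table(stringSerde, stringSerde, "intermediate-topic")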

Perform a dummy aggregation

Here is an example where we convert a KStream to a KTable using the reduce method of KGroupedStream.
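A minimal sketch, with the same classic API and illustrative names as above; the reducer simply keeps the latest value for each key, so the resulting KTable reflects the most recent record per key:

import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable, Reducer}
import org.apache.kafka.common.serialization.Serdes

val builder = new KStreamBuilder()
val stringSerde = Serdes.String()

val stream: KStream[String, String] =
  builder.stream(stringSerde, stringSerde, "input-topic")

// Group by the existing key and keep only the newest value per key
val table: KTable[String, String] = stream
  .groupByKey(stringSerde, stringSerde)
  .reduce(new Reducer[String] {
    override def apply(oldValue: String, newValue: String): String = newValue
  })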

Please make sure to use the correct Serde(s) for your key and value. I kept getting serialization exceptions until I specified the exact Serde(s).

Ref: These techniques are discussed in the Confluent documentation. You can read the details here:

https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

Kafka Streams – Merging multiple KStream(s)

In this post we are going to see how we can merge messages from multiple topics into a single topic using Kafka Streams. In the example scenario, we use heartbeat data from four sensors and merge it into a single topic, as follows:

Sensors-heartbeats

We assume that you already have all the services in the Confluent package running. We are using the Confluent CLI to start these services:

confluent start

Creating Sensor Data on Individual Topics

Let’s create data on the individual topics. We can do this with simple producers. Let’s create a sample Scala SBT application.

Create Scala SBT Project

Here are some of the details. We are using Scala 2.12.4 for this application.

Scala SBT Project

Let’s add the dependencies for producing records. I also needed to update the sbt version in the build.properties file to match the version installed on the system.
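Here is a minimal build.sbt sketch; the project name and the library versions are assumptions and should match the Kafka / Confluent versions installed on your system:

name := "sensors-heartbeat"
version := "0.1"
scalaVersion := "2.12.4"

// Confluent's Maven repository hosts the Avro serializer / Serde artifacts
resolvers += "Confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients"            % "1.0.0",
  "org.apache.kafka" % "kafka-streams"            % "1.0.0",
  "org.apache.avro"  % "avro"                     % "1.8.2",
  "io.confluent"     % "kafka-avro-serializer"    % "4.0.0",
  "io.confluent"     % "kafka-streams-avro-serde" % "4.0.0"
)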

In order to send messages to Kafka, let’s first create a schema for the records to be pushed to the topic. It is the simplest possible type, with a single field SensorId of type string. We are saving the schema in a Heartbeat.avsc file.
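Here is a sketch of the schema file; the namespace is an illustrative assumption:

{
  "namespace": "com.example.sensors",
  "type": "record",
  "name": "Heartbeat",
  "fields": [
    { "name": "SensorId", "type": "string" }
  ]
}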

We can use avro-tools to generate types that can be used in JVM-based applications. We can use these Java classes even in a Scala application. Let’s first download the tool from Apache’s download mirror.

avro-tools download

Now let’s use avro-tools to compile the above schema into a Java type. The type is generated in the specified ./src/main/java folder.
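The compile schema sub-command does this; avro-tools 1.8.2 is assumed here, so use whichever version you downloaded:

java -jar avro-tools-1.8.2.jar compile schema Heartbeat.avsc ./src/main/java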

Let’s produce records on four different topics: Sensor1-heartbeat, Sensor2-heartbeat, Sensor3-heartbeat and Sensor4-heartbeat. We push five messages to each of these topics. In order to make sure that the messages are successfully pushed, we call KafkaProducer.flush().
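Here is a minimal producer sketch. The broker and schema-registry URLs assume the defaults from confluent start, and com.example.sensors.Heartbeat is the class generated above under the assumed namespace:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.example.sensors.Heartbeat // generated by avro-tools from Heartbeat.avsc (assumed namespace)

object SensorProducerApp extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://localhost:8081")

  val producer = new KafkaProducer[String, Heartbeat](props)

  // Five heartbeat messages on each of the four sensor topics
  for (sensor <- 1 to 4; _ <- 1 to 5) {
    val heartbeat = new Heartbeat(s"Sensor$sensor")
    producer.send(new ProducerRecord[String, Heartbeat](s"Sensor$sensor-heartbeat", heartbeat))
  }

  // Make sure all buffered records are actually sent before exiting
  producer.flush()
  producer.close()
}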

Verifying Messages

We can verify these messages using Landoop’s tools, including kafka-topics-ui and schema-registry-ui. We are using the Docker images for the two tools, and we need to correctly specify the Kafka REST proxy and schema-registry URIs. We are running these tools on ports 8000 and 8001.
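As a rough sketch, the two UIs might be started like this. The environment variable names follow Landoop’s image documentation, the REST proxy and schema-registry URLs assume the defaults from confluent start, and you may need to point them at your host’s address rather than localhost:

docker run -d -p 8000:8000 -e "KAFKA_REST_PROXY_URL=http://localhost:8082" -e "PROXY=true" landoop/kafka-topics-ui
docker run -d -p 8001:8000 -e "SCHEMAREGISTRY_URL=http://localhost:8081" -e "PROXY=true" landoop/schema-registry-ui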

And sure enough we can see messages on the expected topics. Here is the view of Kafka Topics UI.

Kafka Topics UI – http://localhost:8000

The schema-registry-ui also shows that the producer automatically registered the schema when the messages were pushed to the topics.

docker run landoop/schema-registry-ui

Using Kafka Streams to Merge Sensors’ Data

Let’s create a simple Scala application to merge the heartbeats of all sensors into a single topic.

KafkaMergerApp

Streams Config Properties

First we need to set up the configuration for the stream. It includes the details of the broker and the schema registry. In addition, we also need to provide an ApplicationId for the streaming app. This is how Kafka Streams maintains multiple instances of the same application: if more than one instance is running, Kafka Streams automatically assigns the topic’s partitions to the instances, dividing the topic’s data across them.

The ApplicationId is also especially useful when joining two KStreams / KTables whose topics are co-partitioned. In this case, Kafka Streams makes sure that the correct partitions are assigned to a particular instance of the application.

I have deliberately commented out the configuration for the default key and value Serdes in the sketch below. This is to emphasize that we can skip this configuration here and specify the Serde(s) when building the individual KStream(s) with the KStreamBuilder.
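Here is a minimal sketch of these properties. The broker and schema-registry URLs assume the defaults from confluent start, and the application id is illustrative:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val streamProperties = new Properties()
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "sensors-heartbeat-merger")
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
streamProperties.put("schema.registry.url", "http://localhost:8081")

// Deliberately commented out: the default key / value Serdes.
// We specify the Serde(s) per stream when building the DSL instead.
// streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
// streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[Heartbeat]])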

A Kafka Streams application is technically a consumer of messages from a topic. As messages are pulled from the topic, the offsets are committed. During development and debugging, if you keep committing the offsets, or if they are committed automatically, Kafka Streams won’t pull the same messages again, which can be really frustrating. In that case, just update the ApplicationId; this is as good as a new consumer, and the application starts pulling from the beginning again.

Serde for Serialization / Deserialization

Another important part is serialization / deserialization. Kafka Streams introduced Serde for this. There are default Serde instances available for primitive types in org.apache.kafka.common.serialization.Serdes, including String and Integer.

These Serde(s) are used when subscribing with the KStreamBuilder. We can pull the messages as SpecificRecord or GenericRecord. For a SpecificRecord, we need to create a SpecificAvroSerde parameterized with the specific type.

A Serde can be used for key and / or value serialization. After instantiating the Serde, we need to configure it, specifying whether it is for keys or values via the isKey flag. We also need to pass a properties map with the schema-registry details. This requirement can come as a surprise, and you might only discover it just before pulling your hair out.
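Here is a minimal sketch of configuring a SpecificAvroSerde for the generated Heartbeat type; the schema-registry URL and the package name are the same assumptions as before:

import scala.collection.JavaConverters._
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.kafka.common.serialization.{Serde, Serdes}
import com.example.sensors.Heartbeat // generated Avro class (assumed namespace)

val keySerde: Serde[String] = Serdes.String()

val heartbeatSerde = new SpecificAvroSerde[Heartbeat]()
heartbeatSerde.configure(
  Map("schema.registry.url" -> "http://localhost:8081").asJava,
  false) // false: this Serde is used for values, not keys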

Other details about the Streams configuration can be found here: Confluent Stream Config

Before we pull messages from Kafka, we need to create a KStreamBuilder and a StreamsConfig. For the StreamsConfig, we use the properties map discussed earlier in this post.

We can pull messages from Kafka as a KStream or a KTable. A KTable keeps only the most recent message for each key; if another message is received for an existing key, the value is updated. In order to delete a message from the KTable, we can push a message with the same key and a null value. This is called tombstoning. In this case, we are pulling the messages as KStream(s) using the stream method of KStreamBuilder, together with the Serde(s) discussed above. This is how we build the whole DSL, as shown in the complete example below.

After building the whole DSL, we need to create a KafkaStreams object and call its start() method.

The merged stream is just another part of the DSL. In our example, we create a KStream for each of the four sensors’ heartbeat topics discussed in this post and merge them using the KStreamBuilder.merge method.

Here is the complete example:
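Here is a complete sketch of the KafkaMergerApp, under the same assumptions as above (broker and schema-registry URLs, the application id, the com.example.sensors.Heartbeat class, and Sensors-heartbeats as the merged output topic):

import java.util.Properties
import scala.collection.JavaConverters._
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder
import com.example.sensors.Heartbeat // generated Avro class (assumed namespace)

object KafkaMergerApp extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sensors-heartbeat-merger")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  // Serde(s) for the record keys and the Avro heartbeat values
  val keySerde = Serdes.String()
  val heartbeatSerde = new SpecificAvroSerde[Heartbeat]()
  heartbeatSerde.configure(
    Map("schema.registry.url" -> "http://localhost:8081").asJava,
    false) // value Serde

  // Build the DSL: one KStream per sensor topic
  val builder = new KStreamBuilder()
  val sensor1 = builder.stream(keySerde, heartbeatSerde, "Sensor1-heartbeat")
  val sensor2 = builder.stream(keySerde, heartbeatSerde, "Sensor2-heartbeat")
  val sensor3 = builder.stream(keySerde, heartbeatSerde, "Sensor3-heartbeat")
  val sensor4 = builder.stream(keySerde, heartbeatSerde, "Sensor4-heartbeat")

  // Merge the four streams and publish the result to a single topic
  val merged = builder.merge(sensor1, sensor2, sensor3, sensor4)
  merged.to(keySerde, heartbeatSerde, "Sensors-heartbeats")

  val streams = new KafkaStreams(builder, new StreamsConfig(props))
  streams.start()

  sys.addShutdownHook(streams.close())
}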

And sure enough, we can view the data using Landoop’s Kafka Topics UI; the merged records are available on the published topic.

Merged Stream Topic

Download Code

Download code