Author Archives: Muhammad

Kafka Streams – Resetting Application State

In the previous post, we discussed that we might need to reprocess data during application development. Since Kafka Streams tracks the committed offsets and the state of an application under its application id, a restarted application resumes where it left off and doesn’t pull and reprocess data it has already consumed. In this case, you might find yourself waiting for the join operations to get triggered, but to your disappointment, the breakpoints never get hit.

There are two alternative ways to force the reprocessing of data during application development and debugging.

Here is the streams configuration for our Kafka Streams application:

Use New Application Id

One possible solution is to use a new application id each time you need to reprocess data in your development environment. You can simply append a random number to the application.id configuration. State and internal topics from the old ids are left behind, but it is very common to destroy the whole Confluent environment using the confluent destroy command, so if things ever look unmanageable, you can always destroy it. I wouldn’t recommend this approach, as there is an alternative solution available which is also very easy to use.
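As a rough illustration of the idea, here is a minimal sketch that builds a configuration with a fresh application id on every run. It uses only java.util.Properties with the standard application.id and bootstrap.servers keys; the class name, the method name and the localhost:9092 address are assumptions made for this example and are not taken from the original configuration.

```java
import java.util.Properties;
import java.util.UUID;

class UniqueApplicationIdConfig {

    // Builds a Streams configuration whose application.id differs on every call,
    // so the application starts without any previously committed offsets or state.
    static Properties streamsConfig(String baseApplicationId, String bootstrapServers) {
        Properties props = new Properties();
        // A random suffix makes Kafka Streams treat each run as a brand new application.
        props.setProperty("application.id", baseApplicationId + "-" + UUID.randomUUID());
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        Properties props = streamsConfig("myApplicationId", "localhost:9092");
        System.out.println(props.getProperty("application.id"));
    }
}
```

The resulting Properties object can be passed to the KafkaStreams constructor in place of a fixed configuration. Keep in mind that every run leaves its own internal topics and local state behind.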

Application Reset Tool

Alternatively, you can use the Kafka Streams application reset tool. Here we are resetting the state of the application whose id is myApplicationId. After executing the command, the committed offsets of the input topics should be reset to the beginning. The tool also deletes the application’s internal topics, such as the changelog and repartition topics.
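The reset tool is a command line program, so the sketch below only assembles its argument list in Java, for example to launch it from a development helper. The --application-id, --input-topics and --bootstrap-servers options are the ones described in the documentation linked below; the topic name, the broker address and the class name are assumptions for illustration.

```java
import java.util.List;

class ResetToolCommand {

    // Assembles the command line for the Kafka Streams application reset tool.
    static List<String> resetCommand(String applicationId, String inputTopics, String bootstrapServers) {
        return List.of(
                "kafka-streams-application-reset",
                "--application-id", applicationId,
                // Comma-separated list of the input topics whose offsets should be reset.
                "--input-topics", inputTopics,
                "--bootstrap-servers", bootstrapServers);
    }

    public static void main(String[] args) {
        // Hypothetical topic and broker values, for illustration only.
        List<String> command = resetCommand("myApplicationId", "my-input-topic", "localhost:9092");
        System.out.println(String.join(" ", command));
    }
}
```

The list can be handed to a ProcessBuilder, or the printed line can be pasted into a shell. The application should be stopped before the tool runs, and its local state directory still has to be cleaned separately, for example by calling KafkaStreams#cleanUp() before starting it again.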

https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html

Connectors getting deleted after restart

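Kafka connectors can get deleted when you restart the Connect worker process after a few days. This happens when the internal topics in which Kafka Connect stores its state are not compacted, so the retention policy eventually removes the stored connector configurations. In order to resolve that, we need to enable log compaction (cleanup.policy=compact) on the following topics: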

Kafka CLI Tools – Add / Delete Topics

Deleting Topic

/bin/kafka-topics --delete --zookeeper localhost:2181 --topic MyTopic

Creating Topic

/bin/kafka-topics --create --zookeeper localhost:2181 --partitions 8 --replication-factor 3 --topic MyTopic

Building Serde for Kafka Streams Application

While developing Kafka Streams applications, I found myself needing the same utility code over and over. One such piece of code builds a Serde for custom types. It is better to refactor it into a separate type and reuse it when needed.

The utility has two methods, specificAvroSerde and genericAvroSerde. Remember that we need to specify whether the created Serde will be used for keys or values through an argument to its configure method. We simply expose this choice through our methods to give more flexibility to the users of this utility type.

The SerdeBuilder can be used to build custom Serdes. Here we are using it for a custom Avro type SensorHeartbeat.

Now this Serde can be used for any serialization / deserialization. Here we are using it for two different purposes. The first is KStream’s to method, which writes the events to a new Kafka topic. In this case, the Serde is used only for serialization. In the other case, we are using it for groupBy. Here, if we don’t provide a Serde, the default Serde from the Kafka Streams configuration is picked, which might not be what you need.

Kafka Tools – Mirror Maker

MirrorMaker is a Kafka tool for copying data from one cluster to another. In this post we are going to see how we can run Mirror Maker to do that. This can be especially useful when we want to keep a copy of selected topics in a second cluster.

Mirror Maker

Mirror Maker and Source and Destination Topics

Remember that if a topic doesn’t already exist in the destination cluster, it is created automatically, provided that auto creation is enabled in your environment. Just check the auto.create.topics.enable configuration for your broker. It is enabled by default. Auto-created topics get the broker’s default settings, so in your production environment you may prefer to create the topics yourself before starting Mirror Maker. That lets you choose the settings to your liking, including the number of partitions.

How to Install

If you are using the Confluent packages, Mirror Maker should already be part of the install. You can run it using kafka-run-class.

Source Configurations

Mirror Maker consumes data from the source cluster. Hence its source configuration is passed in as a consumer config. Here is a sample consumer config I used. I am using bootstrap.servers to specify the Kafka brokers. Alternatively, when running with the old consumer, the zookeeper.connect configuration can be used with its appropriate values.

auto.offset.reset is an important configuration if you want all existing data to be copied to the destination too. Please note that earliest is not the default value (the consumer defaults to latest), which might come as a surprise. Apparently there is already an issue about it.
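The following is a minimal sketch of such a source configuration, built with java.util.Properties and written to a properties file that Mirror Maker can read. The keys are standard consumer settings; the broker address, the group id, the file name and the class name are assumptions for illustration and not the values used in the original setup.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

class MirrorMakerConsumerConfig {

    // Builds the consumer (source cluster) settings for Mirror Maker.
    static Properties consumerConfig(String sourceBootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", sourceBootstrapServers);
        props.setProperty("group.id", groupId);
        // Without this, a consumer group with no committed offsets starts at the
        // end of each topic and existing records are never copied.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical broker address and group id, for illustration only.
        Properties props = consumerConfig("source-broker:9092", "mirror-maker-group");
        try (Writer out = Files.newBufferedWriter(Paths.get("consumer.properties"))) {
            props.store(out, "Mirror Maker source (consumer) configuration");
        }
    }
}
```

Note that auto.offset.reset only applies when the consumer group has no committed offsets yet. A group that has already consumed from the source keeps its position, so a new group id is needed to copy everything again.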

https://issues.apache.org/jira/browse/KAFKA-4668

Mirror Maker - earliest offset

Destination Configuration

Mirror Maker produces records to the destination cluster, so the destination configuration is specified as a producer config. Here is a sample producer configuration.
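As a sketch of what such a file might contain, the example below keeps the producer settings as properties text and parses them with java.util.Properties to check that the file is well formed. bootstrap.servers and acks are standard producer keys; the broker address, the choice of acks=all and the class name are assumptions for illustration only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class MirrorMakerProducerConfig {

    // Contents of a hypothetical producer.properties file for the destination cluster.
    static final String PRODUCER_PROPERTIES = String.join("\n",
            "bootstrap.servers=destination-broker:9092",
            "acks=all");

    // Parses properties text the same way a .properties file is loaded from disk.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(PRODUCER_PROPERTIES);
        System.out.println("Destination brokers: " + props.getProperty("bootstrap.servers"));
    }
}
```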

Running Mirror Maker

As we discussed above, it is very easy to run the Mirror Maker tool. You can select the topics to copy with either a whitelist or a blacklist configuration. Each of them takes a comma-separated list of the topics of interest.
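To make the invocation concrete, here is a small sketch that assembles the Mirror Maker command line from the two configuration files and a topic whitelist. It assumes the kafka-run-class launcher mentioned earlier with the kafka.tools.MirrorMaker class and its --consumer.config, --producer.config and --whitelist options; the file names, the topic names and the Java class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class MirrorMakerCommand {

    // Assembles the command line that starts Mirror Maker for the given topics.
    static List<String> command(String consumerConfigFile, String producerConfigFile, List<String> topics) {
        List<String> cmd = new ArrayList<>();
        cmd.add("kafka-run-class");
        cmd.add("kafka.tools.MirrorMaker");
        cmd.add("--consumer.config");
        cmd.add(consumerConfigFile);
        cmd.add("--producer.config");
        cmd.add(producerConfigFile);
        // The whitelist is passed as a single comma-separated argument.
        cmd.add("--whitelist");
        cmd.add(String.join(",", topics));
        return cmd;
    }

    public static void main(String[] args) {
        // Hypothetical file and topic names, for illustration only.
        List<String> cmd = command("consumer.properties", "producer.properties",
                List.of("TopicA", "TopicB"));
        System.out.println(String.join(" ", cmd));
    }
}
```

The printed line can be run from a shell, or the list can be passed to a ProcessBuilder on a machine where the Kafka scripts are on the path.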

Serde and Kafka Streams

In order to push / pull messages to and from Kafka we need to use a Serde. I have found that if you create a Serializer / Deserializer like the following, it becomes really easy to create the Serde for your types. You can also use the method to create a GenericSerde to serialize / deserialize GenericRecord.

Another important part is the serializer / deserializer. Kafka Streams introduced the Serde abstraction, which bundles a serializer and a deserializer for a type. Default Serde instances for primitive types, including String and Integer, are available in org.apache.kafka.common.serialization.Serdes.

These Serdes are used when subscribing to topics using StreamsBuilder. We can pull the messages as SpecificRecord or GenericRecord. For a SpecificRecord, we need to create a SpecificAvroSerde with the specific type as its type parameter.

The Serdes can be used for key and / or value serialization. After instantiating the Serde, we need to configure it and specify whether we want to use it for key or value serialization using the isForKey parameter. We also need to pass a properties map with the details of the Schema Registry, such as schema.registry.url. This requirement is easy to miss and can be frustrating to track down.