Monthly Archives: December 2017

Building Serde for Kafka Streams Application

Building Serde for Kafka Streams Application

While developing Kafka Streams applications, I found myself requiring some utility code over and over. One such code is to build Serde for custom types. It is better if it is refactored into separate types and used when needed.

It has two methods specificAvroSerde and genericAvroSerde. It must be remembered that we need to specify if the created Serde will be used for Key or value through an argument to configure method. We are just exposing this through our methods to give more flexibility to the users of this utility type.

The SerdeBuilder can be used to build custom Serdes. Here we are using it for a custom Avro type SensorHeartbeat.

Now this Serde can be used for any serialization / deserialization. Here we are using for two different purposes. The first is for KStream’s to method, which is used to write the events to a new Kafka type. In this case, the Serde is being used specifically for serialization. In the other case, we are using it for groupBy. In this case, if we don’t provide a Serde, then the default Serde is picked for the KafkaStreams, which might not be what you need.

Kafka Tools – Mirror Maker

Kafka Tools – Mirror Maker

MirrorMaker is a Kafka tools for copying data from one cluster to the other. In this post we are going to see how we can run Mirror Maker to copy data from one cluster to the other. This can be specially useful when we want to copy data between two clusters.

Mirror Maker

Mirror Maker

Mirror Maker and Source and Destination Topics

You must remember that if a topic doesn’t already exist in the destination cluster, it should be automatically enabled if auto creation is enabled in your environment. Just check auto.create.topics.enable configuration for your broker. It is enabled by default. But in your production environment if you want to generate partitions as per your liking. This includes setting for number of partitions.

How to Install

If you are using Confluent packages, it should be part of the install. You can run it using Kafka-run-class.

Source Configurations

Mirror maker consumes data from source. Hence it’s configuration is passed in as a consumer config. Here is a sample consumer config I used. Here I am using bootstrap.servers to specify Kafka Brokers. Alternatively zookeeper.connect configuration can be used with its appropriate values.

auto.offset.reset is an important configuration if you want all existing data to be copied to the destination too. Please note that earliest is not the default value, which might be learnt as a surprise. Apparently there is already an issue about it.

https://issues.apache.org/jira/browse/KAFKA-4668

Mirror Maker - earliest offset

Mirror Maker – earliest offset

Destination Configuration

Mirror Maker would be producing records in the destination cluster. So its configuration is specified as producer config. Here is a sample producer configuration.

Running Mirror Maker

As we discussed above, it is very easy to run Mirror Maker tool. Either of whitelist or blacklist configuration is supported. These configurations can include comma-separated list of topics of interest.

Serde and Kafka Streams

Serde and Kafka Streams

In order to push / pull messages from Kafka we need to use Serde. I have found that if you create a Serializer / Deserialzer like following then it becomes really useful to create the Serde for your types. You can also use the method to create a GenericSerde to serialize / deserialize GenericRecord.

Another important part is serializer / deserializer. Kafka Streams introduced Serde. There are default Serde instances available for primitive types in org.apache.kafka.common.serialization.Serdes including String, Integer.

These Serde (s) are used when subscribing using StreamsBuilder. We can pull the messages using SpecificRecord or GenericRecord. For a SpecificRecord, we need to create a SpecificAvroSerde with the type parameter for the specific type.

The Serde (s) can be used for key and / or value serialization. After instantiating the Serde, we need to configure it and specify if we want to use it for key or value serialization using isForKey parameter. We also need to specify properties map with details about schema-registry. You might learn about it as a surprise and might find it just before pulling your hair.

Kafka Streams Config’s General Properties

Kafka Streams Config’s General Properties

In this post, I just need to add Kafka Stream configuration which I have to use over and over again for a Kafka streams application. This can be used when developing a new Kafka Streams application.

First we need to introduce configurations for Stream. They include details of broker and schema-registry. In addition, we also need to provide ApplicationId for a streaming app. This is how Kafka Streams maintain multiple instance of the same application. If more than one application instance is running, then Kafka streams automatically assigns the partitions to the instances dividing the topic’s data over these instances.

The ApplicationId is also specially useful when joining two KStream / KTable where topics are equally partitioned. In this case, KafkaStreams should make sure that the correct partitions are assigned to a particular instance of application.

I specially have commented out the configuration for Key and Value serializers here. This is to emphasize that we can avoid this configuration here and specify it when building individual KStream (s) using StreamBuilder.

The applications are technically consumers of messages from a topic. As the messages are pulled from the topic, the offset is committed. During development and while debugging, if you continue to commit the offsets or if offset is committed automatically then KafkaStreams wouldn’t be pulling the messages again, which can be really frustrating. In that case, just update the ApplicationId and this would be as good as a new consumer and would start pulling it again from beginning.

SBT 1.0 and IntelliJ IDEA Scala plugin – needs update

SBT 1.0 and IntelliJ IDEA Scala plugin – needs update

I had to do a few updates to my Scala development tools, which hit me with a surprise when I tried to build one of Scala SBT applications. The build failed with misleading error messages. Digging deeper, I could find this message in one of the SBT windows.

org.jetbrains.sbt.CreateTask failed

org.jetbrains.sbt.CreateTask failed

This is referring to sbt.last.log file. Here are the contents of the file. It refers to the ClassNotFoundException for org.jetbrains.sbt.CreateTasks$.

sbt.last.log

sbt.last.log

So naturally, I tried to google if anyone else has experienced the same issue, and fair enough, not only this has been logged but Jetbrains people have suggested a solution too. Now I realized what happened, I got excited when sbt version 1.0 was released a few months back, so I updated that. Apparently, the version is not compatible with the Scala plugin on my IntelliJ IDEA.

https://intellij-support.jetbrains.com/hc/en-us/community/posts/115000671044-What-is-estimate-to-support-SBT-1-0-x-or-java-lang-ClassNotFoundException-org-jetbrains-sbt-CreateTasks-

Preferences | Plugins

Preferences | Plugins

Here is the IntelliJ IDEA edition, I was using to build this app. This is IntelliJ IDEA 15 Community Edition. But the new plugin is not available for the IntelliJ edition I was using. So naturally, I had to upgrade the editor to a newer version.

IntelliJ IDEA 15

IntelliJ IDEA 15

If you are also struggling with finding the upgrade button in the editor menus then we are friends. I tried finding it in the Help menu but, to my surprise, it is available in File menu. Well, that seems a little counter intuitive, doesn’t it?

Upgrade IntelliJ IDEA

Upgrade IntelliJ IDEA

Anyway, after hitting the download button, restarting my IntelliJ sessions. I finally was able to run the IntelliJ editor. But I still need to upgrade to the new Scala plugin version, and so I did.

Install Scala Plugin

Install Scala Plugin

Now the project builds fine. I don’t know why I have been smiling since morning today. Any clues???

Compiled successfully

Compiled successfully

Using Private Key for Authentication in teamcity

Using Private Key for Authentication in teamcity

In this post, we are going to see how we can use ssh keys with teamcity. In order to use this, you need to have GitBash installed in your machine. It provides ssh-key tool to generate public/private keys.

You need to to upload the private key file to you teamcity.
upload_key

Now you can use the uploaded key in the VCS configuration used by the teamcity project.
vcsroot

If you are using bitbucket, just make sure that you have assigned public key to your account.
bitbucket

Change git password for windows in IntelliJ

Change git password for windows in IntelliJ

If you are using Windows Credentials for Git. Just open settings and search for password. Here you need to select “Do no save, forget passwords after restart”. You can restart your IntelliJ now. Now when you interact with Git again in a new IntelliJ session, it would ask for password. You can revert it back once you have changed the password.

2017-11-10_10-35-06

Kafka Tools – kafka.tools.ReplayLogProducer [Copy from a Topic]

Kafka Tools – kafka.tools.ReplayLogProducer

Replay Log Producer is a Kafka tool available to copy messages from one Kafka Topic to the other in the same Broker cluster. This supports copying a limited or all messages from a Kafka Topic. The tool is specially useful in a development environment when we want to test something with a limited number of messages first.

Here are some of the options available to run the tool:

The input and output topics are specified through the inputtopic and outputtopic switches respectively.

Let’s startup Landoop’s Kafka Topics UI tool to see the messages as they are copied to our new topics. It should start running on port 8000.

Copying Limited Messages

The tool supports copying a limited number of messages by specifying the messages switch when running the tool. Here we are just copying one message from Merged-SensorsHeartbeat topic to another topic with name Destination-Topic-Limited.

We can see the new topic being created. We can also notice that only one message is copied to the topic, which is what we expected.

Screen Shot 2017-11-19 at 6.43.06 PM

Copying the whole Topic

If we don’t specify messages switch at all, then it just uses the default value, which is -1.

Surely, all the messages are copied to Destination-Topic-All topic.

Destination topics with all messages

Destination topics with all messages

I have noticed that the tool warns you if topic doesn’t already exist. Apparently, you can ignore this for new topic.

Warning - Leader not available

Warning – Leader not available

Kafka Streams – Interactive Queries (Intra Application Instance)

Kafka Streams – Interactive Queries (Intra Application Instance)

Kafka Streams application need to maintain KTables in an internal application store. It maintains the store using RocksDB. It even allows to query this store during application lifecycle anywhere from the application parts.

Let’s first create a simple application KafkaStreamsInteractiveQueriesApp. It is a simple Scala Application.

Create App

Create App

In order to setup Streams from Kafka cluster, we need to provide its details in configuration. Here we are creating a Map with the required properties. This can be pulled and used when building streams. For the purposes of Interactive Queries,
StreamsConfig.APPLICATION_SERVER_CONFIG is the most important configuration. This is also used when a kafka Streams app has more than one instances , and we are interested in the application state which can be found in another application instance.

Kafka Stream applications use Serde to serialize and deserialize messages from Kafka. Here is a simple type to make it easier for us.

Basic Setup

Let’s do a basic setup of the ingredients used to build Kafka Streams DSL. Here we are merely using the above types to build StreamsConfig. StreamBuilder object is used to build Kafka Streams DSL.

Store for KTable for Simple Aggregations

Having built StreamsBuilder object, we can use it to hook up to Kafka Streams. Here we are first using it for building KStream from messages in the topic Merged-SensorsHeartbeat. We can now group this using the default key for the topic. You can note that we have used specific Serde (s) here. In the absence of these Serde (s), Kafka Streams try to use default Serde and might fail if they are incompatible.

Here we are doing another aggregation. This is even simpler aggregation. We are simply using the KGroupedStream object created above (mergedStreamTable), and are applying another aggregation to create a KTable to maintain counts of individual keys.

Store for table obtained through StreamBuilder

We can also directly create KTable by simply hooking up to Kafka topics using StreamBuilder’s table method. Here we are specifying another store to keep this table’s info.

Using State Stores

As we discussed above, we can use query these stores anytime during application lifecycle. In order to make sure that we don’t try to get them before the streams actually started, we are using Timers. As the timers are triggered we are simply obtaining the information maintained in the underlying stores.

KafkaStreams provide access to the stores using KafkaStreams.store method. Here we need to specify the name of store and the types of keys and values. This is a simple Key / Value store with String based keys and SensorHeartbeat values. We are getting all values from the store and simply printing them to the console.

Using Store for TimeWindow and SessionWindow

We can also build KTable (s) holding values of a certain window in its data store. Here we are building two KTable (s). One is using a TimeWindow, which would keep count of unique keys for last eight hours.

There is another KTable with an interesting concept of session. Here sessions are defined with periods of inactivity provided through its with method. Here we are just using 1000 ms as marker between sessions.

Complete Code MainApp

This is the complete code for MainApp for reference.

Invalid State Store Exception

If an invalid state store is specified, it might result in the following exception.

Further Reading

https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

Code Download

https://github.com/msiddiqi/kafka-confluent-example-apps

https://github.com/msiddiqi/kafka-confluent-example-apps