Kafka Streams Config’s General Properties

In this post, I just want to capture the Kafka Streams configuration that I find myself using over and over again, so that it can be reused when developing a new Kafka Streams application.

First we need to introduce the configuration for the streams. It includes the details of the broker and the schema registry. In addition, we also need to provide an ApplicationId for the streaming app. This is how Kafka Streams identifies multiple instances of the same application: if more than one instance is running, Kafka Streams automatically assigns the topic’s partitions to the instances, dividing the topic’s data between them.

The ApplicationId is also especially useful when joining two KStreams / KTables whose topics are co-partitioned. In this case, Kafka Streams must make sure that the matching partitions are assigned to the same application instance.

I have deliberately commented out the configuration for the key and value serializers here. This is to emphasize that we can skip this configuration here and specify it instead when building the individual KStreams using StreamsBuilder.
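
As a rough sketch of what this reusable configuration can look like (the application id, broker address and schema registry URL below are placeholders, not the exact values from the original code):

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// A minimal sketch of the reusable Streams configuration; the application id,
// broker address and schema registry URL are placeholder values.
def streamsProperties(applicationId: String): Properties = {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put("schema.registry.url", "http://localhost:8081")
  // The key / value Serde configuration is deliberately left out here and is
  // specified per KStream when building the DSL:
  // props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, ...)
  // props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ...)
  props
}
```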

The applications are technically consumers of messages from a topic. As messages are pulled from the topic, the offsets are committed. During development and debugging, if you keep committing the offsets, or the offsets are committed automatically, Kafka Streams won’t pull the same messages again, which can be really frustrating. In that case, just change the ApplicationId; the app is then as good as a new consumer and starts pulling from the beginning again.

SBT 1.0 and IntelliJ IDEA Scala plugin – needs update

I had to do a few updates to my Scala development tools, which hit me with a surprise when I tried to build one of my Scala SBT applications. The build failed with misleading error messages. Digging deeper, I found this message in one of the SBT windows.

org.jetbrains.sbt.CreateTask failed

This refers to the sbt.last.log file. Here are the contents of the file; it reports a ClassNotFoundException for org.jetbrains.sbt.CreateTasks$.

sbt.last.log

So naturally, I googled to see if anyone else had experienced the same issue, and sure enough, not only has it been logged, but the JetBrains people have suggested a solution too. Now I realized what had happened: I got excited when SBT 1.0 was released a few months back, so I upgraded to it. Apparently, that version is not compatible with the Scala plugin on my IntelliJ IDEA.

https://intellij-support.jetbrains.com/hc/en-us/community/posts/115000671044-What-is-estimate-to-support-SBT-1-0-x-or-java-lang-ClassNotFoundException-org-jetbrains-sbt-CreateTasks-

Preferences | Plugins

Here is the IntelliJ IDEA edition I was using to build this app: IntelliJ IDEA 15 Community Edition. The new plugin is not available for this edition, so naturally, I had to upgrade the IDE to a newer version.

IntelliJ IDEA 15

If you are also struggling to find the upgrade option in the editor menus, then we are friends. I tried finding it in the Help menu but, to my surprise, it is available in the File menu. Well, that seems a little counterintuitive, doesn’t it?

Upgrade IntelliJ IDEA

Anyway, after hitting the download button and restarting my IntelliJ session, I was finally able to run the updated IDE. I still needed to upgrade to the new Scala plugin version, and so I did.

Install Scala Plugin

Now the project builds fine. I don’t know why I have been smiling since this morning. Any clues???

Compiled successfully

Using Private Key for Authentication in TeamCity

In this post, we are going to see how we can use SSH keys with TeamCity. In order to follow along, you need to have Git Bash installed on your machine; it provides the ssh-keygen tool to generate public/private key pairs.

You need to upload the private key file to your TeamCity server.

Now you can use the uploaded key in the VCS root configuration used by the TeamCity project.

If you are using Bitbucket, just make sure that you have added the corresponding public key to your account.

Change git password for windows in IntelliJ

If you are using Windows credentials for Git, just open Settings and search for "password". Here you need to select "Do not save, forget passwords after restart". Now restart IntelliJ; when you interact with Git again in the new session, it will ask for the password. You can revert the setting once you have changed the password.

Kafka Tools – kafka.tools.ReplayLogProducer [Copy from a Topic]

Replay Log Producer is a Kafka tool for copying messages from one Kafka topic to another in the same broker cluster. It supports copying either a limited number of messages or all of the messages from a topic. The tool is especially useful in a development environment when we want to test something with a limited number of messages first.

Here are some of the options available to run the tool:

The input and output topics are specified through the inputtopic and outputtopic switches respectively.

Let’s start up Landoop’s Kafka Topics UI to see the messages as they are copied to our new topics. It should start running on port 8000.

Copying Limited Messages

The tool supports copying a limited number of messages by specifying the messages switch when running it. Here we are just copying one message from the Merged-SensorsHeartbeat topic to another topic named Destination-Topic-Limited.

We can see that the new topic has been created, and that only one message has been copied to it, which is what we expected.

Copying the whole Topic

If we don’t specify the messages switch at all, the tool just uses the default value, which is -1, and copies everything.

Sure enough, all the messages are copied to the Destination-Topic-All topic.

Destination topics with all messages

I have noticed that the tool warns you if the destination topic doesn’t already exist. Apparently, this warning can safely be ignored for a new topic.

Warning - Leader not available

Kafka Streams – Interactive Queries (Intra Application Instance)

A Kafka Streams application needs to maintain its KTables in an internal application store, which it keeps using RocksDB. It even allows this store to be queried from anywhere in the application during its lifecycle.

Let’s first create a simple application, KafkaStreamsInteractiveQueriesApp. It is a plain Scala application.

Create App

In order to set up streams from a Kafka cluster, we need to provide its details in the configuration. Here we are creating a Map with the required properties, which can be pulled in when building the streams. For the purposes of interactive queries, StreamsConfig.APPLICATION_SERVER_CONFIG is the most important setting. It is used when a Kafka Streams app has more than one instance and we are interested in application state that lives in another instance.
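
A minimal sketch of such a map, assuming placeholder addresses; application.server advertises this instance’s host:port so that other instances can be located when querying state:

```scala
import org.apache.kafka.streams.StreamsConfig

// Sketch of the configuration map; all addresses below are placeholders.
val streamsConfigMap: Map[String, AnyRef] = Map(
  StreamsConfig.APPLICATION_ID_CONFIG     -> "kafka-streams-interactive-queries-app",
  StreamsConfig.BOOTSTRAP_SERVERS_CONFIG  -> "localhost:9092",
  // Endpoint of this instance, used to discover state held by other instances.
  StreamsConfig.APPLICATION_SERVER_CONFIG -> "localhost:7070",
  "schema.registry.url"                   -> "http://localhost:8081"
)
```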

Kafka Streams applications use Serdes to serialize and deserialize messages from Kafka. Here is a simple type to make this easier for us.
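
A minimal sketch of such a helper, assuming Confluent’s SpecificAvroSerde and a schema registry at a placeholder address; the object and method names are illustrative, not the original code:

```scala
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.avro.specific.SpecificRecord
import scala.collection.JavaConverters._

// Hypothetical helper: builds a SpecificAvroSerde that is already configured
// with the schema registry URL (placeholder) for either keys or values.
object AvroSerdes {
  def specific[T <: SpecificRecord](isForKey: Boolean): SpecificAvroSerde[T] = {
    val serde = new SpecificAvroSerde[T]()
    serde.configure(Map("schema.registry.url" -> "http://localhost:8081").asJava, isForKey)
    serde
  }
}

// Usage, assuming SensorHeartbeat is the Avro-generated class:
// val heartbeatSerde = AvroSerdes.specific[SensorHeartbeat](isForKey = false)
```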

Basic Setup

Let’s do a basic setup of the ingredients used to build the Kafka Streams DSL. Here we are merely using the above types to build a StreamsConfig. A StreamsBuilder object is then used to build the DSL.

Store for KTable for Simple Aggregations

Having built the StreamsBuilder object, we can use it to hook up to Kafka Streams. Here we first use it to build a KStream from the messages in the Merged-SensorsHeartbeat topic, and then group it by the topic’s default key. Note that we have used specific Serdes here; in their absence, Kafka Streams tries to use the default Serdes and might fail if they are incompatible.

Here we are doing another, even simpler aggregation. We simply take the KGroupedStream object created above (mergedStreamTable) and apply an aggregation that creates a KTable maintaining the count for each individual key.
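
A sketch of these two steps, assuming the Kafka 1.0 Streams API, the AvroSerdes helper sketched above, SensorHeartbeat as the Avro-generated class, and an illustrative store name:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{Consumed, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KGroupedStream, KStream, KTable, Materialized, Serialized}
import org.apache.kafka.streams.state.KeyValueStore

val builder = new StreamsBuilder()
val stringSerde = Serdes.String()
val heartbeatSerde = AvroSerdes.specific[SensorHeartbeat](isForKey = false)

// Build a KStream from the merged topic, with explicit Serdes, and group it by key.
val mergedStream: KStream[String, SensorHeartbeat] =
  builder.stream("Merged-SensorsHeartbeat", Consumed.`with`(stringSerde, heartbeatSerde))

val mergedStreamTable: KGroupedStream[String, SensorHeartbeat] =
  mergedStream.groupByKey(Serialized.`with`(stringSerde, heartbeatSerde))

// The simpler aggregation: a KTable counting heartbeats per key,
// materialized in a named store so it can be queried later.
val heartbeatCounts: KTable[String, java.lang.Long] =
  mergedStreamTable.count(
    Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]]("heartbeat-counts-store"))
```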

Store for table obtained through StreamsBuilder

We can also create a KTable directly from a Kafka topic using StreamsBuilder’s table method. Here we are specifying another store to keep this table’s data.
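
A sketch, reusing the builder and Serdes from the sketch above; the store name is illustrative:

```scala
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.Consumed
import org.apache.kafka.streams.kstream.{KTable, Materialized}
import org.apache.kafka.streams.state.KeyValueStore

// A KTable created directly from the topic, backed by its own named store.
val heartbeatTable: KTable[String, SensorHeartbeat] =
  builder.table("Merged-SensorsHeartbeat",
    Consumed.`with`(stringSerde, heartbeatSerde),
    Materialized.as[String, SensorHeartbeat, KeyValueStore[Bytes, Array[Byte]]]("sensors-heartbeat-table-store"))
```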

Using State Stores

As we discussed above, we can query these stores at any time during the application’s lifecycle. In order to make sure that we don’t try to access them before the streams have actually started, we are using timers. As the timers are triggered, we simply read the information maintained in the underlying stores.

KafkaStreams provides access to the stores through the KafkaStreams.store method. Here we need to specify the name of the store and the types of its keys and values. This is a simple key / value store with String keys and SensorHeartbeat values. We are getting all the values from the store and simply printing them to the console.
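
A sketch of such a lookup, assuming a running KafkaStreams instance and the store created in the previous sketch:

```scala
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreTypes

// Fetch all entries from the key/value store and print them to the console.
// `streams` must already be running (e.g. this is called from a timer task).
def printHeartbeatStore(streams: KafkaStreams): Unit = {
  val store = streams.store("sensors-heartbeat-table-store",
    QueryableStoreTypes.keyValueStore[String, SensorHeartbeat]())
  val iterator = store.all()
  try {
    while (iterator.hasNext) {
      val entry = iterator.next()
      println(s"${entry.key} -> ${entry.value}")
    }
  } finally iterator.close()
}
```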

Using Store for TimeWindow and SessionWindow

We can also build KTables whose data stores hold the values of a certain window. Here we are building two such KTables. One uses a time window, which keeps a count per key for the last eight hours.

The other KTable uses the interesting concept of a session. Sessions are delimited by a period of inactivity, provided through SessionWindows’ with method; here we are just using 1000 ms as the gap that separates sessions.
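
A sketch of the two windowed tables, reusing the mergedStreamTable grouping from the earlier sketch; the store names are illustrative:

```scala
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KTable, Materialized, SessionWindows, TimeWindows, Windowed}
import org.apache.kafka.streams.state.{SessionStore, WindowStore}

// Counts per key over tumbling eight-hour windows, kept in a windowed store.
val eightHourCounts: KTable[Windowed[String], java.lang.Long] =
  mergedStreamTable
    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(8)))
    .count(Materialized.as[String, java.lang.Long, WindowStore[Bytes, Array[Byte]]]("heartbeats-8h-store"))

// Counts per key per session, where 1000 ms of inactivity closes a session.
val sessionCounts: KTable[Windowed[String], java.lang.Long] =
  mergedStreamTable
    .windowedBy(SessionWindows.`with`(1000L))
    .count(Materialized.as[String, java.lang.Long, SessionStore[Bytes, Array[Byte]]]("heartbeats-session-store"))
```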

Complete Code MainApp

This is the complete code for MainApp for reference.

Invalid State Store Exception

If an invalid state store is specified, it might result in the following exception.

Further Reading

https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

Code Download

https://github.com/msiddiqi/kafka-confluent-example-apps

Purging Data in a Kafka Topic

If you have been working with Kafka for some time, you might need to purge the data in a particular topic. In a dev environment, you might have worked around this by just publishing to and consuming from a different topic. But there are cases when you actually want to purge the data from a topic, such as the following:

  • You have just released with a new topic, and it now contains messages generated by buggy code; some wrong calculations have produced incorrect data that needs to be wiped out.
  • It’s a microservice-based environment and you have no control over the consumers. They have updated and released but haven’t subscribed yet, and you have published some data that you are not so proud of and want the topic cleaned up.
  • You are fed up with creating a new topic every time you need to consume, or you just believe that every time you create a new topic, a kangaroo dies in Australia.

Kafka doesn’t have to keep the data forever. As a matter of fact, the default setting is to automatically purge data after a 7-day retention period. But we can always change that. Here we are changing it to 1000 milliseconds.
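
The original post most likely changes retention.ms with the kafka-configs command line tool; as an alternative sketch, the same change can be made from Scala through the Java AdminClient (the topic name and broker address below are placeholders):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

// Set retention.ms to 1000 for the topic so its data becomes eligible for purging.
val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(adminProps)
try {
  val topic = new ConfigResource(ConfigResource.Type.TOPIC, "Merged-SensorsHeartbeat")
  val retention = new Config(Collections.singletonList(new ConfigEntry("retention.ms", "1000")))
  admin.alterConfigs(Collections.singletonMap(topic, retention)).all().get()
} finally admin.close()
```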

Now try to consume the data from the topic. I have realized that Kafka doesn’t purge the data exactly after 1000 ms in this case; the data merely becomes eligible for purging. But it is indeed purged within a few minutes.

If you start a consumer and it keeps waiting without consuming any messages, the topic is empty; alternatively, you can just check in Kafka Topics UI. Afterwards, update the configuration back to a bigger retention value.

Kafka Streams: Converting KStream to KTable

KStream has no method to convert it directly into a KTable, but there are workarounds to achieve this. They are as follows:

Write to a Kafka Topic and read back as a KTable

We can write to an intermediate Kafka topic and read it back using StreamsBuilder. Here is an example:
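
A sketch of this workaround, assuming the Kafka 1.0 API and an Avro-generated SensorHeartbeat value type; the intermediate topic name and the way the Serdes are passed in are illustrative:

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.{Consumed, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KStream, KTable, Produced}

def streamToTableViaTopic(builder: StreamsBuilder,
                          stream: KStream[String, SensorHeartbeat],
                          keySerde: Serde[String],
                          valueSerde: Serde[SensorHeartbeat]): KTable[String, SensorHeartbeat] = {
  // Write the stream out to an intermediate topic...
  stream.to("SensorsHeartbeat-By-Key", Produced.`with`(keySerde, valueSerde))
  // ...and read that topic back as a KTable.
  builder.table("SensorsHeartbeat-By-Key", Consumed.`with`(keySerde, valueSerde))
}
```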

Perform a dummy aggregation

Here is an example where we convert a KStream to a KTable using the reduce method of KGroupedStream.
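
A sketch of the dummy aggregation, again assuming the Kafka 1.0 API; the reducer simply keeps the latest value seen for each key:

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{KStream, KTable, Reducer, Serialized}

def streamToTableViaReduce(stream: KStream[String, SensorHeartbeat],
                           keySerde: Serde[String],
                           valueSerde: Serde[SensorHeartbeat]): KTable[String, SensorHeartbeat] = {
  // Keep the latest value per key; this turns the stream into a changelog.
  val keepLatest: Reducer[SensorHeartbeat] = new Reducer[SensorHeartbeat] {
    override def apply(aggregate: SensorHeartbeat, latest: SensorHeartbeat): SensorHeartbeat = latest
  }
  stream
    // Group with explicit Serdes so the (possibly incompatible) defaults are not used.
    .groupByKey(Serialized.`with`(keySerde, valueSerde))
    .reduce(keepLatest)
}
```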

Please make sure to specify the right Serdes for your key and value. I kept getting the following exception until I specified the exact Serdes.

Ref: these techniques are actually discussed in Confluent’s documentation. You can read the details here:

https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

Kafka Streams – Merging multiple KStream (s)

In this post we are going to see how we can merge messages from multiple topics into a single topic using Kafka Streams. In the example scenario, we will take heartbeat data from four sensors and merge it into a single topic as follows:

Sensors-heartbeats

Here we are assuming that you already have all the services in the Confluent package running. We are using the Confluent CLI to run these services.

confluent start

Creating Sensor data on Individual Topics

Let’s create data on the individual topics. We can do this with simple producers. Let’s create a sample Scala SBT application.

Create Scala SBT Project

Here are some of the details. We are using Scala 2.12.4 for this application.

Scala SBT Project

Let’s add the dependencies for producing records. I also needed to update the sbt version in the build.properties file to match the version installed on my system.

In order to send messages to Kafka, let’s first create a schema for the record to be pushed to the topic. It is the simplest possible type, with a single field SensorId of type string. We are saving the schema in the Heartbeat.avsc file.

We can use avro-tools to generate types that can be used in a JVM-based application; these Java classes can be used even in a Scala application. Let’s first download the tool from an Apache download mirror.

avro-tools download

Now let’s use avro-tools to compile the above schema into a Java type. The type is generated in the specified ./src/main/java folder.

Let’s produce records on four different topics: Sensor1-heartbeat, Sensor2-heartbeat, Sensor3-heartbeat and Sensor4-heartbeat. We are pushing five messages to each of these topics. In order to make sure that the messages are actually pushed out, we are calling KafkaProducer.flush().
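
A sketch of such a producer, assuming SensorHeartbeat is the class generated by avro-tools from Heartbeat.avsc (the generated name and builder methods may differ), placeholder broker and schema registry addresses, and an illustrative choice of record key:

```scala
import java.util.Properties
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
producerProps.put("schema.registry.url", "http://localhost:8081")

val producer = new KafkaProducer[String, SensorHeartbeat](producerProps)
try {
  // Five heartbeat messages for each of the four sensor topics.
  for (sensor <- 1 to 4; _ <- 1 to 5) {
    val topic = s"Sensor$sensor-heartbeat"
    val heartbeat = SensorHeartbeat.newBuilder().setSensorId(s"Sensor$sensor").build()
    producer.send(new ProducerRecord[String, SensorHeartbeat](topic, s"Sensor$sensor", heartbeat))
  }
  // Make sure the messages are actually pushed out before the app exits.
  producer.flush()
} finally producer.close()
```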

Verifying Messages

We can verify these messages using Landoop’s tools, including Kafka-topics-ui and Schema-registry-ui. We are using the Docker images for the two tools, and we need to correctly specify the details of the Kafka brokers and the schema registry URIs. We are running these tools on ports 8000 and 8001.

And sure enough we can see messages on the expected topics. Here is the view of Kafka Topics UI.

kafka topics http://www.localhost:8000

The schema-registry-ui also shows the schemas registered automatically by the producer when the messages are pushed to the topics.

docker run landoop/schema-registry-ui

Using Kafka Streams to Merge the Sensors’ Data

Let’s create a simple Scala application to merge the heartbeats of all sensors into a single topic.

KafkaMergerApp

Streams Config Properties

First we need to introduce the configuration for the streams. It includes the details of the broker and the schema registry. In addition, we also need to provide an ApplicationId for the streaming app. This is how Kafka Streams identifies multiple instances of the same application: if more than one instance is running, Kafka Streams automatically assigns the topic’s partitions to the instances, dividing the topic’s data between them.

The ApplicationId is also especially useful when joining two KStreams / KTables whose topics are co-partitioned. In this case, Kafka Streams must make sure that the matching partitions are assigned to the same application instance.

I have deliberately commented out the configuration for the key and value serializers here. This is to emphasize that we can skip this configuration here and specify it instead when building the individual KStreams.

The applications are technically consumers of messages from a topic. As messages are pulled from the topic, the offsets are committed. During development and debugging, if you keep committing the offsets, or the offsets are committed automatically, Kafka Streams won’t pull the same messages again, which can be really frustrating. In that case, just change the ApplicationId; the app is then as good as a new consumer and starts pulling from the beginning again.

Serde for Serialization / Deserialization

Another important part is serialization / deserialization, for which Kafka Streams introduced Serde. Default Serde instances for primitive types such as String and Integer are available in org.apache.kafka.common.serialization.Serdes.

These Serdes are used when subscribing through StreamsBuilder. We can pull the messages as SpecificRecord or GenericRecord. For a SpecificRecord, we need to create a SpecificAvroSerde with the type parameter set to the specific type.

A Serde can be used for key and / or value serialization. After instantiating the Serde, we need to configure it, specifying whether it is for keys or values through the isForKey parameter, and providing a properties map with the schema registry details. You might learn about this requirement as a surprise, probably just before pulling your hair out.
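
A minimal sketch, assuming Confluent’s SpecificAvroSerde, the Avro-generated SensorHeartbeat class, and a placeholder schema registry address:

```scala
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.kafka.common.serialization.Serdes
import scala.collection.JavaConverters._

// Properties with the schema registry details, required by the Avro Serde.
val schemaRegistryProps = Map("schema.registry.url" -> "http://localhost:8081").asJava

// Serde for the Avro values; isForKey = false because it serializes values.
val heartbeatSerde = new SpecificAvroSerde[SensorHeartbeat]()
heartbeatSerde.configure(schemaRegistryProps, false)

// Plain String Serde for the keys.
val stringSerde = Serdes.String()
```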

Other details of the streams configuration can be found here: Confluent Stream Config.
Before we pull messages from Kafka, we need to create a KStreamBuilder and a StreamsConfig. For the StreamsConfig, we need to use the properties map discussed earlier in this post.

We can pull messages from Kafka as a KStream or a KTable. A KTable keeps only the most recent message for each individual key; if another message is received for an existing key, the entry is updated. In order to delete a message from the KTable, we can just push a message with the same key and a null value. This is called tombstoning. In this case, we are pulling the messages as a KStream using the builder’s stream method, together with the Serdes discussed above. This is how we can build the whole DSL.

After building the whole DSL, we need to create a KafkaStreams object and call its start() method.

The merged stream becomes part of the DSL we are building up. In our example, we use the code above to create a KStream from the heartbeat topics of all four sensors discussed in this post, and then merge the four streams using KStreamBuilder’s merge method.
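
A sketch of the merge step using the pre-1.0 KStreamBuilder API that the post refers to (in Kafka 1.0 and later, merge lives on KStream instead); it reuses stringSerde and heartbeatSerde from the sketch above, and the property values are placeholders:

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder

val builder = new KStreamBuilder()

// One KStream per sensor topic, all read with the same key / value Serdes.
val sensor1 = builder.stream(stringSerde, heartbeatSerde, "Sensor1-heartbeat")
val sensor2 = builder.stream(stringSerde, heartbeatSerde, "Sensor2-heartbeat")
val sensor3 = builder.stream(stringSerde, heartbeatSerde, "Sensor3-heartbeat")
val sensor4 = builder.stream(stringSerde, heartbeatSerde, "Sensor4-heartbeat")

// Merge the four sensor streams and write the result to the merged topic.
val merged = builder.merge(sensor1, sensor2, sensor3, sensor4)
merged.to(stringSerde, heartbeatSerde, "Merged-SensorsHeartbeat")

// Build the topology and start streaming.
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-merger-app")  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put("schema.registry.url", "http://localhost:8081")           // placeholder
val streams = new KafkaStreams(builder, new StreamsConfig(props))
streams.start()
sys.addShutdownHook(streams.close())
```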

Here is the complete example:

And sure enough, when we view the data using Landoop’s Kafka Topics UI, it is all available in the merged topic.

Merged Stream Topic

Download Code

Download code

Confluent – Using off-the-shelf connectors with Landoop’s Kafka Connect UI

In this post we are going to see how easy it is to run a Kafka connector to source data into Kafka using a well-designed tool like Landoop’s Kafka Connect UI.

We can use Kafka Connect’s REST API to determine which connectors are available for our cluster. Here we are pulling the list from the /connector-plugins resource [http://localhost:8083/connector-plugins].

The same list is available in Kafka-connect-ui (discussed in an earlier post: Kafka-connect-ui). Here we are searching for the file connectors, which are included among the default connectors available with a plain installation.

filestream-source-connector

Let’s provide the necessary configuration for the FileStreamSourceConnector, which is used to pull data from a file into a Kafka topic. Here we are pulling the contents of the /tmp/students.csv file into the students topic. As you can see, we can easily pause, restart or delete the connector here.

connector-running

Here is what Landoop’s Kafka Connect UI dashboard looks like:

Kafka Connect UI - Dashboard

We can also verify this using the Kafka Connect REST API. Here we are hitting the /connectors resource, which lists all the connectors currently running.

/connectors

In order to verify that the connector has actually loaded the file’s contents, we can use Kafka-topics-ui. We simply pull the Docker image for Landoop’s tool and run it as follows:

Here we can select the students topic and verify the data being pulled into it.

Connect Source data in Kafka Topics UI

Let’s update the file by adding some more content, and we can see that the new items are correctly reflected in the topic.

topic-updated
