Author Archives: Muhammad

Removing Confluent Kafka from RHEL7

Confluent packages can be removed the same way they were installed. Confluent can also be downloaded as an archive and its services run directly from it. But if you installed it through yum, it can be removed just as easily. Here we are removing it using the yum remove command.

yum-remove

After getting a confirmation, it erases the installed components as follows:

is_ok

With the older version removed, we can get the new packages and run them directly. Here we have downloaded the whole package and are running it with the confluent CLI straight from the bin folder:

confluent start

com.esotericsoftware.kryo.KryoException: Unusual solution upgrading Flink

While upgrading Flink from 1.4 to 1.6.1, we hit a few build issues. Once those were fixed, we suddenly started getting KryoException. Why were we getting it, and how is it related to the upgrade? Let's start with the exception message. If you start the job a few times with several Source tasks, you will find it failing at a different stage each time, depending on the data the Source tasks receive from the topic. Here we are using a Kafka source in Flink.

The biggest puzzle was that the code ran fine in IntelliJ, but threw this error as soon as we ran it on a cluster.

And on restarting the job, it threw an error about a different missing schema.

There are a number of solutions recommended by different developers and some of them have worked in certain cases.

Disabling Kryo fallback
Flink allows us to disable Kryo fallback. Here are the details:
https://ci.apache.org/projects/flink/flink-docs-master/dev/execution_configuration.html
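
For reference, here is a minimal sketch of what disabling the fallback looks like through ExecutionConfig. The object and method names (StrictSerialization, strictEnv) are placeholders of ours. With disableGenericTypes() set, Flink throws an exception whenever it meets a type that would otherwise go through Kryo, instead of falling back silently.

```scala
import org.apache.flink.streaming.api.scala._

object StrictSerialization {
  // Returns an environment that refuses to fall back to Kryo for generic types.
  def strictEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableGenericTypes()
    env
  }

  def main(args: Array[String]): Unit = {
    val env = strictEnv()
    println(s"generic types disabled: ${env.getConfig.hasGenericTypesDisabled}")
  }
}
```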

But this didn't work for us. We kept getting the same errors when we ran the job.

Solution

The solution was to not package the Flink runtime libraries into the fat jar built with sbt assembly. We can do that by declaring the dependencies as follows:
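
A minimal sketch of such a dependency block is shown here. The Scala 2.11 build and the Kafka 0.11 connector are assumptions on our part, as the original build file is not reproduced. Marking the core Flink modules as provided keeps them on the compile classpath but out of the assembled jar, so the job uses the Flink classes already present on the cluster.

```scala
// build.sbt (sketch): the connector choice is an assumption
val flinkVersion = "1.6.1"

libraryDependencies ++= Seq(
  // Already on the cluster's classpath, so keep them out of the fat jar.
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // Connectors are not part of the Flink distribution and still get bundled.
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
)
```

One side effect to keep in mind: when running from the IDE, the provided scope then has to be included in the run classpath.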

Flink Upgrade SBT Assembly Merge Strategy

We decided to upgrade Flink to 1.6.1. After we updated the code, it ran fine on the development machine, but build issues started as soon as I pushed to source control. The TeamCity build agent picked up the change and threw this error:

We were using sbt assembly, and we found that the error was reproducible on our machines too.

So the natural solution was to pick just one log4j.properties file and at least get it to build:
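
A sketch of a merge strategy along those lines, assuming the sbt-assembly plugin is enabled. The fallback case follows the pattern from the sbt-assembly documentation and is not necessarily what our original file contained.

```scala
// build.sbt (sketch): keep the first log4j.properties found on the classpath
assemblyMergeStrategy in assembly := {
  case "log4j.properties" => MergeStrategy.first
  case other =>
    // Defer to the plugin's default strategy for everything else.
    val defaultStrategy = (assemblyMergeStrategy in assembly).value
    defaultStrategy(other)
}
```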

That got us past the issue, but then we started getting another error:

So we added the flink-avro dependency to the sbt file, and the issue was fixed.
https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/connectors.html#avro-support-in-flink
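
The added line would look something like this, assuming the version matches the rest of the Flink 1.6.1 dependencies:

```scala
// build.sbt: flink-avro is published without a Scala version suffix, hence the single %
libraryDependencies += "org.apache.flink" % "flink-avro" % "1.6.1"
```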

JDK Tools – jps ( Java Process Tool )

The Java Process Tool (jps) is an amazing JDK tool for checking the details of Java processes on local or remote machines. It can be a very useful tool in the tool belt of a developer.

By default, it shows the PID and the short name of the main class.

We can also check the fully qualified name of the application's main class using the -l argument.

Checking Application Arguments

jps can provide the arguments passed to the main method of the application as well as the arguments to JVM through -m and -v arguments respectively.

Piping with other tools

Naturally, it can be combined with other command-line tools on macOS. Here we pipe the output of jps into grep to filter it.

Further Details

Further details about the tool can be found at oracle.

jps @ oracle

Mocking Inner Singleton objects for Scala Tests using MockitoSugar

This is a special case of testing: you need to test a type, but its dependencies come in all shapes and sizes, and you might not have any control over them. This is especially true when the dependencies come from a third-party package. In today's post, we will consider such a case.

In this example we have a dependency of a dependency with an inner singleton object. Here we have a Bike trait with a companion object. The trait has a singleton inner object Seat. The inner object has a property color.

A Rider is given a Bike. As always, the rider's favorite color is the color of his bike's seat.

To test our Rider, we can simply mock Bike using MockitoSugar, as it is a trait. But what about the inner object? It is a singleton. To make sure that getFavoriteColor always returns the same color as the color property of the Seat inner object, we need to mock Seat. But how? MockitoSugar makes singleton mocking easy by way of the object's singleton type (.type). Here we are plainly mocking Bike, then using this mock (bike) to mock the inner object, i.e. mock[bike.Seat.type]. Next we just need to set the expectation and the mocked return value for when the color property is used.
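
Put together, the test described above might look roughly like the sketch below. The shapes of Bike, Seat and Rider are reconstructed from the description, and their bodies are assumptions. It also assumes ScalaTest 3.0.x (where MockitoSugar lives in org.scalatest.mockito) with mockito-core on the test classpath. Note the extra stubbing of bike.Seat: a mocked Bike would otherwise hand back null for its inner object.

```scala
import org.mockito.Mockito.when
import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.mockito.MockitoSugar

// Shapes reconstructed from the description; the bodies are assumptions.
trait Bike {
  object Seat {
    def color: String = "black"
  }
}
object Bike

class Rider(bike: Bike) {
  def getFavoriteColor: String = bike.Seat.color
}

class RiderTests extends FlatSpec with Matchers with MockitoSugar {
  "A rider" should "favor the color of the bike's seat" in {
    val bike = mock[Bike]
    // Mock the inner singleton through the path-dependent type of the mock.
    val seat = mock[bike.Seat.type]
    // A mocked Bike returns null for Seat unless it is stubbed.
    when(bike.Seat).thenReturn(seat)
    when(seat.color).thenReturn("red")

    new Rider(bike).getFavoriteColor shouldBe "red"
  }
}
```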

And the unit test passes. May all your unit tests always be green!!!

RiderTests

Here is my sbt:

Flink – Interactive Scala Shell

An interactive Scala shell is available in the Flink download package. It lets us run Flink streaming and batch statements in Scala, building and executing jobs directly from the shell. We simply call the execute() method, which submits the job to run.

The shell supports adding extra jars to the classpath with the -a (--addclasspath) argument, which lets us use types from those jars.

scala shell & Flink Cluster

Here we are starting up a local flink cluster and connecting to it. It provides prebuilt execution environments to implement batch (benv) or streaming programs (senv).

In the following, we use senv to create a DataStream from integer elements. We then compute twice the value of each element of the DataStream. To check the output, we simply print() the elements to the console. To build the execution graph and run the job, we call the execute() method on senv.
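
Typed into the shell, those steps might look roughly like this. It is a sketch, not the original session: it relies on the shell's predefined senv, and the element values and job name are made up.

```scala
// Typed at the prompt of bin/start-scala-shell.sh, where senv is predefined
// and the Flink Scala API implicits are already imported.
val numbers = senv.fromElements(1, 2, 3, 4)   // DataStream[Int]
val doubled = numbers.map(_ * 2)              // twice each element
doubled.print()                               // sink: write the results to the console
senv.execute("Doubling job")                  // builds the execution graph and runs the job
```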

Executing on a Remote Flink Cluster

If we are connected to a remote Flink cluster, we can verify the execution from the web dashboard. Since our stream has just a few elements and then closes, the job finishes very quickly. Afterwards, we can see it in the Completed Jobs section.

Here is the simple execution plan of our job:

Apache Flink – mapWithState on KeyedStream

The mapWithState operator provides syntactic ease for keeping ValueState in map operations on KeyedStreams in Apache Flink. It is especially useful for keeping a running count over the data, which is exactly what we are going to try in this post. Just remember that you don't have to return the state as the result of the map operation; the returned elements can simply use the state when manipulating the input elements.

mapWithState

Here we are just using mapWithState operator to do a running count of keys. We are also tagging the count with the elements and returning a Tuple2.

First we create a DataStream from the elements of a collection using StreamExecutionEnvironment's fromElements method. All the elements are integers, making this a DataStream[Int]. We then create a KeyedStream from it, using the elements themselves as the keys. KeyedStream[ValType, KeyType] therefore has both values and keys of type Int: KeyedStream[Int, Int].

We can now simply use the mapWithState operator of KeyedStream. This is just a shortcut for map() or flatMap() with a single ValueState. The function used here takes the current element plus an Option holding the current value of the ValueState. It must return a tuple of the result element and an Option with the updated value of the ValueState, which is then passed to the function the next time the same key is encountered.

Here we are creating a DataStream of two-element tuples. The first item is the value and the second is the updated count. The count keeps incrementing as the same key is encountered again. The first time any key is seen, the ValueState is None, and we return 1 in this case.
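
The running count described above can be sketched as follows. The object name RunningCount, the helper step and the sample elements are ours, and the sketch assumes flink-streaming-scala 1.6.x on the classpath. The helper is kept separate so the state transition is easy to read on its own.

```scala
import org.apache.flink.streaming.api.scala._

object RunningCount {
  // (element, previous count) => ((element, new count), new state)
  def step(in: Int, count: Option[Int]): ((Int, Int), Option[Int]) = {
    val updated = count.getOrElse(0) + 1 // None on the first occurrence of a key
    ((in, updated), Some(updated))
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Each element is its own key, giving a KeyedStream[Int, Int].
    val keyed: KeyedStream[Int, Int] =
      env.fromElements(1, 2, 2, 3, 3, 3).keyBy(x => x)

    // mapWithState keeps one ValueState[Int] per key behind the scenes.
    val counted: DataStream[(Int, Int)] =
      keyed.mapWithState((in: Int, count: Option[Int]) => step(in, count))

    counted.print()
    env.execute("Running count")
  }
}
```

Each printed tuple pairs an element with the number of times its key has been seen so far; the order of lines can vary with parallelism.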

Here is the build.sbt file for the application:

Kafka Connect – Single Message Transforms (SMTs)

Single Message Transforms were introduced in the Kafka 0.10.2 release [ Release notes ]. They give us the ability to transform messages on their way into or out of a connector in Kafka Connect.

Source Transforms

A source connector can be configured with a list of transforms. These transforms operate on the data produced by the source connector. They work in a pipeline fashion, where the output of one transform is the input of the next. In the end, the output of the last transform is published to Kafka.
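
The pipeline behaviour can be modelled in a few lines of plain Scala. This is only an illustration of the chaining idea: Record and the two transforms below are hypothetical stand-ins, not Kafka Connect classes.

```scala
// A plain-Scala model of a transform chain; nothing here is Kafka Connect API.
final case class Record(key: Option[String], value: Map[String, Any])

object TransformChain {
  type Transform = Record => Record

  // Adds a field to the value, loosely like an insert-field transform.
  def insertField(name: String, fieldValue: Any): Transform =
    r => r.copy(value = r.value + (name -> fieldValue))

  // Promotes one value field to the record key.
  def valueToKey(field: String): Transform =
    r => r.copy(key = r.value.get(field).map(_.toString))

  // Function.chain applies the transforms in list order, feeding each output
  // into the next transform.
  def pipeline(transforms: Seq[Transform]): Transform = Function.chain(transforms)

  def main(args: Array[String]): Unit = {
    val chain = pipeline(Seq(insertField("source", "students.txt"), valueToKey("id")))
    println(chain(Record(None, Map("id" -> 7))))
  }
}
```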

Source Transforms

Sink Transforms

Sink connectors can also be configured with a list of transforms. As the consumer API pulls data from Kafka, the data goes through the list of transforms in the same pipeline fashion. It is then handed to the sink connector, which can process it and push it to the actual sink, e.g. a database.

Sink Transforms

Available Transforms

Kafka Connect ships with a number of built-in transforms. Here is the list of available transforms:

Available Transforms

Using Transforms for students topic

In the previous post, we saw how easily we can use off-the-shelf connectors to load data into or out of Kafka [http://www.alternatestack.com/development/app-development/confluent-using-off-the-shelf-connectors/]. As an example, we used FileStreamSourceConnector to load data from a file into a Kafka topic. Here is how the messages looked in the Kafka Topics UI:

Messages with empty key

Here we are using a transform to pick the timestamp from the message metadata and add it as a String field:

Good Reads

https://kafka.apache.org/documentation.html#connect_transforms
https://dzone.com/articles/real-time-data-pipelines-with-kafka-connect
https://www.slideshare.net/ConfluentInc/kafka-summit-nyc-2017-singe-message-transforms-are-not-the-transformations-youre-looking-for
https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
Kafka Summit Videos and Slides
https://jaxlondon.com/wp-content/uploads/2017/10/Real_Kafka_Implementations_of_the_Enterprise_Tim_Berglund.pdf
https://engineering.linkedin.com/blog/topic/kafka
https://github.com/airbnb/kafkat
Schema registry CLI
https://www.confluent.io/kafka-summit-nyc17/resource/

KAFKA REST Proxy – Publishing Avro Messages to Kafka

Apache Kafka supports message publishing through REST too. We can use the Kafka REST Proxy for this purpose. It supports HTTP verbs including GET, POST and DELETE. Here is an example POST using curl, which we will dissect throughout this post:

Required Schema

Apache Kafka messages are key / value pairs, and both key and value can have their own schemas. Kafka REST Proxy supports publishing messages along with their schema details. In the above case, the key is a straightforward int, specified as follows:

The value is also being published in avro format. Here we have a simple schema for a type having fields including studentId, studentName and height.

Headers

The request to publish data should include recommended HTTP headers. They are Content-type and Accept headers in this case.

Content-type

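The Content-Type value has the form application/vnd.kafka.[embedded_format].[api_version]+[serialization_format]. The embedded format is the format of the data, while the serialization format is the format of the request itself. In our case, the data is in Avro format and the request is serialized as JSON.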

Currently the only serialization format supported is json and allowed API versions are v1 and v2. The embedded format can be json, avro or binary.

Accept

Here you can specify the requirements for the response. Generally you specify the expected API version and serialization format. In our example, we are simply specifying the v1 version of the API and the json format.

Like other web API requests, it supports content negotiation, including weighted preferences, as follows:

Records

Kafka REST Proxy supports posting multiple records simultaneously by providing support for array for records field in the data.

Topic

Kafka REST proxy listens on port 8082 by default. The messages can be published to any topic specified in path of the request. Just make sure the topic already exists if the cluster is not enabled for auto-creation of topics (which should usually be the case in your production environment). Here is how we have specified our required end-point for Kafka REST proxy topic:
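
For readers who would rather issue the same request from code than from curl, the pieces above (schemas, headers, records and topic endpoint) can be combined roughly as in the sketch below. It is not the original command: the object name, the field types in the value schema and the sample records are assumptions, and it expects a REST Proxy on localhost:8082.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.Source

object PublishStudent {
  // Embedded format avro, API version v1, serialization format json.
  val contentType = "application/vnd.kafka.avro.v1+json"
  // Prefer the v1 Kafka media type; fall back to plain JSON with a lower weight.
  val accept = "application/vnd.kafka.v1+json; q=0.9, application/json; q=0.5"
  // Default REST Proxy port, with the topic name in the path.
  val endpoint = "http://localhost:8082/topics/Student"

  // Schemas travel as JSON strings, hence the escaped quotes inside the raw string.
  // Field types and the sample records are assumptions.
  val body: String =
    """{
      |  "key_schema": "\"int\"",
      |  "value_schema": "{\"type\":\"record\",\"name\":\"Student\",\"fields\":[{\"name\":\"studentId\",\"type\":\"int\"},{\"name\":\"studentName\",\"type\":\"string\"},{\"name\":\"height\",\"type\":\"double\"}]}",
      |  "records": [
      |    {"key": 1, "value": {"studentId": 1, "studentName": "Alice", "height": 1.62}},
      |    {"key": 2, "value": {"studentId": 2, "studentName": "Bob", "height": 1.75}}
      |  ]
      |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val conn = new URL(endpoint).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", contentType)
    conn.setRequestProperty("Accept", accept)
    val out = conn.getOutputStream
    try out.write(body.getBytes(UTF_8)) finally out.close()
    // The response carries partition and offset per record plus the schema IDs.
    println(s"HTTP ${conn.getResponseCode}")
    println(Source.fromInputStream(conn.getInputStream, "UTF-8").mkString)
  }
}
```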

Response

When the command is executed, the response is returned. Since we are using curl, it is simply printed to the console. The response includes the offset (plus partition) of each record and the key / value schema IDs.

Kafka REST Proxy is smart enough not to register a new schema when the same one is used for subsequent messages.

Published Message format in Schema Registry

Since our messages are in Avro format, the schemas can be saved in Schema Registry. The Kafka REST Proxy available with the Confluent packages automatically publishes the schemas to Schema Registry. We can get the details of the stored schemas from the /subjects endpoint of Schema Registry, which runs on port 8081 by default.

Here you can notice that it has created schemas for both key and value.

We can get the details of versions of key and value schemas directly from schema registry endpoints. Here are the curl commands and responses from schema registry:

Using Schema Ids to POST messages

We don’t need to specify schema details in every POST request. We can simply use the schema ids for key and value for subsequent HTTP posts. Here we are publishing to the same Student topic using key_schema_id and value_schema_id instead of key_schema and value_schema fields of the request.
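
A request body using IDs might be built as in this sketch. The IDs 1 and 2 are hypothetical, so use the values returned by the first POST; the sample record is likewise made up. The body is posted to the same topic endpoint with the same headers as before.

```scala
object PublishWithSchemaIds {
  // Hypothetical IDs; take the real ones from the response of the first POST.
  val keySchemaId = 1
  val valueSchemaId = 2

  // Same shape as before, with *_schema_id fields replacing the inline schemas.
  val body: String =
    s"""{
       |  "key_schema_id": $keySchemaId,
       |  "value_schema_id": $valueSchemaId,
       |  "records": [
       |    {"key": 3, "value": {"studentId": 3, "studentName": "Carol", "height": 1.58}}
       |  ]
       |}""".stripMargin

  def main(args: Array[String]): Unit = println(body)
}
```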

Apache Flink – Starting it up

Downloading Flink

You can download Flink from its Apache site. We are downloading Flink 1.6, which is the latest version available.
https://flink.apache.org/downloads.html

It requires Java version 8.

java -version

Running Flink Cluster

Let’s start the cluster. We can just run it directly from bin folder using start-cluster.sh utility available with the download. Alternatively we can run it locally using start-local.sh.

./bin/start-cluster.sh

The dashboard is available on port 8081. We can access it through a web browser.

http://localhost:8081/#/overview

We can check the processes it starts up when kicking up the cluster.

ps -ef | grep flink

It creates a few log files in the log folder:

ls -ltr

Task Managers

Apache Flink supports more than one task manager.

Task Manager

Job Manager

Apache Flink has one job manager.

Job Manager
