Monthly Archives: November 2018

Flink – Interactive Scala Shell

An interactive Scala shell ships with the Flink download package. It lets us run Flink streaming and batch statements in Scala, building and executing jobs directly from the shell. Calling the execute() method on the environment submits the job to run.

The shell can add extra jars to the class path with the -a (--addclasspath) argument. This lets us use types from those jars inside the shell.

scala shell & Flink Cluster

Here we start a local Flink cluster and connect to it. The shell provides prebuilt execution environments for batch (benv) and streaming (senv) programs.

In the following we use senv to create a DataStream from integer elements. We then double each element of the DataStream. To check the output, we simply print() the elements to the console. These calls only describe the job; to build the execution graph and run it, we call the execute() method on senv.
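The deferred behaviour described above can be illustrated with a small plain-Python model. This is only a sketch of the idea (steps are recorded first and run on execute()); `TinyStreamEnv` and its methods are hypothetical names and are not part of the Flink API.

```python
from typing import Callable, List


class TinyStreamEnv:
    """Toy stand-in for a streaming environment (not Flink)."""

    def __init__(self) -> None:
        self._source: List[int] = []
        self._steps: List[Callable[[int], int]] = []
        self.printed: List[int] = []  # what print() would have emitted

    def from_elements(self, *elements: int) -> "TinyStreamEnv":
        self._source = list(elements)
        return self

    def map(self, fn: Callable[[int], int]) -> "TinyStreamEnv":
        self._steps.append(fn)  # recorded only, nothing runs yet
        return self

    def execute(self) -> List[int]:
        """Run every recorded step over every source element."""
        out = []
        for element in self._source:
            for step in self._steps:
                element = step(element)
            out.append(element)
        self.printed = out
        return out


env = TinyStreamEnv().from_elements(1, 2, 3, 4).map(lambda x: x * 2)
before = list(env.printed)  # still empty: the job has not been executed
doubled = env.execute()
```

Until execute() is called, `before` stays empty; afterwards `doubled` holds the doubled elements.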

Executing on a Remote Flink Cluster

If we are connected to a remote Flink cluster, we can verify the execution from the web dashboard. Since our stream has just a few elements and then closes, the job finishes very quickly. Afterwards, we can see it in the Completed Jobs section.

Here is the simple execution plan of our job:

Apache Flink – mapWithState on KeyedStream

The mapWithState operator is syntactic sugar for keeping a ValueState in map operations on KeyedStreams in Apache Flink. It is especially useful for keeping a running count over the data, which is exactly what we are going to try in this post. Remember that you don't have to return the state itself as the result of the map operation; the returned elements can simply use the state to transform the input elements.

mapWithState

Here we use the mapWithState operator to keep a running count per key. Each element is paired with its count and returned as a Tuple2.

First we create a DataStream from the elements of a collection using StreamExecutionEnvironment's fromElements method. All the elements are integers, so this is a DataStream[Int]. We then create a KeyedStream from it, using the elements themselves as the key of the stream. A KeyedStream is parameterized as KeyedStream[ValueType, KeyType]; here both values and keys are of type Int, giving KeyedStream[Int, Int].

We can now use the mapWithState operator of KeyedStream. It is a shortcut for a map() with a single ValueState. The function passed to it takes the current element plus an Option holding the current value of the ValueState. It returns a pair: the mapped result and an Option holding the updated value of the ValueState, which is passed to the function the next time the same key is encountered.

Here we create a DataStream of tuples of two elements. The first item is the value and the second is the updated count. The count keeps incrementing as the same key is encountered again. The first time a key is seen, the ValueState is None, and we return 1 in that case.
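To make the running-count logic concrete, here is a plain-Python model of the semantics described above. It is a sketch only, not the Flink API: `map_with_state` and `running_count` are hypothetical names, a dictionary stands in for the per-key ValueState, and Python's None plays the role of Scala's None.

```python
from typing import Callable, Dict, Hashable, Iterable, List, Optional, Tuple


def map_with_state(
    elements: Iterable[int],
    key_of: Callable[[int], Hashable],
    fn: Callable[[int, Optional[int]], Tuple[Tuple[int, int], Optional[int]]],
) -> List[Tuple[int, int]]:
    """Apply fn to each element together with the state stored for its key."""
    states: Dict[Hashable, int] = {}  # stands in for per-key ValueState
    results = []
    for element in elements:
        key = key_of(element)
        result, new_state = fn(element, states.get(key))
        if new_state is None:
            states.pop(key, None)  # assumption: None clears the state
        else:
            states[key] = new_state
        results.append(result)
    return results


def running_count(element: int, count: Optional[int]):
    """First sighting of a key yields 1, later sightings increment."""
    new_count = 1 if count is None else count + 1
    return (element, new_count), new_count


counted = map_with_state([1, 2, 1, 3, 2, 1], key_of=lambda x: x, fn=running_count)
```

Each output tuple carries the element and how many times its key has been seen so far, so the third occurrence of 1 is emitted as (1, 3).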

Here is the build.sbt file for the application:

Kafka Connect – Single Message Transforms (SMTs)

Single Message Transforms (SMTs) were introduced in the 0.10.2 release [Release notes]. They let us transform each message as it passes through Kafka Connect, on its way into or out of a connector.

Source Transforms

A source connector can be configured with a list of transforms. These transforms operate on the data produced by the source connector. They work as a pipeline, where the output of one transform is the input of the next. In the end, the data from the last transform is published to Kafka.
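The pipeline behaviour can be sketched in plain Python as a list of functions applied in order. This models the chaining idea only; it is not the Kafka Connect Transformation interface, and the transform names and record layout below are hypothetical.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Transform = Callable[[Record], Record]


def insert_field(name: str, value: Any) -> Transform:
    """Return a transform that adds a static field to the record value."""
    def transform(record: Record) -> Record:
        return {**record, "value": {**record["value"], name: value}}
    return transform


def rename_field(old: str, new: str) -> Transform:
    """Return a transform that renames one field of the record value."""
    def transform(record: Record) -> Record:
        value = dict(record["value"])
        value[new] = value.pop(old)
        return {**record, "value": value}
    return transform


def apply_transforms(record: Record, transforms: List[Transform]) -> Record:
    """Feed the output of each transform into the next one."""
    for transform in transforms:
        record = transform(record)
    return record


source_record = {"topic": "students", "value": {"name": "Ann"}}
chain = [insert_field("source", "file"), rename_field("name", "studentName")]
published = apply_transforms(source_record, chain)
```

The order of the list matters: each transform sees the record as the previous one left it, and the original record is left untouched.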

Source Transforms

Sink Transforms

Sink connectors can also be configured with a list of transforms. As the consumer API pulls data from Kafka, each message goes through the list of transforms in the same pipeline fashion. The transformed data is then handed to the sink connector, which processes it and pushes it to the actual sink, e.g. a database.

Sink Transforms

Available Transforms

Kafka Connect ships with a number of built-in transforms. Here is the list of available transforms:

Available Transforms

Using Transforms for students topic

In the previous post, we saw how easily we can use off-the-shelf connectors to load data into or out of Kafka [http://www.alternatestack.com/development/app-development/confluent-using-off-the-shelf-connectors/]. As an example, we used FileStreamSourceConnector to load data from a file into a Kafka topic. Here is what the messages looked like in Kafka Topics UI:

Messages with empty key

Here we use a transform to pick the timestamp from the record metadata and add it as a String field:

Good Reads

https://kafka.apache.org/documentation.html#connect_transforms
https://dzone.com/articles/real-time-data-pipelines-with-kafka-connect
https://www.slideshare.net/ConfluentInc/kafka-summit-nyc-2017-singe-message-transforms-are-not-the-transformations-youre-looking-for
https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
Kafka Summit Videos and Slides
https://jaxlondon.com/wp-content/uploads/2017/10/Real_Kafka_Implementations_of_the_Enterprise_Tim_Berglund.pdf
https://engineering.linkedin.com/blog/topic/kafka
https://github.com/airbnb/kafkat
Schema registry CLI
https://www.confluent.io/kafka-summit-nyc17/resource/

KAFKA REST Proxy – Publishing Avro Messages to Kafka

Messages can also be published to Apache Kafka over REST, using the Kafka REST Proxy. It supports HTTP verbs including GET, POST and DELETE. Here is an example POST using curl, which we will dissect throughout this post:

Required Schema

Apache Kafka messages are key/value pairs, and the key and the value can each have their own schema. Kafka REST Proxy supports publishing messages together with their schema details. In the above case, the key is a straightforward int, specified as follows:

The value is also published in Avro format. Here we have a simple schema for a type with the fields studentId, studentName and height.
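As a rough sketch of how such schemas end up in the request body, the snippet below builds both as Avro schema documents and embeds them as JSON-encoded strings in the key_schema and value_schema fields. The record name `Student` and the field types (int, string, double) are assumptions for illustration; only the field names come from the text above.

```python
import json

# Key schema: a plain Avro int.
key_schema = json.dumps({"type": "int"})

# Value schema: record name and field types are assumed for illustration.
value_schema = json.dumps({
    "type": "record",
    "name": "Student",
    "fields": [
        {"name": "studentId", "type": "int"},
        {"name": "studentName", "type": "string"},
        {"name": "height", "type": "double"},
    ],
})

# Each schema travels as a string inside the JSON request body.
schema_part = {"key_schema": key_schema, "value_schema": value_schema}
encoded = json.dumps(schema_part)
```

Because the schemas are strings inside a JSON document, their quotes are escaped in the encoded body, which is why the schema portion of such a curl command is full of backslashes.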

Headers

The request to publish data should include recommended HTTP headers. They are Content-type and Accept headers in this case.

Content-type

Here the embedded format is the format of the data, while the serialization format is the format in which the request itself is serialized. In our case, the data is in Avro format while the request is serialized as JSON.

Currently the only serialization format supported is json and allowed API versions are v1 and v2. The embedded format can be json, avro or binary.
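The pieces of the content type can be put together or taken apart with a few lines of Python. This is a minimal sketch that assumes the media type layout application/vnd.kafka.&lt;embedded format&gt;.&lt;API version&gt;+&lt;serialization format&gt; and only the values listed above; the function names are hypothetical.

```python
import re
from typing import Optional, Tuple

EMBEDDED_FORMATS = {"json", "avro", "binary"}
API_VERSIONS = {"v1", "v2"}

_PATTERN = re.compile(
    r"^application/vnd\.kafka(?:\.(json|avro|binary))?\.(v1|v2)\+(json)$"
)


def kafka_content_type(embedded: str, version: str) -> str:
    """Build a content type; JSON is the only serialization format."""
    if embedded not in EMBEDDED_FORMATS:
        raise ValueError(f"unsupported embedded format: {embedded}")
    if version not in API_VERSIONS:
        raise ValueError(f"unsupported API version: {version}")
    return f"application/vnd.kafka.{embedded}.{version}+json"


def parse_kafka_content_type(value: str) -> Tuple[Optional[str], str, str]:
    """Split a content type into (embedded format, API version, serialization)."""
    match = _PATTERN.match(value.strip())
    if match is None:
        raise ValueError(f"not a recognised Kafka REST content type: {value}")
    return match.group(1), match.group(2), match.group(3)


content_type = kafka_content_type("avro", "v1")
parts = parse_kafka_content_type(content_type)
```

The embedded format is optional in the parser so that a value without one, such as the one used for the Accept header, can be parsed as well.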

Accept

Here you can specify what you expect of the response. Generally you specify the API version and the expected serialization format. In our example, we simply specify the v1 version of the API and the JSON format.

Like other web API requests, it also supports content negotiation, including weighted preferences, as follows:
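Weighted preferences use the standard HTTP q parameter, where a higher value means a stronger preference and a missing value counts as 1. The sketch below ranks the entries of an Accept header by weight; the header value is only an illustrative example, and `rank_accept` is a hypothetical helper, not part of any Kafka library.

```python
from typing import List, Tuple


def rank_accept(header: str) -> List[Tuple[str, float]]:
    """Return (media type, q) pairs ordered from most to least preferred."""
    preferences = []
    for part in header.split(","):
        pieces = [piece.strip() for piece in part.split(";")]
        media_type, weight = pieces[0], 1.0  # q defaults to 1
        for param in pieces[1:]:
            name, _, value = param.partition("=")
            if name.strip() == "q":
                weight = float(value)
        preferences.append((media_type, weight))
    # sorted() is stable, so equal weights keep their header order
    return sorted(preferences, key=lambda pref: pref[1], reverse=True)


accept = "application/json; q=0.5, application/vnd.kafka.v1+json; q=0.9"
ranked = rank_accept(accept)
```

Here the Kafka-specific media type is ranked first because its weight of 0.9 is higher than the 0.5 given to plain application/json.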

Records

Kafka REST Proxy supports posting multiple records in a single request: the records field of the request body is an array.
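A minimal sketch of building such an array from a list of students follows. The student values are made up for illustration, and a real Avro request would also carry the key and value schemas (or their ids) next to the records field.

```python
import json

# Illustrative data only; field names follow the value schema described earlier.
students = [
    {"studentId": 1, "studentName": "Ann", "height": 1.62},
    {"studentId": 2, "studentName": "Bob", "height": 1.80},
]

# One entry per message: the key and the value of that message.
records = [{"key": student["studentId"], "value": student} for student in students]

body = json.dumps({"records": records})
```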

Topic

Kafka REST Proxy listens on port 8082 by default. Messages can be published to any topic, which is specified in the path of the request. Make sure the topic already exists if topic auto-creation is disabled on the cluster (as it usually should be in a production environment). Here is how we have specified the Kafka REST Proxy endpoint for our topic:
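For readers who prefer code to curl, the same endpoint can be assembled with Python's standard library. This sketch only builds the request object and does not send it; the host localhost, the /topics/&lt;topic&gt; path layout and the helper names are assumptions, while the port and the Student topic come from this post.

```python
from urllib.parse import quote
from urllib.request import Request


def topic_url(topic: str, base_url: str = "http://localhost:8082") -> str:
    """Build the publish endpoint for a topic (path layout assumed)."""
    return f"{base_url.rstrip('/')}/topics/{quote(topic, safe='')}"


def build_publish_request(topic: str, body: bytes) -> Request:
    """Prepare, but do not send, a POST carrying Avro data as JSON."""
    return Request(
        topic_url(topic),
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.kafka.avro.v1+json",
            "Accept": "application/vnd.kafka.v1+json",
        },
    )


request = build_publish_request("Student", b"{}")
```

Passing the prepared request to urllib.request.urlopen would send it, assuming a REST Proxy is actually running at that address.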

Response

When the command is executed, the response is returned. Since we are using curl, it is simply printed to the console. The response includes the offset (plus partition) of each record and the key/value schema IDs.

Kafka REST Proxy is smart enough not to register a new schema when the same schema is used again for subsequent messages.

Published Message format in Schema Registry

Since our messages are in Avro format, their schemas can be saved in Schema Registry. The Kafka REST Proxy that comes with the Confluent packages automatically publishes the schemas to Schema Registry. We can get the details of the stored schemas from the /subjects endpoint of Schema Registry, which runs on port 8081 by default.

Here you can see that it has created schemas for both the key and the value.
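The two subjects follow the default naming convention of the topic name plus a -key or -value suffix. The sketch below derives them for a topic and builds the URL that lists the versions of a subject; the host localhost and the helper names are assumptions, and a registry configured with a different subject naming strategy would use other names.

```python
from typing import Tuple


def subjects_for_topic(topic: str) -> Tuple[str, str]:
    """Default subject names for the key and value schemas of a topic."""
    return f"{topic}-key", f"{topic}-value"


def versions_url(subject: str, registry_url: str = "http://localhost:8081") -> str:
    """URL listing the registered versions of a subject."""
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions"


key_subject, value_subject = subjects_for_topic("Student")
value_versions = versions_url(value_subject)
```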

We can get the details of versions of key and value schemas directly from schema registry endpoints. Here are the curl commands and responses from schema registry:

Using Schema Ids to POST messages

We don’t need to specify schema details in every POST request. We can simply use the schema ids for key and value for subsequent HTTP posts. Here we are publishing to the same Student topic using key_schema_id and value_schema_id instead of key_schema and value_schema fields of the request.
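A request body using ids instead of full schemas could be assembled as in the sketch below. The id values 1 and 2 and the student data are placeholders; the real ids are the ones returned in the response to the first POST.

```python
import json
from typing import Any, Dict, List


def body_with_schema_ids(
    key_schema_id: int, value_schema_id: int, records: List[Dict[str, Any]]
) -> str:
    """Build a request body that refers to already registered schemas."""
    return json.dumps({
        "key_schema_id": key_schema_id,
        "value_schema_id": value_schema_id,
        "records": records,
    })


# Placeholder ids and data, for illustration only.
body = body_with_schema_ids(
    1,
    2,
    [{"key": 3, "value": {"studentId": 3, "studentName": "Cy", "height": 1.75}}],
)
```

The body no longer contains the key_schema and value_schema strings, which keeps repeated requests much shorter.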