Monthly Archives: September 2017

Kafka Message Size Issue – Record Too Long to Send

As soon as the message size grows beyond the default limit, Kafka starts throwing errors when sending the message. Here is the error message we received when sending a message of a few megabytes.

[Screenshot: RecordTooLarge error from the producer]

The message clearly suggests that the size needs to be updated in some configuration before we can send it. Since we are specifying the sending details in the properties configuration, let’s add the message size setting to the properties configuration too.
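
As a hedged sketch of that change (the 15 MB figure below is an assumed value; size it to your largest expected record), the producer-side limit is max.request.size:

    # producer properties (or the Properties passed to KafkaProducer)
    # allow requests of up to ~15 MB (assumed value)
    max.request.size=15728640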

The error message doesn’t go away, though; it just changes form. We are still getting a similar message:

[Screenshot: Record Too Large for Server error]

The size also needs to be updated on the Confluent side. Here are the updates I made to server.properties and producer.properties, as suggested in this GitHub post:
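
For reference, here is a sketch of the settings involved on both sides; the 15 MB value is again an assumption, and the three limits should be kept consistent with one another:

    # server.properties (broker side)
    message.max.bytes=15728640
    replica.fetch.max.bytes=15728640

    # producer.properties
    max.request.size=15728640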

Using Avro Schema in JVM based applications

Like Protobuf, Apache Avro is especially suitable for organizations with a polyglot development stack. Data serialized in one language can easily be deserialized in another language on the other end of your messaging platform. It is one of the serialization formats supported by the Confluent platform.

The benefit of Avro over other formats, including Protobuf and Thrift, is that both data and schema are used for serialization and deserialization. You don’t need to generate types; the data can still be consumed using the available schema. Having said that, it is not really necessary to put the schema on the wire with every message sent to another party. This is the approach used by the Confluent platform, which introduces a separate Schema Registry to hold this metadata. Before data is sent to Kafka, its schema must already be registered with the Schema Registry; only the schema id is stored with the data on the Kafka brokers. When the data reaches a consumer, this schema id is used to pull the schema definition from the registry and deserialize the message.
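
You can see this metadata directly through the Schema Registry’s REST API; the sketch below assumes the registry is running at its default local address:

    # list the registered subjects
    curl http://localhost:8081/subjects
    # fetch a schema definition by its id (id 1 used here only as an example)
    curl http://localhost:8081/schemas/ids/1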

The current version of the specification is 1.8.2. You can get more details from the specification page.

Options for Serialization / Deserialization

There are a few options for serialization / deserialization using an Avro schema. After serialization, we can send the data over the wire; it can then be received on the other end and deserialized. The sender and receiver don’t have to be implemented using the same language or platform.

Generating Java Classes

In this approach, avro-tools is used to generate Java classes from the schema file. More than one schema file can be used here. It is also possible to have more than one record in the same Avro schema file, but avro-tools would still generate a separate class file for each type.

You can also directly create a Scala class using the following:

Using Parser for Serialization / Deserialization

In this approach, we use the schema files directly in our code to generate records. I am sure this looks like a somewhat loose design to you, as the records are built up in a key / value fashion. A GenericRecord is created, which can then be used to build up an object.
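
A minimal Scala sketch of this approach, assuming a student.avsc file like the one defined later in this post:

    import java.io.File
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}

    object GenericRecordExample extends App {
      // parse the schema at runtime instead of generating classes
      val schema: Schema = new Schema.Parser().parse(new File("src/main/avro/student.avsc"))

      // build a record in a key / value fashion
      val student: GenericRecord = new GenericData.Record(schema)
      student.put("student_id", 1)
      student.put("first_name", "John")
      student.put("last_name", "Doe")

      println(student)
    }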

Avro in a Scala Application

Let’s create a simple Scala SBT project. We are naming the application StudentApp.

[Screenshot: creating the StudentApp project]

Defining Avro Schema

First of all, we need to define schemas for the types expected to be shared between applications. These schemas can then be used to generate types in each application’s language of choice.

The schema introduces a type Student in the studentModels namespace. It has three fields: student_id (int), first_name (string) and last_name (string). The last of these is nullable.
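
A schema along those lines (the file name student.avsc is an assumption) would look like this:

    {
      "namespace": "studentModels",
      "type": "record",
      "name": "Student",
      "fields": [
        { "name": "student_id", "type": "int" },
        { "name": "first_name", "type": "string" },
        { "name": "last_name", "type": ["null", "string"], "default": null }
      ]
    }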

Using avro-tools

Since we need to use it in a JVM-based application, we can use the Java implementation of Avro to generate Java types from the above schema. Let’s first download avro-tools for the 1.8.2 specification from Apache.

[Screenshot: Download Avro Tools]

Here we are compiling the schema and generating the types in the java/generated folder.
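
The compile step is roughly the following; the jar and schema file names are assumptions based on the 1.8.2 download and the schema above:

    java -jar avro-tools-1.8.2.jar compile schema student.avsc java/generated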

But when we open the generated type, we see all these error indications, and sure enough, the code doesn’t even compile. So what are we missing?

[Screenshot: Generated Code Error]

In order to fix this, we need to add the following dependencies to our build.sbt file; after that, the compilation finally succeeds.
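
At a minimum, that means the core Avro artifact; a sketch assuming version 1.8.2:

    // build.sbt
    libraryDependencies += "org.apache.avro" % "avro" % "1.8.2"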

Now we can simply use the generated type in our Scala code. Here we are building a Student instance and then printing its properties to the console.
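
Something along these lines, assuming the generated class lives in the studentModels package and exposes the usual Avro builder API derived from the field names:

    import studentModels.Student

    object StudentApp extends App {
      // build an instance using the builder generated by avro-tools
      val student: Student = Student.newBuilder()
        .setStudentId(1)
        .setFirstName("John")
        .setLastName("Doe")
        .build()

      // print the properties of the instance to the console
      println(s"${student.getStudentId} ${student.getFirstName} ${student.getLastName}")
    }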

Download Code

Overriding Default Serialization for Kafka Connect – Be Careful

Overriding Default Serialization for Kafka Connect – Be Careful

Serialization / deserialization on the Confluent platform can sometimes be tricky. The platform has certain defaults, and as developers it is our responsibility to understand them and tweak only the parts that are necessary. Even a small mismatch can be very cumbersome and cost a lot of investigation time. Fortunately, there are community-developed tools to help figure out issues with different parts of our application.

Here we are using the Kafka Connect CLI from DataMountaineer. You can download connect-cli from GitHub.

[Screenshot: connect-cli download]

Here we used the MongoSink connector for the Confluent platform. The connector pulls messages from a given topic based on a KCQL statement and inserts them into a particular collection in a MongoDB database. Adding the connector was easy, but we never saw any documents being added to the collection. While investigating with the same CLI, we found the following issue.

Apparently we are having issues with deserialization of the messages as they are pulled by the connector. The error message seems to suggest that there is some problem with the schema. But why? We successfully generated the types from the schema definition file (*.avsc) using avro-tools, and KafkaProducer successfully published the messages to Kafka.

Let’s first see how we have configured our producer. Here are some of the serialization settings. It is interesting to note that serialization is defined separately for the key and the value: we are using String-based serialization for the key and Avro-based serialization for the value.
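
In outline, the producer properties looked something like this; the broker and Schema Registry addresses are the usual local defaults, and Student is the generated type from the Avro post earlier in this archive:

    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer
    import studentModels.Student

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("schema.registry.url", "http://localhost:8081")
    // String serialization for the key...
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // ...but Avro serialization for the value
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

    val producer = new KafkaProducer[String, Student](props)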

And that’s exactly the issue. The connector assumes Avro serialization for both key and value, so the MongoSink connector fails while trying to Avro-deserialize the String keys. To fix this, let’s update the connector configuration to use String-based deserialization for the key.
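
In the connector’s properties that means overriding the key converter; the class name below is the standard Connect StringConverter, and the rest of the configuration stays as before:

    # mongo-sink connector properties (excerpt)
    key.converter=org.apache.kafka.connect.storage.StringConverter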

I have realized that just updating the connector and restarting it doesn’t pick up the new configuration. Let’s simply remove the connector and create it again.
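
With the DataMountaineer CLI, that is roughly the following; the connector name mongo-sink and the properties file name are assumptions, so check the CLI’s help for the exact sub-commands:

    # remove the existing connector instance
    connect-cli rm mongo-sink
    # create it again with the updated properties
    connect-cli create mongo-sink < mongo-sink.properties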

Now when we check the status of the connector, it shows success. You can send a few messages and verify that documents are being added to the MongoDB database running locally, as specified by the connector configuration.

CORS with Kafka REST Proxy

Landoop’s Kafka Topics UI is an excellent tool for viewing the topics in a Kafka cluster on the Confluent platform.

[Screenshot: Landoop Kafka Topics UI]

It is based on AngularJS. If you just follow the instructions and run it using http_server, you get the following error:

[Screenshot: connectivity error]

The issue is with the default CORS settings of the Kafka REST Proxy. Just update the kafka-rest/kafka-rest.properties file by adding CORS permissions.
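
The relevant keys are access.control.allow.origin and access.control.allow.methods; the wildcard origin below is a permissive development-only setting:

    # kafka-rest/kafka-rest.properties
    access.control.allow.origin=*
    access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS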

Now the UI should run fine. Here is my screen.

[Screenshot: running cluster in Kafka Topics UI]