Overriding Default Serialization for Kafka Connect – Be Careful
Serialization and deserialization on the Confluent platform can sometimes be tricky. The platform ships with certain defaults, and as developers it is our responsibility to understand those defaults and tweak only the parts that need changing. Even a small mismatch can be cumbersome and can cost a lot of time to investigate. Fortunately, the community has developed tools to diagnose issues in the different parts of our application.
Here we are using the Kafka Connect CLI from DataMountaineer. You can download connect-cli from GitHub.
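As a rough sketch of how the CLI is used (the connector name `mongo-sink` and the Connect REST endpoint are assumptions for illustration; check the connect-cli README for the exact flags your version supports):

```shell
# Point the CLI at the Kafka Connect REST endpoint
# (default Connect port shown; adjust for your cluster)
export KAFKA_CONNECT_REST="http://localhost:8083"

# List the connectors currently deployed on the cluster
connect-cli ps

# Show the status (and any task errors) of a specific connector;
# "mongo-sink" is a hypothetical connector name
connect-cli status mongo-sink
```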
Here we used the MongoSink connector for the Confluent platform. The connector pulls messages from a given topic based on a KCQL statement and inserts them into a particular collection in a MongoDB database. Adding the connector was easy, but we never saw any documents being added to the collection. While investigating through the same CLI, we found the following issue.
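A minimal MongoSink configuration along these lines might look as follows. This is a sketch: the connector name, topic, database, collection, and even the exact property keys are assumptions here, so verify them against the connector's documentation for your version:

```properties
name=mongo-sink
connector.class=com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector
tasks.max=1
topics=orders
# Connection details for a locally running MongoDB (assumed address)
connect.mongo.connection=mongodb://localhost:27017
connect.mongo.db=mydb
# KCQL: pull messages from the "orders" topic, insert into the "orders" collection
connect.mongo.kcql=INSERT INTO orders SELECT * FROM orders
```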
Apparently we are having issues with the deserialization of messages as they are pulled by the connector. The error message seems to suggest that there is a problem with the schema. But why? We successfully generated the types from the schema definition file (*.avsc) using avro-tools, and KafkaProducer had no trouble publishing messages to Kafka.
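For reference, the type-generation step was along these lines (the avro-tools jar version, schema file name, and output path are illustrative):

```shell
# Generate Java classes from the Avro schema definition file
# (jar version and paths are assumptions for this example)
java -jar avro-tools-1.8.2.jar compile schema user.avsc src/main/java/
```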
Let’s first see how we have configured our producer. Here are some of the configurations for serialization. It is interesting to note that serialization details are defined for both the key and the message: we are using String-based serialization for the key and Avro-based serialization for the message.
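A sketch of such a producer configuration in Java (the broker and Schema Registry addresses are assumptions):

```java
import java.util.Properties;

public class ProducerConfigExample {

    // Build the producer configuration: a String serializer for the key,
    // Confluent's Avro serializer for the value.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // assumed registry address
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println("key serializer:   " + props.getProperty("key.serializer"));
        System.out.println("value serializer: " + props.getProperty("value.serializer"));
    }
}
```

Note the asymmetry: the key goes through `StringSerializer` while the value goes through `KafkaAvroSerializer`. This detail turns out to matter on the consuming side.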
And that’s exactly the issue. By default, connectors assume Avro serialization for both key and value, so the MongoSink connector fails when it tries to read our String-serialized keys as Avro. To fix this, let’s update the connector configuration to define String-based handling for the key.
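On the Kafka Connect side this is expressed through converters rather than serializers. Overriding the key converter in the configuration might look like the following (the Schema Registry address is an assumption):

```properties
# Keys are plain strings, so use the String converter instead of Avro
key.converter=org.apache.kafka.connect.storage.StringConverter
# Values remain Avro, backed by the Schema Registry (address assumed)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

The converter pairing now mirrors the producer's serializer pairing, which is the invariant that was broken before.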
I have realized that just updating the connector and restarting it doesn’t cause it to pick up the new configuration. Let’s simply remove the connector and create it again.
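With connect-cli, that remove-and-recreate step might look like this (the connector name and properties file are hypothetical):

```shell
# Remove the existing connector instance
connect-cli rm mongo-sink

# Create it again from the updated properties file
connect-cli create mongo-sink < mongo-sink.properties
```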
Now when we check the status of the connector, it shows success. You can try sending a few messages and verify that documents are being added to the MongoDB database running locally, as specified by the connector configuration.
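A quick way to verify both sides, sketched with assumed names (`mongo-sink` connector, `mydb` database, `orders` collection):

```shell
# Confirm the connector and its tasks are running
connect-cli status mongo-sink

# Peek at the first few documents in the target collection
# (database and collection names are assumptions)
mongo mydb --eval 'db.orders.find().limit(5)'
```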