Kafka Tools – kafka-delete-records
While working with Kafka, we sometimes need to purge records from a topic. This is especially useful in a development environment, where we want to get rid of some records and keep the others. Previously we discussed how to purge all records from a topic by temporarily lowering its retention configuration. That approach is useful when the records are far apart in time. Here is the reference to that discussion:
Kafka has one more extremely useful utility that also lets us delete records: the kafka-delete-records.sh tool. You can find it in the bin folder of your installation. Here I have Confluent Community Edition installed:
Let’s first create a topic named my-records. The topic is loaded with the messages in the source-data.txt file, which has 15 lines. The records are pushed to Kafka using the kafka-console-producer tool.
We can verify that the topic has actually been created using the kafka-topics tool with the --describe switch.
The tool also lists the number of partitions, the replication factor, and any overridden configurations of the topic. Here is the output:
We can also verify that the messages were actually written by checking the offsets of the records in the topic. GetOffsetShell is another tool for this purpose. We have discussed it previously; you can find the discussion here: http://www.alternatestack.com/development/app-development/kafka-tools-kafka-tools-getoffsetshell/.
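To compare offsets before and after a purge, it can help to turn the GetOffsetShell output into something we can work with programmatically. The following is a minimal sketch, assuming the usual topic:partition:offset line format; the sample string is a hypothetical illustration based on the 15 records loaded above, not captured output.

```python
from typing import Dict, Tuple


def parse_offsets(output: str) -> Dict[Tuple[str, int], int]:
    """Parse lines of the form topic:partition:offset into a dictionary."""
    offsets: Dict[Tuple[str, int], int] = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split from the right so only the last two fields are treated as numbers.
        topic, partition, offset = line.rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets


# Hypothetical sample: one partition holding 15 records.
sample = "my-records:0:15\n"
print(parse_offsets(sample))
```

Running the same parser on the output captured after a purge gives a quick way to see which partitions changed.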
kafka-delete-records requires certain switches. These include the cluster to connect to (--bootstrap-server) and the details of the records to delete. The record details are specified in a JSON file passed to the tool using the --offset-json-file switch.
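As a rough sketch of how these switches fit together, the helper below assembles the command line for the tool. The broker address localhost:9092 and the file name delete-records.json are assumptions for illustration, and the script name may carry a .sh suffix depending on the distribution.

```python
import shlex
from typing import List


def delete_records_command(
    bootstrap_server: str,
    offset_json_file: str,
    script: str = "kafka-delete-records",  # may be kafka-delete-records.sh in your install
) -> List[str]:
    """Build the argument list for the kafka-delete-records tool."""
    return [
        script,
        "--bootstrap-server", bootstrap_server,
        "--offset-json-file", offset_json_file,
    ]


# Assumed values: a local broker and a JSON file in the current directory.
cmd = delete_records_command("localhost:9092", "delete-records.json")
print(shlex.join(cmd))
```

The returned list can be handed to subprocess.run(cmd, check=True) on a machine where the Kafka tools are on the PATH.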
Here is a sample format of the JSON file. Note that we can be as granular as a single partition. Here offset is the minimum offset we want to keep, so all records before the specified offset will be purged from the topic’s partition.
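For reference, the file format described above can be generated with a few lines of Python. This is a sketch rather than anything shipped with Kafka: the helper name build_offset_spec is my own, and the offset value 3 is simply an example that keeps everything from offset 3 onwards in partition 0 of my-records.

```python
import json
from typing import Dict, List, Tuple


def build_offset_spec(entries: List[Tuple[str, int, int]]) -> Dict:
    """Build the JSON structure from (topic, partition, offset) tuples."""
    return {
        "partitions": [
            {"topic": topic, "partition": partition, "offset": offset}
            for topic, partition, offset in entries
        ],
        "version": 1,
    }


# Example: purge records before offset 3 in partition 0 of my-records.
spec = build_offset_spec([("my-records", 0, 3)])
print(json.dumps(spec, indent=2))
```

The printed JSON can be saved to a file and passed to the tool. Adding more tuples to the list covers several partitions or topics in one run.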
Specifying an incorrect topic always results in an error. Here I used the topic foo instead of the my-records topic created above. This resulted in: This server does not host this topic-partition.
Specifying a non-existent partition also results in an error. Here I specified 1 instead of the existing partition 0 of the topic.
Here I have tried different values of offset in the JSON file. Please note the output:
In the end, the records did get deleted from the partition. We can verify this using the kafka-console-consumer tool. The tool was successful and deleted 3 records from the partition, as intended.
If we need to delete all records from a partition, use -1 for offset in the JSON file. Kafka treats -1 as the partition’s high watermark, so everything currently in the partition is purged.
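To make the offset semantics concrete, here is a toy model of a single partition. It is not Kafka's implementation, only a sketch of the behaviour described in this post: records before the given offset disappear, and -1 stands for the end of the partition. The class and method names are hypothetical, and the range check is a simplification.

```python
from typing import List

HIGH_WATERMARK = -1  # sentinel meaning "delete everything currently in the partition"


class PartitionLog:
    """Toy partition that only tracks its first and next offsets."""

    def __init__(self, record_count: int) -> None:
        self.log_start_offset = 0
        self.high_watermark = record_count  # offset the next record would receive

    def delete_records_before(self, offset: int) -> int:
        """Purge records below `offset` and return the new start offset."""
        target = self.high_watermark if offset == HIGH_WATERMARK else offset
        if target < 0 or target > self.high_watermark:
            raise ValueError(f"offset {offset} is out of range")
        # A delete never moves the start offset backwards.
        self.log_start_offset = max(self.log_start_offset, target)
        return self.log_start_offset

    def readable_offsets(self) -> List[int]:
        """Offsets a consumer reading from the beginning would still see."""
        return list(range(self.log_start_offset, self.high_watermark))


log = PartitionLog(record_count=15)       # the 15 records loaded earlier
log.delete_records_before(3)              # drops offsets 0, 1 and 2
print(len(log.readable_offsets()))        # 12 records remain
log.delete_records_before(HIGH_WATERMARK)
print(log.readable_offsets())             # nothing left to read
```

In this model, a second delete with a smaller offset is a no-op, since records that are already gone cannot be purged again.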