Apache Flink: could not find implicit value for evidence parameter of type
The solution is simple: we just need an extra import, which brings the implicit type information needed for the operations into scope.
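Assuming the error comes from the Scala API, the import is one of the following, depending on whether the code uses the batch or the streaming API:

```scala
// For the DataSet (batch) API — provides implicit TypeInformation instances:
import org.apache.flink.api.scala._

// For the DataStream (streaming) API:
import org.apache.flink.streaming.api.scala._
```

With the package object imported, the compiler can find the evidence parameter it was complaining about.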
In order to connect with a data source or sink, we need to use credentials. Kafka Connect added support for specifying credentials using config providers. A file config provider is available out of the box with the installation package. This is discussed in KIP-297, which was released in Apache Kafka 2.0.
Here is the JIRA.
Changes in the worker properties file [etc/schema-registry/connect-avro-distributed.properties]:
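As a sketch, the file config provider is enabled in the worker properties with entries along these lines (the provider alias "file" is our own choice):

```properties
# Register the file config provider under the alias "file"
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
```

Connector configurations can then reference values from a properties file instead of embedding them in plain text.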
Since connector configurations are exposed through Connect's RESTful service, the user credentials would otherwise be returned in responses. An additional update was required to fix this; it was tracked as KAFKA-5117.
The fix for the RESTful service was released in Kafka 2.1.x. If you are using Confluent Platform, it should be available in 5.1.x.
Here is the credentials file couchbase.properties:
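The file itself is just a standard properties file. A minimal sketch, where the key names and values are hypothetical:

```properties
# Hypothetical credentials file: couchbase.properties
couchbase.username=connect-user
couchbase.password=secret
```

The connector config can then pull these values in with the KIP-297 placeholder syntax, e.g. ${file:/path/to/couchbase.properties:couchbase.password}.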
Couchbase supports document expiration. If documents are ingested from Kafka through the Couchbase sink connector, the expiration can be set on the connector side, where we specify the expiry applied to each record upserted as a document in Couchbase.
Here is a sample configuration for the connector:
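A minimal sketch of such a sink connector configuration, assuming the property names used by kafka-connect-couchbase around version 3.x (the expiration property name in particular should be checked against your connector version):

```properties
name=couchbase-sink
connector.class=com.couchbase.connect.kafka.CouchbaseSinkConnector
topics=my-topic
connection.cluster_address=127.0.0.1
connection.bucket=default
# Credentials pulled from the external file via the config provider
connection.username=${file:/path/to/couchbase.properties:couchbase.username}
connection.password=${file:/path/to/couchbase.properties:couchbase.password}
# Expiry applied to each document upserted by the connector
couchbase.document.expiration=10m
```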
Here is a sample configuration from Couchbase: https://github.com/couchbase/kafka-connect-couchbase/blob/master/config/quickstart-couchbase-sink.properties
Confluent Kafka processes start with these default arguments. You can see that JMX authentication is disabled by default, which is a security vulnerability and might lead to issues.
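For reference, the launch arguments typically include JMX settings along these lines (a sketch of the relevant portion only, not the full command line):

```
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
```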
Confluent Kafka consists of the following services; we need to enable JMX authentication for each of them:
The Kafka broker and ZooKeeper use kafka-run-class. We can update it to enable JMX authentication for these services as follows:
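A sketch of the change: in kafka-run-class, the KAFKA_JMX_OPTS variable can be updated to turn authentication on (the password and access file paths below are placeholders). The same KAFKA_JMX_OPTS change applies to the other run-class scripts discussed next:

```shell
# In kafka-run-class: enable JMX authentication (file paths are placeholders)
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=true \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.password.file=/etc/kafka/jmxremote.password \
  -Dcom.sun.management.jmxremote.access.file=/etc/kafka/jmxremote.access"
```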
In order to update KSQL, you need to update ksql-run-class in the bin folder of your Confluent installation as follows:
On the Confluent Control Center (C3) server, the following update is required in control-center-run-class:
Schema Registry uses schema-registry-run-class. We can update it as follows:
kafka-rest-run-class is used to run the Kafka REST proxy. It can be updated as follows:
kafkacat is an amazing Kafka tool based on librdkafka, a C/C++ client library for Kafka. This means it has no dependency on a JVM to work with Kafka data. It can be used to consume and produce messages on Kafka topics, and it also supports fetching metadata about Kafka brokers and topics.
The simplest way to install it is to use Docker. Confluent has an image available in its repository, and it is also packaged and made available by others. You can find them here:
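For example, with the Confluent image (the image name and tag are just one of the available options; --network=host lets the container reach a broker on localhost):

```shell
docker run --rm -it --network=host confluentinc/cp-kafkacat \
  kafkacat -b localhost:9092 -L
```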
On a Mac, it can also be installed using Homebrew. But if you don't have the development tools installed, it might show errors as follows:
It was clear what needed to be done here: install the developer tools using xcode-select --install as follows:
And it goes through the steps for installation successfully as follows:
Now the installation runs successfully. You can see that it has also installed the dependencies automatically, including librdkafka.
The output from the tool can also be requested in JSON format using the -J switch. I would recommend installing the jq tool, which can pretty-print the JSON on the console.
The -L switch is used to get metadata. By default it returns metadata for all topics in the cluster; you can also specify the particular topic you are interested in using the -t switch.
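For example, assuming a broker at localhost:9092 and a topic named my-topic:

```shell
# Metadata for the whole cluster
kafkacat -L -b localhost:9092

# Metadata for a single topic, as JSON, pretty-printed with jq
kafkacat -L -b localhost:9092 -t my-topic -J | jq .
```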
Most commonly, you might use kafkacat for consuming messages in an environment to see the details about the messages in a topic.
You can use the format expression with various parameters:
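A consumer invocation with a format string might look like this (the topic name is a placeholder); %t, %p, %o, %k and %s expand to the topic, partition, offset, key and payload respectively:

```shell
kafkacat -C -b localhost:9092 -t my-topic \
  -f 'Topic %t [%p] at offset %o: key=%k payload=%s\n'
```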
In a development environment specifically, one needs a tool to easily send messages to a topic to be consumed by a consumer service. kafkacat supports producing messages directly from the console.
It also supports loading data from a file and sending it to the topic. If we don't use the -l switch in this case, it sends the contents of the whole file as one record.
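A sketch of both modes (broker address and file name are placeholders):

```shell
# Produce messages typed on the console (finish with Ctrl-D)
kafkacat -P -b localhost:9092 -t my-topic

# Produce one record per line from a file; without -l the whole
# file would be sent as a single record
kafkacat -P -b localhost:9092 -t my-topic -l source-data.txt
```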
Apparently there is still no schema registry support, so please be careful with your messages in Avro format.
While working with Kafka we sometimes need to purge records from a topic. This is especially needed in a development environment where we want to get rid of some records while keeping the others. Previously we discussed how to purge all records from a topic by playing with its minimum retention configuration, which is useful when the records are far apart in time. Here is the reference to that discussion:
Kafka has one more extremely useful utility which allows us to delete records: the kafka-delete-records.sh tool. You can find it in the bin folder of your installation. Here I have the Confluent Community Edition installed:
Let's first create a topic my-records and load it with the messages in source-data.txt. We have 15 lines in the file; the records are pushed to Kafka using the kafka-console-producer tool.
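Assuming a local broker, the load step might look like this:

```shell
# Push each line of the file as a separate record
kafka-console-producer --broker-list localhost:9092 \
  --topic my-records < source-data.txt
```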
We can verify that the topic has actually been created using the kafka-topics tool with the --describe switch.
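Depending on your Kafka version, kafka-topics connects via --bootstrap-server or the older --zookeeper switch; a sketch assuming a recent version and a local broker:

```shell
kafka-topics --bootstrap-server localhost:9092 --describe --topic my-records
```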
The tool also lists the number of partitions, the replication factor and any overridden configurations of the topic. Here is the output:
We can also verify that the messages were actually written by checking the offsets of records in the topic. GetOffsetShell is another tool for this purpose; we have discussed it previously. You can find the discussion here: http://www.alternatestack.com/development/app-development/kafka-tools-kafka-tools-getoffsetshell/.
kafka-delete-records supports, and requires, certain switches. These include the cluster to connect to and details of the records to delete. The records' details are specified in a JSON file passed to the tool using one of its switches.
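The invocation looks like this (the JSON file name is our own choice):

```shell
kafka-delete-records --bootstrap-server localhost:9092 \
  --offset-json-file delete-records.json
```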
Here is a sample format of the JSON file. Note that we can make it granular down to the level of a partition. Here offset is the minimum offset we want to keep, so all records before the specified offset will be purged from the topic's partition.
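For our example topic, a file like the following would delete everything before offset 3 in partition 0 of my-records:

```json
{
  "partitions": [
    { "topic": "my-records", "partition": 0, "offset": 3 }
  ],
  "version": 1
}
```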
Specifying an incorrect topic always results in an error. Here I used the topic foo instead of the my-records topic created above. Obviously, this resulted in: This server does not host this topic-partition.
Specifying a non-existent partition also results in an error. Here I specified partition 1 instead of the existing partition 0 on the topic.
Here I have played with values of offset in the json file. Please note the output:
In the end, the records did get deleted from the partition, which we can verify using the kafka-console-consumer tool. The tool was successful and deleted 3 records from the partition as intended.
If we need to delete all records from a partition, use -1 for offset in the configuration.
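For example:

```json
{
  "partitions": [
    { "topic": "my-records", "partition": 0, "offset": -1 }
  ],
  "version": 1
}
```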
One of the problems we have faced running Apache Flink is that we have a very limited window for getting access to details about failed or crashed jobs. The job details just disappear and cannot be retrieved, especially from the web front end. Here is how it looks:
Actually, we are not that helpless. Apache Flink does support keeping a history of completed jobs, and we can retrieve it. But it is not available through the regular web front end; it is available through a separate server called the History Server, which is included in the regular Apache Flink package download.
The History Server needs a way to keep track of all jobs completed by Flink, which includes failed jobs. The Job Manager makes this easy: it pushes details about completed jobs to a specified folder in JSON format, with all details about the job including vertices, metrics, watermarks and exceptions.
The History Server provides both a web front end and a RESTful service. Here is the default view for the list of completed jobs; it has just one tab, with details about completed jobs. The web front end is on the same port under the # sub-path, so if your History Server is running at http://myserver:9091 then the web front end is available at http://myserver:9091/#.
It is also available as a RESTful service and can be accessed directly through HTTP requests. In our case an example query would be curl http://myserver:9091/jobs/overview. We can query various end-points, with results returned in JSON format. All the end-points are documented here: [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/historyserver.html]
As we saw above, the Job Manager must be configured with the same completed-jobs location as the History Server. Here is a sample configuration where we have used a local file system folder to keep completed jobs' data; note that jobmanager.archive.fs.dir and historyserver.archive.fs.dir have the exact same value. You might want to use a distributed file system (e.g. HDFS) in a production environment.
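A sketch of the relevant flink-conf.yaml entries (the directory is a placeholder; note the explicit file:// scheme):

```yaml
# Where the Job Manager archives completed jobs
jobmanager.archive.fs.dir: file:///tmp/flink-completed-jobs
# Where the History Server reads archived jobs from (same location)
historyserver.archive.fs.dir: file:///tmp/flink-completed-jobs
# Default is 8082; changed here to avoid a port conflict
historyserver.web.port: 9091
```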
The history server runs on port 8082 by default. It was conflicting with another service on my machine so I just changed the default value to 9091.
It's simple to run. You just need to use historyserver.sh in the bin folder of the downloaded package. This is a very simple shell script which uses flink-console.sh or flink-daemon.sh depending on how you want to run the server. For development purposes, you might want to verify that the server runs successfully before starting it as a daemon.
In order to run it as a daemon use start; otherwise, use start-foreground to start the server. You can use the stop switch to stop the server; it assumes you are using flink-daemon to run it and passes the relevant switches to stop it.
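Putting the above together:

```shell
# Run in the foreground while testing the setup
./bin/historyserver.sh start-foreground

# Run as a daemon, and stop it again
./bin/historyserver.sh start
./bin/historyserver.sh stop
```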
While setting it up, you might run into some issues. Here are the two most common ones:
I missed setting the archive directory the first time. Starting without this property results in the History Server failing to start up, with the following error:
It is also required to specify the file scheme in the directory. I missed that too and got the following error:
Confluent Hub is an online repository of extensions and components for Kafka. Kafka is based on an extensible model for many of its services; it allows plug-ins and extensions, which makes it generic enough to suit many real-world streaming applications. The hub includes both Confluent and third-party components. Generally, you would go there to search for components including:
Since you wouldn't be changing the serialization a lot, you might find yourself searching this repository for connectors and SMTs most of the time.
Confluent Hub obviously has a web page where you can go and search for the component you might be looking for.
There is also a CLI available with the latest versions of Confluent Enterprise. You can find it in the bin folder of the installation.
But if you have the community edition, the CLI can also be downloaded as a tarball. Just make sure that your Confluent installation and Confluent Hub packages are added to the PATH environment variable.
Here is the command we can use to download components. Note that there is a switch to specify the component's download directory, and an additional switch to specify the worker's properties file. The latter ensures that the plugin details are updated so the Connect service can use the component from the specified directory.
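The connector name below is just an example; --component-dir and --worker-configs are the two switches in question:

```shell
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest \
  --component-dir /opt/connectors \
  --worker-configs etc/schema-registry/connect-avro-distributed.properties
```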
The complete list of command’s switches can be found here:
And the component does get downloaded to the specified directory.
You can see that the specified worker's properties file has also been updated with the details of where the component has been downloaded and can be used from:
Although the Confluent tutorials suggest running the installation command with just the component's details, there are other switches too, and some of them seem mandatory.
You can certainly post your own connectors and other components to Confluent Hub and, once approved, make them available to the community. Further details about such postings can be found here:
Of course, you need to build the component before you can upload it. There are some specifications for building such components. The details can be found here:
I ran into some issues running Elasticsearch on Linux. The issues and their solutions might be of use to others.
If you are getting the following error:
Then you can resolve it by increasing vm.max_map_count to the suggested value using sysctl. Note that you need to run it as root.
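262144 is the value Elasticsearch commonly suggests; use whatever value your error message asks for:

```shell
# Must be run as root
sysctl -w vm.max_map_count=262144
```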
Here is the error you might see:
In order to resolve this, we can update the limits for the user in /etc/security/limits.conf.
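Assuming the service runs as an elasticsearch user, the entries might look like this (the user name is a placeholder; 65536 matches the usual suggested minimum for open files):

```
# /etc/security/limits.conf
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
```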
Confluent packages can be removed the same way they were installed. We can download the package and run the services separately, but if you installed it through yum, it can be removed just as easily. Here we are removing it using the yum remove command.
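The exact package name depends on the installed version; for instance:

```shell
# Package name may differ per version / Scala build
sudo yum remove confluent-platform-2.11
```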
After getting a confirmation, it erases the installed components as follows:
With the older version removed, we can get the new packages and run them directly. Here we have downloaded the whole package and are running it using the confluent CLI directly from the bin folder: