Apache Flink – Collections & Streams
While developing a streaming application, it is often necessary to use some inputs as collections. This can be especially useful for testing purposes. This post discusses how to create a DataStream from a collection. After applying some operators to the stream, we will create an iterator from the elements of the stream.
StreamExecutionEnvironment supports creating streams from the elements of a collection using the fromElements method. Here we use a collection of integers, so the method returns a DataStream[Int].
We then use the map operator to double each element. Finally, DataStreamUtils is used to convert the stream into an iterator, and we can simply print out its elements.
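A minimal sketch of this flow (assuming the Flink Scala API; the exact import path of DataStreamUtils varies between Flink versions, and the numbers used here are illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import scala.collection.JavaConverters._

object CollectionStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Create a DataStream[Int] from the elements of a collection
    val stream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)

    // Double each element
    val doubled: DataStream[Int] = stream.map(_ * 2)

    // Convert the stream into an iterator. Note that collect triggers
    // execution itself, so we do not call env.execute() afterwards.
    val it: Iterator[Int] = DataStreamUtils.collect(doubled.javaStream).asScala

    // Print the elements of the iterator (2, 4, 6, 8, 10 for this input)
    it.foreach(println)
  }
}
```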
It is important that DataStreamUtils is used after applying the stream operators. Otherwise, it results in the following error when executing the stream using StreamExecutionEnvironment.
Apache Flink – Class Not found [org/apache/flink/streaming/api/scala/DataStream]
If you are developing a Flink app following examples on the web, you will most likely encounter the following error:
Generally, this kind of problem is resolved by correctly setting the source folder of a Scala project in IntelliJ, but the weird thing here is that it is happening for a dependency. Additionally, there are no compile errors. The actual problem is the dependency declaration itself. Here are the typically recommended dependencies:
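A build.sbt sketch of such dependencies (the version number is illustrative; note the provided scope, which keeps the Flink jars off the runtime classpath when running from the IDE):

```scala
// build.sbt (sketch) -- quickstart-style dependencies with "provided" scope
val flinkVersion = "1.7.2" // illustrative version

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
```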
These dependencies expect Flink to be provided at runtime, for example by a running Flink cluster. Just update the dependencies as follows:
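A sketch of the updated build.sbt (same illustrative version; the provided scope is removed so the jars end up on the runtime classpath inside the IDE):

```scala
// build.sbt (sketch) -- without "provided", the Flink jars are available
// at runtime when launching the application from IntelliJ
val flinkVersion = "1.7.2" // illustrative version

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
```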
And now you can see that the application runs successfully in IntelliJ and the breakpoints are hit as expected.
Custom Log4J Configuration file for Kafka
In order to use a custom Log4J configuration file and override the default one, we need to pass its path via -Dlog4j.configuration. Here is how you can update the configuration path:
Notice that we have also updated the directory where logs are generated by overriding LOG_DIR.
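A sketch of both overrides (the paths are illustrative; Kafka's startup scripts read the KAFKA_LOG4J_OPTS and LOG_DIR environment variables):

```shell
# Point Kafka at a custom Log4J configuration file instead of the default one
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka/custom-log4j.properties"

# Override the directory where logs are generated
export LOG_DIR="/var/log/kafka-custom"

# Start the broker; both variables are picked up by the launch scripts
bin/kafka-server-start.sh config/server.properties
```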
Apache Flink – Change Port for web front end
Apache Flink runs the dashboard on port 8081. Since this is a common port, there might be conflicts with other services running on the same machine. You might encounter this scenario especially during development, when many services are running on your development machine.
But the port can be changed. Just open the conf/flink-conf.yaml file and update the rest.port property.
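For example, to move the dashboard off the default port (8089 here is an arbitrary, illustrative choice):

```yaml
# conf/flink-conf.yaml (sketch)
rest.port: 8089
```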
Now let’s start the cluster using bin/start-cluster.sh.
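The restart and a quick check can be sketched as follows (8089 is the illustrative port chosen above; any free port works):

```shell
# Restart the cluster so the new port takes effect
bin/stop-cluster.sh
bin/start-cluster.sh

# Verify that the dashboard's REST endpoint responds on the new port
curl -s http://localhost:8089/overview
```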
And Flink is launched. You can verify this by opening the dashboard in the browser: the front end is now served on the updated port. The port is also correctly reflected on the job manager configuration details page.
IntelliJ & Scala – Could not load main class
This is one of the most common errors we see when we get new code from a source repository. We might also see it when we create a new Scala project. When we run the application, the console shows "could not load main class".
The solution is simple. Just open the module settings and check whether the source folder has been correctly specified for the module that is being used to run the application.
Also make sure that the same module is selected in the "Use classpath of module" option of your debug configuration.
IntelliJ – Add New Scala Class Option – Not Available
You might run into this problem every time a new Scala project is created: by default, Scala support is not enabled. So when you right-click your src folder to add a Scala class, you might be disappointed to see that no option for adding a new Scala class is available.
This can be resolved in a few simple steps. Just right-click your project and select Add Framework Support from the context menu.
If you have Scala plugin installed, you should see the option for Scala available. Select the option and hit OK.
You might also need to mark the src folder as a source root.
Now the option should be available to add a new Scala class.