Monthly Archives: September 2018

Apache Flink – Collections & Streams

While developing a streaming application, it is often necessary to use collections as inputs. This is especially useful for testing. In this post we discuss how to create a DataStream from a collection. After applying some operators to the stream, we create an iterator over its elements.

StreamExecutionEnvironment supports creating a stream from the elements of a collection using the fromElements method. Here we are using a collection of integers, so it returns a DataStream[Int].

We then use the map operator to double each element. Finally, DataStreamUtils is used to convert the stream to an iterator, and we can simply print out the elements of the iterator. Here is the output:

Output

It is important that DataStreamUtils is used only after the stream operators have been applied. Otherwise, executing the stream using StreamExecutionEnvironment results in an error.
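The steps above can be sketched roughly as follows. This is a minimal sketch, not the code from the linked repository: it assumes a Flink 1.x release with the flink-streaming-scala dependency on the classpath, and the object name, the method name `doubled`, and the input values 1 to 5 are made up for illustration. Parallelism is set to 1 so that elements come back in input order.

```scala
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._

import scala.collection.JavaConverters._

// Hypothetical object name, used only for this sketch.
object CollectionsAndStreams {

  // Builds a stream from a few integers, doubles each one,
  // and collects the results back into a local list.
  def doubled(): List[Int] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // A single parallel instance keeps the output in input order.
    env.setParallelism(1)

    // fromElements returns a DataStream[Int] for integer arguments.
    val numbers: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)

    // Apply the operators first ...
    val doubledStream: DataStream[Int] = numbers.map(n => n * 2)

    // ... and only then turn the stream into an iterator.
    // DataStreamUtils.collect takes the underlying Java stream and,
    // as far as we know, starts the job itself, so env.execute()
    // is not called in this sketch.
    val results = DataStreamUtils.collect(doubledStream.javaStream)
    results.asScala.toList
  }

  def main(args: Array[String]): Unit =
    doubled().foreach(println)
}
```

Running `main` from the IDE should print the doubled values, one per line.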

Download Code

https://github.com/msiddiqi/ApacheFlingStreamsAndIterators

Apache Flink – Class Not found [org/apache/flink/streaming/api/scala/DataStream]
If you are developing a Flink app by following examples on the web, you will most likely encounter a class-not-found error for org/apache/flink/streaming/api/scala/DataStream when you run it.

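Generally, this kind of problem is resolved by correctly setting the source folder of the Scala project in IntelliJ, but the odd thing here is that the missing class comes from a dependency. Additionally, there are no compile errors. The actual problem is the way the dependency itself is declared.

The dependencies recommended in most examples expect a running Flink, which supplies the Flink classes at run time, so those classes are missing when the application is launched directly from the IDE. Updating the dependencies so that they are also available on the run-time classpath fixes the problem.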
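As a rough illustration of that change, here is a sketch of what the two declarations might look like. The original snippets are not available, so everything here is an assumption: that the build uses sbt, that the recommended declarations used the "provided" scope, and that the Flink version is 1.6.0. Adjust all of these to your project.

```scala
// build.sbt (sketch only; version and artifact list are assumptions)
val flinkVersion = "1.6.0"

// Typical "recommended" form: the "provided" scope means the classes are
// expected to come from a running Flink, so they are not on the classpath
// when the app is started from IntelliJ.
//
// libraryDependencies ++= Seq(
//   "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
//   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
// )

// Updated form: without "provided", the Flink classes are available
// at run time inside the IDE as well.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
```

If the application is later packaged for submission to a cluster, the "provided" scope is usually what you want again, so that the Flink classes are not bundled into the jar.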

The application now runs successfully in IntelliJ, and the breakpoints are hit as expected:

flink_dependencies

Apache Flink – Change Port for web front end

Apache Flink runs the dashboard on port 8081 by default. Since this is a commonly used port, it might conflict with other services running on the same machine. You are especially likely to encounter this during development, when many services are running on your development machine.

Fortunately, the port can be changed. Just open the conf/flink-conf.yaml file and update the rest.port property, for example to rest.port: 8089.

vi conf/flink-conf.yaml

Now let’s start the cluster using bin/start-cluster.sh.

bin/start-cluster.sh

Flink is now launched. You can verify this by opening the dashboard in the browser. As you can see, we are able to open the front end using port 8089. The port is also correctly reflected on the job manager configuration details page:

http://localhost:8089

IntelliJ & Scala – Could not load main class

This is one of the most common errors we see when we get new code from a source repository. We might also see it when we create a new Scala project. When we run the application, the console shows "Could not load main class".

Could not load Main class

The solution is simple. Open the module settings and check that the source folder for the code has been correctly specified for the module that is used to run the application.

source folder in module

Then make sure that the same module is selected in the "Use classpath of module" option of your run/debug configuration.

classpath of module

IntelliJ – Add New Scala Class Option – Not Available

You might run into this problem every time a new Scala project is created. By default, Scala support is not enabled for the project. So when you right-click your src folder to add a Scala class, you might be disappointed to find that there is no option for adding one.

This can be resolved with a few simple steps. First, right-click your project and select Add Framework Support from the context menu.

If you have the Scala plugin installed, you should see Scala in the list of available frameworks. Select it and hit OK.

You might also need to mark the src folder as a Sources Root.

Now the option should be available to add a new Scala class.
