By   November 7, 2018

Apache Flink – mapWithState on KeyedStream

mapWithState operator provides a syntactic ease for keeping ValueState for map operations on KeyedStreams in Apache Flink. It is specially useful for doing a running count on the data. That is exactly what we are going to try in this post. Just remember that you don’t have to return the state as a result of map operation. But the elements returned can use the state for manipulation of input elements.



Here we are just using mapWithState operator to do a running count of keys. We are also tagging the count with the elements and returning a Tuple2.

First we are creating a DataStream from elements of a collection using StreamExecutionEnvironment‘s fromElements method. All the elements are of integer type resulting this to be a DataStream[Int]. We are creating a KeyedStream from the elements keeping the elements themselves as the key of the stream. The KeyStream[ValType, KeyType] has both keys and values of type Int KeyedStream[Int, Int].

We can now simply use mapWithState operator of KeyedStream. This is just a shortcut of map() or flatMap with a single ValueState. The function used here takes the current element, plus an Option of the current value of the ValueState. It must return the updated value of the ValueState, which is then passed to the function when the same key is encountered next time.

Here we are creating a DataStream for tuples of two elements. The first item is the value, plus the updated count as the second element. The count should keep incrementing as the same key gets encountered. For the first time for any key, the ValueState would be None. We are returning 1 in this case.

Here is the build.sbt file for the application: