Kafka Streams – Interactive Queries (Intra Application Instance)
A Kafka Streams application needs to maintain KTables in an internal application store, which it backs with RocksDB. It even allows this store to be queried from anywhere in the application during the application's lifecycle.
Let's first create a simple Scala application, KafkaStreamsInteractiveQueriesApp.
In order to set up streams against the Kafka cluster, we need to provide its details in configuration. Here we are creating a Map with the required properties, which can be pulled in and used when building the streams. For the purposes of Interactive Queries,
StreamsConfig.APPLICATION_SERVER_CONFIG is the most important setting. It is also used when a Kafka Streams app has more than one instance and we are interested in application state that lives in another instance.
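A minimal sketch of such a map (the application id, broker address and host:port below are placeholders, not values from the original application):

```scala
import org.apache.kafka.streams.StreamsConfig

// Placeholder values: adjust the broker list and this instance's host:port as needed
val streamsConfigMap = Map[String, AnyRef](
  StreamsConfig.APPLICATION_ID_CONFIG -> "KafkaStreamsInteractiveQueriesApp",
  StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  // Advertises where this particular instance can be reached for interactive queries
  StreamsConfig.APPLICATION_SERVER_CONFIG -> "localhost:7070"
)
```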
Kafka Streams applications use Serdes to serialize and deserialize messages from Kafka. Here is a simple type to make this easier for us.
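As a sketch, assuming a simple SensorHeartbeat case class (the original type may differ) and a recent kafka-clients version, a Serde for it could be built along these lines; a JSON Serde would work just as well:

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// Hypothetical payload type; the original post's SensorHeartbeat may carry more fields
case class SensorHeartbeat(sensorId: String, timestamp: Long)

// A minimal Serde built from a plain string encoding
val sensorHeartbeatSerde: Serde[SensorHeartbeat] = Serdes.serdeFrom(
  new Serializer[SensorHeartbeat] {
    override def serialize(topic: String, data: SensorHeartbeat): Array[Byte] =
      if (data == null) null
      else s"${data.sensorId},${data.timestamp}".getBytes(StandardCharsets.UTF_8)
  },
  new Deserializer[SensorHeartbeat] {
    override def deserialize(topic: String, bytes: Array[Byte]): SensorHeartbeat =
      if (bytes == null) null
      else {
        val parts = new String(bytes, StandardCharsets.UTF_8).split(",", 2)
        SensorHeartbeat(parts(0), parts(1).toLong)
      }
  }
)
```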
Basic Setup
Let's do a basic setup of the ingredients used to build the Kafka Streams DSL. Here we are merely using the above types to build a StreamsConfig. A StreamsBuilder object is then used to build the Kafka Streams DSL.
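Roughly, assuming the configuration map from above, the setup could look like this:

```scala
import java.util.Properties
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig}

// Fold the configuration map from above into the Properties that Kafka Streams expects
val props = new Properties()
streamsConfigMap.foreach { case (key, value) => props.put(key, value) }
val streamsConfig = new StreamsConfig(props)

// The builder against which the streams, tables and stores below are declared
val builder = new StreamsBuilder()
```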
Store for KTable for Simple Aggregations
Having built the StreamsBuilder object, we can use it to hook up to Kafka Streams. Here we are first using it to build a KStream from the messages in the topic Merged-SensorsHeartbeat. We can then group this stream by the topic's default key. Note that we have used specific Serdes here; in their absence, Kafka Streams falls back to the default Serdes and might fail if they are incompatible.
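A sketch of this step, assuming the Kafka Streams 2.x Java API (earlier releases use Serialized.with instead of Grouped.with); since the original aggregation isn't reproduced here, a simple reduce that keeps the latest heartbeat per key stands in for it, with a hypothetical store name:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Consumed, Grouped, Materialized, Reducer}
import org.apache.kafka.streams.state.KeyValueStore

// KStream over the merged heartbeat topic, with explicit Serdes rather than the defaults
val heartbeatStream = builder.stream(
  "Merged-SensorsHeartbeat",
  Consumed.`with`(Serdes.String(), sensorHeartbeatSerde))

// Group by the record key (the sensor id), again passing the Serdes explicitly
val mergedStreamTable = heartbeatStream.groupByKey(
  Grouped.`with`(Serdes.String(), sensorHeartbeatSerde))

// Keep the latest heartbeat per key in a KTable backed by a named state store
val latestHeartbeats = mergedStreamTable.reduce(
  new Reducer[SensorHeartbeat] {
    override def apply(previous: SensorHeartbeat, latest: SensorHeartbeat): SensorHeartbeat = latest
  },
  Materialized.as[String, SensorHeartbeat, KeyValueStore[Bytes, Array[Byte]]]("SensorHeartbeatStore"))
```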
Here we are doing another, even simpler aggregation. We simply take the KGroupedStream created above (mergedStreamTable) and apply a count to create a KTable that maintains the number of records seen for each individual key.
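For example, with a hypothetical store name:

```scala
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore

// Count records per key, materialized in a named state store so it can be queried later
val sensorCounts = mergedStreamTable.count(
  Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]]("SensorCountsStore"))
```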
Store for table obtained through StreamsBuilder
We can also create a KTable directly by hooking up to a Kafka topic using StreamsBuilder's table method. Here we are specifying another store to hold this table's data.
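A sketch, where the topic and store names below are placeholders:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Consumed, Materialized}
import org.apache.kafka.streams.state.KeyValueStore

// KTable built directly from a topic; the Materialized gives the backing store its own name
val sensorTable = builder.table(
  "SensorsHeartbeat",
  Consumed.`with`(Serdes.String(), sensorHeartbeatSerde),
  Materialized.as[String, SensorHeartbeat, KeyValueStore[Bytes, Array[Byte]]]("SensorTableStore"))
```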
Using State Stores
As discussed above, we can query these stores at any time during the application lifecycle. To make sure we don't try to access them before the streams have actually started, we are using Timers; as the timers fire, we simply read the information maintained in the underlying stores.
KafkaStreams provides access to the stores through its KafkaStreams.store method, which requires the store name and the types of its keys and values. This is a simple key/value store with String keys and SensorHeartbeat values. We fetch all entries from the store and simply print them to the console.
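A sketch of such a timer, assuming the pre-2.5 store(name, type) overload (newer releases use StoreQueryParameters) and the placeholder store name used above; streams is the running KafkaStreams instance created in MainApp:

```scala
import java.util.{Timer, TimerTask}
import org.apache.kafka.streams.state.QueryableStoreTypes

// Query the store every 10 seconds, starting 10 seconds after the streams have been started
val timer = new Timer()
timer.schedule(new TimerTask {
  override def run(): Unit = {
    // Look up the key/value store by name and by its key/value types
    val store = streams.store(
      "SensorTableStore",
      QueryableStoreTypes.keyValueStore[String, SensorHeartbeat]())

    // Iterate over every entry in the store and print it to the console
    val iterator = store.all()
    while (iterator.hasNext) {
      val entry = iterator.next()
      println(s"${entry.key} -> ${entry.value}")
    }
    iterator.close()
  }
}, 10000L, 10000L)
```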
Using Store for TimeWindow and SessionWindow
We can also build KTables whose backing stores hold values for a certain window. Here we are building two such KTables. The first uses a TimeWindow, which keeps counts of keys over the last eight hours.
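A sketch of the time-windowed count, again with a placeholder store name and assuming the Kafka Streams 2.x Java API:

```scala
import java.time.Duration
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Materialized, TimeWindows}
import org.apache.kafka.streams.state.WindowStore

// Count records per key over eight-hour time windows, kept in a window store
val windowedCounts = mergedStreamTable
  .windowedBy(TimeWindows.of(Duration.ofHours(8)))
  .count(Materialized.as[String, java.lang.Long, WindowStore[Bytes, Array[Byte]]]("SensorTimeWindowStore"))
```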
The other KTable uses the interesting concept of a session. Sessions are delimited by periods of inactivity, specified through SessionWindows' with method; here we are using 1000 ms as the gap between sessions.
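And the session-windowed counterpart, under the same assumptions:

```scala
import java.time.Duration
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Materialized, SessionWindows}
import org.apache.kafka.streams.state.SessionStore

// Sessions separated by one second of inactivity; per-key counts kept in a session store
val sessionCounts = mergedStreamTable
  .windowedBy(SessionWindows.`with`(Duration.ofMillis(1000)))
  .count(Materialized.as[String, java.lang.Long, SessionStore[Bytes, Array[Byte]]]("SensorSessionWindowStore"))
```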
Complete Code – MainApp
This is the complete code for MainApp for reference.
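As a condensed sketch, the pieces from the previous sections could be wired together along these lines (the configuration, Serde and topology snippets shown above are assumed to be declared where the comments indicate):

```scala
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}

object KafkaStreamsInteractiveQueriesApp extends App {
  // Configuration map, Properties (props) and the SensorHeartbeat Serde from the
  // earlier sections are assumed to be declared here.

  val builder = new StreamsBuilder()
  // ... the KStream, grouped/windowed aggregations and the direct KTable are
  //     declared against `builder` exactly as shown above ...

  // Build the topology and start the streams instance
  val streams = new KafkaStreams(builder.build(), props)
  streams.start()

  // The timers that query the state stores are scheduled here, after start()

  // Close the streams instance cleanly on shutdown
  sys.addShutdownHook {
    streams.close()
  }
}
```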
Invalid State Store Exception
If an invalid state store name is specified, or the store is not yet ready to be queried (for example right after start-up or during a rebalance), the call to KafkaStreams.store results in an org.apache.kafka.streams.errors.InvalidStateStoreException.
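A common way to cope with this, sketched here rather than taken from the original application, is to retry the lookup until the store becomes queryable:

```scala
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.QueryableStoreType

// Retry KafkaStreams.store until the named store exists and is ready to be queried
def waitForStore[T](streams: KafkaStreams, name: String, storeType: QueryableStoreType[T]): T = {
  var store: Option[T] = None
  while (store.isEmpty) {
    try store = Some(streams.store(name, storeType))
    catch {
      case _: InvalidStateStoreException => Thread.sleep(100) // not ready yet, try again shortly
    }
  }
  store.get
}
```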
Further Reading
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams