"Databases? Where we're going we don't need databases" – Doc Brown, 1985
Well, we’re certainly not there yet, but this article is going to introduce you to a new feature of the popular streaming platform Apache Kafka that can make a dedicated external database redundant for some use cases.
Kafka 0.10.0 introduced the “Kafka Streams” API – a new Kafka client that enables stateless and stateful processing of incoming messages, with state being stored internally where necessary. In the initial release, state could only be exposed by writing to another Kafka topic. Since Kafka 0.10.1, this internal state can be queried directly. This article introduces the API and talks about the challenges in building a distributed streaming application with interactive queries. It assumes basic knowledge of the Streams API.
Let’s consider a simple example that models the tracking of visits to a web page. A topic “visitsTopic” contains Kafka messages whose key is the IP address of a visitor and whose value is the timestamp of that visit. This is of course a bit contrived, as we would not use one topic per trackable web page, but let’s keep it simple.
For the sake of this example, we’re interested in three aspects:
- how many times did a user with a given IP visit our page in total?
- how often was it visited by a given IP in the last hour?
- how many times per user session did an IP visit the page?
This can be achieved with a topology that maintains three state stores:
- “totalVisitCount” contains the total number of visits per unique IP
- “hourlyVisitCount” contains the number of visits in the last hour
- “sessionVisitCount” contains the count per session (with a new session being started when there is no activity for more than a minute)
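In 0.10.2-era DSL code, such a topology could be sketched roughly like this. This is a sketch under assumptions, not the article’s actual code: it assumes String keys (IPs) and String values (timestamps) on “visitsTopic”, and it omits serde configuration, application properties and starting the streams instance.

```java
// Sketch of the topology described above (Kafka 0.10.2 DSL).
KStreamBuilder builder = new KStreamBuilder();

KGroupedStream<String, String> visitsPerIp = builder
        .stream(Serdes.String(), Serdes.String(), "visitsTopic")
        .groupByKey();

// "totalVisitCount": key-value store with the total count per IP
visitsPerIp.count("totalVisitCount");

// "hourlyVisitCount": window store with counts per IP per hour
visitsPerIp.count(TimeWindows.of(TimeUnit.HOURS.toMillis(1)), "hourlyVisitCount");

// "sessionVisitCount": session store with counts per session,
// a session closing after one minute of inactivity
visitsPerIp.count(SessionWindows.with(TimeUnit.MINUTES.toMillis(1)), "sessionVisitCount");
```

Each count call materializes its result into the named state store, which is exactly what we will query later.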
In Kafka 0.10.0, the only option to retrieve that data would have been to materialize it into another Kafka topic. For many use cases, this can be considered quite wasteful. Why do we have to persist the data once again in Kafka if all we want to do is answer a few very simple queries?
Interactive Queries to the Rescue
As outlined in KIP-67, interactive queries were designed to give developers access to the internal state that the Streams API keeps anyway. This is the first bit to take away: interactive queries are not a rich query API built on Kafka Streams. They merely make existing internal state accessible to developers.
The state is exposed by a new method in org.apache.kafka.streams.KafkaStreams. While this client originally mainly contained the capability to start and stop streaming topologies, it has been extended in Kafka 0.10.1 and further in 0.10.2. The entry point into querying a local state store is the store method. Let’s look a bit more closely at its signature:
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType)
The first parameter is easy: it takes the name of the store that we want to query – “totalVisitCount”, “hourlyVisitCount” or “sessionVisitCount” in our example. It is not the topic name! The second parameter is a bit more intriguing. It declares the type of the provided store. At this point, it’s worth taking a step back to understand what that is about. By default, the Kafka Streams high-level DSL uses RocksDB (http://rocksdb.org/) to store the internal state. This is generally pluggable, by the way – you could supply your own StateStoreProvider. RocksDB mainly works in memory but may flush to disk as well. There are three standard types of RocksDB-backed state stores:
- Key-Value based
- Window based
- Session window based (since 0.10.2)
In our example, “totalVisitCount” is an example of a key-value based state that maps an IP address to a counter. “hourlyVisitCount” is window-based – it stores the count of visits of an IP address as they occurred in a specific time window. “sessionVisitCount” is an example of a session window store. Session windows are a new feature of Kafka 0.10.2 and allow grouping repeated occurrences of a key into windows that dynamically expand whenever a new record arrives within a so-called inactivity gap. A simple example: if the inactivity gap is 1 minute, a new session window is opened whenever no record has arrived for a key for longer than that minute, while two messages arriving, say, 20 seconds apart belong to the same window.
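The inactivity-gap behaviour can be illustrated with a few lines of plain Java. This is a toy simulation, not the Streams implementation: it assumes events arrive in timestamp order and ignores the window merging that real session windows perform.

```java
import java.util.*;

// Toy simulation of session counting with a 1-minute inactivity gap.
class SessionWindowDemo {
    static final long GAP_MS = 60_000L;

    // Counts sessions per key from (key, timestamp-in-millis) events:
    // a new session starts when the gap since the key's last event exceeds GAP_MS.
    static Map<String, Integer> countSessions(List<String[]> events) {
        Map<String, Long> lastSeen = new HashMap<>();
        Map<String, Integer> sessions = new HashMap<>();
        for (String[] e : events) {
            String ip = e[0];
            long ts = Long.parseLong(e[1]);
            Long prev = lastSeen.get(ip);
            if (prev == null || ts - prev > GAP_MS) {
                sessions.merge(ip, 1, Integer::sum); // a new session window opens
            }
            lastSeen.put(ip, ts);
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[]{"1.2.3.4", "0"},
                new String[]{"1.2.3.4", "20000"},   // 20 s later: same session
                new String[]{"1.2.3.4", "200000"},  // > 1 min gap: new session
                new String[]{"5.6.7.8", "1000"});
        System.out.println(SessionWindowDemo.countSessions(events)); // 2 sessions for 1.2.3.4, 1 for 5.6.7.8
    }
}
```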
Each store type has its own specifically tailored API. A key-value store enables different types of queries than a window store.
Accessing a key-value store works like this:
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("totalVisitCount", QueryableStoreTypes.keyValueStore());
An important aspect of interactive queries is in the name of the return type – they are read-only. There are no inserts, updates, deletes whatsoever. This is a good thing – Kafka topics are your only data source and underlying computations could really get messed up if you were allowed to manipulate data.
The ReadOnlyKeyValueStore interface does not contain a lot of methods. You can basically query the value of a certain key, the values of a range of keys, all keys and an approximate count of entries. Applied to our example, this store enables you to query for the total count of visits for a given IP, the count for a range of IPs, all IPs and their count and an approximate count of all unique IPs in the store.
Creating a handle to a windowed store works like this:
ReadOnlyWindowStore<String, Long> store =
    streams.store("hourlyVisitCount", QueryableStoreTypes.windowStore());
This interface is even sparser, as it only has one method called fetch that takes a key as well as a “from” and a “to” timestamp.
This retrieves the aggregated results of the windows that fall into the passed timeframe. The resulting iterator contains KeyValue<Long, V> objects, where the Long is the starting timestamp of the window and V the type of the value of that KTable. With regard to the example, this would retrieve the hourly counts of all visits by a given IP, starting with the window that contains “timeFrom” and ending with the window containing “timeTo”.
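Applied to the example, querying the hourly counts of one IP over the last day could look roughly like this. This is a sketch under assumptions: it assumes store is the ReadOnlyWindowStore<String, Long> handle from above and that the visit timestamps are epoch milliseconds.

```java
// Hourly visit counts for one IP over the last 24 hours (sketch).
long timeTo = System.currentTimeMillis();
long timeFrom = timeTo - TimeUnit.DAYS.toMillis(1);

WindowStoreIterator<Long> iterator = store.fetch("1.2.3.4", timeFrom, timeTo);
while (iterator.hasNext()) {
    KeyValue<Long, Long> next = iterator.next();
    // next.key is the window's start timestamp, next.value the count in that window
    System.out.println("window starting at " + next.key + ": " + next.value + " visits");
}
iterator.close();
```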
Session window stores are retrieved with:
ReadOnlySessionStore<String, Long> store =
    streams.store("sessionVisitCount", QueryableStoreTypes.sessionStore());
This store interface is the simplest of all, as it only has one fetch method that takes a key and nothing else. It retrieves the results for all session windows that exist for that key at that point in time.
So this looks easy enough. When running a single instance of the streaming application, all partitions of the topic are handled by that instance and can be queried. However, running a single instance of a consumer is not really what Kafka is about, is it? How do interactive queries work when the partitions of the source topics – and by extension the state – are distributed across instances of your streaming application?
Running your application in distributed mode
There is no beating around the bush – here be dragons. As mentioned above, interactive queries have not turned Kafka Streams into an almighty query server.
So the bad news is:
- you need an additional layer that glues together your instances
- you need to know which instance(s) would be responsible for a given query
- you need to build it yourself
Sucks a bit, right? It’s not hard to see where this restriction comes from, though – building an efficient, generalized query facade that runs in distributed mode and works for all kinds of data on Kafka is hard when all you can count on is that keys and values are byte arrays containing god knows what. Another main reason is that Kafka Streams aims to be completely agnostic to the context it runs in – it does not want to restrict you to certain frameworks. The Confluent blog argues this case very nicely.
Kafka Streams is not leaving you completely alone with that problem, though.
Among the properties you provide for your streaming application, there is a new one called application.server. It expects a host:port pair that will be published among the instances of your application. This does not mean that the Streams API will actually open that port and listen for requests – that is your responsibility, and you are completely free in your choice of communication protocol. But the Streams API will communicate that endpoint to the other instances via the Kafka protocol, so if you keep your end of the bargain, you can query any instance for metadata and get a comprehensive view. The following illustration demonstrates the setup:
There are two instances of the application, running on 188.8.131.52:42 and 184.108.40.206:4711. A query layer talks to those instances via a user-defined protocol – user meaning you. The instances themselves need to run some kind of server that provides endpoints for that protocol. You’re completely free in what you use here; there is a lot of choice in the Java ecosystem – Spring MVC, Netty, Akka, Vert.x, you name it. Initially, the query layer needs to know at least one instance by address, but that instance can – if your protocol allows it – pass on information about the other endpoints. The query layer can ask any instance for the location of a given key or store.
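To illustrate what the routing part of such a query layer has to do, here is a toy sketch in plain Java. The class and method names are hypothetical, and it uses String.hashCode() where Kafka’s default partitioner actually murmur2-hashes the serialized key – the point is only the shape of the lookup: key, to partition, to hosting instance.

```java
import java.util.HashMap;
import java.util.Map;

// Toy routing table for a query layer: maps a message key to the
// application instance that hosts that key's partition.
class QueryRouter {
    private final Map<Integer, String> partitionToHost = new HashMap<>();
    private final int numPartitions;

    QueryRouter(int numPartitions) { this.numPartitions = numPartitions; }

    void assign(int partition, String hostPort) { partitionToHost.put(partition, hostPort); }

    // Which instance should the query layer ask for this key?
    // Stand-in hash; Kafka's default partitioner uses murmur2 on the serialized key.
    String hostFor(String key) {
        int partition = (key.hashCode() & 0x7fffffff) % numPartitions;
        return partitionToHost.get(partition);
    }

    public static void main(String[] args) {
        QueryRouter router = new QueryRouter(2);
        router.assign(0, "host-a:4711");
        router.assign(1, "host-b:4711");
        System.out.println(router.hostFor("192.168.0.1"));
    }
}
```

In a real application you would not hash keys yourself – you would use the metadata API described below – but the responsibility of knowing “who has this key?” lands in exactly this place.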
Accessing the metadata
So how do we get this metadata on the low level? For this, we return to org.apache.kafka.streams.KafkaStreams. Apart from the method that lets us access a store, it also provides access to metadata on varying levels. You can simply query all metadata for a streaming application. This gives you an overview of:
- what instances of my application are running where (according to the “application.server” property)?
- what state stores are available on those instances?
- what partitions of what topics are handled by an instance?
In a simple example with only one instance, the metadata (printed via its toString) shows that the host info object contains the provided application server values, that the three state store names are present and that the instance handles partitions 0 and 1 of the topic “visitsTopic”. If there were more instances, we’d get all their metadata. That metadata is of course a snapshot of the moment you call the allMetadata() method – starting or stopping instances can result in partition reassignment.
The API provides more fine-grained access as well. We can query all the metadata for a given state store, for example. This operation only returns metadata for instances where a store of that name is present. Even more specific are two methods that take the name of a store and a key (and either a Serializer for that key or a StreamPartitioner). This is a very interesting operation, as it returns the metadata for the single instance that will hold the data for that key – if any data exists, which of course cannot be guaranteed: we won’t know whether data is there unless we execute an actual query.
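Used against our example, finding the instance that holds the counts for a given IP could look roughly like this. This is a sketch: it assumes the streams instance and store names from above, and that keys are serialized as plain strings.

```java
// Which instance holds the data for this key? (sketch)
StreamsMetadata metadata = streams.metadataForKey(
        "totalVisitCount", "1.2.3.4", Serdes.String().serializer());

// hostInfo carries the host:port pair from the "application.server"
// property of the responsible instance - this is what your query layer
// would use to route the actual query.
HostInfo hostInfo = metadata.hostInfo();
System.out.println(hostInfo.host() + ":" + hostInfo.port());
```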
Interactive queries are a very cool feature that just might make your database redundant one day. Kafka is not the only technology moving in that direction – Apache Flink 1.2 introduced a similar feature.
But let’s not get ahead of ourselves – these are early days for this kind of technology. Interactive queries in Kafka are at the moment only suitable for very simple key-based queries, and the need to build your own distributed query layer might put people off. But with an ever-growing Kafka community, there is some real potential. The future is not quite here yet, but interactive queries show us what it might look like.
As an entry point for further reading, I recommend Confluent’s introductory post. Confluent also provides a reference implementation of a query layer.