Beliebte Suchanfragen
//

Ein tolles Paar: Spring Webflux und Kotlin Coroutines

18.12.2023 | 6 Minuten Lesezeit

In diesem Artikel gehen wir darauf ein, wie mithilfe des Spring-Webflux-Projekts eine reaktive Anwendung erstellt werden kann und welche Herausforderungen dieser Ansatz mit sich bringt. Wir erläutern kurz, was Kotlin Coroutines sind und zeigen, wie die Herausforderungen des Reactive Programmings mit Spring Webflux mit einer Implementierung in Kotlin Coroutines umgangen oder zumindest abgefedert werden können.

Reactive Programming mit Spring Webflux

In einem vorangegangenen Artikel haben wir eine Einführung ins Reactive Programming und das Spring-Webflux-Projekt gegeben. Den in diesem Artikel genutzten Beispiel-Controller wollen wir hier nochmal aufgreifen. Dieser wurde vollständig reaktiv mit Java, der Reactive Streams API und Spring Webflux implementiert:

1@RestController
2class CustomerController {
3
4   private final CustomerRepository repository;
5
6   CustomerController(final CustomerRepository repository) {
7       this.repository = repository;
8   }
9
10   @PostMapping("/customer")
11   Mono<Customer> createNewCustomer(@RequestBody Mono<CustomerCreationRequest> request) {
12      return request.flatMap( dto -> 
13            repository.save(new Customer(dto.firstName(), dto.lastName()))
14      );
15   }
16}
17
18record CustomerCreationRequest(String firstName, String lastName) {}
19
20@Repository
21interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
22
23   Flux<Customer> findByFirstNameAndLastName(String firstName, String lastName);
24}

Durch ein HTTP POST gegen die URL /customer kann hier ein neues Customer-Objekt erzeugt und in der Datenbank persistiert werden. Das Ganze passiert reaktiv, da in der Methode createNewCustomer() das Mono für den Eingangsparameter request mithilfe der flatMap()-Methode mit dem reaktiven Repository und dessen save()-Methode verknüpft wird. Dadurch kann Spring Webflux dann einen HTTP Request ohne jegliche Blocking IO verarbeiten.

Es wird in diesem Beispiel jedoch auch deutlich, welche Nachteile das Reactive Programming mit sich bringt. Der Code ist für viele Entwickler komplizierter zu schreiben und zu verstehen. Im Gegensatz zu einem klassischen, imperativen Ansatz, bei dem der Code von oben nach unten ausgeführt wird, muss der Entwickler hier in Verkettungen von Streams denken. Und diese Verkettungen können deutlich komplexer werden als in diesem einfachen Beispiel, wie wir am Ende dieses Artikels zeigen werden. Diese Art des Denkens ist für viele Entwickler ungewohnt und stellt damit zunächst eine Hürde für die Nutzung von Reactive Programming dar. Wir werden im Folgenden zeigen, wie man mithilfe von Kotlin Coroutines das gewohnte imperative Programmiermodell auch für die Implementierung reaktiver Anwendungen nutzen kann.

Kotlin und Coroutines

Das zentrale Element für die Implementierung von Nebenläufigkeit in Kotlin sind die sog. Coroutines. Dabei handelt es sich, ähnlich wie bei Threads, um Programmabläufe, die parallel zueinander ausgeführt werden können. Im Gegensatz zu Threads sind Coroutines aber sehr leichtgewichtig und können zu Tausenden nebeneinander gestartet werden. Das ist möglich, weil Coroutines mithilfe sog. Worker-Threads implementiert werden. Dabei gibt es einen festen Pool von Threads, der sich um die Ausführung aller Coroutines im Hintergrund kümmert. Eine Coroutine kann an bestimmten Punkten während der Ausführung unterbrochen werden und zu einem späteren Zeitpunkt von einem anderen Thread weiter bearbeitet werden. Wann diese Thread-Wechsel passieren ist für den Entwickler transparent und wird von der Kotlin Runtime entschieden. Für einen tiefergehenden Einblick in Coroutines verweisen wir auf den Einführungsartikel hier im Blog.

Controller mit Suspend Function

Fest verbunden mit Coroutines sind in Kotlin die sog. suspend functions. Diese Funktionen werden innerhalb einer Coroutine ausgeführt und erlauben es, unterbrochen zu werden (deshalb suspend). Die zugehörige Coroutine wird dabei dann auch unterbrochen, was letztendlich bedeutet, dass sie keinem Thread aus dem Thread-Pool mehr zugeordnet wird. Diesen Mechanismus können wir uns zunutze machen, um Reactive Programming mit einem imperativen Programmiermodell umzusetzen. Betrachten wir dazu die Implementierung unseres Beispiel-Controllers mit Kotlin:

1@RestController
2class CustomerController(
3   private val repository: CustomerRepository
4) {
5
6   @PostMapping("/customer")
7   suspend fun createNewCustomer(@RequestBody request: Flow<CustomerCreationRequest>): Customer {
8      val dto = request.single()
9      return repository.save(Customer(dto.firstName, dto.lastName))
10   }
11}
12
13data class CustomerCreationRequest(val firstName: String, val lastName: String)
14
15@Repository
16interface CustomerRepository : CoroutineCrudRepository<Customer, Long>{
17
18   fun findByFirstNameAndLastName(firstName: String, lastName: String): Flow<Customer>
19}

Neben der etwas kompakteren Schreibweise, die uns Kotlin erlaubt, fällt gleich auf, dass die beiden Monos verschwunden sind. Außerdem wird auch die flatMap()-Methode nicht mehr verwendet. Die Methode createNewCustomer() gibt direkt das Customer-Objekt zurück. Trotz dieser Änderungen ist der Controller vollständig non-blocking. Der Trick ist hier das Schlüsselwort suspend. Dadurch kann diese Methode immer dann unterbrochen werden, wenn sie eine andere suspend function aufruft. Und das wiederum erlaubt es Spring Webflux (genauer, der InvocableHandlerMethod), den Aufruf dieser Methode in einem Mono zu verpacken. Das resultierende Mono emittiert also genau dann das Customer-Objekt, wenn obige suspend function von der Coroutine abgearbeitet wurde. Dadurch können wir als Entwickler uns das Mono als Rückgabetyp ersparen.

Eine weitere Auffälligkeit ist der Typ des Parameters request. Dieser hat sich von Mono zu Flow geändert. Dabei handelt es sich um eine Abstraktion von Kotlin, die als eine Vereinigung der Reactive Streams Publisher Mono und Flux betrachtet werden kann. Anders als in der Reactive Streams API wurde Flow aber vor allem deshalb eingeführt, um zu erlauben, dass Coroutines asynchron einen Stream von Objekten austauschen können (siehe auch hier in der Kotlin-Dokumentation). In unserem Fall möchten wir in der Methode createNewCustomer() darauf warten, dass ein CustomerCreationRequest verfügbar wird. Der Aufruf der single()-Methode (ebenfalls eine suspend function) wird die Ausführung daher so lange unterbrechen, bis ein Request-Objekt (durch Spring Webflux) verfügbar wird. Anschließend wird die Ausführung innerhalb der Coroutine fortgesetzt, ein Customer-Objekt erzeugt und die suspend function save() des Repositorys aufgerufen. Auch dieser Aufruf wird wieder zu einer Unterbrechung führen, bis die Persistierung in der Datenbank abgeschlossen ist. Erst dann wird die Coroutine weiterlaufen und das persistierte Customer-Objekt als Ergebnis zurückliefern. Der Sourcecode sieht fast wie in einem klassischen imperativen Programmiermodell aus, durch die Möglichkeit der Unterbrechung der Methode ist aber eine Ausführung wie in einem Reactive-Ansatz möglich. Der Entwickler muss sich damit aber gedanklich kaum beschäftigen, was die Komplexität deutlich reduziert.

Komplexeres Beispiel

Um die mögliche Reduktion der Komplexität nochmal deutlicher zu zeigen, möchten wir abschließend unser Beispiel durch eine komplexere Abfrage ergänzen. Dabei sollen alle Customer-Objekte anhand ihrer Kundengruppe abgefragt werden. Der reaktive Java-Code dieser Abfrage sieht wie folgt aus:

1@GetMapping("/search/customerGroup/{customerGroup}")
2Flux<Customer> findByCustomerGroup(@PathVariable final String customerGroup) {
3   return webClient.getCustomerGroup(customerGroup)
4           .flatMapMany(group ->
5                   Flux.concat(
6                           group.entries()
7                                   .stream()
8                                   .map(entry -> repository.findByFirstNameAndLastName(entry.firstName(), entry.lastName()))
9                                   .toList()
10                   )
11           );
12}

Zunächst wird mithilfe eines Webclients eine REST-API eines anderen Services aufgerufen, um den Namen der Kundengruppe zu einer Liste von Kundennamen aufzulösen. Diese Liste kann dann anschließend benutzt werden, um aus dem oben bereits genutzten Repository die tatsächlichen Customer-Objekte auszulesen. Sowohl der Aufruf der REST-API als auch der Aufruf des Repositorys sind dabei non-blocking und nutzten Mono bzw. Flux als Rückgabewert. Diese müssen dann wieder miteinander verkettet werden (diesmal mit den flatMapMany()- und Flux.concat()-Methoden). Der ungeübte Entwickler wird hier einen Augenblick brauchen, um genau zu verstehen, wie die einzelnen Aufrufe zusammenhängen. Und selbst viele geübte Entwickler werden diesen Code nicht ohne Weiteres runterschreiben können. Die Kotlin-Implementierung dieses Beispiels ist nicht nur einfacher zu verstehen, sondern einer klassischen imperativen Implementierung auch deutlich ähnlicher:

1@GetMapping("/search/customerGroup/{customerGroup}")
2suspend fun findByCustomerGroup(@PathVariable customerGroup: String): Flow<Customer> {
3   val group = webClient.getCustomerGroup(customerGroup)
4   return group
5       .entries
6       .map { repository.findByFirstNameAndLastName(it.firstName, it.lastName) }
7       .merge()
8}

Hier gibt es noch eine Besonderheit. In Kotlin hat es sich als Best Practice etabliert, dass eine Methode mit Rückgabewert Flow keine suspend function sein sollte, sondern stattdessen unmittelbar den Flow zurückgibt. Eventuelle Aufrufe von blockierenden oder suspending functions innerhalb dieser Methode werden dabei in die Ausführung des Flow selbst verschoben. Dies lässt sich in obiger Methode leicht erreichen, indem man den Methodenkörper in einen flow { ... } Builder einbettet und die return-Anweisung durch ein emitAll() ersetzt. Durch Kotlins ausdrucksstarke Syntax fällt diese Änderung kaum auf, ist für einen Entwickler des klassischen, imperativen Ansatzes dennoch weniger vertraut. Aus unserer Sicht sind für eine Controller-Methode beide Varianten akzeptable Lösungen:

1@GetMapping("/search/customerGroup/{customerGroup}")
2fun findByCustomerGroup(@PathVariable customerGroup: String) = flow {
3   val group = webClient.getCustomerGroup(customerGroup)
4   emitAll(group
5      .entries
6      .map { repository.findByFirstNameAndLastName(it.firstName, it.lastName) }
7      .merge())
8}

Fazit

Wir hoffen, wir konnten mit den obigen Beispielen zeigen, dass in Java ein Reactive-Ansatz wie Spring Webflux eine kognitive Belastung für die Entwickler mitbringen kann und dass es möglich ist, diese durch die Benutzung von Kotlin Coroutines bzw. suspend functions zu reduzieren. Gerade Entwicklern, die bisher eher im Blocking-IO-Umfeld Erfahrung gesammelt haben (z. B. mit Spring MVC), kann der Umstieg in die Non-blocking-Welt vereinfacht werden, wenn man dabei die von Kotlin angebotenen Konzepte nutzt.

Beitrag teilen

Gefällt mir

1

//

Weitere Artikel in diesem Themenbereich

Entdecke spannende weiterführende Themen und lass dich von der codecentric Welt inspirieren.

//

Gemeinsam bessere Projekte umsetzen.

Wir helfen deinem Unternehmen.

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.