Java Magazin 06/12

Transaktionen in Spring Batch: Massenverarbeitung mit Restart, Skip und Retry

Autor:

Die Batch-Verarbeitung steht nicht gerade in dem Ruf, ein hippes Thema zu sein, wenn man MapReduce mal außen vor lässt. Und dennoch spielt sie in vielen Unternehmen immer noch eine große Rolle. Transaktionen sind gerade in Batch-Anwendungen ein wichtiges Thema, denn natürlich kann nicht der gesamte Job innerhalb einer Transaktion ablaufen. Genauso wenig möchte man eine Transaktion für jedes Element aufmachen, da so die Performance leidet. Spring Batch, vermutlich das relevanteste Framework zur Batch-Verarbeitung von Daten im Java-Bereich, bietet viele Funktionalitäten, die zwangsläufig ein kompliziertes Transaktionsverhalten mit sich bringen, wie beispielsweise Skip oder Retry. Dieser Artikel erläutert das Transaktionsverhalten verschiedener Features und spart dabei die Stolpersteine, auf die man als Batch-Entwickler stoßen kann, nicht aus.
Gleich eine Sache vorweg: Dieser Artikel ist nichts für Einsteiger. Wir gehen in medias res und schauen dem Framework unter die Haube. Da die Domänensprache von Spring Batch glücklicherweise sehr intuitiv ist, sollten aber auch Leser, die bisher nichts oder wenig mit Spring Batch gemacht haben, einen Großteil des Artikels verstehen können. Eine Einführung in Spring Batch und Beispiele bieten [1], [2], [3].

Jedem Entwickler ist die Wichtigkeit von Transaktionen bewusst. Und doch muss man sich meistens nicht tiefergehend mit ihnen beschäftigen, denn es genügt eine @Transactional- Annotation oder das Wissen, dass die EJB das schon regelt, um ruhigen Gewissens den Businesscode zu implementieren. In Batch-Anwendungen ist das anders: Diese verarbeiten viele Daten am Stück, üblicherweise so viele, dass eine Verarbeitung innerhalb einer Transaktion nicht in Frage kommt. Es müssen also Commits her, die dann natürlich zur Folge haben, dass ein abgebrochener Job nicht komplett zurückgerollt werden kann. Das wirft Fragen auf: Können wir den Job einfach neu starten, oder verarbeiten wir dann bestimmte Daten doppelt? Welche Daten sind verarbeitet worden, welche nicht? Spring Batch bietet Möglichkeiten, einen Job Restart-fähig zu machen. Spring Batch kann auch fehlerhafte Elemente überspringen (Skip) oder erneut verarbeiten (Retry). Aber natürlich hat es in jedem Skip- und Retry-Fall vorher ein Rollback gegeben, und da üblicherweise nicht eine Transaktion pro Element gestartet wird, hat das Rollback auch gesunde Elemente umfasst. Wie löst Spring Batch diese Aufgabe? Diese und weitere Fragen sollen in diesem Artikel beantwortet werden. Los geht es nun aber mit dem grundsätzlichen Transaktionsmodell in Spring Batch.

Transaktionen in Chunk-orientierten Steps

Spring Batch liefert eine klare Domänensprache, die direkt im XML-Namespace umgesetzt ist (Listing 1).

Wir haben Jobs, die aus beliebig vielen Steps bestehen können. Ein Step kann entweder ein TaskletStep oder ein Chunk-orientierter Step sein, wobei Tasklet-Steps eher für die Anbindung von Legacy-Code geeignet sind. Chunks teilen sich weiter auf in Reader, Processors und Writer, und erst hier kann Spring Batch all seine Vorteile ausspielen. Deswegen werde ich mich in diesem Artikel auch auf die Chunk-orientierten Steps beschränken. Schauen wir uns die Grafik an, die durch diesen Artikel begleiten wird (Abb. 1), je nach Fokus abgeändert.

Transaktionsmodell

Eine Transaktionsklammer, die mehrere Steps oder gar den ganzen Job umfasst, gibt es nicht. Deswegen taucht der Job in der Grafik auch nicht auf. Abbildung 1 sagt schon einiges über das Transaktionsverhalten in Spring Batch aus: Wir haben ItemReader, die zu verarbeitende Elemente auslesen und zurückliefern: eins nach dem anderen. Wenn es keine Elemente mehr gibt, liefert der ItemReader null. Wir haben optionale ItemProcessors, die immer genau ein Element verarbeiten und dabei ein Element zurückgeben, das einen anderen Typ haben kann. Und am Ende gibt es dann noch die Item-Writer, die eine Liste von Elementen entgegennehmen und schreiben.

Jeder Step besteht aus Chunks, und jeder Chunk läuft in einer eigenen Transaktion. Die Größe des Chunks wird dabei durch die CompletionPolicy (Punkt 1 in Abb. 1) bestimmt: Wenn diese erfüllt ist, hört Spring Batch auf zu lesen und beginnt mit dem Verarbeiten der Elemente. Wenn man das Attribut commit-interval auf dem chunk-Tag verwendet, bekommt man automatisch eine SimpleCompletionPolicy, die nach der spezifizierten Anzahl abbricht. Braucht man ein angepasstes Verhalten, so kann man auch selbst eine CompletionPolicy implementieren und über das Attribut chunk-completion-policy setzen.

Jobmetadaten und ein fehlerhafter Lauf

So sieht also der Positivfall eines erfolgreichen Laufs aus. Und der fehlerhafte Joblauf? Siehe Abbildung 2.

Wenn eine RuntimeException in einer am Chunk beteiligten Komponente geworfen wird und weder Skip noch Retry konfiguriert wurde (darauf gehe ich später ein), so wird die Transaktion für den Chunk zurückgerollt, und der Step und dann auch der Job brechen im Standardfall ab. Jeder Chunk, der schon committet wurde, bleibt natürlich in dem Status.

In Abbildung 2 ist ebenfalls zu sehen, an welchen Stellen Metadaten des Steps geschrieben und aktualisiert werden. Die Metadaten enthalten beispielsweise den Status des Steps, Startzeitpunkt, Anzahl von Commits und Rollbacks und Ähnliches. Zusätzlich wird für jeden Step eine Map gehalten (der Step ExecutionContext), die genutzt werden kann, um dort beliebige serialisierbare Daten zu persistieren. Ein häufiger Anwendungsfall ist die Gewährleistung der Restart-Fähigkeit bei Abbruch eines Jobs (siehe dazu den Abschnitt „Restart eines fehlerhaften Joblaufs“). Aber im Prinzip ist diese Map frei nutzbar. Der Zugang ist beispielsweise über das ItemStream-Interface möglich. Initialisiert werden diese Metadaten bei Start des Steps (Punkt 3 in der Grafik), abgeschlossen nach Beendigung des Steps (Punkt 5). Beides sind eigene Transaktionen. Man möchte ja beispielsweise bei einem Abbruch des Steps den Status trotz Rollback des letzten Chunks auf „FAILED“ setzen. Innerhalb des Steps werden immer kurz vor dem Commit innerhalb der Transaktion eines Chunks Metadaten aktualisiert. Hier wird beispielsweise auch der ExecutionContext des Steps persistiert.

Lesen über einen Cursor

Häufig können die Input-Daten eines Steps durch ein SQL-Statement beschrieben werden („Hole alle Verträge mit Eingangsdatum heute und Status unverarbeitet“). Allerdings ist es nicht sinnvoll, dieses bei Beginn des Steps auszuführen und das gesamte ResultSet auszulesen, da dann wohl der Speicher überlaufen würde. Spring Batch bietet mit dem JdbcCursorItemReader die Möglichkeit, einen Datenbank-Cursor zu öffnen, von dem konstant bei Bedarf Daten gelesen werden, ohne das komplette ResultSet zurückzuliefern. Wir haben hier nur ein Problem: Ein Commit der Transaktion würde auch die Verbindung zum Cursor schließen. Wie halten wir diese offen? Einfache Lösung: Sie nimmt nicht an der Transaktion teil. Der JdbcCursorItemReader verwendet eine separate Connection für das Lesen der Daten vom Cursor und umgeht somit die Transaktion, die vom Transaktionsmanager verwaltet wird.

In einem Application-Server bekommen wir unsere Connections normalerweise von einer DataSource, die vom App-Server verwaltet wird, und all diese Connections nehmen immer an Transaktionen teil. Da wir aber eine Connection brauchen, die nicht an der Transaktion teilnimmt, müssen wir hier noch eine weitere nicht transaktionale DataSource einrichten und sie in unseren JdbcCursorItemReader injizieren. Hier müssen wir aufpassen: Diese DataSource darf natürlich nicht anderweitig verwendet werden, ansonsten bekommen wir Probleme mit der Transaktionssicherheit.

Restart eines fehlerhaften Joblaufs

Spring Batch bietet die Möglichkeit, einen fehlerhaften Joblauf erneut zu starten. Dabei wird eine Job-Instanz über die JobParameters identifiziert: Ein Jobstart mit einem Set von Parametern, die so vorher schon verwendet wurden, löst automatisch einen Restart des bereits gelaufenen Jobs aus, wenn die erste Ausführung fehlerhaft war. War die erste Ausführung erfolgreich, ist es nicht mehr möglich, einen Job mit diesen Parametern zu starten.

So weit, so gut, aber kann man jeden fehlerhaften Joblauf einfach erneut starten? Natürlich nicht. Jemand muss wissen, wo der Job abgebrochen ist und wo wir ihn wieder aufnehmen müssen. Reader, die von AbstractItemCountingItemStreamItemReader erben, speichern einen Zähler im ExecutionContext, der innerhalb der Transaktion des Chunks persistiert wird. Ein Beispiel: Wir haben eine Chunk-Größe von 5 und einen Fehler beim Verarbeiten von Element 23. Die letzten erfolgreich verarbeiteten Elemente sind also die Elemente 16 bis 20, der Zähler im ExecutionContext steht in der Datenbank auf 20. Wenn der Job erneut gestartet wird, fährt er mit Element 21 fort. Es gibt eine ganze Familie von Readern, die so funktionieren. Der JdbcCursorItemReader gehört beispielsweise auch dazu. All diese Reader sind nicht threadsafe, da sie den Zähler im Zustand halten.

Okay, wir verwenden also einen dieser Reader und haben seinen Scope auf Step gesetzt, um die Thread-Safety-Problematik zu umgehen. Sind wir dann sicher? Nein. Sagen wir, wir verwenden den JdbcCursorItem-Reader, haben unser SQL-Statement definiert, und wir wollen die Restart-Funktionalität nutzen. Dann müssen wir absolut sicher sein, dass unser SQL-Statement zumindest für alle bereits verarbeiteten Elemente das gleiche Ergebnis liefert. Bei einem Restart mit Element 21 muss sicher sein, dass die Elemente 1 bis 20 genau die sind, die im ersten Versuch bereits verarbeitet wurden. Ansonsten bekommen wir nicht die Ergebnisse, die wir erwarten. Die Reihenfolge ist wichtig.

Gleiches gilt für den FlatFileItemReader, der dafür genutzt werden kann, eine Datei Zeile für Zeile einzulesen. Bei einem Problem mit einer bestimmten Zeile muss beim Fixen der Datei darauf geachtet werden, die bereits verarbeiteten Zeilen nicht zu verändern.

Beim Schreiben eines eigenen Readers sollte man immer im Hinterkopf haben, dass Restart-Fähigkeit nicht von alleine kommt, sondern dass man sie programmieren muss. So kann man zum Beispiel ebenfalls von AbstractItemCountingItemStreamItemReader erben. Oder man speichert den Zustand, den man wiederherstellen möchte, direkt im ExecutionContext. Diese Arbeit kann einem Spring Batch nicht abnehmen.

Listener

Es gibt eine ganze Reihe von Listenern in Spring Batch, die auf bestimmte Ereignisse reagieren und neben Item-Reader, ItemProcessor und ItemWriter eine weitere Möglichkeit darstellen, Businesslogik in unseren Job zu integrieren. Die wichtigsten Listener, ihre Methoden und deren Verhältnis zur Transaktion des Chunks sind im Folgenden aufgezählt:

  • Der JobExecutionListener hat zwei Methoden, beforeJob und afterJob. Beide werden natürlich außerhalb der Transaktion des Chunks in einer eigenen Transaktion ausgeführt.
  • Der StepExecutionListener hat zwei Methoden, beforeStep und afterStep. Beide werden natürlich außerhalb der Transaktion des Chunks in einer eigenen Transaktion ausgeführt.
  • Der ChunkListener hat zwei Methoden, before-Chunk und afterChunk. Erstere wird dabei innerhalb der Transaktion des Chunks ausgeführt, Letztere außerhalb.
  • Der ItemReadListener hat drei Methoden, before-Read, afterRead und onReadError. Sie werden alle innerhalb der Transaktion des Chunks ausgeführt.
  • Der ItemProcessListener hat drei Methoden, beforeProcess, afterProcess und onProcessError. Sie werden alle innerhalb der Transaktion des Chunks ausgeführt.
  • Der ItemWriteListener hat drei Methoden, beforeWrite, afterWrite und onWriteError. Sie werden alle innerhalb der Transaktion des Chunks ausgeführt.
  • Der SkipListener hat drei Methoden, onSkipInRead, onSkipInProcess und onSkipInWrite. Sie werden alle innerhalb der Transaktion des Chunks ausgeführt. Diesen Listener schauen wir uns im Abschnitt über die Skip-Funktionalität genauer an.

Abbildung 3 zeigt die Stellen innerhalb der Ausführung eines Steps, an denen die Listener feuern. Eine wichtiges Detail ist leicht zu sehen: die onXXXError-Methoden werden direkt vor dem Rollback ausgeführt, jegliche Aktionen auf transaktionalen Ressourcen, die innerhalb dieser Methoden ausgeführt werden, werden also in der Regel zurückgerollt. Will man das verhindern, muss man in der Methode eine neue Transaktion aufmachen. Bei Annotations-basierter Transaktionskonfiguration genügt es, die Annotation @Transactional(propagation=Propagation. REQUIRES_NEW) an die Methode zu setzen.

Listener

Skip

Mit der Skip-Funktionalität können bestimmte Exception-Typen und eine Maximalanzahl definiert werden, und immer, wenn eine dieser Exceptions fliegt, wird nicht der gesamte Step abgebrochen, sondern das fehlerhafte Element übersprungen. Wird die spezifizierte Maximalanzahl überschritten, so bricht der Step dennoch ab. Natürlich benötigen wir für das eine fehlerhafte Element einen Rollback. Allerdings besteht solch ein Chunk üblicherweise nicht aus einem Element, und die gesunden Elemente sollen committet werden. Wie macht Spring Batch das?

Es gibt zwei Wege, Skip-Funktionalität in Spring Batch zu konfigurieren. Der übliche Weg ist wohl, ein skip-limit auf dem Chunk zu definieren und das Tag skippable-exception-classes innerhalb des chunk-Tags zu verwenden (Listing 2).

Wenn diese Standardfunktionalität nicht ausreicht, kann man auch das SkipPolicy-Interface implementieren und in den Chunk einbinden. skip-limit und skippableexception-classes werden dann ignoriert (Listing 3).

Als Erstes schauen wir uns in Abbildung 4 einen Skip in einem ItemProcessor an. Wenn nun also eine als skippable markierte Exception geworfen wird (oder die SkipPolicy sagt, dass das Element übersprungen wird), wird die Transaktion zurückgerollt. Spring Batch hält die gelesenen Elemente in einem Cache, aus dem das aktuelle Element nun entfernt wird. Es wird eine neue Transaktion gestartet, und alle noch im Cache gehaltenen Elemente werden an die Processor-Phase übergeben. Wenn ein SkipListener konfiguriert wurde, so wird dessen onSkipInProcess-Methode kurz vor dem Commit des Chunks mit dem übersprungenen Element aufgerufen. Außerdem wird bei jedem übersprungenem Element überprüft, ob das skip-limit erreicht ist, und, falls es erreicht wurde, der Step abgebrochen.

Skip im ItemProcessor

Was hat diese Funktionsweise für Konsequenzen?
Durch den Cache bekommen wir Probleme, wenn wir einen transaktionalen Reader haben. Ein Beispiel dafür ist eine Queue: Sie nimmt an der Transaktion teil. Das bedeutet, dass bei einem Rollback durch eine skippable Exception auch die gelesenen Nachrichten wieder zurück in die Queue gehen. Spring Batch liest dann aber nicht erneut von der Queue, sondern nutzt den Cache, was im Endeffekt dazu führt, dass Elemente doppelt verarbeitet werden. Genau dafür gibt es das Attribut transactionalreader-queue auf dem chunk-Tag: Setzt man dieses auf „true“ („false“ ist der Default), so werden gelesene Elemente nicht im Cache gehalten, sondern neu gelesen.

Selbst wenn man keinen transaktionalen Reader hat, können Probleme auftreten. Wenn man zum Beispiel mit einem ItemReadListener gelesene Elemente in einer transaktionalen Ressource protokolliert, werden diese Protokolle bei einem Rollback natürlich auch zurückgerollt, auch wenn in diesem Fall alle bis auf ein Element erfolgreich verarbeitet wurden.

Noch komplizierter wird es, wenn wir einen Skip im ItemWriter betrachten. Der Writer wird nur einmal mit allen Elementen aufgerufen. Also kann Spring Batch auch gar nicht wissen, welches Element denn nun der Auslöser für die skippable Exception war. Die einzige Möglichkeit, das herauszufinden, ist, den Chunk in viele Mini-Chunks mit genau einem Element aufzuteilen, also für jedes Element eine eigene Transaktion aufzumachen. Abbildung 5 zeigt, wie das funktioniert.

Skip im ItemWriter

Es kommt nun eine zweite Schleife hinzu, die in Abbildung 5 rot gekennzeichnet ist. Sie startet mit einer skippable Exception in unserem normalen Chunk, woraufhin die Transaktion zurückgerollt wird. Um jetzt das fehlerhafte Element zu isolieren, wird für jedes Element aus dem Chunk eine neue Transaktion aufgemacht, in der jeweils der ItemProcessor und dann der ItemWriter aufgerufen werden. Gibt es keinen Fehler, wird dieser Mini-Chunk committet, und das nächste Element wird verarbeitet. Wir erwarten mindestens eine skippable Exception, und wenn die kommt, wird der Mini-Chunk zurückgerollt und das Element für einen eventuell konfigurierten SkipListener als übersprungen markiert. Wurden alle Elemente des fehlerhaften Chunks auf diese Art und Weise verarbeitet, so fährt Spring Batch mit der normalen Verarbeitung fort. Natürlich gelten auch hier die oben erwähnten Bedingungen bezüglich des Caches von gelesenen Elementen. Zusätzlich ist es möglich, den ItemProcessor als nicht transaktional zu markieren, indem man das Attribut processor-transactional auf dem chunk-Tag auf „false“ setzt („true“ ist der Default). Tut man dies, richtet Spring Batch auch einen Cache für durch den Processor verarbeitete Elemente ein und führt die Processor-Phase nicht erneut durch, wenn es einen Fehler im Writer gibt. Das kann man natürlich nur so konfigurieren, wenn es im Processor keine schreibende Interaktion mit einer transaktionalen Ressource gibt.

Fehlt nur noch ein Fall: Skip während des Lesens. Dieser Fall ist relativ einfach, deswegen gibt es dazu auch kein Diagramm. Wenn eine skippable Exception fliegt, wird der Skip-Zähler um eins erhöht. Außerdem merken wir uns die Exception für den SkipListener. Es gibt kein Rollback.

Retry

Mit der Retry-Funktionalität können bestimmte Exception-Typen und eine Maximalanzahl definiert werden. Immer, wenn eine dieser Exceptions fliegt, wird nicht der gesamte Step abgebrochen, nur das fehlerhafte Element erneut verarbeitet. Wird die spezifizierte Maximalanzahl überschritten, bricht der Step dennoch ab. Ähnlich wie beim Skip stellt sich hier die Frage, wie Spring Batch mit den übrigen, nicht fehlerhaften Elementen umgeht, die ebenfalls vom Rollback betroffen sind. Auch beim Retry gibt es zwei Wege der Konfiguration: einerseits mit einem retry-limit und retryable-exception-classes (Listing 4) und andererseits mit einer eigenen RetryPolicy (Listing 5).

Abbildung 6 zeigt, wie die Retry-Funktionalität bezüglich Transaktionen umgesetzt ist. Immer, wenn eine retryable Exception geworfen wird, wird der Chunk zurückgerollt. Sollte die konfigurierte Maximalanzahl (retry-limit) noch nicht erreicht worden sein, so werden alle Elemente aus dem Cache der gelesenen Elemente erneut als Input für die Processor-Phase verwendet. Im Prinzip gelten die Einschränkungen, die auch beim Skip gelten, und auch hier können wir das transaktionale Verhalten durch reader-transactional-queue und processor-transactional modifizieren.

Retry

Übrigens: In Spring Batch 2.1.8 gibt es einen Bug in der Retry-Funktionalität: Wenn es eine retryable Exception im Writer gibt, so wird nur das erste Element im Cache erneut durch den Processor verarbeitet [4].

Fazit

Spring Batch hat inzwischen einen sehr stabilen und ausgereiften Status erreicht und bietet viele interessante Features. Transaktionen sind in Batch-Anwendungen naturgemäß ein kritisches Thema, und viele Features von Spring Batch bringen ein kompliziertes Transaktionsverhalten mit sich. Batch-Entwickler benötigen ein größeres Vertrauen in und ein größeres Verständnis für die Funktionsweise eines genutzten Frameworks, da ein Fehlschlag häufig kritischer ist als bei Nicht-Batch-Anwendungen. Zudem sind Batch-Anwendungen in der Regel schwerer zu testen. Dieser Artikel trägt dazu bei, dieses Verständnis in Bezug auf Transaktionen zu vertiefen.

Links & Literatur

[1] Schlimm, Niklas und Hinkel, Frank: „Spring Batch 2.0 – Frühling in der Stapelverarbeitung“, Java Magazin 1.2010

[2] http://static.springsource.org/spring-batch/

[3] http://static.springsource.org/spring-batch/spring-batch-samples/index.html

[4] https://jira.springsource.org/browse/BATCH-1761

Vollständiger Artikel