Entwicklung eines Datenverarbeitungs-Jobs mit Apache Beam – Streaming-Pipelines

Entwicklung eines Datenverarbeitungs-Jobs mit Apache Beam – Streaming-Pipelines

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

Thema unseres letzten Blogposts war die Entwicklung eines Jobs zur Batch-Datenverarbeitung mit Apache Beam. Diesmal geht es um einen Prozess, der wie kaum ein anderer in der Big-Data-Welt von heute gefragt ist – die Verarbeitung von Streamingdaten.

Der Hauptunterschied zwischen der  Batch- und der Streaming-Verarbeitung liegt in der Quelle der verarbeiteten Daten. Wenn Sie es mit einem irgendwie begrenzten Datensatz zu tun haben (wobei „begrenzt“ durchaus „riesig“ bedeuten kann), der sich während der Verarbeitung nicht verändert, dann ist eine Batch-Pipeline wahrscheinlich die am besten geeignete Form der Verarbeitung. Dabei können als Eingabequelle beliebige Daten wie Dateien, Datenbanktabellen, Objekte in Objektspeichern usw. dienen. In diesem Zusammenhang möchte ich nochmals betonen, dass bei der Batch-Verarbeitung vorausgesetzt wird, dass die Daten während der gesamten Verarbeitung veränderlich sind und dass die Zahl von Eingabe-Datensätzen konstant bleibt. Warum ist dieser Unterschied wichtig? Weil selbst dann, wenn Dateien verarbeitet werden, ein unbegrenzter Datenstrom vorliegen kann, wenn Dateien hinzugefügt werden oder wenn sich diese ändern. In diesem Fall ist die Streaming-Verarbeitung der passende Ansatz. Im anderen Fall entwickeln wir nur dann eine Batch-Pipeline, wenn wir wissen, dass unsere Daten begrenzt und unveränderlich sind.

Die Sache wird komplizierter, wenn unser Datensatz unbegrenzt ist (d. h. immer wieder neue Daten eintreffen) oder/und sich verändert. Beispiele für derartige Datenquellen sind etwa die folgenden: Nachrichtensysteme (wie Apache Kafka), neue Dateien in einem Verzeichnis (Webserver-Logs) oder von anderen Systemen, die Echtzeitdaten erfassen (z. B. IoT-Sensoren). Allen diesen Datenquellen ist gemeinsam, dass wir immer wieder mit neuen Daten rechnen müssen. Wir könnten natürlich unsere Daten in Batches (nach Zeitraum oder Datenmenge) aufteilen und die aufgeteilten Daten im Batchverfahren verarbeiten. Allerdings wäre es schwierig, bestimmte Funktionen auf alle Daten gleichermaßen anzuwenden und hierfür die gesamte Pipeline zu erstellen. Zum Glück gibt es jedoch eine Reihe von Streaming-Engines, die diese Art der Datenverarbeitung souverän bewältigen – Apache SparkApache FlinkApache Apex und Google DataFlow. Sie alle werden von Apache Beam unterstützt und erlauben die Verwendung derselben Pipeline auf unterschiedlichen Engines ohne die geringste Code-Änderung. Es ist sogar möglich, dieselbe Pipeline wahlweise im Batch- oder im Streaming-Modus einzusetzen. Es genügt, die Eingabequelle korrekt einzurichten, und – voilà! – wie von Zauberhand und ohne weiteres Zutun wird der jeweils andere Modus ausgeführt. Von so etwas habe ich geträumt, als ich vor einiger Zeit noch mühsam meine Batch-Jobs zu Streaming-Jobs umschreiben musste.

Aber genug der Theorie – nehmen wir uns jetzt ein Beispiel vor und schreiben unseren ersten Streaming-Code. Wir lesen einige Daten aus Kafka ein (unbegrenzte Quelle), führen einige einfache Datenverarbeitungsschritte aus und und schreiben die Ergebnisse wieder nach Kafka zurück.

Nehmen wir an, wir haben einen unbegrenzten Strom von Geo-Koordinaten (x- und y-Werten) von mehreren Objekten auf einer Karte (sagen wir: Autos), die in Echtzeit eintreffen und von denen wir nur diejenigen auswählen wollen, die sich in einem bestimmten Gebiet befinden. Dies bedeutet, wir müssen aus einem Kafka-Thema Textdaten aufnehmen, die Daten parsen, nach angegebenen Grenzwerten filtern und in ein zweites Kafka-Thema zurückschreiben. Schauen wir einmal, wie dies mit Apache Beam funktioniert.

Die einzelnen Kafka-Nachrichten enthalten Text in folgendem Format:
id,x,y

Hierbei gilt:
  id – eindeutige ID des Objekts,
  x, y – Koordinaten auf der Karte (Ganzzahlen).

Wir werden auf das Datenformat achten und, falls ungültige Formate auftreten, die betreffenden Datensätze überspringen.

Erstellung einer Pipeline

Ähnlich wie im vorangegangenen Blogbeitrag, in dem es um Batchverarbeitung ging, erstellen wir auch hier die Pipeline:

Pipeline pipeline = Pipeline.create(options);

Wir können das Objekt Options so differenzieren, dass damit Befehlszeilenoptionen an die Pipeline übergeben werden. Das vollständige Beispiel mit allen Details finden Sie hier auf Github.

Anschließend müssen wir die Daten aus dem Kafka-Eingabethema auslesen. Wie oben erwähnt, stellt Apache Beam bereits verschiedene EA-Konnektoren zur Verfügung, zu denen auch KafkaIO gehört. Wir erzeugen also eine neue unbegrenzte Transformation PTransform, die eintreffende Nachrichten aus dem angegebenen Kafka-Thema empfängt und sie an den nächsten Schritt weiterleitet:

pipeline.apply(
    KafkaIO.read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))

KafkaIO verkapselt standardmäßig alle empfangenen Nachrichten im Objekt KafkaRecord. Von der nächsten Transformation wird jedoch nur die Nutzlast (String-Werte) durch das neu erstellte Objekt  DoFn abgerufen:

.apply(
    ParDo.of(
        new DoFn, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)

Nach diesem Schritt wäre eigentlich die Filterung der Datensätze an der Reihe (siehe die ursprüngliche Aufgabenbeschreibung oben), aber vorher müssen wir noch unseren String-Wert gemäß dem definierten Format parsen. Dadurch kann er in genau einem Funktionsobjekt verkapselt werden, das dann durch die interne Beam-Transformation Filter weiterverarbeitet wird.

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)

Nun müssen wir die an Kafka zurückzuschreibenden gefilterten Nachrichten erzeugen. Dazu erstellen wir ein neues Schlüssel/Wert-Paar mithilfe der internen Beam-Klasse KV, die mit unterschiedlichen EA-Konnektoren einsetzbar ist, darunter auch KafkaIO.

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
                c.output(KV.of("filtered", c.element()));
           }
        }
    )
)

Mit der letzten Transformation müssen wir unsere Nachrichten nach Kafka zurückschreiben. Hierfür verwenden wir einfach KafkaIO.write()  als Implementierung der Nachrichtensenke. Als Lesehilfe für die übrigen Zeilen: Diese Transformation ist außerdem mit einigen erforderlichen Optionen zu konfigurieren, z. B. Kafka-Bootstrap-Servern, dem Namen des Ausgabe-Themas und Serializern für die Schlüssel/Wert-Paare.

.apply(
    "WriteToKafka",
    KafkaIO.write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);

Am Ende wird unsere Pipeline dann wie üblich ausgeführt:

pipeline.run();

Dieses Beispiel erscheint Ihnen möglicherweise etwas komplizierter als das im vorherigen Blogpost dieser Serie. Aber wie Ihnen vielleicht schon aufgefallen ist, haben wir gar nichts Besonderes getan, um unsere Pipeline Streaming-kompatibel zu machen. Hierfür ist einzig und allein die Datenmodell-Implementierung von Apache Beam zuständig, die es Beam-Benutzern ganz einfach macht, zwischen Batch- und Streaming-Verarbeitung hin- und herzuwechseln.

Erstellung des Builds und Ausführung der Pipeline

Damit KafkaIO von Beam verwendet werden kann, fügen wir die erforderlichen Abhängigkeiten hinzu:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.1.0</version>
</dependency>

Anschließend erstellen wir einfach eine JAR-Datei und prüfen durch Ausführung mit DirectRunner, ob sie funktioniert:

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

Falls erforderlich, können wir über die Option “exec.args” weitere in der Pipeline zu verwendende Argumente hinzufügen. Achten Sie außerdem vor Ausführung der Beam-Pipeline darauf, dass Ihre Kafka-Server verfügbar und korrekt angegeben sind. Mit dem Maven-Befehl schließlich erreichen Sie, dass eine Pipeline gestartet und endlos ausgeführt wird, bis jemand sie manuell beendet. (Alternativ dazu können Sie eine maximale Ausführungszeit angeben.) Auf diese Weise werden Daten im Streaming-Modus kontinuierlich verarbeitet.

Wie immer ist sämtlicher Code dieses Beispiels auf folgendem Github-Repository veröffentlicht.

Viel Spaß beim Streamen!

An der Diskussion teilnehmen

0 Comments

Hinterlasse eine Antwort

Your email address will not be published. Required fields are marked *