Entwicklung eines Datenverarbeitungs-Jobs mit Apache Beam

Entwicklung eines Datenverarbeitungs-Jobs mit Apache Beam

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

Der folgende Blogpost ist der erste Teil einer Serie von Beiträgen über Apache Beam.

Kennen Sie Apache Beam? Falls nicht, ist das überhaupt keine Schande: Als eines der jüngsten von der Apache Software Foundation entwickelten Projekte ist das im Juni 2016 veröffentlichte Apache Beam in der Welt der Datenverarbeitung noch immer relativ neu. Auch ich selbst beschäftige mich erst seit Kurzem näher mit Apache Beam und habe dabei seine zahlreichen Vorzüge kennen und schätzen gelernt.

Apache Beam ist ein einheitliches Programmiermodell für die einfache Implementierung von Jobs zur Batch- und Streaming-Datenverarbeitung sowie für deren Ausführung auf einer beliebigen Ausführungs-Engine mit verschiedenen Eingaben und Ausgaben (EAs). Wie bitte? Klingt schon interessant, aber was ist genau damit gemeint? Um das zu erklären, habe ich mich entschlossen, eine Serie von Blogposts über Apache Beam zu schreiben, in denen ich anhand von konkreten Beispielen und praktischen Anwendungsfällen die Vorteile von Datenverarbeitungs-Jobs mit Apache Beam aufzeige.

Das Thema des ersten Posts ist die Batchverarbeitung. Nehmen wir etwa folgendes Beispiel: Sie arbeiten für einen Autohändler und sollen die Fahrzeugverkäufe über einen bestimmten Zeitraum analysieren. Die Aufgabenstellung lautet: Wie viele Fahrzeuge verschiedener Marken wurden in diesem Zeitraum verkauft? Diese Aufgabenstellung bedeutet zunächst, dass unsere Daten begrenzt sind (die Datenmenge ist endlich) und dass sie nicht aktualisiert werden können (denn die Verkäufe liegen in der Vergangenheit). Für die Datenauswertung empfiehlt sich ein Batchprozess.

Als Eingabedaten liegen uns Textprotokolle von Fahrzeugverkäufen in folgendem Format vor:

id,markenname,modellname,verkaufszahl

Beispiel:
1,Toyota,Prius,3
2,Nissan,Sentra,2
3,Ford,Fusion,4

Bevor wir mit der Umsetzung unserer ersten Beam-Anwendung beginnen, sollten wir ein paar Begriffe klären, z. B. die drei wichtigsten Konzepte in Beam: Pipeline, PCollection und PTransform.

  • In einer Pipeline wird der Workflow Ihrer sämtlichen Datenverarbeitungsaufgaben jeweils von Anfang bis Ende eingekapselt.
  • Eine PCollection ist eine Abstraktion von verteilten Datensätzen, mit der Beam Daten zwischen Transformationen, den PTransforms, überträgt. 
  • Eine solche PTransform-Transformation ist ein Prozess, der Eingabedaten (Eingabe-PCollections) verarbeitet und Ausgabedaten (Ausgabe-PCollections) erzeugt. Die jeweils erste und letzte PTransform-Transformation steht normalerweise für bestimmte Arten der Eingabe/Ausgabe von Daten, die entweder begrenzt (Batch-Verarbeitung) oder unbegrenzt (Streaming-Verarbeitung) sein können.

Zur Vereinfachung können wir die  Pipeline als einen DAG (Directed Acyclic Graph, gerichteten azyklischen Graphen) betrachten, der Ihren gesamten Workflow darstellt; die PTransforms wären Knoten (zur Umwandlung der Daten) und die PCollections die Kanten des Graphen. (Weitere Informationen hierzu finden Sie im Beam Programming Guide.)

Nach diesem Exkurs können wir zu unserem Beispiel zurückkehren. Wir versuchen jetzt, die erste Pipeline zu implementieren, die den bereitgestellten Datensatz verarbeitet.

Erstellung einer Pipeline

Erstellen Sie zunächst eine neue Pipeline:

Pipeline pipeline = Pipeline.create();

Anschließend erstellen wir eine neue Transformation PTransform mit der Methode pipeline.apply(), durch die Daten von einer Textdatei eingelesen werden und eine neue PCollection mit Strings erzeugt wird. Zu diesem Zweck greifen wir auf eine der in Beam bereits implementierten EAs zurück, nämlich TextIO. Mit TextIO ist es möglich, Daten zeilenweise aus Textdateien zu lesen und in diese zu schreiben. TextIO verfügt noch über zahlreiche weitere Funktionen, z. B. kann es mit unterschiedlichen Dateisystemen arbeiten und unterstützt Dateimuster sowie Datei-Streaming. Weitere Informationen finden Sie in der Dokumentation zu Apache Beam.

apply(TextIO.read().from(/path/to/input/file))

Als Ausgabe dieser Transformation PTransform erhalten wir eine neue Instanz von PCollection, in der jeder Eintrag der PCollection eine Textzeile der Eingabedatei ist.

Da uns als Ergebnis die Summe aller Verkäufe nach Fahrzeugmarke interessiert, müssen wir die Textzeilen entsprechend gruppieren. Der nächste Schritt besteht deshalb darin, jede Zeile zu parsen und ein Schlüssel/Wert-Paar zu erzeugen, bei dem der Schlüssel ein Markenname und der Wert die Anzahl der Fahrzeugverkäufe ist. Man beachte hierbei, dass die Ausgabe-PCollection aus einer vorherigen PTransform wiederum als Eingabe-PCollection für die aktuelle PTransform dient.

Bei diesem Schritt verwenden wir eine interne Beam-PTransform namens MapElements, um über die bereitgestellte SimpleFunction-Schnittstelle für jeden Eintrag der Eingabe ein neues Schlüssel/Wert-Paar zu generieren.

Anschließend gliedern wir die Anzahl der Verkäufe mithilfe einer weiteren Beam-Transformation, nämlich GroupByKey, nach Marke auf. Die resultierende Ausgabe ist eine PCollection von Schlüssel/Wert-Paaren, bei denen der Schlüssel ein Markenname ist und der Wert durch eine iterierbare Sammlung von Verkaufszahlen für diese Marke dargestellt wird.

.apply(GroupByKey.<String, Integer>create())

 

Nun sind wir an dem Punkt angelangt, an dem wir alle Verkaufszahlen von Fahrzeugen aufgeschlüsselt nach Marke summieren können. Hierzu dient uns unsere eigene Implementierung der Transformation ParDo:

Wir stellen die Pipeline fertig, indem wir eine weitere EA-Transformation wie folgt auf die PCollection mit Strings anwenden und diese in eine Textdatei schreiben:

.apply(TextIO.write().to(/path/to/output/dir).withoutSharding());

Zuletzt müssen wir nur noch unsere erstellte Pipeline ausführen:

pipeline.run();

Das sieht ganz einfach aus, oder? Genau hier zeigt sich die Stärke von Apache Beam: die Erstellung komplizierter Verarbeitungspipelines mit minimalem Programmieraufwand.

Falls sich jemand mit Hadoop auskennt, ist ihm oder ihr an dieser Pipeline vielleicht noch eine Parallele aufgefallen:

  • Textdaten werden zeilenweise gelesen und geparst, wobei neue Schlüssel/Wert-Paare erzeugt werden (Map)
  • Diese Schlüssel/Wert-Paare werden nach dem Schlüssel gruppiert (GroupBy)
  • Und schließlich werden durch Anwendung einer Benutzerfunktion (Reduce) alle Werte eines Schlüssels iterativ verarbeitet.

Ja, es stimmt – diese einfache Pipeline ließe sich auch mit einem klassischen MapReduce-Job ausführen! Aber im Vergleich wird deutlich, um wie viel einfacher und klarer die Programmierung in Beam aussieht (obwohl es sich um Java handelt!) und dass es auch nicht viel komplizierter wird, wenn wir der Pipeline eine weitere Transformation hinzufügen.

Erstellung des Builds und Ausführung der Pipeline

Wie bereits erwähnt, kann eine Beam-Pipeline auf unterschiedlichen Runnern (Verarbeitungs-Engines) ausgeführt werden:

  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow

Hierzu müssen wir nur eine entsprechende Abhängigkeit in die Konfiguration unseres Maven- oder Gradle-Projekts einfügen. Die gute Nachricht dabei ist, dass wir den Code nicht umschreiben oder anpassen müssen, damit er auf den einzelnen Runnern ausführbar ist. Außerdem bleibt uns sogar erspart, unsere JAR-Dateien neu kompilieren zu müssen, wenn die Abhängigkeiten aller benötigten Runner von vornherein hinzugefügt werden – wir geben lediglich den Runner an, den wir verwenden wollen!

Direct Runner ist ein lokaler Runner, mit dem normalerweise Ihre Pipeline getestet wird. Bei Verwendung von Java geben Sie die Abhängigkeit vom Direct Runner in Ihrer Datei pom.xml wie folgt an:


<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.3.0</version>
   <scope>runtime</scope>
</dependency>


Anschließend kompilieren Sie Ihr Projekt:
# mvn clean package

Und Sie führen Ihre Pipeline auf Direct Runner aus:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner”

Beispiel: Wenn unsere Eingabedatei die folgenden Daten enthält:
# cat /tmp/beam/cars_sales_log
 1,Toyota,Prius,3
 2,Nissan,Sentra,2
 1,Toyota,Yaris,4
 3,Ford,Fusion,5
 3,Ford,Kuga,3

Dann sieht das Endergebnis wie folgt aus:
# cat /tmp/beam/cars_sales_report
Toyota: 7
Nissan: 2
Ford: 8

Die Liste aller unterstützten Runner mit Anleitungen zu ihrer Verwendung finden Sie auf dieser Seite.

Sämtlicher Code dieses Beispiels ist in folgendem GitHub-Repository veröffentlicht: https://github.com/aromanenko-dev/beam-tutorial.

Im nächsten Teil dieser Blogpost-Serie geht es um die Verarbeitung von Streaming-Daten in Beam. Wir werden uns ein Beispiel für eine Datenanalyse-Aufgabe mit einer unbegrenzten Datenquelle ansehen und herausfinden, welche Dienste Beam in diesem Fall leisten kann.

An der Diskussion teilnehmen

0 Comments

Hinterlasse eine Antwort

Your email address will not be published. Required fields are marked *