Apache Spark und Talend: Performance & Tuning

Apache Spark und Talend: Performance & Tuning

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.

Zunächst einmal vielen Dank an alle, die meine letzten beiden Blog-Posts zum Thema Talend und Apache Spark gelesen haben.

Falls Sie sich gerade erst zugeschaltet haben und meine früheren Posts noch nicht kennen, können Sie diese unter den folgenden Links abrufen: „Talend und Apache Spark: technische Grundlagen“ und Teil II „Talend vs. Spark Submit Konfiguration: Wo liegt der Unterschied?“.

In diesen ersten beiden Teilen meiner Reihe über Apache Spark habe ich versucht darzulegen, wie Talend mit Spark funktioniert, welche Gemeinsamkeiten es zwischen Talend und Spark Submit gibt und welche Konfigurationsoptionen Talend für Spark Jobs bietet.

In diesem Blog-Post geht es nun um die Performance und das Tuning von Apache Spark. Das ist für jeden, der Apache Spark nutzt, ein spannendes Thema, ob nun mit Talend oder ohne. Und jedes Mal, wenn jemand seine ersten Spark Jobs entwickelt und ausführt, tauchen dieselben Fragen auf:

  • Wie viele Executors sollte ich für meinen Spark Job bereitstellen?
  • Wie viel Speicher benötigt jeder Executor?
  • Wie viele Cores sollte ich einsetzen?
  • Warum dauert die Verarbeitung von 10 GB bei manchen Spark Jobs Stunden und wie bekomme ich das in den Griff?

In diesem Blog-Post gehe ich auf jede einzelne dieser Fragen ein und hoffe, Ihnen ein paar gute Antworten und Erkenntnisse bieten zu können. Aber bevor wir fortfahren, hier ein paar Kernkonzepte, die in diesem Post immer wieder auftauchen werden:

Partition: Eine Partition ist ein Teil eines verteilten Datensatzes. Sie wird durch die HDFS Standard-Blockgröße erstellt. Spark nutzt Partitionen, um Datensätze parallel bearbeiten zu können.

Tasks: Tasks sind die Arbeitseinheiten, die innerhalb eines Executors laufen können.

Core: Core bezeichnet den Prozessorkern. Dieser bestimmt, wie viele Aufgaben bei Spark in einem Executor parallel verarbeitet werden können.

Executor: Ein Prozess, der auf Worker-Knoten gestartet wird und die Jobverarbeitung im Speicher oder auf Platte ausführt.

Application Master: Jede YARN Applikation startet einen Application Master Process, der dafür zuständig ist, vom Resource Manager Ressourcen anzufordern. Nach Allokierung der Ressourcen startet der Prozess dann gemeinsam mit den Node Managern die in ihnen enthaltenen, benötigten Container.

Spark Tuning

Schauen wir uns als erstes an, wie Sie Ihre Apache Spark Jobs innerhalb von Talend tunen können. Wie bereits erwähnt, finden Sie in Ihrem Talend Spark Job den Tab „Spark Configuration“. Hier können Sie die Tuning-Parameter einstellen (bei Talend immer deaktiviert).

In diesem Bereich können Sie definieren, wie viel Speicher und wie viele Cores Application Master und Executors verwenden sollen und wie viele Executors Ihr Job nutzen wird. Bevor Sie hier zum ersten Mal die entsprechenden Werte eingeben, stellt sich natürlich die Frage, wie viele Cores oder wie viel Speicher Ihre Application Master und Executors benötigen, um eine gute Performance zu bieten. Gehen wir dieser Frage also als erstes auf den Grund.

So bestimmen Sie die richtige Anzahl an Cores für Ihren Spark Job.

Um die Anzahl bestimmen zu können, sind einige Faktoren zu berücksichtigen:

  1. Die Größe der Datensätze
  2. Der Zeitraum, in dem der Job abgeschlossen werden muss
  3. Die Schritte und Aufgaben, die der Job ausführen muss

Sobald wir uns darüber Gedanken gemacht haben, können wir nun damit anfangen, unseren Job zu konfigurieren, um die Performance zu maximieren. Tunen wir als erstes unseren Application Master. Hier können wir die Standardwerte stehen lassen, da er nur für die Orchestrierung von Ressourcen zuständig ist und keine Verarbeitungsaufgaben übernimmt. Das heißt, es sind keine massiven Speicher- und Core-Kapazitäten erforderlich.

Im nächsten Schritt konfigurieren wir Speicher und Cores für unsere Executors. Stellt sich die Frage, wie viele Executors und Cores man benötigt und wie hoch die Speicherkapazität sein sollte. Stellen wir uns einfach mal vor, wir hätten ein Hadoop-Cluster mit sechs Worker-Knoten, die jeweils mit 32 Cores und 120 GB Speicher ausgestattet sind. Man könnte jetzt natürlich vermuten, dass die Performance sich daran misst, wie viele Aufgaben wir in jedem Executor parallel ausführen können. Leitlinien für das Performance-Tuning von Hadoop-Distributoren wie zum Beispiel Cloudera zeigen aber, dass mehr als fünf Cores je Executor zu Lasten des HDFS I/O gehen. Das heißt, die für die Performance optimale Anzahl an Cores ist 5.

Schauen wir als nächstes, wie viele Executors wir einsetzen wollen. Basierend auf der Anzahl an Cores und Knoten, lässt sich das einfach ermitteln. Wie bereits erwähnt, sind fünf Cores pro Executor optimal. Nun müssen wir von den vorhandenen 32 Cores pro Knoten die entfernen, die wir für unsere Jobs nicht verwenden können, da sie vom Betriebssystem und den Hadoop-Deamons, die auf dem Knoten laufen, genutzt werden. Das Hadoop-Cluster Management-Tool erledigt das für uns. So lässt sich schnell und einfach bestimmen, wie viele Cores pro Knoten wir für unsere Spark Jobs nutzen können.

Nehmen wir einmal an, nach Durchführung dieser Berechnungen bleiben uns pro Knoten 30 nutzbare Cores. Da wir bereits bestimmt haben, dass fünf Cores pro Knoten die optimale Menge ist, heißt das, wir können problemlos bis zu sechs Cores pro Knoten einsetzen.

Berechnen wir nun abschließend, wie viel Speicher wir nutzen können. Basierend auf den oben genannten Hardware-Spezifikationen, sehen wir, dass pro Knoten 120 GB Speicher bereitstehen. Aber wie bereits beim Thema Cores erwähnt, steht nicht die gesamte Speichermenge zur Verfügung, da ein Teil durch das Betriebssystem genutzt wird. Auch hier kann das Hadoop-Cluster Management-Tool bestimmen, wie viel Speicherkapazität für unsere Jobs nutzbar ist. Wenn das Betriebssystem und die Hadoop-Daemons 2 GB Speicherkapazität benötigen, lässt uns das 118 GB Speicher für unsere Spark Jobs. Da wir bereits definiert haben, dass wir 6 Executors pro Knoten nutzen können, lässt sich einfach errechnen, dass pro Executor etwa bis zu 20 GB Speicher zur Verfügung stehen. Diese Zahl ist aber nicht zu 100 % korrekt, da wir auch den Speicher-Overhead jedes Executors einrechnen sollten. In meinem letzten Blog-Post erwähnte ich, dass der Overhead-Standard bei 384 MB liegt. Wenn ich das von den 20 GB abziehe, bleiben etwa 19 GB, die ich jedem Executor zuteilen kann.

Dynamische vs. feste Allokation von Cluster-Ressourcen

Die oben genannten Zahlen greifen bei Spark Jobs sowohl bei einer festen, als auch bei einer dynamischen Allokation von Speicher-Ressourcen. Der Unterschied zwischen beiden ist die dynamische Allokation. Bei der dynamischen Allokation lässt sich die Anzahl der eingangs verwendeten Executors ebenso definieren, wie die minimale und maximale Anzahl für Phasen mit geringem Workload bzw. für Auslastungsspitzen. Zwar wäre es schön, wenn wir alle Cluster-Ressourcen nutzen könnten, aber wir müssen die Rechnerleistung natürlich mit anderen Jobs teilen, die auf dem Cluster laufen. Deshalb bestimmt das System anhand der Anforderungen, die wir bei der Evaluierung der Tuning-Faktoren definiert haben, welchen Anteil dieser Maximalwerte wir für unseren Talend Spark Job nutzen können.

Nachdem nun alles konfiguriert ist, können wir damit beginnen, den Job auszuführen. Angenommen, die Verarbeitung unseres Spark Jobs nimmt sehr viel Zeit in Anspruch, obwohl wir die oben erwähnte Maximaleinstellung gewählt haben. In diesem Fall müssen wir uns ein paar weitere Einstellungen anschauen, und prüfen, ob sie die maximale Performance unterstützen.

Spark Performance

Einer der für die Optimierung von Spark Jobs relevanten Faktoren ist die Größe der Datensätze. Gehen wir einfach mal davon aus, dass wir bei unserem Spark Job zwei Tabellen integrieren wollen, von denen die eine 50 GB groß ist und die andere 100 MB. In dem Fall sollten wir prüfen, ob wir bei den Talend Komponenten die hilfreiche Funktion „Replicated Join“ aktiviert haben.

Replicated Join

Ein Replicated Join (auch Map-Side Join oder Broadcast Join) wird oft genutzt, wenn man eine große Tabelle mit einer kleinen integrieren will, um dann die Daten der kleinen Tabelle an alle Executors zu verteilen. Da hierbei der kleinere Datensatz in den Speicher passt, können wir ein Replicated Join nutzen, um die Daten an die Executors zu übertragen, was die Performance unseres Spark Jobs optimiert.

Da die Tabellendaten auf Executor-Ebene integriert werden müssen, lässt sich durch die Weiterleitung des kleineren Datensatzes an die Executors vermeiden, dass die Daten der großen Tabelle übers Netzwerk übergeben werden müssen. Viele Performance-Probleme bei Spark gehen darauf zurück, dass große Datenmengen über das Netzwerk transportiert werden. Das lässt sich einfach innerhalb des Talend Jobs überprüfen, indem wir, wie unten dargestellt, in der Komponente tMap die Option „Use Replicated Join“ aktivieren. Dann werden automatisch die Daten der Zuordnungstabelle an alle Executors weitergeleitet.


Im nächsten Schritt sollten wir prüfen, ob unser Job Aufgaben beinhaltet, die aufwändige Mehrfachberechnungen verursachen können.

Spark Cache

Um das Thema zu verdeutlichen, nehmen wir als Beispiel eine Datei, die Angaben zur Kaufhistorie von Kunden enthält. Aus diesen Daten wollen wir zwei Kennzahlen isolieren:

  • die Gesamtzahl der Kunden
  • die Zahl der gekauften Artikel

Falls wir hierbei keinen Spark Cache nutzen, werden die Daten für jede der oben genannten Berechnungen geladen. Das drückt die Performance, da so aufwändige Mehrfachberechnungen ausgelöst werden. Da wir aber wissen, dass dieser Datensatz zu einem späteren Zeitpunkt erneut benötigt wird, legen wir ihn über Spark Cache im Speicher ab und vermeiden so, dass er immer wieder neu geladen werden muss.

In Talend Spark Jobs geschieht das mithilfe der Komponenten tCacheIn und tCacheOut, die in der Apache Spark Palette in Talend verfügbar sind. So können Sie die verschiedenen Optionen des Spark Cache-Mechanismus nutzen.

Sie können die Daten alternativ auch nur auf Disk zwischenspeichern. Zudem haben Sie die Option, die Cache-Daten für Speicher, Disk oder beides zu serialisieren und sie an zwei andere Knoten zu replizieren. Am häufigsten wird die Option Speicher ohne Serialisierung gewählt, da das schneller ist. Weiß man aber, dass der Datensatz im Cache nicht in den Speicher passt und will man vermeiden, dass der Überhang auf Disk geschrieben wird, empfiehlt sich die Serialisierung, da sich so das Volumen des Datensatzes reduzieren lässt. Das aber kostet zusätzlichen Overhead und drückt die Performance. Am besten prüfen Sie die verfügbaren Optionen und wählen die für Ihren Job optimale Lösung.


Sind die Performance-Probleme auch dann noch nicht behoben, müssen wir uns das Spark History Web Interface vornehmen und prüfen, was vor sich geht. Wie in meinem letzten Blog-Post erwähnt, können wir im Bereich Spark History der Spark Konfiguration in Talend Spark Logging aktivieren. Spark Logging archiviert die Logs nach Abschluss eines Jobs und macht sie über das Spark History Web Interface verfügbar, was das Troubleshooting bei Spark Jobs deutlich vereinfacht. Die Funktion Spark Event Logging bei Spark Jobs zu aktivieren, ist definitiv eine Best-Practice und hilft uns, Performance-Problemen schneller auf die Spur zu kommen.


Wurde Spark Event Logging aktiviert, können Sie über das Spark History Web Interface und die Applikations-Nummer des Jobs nachvollziehen, dass wir die folgenden Tabs haben:

Auf unserer Spark Benutzeroberfläche oben wollen wir uns nun die Stages Tabs anschauen und herausfinden, welcher davon die Job-Performance beeinträchtigt, die Details dazu ermitteln und prüfen, ob wir ein Verhalten wie das unten beschriebene feststellen können:


Wie wir sehen, verarbeitet nur ein Executor den Großteil der Daten, während die anderen untätig sind, obwohl wir zehn Executors zugeteilt haben. Woran das liegt? Um die Frage zu beantworten, müssen wir zunächst klären, in welcher Jobphase das Problem auftritt. Es könnte zum Beispiel sein, dass sich das Problem bei einem Spark Job einstellt, für den wir Daten von einer komprimierten Datei einlesen. Da archivierte Dateien beim Lesen nicht standardmäßig partitioniert werden, wird für jede Archivdatei, die wir lesen, ein RDD mit einer einzigen Partitionierung erstellt. Und das führt zu diesem Verhalten. Liegt die komprimierte Datei in einem teilbaren Archivformat vor, wie PZIB2, und kann beim Lesen partitioniert werden, können wir in den erweiterten Einstellungen von tFileInputDelimited die Eigenschaft „Set Minimum Partitions“ aktivieren und dann als Minimalwert so viele Partitionen setzen, wie wir Executors haben.

Haben wir es mit einem Archivformat wie GZIP zu tun, das sich beim Lesen nicht erneut partitionieren lässt, können wir es mit der Komponente tPartition explizit partitionieren. Wie unten dargestellt, ermöglicht die Komponente eine erneute Partitionierung der Datei, sodass wir die Last gleichmäßig auf alle Executors verteilen können.

Die Lese-Partitionierung (partitioning at read) kann auch genutzt werden, wenn man mithilfe der Komponente tJDBC von einer Datenbank liest. Dabei werden folgende Eigenschaften verwendet:

Wie wir oben gesehen haben, kann die erneute Partitionierung nur in bestimmten Situationen genutzt werden. Stellen wir beispielsweise auf den zur Integration genutzten Keys einen Datenversatz fest, müssen andere Methoden herangezogen werden. Wie sich solch ein Datenversatz feststellen lässt? Schauen Sie sich den Datensatz nach Partitionen an und prüfen Sie, wie unsere Daten auf den zur Integration genutzten Keys gruppiert sind. Hier ein Beispiel dafür, wie ein „schiefer“ Datensatz nach Partition aussehen würde:

Wenn wir in solch einem Fall nicht über einen anderen Key erneut partitionieren können, sollten wir nach anderen Methoden suchen, um unseren Talend Spark Job zu optimieren. Eine weit verbreitete Technik nennt sich „Salzen“. Dabei wird dem genutzten Key ein „Fake Key“ an die Seite gestellt, um die Datenverteilung je Partition auszugleichen. Das lässt sich bei Spark Jobs über die Komponente tmap durchführen. Hier ein Beispiel:

Wie wir oben sehen, fügen wir den Fake Key auf tmap Ebene als numerische Einheit hinzu und verknüpfen ihn mit dem ursprünglichen Key und dem Zuordnungsdatensatz. Da die Integration nun auf der Kombination aus echtem und zusätzlich generiertem Key basiert, wird ein möglicher Datenversatz vermieden, der bei der Integration von Datensätzen in Spark die Performance beeinträchtigen kann.

Fazit

Es gibt zahlreiche weitere Methoden, um die Performance von Talend Spark Jobs zu tunen. Ich hoffe, Sie fanden die hier präsentierte Auswahl trotzdem hilfreich. Ich wünschen Ihnen jedenfalls viel Spaß mit Ihren nächsten Talend Spark Jobs.

Quellenangaben:

https://spark.apache.org/docs/latest/tuning.html

https://www.cloudera.com/documentation/enterprise/latest/topics/admin_spark_tuning1.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-tuning.html

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_spark-component-guide/content/ch_tuning-spark.html

An der Diskussion teilnehmen

0 Comments

Hinterlasse eine Antwort

Your email address will not be published. Required fields are marked *