MapReduce

aus Wikipedia, der freien Enzyklopädie
Wechseln zu: Navigation, Suche

MapReduce ist ein vom Unternehmen Google Inc. eingeführtes Programmiermodell für nebenläufige Berechnungen über (mehrere Petabyte[1]) große Datenmengen auf Computerclustern. MapReduce ist auch der Name einer Implementierung des Programmiermodells in Form einer Software-Bibliothek.

Beim MapReduce-Verfahren werden die Daten in zwei Phasen verarbeitet. Dadurch lassen sich Berechnungen parallelisieren und auf mehrere Rechner verteilen. Bei sehr großen Datenmengen ist die Parallelisierung unter Umständen bereits schon deshalb erforderlich, weil die Datenmengen für einen einzelnen Prozess (und das ausführende Rechnersystem) zu groß sind.

Das Programmiermodell wurde durch die in der funktionalen Programmierung häufig verwendeten Funktionen map und reduce inspiriert,[2] auch wenn die Arbeitsweise der Bibliothek davon abweicht.[3] 2010 wurde für MapReduce ein US-Patent erteilt.[4]

MapReduce-Implementierungen wurden in C++, Erlang, Java, Python und vielen anderen Programmiersprachen realisiert.

Arbeitsweise[Bearbeiten]

Illustration des Datenflusses[Bearbeiten]

MapReduce2.svg

Das obige Bild illustriert den Datenfluss bei der MapReduce-Berechnung.

  • Die Eingabedaten (D, A, T, A) werden auf eine Reihe von Map-Prozessen verteilt (bunte Rechtecke), welche jeweils die vom Nutzer bereitgestellte Map-Funktion berechnen.
  • Die Map-Prozesse werden idealerweise parallel ausgeführt.
  • Jede dieser Map-Instanzen legt Zwischenergebnisse ab (dargestellt durch die pinken Sterne).
  • Von jeder Map-Instanz fließen Daten in eventuell verschiedene Zwischenergebnisspeicher.
  • Sind alle Zwischenergebnisse berechnet, ist diese sogenannte Map-Phase beendet und die Reduce-Phase beginnt.
  • Für jeden Satz an Zwischenergebnissen berechnet jeweils genau ein Reduce-Prozess (violette Rechtecke) die vom Nutzer bereitgestellte Reduce-Funktion und damit die Ausgabedaten (violette Kreise X, Y und Z).
  • Die Reduce-Prozesse werden idealerweise auch parallel ausgeführt.

Definition der MapReduce-Funktion[Bearbeiten]

Die MapReduce-Bibliothek realisiert eine Funktion, welche aus einer Liste von Schlüssel-Wert-Paaren (Eingabeliste) eine neue Liste von Schlüssel-Wert-Paaren (Ausgabeliste) berechnet:


\begin{align}
 \mathrm{MapReduce} : (K\times V)^* &\to (L\times W)^* \\
 {\lbrack(k_1, v_1), \ldots, (k_n, v_n)\rbrack} &\mapsto \lbrack (l_1, w_1), \ldots, (l_m, w_m)\rbrack 
\end{align}

Erläuterung:

  • Die Mengen K und L enthalten Schlüssel, die Mengen V und W enthalten Werte.
  • Alle Schlüssel k \in K sind vom gleichen Typ, z. B. Strings.
  • Alle Schlüssel l \in L sind vom gleichen Typ, z. B. ganze Zahlen.
  • Alle Werte v \in V sind vom gleichen Typ, z. B. Atome.
  • Alle Werte w \in W sind vom gleichen Typ, z. B. Gleitkommazahlen.
  • Wenn A und B Mengen sind, so ist mit A\times B die Menge aller Paare (a, b) gemeint, wobei a \in A und b \in B (kartesisches Produkt).
  • Wenn M eine Menge ist, so ist mit M^* die Menge aller endlichen Listen mit Elementen aus M gemeint (angelehnt an den Kleene-Stern) - die Liste kann auch leer sein.

Definition der Map- und Reduce-Funktionen[Bearbeiten]

Der Nutzer konfiguriert die Bibliothek über die Bereitstellung der beiden Funktionen Map und Reduce, die wie folgt definiert sind:

\begin{align}
 \mathrm{Map} : K\times V &\to (L\times W)^* \\
 (k, v) &\mapsto \lbrack(l_1, x_1), \ldots, (l_{r_k}, x_{r_k})\rbrack
\end{align}

bzw.

\begin{align}
 \mathrm{Reduce} : L\times W^* &\to X^* \\
 \left(l, {\lbrack y_1, \ldots, y_{s_l}\rbrack}\right) &\mapsto \lbrack w_1, \ldots, w_{m_l}\rbrack
\end{align}

Map-Phase[Bearbeiten]

  • Map bildet ein Paar, bestehend aus einem Schlüssel k und einem Wert v, auf eine Liste von neuen Paaren (l_r, x_r) ab, welche die Rolle von Zwischenergebnissen spielen, die Werte x_r sind vom gleichen Typ wie die Endergebnisse w_i.
  • Bei einem neuen Paar (l, x) verweist der von Map vergebene Schlüssel l dabei auf eine Liste T_l von Zwischenergebnissen, in welcher der von Map berechnete Wert x gesammelt wird.
  • Die Bibliothek ruft für jedes Paar in der Eingabeliste die Funktion Map auf.
  • All diese Map-Berechnungen sind voneinander unabhängig, so dass man sie nebenläufig und verteilt auf einem Computercluster ausführen kann.

Reduce-Phase[Bearbeiten]

  • Sind alle Map-Aufrufe erfolgt bzw. liegen alle Zwischenergebnisse in T_l vor, so ruft die Bibliothek für jede Zwischenwertliste die Funktion Reduce auf, welche daraus eine Liste von Ergebniswerten w_j berechnet, die von der Bibliothek in der Ausgabeliste als Paare (l, w_j) gesammelt werden.
  • Auch die Aufrufe von Reduce können unabhängig auf verschiedene Prozesse im Computercluster verteilt werden.

Anmerkung: Diese Darstellung war etwas vereinfacht, denn in der Regel wird die Steuerung des MapReduce Verfahrens eine Anzahl R von Reduce-Prozessen anstreben, so dass, wenn es für mehr als R verschiedene Schlüssel l Zwischenergebnisse (l,x) gibt, Zwischenergebnisse (l,x) mit verschiedenen Schlüsseln l in einer gemeinsamen Liste gespeichert werden. Die entsprechenden Paare werden vor der Reduce-Berechnung nach Schlüsseln sortiert.

Combine-Phase[Bearbeiten]

Optional kann zwischen der Map- und der Reduce-Phase noch eine Combine-Phase erfolgen. Diese hat in der Regel die gleiche Funktionalität wie die Reducefunktion, wird aber auf dem gleichen Knoten wie die Map-Phase ausgeführt. Dabei geht es darum, die Netzwerklast zu reduzieren.[5] Der Sinn der Combine-Phase erschließt sich sofort bei der Betrachtung des Wordcount-Beispiels: Auf Grund der unterschiedlichen Häufigkeit von Wörtern in natürlicher Sprache, würde bei einem deutschen Text beispielsweise sehr oft eine Ausgabe der Form ("und", 1) erzeugt (gleiches gilt für Artikel und Hilfsverben). Durch die Combine-Phase wird nun aus 100 Nachrichten der Form ("und", 1) lediglich eine Nachricht der Form ("und", 100). Dies kann die Netzwerkbelastung signifikant reduzieren, ist aber nicht in allen Anwendungsfällen möglich.

Beispiel: Verteilte Häufigkeitsanalyse mit MapReduce[Bearbeiten]

Problem[Bearbeiten]

Man möchte für umfangreiche Texte herausfinden, wie oft welche Wörter vorkommen.

Angabe der Map- und Reduce-Funktionen[Bearbeiten]

 map(String name, String document):
  // name: document name ("key")
  // document: document contents ("value")
  for each word w in document:
    EmitIntermediate(w, 1);
 
 reduce(String word, Iterator partialCounts):
  // word: a word ("key")
  // partialCounts: a list of aggregated partial counts ("values")
  //     for 'word'
  int result = 0;
  for each v in partialCounts:
    result++;
  Emit(word, result);

Map-Phase[Bearbeiten]

  • Map bekommt jeweils einen Dokumentnamen name und ein Dokument document als Zeichenkette übergeben.
  • Map durchläuft das Dokument Wort für Wort.
  • Jedes Mal, wenn ein Wort w angetroffen wird, wandert eine 1 in die w-Zwischenergebnisliste (falls diese noch nicht existiert, wird sie angelegt).
  • Ist man mit allen Wörtern durch und hat der Text insgesamt n verschiedene Wörter, so endet die Map-Phase mit n Zwischenergebnislisten, jede für ein anderes Wort sammelnd, welche so viele 1-Einträge enthält, wie das entsprechende Wort im Dokument gefunden wurde.
  • Eventuell liefen viele Map-Instanzen gleichzeitig, falls der Bibliothek mehrere Worte und Dokumente übergeben wurden.

Reduce-Phase[Bearbeiten]

  • Reduce wird für das Wort word und die Zwischenergebnisliste partialCounts aufgerufen.
  • Reduce durchläuft die Zwischenergebnisliste und addiert alle gefundenen Zahlen auf.
  • Die Summe result wird an die Bibliothek zurückgegeben, sie enthält, wie oft das Wort word im Dokument gefunden wurde.
  • Die Zwischenergebnisse konnten parallel, durch gleichzeitige Reduce-Aufrufe, berechnet werden.

Insgesamt[Bearbeiten]

  • Aus einer Liste von Dokumentnamen und Dokumenten wird eine Liste von Worten und Worthäufigkeiten generiert.

Beispielhafte Berechnung[Bearbeiten]

Zum Beispiel wäre folgende Berechnung auf einem klassischen Text denkbar:

 Text = "Fest gemauert in der Erden
         Steht die Form, aus Lehm gebrannt.
         Heute muß die Glocke werden,
         Frisch, Gesellen! seid zur Hand.
         Von der Stirne heiß
         Rinnen muß der Schweiß,
         Soll das Werk den Meister loben,
         Doch der Segen kommt von oben."

Der Text wird in Sätze aufgeteilt, dabei bietet sich eine Normalisierung an, indem man alles klein schreibt und die Satzzeichen entfernt:

 Eingabeliste = [ (satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt"),
                  (satz_2, "heute muß die glocke werden frisch gesellen seid zur hand"),
                  (satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der "
                           "segen kommt von oben") ]

Die Eingabeliste hat drei Paare als Elemente, wir können daher drei Map-Prozesse starten:

 P1 = Map(satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt")
 P2 = Map(satz_2, "heute muß die glocke werden frisch gesellen seid zur hand")
 P3 = Map(satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen "
                  "kommt von oben")

Die Map-Aufrufe generieren diese Zwischenergebnispaare:

 P1 = [ ("fest", 1), ("gemauert", 1), ("in", 1), ("der", 1), ("erden", 1), 
        ("steht", 1), ("die", 1), ("form", 1), ("aus", 1), ("lehm, 1), 
        ("gebrannt", 1) ]
 P2 = [ ("heute", 1), ("muß", 1), ("die", 1), ("glocke", 1), ("werden", 1), 
        ("frisch", 1), ("gesellen", 1), ("seid", 1), ("zur", 1), ("hand", 1) ]
 P3 = [ ("von", 1), ("der", 1), ("stirne", 1), ("heiß", 1), ("rinnen", 1), 
        ("muß, 1), ("der", 1), ("schweiß", 1), ("soll", 1), ("das", 1), 
        ("werk", 1), ("den", 1), ("meister", 1), ("loben", 1), ("doch", 1), 
        ("der", 1), ("segen", 1), ("kommt", 1), ("von", 1), ("oben", 1) ]

Die Map-Prozesse liefern ihre Paare an die MapReduce-Bibliothek, welche diese in den Zwischenergebnislisten sammelt. Parallel könnte folgendes geschehen (Die gleiche Taktung der 3 Map-Prozesse ist unrealistisch, tatsächlich überlappen sich die Ausführungen. Die T_wort-Listen sind lokal pro Map-Prozess vorhanden und werden *nicht* zwischen den Schritten synchronisiert):

 1. Iteration:
    P1:  T_fest     = [ 1 ]     (neu)
    P2:  T_heute    = [ 1 ]     (neu)
    P3:  T_von      = [ 1 ]     (neu)
 
 2. Iteration:
    P1:  T_gemauert = [ 1 ]     (neu)
    P2:  T_muß      = [ 1 ]     (neu)
    P3:  T_der      = [ 1 ]     (neu)
 
 3. Iteration:
    P1:  T_in       = [ 1 ]     (neu)
    P2:  T_die      = [ 1 ]     (neu)
    P3:  T_stirne   = [ 1 ]     (neu)

Im vierten Schritt sieht man, dass Zwischenergebnislisten lokal für jeden Map-Prozess existieren und nicht global wiederverwendet werden können:

 4. Iteration:
    P1:  T_der      = [ 1 ]     (neu, der 1. Map-Prozess hat noch kein T_der, nur P2)
    P2:  T_glocke   = [ 1 ]     (neu)
    P3:  T_heiss    = [ 1 ]     (neu)
 
 5. Iteration
    P1:  T_erden    = [ 1 ]     (neu)
    P2:  T_werden   = [ 1 ]     (neu)
    P3:  T_rinnen   = [ 1 ]     (neu)
 
 6. Iteration
    P1:  T_steht    = [ 1 ]     (neu)
    P2:  T_frisch   = [ 1 ]     (neu)
    P3:  T_muß      = [ 1 ]     (neu, der 3. Map-Prozess hat noch kein T_muß, nur P2)

Im siebten Schritt kommt dann zum ersten Mal vor, dass ein weiteres Vorkommen in einer bereits angelegten Zwischenergebnisliste gesammelt wird:

 7. Schritt
    P1:  T_die      = [ 1 ]     (neu, der 1. Map-Prozess hat noch kein T_die)
    P2:  T_gesellen = [ 1 ]     (neu)
    P3:  T_der      = [ 1, 1 ]  (beim 3. Map-Prozess seit Iteration 2 vorhandene Liste verwenden)

usw.

Nach 21 Schritten sind alle drei Map-Prozesse mit ihrer Arbeit durch, die Map-Phase endet und es beginnt die Reduce-Phase. Die Zwischenergebnislisten, die von verschiedenen Map-Prozessen zu demselben Wort angelegt wurden, werden zusammengefügt. Für jede der entstandenen Zwischenergebnislisten (hier sortiert aufgeführt)

                     reduce
   T_der      = [ 1 ] ++ [ 1, 1, 1 ] -> [ 4 ]
   T_die      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
   T_fest     = [ 1 ]                -> [ 1 ]
   T_gemauert = [ 1 ]                -> [ 1 ]
   T_glocke   = [ 1 ]                -> [ 1 ]
   T_heiss    = [ 1 ]                -> [ 1 ]
   T_heute    = [ 1 ]                -> [ 1 ]
   T_in       = [ 1 ]                -> [ 1 ]
   T_muß      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
   T_stirne   = [ 1 ]                -> [ 1 ]
   T_von      = [ 1, 1 ]             -> [ 2 ]
   .
   .
   . (für alle verschiedenen T-Listen)

können wir parallel einen Reduce-Prozess starten, der jeweils die Elemente aufzählt. Das Ergebnis von MapReduce sieht in etwa so aus:

 Ausgabeliste = [ ("fest", 1), ("heute", 1), ("von", 2), ("gemauert", 1), 
                  ("muß", 2), ("der", 4), ("in", 1), ("die", 2), .. ]

Weitere Beispiele[Bearbeiten]

Verfahren Map-Funktion Reduce-Funktion
Verteiltes grep Gibt die gefundene Zeile (hit) in einen Zwischenergebnisspeicher Reicht durch (Identische Abbildung, genauer: Projektion der 2. Komponente)

Weblinks[Bearbeiten]

 Commons: MapReduce – Sammlung von Bildern, Videos und Audiodateien

Fachartikel[Bearbeiten]

Software[Bearbeiten]

Einzelnachweise[Bearbeiten]

  1. Google spotlights data center inner workings | Tech news blog - CNET News.com
  2. "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages." -"MapReduce: Simplified Data Processing on Large Clusters", von Jeffrey Dean und Sanjay Ghemawat, Google Labs
  3. "Google's MapReduce Programming Model -- Revisited" – Paper von Ralf Lämmel, Microsoft
  4. USP 7,650,331 (United States Patent and Trademark Office)
  5. MapReduce-Paper