Advanced Java Services | ForkJoinPool |
Der ForkJoinPool ist ein ExceutorService der speziell für die parallele Abarbeitung von Threads geschaffen wurde. Situationen bei denen sich Threads weiter verzweigen und dadurch viele parallele Vorgänge entstehen lassen sind das ideale Einsatzgebiet für den ForkJoinPool. Alle Threads dieses Pools suchen ständig nach abzuarbeitenden Runnables oder Callables, sodaß möglichst viele Threads parallel ablaufen. Dieses Prinzip nennt man "work-stealing". Der Parallelisierungsgrad von ForkJoinPool richtet sich nach der Anzahl der zur Verfügung stehenden Prozessoren. ForkJoinPool bringt erst in Mehrprozessorsystemen und bei vielen Subtasks eine Performanceverbesserung gegenüber anderen Threadpools.
Der ForkJoinPool arbeitet nach dem klassischen "Teile- und Herrsche"-Prinzip, das nicht nur in der Informatik und in der Mathematik eine wichtige Rolle spielt:
Genau nach diesem Prinzip arbeitet der ForkJoinPool, der unter der Federführung von Doug Lea entwickelt wurde. Dazu wurde der neue Begriff Task eingeführt. Tasks sind Teilthreads, die von einem Thread ausgeüfhrt werden und voneinander unabhängig sind, sodaß sie parallel ausgeführt werden können und nicht aufeiandner warten müssen.
Um das Prinzip "work-stealing" realisieren zu können müssen dem ForkJoinPool spezielle Objekte übergeben werden, in denen man die Aufgaben definiert, die abgearbeitet werden sollen. Dazu gibt es die abstrakte Klasse ForkJoinTask<T> und ihre ebenfalls abstrakten Ableitungen RecursiveAction und RecursiveTask<T>. Diese beiden Klassen entsprechen in etwa den Interfaces Runnable und Callable.
Beide Klassen vereinbaren die obigen vier Methoden. Implementiert werden müssen in beiden Klassen nur die abstrakten compute()-Methoden. Man beachte den Unterschied beim Returntyp. So hat in RecursiveAction die Methode void compute() den Returntyp void und in RecursiveTask<T> die Methode T compute() den generischen Returntyp T. Eine der beiden Klassen muß vom Programmierer implementiert werden. In den compute()-Methoden müssen rekursive Aufrufe in einer bestimmten Reihenfolge stehen, nur so erhält man einen wirklichen ForkJoinPool.
In ForkJoinPool gibt es Überladungen der Methoden execute(), invoke() und submit() für Objekte vom Typ ForkJoinTask<T>.
An diesem absichtlich einfachen Beispiel soll nur das grundsätzliche Vorgehen gezeigt werden.
Dieser Schritt ist trivial. Man braucht in der Regel nur einen Pool.
import java.util.concurrent.ForkJoinPool; ... ForkJoinPool forkJoinPool = new ForkJoinPool();
In beiden Fällen muß lediglich die Methode compute() geschrieben werden. Die Implementierungen hier sind wiederum trivial. Insbesondere fehlen die rekursiven Aufrufe, die die parallele Abarbeitung erst ermöglichen.
class MyRecursiveAction extends RecursiveAction { int data; int result; MyRecursiveAction(int x) { data = x; } @Override public void compute() { result = data + 1; } } // end class
Da wir hier keinen Returnwert haben speichern wir das Ergebnis in result.
class MyRecursiveTask extends RecursiveTask<Integer> { int data; MyRecursiveTask(int x) { data = x; } @Override public Integer compute() { return data + 1; } } // end class
Im zweiten Fall geben wir das Ergebnis mit return zurück. Die (triviale) Aufgabe hier ist also die Inkrementierung eines Wertes.
Dies kann mit execute(), invoke() oder submit() geschehen.
forkJoinPool.execute(myRecursiveTask); forkJoinPool.execute(myRecursiveAction); System.out.println("result2 " + myRecursiveAction.result);
execute() gibt keinen Wert zurück und blockiert nicht, daher kann man nicht erwarten, daß System.out.println() das Ergebnis liefert. In der Regel dauert die zu lösende Aufgabe und die Ausgabe kommt schon während die Threads noch arbeiten.
int result = forkJoinPool.invoke(myRecursiveTask); // blockiert System.out.println("result1 " + result); forkJoinPool.invoke(myRecursiveAction); // Returntyp void ! blockiert System.out.println("result2 " + myRecursiveAction.result);
invoke() blockiert und wartet daher auf das Ergebnis.
ForkJoinTask<Integer> result1 = forkJoinPool.submit(myRecursiveTask); // blockiert nicht ! System.out.println("xxx"); try { System.out.println("result1 = " + result1.get()); // blockiert } catch(InterruptedException | ExecutionException ex) {} ForkJoinTask<Void> result2 = forkJoinPool.submit(myRecursiveAction); // blockiert nicht System.out.println("yyy"); try { System.out.println("result2 = " + result2.get()); // blockiert liefert 0 System.out.println("result2 = " + myRecursiveAction.result); } catch(InterruptedException | ExecutionException ex) {}
Im Gegensatz zu invoke() blockiert submit() (noch) nicht. Da ForkJoinTask das Interface Future implementiert, erhält man im ersten Fall über get() den Returnwert der compute()-Methode. Im zweiten Fall liefert zwar get() nicht das Ergebnis, blockiert aber bis das Ergebnis vorliegt.
Der ForkJoinPool dient dazu rechenintensive Aufgaben parallel abzuarbeiten um so Performanceverbesserungen zu erzielen. In diesem Beispiel werden wir lediglich die Werte eines großen Arrays aufsummieren. Um sicher mit großen Zahlen arbeiten zu können verwenden wir den Datentyp BigInteger. Wir legen ein Array vom variabler Größe an (hier 200 001 Elemente) und belegen es mit stark anwachsenden Werten ( die Feldelemente haben der Reihe nach die Werte 0, 1, 16, 81, 4^4, 5^4, ..., 200000^4 ). Der ForkJoinPool hat nun die Aufgabe jedes Feldelement durch sich selbst zu teilen (Ergebnis ist jeweils 1) und diese Werte aufzusummieren. Die Summe wird also immer Feldlänge - 1 betragen. So kann man bei wachsender Feldlänge sehen wie der ForkJoinPool performanter wird verglichen mit der sequentiellen Berechnung.
Wir beginnen mit dem eigentlich interessanten Teil, der Realisierung der abstrakten Klassen RecursiveAction bzw. RecursiveTask<V>.
Dem Konstruktor wird ein Array übergeben und die zwei Grenzen innerhalb derer die Summierung durchgeführt werden soll. compute() besteht aus zwei Teilen. Für kleine Arrays, die unterhalb der Grenze SEQUENTIAL_THRESHOLD liegen lohnt sich eine parallele Berechnung nicht, deswegen wird die Berechnung sequentiell vorgenommen. Im else-Zweig wird das Array aufgeteilt und es gibt zwei rekursive Aufrufe left und right. Zum linken Teil wird nun die Methode fork() aufgerufen, die den parallelen Prozeß initiiert. Der rechte Teil ruft rekursiv compute(). left wartet mit join auf den rechten Teil und anschließend werden die Ergebnisse addiert.
class MyRecursiveTask extends RecursiveTask<BigInteger> { private static final int SEQUENTIAL_THRESHOLD = 5000; private static int taskCount; int low; int high; BigInteger[] array; boolean print_S=true; boolean print_P=true; public MyRecursiveTask(BigInteger[] array, int low, int high) { taskCount++; this.array = array; this.low = low; this.high = high; } @Override protected BigInteger compute() { // Solange man unter der Grenze ist // läuft die Berechnung sequentiell if (high - low <= SEQUENTIAL_THRESHOLD) { // if(print_S) // { // System.out.println("sequentielle Berechnung"); // print_S=false; // } BigInteger sum = BigInteger.ZERO; for(int i = (int)low; i < (int)high; ++i) { BigInteger tmp = array[i].divide(BigInteger.valueOf(i)) .divide(BigInteger.valueOf(i)) .divide(BigInteger.valueOf(i)) .divide(BigInteger.valueOf(i)); //System.out.println(tmp); sum = sum.add(tmp); } return sum; } else { // if(print_S) // { // System.out.println("parallele Berechnung"); // print_S=false; // } // Hier ist die Aufteilung in parallele Threads // int mid = low + (high - low) / 2; MyRecursiveTask left = new MyRecursiveTask(array, low, mid); MyRecursiveTask right = new MyRecursiveTask(array, mid, high); left.fork(); // linker Teil wird abgezweigt BigInteger rightResult = right.compute(); // rechter teil wird mit compute() berechnet BigInteger leftResult = left.join(); // Warten auf den abgezweigten linken teil return leftResult.add(rightResult); } } public int getTaskCount() { return taskCount; } } // end class
Man beachte die Reihenfolge bei (1), (2) und (3). Die Methodenaufrufe müssen in der Reihenfolge
left.fork()
right.compute()
left.join()
oder
right.fork()
left.compute()
right.join()
stattfinden. Andernfalls findet keine parallele Verarbeitung statt.
import java.math.BigInteger; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinPoolDemo { static int arrSize = 200_001; private static BigInteger array[] = new BigInteger[arrSize]; /** */ public static void main(String[] args) { Runtime runtime = Runtime.getRuntime(); int procCount = runtime.availableProcessors(); System.out.println(procCount + " processors available"); System.out.println(); long startTime = 0; long endTime = 0; // Schritt 0 Arrayinhalt definieren. for(int i = 0; i < array.length; i++) { array[i] = BigInteger.valueOf(i).multiply( BigInteger.valueOf(i) ) .multiply( BigInteger.valueOf(i) ) .multiply( BigInteger.valueOf(i) ); //System.out.println("arr["+i+"] = " +array[i]); } //System.out.println("max = " + array[arrSize-1]); startTime = System.currentTimeMillis(); BigInteger sum = BigInteger.ZERO; for(int i = 1; i < array.length; ++i) { BigInteger tmp = array[i].divide(BigInteger.valueOf(i)) .divide(BigInteger.valueOf(i)) .divide(BigInteger.valueOf(i)) .divide(BigInteger.valueOf(i)); //System.out.println(tmp); sum = sum.add(tmp); } endTime = System.currentTimeMillis(); System.out.println("normale Berechnung sum = " + sum); System.out.println("startTime " + startTime); System.out.println("endTime " + endTime); System.out.println("dauer " + (endTime - startTime) + " millis"); System.out.println(); // Schritt 1 Anlegen eines ForkJoinPool // ForkJoinPool: you create exactly one of these to run all your fork-join // tasks in the whole program ForkJoinPool forkJoinPool = new ForkJoinPool(); // then in some method in your program use the global pool we made above: //T invoke(ForkJoinTask task) startTime = System.currentTimeMillis(); BigInteger result = forkJoinPool.invoke(new MyRecursiveTask(array, 1, array.length)); //System.out.println(forkJoinPool); endTime = System.currentTimeMillis(); System.out.println("Berechnung mit ForkJoinPool sum = " + result); System.out.println("startTime " + startTime); System.out.println("endTime " + endTime); System.out.println("dauer " + (endTime - startTime) + " millis"); System.out.println("taskCount " + recursiveTask.getTaskCount()); } }
Einige Ausgaben
4 processors available normale Berechnung sum = 200000 startTime 1435586359357 endTime 1435586359544 dauer 187 millis Berechnung mit ForkJoinPool sum = 200000 startTime 1435586359544 endTime 1435586359638 dauer 94 millis taskCount 127
4 processors available normale Berechnung sum = 300000 startTime 1435586476997 endTime 1435586477262 dauer 265 millis Berechnung mit ForkJoinPool sum = 300000 startTime 1435586477262 endTime 1435586477402 dauer 140 millis taskCount 127
4 processors available normale Berechnung sum = 400000 startTime 1435586506594 endTime 1435586506937 dauer 343 millis Berechnung mit ForkJoinPool sum = 400000 startTime 1435586506937 endTime 1435586507109 dauer 172 millis taskCount 127