Advanced   Java   Services ForkJoinPool


ForkJoinPool

Der ForkJoinPool ist ein ExceutorService der speziell für die parallele Abarbeitung von Threads geschaffen wurde. Situationen bei denen sich Threads weiter verzweigen und dadurch viele parallele Vorgänge entstehen lassen sind das ideale Einsatzgebiet für den ForkJoinPool. Alle Threads dieses Pools suchen ständig nach abzuarbeitenden Runnables oder Callables, sodaß möglichst viele Threads parallel ablaufen. Dieses Prinzip nennt man "work-stealing". Der Parallelisierungsgrad von ForkJoinPool richtet sich nach der Anzahl der zur Verfügung stehenden Prozessoren. ForkJoinPool bringt erst in Mehrprozessorsystemen und bei vielen Subtasks eine Performanceverbesserung gegenüber anderen Threadpools.


Arbeitsweise

Der ForkJoinPool arbeitet nach dem klassischen "Teile- und Herrsche"-Prinzip, das nicht nur in der Informatik und in der Mathematik eine wichtige Rolle spielt:


ForkJoinTask<T>, RecursiveAction, RecursiveTask<T>

Genau nach diesem Prinzip arbeitet der ForkJoinPool, der unter der Federführung von Doug Lea entwickelt wurde. Dazu wurde der neue Begriff Task eingeführt. Tasks sind Teilthreads, die von einem Thread ausgeüfhrt werden und voneinander unabhängig sind, sodaß sie parallel ausgeführt werden können und nicht aufeiandner warten müssen.

Um das Prinzip "work-stealing" realisieren zu können müssen dem ForkJoinPool spezielle Objekte übergeben werden, in denen man die Aufgaben definiert, die abgearbeitet werden sollen. Dazu gibt es die abstrakte Klasse ForkJoinTask<T> und ihre ebenfalls abstrakten Ableitungen RecursiveAction und RecursiveTask<T>. Diese beiden Klassen entsprechen in etwa den Interfaces Runnable und Callable.


RecursiveAction


RecursiveTask<T>

Beide Klassen vereinbaren die obigen vier Methoden. Implementiert werden müssen in beiden Klassen nur die abstrakten compute()-Methoden. Man beachte den Unterschied beim Returntyp. So hat in RecursiveAction die Methode void compute() den Returntyp void und in RecursiveTask<T> die Methode T compute() den generischen Returntyp T. Eine der beiden Klassen muß vom Programmierer implementiert werden. In den compute()-Methoden müssen rekursive Aufrufe in einer bestimmten Reihenfolge stehen, nur so erhält man einen wirklichen ForkJoinPool.


Einem ForkJoinPool eine ForkJoinTask<T> übergeben

In ForkJoinPool gibt es Überladungen der Methoden execute(), invoke() und submit() für Objekte vom Typ ForkJoinTask<T>.


Das prinzipielle Vorgehen

An diesem absichtlich einfachen Beispiel soll nur das grundsätzliche Vorgehen gezeigt werden.


Schritt 1: ForkJoinPool anlegen

Dieser Schritt ist trivial. Man braucht in der Regel nur einen Pool.

import java.util.concurrent.ForkJoinPool;
...
ForkJoinPool forkJoinPool = new ForkJoinPool();

Schritt 2: RecursiveAction oder RecursiveTask<T> ableiten

In beiden Fällen muß lediglich die Methode compute() geschrieben werden. Die Implementierungen hier sind wiederum trivial. Insbesondere fehlen die rekursiven Aufrufe, die die parallele Abarbeitung erst ermöglichen.

class MyRecursiveAction extends RecursiveAction
{
   int data;
   int result;

   MyRecursiveAction(int x)
   {
      data = x;
   }

   @Override
   public void compute()
   {
      result = data + 1;
   }
}  // end class

Da wir hier keinen Returnwert haben speichern wir das Ergebnis in result.

class MyRecursiveTask extends RecursiveTask<Integer>
{
   int data;

   MyRecursiveTask(int x)
   {
      data = x;
   }

   @Override
   public Integer compute()
   {
      return data + 1;
   }
}  // end class

Im zweiten Fall geben wir das Ergebnis mit return zurück. Die (triviale) Aufgabe hier ist also die Inkrementierung eines Wertes.


Schritt 3: Objekte der entworfenen Klassen dem ForkJoinPool übergeben

Dies kann mit execute(), invoke() oder submit() geschehen.


Übergabe an execute()
forkJoinPool.execute(myRecursiveTask);
forkJoinPool.execute(myRecursiveAction);
System.out.println("result2 " + myRecursiveAction.result);

execute() gibt keinen Wert zurück und blockiert nicht, daher kann man nicht erwarten, daß System.out.println() das Ergebnis liefert. In der Regel dauert die zu lösende Aufgabe und die Ausgabe kommt schon während die Threads noch arbeiten.


Übergabe an invoke()
int result = forkJoinPool.invoke(myRecursiveTask);  // blockiert
System.out.println("result1 " + result);
forkJoinPool.invoke(myRecursiveAction);  // Returntyp void !   blockiert
System.out.println("result2 " + myRecursiveAction.result);

invoke() blockiert und wartet daher auf das Ergebnis.


Übergabe an submit()
ForkJoinTask<Integer> result1 = forkJoinPool.submit(myRecursiveTask);  // blockiert nicht !
System.out.println("xxx");
try
{
   System.out.println("result1 = " + result1.get());  // blockiert
}
catch(InterruptedException | ExecutionException ex)
{}
ForkJoinTask<Void> result2 = forkJoinPool.submit(myRecursiveAction);  // blockiert nicht
System.out.println("yyy");
try
{
   System.out.println("result2 = " + result2.get());   // blockiert  liefert 0
   System.out.println("result2 = " + myRecursiveAction.result);
}
catch(InterruptedException | ExecutionException ex)
{}

Im Gegensatz zu invoke() blockiert submit() (noch) nicht. Da ForkJoinTask das Interface Future implementiert, erhält man im ersten Fall über get() den Returnwert der compute()-Methode. Im zweiten Fall liefert zwar get() nicht das Ergebnis, blockiert aber bis das Ergebnis vorliegt.


Das tatsächliche Vorgehen

Der ForkJoinPool dient dazu rechenintensive Aufgaben parallel abzuarbeiten um so Performanceverbesserungen zu erzielen. In diesem Beispiel werden wir lediglich die Werte eines großen Arrays aufsummieren. Um sicher mit großen Zahlen arbeiten zu können verwenden wir den Datentyp BigInteger. Wir legen ein Array vom variabler Größe an (hier 200 001 Elemente) und belegen es mit stark anwachsenden Werten ( die Feldelemente haben der Reihe nach die Werte 0, 1, 16, 81, 4^4, 5^4, ..., 200000^4 ). Der ForkJoinPool hat nun die Aufgabe jedes Feldelement durch sich selbst zu teilen (Ergebnis ist jeweils 1) und diese Werte aufzusummieren. Die Summe wird also immer Feldlänge - 1 betragen. So kann man bei wachsender Feldlänge sehen wie der ForkJoinPool performanter wird verglichen mit der sequentiellen Berechnung.

Wir beginnen mit dem eigentlich interessanten Teil, der Realisierung der abstrakten Klassen RecursiveAction bzw. RecursiveTask<V>.


Die Realisierung von RecursiveTask

Dem Konstruktor wird ein Array übergeben und die zwei Grenzen innerhalb derer die Summierung durchgeführt werden soll. compute() besteht aus zwei Teilen. Für kleine Arrays, die unterhalb der Grenze SEQUENTIAL_THRESHOLD liegen lohnt sich eine parallele Berechnung nicht, deswegen wird die Berechnung sequentiell vorgenommen. Im else-Zweig wird das Array aufgeteilt und es gibt zwei rekursive Aufrufe left und right. Zum linken Teil wird nun die Methode fork() aufgerufen, die den parallelen Prozeß initiiert. Der rechte Teil ruft rekursiv compute(). left wartet mit join auf den rechten Teil und anschließend werden die Ergebnisse addiert.

class MyRecursiveTask extends RecursiveTask<BigInteger>
{
   private static final int SEQUENTIAL_THRESHOLD = 5000;
   private static int taskCount;

   int low;
   int high;
   BigInteger[] array;
   boolean print_S=true;
   boolean print_P=true;

   public MyRecursiveTask(BigInteger[] array, int low, int high)
   {
      taskCount++;
      this.array = array;
      this.low = low;
      this.high = high;
   }

   @Override
   protected BigInteger compute()
   {
      // Solange man unter der Grenze ist
      // läuft die Berechnung sequentiell
      if (high - low <= SEQUENTIAL_THRESHOLD)
      {
//          if(print_S)
//          {
//             System.out.println("sequentielle Berechnung");
//             print_S=false;
//          }

         BigInteger sum = BigInteger.ZERO;

         for(int i = (int)low; i < (int)high; ++i)
         {
            BigInteger tmp =  array[i].divide(BigInteger.valueOf(i))
              .divide(BigInteger.valueOf(i))
              .divide(BigInteger.valueOf(i))
              .divide(BigInteger.valueOf(i));
            //System.out.println(tmp);
            sum = sum.add(tmp);
         }
         return sum;
      }
      else
      {
//          if(print_S)
//          {
//             System.out.println("parallele Berechnung");
//             print_S=false;
//          }
         // Hier ist die Aufteilung in parallele Threads
         //
         int mid = low + (high - low) / 2;
         MyRecursiveTask left = new MyRecursiveTask(array, low, mid);
         MyRecursiveTask right = new MyRecursiveTask(array, mid, high);
         left.fork();                               // linker Teil wird abgezweigt
         BigInteger rightResult = right.compute();  // rechter teil wird mit compute() berechnet
         BigInteger leftResult = left.join();       // Warten auf den abgezweigten linken teil
         return leftResult.add(rightResult);
      }
   }

   public int getTaskCount()
   {
      return taskCount;
   }

}  // end class

Man beachte die Reihenfolge bei (1), (2) und (3). Die Methodenaufrufe müssen in der Reihenfolge

left.fork()
right.compute()
left.join()


oder

right.fork()
left.compute()
right.join()


stattfinden. Andernfalls findet keine parallele Verarbeitung statt.


Das Mainprogramm

import java.math.BigInteger;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolDemo
{
   static int arrSize = 200_001;
   private static BigInteger array[] = new BigInteger[arrSize];

   /**
    */
   public static void main(String[] args)
   {
      Runtime runtime = Runtime.getRuntime();
      int procCount = runtime.availableProcessors();
      System.out.println(procCount + " processors available");
      System.out.println();

      long startTime = 0;
      long endTime = 0;

      // Schritt 0  Arrayinhalt definieren.
      for(int i = 0; i < array.length; i++)
      {
         array[i] = BigInteger.valueOf(i).multiply( BigInteger.valueOf(i) )
                                         .multiply( BigInteger.valueOf(i) )
                                         .multiply( BigInteger.valueOf(i) );
         //System.out.println("arr["+i+"] = " +array[i]);

      }
      //System.out.println("max = " + array[arrSize-1]);
      startTime = System.currentTimeMillis();
      BigInteger sum = BigInteger.ZERO;
      for(int i = 1; i < array.length; ++i)
      {
         BigInteger tmp =  array[i].divide(BigInteger.valueOf(i))
                                   .divide(BigInteger.valueOf(i))
                                   .divide(BigInteger.valueOf(i))
                                   .divide(BigInteger.valueOf(i));
         //System.out.println(tmp);
         sum = sum.add(tmp);
      }
      endTime = System.currentTimeMillis();
      System.out.println("normale Berechnung          sum = " +  sum);
      System.out.println("startTime " + startTime);
      System.out.println("endTime   " + endTime);
      System.out.println("dauer     " + (endTime - startTime) + " millis");
      System.out.println();

      // Schritt 1 Anlegen eines ForkJoinPool
      // ForkJoinPool: you create exactly one of these to run all your fork-join
      // tasks in the whole program
      ForkJoinPool forkJoinPool = new ForkJoinPool();

      // then in some method in your program use the global pool we made above:
      // T  invoke(ForkJoinTask task)
      startTime = System.currentTimeMillis();
      BigInteger result = forkJoinPool.invoke(new MyRecursiveTask(array, 1, array.length));
      //System.out.println(forkJoinPool);
      endTime = System.currentTimeMillis();
      System.out.println("Berechnung mit ForkJoinPool sum = " + result);
      System.out.println("startTime " + startTime);
      System.out.println("endTime   " + endTime);
      System.out.println("dauer     " + (endTime - startTime) + " millis");
      System.out.println("taskCount " + recursiveTask.getTaskCount());
   }
}

Einige Ausgaben

4 processors available

normale Berechnung          sum = 200000
startTime 1435586359357
endTime   1435586359544
dauer     187 millis

Berechnung mit ForkJoinPool sum = 200000
startTime 1435586359544
endTime   1435586359638
dauer     94 millis
taskCount 127
4 processors available

normale Berechnung          sum = 300000
startTime 1435586476997
endTime   1435586477262
dauer     265 millis

Berechnung mit ForkJoinPool sum = 300000
startTime 1435586477262
endTime   1435586477402
dauer     140 millis
taskCount 127
4 processors available

normale Berechnung          sum = 400000
startTime 1435586506594
endTime   1435586506937
dauer     343 millis

Berechnung mit ForkJoinPool sum = 400000
startTime 1435586506937
endTime   1435586507109
dauer     172 millis
taskCount 127