Advanced   Java   Services Phaser


Phaser

Der mit Java 7 eingeführte Phaser kombiniert die Fähigkeiten von CountDownLatch und CyclicBarrier und kann zudem mit einer beliebigen Anzahl von Threads arbeiten. Ähnlich wie bei CyclicBarrier gibt es eine Möglichkeit am Barrierepunkt eine zusätzliche Aktion zu triggern. Bei CyclicBarrier übergibt man dazu ein ein Runnable an den Konstruktor, bei Phaser bildet man eine Unterklasse und überschreibt die protected Methode boolean onAdvance(int phase, int registeredParties) Die Eínsatzmöglichkeiten für den Phaser sind komplex. Die Beispiele hier zeigen nur einige der Funktionen des Phasers.


Zusammenfassung

Wir nähern uns dem Phaser, in dem wir ihn zunächst als erweiterten CountDownLatch bzw. CyclicBarrier einsetzen.


Der Phaser als StartLatch

In diesem Beispiel wartet eine beliebige Anzahl von Threads auf ein Startsignal. Dazu haben sie ein Phaserobjekt zur Verfügung und rufen phaser.awaitAdvance(0), dies entsprcht der Methode await() bei CountDownLatch. Es muß hier allerdings übergeben werden auf welches Phasenende man wartet. Die Zählung beginnt bei 0. Stimmt die Nummer nicht mit der zu beendenden Phase überein blockiert diese Methode nicht. Unsere Workerklasse sieht also folgendermaßen aus.

Die Workerklasse

import java.util.concurrent.Phaser;

public class Worker extends Thread
{
   Phaser phaser;
   int result = 0;

   public Worker(Phaser phaser)
   {
      this.phaser = phaser;
   }

   @Override
   public void run()
   {
      System.out.println(this.getName() + " waiting to start, phase = " + phaser.getPhase());
      int phase = phaser.awaitAdvance(0);  // blockiert und liefert 1

      System.out.println(this.getName() + " running, phase = " + phase);
      Be.idleFor(200);
      System.out.println(this.getName() + " end");
   }
}

Die Mainklasse legt den Phaser an und übergibt eine gewisse Anzahl von Workers einem Executorservice. Anschließend registriert sich main zum Phaser und ruft dann arrive().

/*
   Phaser als StartLatch
*/
import java.util.concurrent.Phaser;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class PhaserDemo01
{
   public static void main(String[] args)
   {
      Phaser phaser = new MyPhaser();
      int workerCount = 5;

      ExecutorService executorService = Executors.newCachedThreadPool();
      for(int i = 0; i < workerCount; i++)
      {
         Worker worker = new Worker(phaser);
         // alle workers warten mit awaitAdvance(0)
         worker.setName("Worker-" + (i + 1));
         executorService.execute(worker);
      }

      Be.idleFor(200);
      System.out.println("main registers to phaser");
      phaser.register();
      System.out.println("main calls arrive");
      phaser.arrive();
      System.out.println("end main");
   }
}

Um die Verwendung der Methode onAdvance() zu dokumentieren, bilden wir eine Unterklasse von Phaser und geben darin einige Zustandsinformationen bekannt.

import java.util.concurrent.Phaser;

public class MyPhaser extends Phaser
{
   @Override
   protected boolean onAdvance(int phase, int registeredParties)
   {
      System.out.println("----------------------- onAdvance begin -----------------------");
      System.out.println("phase = " + phase +
                        ", arrived parts = " + this.getArrivedParties() +
                        ", registered parts " + registeredParties +
                        ", thread = " + Thread.currentThread().getName());
      System.out.println("------------------------ onAdvance end ------------------------");
      return super.onAdvance(phase, registeredParties);
   }
}

Hier ein möglicher Ablauf des Programms

Worker-1 waiting to start, phase = 0
Worker-3 waiting to start, phase = 0
Worker-2 waiting to start, phase = 0
Worker-4 waiting to start, phase = 0
Worker-5 waiting to start, phase = 0
main registers to phaser
main calls arrive
----------------------- onAdvance begin -----------------------
phase = 0, arrived parts = 1, registered parts 1, thread = main
------------------------ onAdvance end ------------------------
Worker-4 running, phase = 1
Worker-5 running, phase = 1
Worker-1 running, phase = 1
Worker-3 running, phase = 1
Worker-2 running, phase = 1
end main
Worker-4 end
Worker-3 end
Worker-2 end
Worker-1 end
Worker-5 end

Die Methode arrive() entspricht hier der Methode countDown des CountDownLatches. Man kann hier schon einige Regeln erkennen. Die Seite die registriert ruft auch arrive() arrive() blockiert nicht. awaitAdvance() dagegen blockiert und macht dann einen Phasenübergang (hier von 0 auf 1).


Der Phaser als StopLatch

Nun kehren sich die Verhältnisse um. main() wartet mit awaitAdvance(0) und die Workers geben mit arrive() bekannt, daß sie ihre Arbeit beendet haben. Nachdem alle Worker ein arrive() abgesetzt haben, kann main die Ergebnisse der einzelnen Workers einsammeln. Hier die Mainklasse.

/*
   Phaser als StopLatch
*/
public class PhaserDemo02
{
   public static void main(String[] args)
   {
      MyPhaser phaser = new MyPhaser();
      int workerCount = 5;

      ArrayList<Worker> workers = new ArrayList<>();
      ExecutorService executorService = Executors.newCachedThreadPool();
      for(int i = 0; i < workerCount; i++)
      {
         Worker worker = new Worker(phaser);
         // alle workers registrieren sich und rufen phaser.arrive() !
         workers.add(worker);
         executorService.execute(worker);
      }

      System.out.println("main begins waiting, phase = " + phaser.getPhase());
      int phase = phaser.awaitAdvance(0);  // blockiert  und liefert 1
      System.out.println("main: end waiting, phase = " + phase);
      System.out.print("Collecting results from workers:  ");
      for(Worker worker : workers)
      {
         System.out.print(worker.getResult() + "  ");
      }
      System.out.println();

      System.out.println("end main");
   }
}

Die Workers bekommen den Phaser über den Konstruktor und registrieren sich. Damit wird im Phaser ein interner Zähler hochgezählt. Dieser wird durch ein arrive() um eins heruntergezählt. Steht der Zähler auf 0, wird die Methode onAdvance() aufgerufen. Damit triggered der als letzer ankommende Worker diese Methode.

public class Worker implements Runnable
{
   private MyPhaser phaser;
   private int result = 0;

   public Worker(MyPhaser phaser)
   {
      this.phaser = phaser;
      this.phaser.register();
   }

   @Override
   public void run()
   {
      result = 10 + (int)(Math.random()*100);
      Be.idleFor(result);
      int unArrived = phaser.getUnarrivedParties();
      System.out.println(Thread.currentThread().getName() + ", unarrived parts = " + unArrived + ", phase = " + phaser.getPhase());
      int phase = phaser.arrive();
      // entspricht countDown(); // blockiert nicht

      System.out.println(Thread.currentThread().getName() + " passed arrive, phase = " + phase);
      // Phase wird nicht hochgezählt

   }  // end run()

   public int getResult()
   {
      return result;
   }
}

Die Unterklasse von Phaser ist identisch zum vorigen Beispiel. Hier ein möglicher Ausgang des Ablaufs.

main begins waiting, phase = 0
pool-1-thread-1, unarrived parts = 5, phase = 0
pool-1-thread-1 passed arrive, phase = 0
pool-1-thread-3, unarrived parts = 4, phase = 0
pool-1-thread-3 passed arrive, phase = 0
pool-1-thread-5, unarrived parts = 3, phase = 0
pool-1-thread-5 passed arrive, phase = 0
pool-1-thread-4, unarrived parts = 2, phase = 0
pool-1-thread-4 passed arrive, phase = 0
pool-1-thread-2, unarrived parts = 1, phase = 0
---------------------------- onAdvance begin -----------------------------
phase = 0, arrived parts = 5, registered parts 5, thread = pool-1-thread-2
----------------------------- onAdvance end ------------------------------
pool-1-thread-2 passed arrive, phase = 0
main: end waiting, phase = 1
Collecting results from workers:  93  94  73  55  11
end main

Der Phaser als CyclicBarrier

Auch ein Phaser braucht einen Counter. Die vorigen Beispiel zeigen, daß ein Aufruf von register() diesen Counter um 1 erhöht, während arrive() den Counter um 1 erniedrigt. Steht der Counter auf 0, so ist eine Phase beendet und eine neue Phase kann beginnen. Der Phaser verfügt über drei verschiedene arrive()-Methoden. Jede dieser arrive()-Methoden zählt den Zähler um eins herunter.

ReturntypName der Methode
intarrive()
Arrives at this phaser, without waiting for others to arrive.
intarriveAndAwaitAdvance()
Arrives at this phaser and awaits others.
intarriveAndDeregister()
Arrives at this phaser and deregisters from it without waiting for others to arrive.

Um den Phaser wie einen CyclicBarrier zu verwenden, setzen wir die Methode arriveAndAwaitAdvance() ein, da keine der await()-Methoden den Zähler beeinflußt.

Wir beginnen mit main(). es sieht praktisch genauso wie im Beispiel zu CycliBarrier.

public class PhaserDemo_03
{
// Der Phaser als CyclicBarrier
   public static void main(String[] args)
   {
      MyPhaser phaser = new MyPhaser();

      for(int i = 0; i < 5; i++)
      {
         Worker worker = new Worker(phaser);
         // alle workers registrieren sich und rufen phaser.arriveAndAwaitAdvance() !
         worker.setName("Worker-" + (i + 1));
         worker.start();
      }
      System.out.println("end main");
   }
}

Hier der Worker. Jeder Worker registriert sich beim Phaser. Die Anzahl der registrierten Worker bestimmen den Umfang einer Phase. Hat man etwa 5 Worker und pro Worker einen register()-Aufruf, so ist der Phasenumfang 5. Man braucht also ebensoviele arrive()-Aufrufe um eine Phase zu beenden. Der letzte arrive() Aufruf löst dann einen Aufruf von onAdvance() als Abschluß der Phase auf. Danach kann eine neue Phase beginnen.

/**
 arriveAndAwaitAdvance wartet auf alle registrierten Workers. Der zuletzt
 angekommene ruft onAdvance auf
*/
public class Worker extends Thread
{
   MyPhaser phaser;

   public Worker(MyPhaser phaser)
   {
      this.phaser = phaser;
      this.phaser.register();
   }

   @Override
   public void run()
   {
      // phase 0
      Be.idleFor(100);
      int unArrived = phaser.getUnarrivedParties();
      int arrived = phaser.getArrivedParties();
      System.out.println(this.getName() + " arriveAndAwaitAdvance, unarrived parties = " + unArrived + " phase = " + phaser.getPhase());
      int phase = phaser.arriveAndAwaitAdvance();
      // entspricht genau dem  await() von Cyclicbarrier

      // phase 1
      Be.idleFor(100);
      unArrived = phaser.getUnarrivedParties();
      arrived = phaser.getArrivedParties();
      System.out.println(this.getName() + " arriveAndAwaitAdvance, unarrived parties = " + unArrived + " phase = " + phaser.getPhase());

      phase = phaser.arriveAndAwaitAdvance();

      // phase 2
      Be.idleFor(100);
      unArrived = phaser.getUnarrivedParties();
      arrived = phaser.getArrivedParties();
      System.out.println(this.getName() + " arriveAndAwaitAdvance, unarrived parties = " + unArrived + " phase = " + phaser.getPhase());

      phase = phaser.arriveAndAwaitAdvance();

      // phase 3
      Be.idleFor(100);
      unArrived = phaser.getUnarrivedParties();
      arrived = phaser.getArrivedParties();
      System.out.println(this.getName() + " after arriveAndAwaitAdvance, arrived parties = " + arrived + " phase = " + phaser.getPhase());
   }  // end run()
}

Die Unterklasse von Phaser ist identisch zum ersten Beispiel. Hier ein möglicher Ausgang des Ablaufs.

end main
Worker-5 arriveAndAwaitAdvance, unarrived parties = 5 phase = 0
Worker-2 arriveAndAwaitAdvance, unarrived parties = 4 phase = 0
Worker-4 arriveAndAwaitAdvance, unarrived parties = 3 phase = 0
Worker-1 arriveAndAwaitAdvance, unarrived parties = 2 phase = 0
Worker-3 arriveAndAwaitAdvance, unarrived parties = 1 phase = 0
------------------------- onAdvance begin -------------------------
phase = 0, arrived parts = 5, registered parts 5, thread = Worker-3
-------------------------- onAdvance end --------------------------
Worker-1 arriveAndAwaitAdvance, unarrived parties = 5 phase = 1
Worker-3 arriveAndAwaitAdvance, unarrived parties = 4 phase = 1
Worker-2 arriveAndAwaitAdvance, unarrived parties = 3 phase = 1
Worker-4 arriveAndAwaitAdvance, unarrived parties = 2 phase = 1
Worker-5 arriveAndAwaitAdvance, unarrived parties = 1 phase = 1
------------------------- onAdvance begin -------------------------
phase = 1, arrived parts = 5, registered parts 5, thread = Worker-5
-------------------------- onAdvance end --------------------------
Worker-5 arriveAndAwaitAdvance, unarrived parties = 5 phase = 2
Worker-2 arriveAndAwaitAdvance, unarrived parties = 4 phase = 2
Worker-4 arriveAndAwaitAdvance, unarrived parties = 3 phase = 2
Worker-1 arriveAndAwaitAdvance, unarrived parties = 2 phase = 2
Worker-3 arriveAndAwaitAdvance, unarrived parties = 1 phase = 2
------------------------- onAdvance begin -------------------------
phase = 2, arrived parts = 5, registered parts 5, thread = Worker-3
-------------------------- onAdvance end --------------------------
Worker-3 after arriveAndAwaitAdvance, arrived parties = 0 phase = 3
Worker-2 after arriveAndAwaitAdvance, arrived parties = 0 phase = 3
Worker-4 after arriveAndAwaitAdvance, arrived parties = 0 phase = 3
Worker-5 after arriveAndAwaitAdvance, arrived parties = 0 phase = 3
Worker-1 after arriveAndAwaitAdvance, arrived parties = 0 phase = 3

Hier noch eine Übersicht über die drei await()-Methoden. Keine dieser Methoden beeinflußt den internen Zähler.

ReturntypName der Methode
intawaitAdvance(int phase)
Awaits the phase of this phaser to advance from the given phase value, returning immediately if the current phase is not equal to the given phase value or this phaser is terminated.
int awaitAdvanceInterruptibly(int phase)
Awaits the phase of this phaser to advance from the given phase value, throwing InterruptedException if interrupted while waiting, or returning immediately if the current phase is not equal to the given phase value or this phaser is terminated.
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
Awaits the phase of this phaser to advance from the given phase value or the given timeout to elapse, throwing InterruptedException if interrupted while waiting, or returning immediately if the current phase is not equal to the given phase value or this phaser is terminated.