Advanced   Java   Services CyclicBarrier


CyclicBarrier

Ein CyclicBarrier kann man als eine Variante eines CountDownLatch auffassen. Eine gewisse feste Anzahl von Threads wird gestartet, wir nennen sie wieder Worker. Sie laufen bis sie auf eine Barriere stossen und warten müssen. Ein Objekt vom Typ CyclicBarrier ist diese Barriere und zählt die Ankommenden. Haben alle Worker die Barriere erreicht, so gibt es zwei Möglichkeitn. Im einfachen Fall können dann alle Worker die Barriere überschreiten und evtl. auf die nächste Barriere stoßen. Im anderen Fall wird an der Barriere ein neuer Thread gestartet und alle Angekommenen warten noch einmal bis dieser Thread beendet ist. Der BarriereThread wird als Runnable dem CyclicBarrier per Konstruktor übergeben. Anschließend wird die Barriere überschritten und evtl. eine weitere Barriere erreicht.

Im Unterschied zum CountDownLatch kann die Barriere zusammen mit dem BarriereThread beliebig oft verwendet werden. Es gibt zwei Konstruktoren. Der erste erhält nur die Anzahl der Threads auf die gewartet werden soll. In diesem Fall findet am Barrierepunkt keine Aktion statt. Der zweite Konstruktor, hat als zweites Argument ein Runnable, das die Aktion am Barrierepunkt enthält.

Im Gegensatz zum CountDownLatch gibt es hier kein Gegenstück zur await()-Methode. Es gibt weder eine eine countDown()-Methode noch Methoden wie notify() oder signal(). Aus diesem Grund nent man CyclicBarrier nicht advanceable (nicht vorrückbar). Wir können die Eigenschaften sofort zusammenfassen.


Zusammenfassung

Dieses Beispiel erklärt die Funktionsweise

Das folgende Beispiel verdeutlicht den Ablauf durch Konsolmeldungen. Studiert man an Hand der Konsolmeldungen den Code sollte das Prinzip klar erkennbar sein. Um den Code nicht permanent mit try-catch um sleep() aufzublähen verwenden wir die Hilfsklasse Be mit der Methode idleFor(int millis).

Die Hilfsklasse Be

public class Be
{
   public static void idleFor(int millis)
   {
      try
      {
         Thread.sleep(millis);
      }
      catch(InterruptedException ex)
      {
         System.out.println(ex);
      }
   }
}

Am Barrierepunkt kann eie Aktion geschehen. Die folgende Klasse dokumentiert, daß diese Aktion stattfindet.

public class BarrierAction implements Runnable
{
   @Override
   public void run()
   {
      System.out.println("BarrierAction running");
      Be.idleFor(500);
   }
}

Die Workerklasse erhält ein CyclicBarrierobjekt mit dem sich Wartepunkte realisieren kann. Dieses Objekt kennt die Anzahl der Threads auf die gewartet werden soll. Es sind zwei Wartepunkte eingebaut, damit das Prinzip deutlicher wird. Die Konsolmeldungen dokumentieren in welchem Abschnitt sich der Thread befindet.

public class Worker extends Thread
{
   private CyclicBarrier barrier;

   Worker(CyclicBarrier barrier)
   {
      this.barrier = barrier;
   }

   public void run()
   {
      try
      {
         System.out.println(this.getName() + " running");
         Be.idleFor(500);
         // wait for others
         System.out.println(this.getName() + " barrier1 reached (waiting)");
         barrier.await();  // blockiert
         System.out.println(this.getName() + " barrier1 passed");
         Be.idleFor(500);
         // wait for others
         System.out.println(this.getName() + " barrier2 reached (waiting)");
         barrier.await();  // blockiert
         System.out.println(this.getName() + " barrier2 passed");
      }
      catch(InterruptedException | BrokenBarrierException ex)
      {
         System.out.println(ex);
      }
   }
}

Das Hauptprogramm legt ein Objekt der BarrierAction an. Dann wird ein ein Objekt cycliBarrier vom Typ CycliBarrier eingerichtet, das fünf Worker kontrollieren kann. Diesen fünf Workern wird das Objekt cycliBarrier per Konstruktor übergeben. Anschließend brauchen die Workers nur noch gestartet werden.

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo_01
{
   public static void main(String args[])
   {
      BarrierAction barrierAction = new BarrierAction();
      int COUNT = 5;

      CyclicBarrier barrier = new CyclicBarrier(COUNT, barrierAction);
      for(int i = 0; i < COUNT; i++)
      {
         Worker worker = new Worker(barrier);
         worker.setName("Worker-" + (i+1));
         worker.start();
      }
      System.out.println("end main");
   }
}

Hier ein Durchlauf des Programms

Worker-1 running
end main
Worker-2 running
Worker-4 running
Worker-3 running
Worker-5 running
Worker-5 barrier1 reached (waiting)
Worker-2 barrier1 reached (waiting)
Worker-4 barrier1 reached (waiting)
Worker-3 barrier1 reached (waiting)
Worker-1 barrier1 reached (waiting)
BarrierAction running
Worker-1 barrier1 passed
Worker-3 barrier1 passed
Worker-4 barrier1 passed
Worker-2 barrier1 passed
Worker-5 barrier1 passed
Worker-1 barrier2 reached (waiting)
Worker-3 barrier2 reached (waiting)
Worker-2 barrier2 reached (waiting)
Worker-4 barrier2 reached (waiting)
Worker-5 barrier2 reached (waiting)
BarrierAction running
Worker-5 barrier2 passed
Worker-1 barrier2 passed
Worker-3 barrier2 passed
Worker-2 barrier2 passed
Worker-4 barrier2 passed

Hinter den Kulissen

Beim CoundDownLatch liegt die Funktionsweise offen. await() blockiert solange, bis durch die Methode countDown, die immer um 1 herunterzählt. der interne Zähler auf 0 steht. Aber woher weiß CyclicBarrier, wann das Warten zu Ende ist. Über den Konstruktor wird anfangs die Anzahl der Threads übergeben. Da die Barriere überwunden wird, wenn alle Threads diese erreicht haben, muß ein Zähler heruntergezählt werden. Die einzige Möglichkeit, wo dies geschehen kann ist die await() Methode selbst. Ein Blick in den Quellcode bestätigt dies.

Der Konstruktor

/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation.  It is reset to parties on each new
* generation or when broken.
*/
private int count;

public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}

Die Variable count erhält die Anzahl der Threads. Sie wird in der Methode await() heruntergezählt. Hier ein Ausschnitt aus dem Quellcode.

Die await()-Methode ruft im Wesentlich nur eine private Methode namens dowait() auf.

public int await() throws InterruptedException, BrokenBarrierException {
  try {
      return dowait(false, 0L);
  } catch (TimeoutException toe) {
      throw new Error(toe); // cannot happen;
  }
}

In dowait() wird (u.a.) heruntergezählt.

/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
  throws InterruptedException, BrokenBarrierException,
         TimeoutException {
  final ReentrantLock lock = this.lock;
  lock.lock();
  //...
    int index = --count;
  //...

Bemerkung

Daß es nur eine BarrierAction gibt, ist keine wesentliche Einschränkung. Über den Konstruktor von BarrierAction kann man Parameter übergeben und so von Barriere zu Barriere verschiedene Aktionen realisieren.