Advanced   Java   Services CountDownLatch


CountDownLatch

Ein CountDownLatch ist eine Synchronisierungshilfe und ermöglicht ein einfaches Warten auf die Beendigung mehrerer Threads. Die Anzahl der Threads die CountDownLatch überwacht wird am Anfang über den Konstruktor festgelegt und ist dann nicht mehr veränderbar. Auch zum quasigleichzeitigen Starten von mehreren Thread kann ein CountDownLatch verwendet werden. Ein CountDownLatch kann nur einmal verwendet werden. Ein erneutes Starten ist nicht möglich.


Starten mehrerer Threads

Für das Starten mehrere Threads stellt man den Counter auf 1 und zählt ihn dann auf 0 herunter. Die Threads bekommen ein Objekt vom Typ CountDownLatch und warten. Die Wartebedingung schreibt man sofort am Beginn von run(). Ein anderer Thread verfügt auch über dieses Objekt und zählt den Zähler um eins herunter. Hier die Anfangssituation der zu startenden Threads.

public class Worker implements Runnable
{
   CountDownLatch startLatch;
   String name;

   Worker(CountDownLatch startLatch, ...)
   {
      this.startLatch = startLatch;
      //...
   }

   public void run()
   {
      try
      {
         startLatch.await();  // blockiert
         // wait until the latch has counted down to zero
         // runtergezählt wird in einem anderen Thread

         // do Work
         for(int i = 0 ; i < 2 ; i++)
         {
            System.out.println(i + " " + name + " " + new Date().getTime());
            Thread.currentThread().sleep(1000);
         }
      }
      catch(InterruptedException ex)
      {
         System.out.println(ex);
      }
   }
}

Ein anderer Thread instanziiert ein Objekt vom Typ CountDownLatch und setzt den Zähler auf 1. Eine feste Anzahl der obigen Worker erhält den CountDownLatch über den Konstruktor. Die Workers werden einem ExecutorService übergeben. Obwohl dieser sofort start() ruft, können die Workers nicht laufen, da sie am Anfang von run() durch startLatch.await() blockiert werden. Erst durch das Kommando startLatch.countDown() wird der Zähler von 1 auf 0 heruntergezählt und die Workers können laufen.

int MAX_WORKERS = ...
CountDownLatch startLatch = new CountDownLatch(1);

Worker[] worker = new Worker[MAX_WORKERS];
ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS);
for(int i = 0; i < MAX_WORKERS; i++)
{
   worker[i] = new Worker(startLatch, "Thread-"+(i+1));
   executorService.execute(worker[i]);
}
System.out.println("Start the Workers now");
startLatch.countDown(); // von 1 auf 0
//...
executorService.shutdown();

Warten auf mehrere Threads mit CountDownLatch

Während in der ersten Situation ein Thread ein Signal gibt, das andere Threads startet ist hier die Situation genau umgekehrt. Jeder der Workerthreads gibt am Ende eines bestimmten Abschnitts eine Mitteilung, daß dieser Abschnitt abgearbeitet ist. Hier muß der Thread nicht zu Ende sein. Nach countDown() können, da es nicht blockiert, sofort andere Aufgaben übernommen werden. Aus diesem Grund nennt man den CountDownLatch advanceable.

Ein anderer Thread wartet und registriert die Meldungen. Für jede Meldung wird ein internen Zähler heruntergezählt. Steht dieser auf 0 so ist das Warten zu Ende. Es ist dabei egal in welcher Reihenfolge die Mitteilungen eintreffen. Jeder Thread gibt genau eine Mitteilung. Hier die Skizze dieser zweiten Situation.

public class Worker implements Runnable
{
   CountDownLatch stopLatch;

   Worker(CountDownLatch stopLatch, ...)
   {
      this.stopLatch = stopLatch;
   }

   public void run()
   {
      try
      {
         // do work
      }
      finally
      {
         stopLatch.countDown(); // blockiert nicht (advanceable)
      }

      // do some other work
   }
}

Hier ein Ausschnitt aus dem wartenden Thread. Der wartende Thread verfügt über dasselbe Objekt stopLatch. Zum selben CountDownLatchobjekt sind also await() und countDown() immer in verschiedenen Threads.

...
try
{
   // warten bis alle Workers fertig sind
   stopLatch.await();
   // counter ist 0
   // Ergebnisse auswerten
}
catch(InterruptedException ex)
{
   ex.printStackTrace();
}
...

Ein vollständiges Beispielprogramm

Ein Worker mit einem startLatch und einem stopLatch

public class Worker implements Runnable
{
   private CountDownLatch startLatch;
   private CountDownLatch stopLatch;
   private String name;
   private int result = 0;

   Worker(CountDownLatch startLatch, CountDownLatch stopLatch, String name)
   {
      this.startLatch = startLatch;
      this.stopLatch = stopLatch;
      this.name = name;
   }

   public void run()
   {
      try
      {
         startLatch.await(); // wait until the latch has counted down to zero
         // runtergezählt wird in einem anderen Thread
         System.out.println("end waiting " + name);
         // do Work
         Thread.currentThread().sleep(500);
         result = 7;
      }
      catch(InterruptedException ex)
      {
         System.out.println(ex);
      }
      finally
      {
         stopLatch.countDown(); // blockiert nicht
         System.out.println(name + " nach countdown: " + stopLatch.getCount());
      }
   }

   public int getResult()
   {
      return result;
   }
}

Die Mainklasse

import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

public class CountDownLatchDemo_00
{
   public static void main(String args[])
   {
      final int COUNT = 6;  // Anzahl der Threads
      // CountDownLatch(int count)
      // Constructs a CountDownLatch initialized with the given count.
      CountDownLatch startSignal = new CountDownLatch(1);
      CountDownLatch stopSignal = new CountDownLatch(COUNT);

      Worker[] worker = new Worker[COUNT];
      ExecutorService executorService = Executors.newFixedThreadPool(COUNT);
      for(int i = 0; i < COUNT; i++)
      {
         worker[i] = new Worker(startSignal, stopSignal, "Thread-"+(i+1));
         executorService.execute(worker[i]);
      }
      System.out.println("Start the Workers now");
      startSignal.countDown(); // von 1 auf 0
      try
      {
         // main wartet bis alle Workers fertig sind
         stopSignal.await();
         // counter ist 0, Ergebnisse einsammeln
         int res = 0;
         for(int i = 0; i < worker.length; i++)
         {
            res += worker[i].getResult();
         }
         System.out.print("result = " + res);
       }
      catch(InterruptedException ex)
      {
         ex.printStackTrace();
      }

      executorService.shutdown();
      System.out.println("\nend main");
   }
}

Ein möglicher Ablauf des Programms

waiting Thread-1
waiting Thread-5
waiting Thread-2
Start the Workers now
waiting Thread-3
waiting Thread-4
end waiting Thread-2
end waiting Thread-4
end waiting Thread-5
end waiting Thread-1
end waiting Thread-3
Thread-5 nach countdown: 3
Thread-3 nach countdown: 1
Thread-4 nach countdown: 2
Thread-1 nach countdown: 3
728   699   632   68 285
end main
Thread-2 nach countdown: 0

Zusammenfassung

CountDownLatch ist die erste von insgesamt drei Synchronisationshilfen, die leicht unterschiedliche Eigenschaften haben. Die anderen beiden sind CyclicBarrier und der in Java 7 eingeführte Phaser. Um einen leichten Vergleich zu ermöglichen fassen wir hier die Eigenschaften des CountDownLatch zusammen.