Advanced   Java   Services SynchronousQueue Back Next Up Home


Aus der API

A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.








Das Verhalten

Die folgende Tabelle ist die Tabelle der BloquingQueues angepaßt auf die Situaution einer SynchronousQueue. Sie zeigt auf den ersten Blick ein recht ungewöhnliches Verhalten.

Throws exception Returns special value Times out Blocks
Insert add(E e) IllegalStateException: Queue full (always, at Runtime) offer(E e)   (always false) offer(E e, long timeout, TimeUnit unit) put(E e) blocks (needs try/catch)
Remove remove() NoSuchElementException (always, at Runtime) poll()      (immer null) poll(long timeout, TimeUnit unit) take() blocks (needs try/catch)
Examine (no remove) element() NoSuchElementException (always, at Runtime) peek()     (immer null) not applicable not applicable

Bedingt durch dieses Verhalten braucht eine SynchronousQueue immer mindestens zwei Threads um sinnvoll zu arbeiten. Sie wirkt wie eine Schleuse. Die Schleuse kann nur passiert werden, wenn auf der anderen Seite Abnehmer existieren und umgekehrt.


Verwendung

Eine SynchronousQueue wird z. Bsp. vom CachedThreadPool verwendet, den die Factoryklasse Executors liefert.


Ein Beispiel

Im folgenden werden jeweils zwei Producer und zwei Consumer eingeführt. Ein Producer mit put() und ein Producer mit offer(). Dazu ein Consumer mit take() und ein Consumer mit poll().

Der PutProducer

class PutProducer extends Thread
{
   SynchronousQueue<String> sq;

   public PutProducer(SynchronousQueue<String> sq)
   {
      this.sq = sq;
   }

   public void run()
   {
      System.out.println(this.getName() +  " puts");
      try
      {
         sq.put("hi");
         sq.put("this is");
         sq.put("a short");
         sq.put("message");
      }
      catch(Exception ex)
      {
         ex.printStackTrace();
      }
      System.out.println("end put " + this.getName());
   }
} // end class

Der OfferProducer

class OfferProducer extends Thread
   {
      SynchronousQueue<String> sq;

      public OfferProducer(SynchronousQueue<String> sq)
      {
         this.sq = sq;
      }

      public void run()
      {
         System.out.println(this.getName() +  " offers");
         try
         {
            sq.offer("hi", 2, TimeUnit.SECONDS);
            sq.offer("this is", 2, TimeUnit.SECONDS);
            sq.offer("a short", 2, TimeUnit.SECONDS);
            sq.offer("message", 2, TimeUnit.SECONDS);
         }
         catch(Exception ex)
         {
            ex.printStackTrace();
         }
         System.out.println("end offer " + this.getName());
      }
   }

Der TakeConsumer

   static class TakeConsumer extends Thread
   {
      SynchronousQueue<String> sq;

      public TakeConsumer(SynchronousQueue<String> sq)
      {
         this.sq = sq;
      }

      public void run()
      {
         try
         {
            String erg = sq.take();
            System.out.println(erg);
            erg = sq.take();
            System.out.println(erg);
            erg = sq.take();
            System.out.println(erg);
         }
         catch(InterruptedException ex)
         {
            ex.printStackTrace();
         }
         System.out.println("end take " + this.getName());
      }
   } // end class

Der PollConsumer

static class PollConsumer extends Thread
{
   SynchronousQueue<String> sq;

   public PollConsumer(SynchronousQueue<String> sq)
   {
      this.sq = sq;
   }

   public void run()
   {
      try
      {
         String erg = "";
         while(erg != null)
         {
            erg = sq.poll(2, TimeUnit.SECONDS);
            System.out.println(this.getName() + " polled: " + erg);
         }
         System.out.println("2 seconds without reveiving a message");
      }
      catch(InterruptedException ex)
      {
         ex.printStackTrace();
      }
      System.out.println("end poll " + this.getName());
   }
} // end class

Mit dem blockierenden put() werden immer vier Strings angeboten, mit offer() vier Strings mit einem Timeout von 2 Sekunden. Das blockierende take() möchte drei Strings nehmen, poll() mit einem Timeout nimmt solange, bis es 2 Sekunden lang nichts mehr bekommt. Auf diese Weise kann man verschiedene Kombinationen ausprobieren.

Im folgenden eine Kombination von put() und take().

/*
   Verwendet put und take
   put bietet 4, take nimmt 3
   da take nicht alles abholt, blockiert put "endlos"
*/
public static void putandtake()
{
   SynchronousQueue<String> sq = new SynchronousQueue<>();

   // put blocks
   PutProducer pp = new PutProducer(sq);
   pp.start();

   // take blocks
   TakeConsumer tc = new TakeConsumer(sq);
   tc.start();
}

Output

Thread-0 puts
hi
this is
a short
end take Thread-1

Im folgenden eine Kombination von offer() und take().

/*
   Verwendet offer und take
   offer bietet 4 und wartet jeweils 2 sek
   take nimmt 3 und wartet jeweils 2 sek
   Beide Threads können sich beenden.
*/
public static void offerandtake()
{
   SynchronousQueue<String> sq = new SynchronousQueue<>();

   OfferProducer op = new OfferProducer(sq);
   op.start();

   // take blocks 2 seconds
   TakeConsumer tc = new TakeConsumer(sq);
   tc.start();
} // end beispiel04

Output

Thread-0 offers
hi
this is
a short
end take Thread-1
end offer Thread-0

























Valid XHTML 1.0 Strict top Back Next Up Home