Advanced Java Services | SynchronousQueue |
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.
Die folgende Tabelle ist die Tabelle der BloquingQueues angepaßt auf die Situaution einer SynchronousQueue. Sie zeigt auf den ersten Blick ein recht ungewöhnliches Verhalten.
Throws exception | Returns special value | Times out | Blocks | |
Insert | add(E e) IllegalStateException: Queue full (always, at Runtime) | offer(E e) (always false) | offer(E e, long timeout, TimeUnit unit) | put(E e) blocks (needs try/catch) |
Remove | remove() NoSuchElementException (always, at Runtime) | poll() (immer null) | poll(long timeout, TimeUnit unit) | take() blocks (needs try/catch) |
Examine (no remove) | element() NoSuchElementException (always, at Runtime) | peek() (immer null) | not applicable | not applicable |
Bedingt durch dieses Verhalten braucht eine SynchronousQueue immer mindestens zwei Threads um sinnvoll zu arbeiten. Sie wirkt wie eine Schleuse. Die Schleuse kann nur passiert werden, wenn auf der anderen Seite Abnehmer existieren und umgekehrt.
Eine SynchronousQueue wird z. Bsp. vom CachedThreadPool verwendet, den die Factoryklasse Executors liefert.
Im folgenden werden jeweils zwei Producer und zwei Consumer eingeführt. Ein Producer mit put() und ein Producer mit offer(). Dazu ein Consumer mit take() und ein Consumer mit poll().
Der PutProducer
class PutProducer extends Thread { SynchronousQueue<String> sq; public PutProducer(SynchronousQueue<String> sq) { this.sq = sq; } public void run() { System.out.println(this.getName() + " puts"); try { sq.put("hi"); sq.put("this is"); sq.put("a short"); sq.put("message"); } catch(Exception ex) { ex.printStackTrace(); } System.out.println("end put " + this.getName()); } } // end class
Der OfferProducer
class OfferProducer extends Thread { SynchronousQueue<String> sq; public OfferProducer(SynchronousQueue<String> sq) { this.sq = sq; } public void run() { System.out.println(this.getName() + " offers"); try { sq.offer("hi", 2, TimeUnit.SECONDS); sq.offer("this is", 2, TimeUnit.SECONDS); sq.offer("a short", 2, TimeUnit.SECONDS); sq.offer("message", 2, TimeUnit.SECONDS); } catch(Exception ex) { ex.printStackTrace(); } System.out.println("end offer " + this.getName()); } }
Der TakeConsumer
static class TakeConsumer extends Thread { SynchronousQueue<String> sq; public TakeConsumer(SynchronousQueue<String> sq) { this.sq = sq; } public void run() { try { String erg = sq.take(); System.out.println(erg); erg = sq.take(); System.out.println(erg); erg = sq.take(); System.out.println(erg); } catch(InterruptedException ex) { ex.printStackTrace(); } System.out.println("end take " + this.getName()); } } // end class
Der PollConsumer
static class PollConsumer extends Thread { SynchronousQueue<String> sq; public PollConsumer(SynchronousQueue<String> sq) { this.sq = sq; } public void run() { try { String erg = ""; while(erg != null) { erg = sq.poll(2, TimeUnit.SECONDS); System.out.println(this.getName() + " polled: " + erg); } System.out.println("2 seconds without reveiving a message"); } catch(InterruptedException ex) { ex.printStackTrace(); } System.out.println("end poll " + this.getName()); } } // end class
Mit dem blockierenden put() werden immer vier Strings angeboten, mit offer() vier Strings mit einem Timeout von 2 Sekunden. Das blockierende take() möchte drei Strings nehmen, poll() mit einem Timeout nimmt solange, bis es 2 Sekunden lang nichts mehr bekommt. Auf diese Weise kann man verschiedene Kombinationen ausprobieren.
Im folgenden eine Kombination von put() und take().
/* Verwendet put und take put bietet 4, take nimmt 3 da take nicht alles abholt, blockiert put "endlos" */ public static void putandtake() { SynchronousQueue<String> sq = new SynchronousQueue<>(); // put blocks PutProducer pp = new PutProducer(sq); pp.start(); // take blocks TakeConsumer tc = new TakeConsumer(sq); tc.start(); }
Output
Thread-0 puts hi this is a short end take Thread-1
Im folgenden eine Kombination von offer() und take().
/* Verwendet offer und take offer bietet 4 und wartet jeweils 2 sek take nimmt 3 und wartet jeweils 2 sek Beide Threads können sich beenden. */ public static void offerandtake() { SynchronousQueue<String> sq = new SynchronousQueue<>(); OfferProducer op = new OfferProducer(sq); op.start(); // take blocks 2 seconds TakeConsumer tc = new TakeConsumer(sq); tc.start(); } // end beispiel04
Output
Thread-0 offers hi this is a short end take Thread-1 end offer Thread-0