Advanced   Java   Services LinkedTransferQueue


LinkedTransferQueue

LinkedTransferQueue leitet sich von AbstractQueue ab und implementiert das Interface TransferQueue, das wiederum das Interface BlockingQueue beerbt. Somit ist LinkedTransferQueue eine spezielle Form einer BlockingQueue. Im Folgenden sind die durch das Interface TransferQueue neu hinzugekommenen Methoden aufgelistet.




Die Methoden von TransferQueue
Modifier and Type | Method and Description
int getWaitingConsumerCount()
Returns an estimate of the number of consumers waiting to receive elements via BlockingQueue.take() or timed poll.
boolean hasWaitingConsumer()
Returns true if there is at least one consumer waiting to receive an element via BlockingQueue.take() or timed poll.
void transfer(E e)
Transfers the element to a consumer, waiting if necessary to do so.
boolean tryTransfer(E e)
Transfers the element to a waiting consumer immediately, if possible.
boolean tryTransfer(E e, long timeout, TimeUnit unit)
Transfers the element to a consumer if it is possible to do so before the timeout elapses.

Beispiel: Consumer-Producer Beispiel mit hoher Nachfrage (Unterproduktion)

Die LinkedTransferQueue wird als Queue für eine Producer-Consumer-Aufgabe verwendet, bei der die Consumer mehr entnehmen wollen, als der Producer liefern kann. Die Produktivität des Producers wird über eine Pausenzeit im Konstruktor festgelegt: Je kürzer die Pause, desto produktiver der Producer. Für die Consumer gilt dasselbe: Je kürzer die Pausenzeit, desto häufiger will ein Consumer ein Element aus der Queue entnehmen. Mit Hilfe der neuen Methode getWaitingConsumerCount() stellt der Producer fest, wie viele Consumer warten müssen. Er erhöht daraufhin seine Produktion so lange, bis kein Consumer mehr warten muss.

Ein statischer Zähler zählt die gesamte Produktion sowie den gesamten Konsum.

1) Der Producer

package hms;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer thread that repeatedly hands the value {@code 1} to a consumer
 * through a shared {@link TransferQueue}.
 *
 * <p>After every hand-over the producer asks the queue how many consumers are
 * still waiting. While at least one consumer is waiting, the pause between two
 * hand-overs is shortened by a fixed step, so production speeds up until the
 * demand is met. The pause never drops below zero.
 *
 * <p>The thread runs until it is interrupted.
 */
public class Producer extends Thread
{
  /** Total number of elements handed over by all producers. */
  private static final AtomicInteger count = new AtomicInteger();

  /** Milliseconds by which the pause shrinks whenever consumers are waiting. */
  private static final int SLEEP_STEP_MILLIS = 2;

  private final TransferQueue<Integer> transferQueue;

  /** Current pause between two hand-overs, in milliseconds. */
  private int sleepTime;

  /**
   * Creates a producer.
   *
   * @param transferQueue queue used to hand elements to the consumers
   * @param name          thread name, used in the log output
   * @param sleepTime     initial pause between two hand-overs in milliseconds;
   *                      the larger the value, the less is produced
   */
  public Producer(TransferQueue<Integer> transferQueue, String name, int sleepTime)
  {
    this.transferQueue = transferQueue;
    this.setName(name);
    this.sleepTime = sleepTime;
  }

  /**
   * Hands over one element per iteration, adapts the pause to the number of
   * waiting consumers and sleeps; terminates when the thread is interrupted.
   */
  @Override
  public void run()
  {
    for(;;)
    {
      try
      {
        // Unlike put(), transfer() waits until a consumer has received the element.
        transferQueue.transfer(1);
        count.incrementAndGet();
        System.out.println("filled 1, queuesize = " + transferQueue.size());
        int waitingConsumers = transferQueue.getWaitingConsumerCount();
        System.out.println("waitingConsumers " + waitingConsumers);
        if (waitingConsumers > 0)
        {
          // Speed up, but never let the pause become negative.
          sleepTime = Math.max(0, sleepTime - SLEEP_STEP_MILLIS);
        }
        TimeUnit.MILLISECONDS.sleep(sleepTime);
        System.out.println("sleepTime: " + sleepTime);
      }
      catch(InterruptedException ex)
      {
        System.out.println(ex);
        System.out.println(getName() + " end");
        // Keep the interrupt status visible to anything running after the loop.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }

  /**
   * Returns the number of elements handed over so far by all producers.
   *
   * @return total produced element count
   */
  public static int getTotalProducedItems()
  {
    return count.get();
  }
}

2) Der Consumer

package hms;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Consumer thread that repeatedly takes one element from a shared
 * {@link TransferQueue} and then pauses for a fixed time.
 *
 * <p>The shorter the pause, the more often the consumer asks the queue for an
 * element. The thread runs until it is interrupted.
 */
public class Consumer extends Thread
{
  /** Total number of elements taken by all consumers. */
  private static final AtomicInteger count = new AtomicInteger();

  private final TransferQueue<Integer> transferQueue;

  /** Pause after each taken element, in milliseconds. */
  private final int sleepTime;

  /**
   * Creates a consumer.
   *
   * @param transferQueue queue the elements are taken from
   * @param name          thread name, used in the log output
   * @param sleepTime     pause after each taken element in milliseconds
   */
  public Consumer(TransferQueue<Integer> transferQueue, String name, int sleepTime)
  {
    this.transferQueue = transferQueue;
    this.setName(name);
    this.sleepTime = sleepTime;
  }

  /**
   * Takes one element per iteration, counts and logs it, then sleeps;
   * terminates when the thread is interrupted.
   */
  @Override
  public void run()
  {
    for(;;)
    {
      try
      {
        // The typed queue makes take() return Integer, which unboxes to int.
        int value = transferQueue.take();
        count.incrementAndGet();
        System.out.println(getName() + " removed " + value + ", queuesize = " + transferQueue.size());
        TimeUnit.MILLISECONDS.sleep(sleepTime);
      }
      catch(InterruptedException ex)
      {
        System.out.println(ex);
        System.out.println(getName() + " end");
        // Keep the interrupt status visible to anything running after the loop.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }

  /**
   * Returns the number of elements taken so far by all consumers.
   *
   * @return total consumed element count
   */
  public static int getTotalConsumedItems()
  {
    return count.get();
  }
}

3) Main

/**
 * Starts four consumers and one producer on a shared queue, lets them run for
 * 2.5 seconds, then interrupts all of them, waits for them to finish and
 * prints the total number of consumed and produced elements.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args)
{
   TransferQueue<Integer>  transferQueue = new LinkedTransferQueue<>();

   Consumer c1 = new Consumer(transferQueue, "consumer1", 100);
   Consumer c2 = new Consumer(transferQueue, "consumer2", 100);
   Consumer c3 = new Consumer(transferQueue, "consumer3", 100);
   Consumer c4 = new Consumer(transferQueue, "consumer4", 100);

   // The number passed in is the sleep time: the higher it is, the less is produced.
   Producer p1 = new Producer(transferQueue, "producer", 100);

   Thread[] workers = {c1, c2, c3, c4, p1};
   for (Thread worker : workers)
   {
      worker.start();
   }

   try
   {
      Thread.sleep(2500);
   }
   catch(InterruptedException e)
   {
      // Remember the interruption; the workers are still stopped below.
      Thread.currentThread().interrupt();
   }
   finally
   {
      // Always stop the workers, otherwise their endless loops keep the JVM alive.
      for (Thread worker : workers)
      {
         worker.interrupt();
      }
   }

   // Wait for the workers so that the totals below are final.
   for (Thread worker : workers)
   {
      try
      {
         worker.join();
      }
      catch(InterruptedException e)
      {
         Thread.currentThread().interrupt();
         break;
      }
   }

   System.out.println("Gesamtkonsum    : " + Consumer.getTotalConsumedItems());
   System.out.println("Gesamtproduktion: " + Producer.getTotalProducedItems());
}

Output
Der Producer verringert seine Pausenzeiten von 100 ms schrittweise auf schließlich 30 ms. Damit kann er die vier Konsumenten ohne Wartezeiten beliefern.

filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 3
sleepTime: 98
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 96
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 94
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 92
filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 90
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 88
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 86
filled 1, queuesize = 0
waitingConsumers 2
consumer4 removed 1, queuesize = 0
sleepTime: 84
filled 1, queuesize = 0
waitingConsumers 2
consumer2 removed 1, queuesize = 0
sleepTime: 82
consumer1 removed 1, queuesize = 0
filled 1, queuesize = 0
waitingConsumers 2
sleepTime: 80
filled 1, queuesize = 0
waitingConsumers 2
consumer3 removed 1, queuesize = 0
sleepTime: 78
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 76
filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 74
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 72
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 70
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 68
filled 1, queuesize = 0
waitingConsumers 2
consumer2 removed 1, queuesize = 0
sleepTime: 66
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 64
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 62
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 60
consumer2 removed 1, queuesize = 0
filled 1, queuesize = 0
waitingConsumers 2
sleepTime: 58
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 56
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 54
consumer4 removed 1, queuesize = 0
filled 1, queuesize = 0
waitingConsumers 2
sleepTime: 52
filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 50
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 2
sleepTime: 48
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 46
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 44
filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 42
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 40
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 38
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 36
filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 34
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 32
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 1
sleepTime: 30
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
consumer4 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
consumer2 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
consumer1 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
consumer3 removed 1, queuesize = 0
waitingConsumers 0
sleepTime: 30
filled 1, queuesize = 0
waitingConsumers 0
consumer4 removed 1, queuesize = 0
java.lang.InterruptedException: sleep interrupted
java.lang.InterruptedException
java.lang.InterruptedException: sleep interrupted
consumer4 end

Gesamtkonsum    : 44
Gesamtproduktion: 44
consumer2 end
java.lang.InterruptedException: sleep interrupted
java.lang.InterruptedException: sleep interrupted
consumer3 end
consumer1 end
producer end