Advanced Java Services: ArrayBlockingQueue and LinkedBlockingQueue

ArrayBlockingQueue<E>

A BlockingQueue that is bounded at both ends: take() blocks while it is empty and put() blocks while it is full. Internally it is backed by an array. The upper bound (the capacity) must be passed to the constructor. From the API:

This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

This is a classic "bounded buffer", in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.
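
The capacity limit can be observed without any threads by using the non-blocking insert methods. The following is a minimal sketch; the class name BoundedBufferDemo, the method name describeFullQueue and the capacity 2 are made up for this example. At the point where offer() returns false, a call to put() would block instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedBufferDemo
{
   // Fills a queue of capacity 2 and reports how offer(), add() and poll()
   // behave once the queue is full.
   static String describeFullQueue()
   {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
      queue.offer(1);
      queue.offer(2);                      // the queue is now full

      boolean offered = queue.offer(3);    // returns false instead of blocking

      boolean threw = false;
      try
      {
         queue.add(3);                     // throws when no space is available
      }
      catch(IllegalStateException ex)
      {
         threw = true;
      }

      Integer head = queue.poll();         // FIFO: the oldest element comes out first
      return offered + " " + threw + " " + head + " " + queue.remainingCapacity();
   }

   public static void main(String[] args)
   {
      System.out.println(describeFullQueue()); // prints: false true 1 1
   }
}
```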


LinkedBlockingQueue<E>

A BlockingQueue that is always bounded below (take() blocks while it is empty) but only optionally bounded above. If no upper bound is given, the capacity is set to Integer.MAX_VALUE. In producer-consumer models where the producer is very active, an upper bound can be specified through the constructor. From the API:

This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
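
The two constructor variants can be compared with remainingCapacity(). This is a small sketch only; the class name LinkedCapacityDemo, its method names and the capacity 2 are made up for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;

class LinkedCapacityDemo
{
   // Without an argument, the capacity of the empty queue is Integer.MAX_VALUE.
   static boolean defaultIsMaxValue()
   {
      LinkedBlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
      return unbounded.remainingCapacity() == Integer.MAX_VALUE;
   }

   // With an explicit capacity of 2, the third non-blocking insert is rejected.
   static boolean boundedRejectsThird()
   {
      LinkedBlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(2);
      bounded.offer(1);
      bounded.offer(2);
      return !bounded.offer(3);
   }

   public static void main(String[] args)
   {
      System.out.println(defaultIsMaxValue());   // prints: true
      System.out.println(boundedRejectsThird()); // prints: true
   }
}
```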


Usage

A LinkedBlockingQueue is used, for example, by the SingleThreadExecutor and the FixedThreadPool, two ExecutorServices provided by the factory class Executors.
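
For the FixedThreadPool this can be checked with the sketch below. It rests on an assumption: that Executors.newFixedThreadPool returns a ThreadPoolExecutor whose work queue is accessible through getQueue(). This holds for OpenJDK, but it is an implementation detail and not something the Executors documentation promises. In OpenJDK the SingleThreadExecutor is wrapped in a delegating class, so its queue cannot be inspected this way. The class and method names are made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

class ExecutorQueueDemo
{
   static boolean fixedPoolUsesLinkedBlockingQueue()
   {
      ExecutorService pool = Executors.newFixedThreadPool(2);
      try
      {
         // Assumption: the pool is a ThreadPoolExecutor (true for OpenJDK).
         if(!(pool instanceof ThreadPoolExecutor))
         {
            return false;
         }
         return ((ThreadPoolExecutor) pool).getQueue() instanceof LinkedBlockingQueue;
      }
      finally
      {
         pool.shutdown(); // no tasks were submitted, so this returns at once
      }
   }

   public static void main(String[] args)
   {
      System.out.println(fixedPoolUsesLinkedBlockingQueue());
   }
}
```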


Implementing the producer-consumer model with a BlockingQueue<E>

The warehouse class (Lager) is no longer needed; the BlockingQueue is now our warehouse. The type of the stored objects is given as a generic type argument, and any non-primitive type is possible. Consumer and Producer now receive the BlockingQueue through their constructors. In principle they are built as before. The only addition is console output that documents the sequence of events, since this can no longer happen inside the BlockingQueue.
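
As a compact illustration of the idea (not the classes used on the rest of this page), the following sketch passes String objects from one producer thread to one consumer thread through a bounded queue. The class name HandOffDemo, the method name transfer and the capacity 2 are made up for this example. With a single producer and a single consumer, the elements arrive in FIFO order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HandOffDemo
{
   // Moves all items through a queue of capacity 2 and returns them
   // in the order in which the consumer received them.
   static List<String> transfer(List<String> items)
   {
      BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
      List<String> received = new ArrayList<>();

      Thread producer = new Thread(() -> {
         try
         {
            for(String s : items) queue.put(s);       // blocks while the queue is full
         }
         catch(InterruptedException ex)
         {
            Thread.currentThread().interrupt();
         }
      });
      Thread consumer = new Thread(() -> {
         try
         {
            for(int i = 0; i < items.size(); i++)
               received.add(queue.take());            // blocks while the queue is empty
         }
         catch(InterruptedException ex)
         {
            Thread.currentThread().interrupt();
         }
      });

      producer.start();
      consumer.start();
      try
      {
         producer.join();
         consumer.join(); // after join() the list can be read safely
      }
      catch(InterruptedException ex)
      {
         Thread.currentThread().interrupt();
      }
      return received;
   }

   public static void main(String[] args)
   {
      System.out.println(transfer(Arrays.asList("a", "b", "c"))); // prints: [a, b, c]
   }
}
```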


The Consumer
import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;

public class Consumer extends Thread
{
   private BlockingQueue<Integer> blockingQueue;
   private int sleepTime;

   public Consumer(BlockingQueue<Integer> blockingQueue, String name, int sleepTime)
   {
      this.blockingQueue = blockingQueue;
      this.setName(name);
      this.sleepTime = sleepTime;
   }

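   @Override
   public void run()
   {
      for(;;)
      {
         try
         {
            // take one element; blocks while the queue is empty
            int value = blockingQueue.take();
            // size() is only a snapshot; other threads may have changed the queue meanwhile
            System.out.println("removed " + value + ", size = " + blockingQueue.size());
            TimeUnit.MILLISECONDS.sleep((int)(Math.random()*sleepTime));
         }
         catch(InterruptedException ex)
         {
            System.out.println(ex);
            interrupt(); // restore the interrupt flag, which was cleared when the exception was thrown
            System.out.println(Thread.currentThread().getName() + " end");
            break;
         }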
      }
   }
}

The Producer
import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;

public class Producer extends Thread
{
   private BlockingQueue<Integer> blockingQueue;
   private int sleepTime;

   public Producer(BlockingQueue<Integer> blockingQueue, String name, int sleepTime)
   {
      this.blockingQueue = blockingQueue;
      this.setName(name);
      this.sleepTime = sleepTime;
   }

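   @Override
   public void run()
   {
      for(;;)
      {
         try
         {
            blockingQueue.put(1); // add one element; blocks while the queue is full
            // size() is only a snapshot; other threads may have changed the queue meanwhile
            System.out.println("filled 1, size = " + blockingQueue.size());
            TimeUnit.MILLISECONDS.sleep((int)(Math.random()*sleepTime));
         }
         catch(InterruptedException ex)
         {
            System.out.println(ex);
            interrupt(); // restore the interrupt flag, which was cleared when the exception was thrown
            System.out.println(this.getName() + " end");
            break;
         }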
      }
   }
}

The main program

In main we set up both queues (one of them commented out) and produce different runs. Changing the sleep() times simulates different levels of activity: shorter wait times make the Consumer or the Producer more active.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main
{
   public static void main(String[] args)
   {
      // 7 is the upper bound (capacity)
      BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(7);
      // alternative without an explicit upper bound:
      //BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();

      blockingQueue.add(1);
      blockingQueue.add(1);
      blockingQueue.add(1);

      Consumer c1 = new Consumer(blockingQueue, "consumer", 100);
      Producer p1 = new Producer(blockingQueue, "producer", 20);
      Consumer c2 = new Consumer(blockingQueue, "consumer2", 100);
//    Producer p2 = new Producer(blockingQueue, "producer2", 50);
      p1.start();
      c1.start();
//    p2.start();
      c2.start();

      try
      {
         Thread.sleep(1000);
         c1.interrupt();
         p1.interrupt();
         c2.interrupt();
//       p2.interrupt();
      }
      catch(InterruptedException e)
      {
         e.printStackTrace();
      }
   }
}
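
The main program ends the threads with interrupt(). Both take() and put() react to an interrupt while they are waiting, just as sleep() does. The sketch below shows this for take() on a queue that stays empty; the class name InterruptDemo and the method name takeIsInterruptible are made up for this example. An InterruptedException thrown by a waiting take() or put() typically carries no message, whereas one thrown by sleep() on common JVMs reads "sleep interrupted", which is presumably why both forms can appear in the console output of such a program.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo
{
   // Returns true if a thread waiting in take() was ended by interrupt().
   static boolean takeIsInterruptible()
   {
      BlockingQueue<Integer> empty = new LinkedBlockingQueue<>();
      AtomicBoolean interrupted = new AtomicBoolean(false);

      Thread waiter = new Thread(() -> {
         try
         {
            empty.take();            // nothing is ever put into the queue
         }
         catch(InterruptedException ex)
         {
            interrupted.set(true);   // reached once the thread has been interrupted
         }
      });
      waiter.start();
      waiter.interrupt();            // effective whether or not take() is already waiting

      try
      {
         waiter.join();
      }
      catch(InterruptedException ex)
      {
         Thread.currentThread().interrupt();
      }
      return interrupted.get();
   }

   public static void main(String[] args)
   {
      System.out.println(takeIsInterruptible()); // prints: true
   }
}
```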

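Some sample runs

The reported sizes are snapshots: between put() or take() and the following call to size(), another thread may already have changed the queue, and the lines of different threads need not be printed in the order of the queue operations. Consecutive lines can therefore look inconsistent, for example "removed 1, size = 3" directly after "removed 1, size = 2".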

ArrayBlockingQueue<Integer>(7)
The producer is more active than both consumers
Consumer c1 = new Consumer(blockingQueue, "consumer", 100);
Producer p1 = new Producer(blockingQueue, "producer", 20);
Consumer c2 = new Consumer(blockingQueue, "consumer2", 100);

filled 1, size = 4
removed 1, size = 2
removed 1, size = 3
filled 1, size = 3
filled 1, size = 4
removed 1, size = 3
removed 1, size = 2
filled 1, size = 3
filled 1, size = 4
filled 1, size = 5
filled 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
removed 1, size = 5
removed 1, size = 4
filled 1, size = 5
filled 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
filled 1, size = 7
removed 1, size = 7
java.lang.InterruptedException
java.lang.InterruptedException: sleep interrupted
consumer end
java.lang.InterruptedException: sleep interrupted
producer end
consumer2 end

LinkedBlockingQueue<Integer>
The producer is more active than both consumers
Consumer c1 = new Consumer(blockingQueue, "consumer", 100);
Producer p1 = new Producer(blockingQueue, "producer", 20);
Consumer c2 = new Consumer(blockingQueue, "consumer2", 100);

filled 1, size = 3
removed 1, size = 2
removed 1, size = 3
filled 1, size = 3
filled 1, size = 4
filled 1, size = 5
removed 1, size = 4
filled 1, size = 5
removed 1, size = 4
filled 1, size = 5
filled 1, size = 6
filled 1, size = 7
removed 1, size = 6
filled 1, size = 7
filled 1, size = 8
filled 1, size = 9
filled 1, size = 10
filled 1, size = 11
filled 1, size = 12
removed 1, size = 11
filled 1, size = 12
removed 1, size = 11
filled 1, size = 12
removed 1, size = 11
filled 1, size = 12
filled 1, size = 13
filled 1, size = 14
filled 1, size = 15
removed 1, size = 14
filled 1, size = 15
filled 1, size = 16
filled 1, size = 17
filled 1, size = 18
filled 1, size = 19
filled 1, size = 20
filled 1, size = 21
removed 1, size = 20
filled 1, size = 21
filled 1, size = 22
filled 1, size = 23
removed 1, size = 22
filled 1, size = 23
filled 1, size = 24
filled 1, size = 25
removed 1, size = 24
filled 1, size = 25
filled 1, size = 26
removed 1, size = 25
filled 1, size = 26
filled 1, size = 27
filled 1, size = 28
filled 1, size = 29
filled 1, size = 30
removed 1, size = 29
filled 1, size = 30
filled 1, size = 31
filled 1, size = 32
filled 1, size = 33
filled 1, size = 34
removed 1, size = 33
filled 1, size = 34
filled 1, size = 35
java.lang.InterruptedException: sleep interrupted
java.lang.InterruptedException: sleep interrupted
consumer2 end
producer end
java.lang.InterruptedException: sleep interrupted
consumer end

LinkedBlockingQueue<Integer>
Both consumers together are twice as active as the producer
Consumer c1 = new Consumer(blockingQueue, "consumer", 50);
Producer p1 = new Producer(blockingQueue, "producer", 50);
Consumer c2 = new Consumer(blockingQueue, "consumer2", 50);

filled 1, size = 4
removed 1, size = 2
removed 1, size = 3
filled 1, size = 3
filled 1, size = 4
filled 1, size = 5
filled 1, size = 6
removed 1, size = 5
removed 1, size = 4
removed 1, size = 3
filled 1, size = 4
removed 1, size = 3
removed 1, size = 2
removed 1, size = 1
filled 1, size = 2
removed 1, size = 1
filled 1, size = 2
removed 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
filled 1, size = 1
removed 1, size = 0
java.lang.InterruptedException
java.lang.InterruptedException: sleep interrupted
consumer2 end
consumer end
java.lang.InterruptedException: sleep interrupted
producer end
