Advanced   Java   Services ConcurrentLinkedQueue


ConcurrentLinkedQueue<E>

Aus dert API

An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.

This implementation employs an efficient "wait-free" algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.

Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.

This class and its iterator implement all of the optional methods of the Queue and Iterator interfaces.

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedQueue happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedQueue in another thread.

Die ebenfalls von Doug Lea entworfene Klasse verwendet keine Locks und ist deswegen sher performant.


Ein Producer-Consumer Beispiel

Consumer und Producer verwenden eine ConcurrentLinkedQueue gemeinsam. Ein Synchronisieren ist nicht notwendig. ConcurrentLinkedQueue stellt alle notwendigen Methoden bereit. Da die Queue nicht blockiert kann es bei einer leeren Queue zu einer NoSuchElementException kommen. Man sollte also beim Entnehmen entsprechend vorsichtig sein und lieber poll() verwenden. Die Methoden put() und take() exisatieren hier noch nicht, sie werden erst bon BloquingQueue eingeführt.


Der Producer

Auffüllen bedeutet das Einfügen einer 1 in die Queue (genauer new Integer(1)).

import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class Producer extends Thread
{
   private Queue<Integer> queue;
   private int sleepTime;

   public Producer(Queue<Integer> queue, String name, int sleepTime)
   {
      this.queue = queue;
      this.setName(name);
      this.sleepTime = sleepTime;
   }

   public void run()
   {
      for(;;)
      {
         try
         {
            queue.offer(1); // auffüllen um 1  // put gibt es nicht
            System.out.println("filled 1, size = " + queue.size());
            TimeUnit.MILLISECONDS.sleep(1+(int)(Math.random()*sleepTime));
         }
         catch(InterruptedException ex)
         {
            System.out.println(ex);
            interrupt();
            System.out.println(this.getName() + " end");
            break;
         }
      }
   }
}

Der Consumer

Wir entnehmen nur, wenn die Queue nicht leer ist.

import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class Consumer extends Thread
{
   private Queue<Integer> queue;
   private int sleepTime;

   public Consumer(Queue<Integer> queue, String name, int sleepTime)
   {
      this.queue = queue;
      this.setName(name);
      this.sleepTime = sleepTime;
   }

   public void run()
   {
      int value=0;
      for(;;)
      {
         try
         {
            if(!queue.isEmpty())
            {
               value = queue.poll(); // 1 abholen
               System.out.println("value = " + value);
               System.out.println("removed 1, size = " + queue.size());
            }
            else
               System.out.println("queue is empty");
            TimeUnit.MILLISECONDS.sleep(1+(int)(Math.random()*sleepTime));
         }
         catch(InterruptedException ex)
         {
            System.out.println(ex);
            interrupt();
            System.out.println(Thread.currentThread().getName() + " end");
            break;
         }
         catch(NullPointerException ex)
         {
            System.out.println(ex);
         }
      }
   }
}

main
private static void main()
{
   Queue<Integer> queue = new ConcurrentLinkedQueue<>();
   Producer prod = new Producer(queue, "prod", 50);
   Consumer cons = new Consumer(queue, "cons", 50);
   prod.start();
   cons.start();
   try
   {
      Thread.sleep(3000);
   }
   catch(InterruptedException ex)
   {
      // TODO Auto-generated catch block
      ex.printStackTrace();
   }
   prod.interrupt();
   cons.interrupt();

}

Consumervarianten

Der folgende Consumer fragt das Ergebnis von poll() ab. Man beachte, daß hier Integer verwendet wird und nicht int. Hier muß Autounboxing vermieden werden damit eine Abfrage value!=null möglich ist. Eine Verwendung von int und eine Abfrage value!=0 führt zu einer NullPointerException.

import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class Consumer2 extends Thread
{
   private Queue<Integer> queue;
   private int sleepTime;

   public Consumer2(Queue<Integer> queue, String name, int sleepTime)
   {
      this.queue = queue;
      this.setName(name);
      this.sleepTime = sleepTime;
   }

   public void run()
   {
      Integer value=0;
      for(;;)
      {
         try
         {

            value = queue.poll(); // 1 abholen
            if(value!=null)
            {
               System.out.println("value = " + value);
               System.out.println("removed 1, size = " + queue.size());
            }
            else
               System.out.println("queue is empty");

            TimeUnit.MILLISECONDS.sleep(1+(int)(Math.random()*sleepTime));
         }
         catch(InterruptedException ex)
         {
            System.out.println(ex);
            interrupt();
            System.out.println(Thread.currentThread().getName() + " end");
            break;
         }
         catch(NullPointerException ex)
         {
            System.out.println(ex);
         }
      }
   }
}