Advanced   Java   Services Executorhierarchie und die FactoryKlasse Executors Back Next Up Home

Einführung

Das in Java 1.5 eingeführte concurrent-Package soll die Arbeit mit Threads erleichtern und Klassen und Methoden für bestimmte Standardsituationen bereitstellen. Es wurde zum überwiegenden Teil von Doug Lea entworfen. Wichtige Mitarbeiter von Doug Lea waren Joshua Bloch und Brian Goetz. Die Exekutoren, also die Realisierungen des Interfaces ExecutorService verwalten und starten Threads selbständig, sobald man ihnen Objekte vom Typ Runnable oder Callable übergibt. Diese können in praktisch beliebiger Anzahl einem Exekutor übergeben werden.


Die Executorhierarchie

executor-hierarchie.jpg


Das Interface Executor

Das Interface führt lediglich die Methode execute() ein.

void   execute(Runnable r)

Startet einen Thread mit einem Runnable. Für mehrere braucht man eine Schleife. Wie die Threads gestartet werden bestimmt die Implementierung des Executors. Es gibt keine direkten Rückmeldungen von diesem Thread, da die run()-Methode nichts zurückgibt.


Das Interface ExecutorService

Dieses Interface führt die zentralen Methoden zum Starten und Beenden von Runnables und eben auch Callables ein. Ein Callable-Objekt ist eine Realisierung des Interfaces Callable<T>. Die in diesem Interface vereinbarte Methode hat die Signatur T call() throws Exception. Im Gegensatz zu run() hat sie einen Returnwert und hat eine Exception in der Signatur.


Future<?>            submit(Runnable r)
<T>  Future<T>   submit(Callable<T> task)

Startet einen Thread mit einem Runnable oder euinem Callable. Für mehrere braucht man eine Schleife. Wie die Threads gestartet werden bestimmt die Implementierung des Executors. Gibt ein Futureobjekt zurück. Über dieses Futureobjekt erhält man Informationen über den Thread. So gibt es eine Methode isDone(). Diese gibt true zurück falls der Thread beendet ist. Die Methode get() liefert für Runnable null falls der Thread normal beendet worden ist. Für ein Callable liefert get() ein Objekt des Returntyps der Methode call(). Im Gegensatz zu isDone() wartet get() auf das Ende, blockiert also den abfragenden Thread.


<T> List<Future<T>>   invokeAll(Collection<? extends Callable<T>> tasks)
<T> List<Future<T>>   invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

Startet alle mit der Collection übergebenen Callables. Wie die Threads gestartet werden bestimmt die Implementierung des Executors. Gibt ein Futureobjekt zurück. Returnwert ist nun eine Liste von Futureobjekten <T> List<Future<T>>. Die Methode invokeAll blockiert immer, egal ob man die Futureliste auswertet oder nicht. Sie wartet immer bis alle Threads beendet sind. Mit der jeweiligen get()-Methode kann man die Ergebnisse abholen. Falls ein Callable durch eine Exception beendet wird so gibt es keine direkte Möglichkeit dies festzustellen.


<T>T   invokeAny(Collection<? extends Callable<T>> tasks)
<T>T   invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

Startet alle mit der Collection übergebenen Callables. Wie die Threads gestartet werden bestimmt die Implementierung des Executors. Sobald ein Task vollendet ist werden die anderen abgebrochen, was eine gewisse Zeit dauern kann. In dieser köönnen vielleicht noch andere Callables beendet werden. Zurückgegeben wird der Returnwert der ersten call()-Methode, die beendet worden ist. Hierzu ist in diesem Fall kein Futureobjekt notwendig. Man beachte den daher unterschiedlichen Returntyp zu invokeAll.

boolean   isTerminated()
boolean   awaitTermination(long timeout, TimeUnit unit)

isTerminated() liefert den Status ohne zu blockieren, awaitTermination() blockiert bis zum TimeOut.

void   shutdown()
List<Runnable>   shutdownNow()

shutdown() initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. shutdownNow() attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.


ThreadPoolExecutor vs. ScheduledThreadPoolExecutor

Wie man aus der eingangs gezeigten Graphik sieht gibt es abgesehen von ForkJoinPool, der für spezielle Aufgaben zuständig ist, nur zwei Arten von Executorservices: Die einen führen einmalig die übergebene Runnables oder Callables sofort aus, die anderen erst verzögert nach einer übergebenen Zeitspanne und dann eventuell wiederholt. Für die erste Sorte gibt es die abstrakte Klasse AbstractExecutorservice, die bereits einige der Methoden aus dem Interface implementiert und die dann über die Klasse ThreadPoolExecutor realisiert wird. Für die andere gibt es das Interface ScheduledExecutorService, das vier schedule()-Methoden für das Zeithandling vereinbart und über die Klasse ScheduledThreadPoolExecutor realisiert wird.


Die Factoryklasse Executors

Die Factoryklasse Executors stellt statische Methoden bereit die mit einfachen Aufrufen fertig konfigurierte ExecutorServices liefert. Zurückgegeben wird immer der Typ des Interfaces ExecutorService bzw. ScheduledExecutorService. In den meisten Fällen liefern die Methoden keine eigenen Implementierungen der Interfaces sondern vorkonfigurierte Exekutoren vom Typ ThreadPoolExecutor bzw. ScheduledThreadPoolExecutor.

Methode
Executors.newSingleThreadExecutor()
Executors.newFixedThreadPool(int nThreads)
Executors.newCachedThreadPool()
Executors.newSingleThreadScheduledExecutor()
Executors.newScheduledThreadPool(int corePoolSize)()

Da insbesondere die Klasse ThreadPoolExecutor sehr aufwendige Konstruktoren besitzt - jedem Konstruktor müssen mindesten 5 Parameter übergeben werden, ist es angenehem, daß es die Factoryklasse Executors gibt, die uns fertige ExecutorServices "out of the box" liefert.


Schematischer Aufbau und Funktionsweise eines Exekutors

java-executor-aufbau.jpg

Ein ExecutorService erhält ein oder mehrere Runnables bzw. Callables. Diese werden in einer internen Warteschlange vom Typ des Interfaces BlockingQueue<Runnable> gespeichert (auch die Hierarchie der Queue-Klassen wurde übrigens von Doug Lea entworfen.). Zum Erzeugen der Threads bedient sich ein Exekutor einer internen ThreadFactory. ThreadFactory ist ein nichtgenerischeas Interface mit der einzigen Methode Thread  newThread(Runnable r). Eine ThreadFactory macht aus einem Runnable mit Hilfe dieser Methode ein Javaobjekt vom Typ Thread. Diese Threads werden vom Exekutor in einem internen ThreadPool verwaltet. Ein wesentlicher Unterschied zum händischen Starten von Threads ist, daß Exekutoren Threads wiederverwenden können. Zudem warten sie immer auf weitere Tasks und laufen damit selbst in einem quasi endlosen Thread und müssen daher explizit mit der Methode shutDown() bzw. shutDownNow() beendet werden. Die Ergebnisse, die die Threads liefern können über die übergebenen Runnables abgeholt werden oder im Falle von Callables in einem Futureobjekt liegen. Das Interface Future vereinbart Methoden zum Abholen der Ergebnisse bzw. zum Anfragen ob diese bereits vorliegen, siehe dazu den Absatz Die Callable-Future Hierarchie weiter unten.


Die Default-ThreadFactory

Allex Exekutoren, denen keine ThreadFactory übergeben wird, arbeiten mit der DefaultThreadFactory, einer packageprivaten statischen inneren Klasse der FactoryKlasse Executors, die man mit der Methode ThreadFactory  defaultThreadFactory() erhält.

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

Dem Quellcode entnimmt man, daß diese Factory keine Dämonen zuläßt und daß alle Threads mit derselben mittleren Priorität arbeiten.


Callables in der BlockingQueue<Runnable> ??

Der weiter oben gezeigte schematische Aufbau eines Executors läßt darauf schließen, daß auch Callables in die BlockingQueue<Runnable> aufgenommen werden. Wie ist das möglich? Dazu werfen wir einen Blick in den Quellcode der Methode <T>  Future<T>  submit(Callable<T> task) die bereits in der Klasse AbstractExecutorService realisiert ist.

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

Diese Methode stützt sich offenbar auf newTaskFor(task).

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

FutureTask<T> ist eine ebenfalls in 1.5 eingeführte Klasse, die ein Callable in ein Runnable verwandeln kann, da Sie selbst das Interface Runnable implementiert. Schaut man sich die anderen Methoden an, die mit Callables arbeiten, so sieht man, daß in allen die Klasse FutureTask eingesetzt wird. Callable ist also eine Art Fassade, letzlich wird ein Thread immer über die Methode run() gestartet. Hier die zugehörige Hierarchie:


Die Callable-Future Hierarchie

java-runnable-future-hierarchie.jpg

Ein Blick in den Quellcode der Klasse FutureTask zeigt: Der Konstruktor vonFutureTask legt das übergeben Callable im Datenteil ab und ruft dann in seinem eigenen run() dazu die Methode call(). Der Returnwert von call() wird wieder im Datenteil gespeichert. Erst mit der Methode get() aus Future erhält man dann das Ergebnis von call(), daher blockiert diese Methode, da sie auf das Ergebnis von call() warten muß. Da FutureTask ein Runnable ist kann es in die BlockingQueue aufgenommen werden.

Interessant ist, daß das Interface RunnableFuture erst in Java 1.6 eingeführt wurde, vermutlich deswegen um den Swingworker nicht von FutureTask ableiten zu müssen.

Natürlich kann man FutureTask auch dazu verwenden, eigene Callables über einen Thread zu starten. Hier ein Beispiel.


Mit Hilfe von FutureTask ein Callable mit der Klasse Thread starten
private static void startCallableWithThread()
{
  Callable<String> call = new Callable(){
    @Override
    public String call() throws Exception
    {
      StringBuilder sb = new StringBuilder();
      for(int i=0; i<10; i++)
      {
        char ch = (char) ('a' + (int)(Math.random()*26));
        sb.append(ch);
        TimeUnit.MILLISECONDS.sleep(200);
      }
      return sb.toString();
    }};
    // end anonymous class

  FutureTask<String> ft = new FutureTask<>(call); // blockiert (noch) nicht
  Thread th = new Thread(ft);
  th.start();
  try
  {
    System.out.println(ft.get());  // blockiert
  }
  catch(InterruptedException | ExecutionException ex)
  {
    ex.printStackTrace();
  }
}






















Valid XHTML 1.0 Strict top Back Next Up Home