Advanced   Java   Services Senden und Empfangen mit zwei Queues


Beispiel 3 : Senden, Empfangen und Antworten mit zwei Queues

Noch nicht auf GlassFish 4 aktualisiert!

Das Beispiel zeigt einen Sender der fünf Nachrichten sendet und fünf Antworten erhält. Für jede Übertragungsrichtung gibt es eine eigene Queue.


Der Sender
/*
   beispiel 3
   ------------------------------
   der sender
   ------------------------------
   realisierung von P2P mit Queue
   es gibt zwei queues, eine zum senden und eine zum erhalten von antworten
   sender sendet fünf nachrichten im abstand einer sekunde und erhält fünf antworten
*/

import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.JMSException;
import javax.jms.QueueReceiver;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

import javax.naming.InitialContext;
import javax.naming.NamingException;

import java.util.Date;
import java.util.Properties;

public class SenderClient implements MessageListener
{
   private QueueSender sender;
   private Queue queueForReply;
   private Queue queueForSending;
   private QueueReceiver receiver;
   private QueueSession queueSession;
   private QueueConnection queueConnection;

   public SenderClient() throws JMSException, NamingException
   {
      setupPTP();  // lookup context, set PTP connection and session
      setupSender();
      setupReceiver();
   }

   public void setupPTP() throws JMSException, NamingException
   {
      // step 1 create initialcontext
      System.out.println("about to create initialcontext");
      Properties env = new Properties();
      env.put("org.omg.CORBA.ORBInitialHost","192.168.45.2");  // default ist localhost !!
      env.put("org.omg.CORBA.ORBInitialPort","3700");  // ist default
      env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory");
      InitialContext ctx = new InitialContext(env);   // NamingException
      System.out.println("initialcontext received\n");

      // step 2 lookup connection factory
      System.out.println("try to lookup QueueConnectionFactory");
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
      System.out.println("QueueConnectionFactory received\n");

      // step 3 use connection factory to create a JMS connection
      System.out.println("try to create a QueueConnection");
      queueConnection = queueConnectionFactory.createQueueConnection();
      System.out.println("JMS QueueConnection created\n");

      // step 4 use connection to create a session
      System.out.println("try to create a QueueSession");
      queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("QueueSession created\n");

      // step 5 lookup the Queue for sending
      System.out.println("try to lookup the Queue for sending");
      queueForSending = (Queue) ctx.lookup("jms.Queue");
      System.out.println("Queue received\n");

      // step 5 lookup the Queue for reply
      System.out.println("try to lookup the Queue for the reply");
      queueForReply = (Queue) ctx.lookup("jms.Queue2");
      System.out.println("Queue received\n");

      queueConnection.start();
   }

   public void setupSender() throws JMSException
   {
      System.out.println("create sender");
      sender = queueSession.createSender(queueForSending);
   }

   public void setupReceiver() throws JMSException
   {
      System.out.println("create receiver");
      receiver = queueSession.createReceiver(queueForReply);
      System.out.println("set MessageListener for reply\n");
      receiver.setMessageListener(this);
      System.out.println("waiting for messages\n");
   }

   public void sendMessage(String text2Send) throws JMSException
   {
      TextMessage tm = queueSession.createTextMessage(text2Send);
      tm.setJMSReplyTo(queueForReply);
      sender.send(tm);
      System.out.println("gesendet: " + tm.getText() + "\n");
   }

   // not in use
   public void stop() throws JMSException
   {
      queueConnection.stop();
      queueSession.close();
      queueConnection.close();
   }

   public void onMessage(Message msg)
   {
      TextMessage tm = (TextMessage) msg;
      try
      {
         String mess = tm.getText();
         System.out.println("onMessage, reply received :\n" + mess + "\n");
      }
      catch(JMSException ex)
      {
         ex.printStackTrace();
      }
   }

   public static void main(String args[]) throws Exception
   {
      System.out.println("starting the sender at " + new java.util.Date(System.currentTimeMillis()) + "\n");
      SenderClient client = new SenderClient();

      for(int i=0; i6lt;5; i++)
      {
         client.sendMessage("" + new java.util.Date(System.currentTimeMillis()) );
         beIdleFor(1000);
      }
   }

   public static void beIdleFor(int millis)
   {
      try { Thread.sleep(millis); } catch(InterruptedException ex) {}
   }
}

Der Empfänger

Der Verbindungsaufbau erfolgt wie beim Sender. Mit Hilfe der Session wird aber nun ein (Queue-)Receiver erzeugt. Der Receiver reagiert beim Finden einer Nachricht mit einem Nachrichtenereignis. Über die Methode setMessageListener geben wir einen Ereignisempfänger bekannt. Für jede gefundene Nachricht kann dann der Receiver die Methode onMessage aufrufen und dieser die gefundene Nachricht übergeben.

/*
   beispiel 2
   ------------------------------
   der empfänger
   ------------------------------
   realisierung von P2P mit Queue
   empfänger erhält 5 nachrichten und gibt eine antwort
*/

import java.util.Date;
import java.util.Properties;

import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.JMSException;
import javax.jms.QueueReceiver;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ReceiverClient implements MessageListener
{
   private QueueSender sender;
   private Queue queueForReply;
   private QueueReceiver receiver;
   private Queue queueForReceiving;
   private QueueSession queueSession;
   private QueueConnection queueConnection;

   public ReceiverClient() throws JMSException, NamingException
   {
      setupPTP();
      setupReceiver();
   }

   public void setupPTP() throws JMSException, NamingException
   {
      // step 1 create initialcontext
      System.out.println("about to create initialcontext");
      Properties env = new Properties();
      env.put("org.omg.CORBA.ORBInitialHost","192.168.45.2");  // default ist localhost !!
      env.put("org.omg.CORBA.ORBInitialPort","3700");  // ist default
      env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory");
      InitialContext ctx = new InitialContext(env);   // NamingException
      System.out.println("initialcontext received\n");

      // step 2 lookup connection factory
      System.out.println("try to lookup QueueConnectionFactory");
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
      System.out.println("QueueConnectionFactory received\n");

      // step 3 use connection factory to create a JMS connection
      System.out.println("try to create a QueueConnection");
      queueConnection = queueConnectionFactory.createQueueConnection();
      System.out.println("JMS QueueConnection created\n");

      // step 4 use connection to create a session
      System.out.println("try to create a QueueSession");
      queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("QueueSession created\n");

      // step 5 lookup the Queue for receiving
      System.out.println("try to lookup the Queue");
      queueForReceiving = (Queue) ctx.lookup("jms.Queue");
      System.out.println("Queue received\n");

      queueConnection.start();
   }

   public void setupReceiver() throws JMSException, NamingException
   {
      // Set the async listener for queueForReceiving
      System.out.println("create receiver");
      receiver = queueSession.createReceiver(queueForReceiving);
      System.out.println("set MessageListener for receiver\n");
      receiver.setMessageListener(this);
      System.out.println("waiting for messages\n");
   }

   private void sendReply(String text, Queue dest) throws JMSException
   {
      if (dest!=null)
      {
         //System.out.println("sendReply, this=" + hashCode() + ", dest="+dest);
         QueueSender sender = queueSession.createSender(dest);
         TextMessage tm = queueSession.createTextMessage(text);
         sender.send(tm);
         sender.close();
      }
      else
         System.out.println("cannot reply, dest = null");
   }

   public void stop() throws JMSException
   {
      queueConnection.stop();
      queueSession.close();
      queueConnection.close();
   }

   public void onMessage(Message msg)
   {
      TextMessage tm = (TextMessage) msg;
      try
      {
         String mess = tm.getText();
         System.out.println("onMessage, received text :\n" + mess);
         Queue dest = (Queue) msg.getJMSReplyTo();
         mess = "receiver replies: " + mess;
         sendReply(mess, dest);
         System.out.println("onMessage, reply send\n");
      }
      catch(JMSException ex)
      {
         ex.printStackTrace();
      }
   }

   public static void main(String args[]) throws Exception
   {
      System.out.println("starting receiver at " + new java.util.Date(System.currentTimeMillis()) + "\n");
      ReceiverClient client = new ReceiverClient();
   }
}

Ein möglicher Ablauf

Die folgenden Screenshots zeigen einen möglichen Ablauf.

Wir starten diesmal den Sender zuerst.

Empfänger wird gestartet.

Sender erhält Antworten.