Advanced   Java   Services Senden und Empfangen mit einer Queue

Einige Beispiele.


Beispiel 1 : Nachricht senden, Nachricht empfangen

Das erste Codebeispiel zeigt einen Sender der zwei Nachrichten sendet. Die Nachrichten werden in der Queue solange aufbewahrt bis der Empfänger sie abholt (asynchrone Übertragung). Es gibt nur einen Kanal (Queue) und er wird nur in einer Richtung benutzt.


Der Sender
package hms;

import java.util.Date;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
man muß die gf-client.jar im originalverzeichnis lassen !
das liegt an den relativen verweisen, die in dieser jar sind !
*/

public class JMS_1_Sender
{
   public static void main(String[] args)
   {
       System.out.println("-------------------------------------------");
       System.out.println("jms sender");
       System.out.println("-------------------------------------------");
       System.out.println("Begin " + new Date(System.currentTimeMillis()) + "\n");

      try
      {
         // step 1 create initialcontext
         System.out.println("about to create initialcontext");
         Properties env = new Properties();
         env.put("org.omg.CORBA.ORBInitialHost","127.0.0.1"); // default ist localhost !!
         env.put("org.omg.CORBA.ORBInitialPort","3700"); // ist default
         env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory");
         InitialContext ctx = new InitialContext(env); // NamingException
         // für glassfish reicht default-constructor
         //InitialContext ctx = new InitialContext(); // NamingException
         System.out.println("initialcontext received\n");

         // step 2 lookup connection factory
         System.out.println("try to lookup QueueConnectionFactory");
         QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("jms/MyQueueConnectionFactory");
         System.out.println("QueueConnectionFactory received: " + queueConnectionFactory.getClass());
         Class[] interfaces = queueConnectionFactory.getClass().getInterfaces();
         System.out.println("omplemented interfaces:");
         for(Class face : interfaces)
            System.out.println(face);
         System.out.println();

         if (queueConnectionFactory != null)
         {
            // step 3 use connection factory to create a JMS connection
            System.out.println("try to create a QueueConnection");
            QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); // JMSException
            System.out.println("JMS QueueConnection created\n");

            // step 4 use connection to create a session
            System.out.println("try to create a QueueSession");
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // JMSException
            System.out.println("QueueSession created" + queueSession);
            System.out.println();

            // step 5 lookup the Queue
            System.out.println("try to lookup the Queue");
            Queue queue = (Queue) ctx.lookup("jms.MyQueue");
            System.out.println("Queue received: " + queue);
            System.out.println();

            queueConnection.start(); // JMSException
            System.out.println("queueConnection started");

            System.out.println("create sender\n");
            QueueSender queueSender = queueSession.createSender(queue);
            System.out.println("QueueSender created" + queueSender);
            String text2send = "mary had a little lamb";
            TextMessage tm = queueSession.createTextMessage(text2send);
            queueSender.send(tm);
            System.out.println("Text:");
            System.out.println(tm.getText());
            System.out.println("gesendet um");
            System.out.println(new Date(System.currentTimeMillis()));
            System.out.println();

            queueSession.close();
            queueConnection.stop();
            queueConnection.close();

            System.out.println("Session closed");
            System.out.println("Connection closed");
            System.out.println("Sender shutdown");
         }
      }
      catch(NamingException | JMSException ex)
      {
         ex.printStackTrace();
      }
      System.out.println("\nEnd " + new Date(System.currentTimeMillis()));
   }
}

jms-sender.jpg


Der Empfänger

Der Verbindungsaufbau erfolgt wie beim Sender. Mit Hilfe der Session wird aber nun ein (Queue-)Receiver erzeugt. Der Receiver reagiert beim Finden einer Nachricht mit einem Nachrichtenereignis. Über die Methode setMessageListener geben wir einen Ereignisempfänger bekannt. Für jede gefundene Nachricht kann dann der Receiver die Methode onMessage aufrufen und dieser die gefundene Nachricht übergeben.

package hms;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JMS_1_Receiver
{
   public static void main(String[] args)
   {
      System.out.println("-------------------------------------------");
      System.out.println("receiver");
      System.out.println("-------------------------------------------");
      System.out.println("Begin " + new Date(System.currentTimeMillis()) + "\n");

      try
      {
         // initialize JNDI
         // step 1 create initialcontext
         System.out.println("about to create initialcontext");
         Properties env = new Properties();
         env.put("org.omg.CORBA.ORBInitialHost", "192.168.77.77"); // default ist localhost !!
         env.put("org.omg.CORBA.ORBInitialPort", "3700"); // ist default
         env.put("java.naming.factory.initial", "com.sun.enterprise.naming.SerialInitContextFactory");
         //InitialContext ctx = new InitialContext(env); // NamingException
         InitialContext ctx = new InitialContext(); // NamingException
         System.out.println("initialcontext received\n");

         // step 2 lookup connection factory
         System.out.println("try to lookup QueueConnectionFactory");
         QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("jms/MyQueueConnectionFactory");
         System.out.println("QueueConnectionFactory received\n");
         // wenn nix da ist -> wartet endlos

         // step 3 use connection factory to create a JMS connection
         System.out.println("try to create a QueueConnection");
         QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); // JMSException
         System.out.println("JMS QueueConnection created\n");

         // step 4 use connection to create a session
         System.out.println("try to create a QueueSession");
         QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
         System.out.println("QueueSession created\n");

         // step 5 lookup the Queue
         System.out.println("try to lookup the Queue");
         Queue queue = (Queue) ctx.lookup("jms.MyQueue");
         System.out.println("Queue received\n");

         queueConnection.start();

         System.out.println("create QueueReceiver");
         QueueReceiver queueReceiver = queueSession.createReceiver(queue);
         System.out.println("QueueReceiver created");
         System.out.println("set MessageListener for receiver\n");
         queueReceiver.setMessageListener(new MyMessageListener());
         System.out.println("waiting for messages\n");
         // blockiert bis eine message kommt

         queueSession.close();
         queueConnection.stop();
         queueConnection.close();
         System.out.println("Session closed");
         System.out.println("Connection closed");
         System.out.println("Receiver shutdown");

      }
      catch(NamingException | JMSException ex)
      {
         ex.printStackTrace();
      }
   }


   private static class MyMessageListener implements MessageListener
   {
      // listener to receive messages
      public void onMessage(Message msg)
      {
         TextMessage tm = (TextMessage) msg;
         try
         {
            System.out.println(
                  "onMessage (" + new Date(System.currentTimeMillis()) + "), received text :\n" + tm.getText() + "\n");
         }
         catch(JMSException ex)
         {
            ex.printStackTrace();
         }
      }
   }
}

jms-receiver.jpg


Ein möglicher Ablauf
-------------------------------------------
jms sender
-------------------------------------------
Begin Fri Oct 09 18:51:09 CEST 2015

about to create initialcontext
initialcontext received

try to lookup QueueConnectionFactory
Okt 09, 2015 6:51:15 PM org.hibernate.validator.internal.util.Version 
INFO: HV000001: Hibernate Validator 5.0.0.Final
Okt 09, 2015 6:51:15 PM com.sun.messaging.jms.ra.ResourceAdapter start
INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter: Version:  5.1  (Build 9-b) Compile:  July 29 2014 1229
Okt 09, 2015 6:51:15 PM com.sun.messaging.jms.ra.ResourceAdapter start
INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter starting: broker is REMOTE, connection mode is TCP
Okt 09, 2015 6:51:15 PM com.sun.messaging.jms.ra.ResourceAdapter start
INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter Started:REMOTE
QueueConnectionFactory received: class com.sun.messaging.jms.ra.ConnectionFactoryAdapter
omplemented interfaces:
interface javax.jms.ConnectionFactory
interface javax.jms.QueueConnectionFactory
interface javax.jms.TopicConnectionFactory
interface javax.resource.Referenceable
interface java.io.Serializable

try to create a QueueConnection
JMS QueueConnection created

try to create a QueueSession
QueueSession createdcom.sun.messaging.jms.ra.SessionAdapter@5586b

try to lookup the Queue
Queue received: Oracle GlassFish(tm) Server MQ Destination
getName():    MyQueuePhys
Class:      com.sun.messaging.Queue
getVERSION():   3.0
isReadonly():   false
getProperties():  {imqDestinationName=MyQueuePhys, imqDestinationDescription=A Description for the Destination Object}

queueConnection started
create sender

QueueSender createdConnectionID=5717793383320320, SessionID=5717793383353600, ProducerID=5717793383377920, DestName=MyQueuePhys
Text:
mary had a little lamb
gesendet um
Fri Oct 09 18:51:16 CEST 2015

Session closed
Connection closed
Sender shutdown

End Fri Oct 09 18:51:16 CEST 2015



-------------------------------------------
receiver
-------------------------------------------
Begin Fri Oct 09 18:55:21 CEST 2015

about to create initialcontext
initialcontext received

try to lookup QueueConnectionFactory
Okt 09, 2015 6:55:25 PM org.hibernate.validator.internal.util.Version 
INFO: HV000001: Hibernate Validator 5.0.0.Final
Okt 09, 2015 6:55:25 PM com.sun.messaging.jms.ra.ResourceAdapter start
INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter: Version:  5.1  (Build 9-b) Compile:  July 29 2014 1229
Okt 09, 2015 6:55:25 PM com.sun.messaging.jms.ra.ResourceAdapter start
INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter starting: broker is REMOTE, connection mode is TCP
Okt 09, 2015 6:55:25 PM com.sun.messaging.jms.ra.ResourceAdapter start
INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter Started:REMOTE
QueueConnectionFactory received

try to create a QueueConnection
JMS QueueConnection created

try to create a QueueSession
QueueSession created

try to lookup the Queue
Queue received

create QueueReceiver
QueueReceiver created
set MessageListener for receiver

waiting for messages

onMessage (Fri Oct 09 18:55:26 CEST 2015), received text :
mary had a little lamb

Session closed
Connection closed
Receiver shutdown

Beispiel 2 : Nachricht senden, Nachricht empfangen und Antworten

Noch nicht uaf GlassFish 4 aktualisiert!

Das zweite Codebeispiel zeigt einen Sender der eine Nachricht sendet und auf dem gleichen Kanal auf eine Antwort wartet. Der Empfänger wartet auf Nachrichten. Erhält er eine Nachricht, so schickt er eine Antwort an den Sender. Sender und Empfänger sind getrennt. Es gibt nur einen Kanal (Queue) und er wird nur in beiden Richtungen benutzt. Der Sender teilt dem Empfänger mit, auf welchem Kanal er antworten kann. Dies geschieht mit der Methode setJMSReplyTo() der Klasse TextMessage. Der Empfänger kann dann mit Hilfe der Methode getJMSReplyTo() der selben Klasse den Kanal zum Antworten ermitteln (siehe die entsprechenden Codezeilen in Rot).


Der Sender
/*
   beispiel 2
   ------------------------------
   der sender
   ------------------------------
   realisierung von P2P mit Queue
   senden und empfangen über eine einzige Queue
   sender kann antwort empfangen, empfänger kann antworten

   falls der sender zuerst gestartet wird, dann empfängt er seine eigene nachricht
   da es keinen empfänger gibt

   bei diesem beispiel sollte also der empfänger zuerst gestartet werden
*/

import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.JMSException;
import javax.jms.QueueReceiver;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

import javax.naming.InitialContext;
import javax.naming.NamingException;

import java.util.Date;
import java.util.Properties;


public class SendAndReceive implements MessageListener
{
   private Queue queue;
   private QueueSender sender;
   private QueueReceiver receiver;
   private QueueSession queueSession;
   private QueueConnection queueConnection;

   public SendAndReceive() throws JMSException, NamingException
   {
      setupPTP();  // lookup context, set PTP connection and session
      setupSender();
      setupReceiver();
   }

   public void setupPTP() throws JMSException, NamingException
   {
      // step 1 create initialcontext
      System.out.println("about to create initialcontext");
      Properties env = new Properties();
      env.put("org.omg.CORBA.ORBInitialHost","192.168.45.2");  // default ist localhost !!
      env.put("org.omg.CORBA.ORBInitialPort","3700");  // ist default
      env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory");
      InitialContext ctx = new InitialContext(env);   // NamingException
      System.out.println("initialcontext received\n");

      // step 2 lookup connection factory
      System.out.println("try to lookup QueueConnectionFactory");
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
      System.out.println("QueueConnectionFactory received\n");

      // step 3 use connection factory to create a JMS connection
      System.out.println("try to create a QueueConnection");
      queueConnection = queueConnectionFactory.createQueueConnection();
      System.out.println("JMS QueueConnection created\n");

      // step 4 use connection to create a session
      System.out.println("try to create a QueueSession");
      queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("QueueSession created\n");

      // step 5 lookup the Queue
      System.out.println("try to lookup the Queue");
      queue = (Queue) ctx.lookup("jms.Queue");
      System.out.println("Queue received\n");

      queueConnection.start();
   }

   public void setupSender() throws JMSException, NamingException
   {
      System.out.println("create sender");
      sender = queueSession.createSender(queue);
   }

   public void setupReceiver() throws JMSException, NamingException
   {
      System.out.println("create receiver");
      receiver = queueSession.createReceiver(queue);
      System.out.println("set MessageListener for receiver\n");
      receiver.setMessageListener(this);
   }

   public void sendMessage(String text2Send) throws JMSException, NamingException, InterruptedException
   {
      TextMessage tm = queueSession.createTextMessage(text2Send);
      tm.setJMSReplyTo(queue);
      sender.send(tm);
      System.out.println("gesendet :\n" + tm.getText() + "\n");
   }

   public void onMessage(Message msg)
   {
      TextMessage tm = (TextMessage) msg;
      try
      {
         System.out.println("onMessage, reply received : \n" + tm.getText());
      }
      catch(JMSException ex)
      {
         ex.printStackTrace();
      }
   }

   // not in use
   public void stop() throws JMSException
   {
      queueConnection.stop();
      queueSession.close();
      queueConnection.close();
   }

   public static void main(String args[]) throws Exception
   {
      System.out.println("--------------------------------------------------------");
      System.out.println("starting the sender at " + new java.util.Date(System.currentTimeMillis()) );
      System.out.println("--------------------------------------------------------");
      SendAndReceive client = new SendAndReceive();
      client.sendMessage("message: " + new java.util.Date(System.currentTimeMillis()) );
      // client waits for reply
   }
}

Der Empfänger
/*
   beispiel 2
   ------------------------------
   der empfänger
   ------------------------------
   realisierung von P2P mit Queue
   empfänger erhält eine nachricht und antwortet
*/
import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

import javax.naming.InitialContext;
import javax.naming.NamingException;

import java.util.Properties;


public class ReceiveAndReply implements MessageListener
{
   private Queue queue;
   private QueueReceiver receiver;
   private QueueSession queueSession;
   private QueueConnection queueConnection;

   public ReceiveAndReply() throws JMSException, NamingException
   {
      // lookup context, set PTP connection and session
      setupPTP();
      setupReceiver();
   }

   public void setupPTP() throws JMSException, NamingException
   {
      // step 1 create initialcontext
      System.out.println("about to create initialcontext");
      Properties env = new Properties();
      env.put("org.omg.CORBA.ORBInitialHost","192.168.45.2");  // default ist localhost !!
      env.put("org.omg.CORBA.ORBInitialPort","3700");  // ist default
      env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory");
      InitialContext ctx = new InitialContext(env);   // NamingException
      System.out.println("initialcontext received\n");

      // step 2 lookup connection factory
      System.out.println("try to lookup QueueConnectionFactory");
      QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
      System.out.println("QueueConnectionFactory received\n");

      // step 3 use connection factory to create a JMS connection
      System.out.println("try to create a QueueConnection");
      queueConnection = queueConnectionFactory.createQueueConnection();
      System.out.println("JMS QueueConnection created\n");

      // step 4 use connection to create a session
      System.out.println("try to create a QueueSession");
      queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("QueueSession created\n");

      // step 5 lookup the Queue
      System.out.println("try to lookup the Queue");
      queue = (Queue) ctx.lookup("jms.Queue");
      System.out.println("Queue received\n");

      queueConnection.start();
   }

   public void setupReceiver() throws JMSException
   {
      // Set the async listener for queueForReceiving
      System.out.println("create receiver");
      receiver = queueSession.createReceiver(queue);
      System.out.println("set MessageListener for receiver\n");
      receiver.setMessageListener(this);
      System.out.println("waiting for messages\n");
   }

   private void sendReply(String text, Queue dest) throws JMSException
   {
      if (dest!=null)
      {
         System.out.println("sendReply");
         System.out.println(text);
         QueueSender sender = queueSession.createSender(dest);
         TextMessage tm = queueSession.createTextMessage(text);
         sender.send(tm);
         sender.close();
      }
      else
         System.out.println("cannot reply, dest = null");
   }

   public void onMessage(Message msg)
   {
      TextMessage tm = (TextMessage) msg;
      try
      {
         String mess = tm.getText();
         System.out.println("onMessage, received text :\n" +mess + "\n");
         Queue dest = (Queue) msg.getJMSReplyTo();
         mess = "the reply : " + mess;
         sendReply(mess , dest);
      }
      catch(JMSException ex)
      {
         ex.printStackTrace();
      }
   }

   // not in use
   public void stop() throws JMSException
   {
      queueConnection.stop();
      queueSession.close();
      queueConnection.close();
   }

   public static void main(String args[]) throws Exception
   {
      System.out.println("--------------------------------------------------------");
      System.out.println("starting receiver at " + new java.util.Date(System.currentTimeMillis()) );
      System.out.println("--------------------------------------------------------");
      ReceiveAndReply client = new ReceiveAndReply();
      // client waits
   }
}

Ein möglicher Ablauf

Die folgenden Screenshots zeigen einen möglichen Ablauf.

Empfänger wird gestartet.

Sender wird gestartet.

Empfänger erhält Nachricht und antwortet.

In diesem Fall ist es wichtig, den Empfänger zuerst zu starten. Andernfalls erhält der Sender seine eigene Nachricht sofort aus dem Nachrichtenkanal zurück, da er ja auch Empfänger ist.