Advanced Java Services | Senden und Empfangen mit einer Queue |
Einige Beispiele.
Das erste Codebeispiel zeigt einen Sender, der eine Nachricht sendet. Die Nachricht wird in der Queue so lange aufbewahrt, bis der Empfänger sie abholt (asynchrone Übertragung). Es gibt nur einen Kanal (Queue), und er wird nur in einer Richtung benutzt.
package hms; import java.util.Date; import java.util.Properties; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** man muß die gf-client.jar im originalverzeichnis lassen ! das liegt an den relativen verweisen, die in dieser jar sind ! */ public class JMS_1_Sender { public static void main(String[] args) { System.out.println("-------------------------------------------"); System.out.println("jms sender"); System.out.println("-------------------------------------------"); System.out.println("Begin " + new Date(System.currentTimeMillis()) + "\n"); try { // step 1 create initialcontext System.out.println("about to create initialcontext"); Properties env = new Properties(); env.put("org.omg.CORBA.ORBInitialHost","127.0.0.1"); // default ist localhost !! 
env.put("org.omg.CORBA.ORBInitialPort","3700"); // ist default env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory"); InitialContext ctx = new InitialContext(env); // NamingException // für glassfish reicht default-constructor //InitialContext ctx = new InitialContext(); // NamingException System.out.println("initialcontext received\n"); // step 2 lookup connection factory System.out.println("try to lookup QueueConnectionFactory"); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("jms/MyQueueConnectionFactory"); System.out.println("QueueConnectionFactory received: " + queueConnectionFactory.getClass()); Class[] interfaces = queueConnectionFactory.getClass().getInterfaces(); System.out.println("omplemented interfaces:"); for(Class face : interfaces) System.out.println(face); System.out.println(); if (queueConnectionFactory != null) { // step 3 use connection factory to create a JMS connection System.out.println("try to create a QueueConnection"); QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); // JMSException System.out.println("JMS QueueConnection created\n"); // step 4 use connection to create a session System.out.println("try to create a QueueSession"); QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // JMSException System.out.println("QueueSession created" + queueSession); System.out.println(); // step 5 lookup the Queue System.out.println("try to lookup the Queue"); Queue queue = (Queue) ctx.lookup("jms.MyQueue"); System.out.println("Queue received: " + queue); System.out.println(); queueConnection.start(); // JMSException System.out.println("queueConnection started"); System.out.println("create sender\n"); QueueSender queueSender = queueSession.createSender(queue); System.out.println("QueueSender created" + queueSender); String text2send = "mary had a little lamb"; TextMessage tm = 
queueSession.createTextMessage(text2send); queueSender.send(tm); System.out.println("Text:"); System.out.println(tm.getText()); System.out.println("gesendet um"); System.out.println(new Date(System.currentTimeMillis())); System.out.println(); queueSession.close(); queueConnection.stop(); queueConnection.close(); System.out.println("Session closed"); System.out.println("Connection closed"); System.out.println("Sender shutdown"); } } catch(NamingException | JMSException ex) { ex.printStackTrace(); } System.out.println("\nEnd " + new Date(System.currentTimeMillis())); } }
Der Verbindungsaufbau erfolgt wie beim Sender. Mit Hilfe der Session wird aber nun ein (Queue-)Receiver erzeugt. Der Receiver reagiert beim Finden einer Nachricht mit einem Nachrichtenereignis. Über die Methode setMessageListener geben wir einen Ereignisempfänger bekannt. Für jede gefundene Nachricht kann dann der Receiver die Methode onMessage aufrufen und dieser die gefundene Nachricht übergeben.
package hms; import java.util.Date; import java.util.Properties; import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; public class JMS_1_Receiver { public static void main(String[] args) { System.out.println("-------------------------------------------"); System.out.println("receiver"); System.out.println("-------------------------------------------"); System.out.println("Begin " + new Date(System.currentTimeMillis()) + "\n"); try { // initialize JNDI // step 1 create initialcontext System.out.println("about to create initialcontext"); Properties env = new Properties(); env.put("org.omg.CORBA.ORBInitialHost", "192.168.77.77"); // default ist localhost !! 
env.put("org.omg.CORBA.ORBInitialPort", "3700"); // ist default env.put("java.naming.factory.initial", "com.sun.enterprise.naming.SerialInitContextFactory"); //InitialContext ctx = new InitialContext(env); // NamingException InitialContext ctx = new InitialContext(); // NamingException System.out.println("initialcontext received\n"); // step 2 lookup connection factory System.out.println("try to lookup QueueConnectionFactory"); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("jms/MyQueueConnectionFactory"); System.out.println("QueueConnectionFactory received\n"); // wenn nix da ist -> wartet endlos // step 3 use connection factory to create a JMS connection System.out.println("try to create a QueueConnection"); QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); // JMSException System.out.println("JMS QueueConnection created\n"); // step 4 use connection to create a session System.out.println("try to create a QueueSession"); QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); System.out.println("QueueSession created\n"); // step 5 lookup the Queue System.out.println("try to lookup the Queue"); Queue queue = (Queue) ctx.lookup("jms.MyQueue"); System.out.println("Queue received\n"); queueConnection.start(); System.out.println("create QueueReceiver"); QueueReceiver queueReceiver = queueSession.createReceiver(queue); System.out.println("QueueReceiver created"); System.out.println("set MessageListener for receiver\n"); queueReceiver.setMessageListener(new MyMessageListener()); System.out.println("waiting for messages\n"); // blockiert bis eine message kommt queueSession.close(); queueConnection.stop(); queueConnection.close(); System.out.println("Session closed"); System.out.println("Connection closed"); System.out.println("Receiver shutdown"); } catch(NamingException | JMSException ex) { ex.printStackTrace(); } } private static class MyMessageListener 
implements MessageListener { // listener to receive messages public void onMessage(Message msg) { TextMessage tm = (TextMessage) msg; try { System.out.println( "onMessage (" + new Date(System.currentTimeMillis()) + "), received text :\n" + tm.getText() + "\n"); } catch(JMSException ex) { ex.printStackTrace(); } } } }
------------------------------------------- jms sender ------------------------------------------- Begin Fri Oct 09 18:51:09 CEST 2015 about to create initialcontext initialcontext received try to lookup QueueConnectionFactory Okt 09, 2015 6:51:15 PM org.hibernate.validator.internal.util.VersionINFO: HV000001: Hibernate Validator 5.0.0.Final Okt 09, 2015 6:51:15 PM com.sun.messaging.jms.ra.ResourceAdapter start INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter: Version: 5.1 (Build 9-b) Compile: July 29 2014 1229 Okt 09, 2015 6:51:15 PM com.sun.messaging.jms.ra.ResourceAdapter start INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter starting: broker is REMOTE, connection mode is TCP Okt 09, 2015 6:51:15 PM com.sun.messaging.jms.ra.ResourceAdapter start INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter Started:REMOTE QueueConnectionFactory received: class com.sun.messaging.jms.ra.ConnectionFactoryAdapter omplemented interfaces: interface javax.jms.ConnectionFactory interface javax.jms.QueueConnectionFactory interface javax.jms.TopicConnectionFactory interface javax.resource.Referenceable interface java.io.Serializable try to create a QueueConnection JMS QueueConnection created try to create a QueueSession QueueSession createdcom.sun.messaging.jms.ra.SessionAdapter@5586b try to lookup the Queue Queue received: Oracle GlassFish(tm) Server MQ Destination getName(): MyQueuePhys Class: com.sun.messaging.Queue getVERSION(): 3.0 isReadonly(): false getProperties(): {imqDestinationName=MyQueuePhys, imqDestinationDescription=A Description for the Destination Object} queueConnection started create sender QueueSender createdConnectionID=5717793383320320, SessionID=5717793383353600, ProducerID=5717793383377920, DestName=MyQueuePhys Text: mary had a little lamb gesendet um Fri Oct 09 18:51:16 CEST 2015 Session closed Connection closed Sender shutdown End Fri Oct 09 18:51:16 CEST 2015
------------------------------------------- receiver ------------------------------------------- Begin Fri Oct 09 18:55:21 CEST 2015 about to create initialcontext initialcontext received try to lookup QueueConnectionFactory Okt 09, 2015 6:55:25 PM org.hibernate.validator.internal.util.VersionINFO: HV000001: Hibernate Validator 5.0.0.Final Okt 09, 2015 6:55:25 PM com.sun.messaging.jms.ra.ResourceAdapter start INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter: Version: 5.1 (Build 9-b) Compile: July 29 2014 1229 Okt 09, 2015 6:55:25 PM com.sun.messaging.jms.ra.ResourceAdapter start INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter starting: broker is REMOTE, connection mode is TCP Okt 09, 2015 6:55:25 PM com.sun.messaging.jms.ra.ResourceAdapter start INFORMATION: MQJMSRA_RA1101: GlassFish MQ JMS Resource Adapter Started:REMOTE QueueConnectionFactory received try to create a QueueConnection JMS QueueConnection created try to create a QueueSession QueueSession created try to lookup the Queue Queue received create QueueReceiver QueueReceiver created set MessageListener for receiver waiting for messages onMessage (Fri Oct 09 18:55:26 CEST 2015), received text : mary had a little lamb Session closed Connection closed Receiver shutdown
Noch nicht auf GlassFish 4 aktualisiert!
Das zweite Codebeispiel zeigt einen Sender, der eine Nachricht sendet und auf dem gleichen Kanal auf eine Antwort wartet. Der Empfänger wartet auf Nachrichten. Erhält er eine Nachricht, so schickt er eine Antwort an den Sender. Sender und Empfänger sind getrennt. Es gibt nur einen Kanal (Queue), und er wird in beiden Richtungen benutzt. Der Sender teilt dem Empfänger mit, auf welchem Kanal er antworten kann. Dies geschieht mit der Methode setJMSReplyTo() der Klasse TextMessage. Der Empfänger kann dann mit Hilfe der Methode getJMSReplyTo() derselben Klasse den Kanal zum Antworten ermitteln (siehe die entsprechenden Codezeilen in Rot).
/*
beispiel 2
------------------------------
der sender
------------------------------
realisierung von P2P mit Queue
senden und empfangen über eine einzige Queue
sender kann antwort empfangen, empfänger kann antworten
falls der sender zuerst gestartet wird, dann empfängt er seine eigene nachricht
da es keinen empfänger gibt
bei diesem beispiel sollte also der empfänger zuerst gestartet werden
*/
import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.JMSException;
import javax.jms.QueueReceiver;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Date;
import java.util.Properties;
public class SendAndReceive implements MessageListener
{
private Queue queue;
private QueueSender sender;
private QueueReceiver receiver;
private QueueSession queueSession;
private QueueConnection queueConnection;
public SendAndReceive() throws JMSException, NamingException
{
setupPTP(); // lookup context, set PTP connection and session
setupSender();
setupReceiver();
}
public void setupPTP() throws JMSException, NamingException
{
// step 1 create initialcontext
System.out.println("about to create initialcontext");
Properties env = new Properties();
env.put("org.omg.CORBA.ORBInitialHost","192.168.45.2"); // default ist localhost !!
env.put("org.omg.CORBA.ORBInitialPort","3700"); // ist default
env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory");
InitialContext ctx = new InitialContext(env); // NamingException
System.out.println("initialcontext received\n");
// step 2 lookup connection factory
System.out.println("try to lookup QueueConnectionFactory");
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
System.out.println("QueueConnectionFactory received\n");
// step 3 use connection factory to create a JMS connection
System.out.println("try to create a QueueConnection");
queueConnection = queueConnectionFactory.createQueueConnection();
System.out.println("JMS QueueConnection created\n");
// step 4 use connection to create a session
System.out.println("try to create a QueueSession");
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("QueueSession created\n");
// step 5 lookup the Queue
System.out.println("try to lookup the Queue");
queue = (Queue) ctx.lookup("jms.Queue");
System.out.println("Queue received\n");
queueConnection.start();
}
public void setupSender() throws JMSException, NamingException
{
System.out.println("create sender");
sender = queueSession.createSender(queue);
}
public void setupReceiver() throws JMSException, NamingException
{
System.out.println("create receiver");
receiver = queueSession.createReceiver(queue);
System.out.println("set MessageListener for receiver\n");
receiver.setMessageListener(this);
}
public void sendMessage(String text2Send) throws JMSException, NamingException, InterruptedException
{
TextMessage tm = queueSession.createTextMessage(text2Send);
tm.setJMSReplyTo(queue);
sender.send(tm);
System.out.println("gesendet :\n" + tm.getText() + "\n");
}
public void onMessage(Message msg)
{
TextMessage tm = (TextMessage) msg;
try
{
System.out.println("onMessage, reply received : \n" + tm.getText());
}
catch(JMSException ex)
{
ex.printStackTrace();
}
}
// not in use
public void stop() throws JMSException
{
queueConnection.stop();
queueSession.close();
queueConnection.close();
}
public static void main(String args[]) throws Exception
{
System.out.println("--------------------------------------------------------");
System.out.println("starting the sender at " + new java.util.Date(System.currentTimeMillis()) );
System.out.println("--------------------------------------------------------");
SendAndReceive client = new SendAndReceive();
client.sendMessage("message: " + new java.util.Date(System.currentTimeMillis()) );
// client waits for reply
}
}
/*
beispiel 2
------------------------------
der empfänger
------------------------------
realisierung von P2P mit Queue
empfänger erhält eine nachricht und antwortet
*/
import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class ReceiveAndReply implements MessageListener
{
private Queue queue;
private QueueReceiver receiver;
private QueueSession queueSession;
private QueueConnection queueConnection;
public ReceiveAndReply() throws JMSException, NamingException
{
// lookup context, set PTP connection and session
setupPTP();
setupReceiver();
}
public void setupPTP() throws JMSException, NamingException
{
// step 1 create initialcontext
System.out.println("about to create initialcontext");
Properties env = new Properties();
env.put("org.omg.CORBA.ORBInitialHost","192.168.45.2"); // default ist localhost !!
env.put("org.omg.CORBA.ORBInitialPort","3700"); // ist default
env.put("java.naming.factory.initial","com.sun.enterprise.naming.SerialInitContextFactory");
InitialContext ctx = new InitialContext(env); // NamingException
System.out.println("initialcontext received\n");
// step 2 lookup connection factory
System.out.println("try to lookup QueueConnectionFactory");
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
System.out.println("QueueConnectionFactory received\n");
// step 3 use connection factory to create a JMS connection
System.out.println("try to create a QueueConnection");
queueConnection = queueConnectionFactory.createQueueConnection();
System.out.println("JMS QueueConnection created\n");
// step 4 use connection to create a session
System.out.println("try to create a QueueSession");
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("QueueSession created\n");
// step 5 lookup the Queue
System.out.println("try to lookup the Queue");
queue = (Queue) ctx.lookup("jms.Queue");
System.out.println("Queue received\n");
queueConnection.start();
}
public void setupReceiver() throws JMSException
{
// Set the async listener for queueForReceiving
System.out.println("create receiver");
receiver = queueSession.createReceiver(queue);
System.out.println("set MessageListener for receiver\n");
receiver.setMessageListener(this);
System.out.println("waiting for messages\n");
}
private void sendReply(String text, Queue dest) throws JMSException
{
if (dest!=null)
{
System.out.println("sendReply");
System.out.println(text);
QueueSender sender = queueSession.createSender(dest);
TextMessage tm = queueSession.createTextMessage(text);
sender.send(tm);
sender.close();
}
else
System.out.println("cannot reply, dest = null");
}
public void onMessage(Message msg)
{
TextMessage tm = (TextMessage) msg;
try
{
String mess = tm.getText();
System.out.println("onMessage, received text :\n" +mess + "\n");
Queue dest = (Queue) msg.getJMSReplyTo();
mess = "the reply : " + mess;
sendReply(mess , dest);
}
catch(JMSException ex)
{
ex.printStackTrace();
}
}
// not in use
public void stop() throws JMSException
{
queueConnection.stop();
queueSession.close();
queueConnection.close();
}
public static void main(String args[]) throws Exception
{
System.out.println("--------------------------------------------------------");
System.out.println("starting receiver at " + new java.util.Date(System.currentTimeMillis()) );
System.out.println("--------------------------------------------------------");
ReceiveAndReply client = new ReceiveAndReply();
// client waits
}
}
Die folgenden Screenshots zeigen einen möglichen Ablauf.
Empfänger wird gestartet.
Sender wird gestartet.
Empfänger erhält Nachricht und antwortet.
In diesem Fall ist es wichtig, den Empfänger zuerst zu starten. Andernfalls erhält der Sender seine eigene Nachricht sofort aus dem Nachrichtenkanal zurück, da er ja auch Empfänger ist.