标签:
首先创建一个maven工程,在pom文件中增加相关的依赖包,如下:
<dependency> <groupId>javax.jms</groupId> <artifactId>jms-api</artifactId> <version>1.1-rev-1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
创建测试类:
发送消息类:
SendMessage
package com.jason.testmq;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SendMessage {
private static final String url = "tcp://localhost:61616";
private static final String QUEUE_NAME = "choice.queue";
//private String expectedBody = "<hello>world!two</hello>";
//private String expectedBody = "stop";
public void sendMessage() throws JMSException {
Connection connection = null;
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = (Connection) connectionFactory.createConnection();
connection.start();
Session session = (Session) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
// TextMessage message = session.createTextMessage(expectedBody);
// message.setStringProperty("headname", "remoteB");
JmsTestMessage testMessage = new JmsTestMessage();
testMessage.setId("1234567");
testMessage.setMsg("stop");
testMessage.setStatus(1);
ObjectMessage message = session.createObjectMessage(testMessage);
producer.send(message);
producer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
SendMessage sndMsg = new SendMessage();
try {
sndMsg.sendMessage();
} catch (Exception ex) {
System.out.println(ex.toString());
}
}
}
接收消息类:
ReceiveMessage
/**
*
*/
package com.jason.testmq;
import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ReceiveMessage {
private static final String url = "tcp://localhost:61616";
private static final String QUEUE_NAME = "choice.queue";
public void receiveMessage() {
Connection connection = null;
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection, session, consumer);
} catch (Exception e) {
System.out.println(e.toString());
}
}
protected void consumeMessagesAndClose(Connection connection,
Session session, MessageConsumer consumer) throws JMSException {
for (int i = 0; i < 1;) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}
public void onMessage(Message message) {
// try {
// if (message instanceof TextMessage) {
// TextMessage txtMsg = (TextMessage) message;
// String msg = txtMsg.getText();
// System.out.println("Received: " + msg);
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage)message;
Serializable obj = objMsg.getObject();
if (obj instanceof JmsTestMessage) {
JmsTestMessage testMessage = (JmsTestMessage)obj;
System.out.println("Received new msg id is " + testMessage.getId() + ",msg is " + testMessage.getMsg() + ",status is " + testMessage.getStatus());
} else {
System.out.println("it is not JmsTestMessage");
}
} else {
System.out.println("other type message with type is " + message.getJMSType());
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}
以注册监听的方式接收消息
ReceiveMessageWithListener
/**
*
*/
package com.jason.testmq;
import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ReceiveMessageWithListener implements MessageListener {
private static final String url = "tcp://localhost:61616";
private static final String QUEUE_NAME = "choice.queue";
private boolean stop = false;
public void receiveMessage() {
(new Thread(new ReceiveMessageRunnable())).start();
}
public void onMessage(Message message) {
// try {
// if (message instanceof TextMessage) {
// TextMessage txtMsg = (TextMessage) message;
// String msg = txtMsg.getText();
// System.out.println("Received: " + msg);
// if ("stop".equals(msg)) {
// this.stop = true;
// }
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage)message;
Serializable obj = objMsg.getObject();
if (obj instanceof JmsTestMessage) {
JmsTestMessage testMessage = (JmsTestMessage)obj;
System.out.println("Received new msg id is " + testMessage.getId() + ",msg is " + testMessage.getMsg() + ",status is " + testMessage.getStatus());
if ("stop".equals(testMessage.getMsg())) {
this.stop = true;
}
} else {
System.out.println("it is not JmsTestMessage");
}
} else {
System.out.println("other type message with type is " + message.getJMSType());
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
ReceiveMessageWithListener rm = new ReceiveMessageWithListener();
rm.receiveMessage();
}
private class ReceiveMessageRunnable implements Runnable {
public void run() {
Connection connection = null;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
try {
connection = connectionFactory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(ReceiveMessageWithListener.this);
connection.start();
while (!ReceiveMessageWithListener.this.stop) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
在队列中传递的消息类
JmsTestMessage
/**
*
*/
package com.jason.testmq;
import java.io.Serializable;
/**
* @author jasonzhang
*
*/
public class JmsTestMessage implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String msg;
private int status;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
}
参考链接:
标签:
原文地址:http://my.oschina.net/u/914897/blog/420152