JMS规范定义了2种消息传输模式:持久传送模式和非持久传输模式。发送者可以通过如下类似的代码进行设置
TopicPublisher publihser = session.createPublisher(topic); // 设置持久化传输 publihser.setDeliveryMode(DeliveryMode.PERSISTENT);这种方式对publisher发送的所有消息都有效,相当于是一个全局的效果。如果只是想设置某一个消息的传输模式,可以通过以下代码设置消息头的属性来实现
TextMessage message = session.createTextMessage(text); message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
使用传输模式是一件很容易的事,直接调用API就可以了。那什么是传输模式呢?传输模式是用来控制消息属性的,DeliveryMode.PERSISTENT代表这是持久消息,DeliveryMode.NON_PERSISTENT代表是非持久消息。个人觉得传输模式和消息持久化是同一个概念,只不过是不同的叫法而已。
package mq.aty.persistentmsg;
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import mq.aty.JmsUtils;
/**
* 直接运行该程序和activeMQ,不运行任何的消费者,然后观察持久化介质(我们使用了数据库)
*
*/
public class NoReceiverTest
{
private static TopicConnection connection = null;
private static Topic topic = null;
public static void main(String[] args) throws Exception
{
connection = JmsUtils.getConnection();
topic = JmsUtils.getTopic();
sentPersistent();
sentNonPersistent();
connection.close();
}
public static void sentPersistent() throws Exception
{
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicPublisher publihser = session.createPublisher(topic);
publihser.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 3; i++)
{
String text = "I am persistent message.order=" + i;
TextMessage message = session.createTextMessage(text);
message.setJMSPriority(i);
publihser.publish(message);
}
}
public static void sentNonPersistent() throws Exception
{
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicPublisher publihser = session.createPublisher(topic);
publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 3; i++)
{
String text = "non-persistent message.id=" + i;
TextMessage message = session.createTextMessage(text);
publihser.publish(message);
}
}
}
我使用了mysql数据库,并配置了activeMQ将消息持久化到数据库。运行上面的程序,发现mysql数据库中activemq_msgs表没有任何数据。可以证明:持久消息和非持久消息都被MQ消息服务器丢弃了。无论是持久消息,还是非持久消息,如果消息没有对应的消费者,那么activeMQ会认为这些消息无用,直接删除。package mq.aty.persistentmsg;
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import mq.aty.JmsUtils;
/**
* 直接运行该程序和activeMQ,没有任何的消费者,然后观察持久化介质(我们使用了数据库)
*
*/
public class NoReceiverTest
{
private static TopicConnection connection = null;
private static Topic topic = null;
public static void main(String[] args) throws Exception
{
connection = JmsUtils.getConnection();
topic = JmsUtils.getTopic();
sentPersistent();
sentNonPersistent();
connection.close();
}
public static void sentPersistent() throws Exception
{
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicPublisher publihser = session.createPublisher(topic);
publihser.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 3; i++)
{
String text = "I am persistent message.order=" + i;
TextMessage message = session.createTextMessage(text);
message.setJMSPriority(i);
publihser.publish(message);
}
}
public static void sentNonPersistent() throws Exception
{
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicPublisher publihser = session.createPublisher(topic);
publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 3; i++)
{
String text = "non-persistent message.id=" + i;
TextMessage message = session.createTextMessage(text);
publihser.publish(message);
}
}
}
持久订阅者源码如下:package mq.aty.persistentmsg;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import mq.aty.JmsUtils;
/**
* <pre>
* 1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码
*
* 2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录,
* 也就是说activeMQ识别和接受了我们的持久订阅者
*
* 3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息。发现activemq_msgs中多持久消息
*
* 4、运行持久订阅者。发现持久消息和非持久消息都能接受到
* </pre>
*
*/
public class DurableSubscriberTest
{
public static void main(String[] args) throws Exception
{
TopicConnection connection = JmsUtils.getConnection();
Topic topic = JmsUtils.getTopic();
// 创建持久订阅的时候,必须要设置client,否则会报错:
// javax.jms.JMSException: You cannot create a durable subscriber
// without specifying a unique clientID on a Connection
// 如果clientID重复(已经存在相同id的活动连接),会报错
// javax.jms.InvalidClientIDException: Broker: localhost - Client: 1
// already connected from tcp://127.0.0.1:2758
connection.setClientID("1");
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
// 在同一个连接的ClientID下,持久订阅者的名称必须唯一
// javax.jms.JMSException: Durable consumer is in use for client: 1 and
// subscriptionName: 11
// TopicSubscriber subscriber = session.createSubscriber(topic);
TopicSubscriber subscriber = session.createDurableSubscriber(topic,
"11");
subscriber.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg)
{
try
{
TextMessage textMsg = (TextMessage) msg;
System.out.println("DurableSubscriber get:"
+ textMsg.getText());
} catch (JMSException e)
{
e.printStackTrace();
}
}
});
connection.start();// 一定要start
}
}
* <pre> * 1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码 * * 2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录, * 也就是说activeMQ识别和接受了我们的持久订阅者 * * 3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息 * * 4、消息发送成功后,停止activemq服务器、 * * 5、重新启动mq服务器和订阅者。发现只能接收到持久消息 * * </pre>
理解JMS规范中消息的传输模式和消息持久化,布布扣,bubuko.com
原文地址:http://blog.csdn.net/aitangyong/article/details/26132913