标签:
ActiveMQ发送和接收消息的过程和jdbc操作数据库很类似:首先创建Connection连接对象,再获取Session会话对象,之后通过Session会话对象创建Producer、Consumer、Message等对象,只不过ActiveMQ的Connection对象是通过ActiveMQConnectionFactory工厂产生的。以下是一些场景的测试代码。
先定义一些常量数据,这些数据在后面的例子中也有用到
// 用户名 private final static String USERNAME = ActiveMQConnection.DEFAULT_USER; // 密码 private final static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 节点url private final static String BROKERURL = "tcp://localhost:61616"; // 队列名称 private final static String DESTINATION = "com_mq_queue_test";
1、首先测试一下最简单的这种,就是消息发送方发送一则消息,然后接收方接收消息,同时消息被消费方接收之后,采用自动应答模式(Session.AUTO_ACKNOWLEDGE),反馈给消息发送发。
发送方代码: @Test public void run() { try { // 创建ActiveMQConnectionFactory工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); // 通过ActiveMQConnectionFactory工厂,创建Connection对象 Connection connection = factory.createConnection(); // 启动Connection对象 connection.start(); // 创建Session会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DESTINATION); MessageProducer sender = session.createProducer(destination); // 设置消息的持久化模式:如果设置成持久化,则传递消息时需要消息存储起来,然后再传递。即message->broker->message store->返回给消息发送者消息是否存储成功 // 如果设置成非持久化,表示消息是异步传递,则消息的传递路径是message-broker->返回给消息发送者和存储数据这2个步骤异步执行,性能较持久化模式快 // DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失;PERSISTENT保证重启后会把没有发送的数据再次发送直至发送成功 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 创建消息并发送消息 Message message = session.createMessage(); message.setStringProperty("name", "tom"); sender.send(message); // 关闭资源 session.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } } // 接收方代码: @Test public void run() { try { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DESTINATION); MessageConsumer receiver = session.createConsumer(destination); Message message = receiver.receive(); String name = message.getStringProperty("name"); System.out.println("接收到的消息体是:" + name); // message.acknowledge(); Session.AUTO_ACKNOWLEDGE时,默认自动发送确认消息 receiver.close(); session.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } }
创建Session的时候,connection.createSession(boolean isTranscration, int acknowledge),第一个参数表示是否使用事务、第二个参数表示采用的消息应答的模式。
为什么会有这2个参数呢?我个人的理解是这样的,首先说下事务,我们都知道事务简单来说就是保证一批操作要么全部成功,要么全部失败,不存在部分成功,部分失败。在ActiveMQ中,消息发送发也可以在一个session中,发送多个Message消息,但要保证多个消息的在一个事务中,所以就需要将isTranscration设置成true,设置好之后,需要调用 session.commit()提交事务。
而acknowledge参数表示消息应答的模式,消息发送发发出消息,接收方接收到消息后,需要反馈给发送方,然后将消息从队列中移除。acknowledge的值有AUTO_ACKNOWLEDGE、CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE三种。AUTO_ACKNOWLEDGE表示自动应答模式,消息消费方接收到消息之后,会自动应答给发送发;CLIENT_ACKNOWLEDGE表示由消息消费方发送应答通知,消费方接收到消息之后,需要调用Message的acknowledge()方法,否则队列不会移除该消息,这可能会造成垃圾数据。具体应答代码如下:
Message message = receiver.receive(); String name = message.getStringProperty("name"); System.out.println("接收到的消息体是:" + name); // 消息确认:否则数据不会被删除 message.acknowledge();
2、测试同一个事务(只关注消息发送方即可)
@Test public void testTranscation() { try { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); // 采用事务 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DESTINATION); MessageProducer sender = session.createProducer(destination); sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 发送多个消息 for(int i = 0; i < 10; i++) { Message message = session.createMessage(); message.setStringProperty("name_" + i, "yangyong_" + i); sender.send(message); if(i == 1) { // 模拟异常 int count = 10 / 0; } } // 需要调用commit方法 session.commit(); session.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } }
代码中使用10 / 0模拟异常,测试结果表明10个消息均未发送到队列中。
3、使用数据库存储消息
如果想把消息持久化到数据库中,只需要修改activemq.xml文件,测试代码和以文件存储数据的方式一样。首先在activemq.xml文件的<broker />标签上面添加如下配置(以mysql为例,先手动建好数据库)
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
然后在<broker />里找到<persistenceAdapter />标签,注释掉该配置,换成如下配置
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" useDatabaseLock="false"/> </persistenceAdapter>
4、消息过滤(selector)
ActiveMQ是一个消息总线,多个应用中的消息都可以发送到一个队列里,每个应用的消息消费方可能只关注本系统产生的消息,这时需要定义一个selector,通过属性值筛选出感兴趣的消息数据。selector可以支持多种表达式,如=,>,<等等,可参考《ActiveMQ in Action》这本书。
// 测试selector消息过滤:发送方 @Test public void testSelector() { try { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DESTINATION); MessageProducer sender = session.createProducer(destination); sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送2个消息 TextMessage message = session.createTextMessage("测试selector消息过滤1"); message.setStringProperty("from", "jack"); sender.send(message); message = session.createTextMessage("测试selector消息过滤2"); message.setStringProperty("from", "tom"); sender.send(message); session.commit(); session.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } } // 测试selector消息过滤:消费方 @Test public void testSelector() { try { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DESTINATION); // 设置筛选条件 String selector = "from=‘jack‘"; // 创建带有筛选条件的MessageConsumer对象 MessageConsumer consumer = session.createConsumer(destination, selector); TextMessage message = (TextMessage) consumer.receive(); System.out.println("接收到的消息:" + message.getText()); session.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } }
测试结果表明消息消费方只能接收到发送发发出的第一个消息,第二个from=‘tom‘的消息不能被接收。
5、消费方接收消息后,发送消息给消息发出方
在消息被消费方接收后,可能消费方需要把处理结果反馈给发出方,然后发出方再执行一些业务逻辑,其流程为
// 消息发送发 public class QueueSender { public void testReplyTo() { try { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); final Connection connection = factory.createConnection(); connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DESTINATION); MessageProducer sender = session.createProducer(destination); sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage("测试消息消费者返回处理结果给消息生产者"); sender.send(message); // 接受消费方返回的消息 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if(message != null && message instanceof TextMessage) { try { System.out.println("消息消费方反馈的信息->messageID:" + message.getJMSCorrelationID() + ",消息实体:" + ((TextMessage)message).getText()); //关闭资源 session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }); } catch(Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new QueueSender().testReplyTo(); } } // 消息接收方: public class QueueReceiver { public void testReplyTo() { try { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); final Connection connection = factory.createConnection(); connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Destination destination = session.createQueue(DESTINATION); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if(message != null && message instanceof TextMessage) { try { System.out.println("接收到的消息->messageID:" + message.getJMSMessageID() + ",消息实体:" + ((TextMessage)message).getText()); // 发送消息给消息生产者 MessageProducer sender = session.createProducer(destination); sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message0 = session.createTextMessage("消费者已经处理了您发送的消息了"); // 必须设置这些参数JMSCorrelationID:接收到的消息的ID,JMSReplyTo:消息发送到这个地址 message0.setJMSCorrelationID(message.getJMSMessageID()); message0.setJMSReplyTo(destination); sender.send(message0); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }); } catch(Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new QueueReceiver().testReplyTo(); } }
标签:
原文地址:http://my.oschina.net/u/732520/blog/507998