码迷,mamicode.com
首页 > 其他好文 > 详细

ActiveMQ消息队列-单节点测试(点对点模式)

时间:2015-09-18 12:16:14      阅读:1586      评论:0      收藏:0      [点我收藏+]

标签:

ActiveMQ发送和接收消息的过程和jdbc操作数据库很类似:首先创建Connection连接对象,再获取Session会话对象,之后通过Session会话对象创建Producer、Consumer、Message等对象,只不过ActiveMQ的Connection对象是通过ActiveMQConnectionFactory工厂产生的。以下是一些场景的测试代码。

先定义一些常量数据,这些数据在后面的例子中也有用到

// 用户名
private final static String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 密码
private final static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 节点url
private final static String BROKERURL = "tcp://localhost:61616";
// 队列名称
private final static String DESTINATION  = "com_mq_queue_test";

1、首先测试一下最简单的这种,就是消息发送方发送一则消息,然后接收方接收消息,同时消息被消费方接收之后,采用自动应答模式(Session.AUTO_ACKNOWLEDGE),反馈给消息发送发。

发送方代码:
@Test
public void run() {
    try {
	// 创建ActiveMQConnectionFactory工厂
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	// 通过ActiveMQConnectionFactory工厂,创建Connection对象
	Connection connection = factory.createConnection();
	// 启动Connection对象
	connection.start();
			
	// 创建Session会话
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageProducer sender = session.createProducer(destination);
	// 设置消息的持久化模式:如果设置成持久化,则传递消息时需要消息存储起来,然后再传递。即message->broker->message store->返回给消息发送者消息是否存储成功
	// 如果设置成非持久化,表示消息是异步传递,则消息的传递路径是message-broker->返回给消息发送者和存储数据这2个步骤异步执行,性能较持久化模式快
	// DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失;PERSISTENT保证重启后会把没有发送的数据再次发送直至发送成功
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			
	// 创建消息并发送消息
	Message message = session.createMessage();
	message.setStringProperty("name", "tom");
	sender.send(message);
			
	// 关闭资源
	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

// 接收方代码:
@Test
public void run() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();
			
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageConsumer receiver = session.createConsumer(destination);
			
	Message message = receiver.receive();
	String name = message.getStringProperty("name");
	System.out.println("接收到的消息体是:" + name);
	// message.acknowledge();  Session.AUTO_ACKNOWLEDGE时,默认自动发送确认消息
			
	receiver.close();
        session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

创建Session的时候,connection.createSession(boolean isTranscration, int acknowledge),第一个参数表示是否使用事务、第二个参数表示采用的消息应答的模式。

为什么会有这2个参数呢?我个人的理解是这样的,首先说下事务,我们都知道事务简单来说就是保证一批操作要么全部成功,要么全部失败,不存在部分成功,部分失败。在ActiveMQ中,消息发送发也可以在一个session中,发送多个Message消息,但要保证多个消息的在一个事务中,所以就需要将isTranscration设置成true,设置好之后,需要调用 session.commit()提交事务。

acknowledge参数表示消息应答的模式,消息发送发发出消息,接收方接收到消息后,需要反馈给发送方,然后将消息从队列中移除。acknowledge的值有AUTO_ACKNOWLEDGE、CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE三种。AUTO_ACKNOWLEDGE表示自动应答模式,消息消费方接收到消息之后,会自动应答给发送发;CLIENT_ACKNOWLEDGE表示由消息消费方发送应答通知,消费方接收到消息之后,需要调用Message的acknowledge()方法,否则队列不会移除该消息,这可能会造成垃圾数据。具体应答代码如下:

Message message = receiver.receive();
String name = message.getStringProperty("name");
System.out.println("接收到的消息体是:" + name);
// 消息确认:否则数据不会被删除
message.acknowledge();

2、测试同一个事务(只关注消息发送方即可)

@Test
public void testTranscation() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();
	//  采用事务		
	Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageProducer sender = session.createProducer(destination);
	sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	// 发送多个消息		
	for(int i = 0; i < 10; i++) {
	    Message message = session.createMessage();
	    message.setStringProperty("name_" + i, "yangyong_" + i);
	    sender.send(message);
	    if(i == 1) { // 模拟异常
		int count = 10 / 0;
	    }
	}
	// 需要调用commit方法
	session.commit();
	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

代码中使用10 / 0模拟异常,测试结果表明10个消息均未发送到队列中。

3、使用数据库存储消息

如果想把消息持久化到数据库中,只需要修改activemq.xml文件,测试代码和以文件存储数据的方式一样。首先在activemq.xml文件的<broker />标签上面添加如下配置(以mysql为例,先手动建好数据库)

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

然后在<broker />里找到<persistenceAdapter />标签,注释掉该配置,换成如下配置

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" 
                            createTablesOnStartup="true" 
                            useDatabaseLock="false"/>
</persistenceAdapter>

4、消息过滤(selector)

ActiveMQ是一个消息总线,多个应用中的消息都可以发送到一个队列里,每个应用的消息消费方可能只关注本系统产生的消息,这时需要定义一个selector,通过属性值筛选出感兴趣的消息数据。selector可以支持多种表达式,如=,>,<等等,可参考《ActiveMQ in Action》这本书。

// 测试selector消息过滤:发送方
@Test
public void testSelector() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();
			
	Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	MessageProducer sender = session.createProducer(destination);
	sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	
	//发送2个消息 		
	TextMessage message = session.createTextMessage("测试selector消息过滤1");
	message.setStringProperty("from", "jack");
	sender.send(message);
			
	message = session.createTextMessage("测试selector消息过滤2");
	message.setStringProperty("from", "tom");
	sender.send(message);
			
	session.commit();
	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

// 测试selector消息过滤:消费方
@Test
public void testSelector() {
    try {
	ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	Connection connection = factory.createConnection();
	connection.start();
			
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination destination = session.createQueue(DESTINATION);
	
	// 设置筛选条件		
	String selector = "from=‘jack‘";
	// 创建带有筛选条件的MessageConsumer对象
	MessageConsumer consumer = session.createConsumer(destination, selector);
			
	TextMessage message = (TextMessage) consumer.receive();
	System.out.println("接收到的消息:" + message.getText());
			
	session.close();
        connection.close();
    } catch(Exception e) {
	e.printStackTrace();
    }
}

测试结果表明消息消费方只能接收到发送发发出的第一个消息,第二个from=‘tom‘的消息不能被接收。

5、消费方接收消息后,发送消息给消息发出方

在消息被消费方接收后,可能消费方需要把处理结果反馈给发出方,然后发出方再执行一些业务逻辑,其流程为

技术分享

// 消息发送发
public class QueueSender {
    public void testReplyTo() {
        try {
	   ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	    final Connection connection = factory.createConnection();
	    connection.start();
			
	    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	    Destination destination = session.createQueue(DESTINATION);
	    MessageProducer sender = session.createProducer(destination);
	    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			
	    TextMessage message = session.createTextMessage("测试消息消费者返回处理结果给消息生产者");
	    sender.send(message);

	    // 接受消费方返回的消息
	    MessageConsumer consumer = session.createConsumer(destination);
	    consumer.setMessageListener(new MessageListener() {
	        public void onMessage(Message message) {
		    if(message != null && message instanceof TextMessage) {
			try {
			    System.out.println("消息消费方反馈的信息->messageID:" 
			                + message.getJMSCorrelationID() + ",消息实体:" 
			                + ((TextMessage)message).getText());
			    //关闭资源
			    session.close();
		            connection.close();
		        } catch (JMSException e) {
			    e.printStackTrace();
			}
		    }
		}
	    });
        } catch(Exception e) {
	    e.printStackTrace();
        }
   }
   public static void main(String[] args) {
	new QueueSender().testReplyTo();
    }
 }
 
 //  消息接收方:
 public class  QueueReceiver {
     public void testReplyTo() {
        try {
	   ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
	    final Connection connection = factory.createConnection();
	    connection.start();
			
	    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	    final Destination destination = session.createQueue(DESTINATION);
	    MessageConsumer consumer = session.createConsumer(destination);
	    consumer.setMessageListener(new MessageListener() {
	        public void onMessage(Message message) {
		    if(message != null && message instanceof TextMessage) {
		        try {
			    System.out.println("接收到的消息->messageID:" 
			                + message.getJMSMessageID() + ",消息实体:"
			                + ((TextMessage)message).getText());
							
			    // 发送消息给消息生产者
			    MessageProducer sender = session.createProducer(destination);
			    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			    TextMessage message0 = session.createTextMessage("消费者已经处理了您发送的消息了");
		// 必须设置这些参数JMSCorrelationID:接收到的消息的ID,JMSReplyTo:消息发送到这个地址
			    message0.setJMSCorrelationID(message.getJMSMessageID());
			    message0.setJMSReplyTo(destination);
			    sender.send(message0);
							
			    session.close();
		            connection.close();
			} catch (JMSException e) {
			    e.printStackTrace();
			}
		    }
		}
	    });
        } catch(Exception e) {
	    e.printStackTrace();
	}
    }
	
    public static void main(String[] args) {
	new QueueReceiver().testReplyTo();
    }
}


ActiveMQ消息队列-单节点测试(点对点模式)

标签:

原文地址:http://my.oschina.net/u/732520/blog/507998

(0)
(1)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!