码迷,mamicode.com
首页 > 其他好文 > 详细

ActiveMQ 运行案例

时间:2016-05-12 21:49:29      阅读:109      评论:0      收藏:0      [点我收藏+]

标签:

前言

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

一、使用介绍

环境准备

1、activemq下载传送门:https://activemq.apache.org/download.html、MyEclipse

2、启动activemq,我电脑是win64位,所以启动bin木下win64中的activemq.bat

3、需要输入用户名和密码才能进入,页面成功启动的效果。(默认用户名和密码皆为:admin)

技术分享

开始测试

1、新建一个JMS java工程,导入下载的activemq文件里的jar包,例如作者目录下的activemq-all-5.11.1.jar

技术分享

导入工程

技术分享

2、新建消息生产者 JMSProducer.java

package com.hcg.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author babylon
 * 2016-5-9
 */
public class JMSProducer {
	
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	private static final int SENDNUM= 10;	// 发送的消息数量
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageProducer messageProducer;	// 消息生产者
		
		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);	// 创建会话
			destination = session.createQueue("FirstQueue1");	// 创建消息队列
			messageProducer = session.createProducer(destination);		// 创建消息生产者
			// 发送消息
			sendMessage(session, messageProducer);
		    // 正式提交发送消息的操作
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}  finally {
			// 关闭连接
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		
	}
	
	/**
	 * 发送消息
	 * @param session
	 * @param messageProducer
	 * @throws JMSException 
	 */
	public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
		for(int i=0; i < JMSProducer.SENDNUM; i++){
			TextMessage message = session.createTextMessage("ActiveMQ  发送的消息"+i);
			System.out.println("发送消息:"+i);
			messageProducer.send(message);
		}
	}

}

3、F11运行实例,可以看见成功发送了10条消息

技术分享

3、创建消费者 JMSConsumer.java

package com.hcg.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer {
	
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者
		
		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			destination = session.createQueue("FirstQueue1");	// 创建消息队列
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			while(true){
				TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
				if(textMessage != null){
					System.out.println("收到的消息:"+textMessage.getText());
				} else {
					break;
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} 
	}

}

4、F11运行消费者,可以看见在控制台新增了一个消费者,发送的消息都已被消费处理

技术分享

5、while(true)这种方式处理消费是不合适的,下面以监听的方式处理创建消费者 JMSConsumer_listener.java。

package com.hcg.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者 - 监听方式消费
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer_listener {
	
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者
		
		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer_listener.USERNAME, JMSConsumer_listener.PASSWORD, JMSConsumer_listener.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			destination = session.createQueue("FirstQueue1");	// 创建消息队列
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			messageConsumer.setMessageListener(new Listener());			// 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		} 
	}

}
监听对象需要实现MessageListener
package com.hcg.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听
 *
 * @author babylon
 * 2016-5-9
 */
public class Listener implements MessageListener{

	/*
	 * 收到的消息
	 */
	@Override
	public void onMessage(Message message) {
		try {
			System.out.println("收到的消息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}
6、运行该类,发现消费者数量增加了一个。

技术分享


消息收发和订阅的方式

1、创建消息发布者
package com.hcg.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者 - 消息发布者
 * 
 * @author babylon
 * 2016-5-9
 */
public class JMSProducer {
	
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	private static final int SENDNUM = 10;	// 发送的消息数量
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageProducer messageProducer;	// 消息生产者
		
		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);	// 创建会话
			// destination = session.createQueue("FirstQueue1");	// 创建消息队列
			destination = session.createTopic("FirstTopic1");
			messageProducer = session.createProducer(destination);		// 创建消息生产者
			// 发送消息
			sendMessage(session, messageProducer);
		    // 正式提交发送消息的操作
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}  finally {
			// 关闭连接
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		
	}
	
	/**
	 * 发布的消息
	 * @param session
	 * @param messageProducer
	 * @throws JMSException 
	 */
	public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
		for(int i=0; i < JMSProducer.SENDNUM; i++){
			TextMessage message = session.createTextMessage("ActiveMQ  发送的消息"+i);
			System.out.println("发送消息:"+i);
			messageProducer.send(message);
		}
	}

}
2、创建消息订阅者1
package com.hcg.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者 - 消息订阅者1
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer {
	
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者
		
		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			// destination = session.createQueue("FirstQueue1");	// 创建消息队列
			destination = session.createTopic("FirstTopic1");
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			messageConsumer.setMessageListener(new Listener());			// 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		} 
	}

}

3、创建消息订阅者2
package com.hcg.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者 - 消息订阅者2
 * @author babylon
 * 2016-5-9
 */
public class JMSConsumer2 {
	
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;				// 默认的连接用户名
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;	// 默认的连接密码
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;	// 默认的连接地址
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;   // 连接工厂
		Connection connection = null; 	// 连接
		Session session;				// 会话,接收或者发送消息的线程
		Destination destination;	// 消息的目的地
		MessageConsumer messageConsumer;	// 消息生产者
		
		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
		try {
			connection = connectionFactory.createConnection();		// 通过连接工厂获取连接
			connection.start();		// 启动连接
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	// 创建会话,不加事务
			// destination = session.createQueue("FirstQueue1");	// 创建消息队列
			destination = session.createTopic("FirstTopic1");
			messageConsumer = session.createConsumer(destination);		// 创建消息消费者
			messageConsumer.setMessageListener(new Listener2());			// 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		} 
	}

}

最终目录结构:
技术分享
demo下载地址:https://github.com/JasonBabylon/activemq

ActiveMQ 运行案例

标签:

原文地址:http://blog.csdn.net/jack85986370/article/details/51352259

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!