ActiveMQ Installation and Usage (Single Node)
IP: 192.168.4.101
Environment: CentOS 6.6, JDK 7
1. Install the JDK and configure the environment variables (omitted)
JAVA_HOME=/usr/local/java/jdk1.7.0_72
2. Download the Linux build of ActiveMQ (latest version at the time of writing: apache-activemq-5.11.1-bin.tar.gz)
$ wget http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz
3. Extract and install
$ tar -zxvf apache-activemq-5.11.1-bin.tar.gz
$ mv apache-activemq-5.11.1 activemq-01
If the startup script activemq is not executable, grant it execute permission (optional step)
$ cd /home/wusc/activemq-01/bin/
$ chmod 755 ./activemq
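4. Open the required ports in the firewall
ActiveMQ uses two ports:
one for message traffic (61616 by default)
one for the admin console (8161 by default), which can be changed in conf/jetty.xml as follows: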
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
</bean>
# vi /etc/sysconfig/iptables
Add:
-A INPUT -m state --state NEW -m tcp -p tcp --dport 61616 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8161 -j ACCEPT
Restart the firewall:
# service iptables restart
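Once the broker has been started (next step), it can be handy to confirm from a client machine that both ports are actually reachable through the firewall. The following is a minimal sketch using only the JDK; the class name PortCheck and the method isOpen are hypothetical helpers made up for this illustration, not part of ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of ActiveMQ): checks whether a TCP port accepts connections.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, dropped by the firewall, or timed out.
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker IP used throughout this guide; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "192.168.4.101";
        int[] ports = {61616, 8161}; // default messaging port and web console port
        for (int port : ports) {
            String state = isOpen(host, port, 2000) ? "open" : "unreachable";
            System.out.println(host + ":" + port + " -> " + state);
        }
    }
}
```

A port reported as unreachable may mean either that the iptables rule is missing or that the broker is simply not running yet, so the check is only meaningful after step 5.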
5. Start the broker
$ cd /home/wusc/activemq-01/bin
$ ./activemq start
6. Open the admin console: http://192.168.4.101:8161
The default user name and password are admin/admin.
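7. Security configuration (message security)
Without a security mechanism, anyone who knows the concrete address of the message service (IP, port, and destination, i.e. the queue or topic name) can send and receive messages without restriction. For details on ActiveMQ security configuration, see http://activemq.apache.org/security.html
ActiveMQ offers several message security strategies; here we use the simple authentication plugin as the example:
In conf/activemq.xml, add the following at the end of the broker element (just before the closing broker tag):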
$ vi /home/wusc/activemq-01/conf/activemq.xml
<plugins>
    <simpleAuthenticationPlugin>
        <users>
            <authenticationUser username="wusc" password="wusc.123" groups="users,admins"/>
        </users>
    </simpleAuthenticationPlugin>
</plugins>
This defines a user wusc with password wusc.123, belonging to the groups users and admins.
Set the user name and password for the admin console:
$ vi /home/wusc/activemq-01/conf/jetty.xml
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC" />
    <property name="roles" value="admin" />
    <property name="authenticate" value="true" />
</bean>
Make sure authenticate is set to true (the default).
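With the constraint named BASIC and authenticate set to true, the console expects HTTP Basic credentials on each request. As a rough illustration of what a client has to send, here is a sketch that builds the Authorization header value. The class name ConsoleAuth and its method are hypothetical, and java.util.Base64 needs Java 8 or later (newer than the JDK 7 used on the server in this guide).

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: builds the header a client sends for HTTP Basic authentication.
class ConsoleAuth {

    /** Returns the Authorization header value for the given user name and password. */
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        byte[] raw = credentials.getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(raw);
    }

    public static void main(String[] args) {
        // Default console credentials from step 6; change them as described in this step.
        System.out.println("Authorization: " + basicAuthHeader("admin", "admin"));
    }
}
```

Basic credentials are only Base64-encoded, not encrypted, which is one more reason to change the default admin/admin password.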
The login user name and password for the console are stored in conf/jetty-realm.properties, which looks like this:
$ vi /home/wusc/activemq-01/conf/jetty-realm.properties
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: wusc.123, admin
Note: each entry has the format username: password, rolename
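To make the entry format concrete, the sketch below splits one line into user name, password and roles. It is a simplified illustration only: the class RealmEntry is hypothetical, and this is not how Jetty itself reads the file (Jetty also accepts other credential forms that this sketch ignores).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value class for one "username: password [,rolename ...]" entry.
class RealmEntry {
    final String username;
    final String password;
    final List<String> roles;

    RealmEntry(String username, String password, List<String> roles) {
        this.username = username;
        this.password = password;
        this.roles = roles;
    }

    /** Splits a line at the first ':' and then at each ','; whitespace is trimmed. */
    static RealmEntry parse(String line) {
        int colon = line.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("missing ':' in entry: " + line);
        }
        String username = line.substring(0, colon).trim();
        String[] parts = line.substring(colon + 1).split(",");
        String password = parts[0].trim();
        List<String> roles = new ArrayList<String>();
        for (int i = 1; i < parts.length; i++) {
            roles.add(parts[i].trim());
        }
        return new RealmEntry(username, password, roles);
    }

    public static void main(String[] args) {
        RealmEntry entry = parse("admin: wusc.123, admin");
        System.out.println(entry.username + " / " + entry.password + " / " + entry.roles);
    }
}
```

The role listed here (admin) has to match the roles value of the securityConstraint bean in jetty.xml, otherwise the login is accepted but access is denied.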
Restart:
$ /home/wusc/activemq-01/bin/activemq restart
Enable start at boot:
# vi /etc/rc.d/rc.local
Add the following:
## ActiveMQ
su - wusc -c '/home/wusc/activemq-01/bin/activemq start'
Integrating with Spring to send email asynchronously
mq.properties
## MQ
mq.brokerURL=tcp\://192.168.4.101\:61616
mq.userName=wusc
mq.password=wusc.123
mq.pool.maxConnections=10
#queueName
queueName=wusc.edu.mqtest.v1
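The backslashes in mq.brokerURL can look odd at first. Assuming the file is read with the standard java.util.Properties rules, `\:` is just an escaped colon, so the value that reaches the connection factory is a normal tcp:// URL. The sketch below demonstrates this with the JDK alone; the class name MqProperties is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical demo class: shows how java.util.Properties unescapes "\:" in a value.
class MqProperties {

    /** Parses properties-file text using the standard java.util.Properties rules. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // "\\:" in Java source is the two characters "\:" as they appear in mq.properties.
        String text = "mq.brokerURL=tcp\\://192.168.4.101\\:61616\n"
                + "mq.userName=wusc\n";
        Properties props = load(text);
        System.out.println(props.getProperty("mq.brokerURL")); // prints tcp://192.168.4.101:61616
    }
}
```

Because the key already ends at the first `=`, the colons in the value do not strictly need escaping; writing `mq.brokerURL=tcp://192.168.4.101:61616` loads to the same string.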
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.2.xsd"
    default-autowire="byName" default-lazy-init="false">

    <!-- Video course on Dubbo-based distributed system architecture, Wu Shuicheng (吴水成), wu-sc@foxmail.com, study QQ group: 367211134 -->

    <!-- The ConnectionFactory that actually creates Connections, supplied by the JMS provider -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ broker address -->
        <property name="brokerURL" value="${mq.brokerURL}" />
        <property name="userName" value="${mq.userName}"></property>
        <property name="password" value="${mq.password}"></property>
    </bean>

    <!-- ActiveMQ provides a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory into it
         pools Connections, Sessions and MessageProducers, which greatly reduces resource consumption.
         Requires the activemq-pool package. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="${mq.pool.maxConnections}" />
    </bean>

    <!-- The Spring ConnectionFactory that manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- targetConnectionFactory refers to the real ConnectionFactory that creates JMS Connections -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>

    <!-- JMS helper class provided by Spring; it can send and receive messages -->
    <!-- Queue template -->
    <bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="${queueName}"></property>
    </bean>
</beans>
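import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject; // assumed: fastjson, which provides JSONObject.toJSONString

@Service("mqProducer")
public class MQProducer {

    @Autowired
    private JmsTemplate activeMqJmsTemplate;

    /**
     * Sends a message.
     *
     * @param mail the mail parameters; serialized to JSON and sent as a text message
     */
    public void sendMessage(final MailParam mail) {
        activeMqJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(JSONObject.toJSONString(mail));
            }
        });
    }
}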
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MQProducerTest {

    private static final Log log = LogFactory.getLog(MQProducerTest.class);

    public static void main(String[] args) {
        try {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml");
            context.start();

            MQProducer mqProducer = (MQProducer) context.getBean("mqProducer");

            // Send a mail
            MailParam mail = new MailParam();
            mail.setTo("wu-sc@foxmail.com");
            mail.setSubject("ActiveMQ test");
            mail.setContent("Mail sent asynchronously through ActiveMQ!");
            mqProducer.sendMessage(mail);

            context.stop();
        } catch (Exception e) {
            log.error("==>MQ context start error:", e);
            System.exit(0);
        } finally {
            log.info("===>System.exit");
            System.exit(0);
        }
    }
}
# SMTP service configuration
mail.host=smtp.qq.com
mail.port=25
mail.username=XXX@qq.com
mail.password=XXXX
mail.smtp.auth=true
mail.smtp.timeout=30000
mail.default.from=XXXXX@qq.com
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.2.xsd"
    default-autowire="byName" default-lazy-init="false">

    <!-- The ConnectionFactory that actually creates Connections, supplied by the JMS provider -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ broker address -->
        <property name="brokerURL" value="${mq.brokerURL}" />
        <property name="userName" value="${mq.userName}"></property>
        <property name="password" value="${mq.password}"></property>
    </bean>

    <!-- ActiveMQ provides a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory into it
         pools Connections, Sessions and MessageProducers, which greatly reduces resource consumption.
         Requires the activemq-pool package. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="${mq.pool.maxConnections}" />
    </bean>

    <!-- The Spring ConnectionFactory that manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- targetConnectionFactory refers to the real ConnectionFactory that creates JMS Connections -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>

    <!-- JMS helper class provided by Spring; it can send and receive messages -->
    <!-- Queue template -->
    <bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="${queueName}"></property>
    </bean>

    <!-- The sessionAwareQueue destination -->
    <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>${queueName}</value>
        </constructor-arg>
    </bean>

    <!-- A MessageListener that has access to the Session -->
    <bean id="consumerSessionAwareMessageListener"
        class="wusc.edu.demo.mqtest.listener.ConsumerSessionAwareMessageListener"></bean>

    <bean id="sessionAwareListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="sessionAwareQueue" />
        <property name="messageListener" ref="consumerSessionAwareMessageListener" />
    </bean>
</beans>
spring-mail.xml (the configuration that actually sends the mail)
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:cache="http://www.springframework.org/schema/cache"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
        http://www.springframework.org/schema/cache
        http://www.springframework.org/schema/cache/spring-cache-3.2.xsd">

    <!-- High-level mail-sending abstraction provided by Spring -->
    <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
        <property name="host" value="${mail.host}" />
        <property name="username" value="${mail.username}" />
        <property name="password" value="${mail.password}" />
        <property name="defaultEncoding" value="UTF-8"></property>
        <property name="javaMailProperties">
            <props>
                <prop key="mail.smtp.auth">${mail.smtp.auth}</prop>
                <prop key="mail.smtp.timeout">${mail.smtp.timeout}</prop>
            </props>
        </property>
    </bean>

    <bean id="simpleMailMessage" class="org.springframework.mail.SimpleMailMessage">
        <property name="from">
            <value>${mail.default.from}</value>
        </property>
    </bean>

    <!-- Thread pool configuration -->
    <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- Minimum number of threads kept in the pool -->
        <property name="corePoolSize" value="5" />
        <!-- Idle time allowed for pool threads -->
        <property name="keepAliveSeconds" value="30000" />
        <!-- Maximum number of threads in the pool -->
        <property name="maxPoolSize" value="50" />
        <!-- Capacity of the buffer queue used by the pool -->
        <property name="queueCapacity" value="100" />
    </bean>
</beans>
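How corePoolSize, maxPoolSize and queueCapacity interact is easy to misread: with a bounded queue, the pool only grows past the core size once the queue is full. The sketch below shows this with the plain JDK ThreadPoolExecutor, which is what Spring's ThreadPoolTaskExecutor builds on. The class PoolGrowthDemo, the small sizes (2/4/2 instead of 5/50/100) and the 30-second keep-alive are assumptions chosen only to keep the demo short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows when a bounded-queue pool adds threads.
class PoolGrowthDemo {

    /** Builds a JDK pool with the same three sizing knobs as the Spring bean. */
    static ThreadPoolExecutor newPool(int core, int max, int queueCapacity) {
        return new ThreadPoolExecutor(core, max, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity));
    }

    /** Submits count tasks that block until the latch is released. */
    static void submitBlocking(ThreadPoolExecutor pool, int count, final CountDownLatch release) {
        for (int i = 0; i < count; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }

    static void report(String label, ThreadPoolExecutor pool) {
        System.out.println(label + ": threads=" + pool.getPoolSize()
                + ", queued=" + pool.getQueue().size());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(2, 4, 2);
        CountDownLatch release = new CountDownLatch(1);

        submitBlocking(pool, 2, release);
        report("2 tasks", pool); // core threads are created first
        submitBlocking(pool, 2, release);
        report("4 tasks", pool); // further tasks wait in the queue
        submitBlocking(pool, 2, release);
        report("6 tasks", pool); // queue is full, so extra threads are added up to max
        try {
            submitBlocking(pool, 1, release);
        } catch (RejectedExecutionException e) {
            System.out.println("7th task rejected: queue full and max threads reached");
        }

        release.countDown(); // let all tasks finish
        pool.shutdown();
    }
}
```

Applied to the configuration above, this means the mail pool stays at 5 threads until 100 mails are waiting, and only then grows toward 50.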
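import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject; // assumed: fastjson, which provides JSONObject.parseObject

/**
 * Description: queue listener.
 * Author: WuShuicheng.
 * Created: 2015-3-17, 11:21:23 PM.
 * Version: V1.0.
 */
@Component
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<Message> {

    private static final Log log = LogFactory.getLog(ConsumerSessionAwareMessageListener.class);

    @Autowired
    private JmsTemplate activeMqJmsTemplate;

    @Autowired
    private Destination sessionAwareQueue;

    @Autowired
    private MailBiz mailBiz;

    public synchronized void onMessage(Message message, Session session) {
        try {
            ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
            final String ms = msg.getText();
            log.info("==>receive message:" + ms);

            // Convert the JSON text back into a MailParam object
            MailParam mailParam = JSONObject.parseObject(ms, MailParam.class);
            if (mailParam == null) {
                return;
            }

            try {
                // mailSend hands the work to a thread pool, so only failures to
                // submit the task surface here, not failures of the send itself.
                mailBiz.mailSend(mailParam);
            } catch (Exception e) {
                // Sending failed; put the message back on the queue
                // activeMqJmsTemplate.send(sessionAwareQueue, new MessageCreator() {
                //     public Message createMessage(Session session) throws JMSException {
                //         return session.createTextMessage(ms);
                //     }
                // });
                log.error("==>MailException:", e);
            }
        } catch (Exception e) {
            log.error("==>", e);
        }
    }
}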
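import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component("mailBiz")
public class MailBiz {

    @Autowired
    private JavaMailSender mailSender; // defined in the Spring configuration

    @Autowired
    private SimpleMailMessage simpleMailMessage; // defined in the Spring configuration

    @Autowired
    private ThreadPoolTaskExecutor threadPool;

    /**
     * Sends a plain-text mail asynchronously on the thread pool.
     *
     * @param mailParam carries the recipient (to), the subject and the content
     */
    public void mailSend(final MailParam mailParam) {
        threadPool.execute(new Runnable() {
            public void run() {
                // Copy the shared template so that concurrent sends do not overwrite
                // each other's fields; the sender (from) comes from the configuration.
                SimpleMailMessage message = new SimpleMailMessage(simpleMailMessage);
                message.setTo(mailParam.getTo()); // recipient
                message.setSubject(mailParam.getSubject());
                message.setText(mailParam.getContent());
                mailSender.send(message);
            }
        });
    }
}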
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Description: ActiveMQ test launcher.
 * Author: WuShuicheng.
 * Created: 2015-3-17, 2:25:20 AM.
 * Version: V1.0.
 */
public class MQConsumer {

    private static final Log log = LogFactory.getLog(MQConsumer.class);

    public static void main(String[] args) {
        try {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml");
            context.start();
        } catch (Exception e) {
            log.error("==>MQ context start error:", e);
            System.exit(0);
        }
    }
}
Shared mail parameter bean
/**
 * Description: wrapper class for mail parameters.
 * Author: WuShuicheng.
 * Created: 2015-3-18, 1:08:42 AM.
 * Version: V1.0.
 */
public class MailParam {

    /** Sender */
    private String from;
    /** Recipient */
    private String to;
    /** Subject */
    private String subject;
    /** Mail content */
    private String content;

    public MailParam() {
    }

    public MailParam(String to, String subject, String content) {
        this.to = to;
        this.subject = subject;
        this.content = content;
    }

    // ..... getters/setters omitted
}
Distributed Architecture Study Notes: 015--ActiveMQ Installation and Usage (Single Node)
Original article: http://blog.csdn.net/xxssyyyyssxx/article/details/71640098