
Integrating ActiveMQ with Spring: Using Queues and Topics

Date: 2017-07-19 17:58:07


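What ActiveMQ is used for:

1. Decoupling services from one another

2. Increasing the system's concurrent processing capacity by moving work onto message queues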

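ActiveMQ use cases:

1. User registration. The essential step is saving the user's data to the database; sending an SMS and an email on top of that makes the business logic more complex. With MQ, the registration service only notifies MQ that an SMS and an email are needed, and a separate service platform sends them.

2. Search and cache platforms. Queries build caches and indexes, so data is read from the cache or the index store instead of the database.

When data is added, modified, or deleted, a message is sent to MQ. The cache platform and the index platform receive that message from MQ and update the cache or the index.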
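
The registration scenario above can be sketched in plain Java. This is only an illustration of the decoupling idea, not ActiveMQ code: an in-memory BlockingQueue stands in for the broker, and the names RegistrationDemo, register, and drainNotifications are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an in-memory BlockingQueue stands in for the MQ broker.
class RegistrationDemo {

    // Stand-in for a queue on the broker.
    static final BlockingQueue<String> notifications = new LinkedBlockingQueue<>();

    // Stand-in for the user table in the database.
    static final List<String> savedUsers = new ArrayList<>();

    // The registration service only saves the user and enqueues the follow-up work.
    static void register(String userName) {
        savedUsers.add(userName);
        notifications.add("sms:" + userName);
        notifications.add("email:" + userName);
    }

    // A separate notification service drains the queue at its own pace.
    static List<String> drainNotifications() {
        List<String> handled = new ArrayList<>();
        notifications.drainTo(handled);
        return handled;
    }

    public static void main(String[] args) {
        register("alice");
        System.out.println("saved users: " + savedUsers);
        System.out.println("handled later: " + drainNotifications());
    }
}
```

The point of the design is that register returns as soon as the user is saved and the two messages are queued; the SMS and email work can be slow, fail, or be retried without affecting registration.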

 

Using ActiveMQ with Spring:

Add the Maven coordinates for Spring and ActiveMQ:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.itcast.maven</groupId>
  <artifactId>activeMQ_spring</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>activeMQ_spring</name>

  <dependencies>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>4.1.7.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-test</artifactId>
          <version>4.1.7.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
      </dependency>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.14.0</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>4.1.7.RELEASE</version>
      </dependency>
  </dependencies>
</project>

 

Configure the applicationContext-mq.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/data/jpa 
        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">

    <!-- Component scan -->
    <context:component-scan base-package="cn.itlyj.activemq" />

    <!-- ActiveMQ connection factory -->
    <!-- The ConnectionFactory that actually creates connections; supplied by the JMS provider -->
    <!-- Remote broker: tcp://ip:61616; local broker: tcp://localhost:61616; user name and password are set here too -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />

    <!-- Spring caching connection factory -->
    <!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target is the real ConnectionFactory that creates JMS connections -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Equivalent alternative: pass the target factory as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Spring JmsTemplate message producers: start -->

    <!-- JmsTemplate for queues -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory defined above -->
        <constructor-arg ref="connectionFactory" />
        <!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- JmsTemplate for topics -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory defined above -->
        <constructor-arg ref="connectionFactory" />
        <!-- The pub/sub (publish/subscribe) model -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Spring JmsTemplate message producers: end -->

</beans>

Writing the queue message producer:

package cn.itlyj.activemq.producer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

// Queue producer
@Service
public class QueueSender {
    // Inject the queue-mode JmsTemplate defined in the XML configuration
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    // Send a text message to the queue with the given name
    public void send(String queueName, final String message) {
        jmsTemplate.send(queueName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

Writing the topic message producer:

package cn.itlyj.activemq.producer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

// Topic producer
@Service
public class TopicSender {
    // Inject the topic-mode JmsTemplate defined in the XML configuration
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    // Publish a text message to the topic with the given name
    public void send(String topicName, final String message) {
        jmsTemplate.send(topicName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

Test case:

package cn.itlyj.activemq.producer.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

// The senders live in the cn.itlyj packages declared in their source files
import cn.itlyj.activemq.producer.queue.QueueSender;
import cn.itlyj.activemq.producer.topic.TopicSender;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq.xml")
public class ProducerTest {
    @Autowired
    private QueueSender queueSender;

    @Autowired
    private TopicSender topicSender;

    // Sends one message to the queue and one to the topic
    @Test
    public void testSendMessage() {
        queueSender.send("spring_queue", "Hello, MQ");
        topicSender.send("spring_topic", "Hello, Spring");
    }
}

Consumer configuration file (applicationContext-mq-consumer.xml):

  

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/data/jpa 
        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">

    <!-- Component scan -->
    <context:component-scan base-package="cn.itlyj.activemq.consumer" />

    <!-- ActiveMQ connection factory -->
    <!-- The ConnectionFactory that actually creates connections; supplied by the JMS provider -->
    <!-- Remote broker: tcp://ip:61616; local broker: tcp://localhost:61616; user name and password are set here too -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />

    <!-- Spring caching connection factory -->
    <!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target is the real ConnectionFactory that creates JMS connections -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Equivalent alternative: pass the target factory as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Message consumers: start -->

    <!-- Queue listeners -->
    <jms:listener-container destination-type="queue" container-type="default" 
        connection-factory="connectionFactory" acknowledge="auto">
        <!-- By default the registered bean name is the class name with its first letter in lower case -->
        <jms:listener destination="spring_queue" ref="queueConsumer1"/>
        <jms:listener destination="spring_queue" ref="queueConsumer2"/>
    </jms:listener-container>

    <!-- Topic listeners -->
    <jms:listener-container destination-type="topic" container-type="default" 
        connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="spring_topic" ref="topicConsumer1"/>
        <jms:listener destination="spring_topic" ref="topicConsumer2"/>
    </jms:listener-container>

    <!-- Message consumers: end -->

</beans>
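
The configuration registers two listeners on the queue and two on the topic, and the two destination types deliver differently. The following plain-Java model is only a sketch of those semantics and contains no ActiveMQ or Spring code; the class DeliveryDemo and its methods are hypothetical, and the round-robin dispatch is a simplification of how a broker spreads queue messages across consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of the two JMS messaging domains; not ActiveMQ code.
class DeliveryDemo {

    // A consumer that simply records what it receives.
    static class Consumer {
        final String name;
        final List<String> received = new ArrayList<>();

        Consumer(String name) {
            this.name = name;
        }
    }

    // Queue (point-to-point): each message goes to exactly one consumer.
    // Round-robin is an assumption made here to keep the model simple.
    static void sendToQueue(List<String> messages, List<Consumer> consumers) {
        for (int i = 0; i < messages.size(); i++) {
            consumers.get(i % consumers.size()).received.add(messages.get(i));
        }
    }

    // Topic (publish/subscribe): every active subscriber gets every message.
    static void publishToTopic(List<String> messages, List<Consumer> subscribers) {
        for (String message : messages) {
            for (Consumer subscriber : subscribers) {
                subscriber.received.add(message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");

        List<Consumer> queueConsumers =
                Arrays.asList(new Consumer("queueConsumer1"), new Consumer("queueConsumer2"));
        sendToQueue(messages, queueConsumers);
        for (Consumer c : queueConsumers) {
            System.out.println(c.name + " received " + c.received);
        }

        List<Consumer> topicConsumers =
                Arrays.asList(new Consumer("topicConsumer1"), new Consumer("topicConsumer2"));
        publishToTopic(messages, topicConsumers);
        for (Consumer c : topicConsumers) {
            System.out.println(c.name + " received " + c.received);
        }
    }
}
```

In this model the two queue consumers share the messages between them, while each topic subscriber sees all of them. With a real broker, a topic subscriber that is not connected when a message is published normally misses it unless a durable subscription is used.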

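Writing the queue and topic consumer classes. The configuration also references queueConsumer2 and topicConsumer2; those classes are not shown here and would presumably follow the same pattern: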

package cn.itlyj.activemq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

// Registered as bean "queueConsumer1", the name referenced by the listener configuration
@Service
public class QueueConsumer1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // Only text messages are expected; skip anything else instead of failing on the cast
        if (!(message instanceof TextMessage)) {
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("QueueConsumer1 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

package cn.itlyj.activemq.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

// Registered as bean "topicConsumer1", the name referenced by the listener configuration
@Service
public class TopicConsumer1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // Only text messages are expected; skip anything else instead of failing on the cast
        if (!(message instanceof TextMessage)) {
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("TopicConsumer1 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Testing that the consumers receive the producers' messages:

package cn.itlyj.activemq.producer.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")
public class ConsumerTest {

    @Test
    public void testConsumerMessage() throws InterruptedException {
        // Keep the test thread alive so JUnit does not exit and shut down
        // the listener containers; stop the test manually when finished.
        while (true) {
            Thread.sleep(1000); // sleep rather than busy-spin the CPU
        }
    }
}
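
The test above never finishes on its own. One possible alternative, sketched here under the assumption that the number of expected messages is known, is to block on a java.util.concurrent.CountDownLatch with a timeout. The class LatchWaitDemo and the method awaitMessages are hypothetical, and a plain thread stands in for the JMS listener container; in a real test the listener bean would call countDown() from onMessage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a plain thread stands in for the JMS listener container.
class LatchWaitDemo {

    // Waits until `expected` messages have been handled or the timeout expires.
    // `delivered` is how many messages the simulated listener actually receives.
    // Returns true only if all expected messages arrived in time.
    static boolean awaitMessages(int expected, int delivered, long timeoutMillis)
            throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(expected);

        // Stand-in for onMessage being invoked on a listener thread.
        Thread listener = new Thread(() -> {
            for (int i = 0; i < delivered; i++) {
                latch.countDown(); // one count per message received
            }
        });
        listener.setDaemon(true);
        listener.start();

        // The test thread blocks here instead of looping forever.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all received: " + awaitMessages(2, 2, 2000));
        System.out.println("all received: " + awaitMessages(2, 1, 200));
    }
}
```

With this approach the test ends as soon as the expected messages arrive and fails after the timeout if they do not, so it can run unattended.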

Original article: http://www.cnblogs.com/lyujun/p/7206679.html
