标签:stop producer context 信息 creat local str 场景 pat
ActiveMQ的作用分析:
1、 解决服务之间耦合
2、 使用消息队列,增加系统并发处理量
ActiveMQ 应用场景分析
1、 用户注册:核心是把用户信息保存到数据库;发短信、发邮件会增加业务处理的复杂度,这时候使用 MQ,将发短信、发邮件的任务通知给 MQ,由另外的服务平台完成
2、 搜索平台、缓存平台 查询数据,建立缓存、索引 ,不从数据库查询,从缓存或者索引库查询
当增加、修改、删除数据时,发送消息给 MQ, 缓存平台、索引平台 从 MQ 获取到这个信息,更新缓存或者索引
Spring整合ActiveMQ的使用:
导入spring、ActiveMQ的相关坐标:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Project coordinates -->
  <groupId>cn.itcast.maven</groupId>
  <artifactId>activeMQ_spring</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>activeMQ_spring</name>

  <!-- All Spring modules share one version, so it is declared once here. -->
  <properties>
    <spring.version>4.1.7.RELEASE</spring.version>
  </properties>

  <dependencies>
    <!-- Spring container, test support and JMS integration -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <!-- Unit testing -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>

    <!-- ActiveMQ client and broker classes -->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.14.0</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>
  </dependencies>
</project>
配置applicationContext-mq.xml文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Producer-side Spring context: component scanning, the ActiveMQ connection
  factory, a caching wrapper around it, and two JmsTemplate beans (one for
  queues, one for topics).

  The schema locations are versionless so that each XSD can be resolved from
  whichever Spring / ActiveMQ jars are on the classpath, rather than being
  pinned to a release that differs from the declared dependency.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
         http://activemq.apache.org/schema/core
         http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Component scan: registers the annotated classes under this package -->
    <context:component-scan base-package="cn.itlyj.activemq" />

    <!-- ActiveMQ connection factory: the vendor-provided factory that actually
         creates JMS connections. Use tcp://<broker-ip>:61616 for a remote
         broker, tcp://localhost:61616 for a local one, plus user name and
         password. -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://localhost:61616"
                           userName="admin" password="admin" />

    <!-- Spring caching connection factory: the factory Spring uses to manage
         the real one above. -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target factory that produces the real JMS connections.
             Passing amqConnectionFactory as a constructor-arg is equivalent. -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Spring JmsTemplate message producers: start -->

    <!-- JmsTemplate for queues -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Uses the Spring-managed connectionFactory defined above -->
        <constructor-arg ref="connectionFactory" />
        <!-- Not pub/sub, i.e. queue (point-to-point) mode -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- JmsTemplate for topics -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Uses the Spring-managed connectionFactory defined above -->
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub (publish/subscribe) mode -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Spring JmsTemplate message producers: end -->

</beans>
队列消息生产者的编写:
1 package cn.itlyj.activemq.producer.queue; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.Session; 6 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Qualifier; 9 import org.springframework.jms.core.JmsTemplate; 10 import org.springframework.jms.core.MessageCreator; 11 import org.springframework.stereotype.Service; 12 13 @Service 14 //队列 15 public class QueueSender { 16 // 注入jmsTemplate 17 @Autowired 18 @Qualifier("jmsQueueTemplate") 19 private JmsTemplate jmsTemplate; 20 //设置队列名,生产信息 21 public void send(String queueName, final String message) { 22 jmsTemplate.send(queueName, new MessageCreator() { 23 public Message createMessage(Session session) throws JMSException { 24 return session.createTextMessage(message); 25 } 26 }); 27 } 28 } 29
话题消息生产者编写:
1 package cn.itlyj.activemq.producer.topic; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.Session; 6 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Qualifier; 9 import org.springframework.jms.core.JmsTemplate; 10 import org.springframework.jms.core.MessageCreator; 11 import org.springframework.stereotype.Service; 12 13 @Service 14 //话题 15 public class TopicSender { 16 // 注入jmsTemplate 17 @Autowired 18 @Qualifier("jmsTopicTemplate") 19 private JmsTemplate jmsTemplate; 20 //设置话题名字,和消息内容 21 public void send(String topicName, final String message) { 22 jmsTemplate.send(topicName, new MessageCreator() { 23 24 public Message createMessage(Session session) throws JMSException { 25 return session.createTextMessage(message); 26 } 27 }); 28 } 29 30 }
测试用例:
1 package cn.itlyj.activemq.producer.test; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.test.context.ContextConfiguration; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 import cn.itcast.activemq.producer.queue.QueueSender; 10 import cn.itcast.activemq.producer.topic.TopicSender; 11 12 @RunWith(SpringJUnit4ClassRunner.class) 13 @ContextConfiguration(locations = "classpath:applicationContext-mq.xml") 14 public class ProducerTest { 15 @Autowired 16 private QueueSender queueSender; 17 18 @Autowired 19 private TopicSender topicSender; 20 21 @Test 22 public void testSendMessage() { 23 queueSender.send("spring_queue", "你好,MQ"); 24 topicSender.send("spring_topic", "你好,Spring"); 25 } 26 }
消费者的配置文件(applicationContext-mq-consumer.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Consumer-side Spring context: component scanning, the ActiveMQ connection
  factory, a caching wrapper around it, and listener containers for one queue
  and one topic.

  The schema locations are versionless so that each XSD can be resolved from
  whichever Spring / ActiveMQ jars are on the classpath, rather than being
  pinned to a release that differs from the declared dependency.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/jms
         http://www.springframework.org/schema/jms/spring-jms.xsd
         http://activemq.apache.org/schema/core
         http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Component scan: registers the annotated consumer classes -->
    <context:component-scan base-package="cn.itlyj.activemq.consumer" />

    <!-- ActiveMQ connection factory: the vendor-provided factory that actually
         creates JMS connections. Use tcp://<broker-ip>:61616 for a remote
         broker, tcp://localhost:61616 for a local one, plus user name and
         password. -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://localhost:61616"
                           userName="admin" password="admin" />

    <!-- Spring caching connection factory: the factory Spring uses to manage
         the real one above. -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target factory that produces the real JMS connections.
             Passing amqConnectionFactory as a constructor-arg is equivalent. -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Message consumers: start -->

    <!-- NOTE(review): every listener "ref" below must match the name of a
         bean found by the component scan. The default name of an annotated
         class is presumably its simple name with a lower-case first letter,
         so confirm that a class exists for each of the four names used. -->

    <!-- Queue listeners -->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="spring_queue" ref="queueConsumer1" />
        <jms:listener destination="spring_queue" ref="queueConsumer2" />
    </jms:listener-container>

    <!-- Topic listeners -->
    <jms:listener-container destination-type="topic" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="spring_topic" ref="topicConsumer1" />
        <jms:listener destination="spring_topic" ref="topicConsumer2" />
    </jms:listener-container>

    <!-- Message consumers: end -->

</beans>
编写队列、话题消费者类:
1 package cn.itlyj.activemq.consumer.queue; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 10 @Service 11 public class QueueConsumer implements MessageListener { 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage) message; 14 try { 15 System.out 16 .println("消费者QueueConsumer获取消息:" + textMessage.getText()); 17 } catch (JMSException e) { 18 e.printStackTrace(); 19 } 20 } 21 }
1 package cn.itlyj.activemq.consumer.topic; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 10 @Service 11 public class TopicConsumer1 implements MessageListener { 12 13 public void onMessage(Message message) { 14 TextMessage textMessage = (TextMessage) message; 15 try { 16 System.out 17 .println("消费者TopicConsume获取消息:" + textMessage.getText()); 18 } catch (JMSException e) { 19 e.printStackTrace(); 20 } 21 } 22 23 }
测试消费者获取到生产者的信息:
package cn.itlyj.activemq.producer.test;

import java.util.concurrent.CountDownLatch;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Loads the consumer Spring context and keeps the test process alive so the
 * configured listeners can keep receiving messages.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")
public class ConsumerTest {

    /**
     * Blocks indefinitely so that JUnit does not exit; stop the run manually.
     *
     * @throws InterruptedException if the waiting test thread is interrupted
     */
    @Test
    public void testConsumerMessage() throws InterruptedException {
        // Wait on a latch that is never counted down: this keeps the process
        // alive like an endless loop would, but without spinning a CPU core.
        new CountDownLatch(1).await();
    }
}
标签:stop producer context 信息 creat local str 场景 pat
原文地址:http://www.cnblogs.com/lyujun/p/7206679.html