码迷,mamicode.com
首页 > 编程语言 > 详细

Spring整合JMS-基于activeMQ实现(一)

时间:2015-07-04 12:45:05      阅读:191      评论:0      收藏:0      [点我收藏+]

标签:activemq

Spring整合JMS-基于activeMQ实现(一)

1.1 JMS简介

     JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

     安装AMQ博客:http://blog.csdn.net/liupeng_family/article/details/46728091


1.2 Spring整合JMS

          <dependencies>
             <dependency >
                   <groupId >junit</groupId>
                   <artifactId >junit</artifactId>
                   <version >4.10 </version >
                   <scope >test </scope >
             </dependency >
             <dependency >
                   <groupId >org.springframework </groupId >
                   <artifactId >spring-context </artifactId >
                   <version >3.2.5.RELEASE </version >
             </dependency >
            
             <dependency >
                   <groupId >org.springframework </groupId >
                   <artifactId >spring- jms</ artifactId>
                   <version >3.2.5.RELEASE </version >
             </dependency >
            
             <dependency >
                   <groupId >org.springframework </groupId >
                   <artifactId >spring-test </artifactId >
                   <version >3.2.5.RELEASE </version >
             </dependency >

                         <dependency >
                   <groupId >org.springframework </groupId >
                   <artifactId >spring- web</ artifactId>
                   <version >3.2.5.RELEASE </version >
             </dependency >
            
             <dependency >
                   <groupId >javax.annotation </groupId >
                   <artifactId >jsr250- api</ artifactId>
                   <version >1.0 </version >
             </dependency >
             <dependency >
                   <groupId >org.apache.activemq </groupId >
                   <artifactId >activemq-core</ artifactId>
                   <version >5.7.0 </version >
             </dependency >
      </dependencies >

1.2.1 ActiveMQ准备
      官网下载(http://activemq.apache.org/download.html),解压后运行bin下面的activemq.bat文件启动activeMQ

1.2.2 配置ConnectionFactory
     Spring为我们提供多个ConnectionFactory,有SingleConnectionFactory和CacheConnectionFactory。
     1、SingleConnectionFactory对于建立JMS服务器连接的请求会一直返回同一个连接,并且会忽略Connection的close方法调用
     2、CachingConnectionFactory继承了SingleConnectionFactory,所以他拥有其所有功能,同时新增缓存功能,可以缓存Session、MessageProducer和MessageConsumer。
     <beans xmlns= "http://www.springframework.org/schema/beans"
       xmlns:aop= "http://www.springframework.org/schema/aop" xmlns:tx= "http://www.springframework.org/schema/tx"
       xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms= "http://www.springframework.org/schema/jms"
       xsi:schemaLocation= "
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-3.1.xsd ">
    <bean id= "connectionFactory" class= "org.springframework.jms.connection.CachingConnectionFactory" >

       <property name ="targetConnectionFactory">
            <!-- 真正可以产生Connection的ConnectionFactory,由对应的JMS服务厂商提供 -->
            <bean class= "org.apache.activemq.ActiveMQConnectionFactory" >
                 <property name ="brokerURL">
                      <value >tcp://localhost:61616</value >
                      <!-- failover://(${mq.address})?randomize=false&amp;jms.useAsyncSend=true -->
                 </property >
            </bean >
       </property >
       <property name ="sessionCacheSize" value= "1" />
   </bean >

1.2.3 配置生产者
     生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。服务实现类利用Spring为我们提供的JmsTemplate,所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,要知道消息往哪里发,为此,在定义JmsTemplate的时候需要注入一个Spring提供的ConnectionFactory对象。
              <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
      <bean id ="jmsTemplate" class= "org.springframework.jms.core.JmsTemplate" >
             <property name ="connectionFactory" ref= "connectionFactory"></property >
             <property name ="defaultDestinationName" value= "subject"></property >
             <property name ="deliveryPersistent" value= "true"></property >
             <property name ="pubSubDomain" value="false"></ property> <!-- false p2p,true topic -->
             <property name ="sessionAcknowledgeMode" value= "1"></property >
             <property name ="explicitQosEnabled" value= "true"></property >
             <property name ="timeToLive" value="604800000"></ property>
      </bean >


1.2.4 配置队列
     在真正利用JmsTemplate进行消息发送的时候,我们需要知道目的地(destination)。在JMS中存在Destination接口,它里面没有任何方法定义,用来做一个标识而已。当使用JmsTemplate进行消息发送的时候没有指定目的地会使用默认的Destination。默认的Destination在定义JmsTemplate bean时候通过defaultDestination或者defaultDestinationName来进行注入。
     在AMQ中存在两种类型的Destination,一种是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。
             <!-- 配置Queue,队列目的地,其中value为Queue名称 -->
      <bean id ="testQueue" class= "org.apache.activemq.command.ActiveMQQueue" >
             <constructor-arg index ="0" value="queue.sys.liupeng.test.add" />
      </bean >
            
  <!-- 配置Topic,主题目的地,其中value为Topic名称
 <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="topic.adapter.amq.destination" />
 </bean>
 -->

<!-- 注入AMQ的实现类属性(JmsTemplate和Destination) -->
       <bean id = "amqQueueSender" class = "com.tuniu.scc.purchase.plan.manage.core.amq.AMQQueueSender" >
            <property name = "jmsTemplate" ref="jmsTemplate" ></property >
       </bean >

1.2.5 配置消费者
        <!-- 消息监听器 -->  
    <bean id = "consumerMessageListener" class= "com.liupeng.test.service.ConsumerMessageListener" /> 
    <!-- 消息监听容器 --> 
    <bean id = "jmsContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" >
        <property name = "connectionFactory" ref= "connectionFactory" />  
        <property name = "destination" ref= "testQueue" /><!-- 消费者队列名称,修改 -->
        <property name = "messageListener" ref= "consumerMessageListener" /> 
    </bean > 
    <!-- <jms:listener-container connection-factory="connectionFactory" destination-type="queue">
            <jms:listener destination="${real.stock.info}" ref="consumerMessageListener" method="onMessage" />
      </jms:listener-container> -->

1.2.6 生产消息
      通过JmsTemplate来发送消息到对应的Destination
     public class AMQQueueSender {
    private JmsTemplate jmsTemplate;
    /**
     * 发送消息
     * @param message :
     * @throws InterruptedException
     */
    public void sendMessage(Destination destination, final String message) throws InterruptedException {
        System. out.println( "-----生产者发送消息-----" );
        System. out.println( "-----生产者发了一个消息:" +message);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
  
}


1.2.7 消费消息
     消费者通过Spring为我们封装的消息监听容器MessageListenerContainer接收消息,并把接收到的消息分发给真正的MessageListener进行处理。每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于该容器而言,需要知道监听哪个目的地、去哪里监听(监听哪个JMS服务器),这是通过配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。所以在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是从哪里监听的ConnectionFactory,一个是监听什么Destination,一个进行消息处理的MessageListener。Spring为我们提供了两种类型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。
     SimpleMessageListenerContainer会在一开始的时候就创建一个session和consumer,并且会使用标准的JMS MessageConsumer.setMessageListener()方法注册监听器让JMS提供者调用监听器的回调函数。它不会动态适应运行时需要和参与外部的事务管理,兼容性方面,非常接近于独立的JMS规范,但一般不兼容Java EE的JMS限制
     DefaultMessageListenerContainer会动态的适应运行时需要,能够参与外部事务管理,兼容Java EE

     public class ConsumerMessageListener implements MessageListener{
    public void onMessage(Message message) {
        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
        TextMessage textMessage = (TextMessage)message;
        System. out.println( "接收到一个纯文本消息" );
        try {
            System. out.println( "消息内容是:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

1.2.8 测试使用
       @Resource
    private Destination testQueue;
    @Resource
    private AMQQueueSender amqQueueSender;
    @RequestMapping(value = "/liupeng/testAMQ", method = RequestMethod.POST)
    public void test() {
        System. out.println( "开始发送");
        try {
            amqQueueSender.sendMessage( testQueue, "消息1");
            amqQueueSender.sendMessage( testQueue, "消息2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System. out.println( "---------");
    }














版权声明:本文为博主原创文章,未经博主允许不得转载。

Spring整合JMS-基于activeMQ实现(一)

标签:activemq

原文地址:http://blog.csdn.net/liupeng_family/article/details/46754107

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!