码迷,mamicode.com
首页 > 编程语言 > 详细

JMS实现-ActiveMQ,介绍,安装,使用,注意点,spring整合

时间:2016-07-10 19:02:29      阅读:455      评论:0      收藏:0      [点我收藏+]

标签:

[TOC]
缘由:
最近在用netty开发游戏服务器,目前有这样的一个场景,聊天服务器和逻辑服务器要进行消息交互,比如,某个玩家往某个公会提交了加入申请,这个申请动作是在逻辑服务器上完成的,但是要产生一条申请消息,由聊天服务器推送到对应的公会频道,目前这个申请消息就是通过jms发送到聊天服务器上,聊天服务器监听到后,推送到对应的公会频道.
下面主要介绍以下几点
- JMS简介
- 消息传递模型
- ActiveMQ介绍
- 安装使用
- spring整合JMS
- 代码相关


JMS简介

J Java 消息服务(Java Message Service,简称JMS)是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。JMS 在其中扮演的角色与JDBC 很相似,正如JDBC 提供了一套用于访问各种不同关系数据库的公共API,JMS 也提供了独立于特定厂商的企业消息系统访问方式。
使用JMS 的应用程序被称为JMS 客户端,处理消息路由与传递的消息系统被称为JMS Provider,而JMS 应用则是由多个JMS 客户端和一个JMS Provider 构成的业务系统。发送消息的JMS 客户端被称为生产者(producer),而接收消息的JMS 客户端则被称为消费者(consumer)。同一JMS 客户端既可以是生产者也可以是消费者。
JMS 的编程过程很简单,概括为:应用程序A 发送一条消息到消息服务器(也就是JMS Provider)的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A 和应用程序B 没有直接的代码关连,所以两者实现了解偶。如图
技术分享

消息组成

1. 头(head)
    每条JMS 消息都必须具有消息头。头字段包含用于路由和识别消息的值。可以通过多种方式来设置消息头的值:

a. 由JMS 提供者在生成或传送消息的过程中自动设置
b. 由生产者客户机通过在创建消息生产者时指定的设置进行设置
c. 由生产者客户机逐一对各条消息进行设置

  1. 属性(property)
    消息可以包含称作属性的可选头字段。他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,其中可以包括如下信息:创建数据的进程、数据的创建时间以及每条数据的结构。JMS提供者也可以添加影响消息处理的属性,如是否应压缩消息或如何在消息生命周期结束时废弃消息。

  2. 主体(body)
    包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。
    StreamMessage 一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。
    MapMessage 一种主体中包含一组键–值对的消息。没有定义条目顺序。
    TextMessage 一种主体中包含Java字符串的消息(例如,XML消息)。
    ObjectMessage 一种主体中包含序列化Java对象的消息。
    BytesMessage 一种主体中包含连续字节流的消息。

消息传递模型

JMS支持两种消息传递模型:点对点(point-to-point,简称PTP)和发布/订阅(publish/subscribe,简称pub/sub)。这两种消息传递模型非常相似,但有以下区别:
a. PTP消息传递模型规定了一条消息之恩能够传递费一个接收方。
b. Pub/sub消息传递模型允许一条消息传递给多个接收方
每个模型都通过扩展公用基类来实现。例如:javax.jms.Queue和Javax.jms.Topic都扩展自javax.jms.Destination类。

  1. 点对点消息传递
    通过点对点的消息传递模型,一个应用程序可以向另外一个应用程序发送消息。在此传递模型中,目标类型时队列。消息首先被传送至队列目标,然后从改对垒将消息传送至对此队列进行监听的某个消费者,如下图:
    技术分享
    一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方。如果多个接收方正在监听队列上的消息,JMS Provider将根据“先来者优先”的原则确定由哪个接收方接受下一条消息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息传递模型是传统意义上的拉模型或轮询模型。在此列模型中,消息不会自动推动给客户端的,而是要由客户端从队列中请求获得。
  2. 发布/订阅消息传递
    通过发布/订阅消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的或送消费者。如下图:
    技术分享
    主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时改消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上时一个推模型。在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方法来获得新的消息。
    上面两种消息传递模型里,我们都需要定义消息生产者和消费者,生产者吧消息发送到JMS Provider的某个目标地址(Destination),消息从该目标地址传送至消费者。消费者可以同步或异步接收消息,一般而言,异步消息消费者的执行和伸缩性都优于同步消息接收者,体现在:
    1. 异步消息接收者创建的网络流量比较小。单向对东消息,并使之通过管道进入消息监听器。管道操作支持将多条消息聚合为一个网络调用。
    2. 异步消息接收者使用线程比较少。异步消息接收者在不活动期间不使用线程。同步消息接收者在接收调用期间内使用线程,结果线程可能会长时间保持空闲,尤其是如果该调用中指定了阻塞超时。

QUEUE和TOPIC的比较

  1. JMS Queue执行load balancer语义
    一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它讲被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另外一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
  2. Topic实现publish和subscribe语义
    一条消息被publish时,他将发送给所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
  3. 分别对应两种消息模式
    Point-to-Point(点对点),Publisher/Subscriber Model(发布/订阅者)
    其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化订阅)和durable subscription(持久化订阅)两种消息处理方式。

ActiveMQ介绍

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

特点

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring4.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试
  11. ActiveMQ速度非常快;一般要比jbossMQ快10倍。

ActiveMQ应用场景

  1. 不同语言应用集成
    ActiveMQ 中间件用Java语言编写,因此自然提供Java客户端 API。但是ActiveMQ 也为C/C++、.NET、Perl、PHP、Python、Ruby 和一些其它语言提供客户端。在你考虑如何集成不同平台不同语言编写应用的时候,ActiveMQ 拥有巨大优势。在这样的例子中,多种客户端API通过ActiveMQ 发送和接受消息成为可能,无论使用的是什么语言。此外,ActiveMQ 还提供交叉语言功能,该功能整合这种功能,无需使用远程过程调用(RPC)确实是个优势,因为消息协助应用解耦。
  2. 作为RPC的替代
    使用RPC同步调用的应用十分普遍。假设大多数客户端服务器应用使用RPC,包括ATM、大多数WEB应用、信用卡系统、销售点系统等等。尽管很多系统很成功,但是转换使用异步消息可以带来很多好处,而且也不会放弃响应保证。使用同步请求的系统在规模上有较大的限制,因为请求会被阻塞,从而导致整个系统变慢。如果使用异步消息替代,可以很容易增加额外的消息接收者,使得消息能被并发消耗,从而加快请求处理。当然,你的系统应用间应该是解耦的。
  3. 应用之间解耦
    正如之前讨论的,紧耦合架构可以导致很多问题,尤其是如果他们是分布的。松耦合架构,在另一方面,证实了更少的依赖性,能够更好地处理不可预见的改变。不仅可以在系统中改变组件而不影响整个系统,而且组件交互也相当的简单。相比使用同步的系统(调用者必须等待被调用者返回信息),异步系统(调用方发送消息后就不管,即fire-and-forget)能够给我们带来事件驱动架构(event-driven architecture EDA)。
  4. 作为事件驱动架构的主干
    解耦,异步架构的系统允许通过代理器自己配置更多的客户端,内存等(即vertical scalability)来扩大系统,而不是增加更多的代理器(即horizontal scalability)。考虑如亚马逊这样繁忙的电子商务系统。当用户购买物品,事实上系统需要很多步骤去处理,包括下单,创建发票,付款,执行订单,运输等。但是用户下单后,会立即返回“谢谢你下单”的界面。不只是没有延迟,而且用户还会受到一封邮件表明订单已经收到。在亚马逊下单的例子就是一个多步处理的例子。每一步都由单独的服务去处理。当用户下单是,有一个同步的体积表单动作,但整个处理流程并不通过浏览器同步处理。相反地,订单马上被接受和反馈。而剩下的步骤就通过异步处理。如果在处理过程中出错,用户会通过邮件收到通知。这样的异步处理能提供高负载和高可用性。
  5. 提高系统扩展性
    很多使用事件驱动设计的系统是为了获得高可扩展性,例如电子商务,政府,制造业,线上游戏等。通过异步消息分开商业处理步骤给各个应用,能够带来很多可能性。考虑设计一个应用来完成一项特殊的任务。这就是面向服务的架构(service-oriented architecture SOA)。每一个服务完成一个功能并且只有一个功能。应用就通过服务组合起来,服务间使用异步消息和最终一致性。这样的设计便可以引入一个复杂事件处理概念(complex event processing CEP)。使用CEP,部件间的交互可以被记录追踪。在异步消息系统中,可以很容易在部件间增加一层处理。

安装使用

下载安装包 http://activemq.apache.org/activemq-5133-release.html

系统 版本
windows apache-activemq-5.13.3-bin.zip
linux apache-activemq-5.13.3-bin.tar.gz

win下 解压得到如下图目录,根据操作系统进入对应的文件夹win32或者win64,双击activemq.bat运行.
技术分享
技术分享
技术分享
ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。点击QUEUES可以看到当前的消息队列.
技术分享

spring整合JMS

spring-jms介绍

  1. spring提供了一个jms集成框架,这个框架如spring 集成jdbc api一样,简化了jms api的使用。
  2. jms可以简单的分成两个功能区,消息的生产和消息的消费。JmsTemplate类用来生成消息和同步接受消息。和其它java ee的消息驱动样式一样,对异步消息,spring也提供了许多消息监听容器用来创建消息驱动的POJO(MDPs)。spring同时也提供了创建消息监听器的声明方式。
  3. org.springframework.jms.core 提供了使用JMS的核心功能,它包含JmsTemplate类,该类类似于jdbc中的jdbdTemplate,它通过对资源的创建和释放处理来简化jms开发。spring的模板类作为一种设计原则在spring框架中广泛使用,模板类对简单操作提供了帮助方法;对复杂操作,通过继承回调接口提供了重要处理过程的代理。jmsTemplate同样遵循这一设计原则,它提供了发送消息、同步消费消息、为用户提供JMS session和消息生产者的多种便利方法。
  4. org.springframework.jms.support提供了JmsException转译功能。它将checked的JmsException层次转换成uncheckedd异常的镜像层次。若抛出的异常不是javax.jms.JmsException的子类,这个异常将被封装成unchecked异常UncategorizedJmsException。
  5. org.springframework.jms.support.converter 提供了在java对象和jms消息之间转换的抽象MessageConverter。
  6. org.springframework.jms.support.destination提供了管理jms destination的多种策略,如对存放在jndi的destionation提供服务定位功能。
  7. org.springframework.jms.annotation通过使用@JmsListener提供了对注解驱动的监听端的支持。
  8. org.springframework.jms.config 支持jms命名空间的解析,同时也支持配置监听容器和生成监听端。
  9. org.springframework.jms.connection提供了适用于standonle应用的ConnectionFactory的实现。对jms的事务管理实现jmsTranscationmanager. 这允许jms 作为事务资源无缝的集成到spring事务管理机制中。

相关配置

maven依赖

<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>javax.jms-api</artifactId>
  <version>2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>${activemq.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>${activemq.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-spring</artifactId>
  <version>${activemq.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
</dependency>
<dependency>                      
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${spring.version}</version>
</dependency>

生产者端的spring配置 - activemqContext.xml(该文件之后用import resource=”activemqContext.xml” 导入到spring配置文件中)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="${activemq.url}" userName="admin" password="admin" />

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
    <!-- Spring JmsTemplate 的消息生产者 start -->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>
    <!--Spring JmsTemplate 的消息生产者 end -->
    <!--这个是队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>UnionApplyMsgQueue</value>
        </constructor-arg>
    </bean>
</beans>  

消费者端spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="${activemq.url}" userName="admin" password="admin" />
    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
    <!--这个是队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>UnionApplyMsgQueue</value>
        </constructor-arg>
    </bean>
    <!-- 消息消费者 start -->
    <!-- 消息监听器 -->
    <bean id="queueReceiver1" class="com.game.wego.chat.listener.ConsumerMessageListener" />
    <!-- 定义UnionApplyMsgQueue监听器 -->
    <jms:listener-container destination-type="queue"
        container-type="default" connection-factory="connectionFactory"
        acknowledge="auto">
        <jms:listener destination="UnionApplyMsgQueue" ref="queueReceiver1" />
    </jms:listener-container>
    <!-- 消息消费者 end -->
</beans>  

总结下,消费者端配置监听器,监听queue,有消息,就调用ConsumerMessageListener,这个是自己写的类,实现MessageListener接口的方法即可.生产者端配置队列目的地,JmsTemplate .注意这里我们用的是CachingConnectionFactory.ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer,节省了资源开销.
Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。
ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们在定义一个ConnectionFactory时应该是如下定义:

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
    <property name="brokerURL" value="tcp://localhost:61616"/>  
</bean>  

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
    <property name="connectionFactory" ref="targetConnectionFactory"/>  
    <property name="maxConnections" value="10"/>  
</bean>  

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  
</bean>  

代码相关

  1. ActiveMQ的helloworld
package com.activemq.producter;  

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.DeliveryMode;  
import javax.jms.Destination;  
import javax.jms.JMSException;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  

import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  

/** 
 * ActiveMQ发送类 
 * <功能详细描述> 
 *  
 * @author  Administrator 
 * @version  [版本号, 2014年7月27日] 
 * @see  [相关类/方法] 
 * @since  [产品/模块版本] 
 */  
public class MessageProducter  
{  

    /* 
     * @param args 
     * @see [类、类#方法、类#成员] 
     */  
    public static void main(String[] args) throws JMSException   
    {  
     // ConnectionFactory :连接工厂,JMS 用它创建连接  
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
                        ActiveMQConnection.DEFAULT_USER,  
                        ActiveMQConnection.DEFAULT_PASSWORD,  
                        "tcp://192.168.197.130:61616");  
        //JMS 客户端到JMS Provider 的连接  
        Connection connection = connectionFactory.createConnection();  
        connection.start();  
        // Session: 一个发送或接收消息的线程  
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
        // Destination :消息的目的地;消息发送给谁.  
        // 获取session注意参数值my-queue是Query的名字  
        Destination destination = session.createQueue("my-queue");  
        // MessageProducer:消息生产者  
        MessageProducer producer = session.createProducer(destination);  
        //设置不持久化  
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
        //发送一条消息  
        sendMsg(session, producer);  
        session.commit();  
        connection.close();   


    }  

    /** 
     * 在指定的会话上,通过指定的消息生产者发出一条消息 
     * 
     * @param session    消息会话 
     * @param producer 消息生产者 
     */  
    public static void sendMsg(Session session, MessageProducer producer) throws JMSException {  
            //创建一条文本消息  
            TextMessage message = session.createTextMessage("Hello ActiveMQ!");  
            //通过消息生产者发出消息  
            producer.send(message);  
            System.out.println("");  
    }   
}  
package com.activemq.reciever;  

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.JMSException;  
import javax.jms.MessageConsumer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  

import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  

public class JmsReceiver  
{  

    public static void main(String[] args)  
        throws JMSException  
    {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
        ConnectionFactory connectionFactory =  
            new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,  
                "tcp://192.168.197.130:61616");  
        //JMS 客户端到JMS Provider 的连接  
        Connection connection = connectionFactory.createConnection();  
        connection.start();  
        // Session: 一个发送或接收消息的线程  
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
        // Destination :消息的目的地;消息发送给谁.  
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
        Destination destination = session.createQueue("my-queue");  
        // 消费者,消息接收者  
        MessageConsumer consumer = session.createConsumer(destination);  
        while (true)  
        {  
            TextMessage message = (TextMessage)consumer.receive(1000);  
            if (null != message)  
                System.out.println("收到消息:" + message.getText());  
            else  
                break;  
        }  
        session.close();  
        connection.close();  

    }  

}  

使用了spring-jms之后,就可用用jmsTemplate来取代连接打开关闭维护等事宜,会方便很多.
2. ConsumerMessageListener监听器实现类,消费者用

public class ConsumerMessageListener implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class);

    @Autowired
    PlayerLogicService playerService;

    @Autowired
    private ResourceDPService resourceDP;

    @Override
    public void onMessage(Message message) {
        ObjectMessage objMsg = (ObjectMessage) message;
        try {
            RspApplyUnionMsg rspApplyMsg = (RspApplyUnionMsg) objMsg.getObject();
            logger.debug("消息内容是:" + JsonUtil.toJsonString(rspApplyMsg) + "申请加入公会");
            Player player = playerService.getOnlinePlayerInThisApp(rspApplyMsg.getPlayerId());
            if (null != player) {
                RspUnionMsg rspData = new RspUnionMsg();
                ResponseMessage resp = new ResponseMessage();
                rspData.setSpeaker(player.getName());
                rspData.setPhyle(player.getPhyle());
                rspData.setPlayerId(player.getId());
                rspData.setMsgType(2);
                resp.setId(com.game.wego.chat.message.Message.RSP_UNION_CHAT_PUSH);
                resp.setData(rspData);
                // 公会内广播
                ChannelGroup channels = GameContext.getUnionGroup(rspApplyMsg.getUnionId());
                if (null != channels) {
                    channels.writeAndFlush(resp);
                }
                resourceDP.pushMsg(rspData, rspApplyMsg.getUnionId());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

生产者实现类

@Service
public class ActivemqService {
    @SuppressWarnings("unused")
    private static final Logger logger = LoggerFactory.getLogger(ActivemqService.class);

    @Autowired  
    @Qualifier("queueDestination")
    private Destination destination; 

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void sendMessage(final Serializable message) {
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(message);
            }
        });
    }

}

这里可以用消息转换器MessageConverter来代替代码中的new MessageCreator操作,原理不变, 看着简洁点.参见http://haohaoxuexi.iteye.com/blog/1900937
3. 不同消息类型的收发

/**
   * 向默认队列发送text消息
   */
  public void sendMessage(final String msg) {
    String destination = jmsTemplate.getDefaultDestination().toString();
    System.out.println("ProducerService向队列" + destination + "发送了消息:\t" + msg);
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(msg);
      }
    });
  }

  /**
   * 向默认队列发送map消息
   */
  public void sendMapMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("name", "小西山");
        return message;
      }
    });
  }

  /**
   * 向默认队列发送Object消息
   */
  public void sendObjectMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        Staff staff = new Staff(1, "搬砖工"); // Staff必须实现序列化
        ObjectMessage message = session.createObjectMessage(staff);
        return message;
      }
    });
  }

  /**
   * 向默认队列发送Bytes消息
   */
  public void sendBytesMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        String str = "BytesMessage 字节消息";
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(str.getBytes());
        return message;
      }
    });
  }

  /**
   * 向默认队列发送Stream消息
   */
  public void sendStreamMessage() {
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        String str = "StreamMessage 流消息";
        StreamMessage message = session.createStreamMessage();
        message.writeString(str);
        message.writeInt(521);
        return message;
      }
    });
  }
/**
   * 接受消息
   */
  public void receive(Destination destination) throws JMSException {
    Message message = jmsTemplate.receive(destination);
    // 如果是文本消息
    if (message instanceof TextMessage) {
      TextMessage tm = (TextMessage) message;
      System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + tm.getText());
    }

    // 如果是Map消息
    if (message instanceof MapMessage) {
      MapMessage mm = (MapMessage) message;
      System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t"
          + mm.getString("name"));
    }

    // 如果是Object消息
    if (message instanceof ObjectMessage) {
      ObjectMessage om = (ObjectMessage) message;
      Staff staff = (Staff) om.getObject();
      System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + staff);
    }

    // 如果是bytes消息
    if (message instanceof BytesMessage) {
      byte[] b = new byte[1024];
      int len = -1;
      BytesMessage bm = (BytesMessage) message;
      while ((len = bm.readBytes(b)) != -1) {
        System.out.println(new String(b, 0, len));
      }
    }

    // 如果是Stream消息
    if (message instanceof StreamMessage) {
      StreamMessage sm = (StreamMessage) message;
      System.out.println(sm.readString());
      System.out.println(sm.readInt());
    }

  }

应用对象消息时遇到的一个bug,对象消息时,对象必须实现序列化接口,为了传输需要,序列化前后要对比id.确保无误.他们去使用的总是同一个对象,包括序列id都要相同,就导致两个系统的时候,要公共依赖同一个地方的同一个对象文件.这点要注意.

JMS实现-ActiveMQ,介绍,安装,使用,注意点,spring整合

标签:

原文地址:http://blog.csdn.net/lisaem/article/details/51858437

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!