标签:
[TOC]
缘由:
最近在用netty开发游戏服务器,目前有这样的一个场景,聊天服务器和逻辑服务器要进行消息交互,比如,某个玩家往某个公会提交了加入申请,这个申请动作是在逻辑服务器上完成的,但是要产生一条申请消息,由聊天服务器推送到对应的公会频道,目前这个申请消息就是通过jms发送到聊天服务器上,聊天服务器监听到后,推送到对应的公会频道.
下面主要介绍以下几点
- JMS简介
- 消息传递模型
- ActiveMQ介绍
- 安装使用
- spring整合JMS
- 代码相关
J Java 消息服务(Java Message Service,简称JMS)是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。JMS 在其中扮演的角色与JDBC 很相似,正如JDBC 提供了一套用于访问各种不同关系数据库的公共API,JMS 也提供了独立于特定厂商的企业消息系统访问方式。
使用JMS 的应用程序被称为JMS 客户端,处理消息路由与传递的消息系统被称为JMS Provider,而JMS 应用则是由多个JMS 客户端和一个JMS Provider 构成的业务系统。发送消息的JMS 客户端被称为生产者(producer),而接收消息的JMS 客户端则被称为消费者(consumer)。同一JMS 客户端既可以是生产者也可以是消费者。
JMS 的编程过程很简单,概括为:应用程序A 发送一条消息到消息服务器(也就是JMS Provider)的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A 和应用程序B 没有直接的代码关连,所以两者实现了解偶。如图
1. 头(head)
每条JMS 消息都必须具有消息头。头字段包含用于路由和识别消息的值。可以通过多种方式来设置消息头的值:
a. 由JMS 提供者在生成或传送消息的过程中自动设置
b. 由生产者客户机通过在创建消息生产者时指定的设置进行设置
c. 由生产者客户机逐一对各条消息进行设置
属性(property)
消息可以包含称作属性的可选头字段。他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,其中可以包括如下信息:创建数据的进程、数据的创建时间以及每条数据的结构。JMS提供者也可以添加影响消息处理的属性,如是否应压缩消息或如何在消息生命周期结束时废弃消息。
主体(body)
包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。
StreamMessage 一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。
MapMessage 一种主体中包含一组键–值对的消息。没有定义条目顺序。
TextMessage 一种主体中包含Java字符串的消息(例如,XML消息)。
ObjectMessage 一种主体中包含序列化Java对象的消息。
BytesMessage 一种主体中包含连续字节流的消息。
JMS支持两种消息传递模型:点对点(point-to-point,简称PTP)和发布/订阅(publish/subscribe,简称pub/sub)。这两种消息传递模型非常相似,但有以下区别:
a. PTP消息传递模型规定了一条消息之恩能够传递费一个接收方。
b. Pub/sub消息传递模型允许一条消息传递给多个接收方
每个模型都通过扩展公用基类来实现。例如:javax.jms.Queue和Javax.jms.Topic都扩展自javax.jms.Destination类。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
下载安装包 http://activemq.apache.org/activemq-5133-release.html
系统 | 版本 |
---|---|
windows | apache-activemq-5.13.3-bin.zip |
linux | apache-activemq-5.13.3-bin.tar.gz |
win下 解压得到如下图目录,根据操作系统进入对应的文件夹win32或者win64,双击activemq.bat运行.
ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。点击QUEUES可以看到当前的消息队列.
maven依赖
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
生产者端的spring配置 - activemqContext.xml(该文件之后用import resource=”activemqContext.xml” 导入到spring配置文件中)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="${activemq.url}" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Spring JmsTemplate 的消息生产者 start -->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>
<!--Spring JmsTemplate 的消息生产者 end -->
<!--这个是队列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>UnionApplyMsgQueue</value>
</constructor-arg>
</bean>
</beans>
消费者端spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd">
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="${activemq.url}" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!--这个是队列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>UnionApplyMsgQueue</value>
</constructor-arg>
</bean>
<!-- 消息消费者 start -->
<!-- 消息监听器 -->
<bean id="queueReceiver1" class="com.game.wego.chat.listener.ConsumerMessageListener" />
<!-- 定义UnionApplyMsgQueue监听器 -->
<jms:listener-container destination-type="queue"
container-type="default" connection-factory="connectionFactory"
acknowledge="auto">
<jms:listener destination="UnionApplyMsgQueue" ref="queueReceiver1" />
</jms:listener-container>
<!-- 消息消费者 end -->
</beans>
总结下,消费者端配置监听器,监听queue,有消息,就调用ConsumerMessageListener,这个是自己写的类,实现MessageListener接口的方法即可.生产者端配置队列目的地,JmsTemplate .注意这里我们用的是CachingConnectionFactory.ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer,节省了资源开销.
Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。
ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们在定义一个ConnectionFactory时应该是如下定义:
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
package com.activemq.producter;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ发送类
* <功能详细描述>
*
* @author Administrator
* @version [版本号, 2014年7月27日]
* @see [相关类/方法]
* @since [产品/模块版本]
*/
public class MessageProducter
{
/*
* @param args
* @see [类、类#方法、类#成员]
*/
public static void main(String[] args) throws JMSException
{
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://192.168.197.130:61616");
//JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Destination destination = session.createQueue("my-queue");
// MessageProducer:消息生产者
MessageProducer producer = session.createProducer(destination);
//设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//发送一条消息
sendMsg(session, producer);
session.commit();
connection.close();
}
/**
* 在指定的会话上,通过指定的消息生产者发出一条消息
*
* @param session 消息会话
* @param producer 消息生产者
*/
public static void sendMsg(Session session, MessageProducer producer) throws JMSException {
//创建一条文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
//通过消息生产者发出消息
producer.send(message);
System.out.println("");
}
}
package com.activemq.reciever;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsReceiver
{
public static void main(String[] args)
throws JMSException
{
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://192.168.197.130:61616");
//JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
Destination destination = session.createQueue("my-queue");
// 消费者,消息接收者
MessageConsumer consumer = session.createConsumer(destination);
while (true)
{
TextMessage message = (TextMessage)consumer.receive(1000);
if (null != message)
System.out.println("收到消息:" + message.getText());
else
break;
}
session.close();
connection.close();
}
}
使用了spring-jms之后,就可用用jmsTemplate来取代连接打开关闭维护等事宜,会方便很多.
2. ConsumerMessageListener监听器实现类,消费者用
public class ConsumerMessageListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class);
@Autowired
PlayerLogicService playerService;
@Autowired
private ResourceDPService resourceDP;
@Override
public void onMessage(Message message) {
ObjectMessage objMsg = (ObjectMessage) message;
try {
RspApplyUnionMsg rspApplyMsg = (RspApplyUnionMsg) objMsg.getObject();
logger.debug("消息内容是:" + JsonUtil.toJsonString(rspApplyMsg) + "申请加入公会");
Player player = playerService.getOnlinePlayerInThisApp(rspApplyMsg.getPlayerId());
if (null != player) {
RspUnionMsg rspData = new RspUnionMsg();
ResponseMessage resp = new ResponseMessage();
rspData.setSpeaker(player.getName());
rspData.setPhyle(player.getPhyle());
rspData.setPlayerId(player.getId());
rspData.setMsgType(2);
resp.setId(com.game.wego.chat.message.Message.RSP_UNION_CHAT_PUSH);
resp.setData(rspData);
// 公会内广播
ChannelGroup channels = GameContext.getUnionGroup(rspApplyMsg.getUnionId());
if (null != channels) {
channels.writeAndFlush(resp);
}
resourceDP.pushMsg(rspData, rspApplyMsg.getUnionId());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
生产者实现类
@Service
public class ActivemqService {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(ActivemqService.class);
@Autowired
@Qualifier("queueDestination")
private Destination destination;
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
public void sendMessage(final Serializable message) {
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(message);
}
});
}
}
这里可以用消息转换器MessageConverter来代替代码中的new MessageCreator操作,原理不变, 看着简洁点.参见http://haohaoxuexi.iteye.com/blog/1900937
3. 不同消息类型的收发
/**
* 向默认队列发送text消息
*/
public void sendMessage(final String msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println("ProducerService向队列" + destination + "发送了消息:\t" + msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
/**
* 向默认队列发送map消息
*/
public void sendMapMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("name", "小西山");
return message;
}
});
}
/**
* 向默认队列发送Object消息
*/
public void sendObjectMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Staff staff = new Staff(1, "搬砖工"); // Staff必须实现序列化
ObjectMessage message = session.createObjectMessage(staff);
return message;
}
});
}
/**
* 向默认队列发送Bytes消息
*/
public void sendBytesMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
String str = "BytesMessage 字节消息";
BytesMessage message = session.createBytesMessage();
message.writeBytes(str.getBytes());
return message;
}
});
}
/**
* 向默认队列发送Stream消息
*/
public void sendStreamMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
String str = "StreamMessage 流消息";
StreamMessage message = session.createStreamMessage();
message.writeString(str);
message.writeInt(521);
return message;
}
});
}
/**
* 接受消息
*/
public void receive(Destination destination) throws JMSException {
Message message = jmsTemplate.receive(destination);
// 如果是文本消息
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + tm.getText());
}
// 如果是Map消息
if (message instanceof MapMessage) {
MapMessage mm = (MapMessage) message;
System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t"
+ mm.getString("name"));
}
// 如果是Object消息
if (message instanceof ObjectMessage) {
ObjectMessage om = (ObjectMessage) message;
Staff staff = (Staff) om.getObject();
System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:\t" + staff);
}
// 如果是bytes消息
if (message instanceof BytesMessage) {
byte[] b = new byte[1024];
int len = -1;
BytesMessage bm = (BytesMessage) message;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}
// 如果是Stream消息
if (message instanceof StreamMessage) {
StreamMessage sm = (StreamMessage) message;
System.out.println(sm.readString());
System.out.println(sm.readInt());
}
}
应用对象消息时遇到的一个bug,对象消息时,对象必须实现序列化接口,为了传输需要,序列化前后要对比id.确保无误.他们去使用的总是同一个对象,包括序列id都要相同,就导致两个系统的时候,要公共依赖同一个地方的同一个对象文件.这点要注意.
JMS实现-ActiveMQ,介绍,安装,使用,注意点,spring整合
标签:
原文地址:http://blog.csdn.net/lisaem/article/details/51858437