一、基本概念
1.1 JMS是什么
JMS Java Message Service,Java消息服务,是JavaEE中的一个技术。
1.2 JMS规范
JMS定义了Java中访问消息中间件的接口,并没有给予实现,实现JMS接口的消息中间件称为JMS Provider,例如ActiveMQ
1.3 JMS provider
实现JMS接口和规范的消息中间件
1.4 JMS message
JMS的消息,JMS消息由一下三部组成:
1:消息头:每个消息头字段都有相应的getter和setter方法
2:消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性
3:消息体:封装具体的消息数据
1.5 JMS producer
消息生产者,创建和发送JMS消息的客户端应用
1.6 JMS consumer
消息消费者,接收和处理JMS消息的客户端应用
消息的消费可以采用以下两种方法之一:
1:同步消费:通过调用消费者的receive方法从目的地中显式提取消息,receive 方法可以一直阻塞到消息到达。
2:异步消费:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作
1.7 JMS domains
消息传递域,JMS规范中定义了两种消息传递域:
1:点对点 (point-to-point,简写成PTP)消息传递域:
2:发布/订阅消息传递域(publish/subscribe,简写:成pub/sub)
点对点消息传递域的特点如下:
(1)每个消息只能有一个消费者
(2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。
发布/订阅消息传递域的特点如下:
(1)每个消息可以有多个消费者
(2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
注意:在点对点消息传递域中,目的地被称为队列(queue);在发布/订阅消息传递域中,目的地被称为主题(topic)
1.8 接口简介
1.8.1 Connection factory
连接工厂,用来创建连接对象,以连接到JMS的provider
1.8.2 JMS Connection
封装了客户与JMS 提供者之间的一个虚拟的连接
1.8.3 JMS session
是生产和消费消息的一个单线程上下文会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
1.8.4 Destination
消息发送到的目的地
1.8.5 Acknowledge
签收
1.8.6 Transaction
事务
1.8.7 JMS client
用来收发消息的Java应用
1.8.8 Non-JMS client
使用JMS provider本地API写的应用,用来替换JMS API实现收发消息的功能,通常会提供其他的一些特性,比如:CORBA、RMI等。
1.8.9 Administered objects
预定义的JMS对象,通常在provider规范中有定义,提供给JMS客户端来访问,比如: ConnectionFactory和Destination
二、JMS的消息结构
JMS消息的组成:消息头,属性和消息体
2.1 消息头
消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如下:
1:JMSDestination:消息发送的目的地,由send方法设置
消息发送的目的地,主要是指Queue和Topic,自动分配
2:JMSDeliveryMode:传送模式,由send方法设置
传送模式,有两种 :持久模式和非持久模式。
一条持久性的消息应该被传送“一次仅仅一次”,这就意味者如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
一条非持久的消息最多会传送一次,这意味这服务器出现故障,该消息将永远丢失。自动分配
3:JMSExpiration:消息过期时间,由send方法设置
消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。
如果timeToLive值不等于零,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
4:JMSPriority:消息优先级,由send方法设置
从 0-9 十个级别,0-4 是普通消息,5-9 是加急消息。
JMS不要求JMSProvider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是4级。
5:JMSMessageID:由send方法设置
唯一识别每个消息的标识,由JMS Provider 产生。
6:JMSTimestamp:由客户端设置
一个JMS Provider在调用send()方法时自动设置的。它是消息被发送和消费者实际接收的时间差。
7:JMSCorrelationID :由客户端设置
用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID可以是任何值,不仅仅是JMSMessageID。由开发者设置
8:JMSReplyTo :由客户端设置
提供本消息回复消息的目的地址。由开发者设置
9:JMSType :由客户端设置
消息类型的识别符。由开发者设置
10:JMSRedelivered:由JMS Provider设置
如果一个客户端收到一个设置了JMSRedelivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。如果该消息被重新传送,JMSRedelivered=true反之,JMSRedelivered =false。
2.2 消息属性
消息属性,包含以下三种类型的属性:
1:应用程序设置和添加的属性,比如:
Message.setStringProperty(“username”,username);
2:JMS定义的属性
使用“JMSX”作为属性名的前缀,connection.getMetaData().getJMSXPropertyNames(),方法返回所有连接支持的JMSX 属性的名字。
3:JMS供应商特定的属性
JMS定义的属性如下:
1:JMSXUserID:发送消息的用户标识,发送时提供商设置
2:JMSXAppID:发送消息的应用标识,发送时提供商设置
3:JMSXDeliveryCount:转发消息重试次数,第一次是1,第二次是2,… ,发送时提供商设置
4:JMSXGroupID:消息所在消息组的标识,由客户端设置
5:JMSXGroupSeq:组内消息的序号第一个消息是1,第二个是2,…,由客户端设置
6:JMSXProducerTXID :产生消息的事务的事务标识,发送时提供商设置
7:JMSXConsumerTXID :消费消息的事务的事务标识,接收时提供商设置
8:JMSXRcvTimestamp :JMS 转发消息到消费者的时间,接收时提供商设置
9:JMSXState:假定存在一个消息仓库,它存储了每个消息的单独拷贝,且这些消息从原始消息被发送时开始。每个拷贝的状态有:1(等待),2(准备),3(到期)或4(保留)。
由于状态与生产者和消费者无关,所以它不是由它们来提供。它只和在仓库中查找消息相关,因此JMS没有提供这种API。由提供商设置
2.3 消息体
消息体,JMS API定义了5种消息体格式,也叫消息类型,可以使用不同形式发送接收数据,并可以兼容现有的消息格式。
包括:TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage
三、示例
3.1 发送
public void test2() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { MapMessage message = session.createMapMessage(); message.setIntProperty("orderId" + i, 1); message.setString("nro" + i, "X2015478464"); producer.send(message); } session.commit(); session.close(); connection.close(); }
3.2 接收
public void test2() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616"); Connection connection = cf.createConnection(); connection.start(); Enumeration names = connection.getMetaData().getJMSXPropertyNames(); while (names.hasMoreElements()) { String name = (String) names.nextElement(); System.out.println("JMSX name = " + name); } final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i=0; while(i<3) { MapMessage message = (MapMessage) consumer.receive(); session.commit(); System.out.println("收到消息:" + message.getString("nro" + i) + ", poperty : " + message.getStringProperty("orderId" + i)); i++; } session.close(); connection.close(); }
3.3 效果
本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1914850
原文地址:http://1754966750.blog.51cto.com/7455444/1914850