码迷,mamicode.com
首页 > 其他好文 > 详细

JMS编程模型

时间:2019-06-25 00:07:15      阅读:115      评论:0      收藏:0      [点我收藏+]

标签:cti   live   string   订阅模式   资源   而不是   开启   als   发送消息   

模型结构

JMS编程模型由以下几个组成:

  • ConnectionFactory:连接工厂(创建连接)
  • Connection:连接(创建会话)
  • Session:会话(创建目的地、生产者、消费者、消息)
  • Destination:目的地(消息发送目标)
  • MessageProducer:消息生产者(发送消息)
  • MessageConsumer:消息消费者(消费消息)
  • Message:消息(内容主体)

下面用一张图片展示几个组成部分是如何联系在一起的

技术图片

下面将逐个了解每个部分,并且以activeMQ的实现作为代码片段部分示例。

ConnectionFactory

顾名思义,一个ConnectionFactory是客户端用来创建Connection的接口。基于工厂模式,它简化了Connection的创建。除了ConnectionFactory接口,常见的还有QueueConnectionFactory和TopicConnectionFactory,它们都继承自ConnectionFactory。

创建一个ConnectionFactory的代码片段如下:

1 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

Connection

有了ConnectionFactory我们就可以创建Connection了,Connection表示的是一个虚拟的连接,也就是代表着打开了一个由客户端到消息代理端的socket连接。Connection可以用来创建Session。

下面我们看看ConnectionFactory来创建Connection的示例:

1 Connection connection = connectionFactory.createConnection();

在使用Connection之前,你必须先调用start方法开启连接

1 connection.start();

在使用完了之后,你必须调用close方法关闭资源。注意,close方法会关闭Connection创建的Session、MessageProducer、MessageConsumer。另外,如果close方法调用失败,那么将会导致资源未被释放的问题。

但是,如果你只是想暂时停止一下消息的传送,那么可以调用stop方法,而不是将Connection进行close。

Session

session是一个Message的生产和消费的上下文,我们称作会话,由Connection创建。session可以创建MessageProducer、MessageConsumer、Message、Destination。

我们创建一个session

1 Session session = connection.createSession(false, Session.AUTO_ACKNOWEDGE);

第一个入参传入了false,表示不需要事务。第二个入参表示消息被接收以后session会自动做ack确认操作。如果要创建一个有事务的session呢?

1 Session session = connection.createSession(true, 0);

第一个参数true表示开启事务,第二个参数表示不指定ack确认机制。在业务代码完成以后,需要显示提交事务

1 session.commit();

Destination

一个destination表示的是生产者的消息发送目的地,以及消费者消费消息的源头。在点对点模式中,destination又被称作queue(队列)。在发布订阅模式中,destination被称作topic(话题)。

Destination由session创建,创建一个queue

1 Destination destination = session.createQueue("queue1");

创建一个topic

1 Destination destination = session.createTopic("topic1");

MessageProducer

MessageProducer是由session创建的,用于发送Message到destination。我们使用session创建一个MessageProducer,如下

1 MessageProducer producer = session.createProducer(destination);

如果你创建了一个Message对象,你可以使用MessageProducer发送消息

1 producer.send(message);

MessageConsumer

MessageConsumer是由session创建的,将会作为一个消费者消费destination中的Message。创建一个MessageConsumer

1 MessageConsumer consumer = session.createConsumer(destination);

创建了消费者,就可以消费消息了

1 Message message = consumer.receive();

receive方法是同步消费消息的方法,有时候我们不想等待那么久,所以采用异步监听的方式,如

1 Listener listener = new Listener();
2 consumer.setMessageListener(listener);

这里假设Listener是实现了MessageListener接口的监听器,当消息到达的时候onMessage方法将被触发。

Message

Message表示具体的消息,JMS定义了五种消息格式,如:

  1. TextMessage:文本
  2. MapMessage:键值对
  3. BytesMessage:字节码
  4. StreamMessage:流
  5. ObjectMessage:对象

以TextMessage为例,创建一个消息

1 TextMessage message = session.createTextMessage();
2 message.setText("text content");
3 producer.send(message);

如果是MessageConsumerreceive消息

1 Message message = consumer.receive();
2 if (message instanceof TextMessage) {
3     TextMessage textMessage = (TextMessage)message;
4     System.out.println("receive message" + message.getText());
5 }

完整代码

生产

 1 // Create a ConnectionFactory
 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
 3 
 4 // Create a Connection
 5 Connection connection = connectionFactory.createConnection();
 6 connection.start();
 7 
 8 // Create a Session
 9 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
10 
11 // Create the destination (Topic or Queue)
12 Destination destination = session.createQueue("TEST.FOO");
13 
14 // Create a MessageProducer from the Session to the Topic or Queue
15 MessageProducer producer = session.createProducer(destination);
16 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
17 
18 // Create a messages
19 String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
20 TextMessage message = session.createTextMessage(text);
21 
22 // Tell the producer to send the message
23 System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
24 producer.send(message);
25 
26 // Clean up
27 session.close();
28 connection.close();

消费

 1 // Create a ConnectionFactory
 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
 3 
 4 // Create a Connection
 5 Connection connection = connectionFactory.createConnection();
 6 connection.start();
 7 
 8 connection.setExceptionListener(this);
 9 
10 // Create a Session
11 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
12 
13 // Create the destination (Topic or Queue)
14 Destination destination = session.createQueue("TEST.FOO");
15 
16 // Create a MessageConsumer from the Session to the Topic or Queue
17 MessageConsumer consumer = session.createConsumer(destination);
18 
19 // Wait for a message
20 Message message = consumer.receive(1000);
21 
22 if (message instanceof TextMessage) {
23     TextMessage textMessage = (TextMessage) message;
24     String text = textMessage.getText();
25     System.out.println("Received: " + text);
26 } else {
27     System.out.println("Received: " + message);
28 }
29 
30 consumer.close();
31 session.close();
32 connection.close();

原文

https://docs.oracle.com/javaee/1.4/tutorial/doc/JMS4.html#wp78884

JavaDoc

https://docs.oracle.com/javaee/7/api/javax/jms/package-frame.html

 

JMS编程模型

标签:cti   live   string   订阅模式   资源   而不是   开启   als   发送消息   

原文地址:https://www.cnblogs.com/lay2017/p/11080107.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!