标签:生产者和消费者 ges print pen 形式 item 增加 The div
<dependencies> <!-- activemq 所需要的jar 包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!-- activemq 和 spring 整合的基础包 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> </dependencies>
JMS开发的基本步骤:
创建一个connection factory。
通过connection factory 来创建JMS connection。
启动JMS connection。
通过connection创建JMS session。
创建JMS destination。
创建JMS producer 或者创建JMS message 并设置destination。
创建JMS consumer 或者是注册一个 JMS message listener。
发送或者接受JMS message(s)。
关闭所有的JMS资源(connection、session、producer 、consumer )。
Destination是目的地,分为两种:队列和主题。下图介绍:
public class JmsProduce { // linux 上部署的activemq 的 IP 地址 + activemq 的端口号 public static final String ACTIVEMQ_URL = "tcp://192.168.126.133:61616"; // 目的地的名称 public static final String QUEUE_NAME = "jdy01"; public static void main(String[] args) throws Exception{ // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。 // 该类的其他构造方法可以指定用户名和密码。 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2 通过连接工厂,获得连接 connection 并启动访问。 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类 Queue queue = session.createQueue(QUEUE_NAME); // 5 创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); // 6 通过messageProducer 生产 3 条 消息发送到消息队列中 for (int i = 1; i < 4 ; i++) { // 7 创建消息 TextMessage textMessage = session.createTextMessage("msg--" + i); // 8 通过messageProducer发送给mq messageProducer.send(textMessage); } // 9 关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("**** 消息发送到MQ完成 ****"); } }
Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers:消费者数量,消费者端的消费者数量。
Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。
总结:当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
// 消息的消费者 public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.126.133:61616"; public static final String QUEUE_NAME = "jdy01"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); javax.jms.Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); // 5 创建消息的消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while(true){ // reveive()一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。 // reveive(Long time):等待n毫秒之后还没有收到消息,就是结束阻塞。 // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage TextMessage message = (TextMessage)messageConsumer.receive(); if (null != message){ System.out.println("****消费者的消息:"+message.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); } }
// 消息的消费者 也就是回答消息的系统 public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; public static final String QUEUE_NAME = "jdbc01"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); javax.jms.Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。 通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息 */ messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { // instanceof 判断是否A对象是否是B类的子类 if (null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println("****消费者的消息:"+textMessage.getText()); }catch (JMSException e) { e.printStackTrace(); } } } }); // 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。 // 实际开发中,我们的程序会一直运行,这句代码都会省略。 System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
同步阻塞方式(receive):订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式:订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
在点对点的消息传递中,目的地被称为队列。点对点消息传递域的特点如下:
每个消息只能有一个消费者,类似1对1的关系,好比个人快递自己领取自己的。
消息的生产者和消费者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息,好比我们发送短息,发送者发送后不见得的接受者即收即看。
消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
情况2:先启动消费者1,再启动消费者2。
结果:消费者1消费所有的数据。消费者2不会消费到消息。
情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。
结果:消费者1和消费者2平摊了消息。各自消费3条消息。
在发布订阅消息传递域中,目的地被称为主题(topic)。发布/订阅消息传递域的特点如下:
生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
生产者生产时,topic不保存消息,它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。
public class JmsProduce_topic { public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616"; public static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer messageProducer = session.createProducer(topic); for (int i = 1; i < 4 ; i++) { TextMessage textMessage = session.createTextMessage("topic_name--" + i); messageProducer.send(textMessage); MapMessage mapMessage = session.createMapMessage(); } messageProducer.close(); session.close(); connection.close(); System.out.println(" **** TOPIC_NAME消息发送到MQ完成 ****"); } }
public class JmsConsummer_topic { public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616"; public static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4 创建目的地 (两种 : 队列/主题 这里用主题) Topic topic = session.createTopic(TOPIC_NAME); MessageConsumer messageConsumer = session.createConsumer(topic); // MessageListener接口只有一个方法,可以使用lambda表达式 messageConsumer.setMessageListener( (message) -> { if (null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println("****消费者text的消息:"+textMessage.getText()); }catch (JMSException e) { } } }); System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
存在多个消费者,每个消费者都能收到,自从自己启动后所有生产的消息。
topic有多个消费者时,消费消息的数量 ≈ 在线消费者数量*生产消息的数量。
下图展示了:我们先启动了3个消费者,再启动一个生产者,并生产了3条消息。
比较项目 | Topic模式队列 | Queue模式队列 |
---|---|---|
工作模式 | "订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都还会收到消息。 | "负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送其中一个消费者,并且要求消费者ack信息。 |
有无状态 | 无状态 | Queue数据默认会在mq服务商以文件形式保存,比如ActiveMQ 一般保存在$AMQ_HOME\data\kr-srore\data下面,也可以配置成DB存储, |
传递完整性 | 如果没有订阅者消息将会被丢弃。 | 消息不会丢失 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 |
标签:生产者和消费者 ges print pen 形式 item 增加 The div
原文地址:https://www.cnblogs.com/jdy1022/p/14238663.html