标签:
[一曲广陵不如晨钟暮鼓]
本文,我们来介绍RabbitMQ中的工作队列。在正式开始之前,我们假设RabbitMQ服务已经启动,运行端口为5672,如果各位看官有更改过默认配置,那么就需要修改为对应端口,保持一致即可。
准备工作:
操作系统:window 7 x64
其他软件:eclipse mars,jdk7,maven 3
--------------------------------------------------------------------------------------------------------------------------------------------------------其主要思想是:工作队列避免立即执行资源密集型的任务,并且等待期执行结束。取而代之的,使用调度策略。我们将任务封装为消息的形式,并将其发送到一个队列中。一个worker(接受者)进程,运行在后台,将不断的取出任务并最终执行任务。当你运行多个worker时,他们将会共享这个队列中的任务。
这种概念设计在web应用中十分的有用。使得在很短的HTTP请求时间内来处理复杂的任务成为可能。
在上一篇中,我们发送了一个非常简单的“Hello World”消息。现在我们来发送一个字符串来代表一个复杂任务,但并不是真的拥有一个复杂的任务,而是使用Thread.sleeo()方法来假装一个复杂任务的执行。与此同时,我们使用“.”的数量来描述任务的复杂程度。每一个点都代表需要一秒钟时间来执行。举个例子:“Hello...”代表这个任务需要3秒钟执行。
现在,我们来开看示例工程,结构图如下:
1.修改pom文件,具体内容见前文,在此不再赘述。
2.我们需要稍微的修改rabbitmq01工程中的Sender代码,作为本例工程的NewTask,具体内容如下:
package com.csdn.ingo.rabbitmq_1; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class NewTask { // 队列名称 private final static String QUEUE_NAME = "workqueue"; public static void main(String[] args) throws IOException { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发送10条消息,依次在消息后面附加1-10个点 for (int i = 0; i < 10; i++) { String dots = ""; for (int j = 0; j <= i; j++) { dots += "."; } String message = "helloworld" + dots + dots.length(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 关闭频道和资源 channel.close(); connection.close(); } }3.再修改rabbitmq01工程中的Receiver代码,作为本例工程的Worker,具体内容如下:
package com.csdn.ingo.rabbitmq_1; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Work { // 队列名称 private final static String QUEUE_NAME = "workqueue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 区分不同工作进程的输出 int hashCode = Work.class.hashCode(); // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定消费队列 channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(hashCode + " [x] Received '" + message + "'"); doWork(message); System.out.println(hashCode + " [x] Done"); } } /** * 每个点耗时1s */ private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
使用一个Task Queue的好处是可以轻松的并行安排任务执行。如果我们积压了很多任务等待执行,那么我们可以简单了增加更多的Worker即可。非常容易的就增加了系统的伸缩性。
首先,我们运行两个Worker示例,如下:
在运行NewTask,],如下:
观察两个Work的输出,如下:
结论:我们发现Work Queue中的消息被两个消费者共享的交替的消费,并没有因为一个消费者堵塞整个队列。
有兴趣的看官还可以创建更多的Work示例来观察这种消息分配策略。
执行一个任务可能需要几秒钟的时间。你可能担心:如果一个消费者刚刚开始一个需要长时间的任务,其就发生宕机而中断原有进程。在我们上面的代码中,一旦消息被投递给消费者,就会立即从队列中删除。在这种情况下,如果你强制结束一个刚刚接收到消息的客户端,那么就会丢失它正在处理的信息。并且也会丢失已经传输给它并且还未执行的信息。
但是,我们并不想因此而丢失任何任务。即:如果一个WorkerA宕机下线,那么我们希望由另一个WorkerB来接替这个任务。
为了确保消息不会丢失,RabbitMQ支持消息的确认应答机制。这种机制确保cunsumer(消费者)在接收到某个指定的消息时,向RabbitMQ返回ACK(确认),以此来通知MQ队列,消息已经确实收到,进程和MQ队列可以放心的删除这条消息了。
如果某个comsumerA(消费者A)因为某个原因宕机(channel关闭,连接关闭,TCP连接丢失等)没有回复ACK,RabbitMQ将会知道这条消息没有被正确的执行,并且会重新将消息加入到队列中。与此同时,如果存在另一个生存的客户端B,那么MQ将会将该消息重新投递给B客户端。通过这种方式,即使某个客户端忽然宕机,其也可以确保消息不会发生丢失。
这种方式对于消息就不存在任何超时的概念。当某个consumer宕机,RabbitMQ将会重新投递消息。如果某个进程处理该消息花费的时间相当的长,这也是允许的,而不是将其当作超时。
在默认情况下,消息的确认机制是打开的。先前的例子中,我们通过设置autoAck = true将其关闭。现在,让我们来移除这个标志,即开启消息确认机制,并发送一条消息。
具体核心步骤如下:
boolean ack = false ; //打开应答机制 channel.basicConsume(QUEUE_NAME, ack, consumer); //另外需要在每次处理完成一个消息后,手动发送一次应答。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);完整代码:
package com.csdn.ingo.rabbitmq_1.one; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Work { // 队列名称 private final static String QUEUE_NAME = "workqueue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { // 区分不同工作进程的输出 int hashCode = Work.class.hashCode(); // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C"); // 设置最大服务转发消息数量 int prefetchCount = 1; channel.basicQos(prefetchCount); QueueingConsumer consumer = new QueueingConsumer(channel); // 指定消费队列 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(hashCode + " [x] Received '" + message + "'"); doWork(message); System.out.println(hashCode + " [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }测试方法:(for循环改为5次)
先启动两个消费者(Work),然后在启动生产者(NewTask),在自动生产者之后,立即关闭一个Work实例,保留一个实例,观察输出:
启动生产者(NewTask)
立即关闭其中一个Work,观察输出:
注意上面的输出:我们关闭掉3085858这个Work之后,其上的任务3由于没有得到确认,于是转由22434271这个Work执行。
上面我们已经介绍了:当consumer宕机之后,确保不会丢失消息。但是,如果是MQ服务器宕机了呢?接下来,我们来学习在这种场景下如果确保消息不会丢失。
当RabbitMQ服务器关机,宕机之后,其都会彻底的丢失关于队列和消息所有内容。除非,我们将其设置为持久化的模式。为了达到这个目的,需要做两件事情。
第一:确保RabbitMQ不会丢失队列。因此,我们需要将队列声明为持久化的模式,具体做法如下:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);尽管这个命令是正确的,但其不会在上面示例代码的上下文环境中正确运行。因为,我们已经定义了一个名称为“hello”的队列,并且这个队列是非持久化的。在RabbitMQ中,不允许使用不同的参数,重新定义一个已经存在的队列。如果这么做的话,就会返回一个错误。所以,我们需要重新定义一个不同名称的队列,举个例子,如下:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);上面的这个示例代码,需要同时应用在生产者与消费者的代码当中,才能够起到相应作用。
第二:确保消息是持久化的。使用MessageProperties.PERSISENT_TEXT_PLAIN具体做法如下:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());特别提醒:
如果各位看官一定需要实现强一致性的持久化策略,那么可以参考下面这边文章的做法(https://www.rabbitmq.com/confirms.html),这部分内容,随后就会更新,敬请期待!
你可能已经发现,上文我们介绍的Round-robin策略不能够按照我们实际期望的那样工作。如:在一个应用中存在两个消费者(Work),当偶数的消息所代表的任务很繁琐,需要很长时间执行。同时,奇数的消息所代表的任务很轻量,需要较短时间执行。于是,就会发生下面的场景:其中一个Work非常的忙,而另一个Work相对的产生空闲期。对于RabbitMQ,其并不知道关于消费者客户端所发生的事情,并且,仍然按照这种策略来分发后续的任务。
这个问题是由于RabbitMQ仅仅是分发消息,而没有查看从消费者返回的未确认的消息的数量。仅仅是将第n个消息发送到第n个队列,如此简单。
为了实现任务的公平转发,我们需要使用basicQos方法,并且设置参数:prefetchCount=1。该配置将会告诉RabbitMQ不要对一个consumer在同一个时刻对其分发超过1条消息。或者,换句话说,不要给还没有做完上一个任务的客户端分发新的任务。取而代之的,将任务发送给下一个空闲的客户端。
具体做法如下;
int prefetchCount = 1; channel.basicQos(prefetchCount);特别提醒:
综合上面所有的内容,完整的代码如下:
package com.csdn.ingo.rabbitmq_1.one; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.rabbitmq.client.QueueingConsumer; public class NewTask { // 队列名称 private final static String QUEUE_NAME = "workqueue"; public static void main(String[] args) throws IOException { // 创建连接和频道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); // 发送10条消息,依次在消息后面附加1-10个点 for (int i = 0; i < 5; i++) { String dots = ""; for (int j = 0; j <= i; j++) { dots += "."; } String message = "helloworld" + dots + dots.length(); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 关闭频道和资源 channel.close(); connection.close(); } }Work的代码见上文,在此不再赘述。
------------------------------------------------------------------------------------------------------------------------------------
至此,系统拆分解耦利器之消息队列---RabbitMQ-工作队列 结束
备注:
各位看官如果有兴趣了解更多关于Channel和MessageProperties的内容,请参考官方文档:http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/
参考资料
官方文档:http://www.rabbitmq.com/tutorials/tutorial-two-java.html
标签:
原文地址:http://blog.csdn.net/abcd898989/article/details/52227828