码迷,mamicode.com
首页 > 其他好文 > 详细

系统拆分解耦利器之消息队列---RabbitMQ-工作队列

时间:2016-08-25 21:29:15      阅读:187      评论:0      收藏:0      [点我收藏+]

标签:

[一曲广陵不如晨钟暮鼓]

本文,我们来介绍RabbitMQ中的工作队列。在正式开始之前,我们假设RabbitMQ服务已经启动,运行端口为5672,如果各位看官有更改过默认配置,那么就需要修改为对应端口,保持一致即可。

准备工作:

操作系统:window 7 x64 

其他软件:eclipse mars,jdk7,maven 3

--------------------------------------------------------------------------------------------------------------------------------------------------------

工作队列:

在前一篇博文中,我们给出了一个包含:发送,接收,队列,三者构成的消息模型。在本文中,我们将创建一个Work Queue(工作队列),其将会用来在多个接受者之间分发资源密集型的任务。

其主要思想是:工作队列避免立即执行资源密集型的任务,并且等待期执行结束。取而代之的,使用调度策略。我们将任务封装为消息的形式,并将其发送到一个队列中。一个worker(接受者)进程,运行在后台,将不断的取出任务并最终执行任务。当你运行多个worker时,他们将会共享这个队列中的任务。

这种概念设计在web应用中十分的有用。使得在很短的HTTP请求时间内来处理复杂的任务成为可能。

准备:

在上一篇中,我们发送了一个非常简单的“Hello World”消息。现在我们来发送一个字符串来代表一个复杂任务,但并不是真的拥有一个复杂的任务,而是使用Thread.sleeo()方法来假装一个复杂任务的执行。与此同时,我们使用“.”的数量来描述任务的复杂程度。每一个点都代表需要一秒钟时间来执行。举个例子:“Hello...”代表这个任务需要3秒钟执行。

现在,我们来开看示例工程,结构图如下:
技术分享

1.修改pom文件,具体内容见前文,在此不再赘述。

2.我们需要稍微的修改rabbitmq01工程中的Sender代码,作为本例工程的NewTask,具体内容如下:

package com.csdn.ingo.rabbitmq_1;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewTask {
	// 队列名称
	private final static String QUEUE_NAME = "workqueue";

	public static void main(String[] args) throws IOException {
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 发送10条消息,依次在消息后面附加1-10个点
		for (int i = 0; i < 10; i++) {
			String dots = "";
			for (int j = 0; j <= i; j++) {
				dots += ".";
			}
			String message = "helloworld" + dots + dots.length();
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
		// 关闭频道和资源
		channel.close();
		connection.close();
	}
}
3.再修改rabbitmq01工程中的Receiver代码,作为本例工程的Worker,具体内容如下:
package com.csdn.ingo.rabbitmq_1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work {
	// 队列名称
	private final static String QUEUE_NAME = "workqueue";

	public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
		// 区分不同工作进程的输出
		int hashCode = Work.class.hashCode();
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定消费队列
		channel.basicConsume(QUEUE_NAME, true, consumer);
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());

			System.out.println(hashCode + " [x] Received '" + message + "'");
			doWork(message);
			System.out.println(hashCode + " [x] Done");
		}
	}

	/**
	 * 每个点耗时1s
	 */
	private static void doWork(String task) throws InterruptedException {
		for (char ch : task.toCharArray()) {
			if (ch == '.')
				Thread.sleep(1000);
		}
	}
}

Round-robin 转发

使用一个Task Queue的好处是可以轻松的并行安排任务执行。如果我们积压了很多任务等待执行,那么我们可以简单了增加更多的Worker即可。非常容易的就增加了系统的伸缩性。

首先,我们运行两个Worker示例,如下:

技术分享

技术分享

在运行NewTask,],如下:

技术分享

观察两个Work的输出,如下:

技术分享

技术分享

结论:我们发现Work Queue中的消息被两个消费者共享的交替的消费,并没有因为一个消费者堵塞整个队列。

有兴趣的看官还可以创建更多的Work示例来观察这种消息分配策略。

消息应答

执行一个任务可能需要几秒钟的时间。你可能担心:如果一个消费者刚刚开始一个需要长时间的任务,其就发生宕机而中断原有进程。在我们上面的代码中,一旦消息被投递给消费者,就会立即从队列中删除。在这种情况下,如果你强制结束一个刚刚接收到消息的客户端,那么就会丢失它正在处理的信息。并且也会丢失已经传输给它并且还未执行的信息。

但是,我们并不想因此而丢失任何任务。即:如果一个WorkerA宕机下线,那么我们希望由另一个WorkerB来接替这个任务。

为了确保消息不会丢失,RabbitMQ支持消息的确认应答机制。这种机制确保cunsumer(消费者)在接收到某个指定的消息时,向RabbitMQ返回ACK(确认),以此来通知MQ队列,消息已经确实收到,进程和MQ队列可以放心的删除这条消息了。

如果某个comsumerA(消费者A)因为某个原因宕机(channel关闭,连接关闭,TCP连接丢失等)没有回复ACK,RabbitMQ将会知道这条消息没有被正确的执行,并且会重新将消息加入到队列中。与此同时,如果存在另一个生存的客户端B,那么MQ将会将该消息重新投递给B客户端。通过这种方式,即使某个客户端忽然宕机,其也可以确保消息不会发生丢失。

这种方式对于消息就不存在任何超时的概念。当某个consumer宕机,RabbitMQ将会重新投递消息。如果某个进程处理该消息花费的时间相当的长,这也是允许的,而不是将其当作超时。

在默认情况下,消息的确认机制是打开的。先前的例子中,我们通过设置autoAck = true将其关闭。现在,让我们来移除这个标志,即开启消息确认机制,并发送一条消息。

具体核心步骤如下:

boolean ack = false ; //打开应答机制  
channel.basicConsume(QUEUE_NAME, ack, consumer);  
//另外需要在每次处理完成一个消息后,手动发送一次应答。  
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
完整代码:

package com.csdn.ingo.rabbitmq_1.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work {
	// 队列名称
	private final static String QUEUE_NAME = "workqueue";

	public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
		// 区分不同工作进程的输出
		int hashCode = Work.class.hashCode();
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明队列
		boolean durable = true;
		channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
		System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");
		// 设置最大服务转发消息数量
		int prefetchCount = 1;
		channel.basicQos(prefetchCount);
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定消费队列
		boolean ack = false;
		channel.basicConsume(QUEUE_NAME, ack, consumer);
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());

			System.out.println(hashCode + " [x] Received '" + message + "'");
			doWork(message);
			System.out.println(hashCode + " [x] Done");
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}

	}


	private static void doWork(String task) throws InterruptedException {
		for (char ch : task.toCharArray()) {
			if (ch == '.')
				Thread.sleep(1000);
		}
	}
}
测试方法:(for循环改为5次)

先启动两个消费者(Work),然后在启动生产者(NewTask),在自动生产者之后,立即关闭一个Work实例,保留一个实例,观察输出:

技术分享


技术分享

启动生产者(NewTask)

技术分享

立即关闭其中一个Work,观察输出:

技术分享

技术分享

注意上面的输出:我们关闭掉3085858这个Work之后,其上的任务3由于没有得到确认,于是转由22434271这个Work执行。

消息持久化

上面我们已经介绍了:当consumer宕机之后,确保不会丢失消息。但是,如果是MQ服务器宕机了呢?接下来,我们来学习在这种场景下如果确保消息不会丢失。

当RabbitMQ服务器关机,宕机之后,其都会彻底的丢失关于队列和消息所有内容。除非,我们将其设置为持久化的模式。为了达到这个目的,需要做两件事情。

第一:确保RabbitMQ不会丢失队列。因此,我们需要将队列声明为持久化的模式,具体做法如下:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
尽管这个命令是正确的,但其不会在上面示例代码的上下文环境中正确运行。因为,我们已经定义了一个名称为“hello”的队列,并且这个队列是非持久化的。在RabbitMQ中,不允许使用不同的参数,重新定义一个已经存在的队列。如果这么做的话,就会返回一个错误。所以,我们需要重新定义一个不同名称的队列,举个例子,如下:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
上面的这个示例代码,需要同时应用在生产者与消费者的代码当中,才能够起到相应作用。

第二:确保消息是持久化的。使用MessageProperties.PERSISENT_TEXT_PLAIN具体做法如下:

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
特别提醒:
我们将消息设置为持久化的模式,并不意味着实现消息的100%不会丢失。尽管,通过配置可以将消息写入到磁盘,但是这个过程中仍然存在着一小段时间,即RabbitMQ刚刚接受到消息,但没有存储。同时,RabbitMQ也不会对每一个消息都执行同步的持久化过程,可能发生的情景是,刚刚将消息写入到缓存中,但没有写入到磁盘。这种持久化的策略并不保证强一致性的实现,但聊胜于无,总好过一个简单的队列所实现的效果。

如果各位看官一定需要实现强一致性的持久化策略,那么可以参考下面这边文章的做法(https://www.rabbitmq.com/confirms.html),这部分内容,随后就会更新,敬请期待!

公平转发

你可能已经发现,上文我们介绍的Round-robin策略不能够按照我们实际期望的那样工作。如:在一个应用中存在两个消费者(Work),当偶数的消息所代表的任务很繁琐,需要很长时间执行。同时,奇数的消息所代表的任务很轻量,需要较短时间执行。于是,就会发生下面的场景:其中一个Work非常的忙,而另一个Work相对的产生空闲期。对于RabbitMQ,其并不知道关于消费者客户端所发生的事情,并且,仍然按照这种策略来分发后续的任务。

这个问题是由于RabbitMQ仅仅是分发消息,而没有查看从消费者返回的未确认的消息的数量。仅仅是将第n个消息发送到第n个队列,如此简单。

技术分享

为了实现任务的公平转发,我们需要使用basicQos方法,并且设置参数:prefetchCount=1。该配置将会告诉RabbitMQ不要对一个consumer在同一个时刻对其分发超过1条消息。或者,换句话说,不要给还没有做完上一个任务的客户端分发新的任务。取而代之的,将任务发送给下一个空闲的客户端。

具体做法如下;

int prefetchCount = 1;
channel.basicQos(prefetchCount);
特别提醒:
如果所有的消费者都处于工作状态,那么队列就可能会发生溢出。这时,就需要运维人员实时关注队列所处的状态,酌情增加更多的消费者,或者,使用其他的策略来抵消这种不平衡。

综合上面所有的内容,完整的代码如下:

package com.csdn.ingo.rabbitmq_1.one;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;


public class NewTask {
	// 队列名称
	private final static String QUEUE_NAME = "workqueue";

	public static void main(String[] args) throws IOException {
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明队列
		boolean durable = true;
		channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
		// 发送10条消息,依次在消息后面附加1-10个点
		for (int i = 0; i < 5; i++) {
			String dots = "";
			for (int j = 0; j <= i; j++) {
				dots += ".";
			}
			String message = "helloworld" + dots + dots.length();
			channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
		// 关闭频道和资源
		channel.close();
		connection.close();
	}
}
Work的代码见上文,在此不再赘述。

------------------------------------------------------------------------------------------------------------------------------------

至此,系统拆分解耦利器之消息队列---RabbitMQ-工作队列  结束


备注:

各位看官如果有兴趣了解更多关于Channel和MessageProperties的内容,请参考官方文档:http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/


参考资料

官方文档:http://www.rabbitmq.com/tutorials/tutorial-two-java.html

系统拆分解耦利器之消息队列---RabbitMQ-工作队列

标签:

原文地址:http://blog.csdn.net/abcd898989/article/details/52227828

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!