标签:string 轮询 存储 最大化 机制 receive 检查 占用 取消
工作队列是为了避免等待一些占用大量资源或时间操作的一种处理方式。我们把任务封装为消息发送到队列中,消费者在后台不停的取出任务并且执行。当运行了多个消费者工作进程时,队列中的任务将会在每个消费者间进行共享。
使用工作队列的好处就是能够并行的处理任务。如果队列中堆积了很多任务,只要添加更多的消费着就可以了,拓展非常方便。
准备工作
1.创建生产者和消费者客户端
2.在消费者中使用Thread.Sleep()模拟耗时操作
?
生产者?TaskQueuesProducer.cs
?
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
?
namespace RabbitMQProducer
{
??? public class TaskQueuesProducer
??? {
??????? static int processId = Process.GetCurrentProcess().Id;
??????? public static void Send()
??????? {
??????????? Console.WriteLine($"我是生产者{processId}");
??????????? var factory = new ConnectionFactory()
??????????? {
??????????????? HostName = "127.0.0.1"
??????????? };
??????????? using (var connection = factory.CreateConnection())
??????????? {
??????????????? using (var channel = connection.CreateModel())
??????????????? {
??????????????????? channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
??????????????????? for (int item = 0; item < 20; item++)
??????????????????? {
??????????????????????? string message = $"我是生产者{processId}发送的消息:{item}";
??????????????????????? channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: null, body: Encoding.UTF8.GetBytes(message));
??????????????????????? Console.WriteLine(message);
??????????????????? }
?
??????????????????? Console.WriteLine(" Press [enter] to exit.");
??????????????????? Console.ReadLine();
??????????????? }
??????????? }
??????? }
??? }
}
?
消费者?TaskQueuesConsumer.cs
?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Diagnostics;
using System.Threading;
?
namespace RabbitMQConsumer
{
??? public class TaskQueuesConsumer
??? {
??????? static int processId = 0;
??????? static TaskQueuesConsumer()
??????? {
??????????? processId = Process.GetCurrentProcess().Id;
??????? }
??????? public static void Receive()
??????? {
??????????? Console.WriteLine($"我是消费者{processId}");
??????????? var factory = new ConnectionFactory()
??????????? {
??????????????? HostName = "127.0.0.1"
??????????? };
??????????? using (var connection = factory.CreateConnection())
??????????? {
??????????????? using (var channel = connection.CreateModel())
??????????????? {
??????????????????? channel.QueueDeclare(queue: "taskqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
??????????????????? EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
??????????????????? consumer.Received += Consumer_Received;
??????????????????? //noack=false 不自动消息确认 这时需要手动调用?? channel.BasicAck(); 进行消息确认
??????????????????? //noack=true 自动消息确认,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除
??????????????????? channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);
?
??????????????????? Console.WriteLine(" Press [enter] to exit.");
??????????????????? Console.ReadLine();
??????????????? }
??????????? }
??????? }
?
??????? private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
??????? {
??????????? string message = Encoding.UTF8.GetString(e.Body);
??????????? Console.WriteLine($"接收到消息:{message}");
??????????? //Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模拟消息处理耗时操作
??????????? Console.WriteLine($"已处理完消息");
?
??????????? //对应前面的 BasicConsume 中 noack=false 发送消息确认回执
??????????? //EventingBasicConsumer consumer = sender as EventingBasicConsumer;
??????????? //consumer.Model.BasicAck(e.DeliveryTag, false);
??????? }
??? }
}
?
当我们运行了三个consumer客户端,一个producer客户端后,发现producer发送的20条消息,被三个客户端依次平均接收并处理了。
这是RabbitMQ默认的消息分发机制——轮询( round-robin),默认情况下RabbitMQ会按顺序把消息发送给每个消费者,平均每个消费者都会收到同等数量的消息。
?
消息确认
当前的代码中当消息被RabbitMQ发送到consumer后,就会被立即删除,这种情况给下,如果其中一个consumer客户端被停止,那么正在处理的消息就会丢失,同时所有发送到这个工作者并且还没处理的消息也会丢失。这不是我们希望看到的,我们希望如果一个consumer客户端挂掉后,希望把重新发送任务到其它的consumer客户端。
为了防止消息丢失,RabbbitMQ提供了消息确认机制,消费者会通过一个ack,告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
如果consumer挂掉了,没有发送相应,RabbitMQ就会认为消息没有被处理,然后重新发送给其他消费者,这样即使某个consumer挂掉,也不会丢失消息。
消息没有超市的概念,当工作者和它断开连接时,RabbitMQ会重新发送消息,这样在处理耗时较长任务时就不会出现问题了。
之前的代码中我们开启了自动消息确认,这样一旦consumer挂掉,就会发生消息丢失的情况,现在我们来修改两处代码,开启消息确认机制。
?
修改参数noack为false,关闭自动消息确认
?
channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);
?
取消下面代码的注释,进行消息确认回执
?
EventingBasicConsumer consumer = sender as EventingBasicConsumer;
consumer.Model.BasicAck(e.DeliveryTag, false);
?
注意:一旦忘记消息确认,消息会在你程序推出之后就会重新发送,如果不能释放没响应的消息,RabbitMQ将会占用越来越来越多的内存
可通过以下指令检查忘记确认的消息信息或在?RabbitMQWebweb管理页面中查看
?
rabbitmqctl list_queues name messages_ready messages_unacknowledged
?
修改完成后再次运行,就不用担心消息丢失的问题了
?
消息持久化
如果没有特殊的设置,那么在RabbitMQ服务关闭或崩溃的情况下将会丢失所有的队列和消息。为了确保消息不会丢失需要做两个事情,把队列和消息设置为持久化
设置队列为持久化,producer和consumer两处都要修改
?
channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
设置消息持久化
?
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //DeliveryMode 消息的投递模式,默认为1 非持久化的,DeliveryMode=2 持久化存储消息内容
channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));
?
注意:
公平调度
RabbitMQ只管分发进入队列的消息,而不关心那些consumer比较繁忙或空闲,这样容易导致一些consumer比较繁忙,一些比较空闲,不能使资源被最大化的使用。
为了解决这样的问题,RabbitMQ提供了basicQos方法,传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。就是只有在消费者空闲的时候会发送下一条信息。
?
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
?
执行以上设置之后会发现并没有按照之前的轮询(Round-robin)进行消息转发,而是在消费者不忙时才进行转发。
由于消息并没有发出去,在动态添加了consumer后能够立即投入工作,而默认的轮询转发机制则不支持动态添加消费者,因为此时消息已经分配完毕,无法立即加入工作即使还有很多未完成的任务。
注意:
这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。
完整代码
?
生产者?TaskQueuesProducer.cs
?
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
?
namespace RabbitMQProducer
{
??? public class TaskQueuesProducer
??? {
??????? static int processId = Process.GetCurrentProcess().Id;
??????? public static void Send()
??????? {
??????????? Console.WriteLine($"我是生产者{processId}");
??????????? var factory = new ConnectionFactory()
??????????? {
??????????????? HostName = "127.0.0.1"
??????????? };
??????????? using (var connection = factory.CreateConnection())
??????????? {
??????????????? using (var channel = connection.CreateModel())
??????????????? {
??????????????????? //durable:持久化存储队列
??????????????????? //autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
??????????????????? //exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。注意事项:1,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。2,"首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。3,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
??????????????????? channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
??????????????????? for (int item = 0; item < 200000; item++)
??????????????????? {
??????????????????????? string message = $"我是生产者{processId}发送的消息:{item}";
??????????????????????? var properties = channel.CreateBasicProperties();
??????????????????????? properties.DeliveryMode = 2; //DeliveryMode 消息的投递模式,默认为1 非持久化的,DeliveryMode=2 持久化存储消息内容
??????????????????????? channel.BasicPublish(exchange: "", routingKey: "taskqueue", basicProperties: properties, body: Encoding.UTF8.GetBytes(message));
??????????????????????? Console.WriteLine(message);
??????????????????? }
?
??????????????????? Console.WriteLine(" Press [enter] to exit.");
??????????????????? Console.ReadLine();
??????????????? }
??????????? }
??????? }
??? }
}
?
消费者?TaskQueuesConsumer.cs
?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Diagnostics;
using System.Threading;
?
namespace RabbitMQConsumer
{
??? public class TaskQueuesConsumer
??? {
??????? static int processId = 0;
??????? static TaskQueuesConsumer()
??????? {
??????????? processId = Process.GetCurrentProcess().Id;
??????? }
??????? public static void Receive()
??????? {
??????????? Console.WriteLine($"我是消费者{processId}");
??????????? var factory = new ConnectionFactory()
??????????? {
??????????????? HostName = "127.0.0.1"
??????????? };
??????????? using (var connection = factory.CreateConnection())
??????????? {
??????????????? using (var channel = connection.CreateModel())
??????????????? {
??????????????????? channel.QueueDeclare(queue: "taskqueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
??????????????????? // BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它
??????????????????? channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
??????????????????? EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
??????????????????? consumer.Received += Consumer_Received;
??????????????????? //noack=false 不自动消息确认 这时需要手动调用?? channel.BasicAck(); 进行消息确认
??????????????????? //noack=true 自动消息确认,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除
??????????????????? channel.BasicConsume(queue: "taskqueue", noAck: false, consumer: consumer);
?
??????????????????? Console.WriteLine(" Press [enter] to exit.");
??????????????????? Console.ReadLine();
??????????????? }
??????????? }
??????? }
?
??????? private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
??????? {
??????????? string message = Encoding.UTF8.GetString(e.Body);
??????????? Console.WriteLine($"接收到消息:{message}");
??????????? Thread.Sleep(new Random().Next(1000, 1000 * 5)); //模拟消息处理耗时操作
??????????? Console.WriteLine($"已处理完消息");
?
??????????? //对应前面的 BasicConsume 中 noack=false 发送消息确认回执
??????????? EventingBasicConsumer consumer = sender as EventingBasicConsumer;
??????????? consumer.Model.BasicAck(e.DeliveryTag, false);
??????? }
??? }
}
标签:string 轮询 存储 最大化 机制 receive 检查 占用 取消
原文地址:http://www.cnblogs.com/AlvinLee/p/6145641.html