标签:持久化 需要 style odi host 工作 导致 persist idt
Work Queues 即工作队列,它表示一个 Producer 对应多个 Consumer,包括两种分发模式:轮循分发(Round-robin)和公平分发(Fair dispatch)。旨在为了避免立即执行任务时出现占用很多资源和时间却又必须等待完成的现象。
原理分析: Producer 把工作任务转化为消息发送给队列,当后台有一个 Consumer 进程在运行时,它会不间断地从队列中取出消息来执行;当后台有多个 Consumer 进程在运行时,它们会不间断地从队列中取出消息采取并行执行的方式以提高效率。
我修改了第一篇文章中的代码,用线程来模拟处理消息耗时的场景,分别在10个消息的末尾增加符号“>”,每个符号“>”表示该消息在线程中执行需要耗时1秒,每个消息处理完毕时以“OK”表示结束。
Producer 代码片段:
for (int m = 0; m < 10; m++) { string marks = string.Empty; for (int n = 0; n <= m; n++) { marks += ">"; } string msg = "Mr.Tua" + marks + marks.Length + "s"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish ( exchange: string.Empty, routingKey: "Tua", basicProperties: null, body: body ); Console.WriteLine("Producer sent message: {0}", msg); }
Consumer 代码片段:
consumer.Received += (sender, e) => { var body = e.Body; var msg = Encoding.UTF8.GetString(body); int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count(); Console.WriteLine("Consumer received message: {0}", msg);
Thread.Sleep(marks * 1000); Console.WriteLine("OK"); };
Producer 控制台(后启动运行):
Consumer 控制台(先启动运行 x 2 ):
可以看到 Producer 按照顺序将每个消息发送给下一个 Consumer (一次性平均分配),每个 Consumer 得到相等数量的消息,当中不用考虑处理消息时需要耗费多少时间,也就是说不关心 Consumer 是否繁忙或空闲,这种默认的分发模式称为轮循分发(Round-robin)。
如果某个 Consumer 在处理消息时由于各种原因挂了导致 Producer 没有收到消息处理完成时的应答,那么就会丢失 Consumer 正在处理和没有处理的消息。
两个 Consumer 同时运行的过程中我关闭了其中一个,可以看到下面的 Consumer 完成了第2个消息,丢失了第4(未处理完毕)、6、8、10个消息。
在这种情况下如何保证消息不丢失呢?
消息应答(Message acknowledgment):如果 Consumer 挂了没有发送应答,Producer 会重新转发给其它的 Consumer 以保证不丢失消息。
修改 Consumer 代码:
var body = e.Body; var msg = Encoding.UTF8.GetString(body); int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count(); Console.WriteLine("Consumer received message: {0}", msg); Thread.Sleep(marks * 1000); Console.WriteLine("OK"); //每个消息处理完毕时手动发送消息应答 channel.BasicAck ( deliveryTag: e.DeliveryTag, //该消息的Index multiple: false//是否批量应答,true:批量应答所有小于该deliveryTag的消息 );
channel.BasicConsume ( queue: "Tua", noAck: false,//手动应答 consumer: consumer );
可以看到虽然下面的 Consumer 挂了,但是 Producer 重新把消息发给了上面的 Consumer 去处理。
现在已经知道当 Consumer 挂了不丢失消息的解决方案,可是 RabbitMQ 服务要是挂了会导致所有的队列和消息丢失,这种情况该怎么办呢?
消息持久化(Message durability):让所有的队列和消息都开启持久化功能,将队列和消息都保存在磁盘上以达到持久化的目的。
另外还有一种为了解决事务机制性能开销大(导致吞吐量下降)而提出的更强大的消息持久化的方式叫做 Publisher Confirm,这里不作讨论。
修改 Producer 代码:
channel.QueueDeclare ( queue: "Tua", durable: true,//开启队列持久化 exclusive: false, autoDelete: false, arguments: null );
var basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true;//开启消息持久化 channel.BasicPublish ( exchange: string.Empty, routingKey: "Tua", basicProperties: basicProperties, body: body );
修改 Consumer 代码:
channel.QueueDeclare ( queue: "Tua", durable: true,//开启队列持久化 exclusive: false, autoDelete: false, arguments: null );
运行程序时报出一个异常错误:
这是因为修改了代码 durable: true,开启了队列的持久化,然而 RabbitMQ 是不允许使用不同的参数重新定义一个已有的同名队列。
两种方法可以解决:
1.重新定义一个不同名的队列;
2.删除已有的同名队列。
第一种方法没有什么好说的,这里说第二种方法,打开 RabbitMQ 管理平台删除已有的同名队列:
测试步骤:首先启动 Producer ---> 关闭 RabbitMQ 服务 ---> 启动 RabbitMQ 服务 ---> 最后启动 Consumer。
可以看到队列和消息都木有丢失,这里就不再上图了。
在介绍轮循分发(Round-robin)时有提到它是不关心 Consumer 是否繁忙或空闲的,但是这样很可能就会出现有的 Consumer 劳累过度赶脚身体被掏空,而有的 Consumer 悠闲自得赶脚无用武之地的问题,那该怎么办呢?
公平分发(Fair dispatch):不会同时给一个 Consumer 发送多个新消息,只有在 Consumer 空闲的时候才会给它发送一个新消息。
修改 Consumer 代码:
//请求服务的特殊设置 channel.BasicQos ( prefetchSize: 0,//服务传送消息的最大容量,0表示无限制 prefetchCount: 1,//服务传送消息的最大数量,0表示无限制 global: false//false:将以上的设置应用于Consumer级别,true:将以上的设置应用于Channel级别 );
为了便于演示,我把 Producer 发送消息的顺序改为从10到1。
可以看到当 Consumer 空闲的时候才会给它发送一个新消息,而且在公平分发(Fair dispatch)模式下支持动态增加 Consumer ,使得新加的 Consumer 可以立即处理还没有发送出去的消息。
反观在默认的轮循分发(Round-robin)模式下已经将消息一次性平均分配完毕,就算是动态增加了 Consumer 也然并卵。。。
using RabbitMQ.Client; using System; using System.Text; namespace WorkQueuesProducer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory { HostName = "192.168.31.212", UserName = "Tua", Password = "Tua", Port = 5672 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare ( queue: "Tua", durable: true, exclusive: false, autoDelete: false, arguments: null ); for (int m = 0; m < 10; m++) { string marks = string.Empty; for (int n = 0; n <= m; n++) { marks += ">"; } string msg = "Mr.Tua" + marks + marks.Length + "s"; var body = Encoding.UTF8.GetBytes(msg); var basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true; channel.BasicPublish ( exchange: string.Empty, routingKey: "Tua", basicProperties: basicProperties, body: body ); Console.WriteLine("Producer sent message: {0}", msg); } Console.ReadLine(); } } } } }
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Linq; using System.Text; using System.Threading; namespace WorkQueueConsumer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare ( queue: "Tua", durable: true, exclusive: false, autoDelete: false, arguments: null ); channel.BasicQos ( prefetchSize: 0, prefetchCount: 1, global: false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var msg = Encoding.UTF8.GetString(body); int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count(); Console.WriteLine("Consumer received message: {0}", msg); Thread.Sleep(marks * 1000); Console.WriteLine("OK"); channel.BasicAck ( deliveryTag: e.DeliveryTag, multiple: false ); }; channel.BasicConsume ( queue: "Tua", noAck: false, consumer: consumer ); Console.ReadLine(); } } } } }
RabbitMQ --- Work Queues(工作队列)
标签:持久化 需要 style odi host 工作 导致 persist idt
原文地址:http://www.cnblogs.com/poepoe/p/7298151.html