在这个部分我们将创建一个工作队列(Work Queue),用于将耗时任务(time-consuming tasks)分发给多个工作者(workers).
引入工作队列最主要的目的是为了避免需要直接去运行资源密集型任务(resource-intensive task),还不得不等待其执行完成。
我们不需要亲自调度将被执行的这些任务,只需要将任务包装成消息,然后将其发送到队列,异步运行(running in the background)的工作者进程会取出这些任务来执行,如果你有多个工作者,那么这些任务会在他们之间共享。
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("", "hello", properties, body);
private static string GetMessage(string[] args)
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer);
Console.WriteLine(" [*] Waiting for messages. " +
"To exit press CTRL+C");
while (true)
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split(‘.‘).Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
$ csc /r:"RabbitMQ.Client.dll" NewTask.cs
$ csc /r:"RabbitMQ.Client.dll" Worker.cs
shell1$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
shell2$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
shell3$ NewTask.exe First message.
shell3$ NewTask.exe Second message..
shell3$ NewTask.exe Third message...
shell3$ NewTask.exe Fourth message....
shell3$ NewTask.exe Fifth message.....
shell1$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘First message.‘
[x] Received ‘Third message...‘
[x] Received ‘Fifth message.....‘
shell2$ Worker.exe
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Second message..‘
[x] Received ‘Fourth message....‘
默认地RabbitMQ将会依次按顺序(in sequence)发送每一个消息给下一个consumer,平均来说每个consumer都会得到相同数目的消息,这种分发消息的方式就叫做轮训调度(round-robin),你可以试试更多的Worker看看效果。
译者注: “轮训调度”这个特性是RabbitMQ的队列所默认具备的,你不需要针对队列进行任何属性设置,只要你有多个consumer同时从一个队列接收消息,那么RabbitMQ就会以轮训调度的方式均匀地将消息分发给这些consumer,同时你可以无缝地ADD进更多的针对这个队列的consumer。
针对我们上面的代码,一旦RabbitMQ将消息递送出去给consumer之后就会将消息在内存中移除掉。--- 译者注:可见RabbitMQ存储消息默认是存储在内存中,机器挂掉之后默认会丢失。
There aren‘t any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It‘s fine even if processing a message takes a very, very long time.
消息确认默认是打开的,上面的例子中我们通过设置channel的BasicConsume的第二个参数 noAck=true来将消息确认机制关闭掉了,你可以打开:
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", false, consumer);
while (true)
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicAck(ea.DeliveryTag, false);
Forgotten acknowledgment
It‘s a common mistake to miss the BasicAck. It‘s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won‘t be able to release any unacked messages.
In order to debug this kind of mistake you can use rabbitmqctl to print themessages_unacknowledged field:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);
这样做会报错!为什么呢?因为前面已经创建了一个叫做hello的队列,它是非持久化的,RabbitMQ不允许用不同的参数重新定义一个存在的队列,我们需要重新取一个名字比如 task_queue:
bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
注意这个queueDeclare 这个设置持久化的操作需要在生产者和消费者两边都做!
(2)接下来我们需要通过调用 IBasicProperties.SetPersistent 这个方法传递参数true来标记消息也是具备持久化特性的:
var properties = channel.CreateBasicProperties();
Note on message persistence
Marking messages as persistent doesn‘t fully guarantee that a message won‘t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn‘t saved it yet. Also, RabbitMQ doesn‘t do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren‘t strong, but it‘s more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
为了避免这种情况,我们可以调用channel的 basicQos 方法设置 prefetchCount 参数为1来告诉RabbitMQ不要将超过一个的消息给同一个worker。
channel.BasicQos(0, 1, false);
Note about queue size
If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
NewTask.cs 的完整的源码如下:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
public static void Main(string[] args)
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare("task_queue", true, false, false, null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
channel.BasicPublish("", "task_queue", properties, body);
Console.WriteLine(" [x] Sent {0}", message);
private static string GetMessage(string[] args)
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
public static void Main()
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare("task_queue", true, false, false, null);
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("task_queue", false, consumer);
Console.WriteLine(" [*] Waiting for messages. " +
"To exit press CTRL+C");
while (true)
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split(‘.‘).Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(ea.DeliveryTag, false);
Using message acknowledgments and BasicQos you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.
关于更多的 IModel 方法以及 IBasicProperties的信息,可以参考 RabbitMQ .NET client API reference online.
Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.
