码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ消息队列(五):Routing 消息路由[转]

时间:2015-01-06 13:45:30      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:

上篇文章中,我们构建了一个简单的日志系统。接下来,我们将丰富它:能够使用不同的severity(严重程度)来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。

1. Bindings绑定

    上篇文章中我们是这么做的绑定:

channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//const string ROUTING_KEY = "";

    绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。

    实际上,绑定可以带routing key这个参数。其实这个参数的名称和basic_publish的参数名是相同了。

   为了避免混淆,我们把这个routing key称为binding key(即在Exchange中的routing key)

    使用一个binding key来创建binding :

channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//string routingKey = "指定RoutingKey的名称";

上一篇文章我们讲的是使用fanout类型的exchange,对于fanout的exchange来说,这个参数是被忽略的。

2. Direct exchange

  Direct exchange的路由算法非常简单:通过binding key的完全匹配,可以通过下图来说明。

技术分享
    exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
    当P发布的key是orange时,exchange会把它放到Q1。如果P发布的key是black或者green那么就会到Q2。其余的Message都会被丢弃。

3. Multiple bindings

      多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver(提供)到Q1和Q2。其余的Message都会被丢弃。
技术分享
 
 

4. Emitting logs

首先是我们要创建一个direct的exchange:

const string EXCHANGE_NAME = "direct_logs";
channel.ExchangeDeclare(EXCHANGE_NAME, "direct");

我们将使用log的severity(严重级别)作为routing key,这样Consumer可以针对不同severity(严重级别)的log进行不同的处理。

channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body);

我们使用三种severity(严重级别):‘info‘, ‘warning‘, ‘error‘.

5. Subscribing

对于queue,我们需要绑定severity(严重级别):

const string EXCHANGE_NAME = "direct_logs";
channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
string queueName = channel.QueueDeclare();

channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);

6. 最终版本

技术分享
本例子是没有指定Queue的名称:
Producer.cs
 1 /// SendDemo5.exe info
 2         /// SendDemo5.exe warning
 3         /// SendDemo5.exe error
 4         /// </param>
 5         static void Main(string[] args)
 6         {
 7             var factory = new ConnectionFactory() { HostName = "localhost" };
 8             using (var connection = factory.CreateConnection())
 9             {
10                 using (var channel = connection.CreateModel())
11                 {
12                     const string EXCHANGE_NAME = "direct_logs";
13                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。
14                     //Exchange在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
15                     var routingKey = (args.Length > 0) ? args[0] : "info";
16 
17                     var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
18                     var body = Encoding.UTF8.GetBytes(message);
19                     
20                     channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body);
21                     Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message);
22                 }
23             }
24         }

Consumer.cs

 1 /// <summary>
 2         /// 接收端创建临时queue
 3         /// </summary>
 4         /// <param name="args">
 5         /// ReceiveDemo5.exe info
 6         /// ReceiveDemo5.exe warning
 7         /// ReceiveDemo5.exe error
 8         /// </param>
 9         static void Main(string[] args)
10         {
11             var factory = new ConnectionFactory() { HostName = "localhost" };
12             using (var connection = factory.CreateConnection())
13             {
14                 using (var channel = connection.CreateModel())
15                 {
16                     const string EXCHANGE_NAME = "direct_logs";
17                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
18                     string queueName = channel.QueueDeclare();//得到Queue的名字
19                     if (args.Length < 1)
20                     {
21                         Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]);
22                         Environment.ExitCode = 1;
23                         return;
24                     }
25 
26                     foreach (var routingKey in args)
27                     {
28                         channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);
29                     }
30 
31                     Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");
32 
33                     var consumer = new QueueingBasicConsumer(channel);
34                     channel.BasicConsume(queueName, true, consumer);
35 
36                     while (true)
37                     {
38                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
39 
40                         var body = ea.Body;
41                         var message = Encoding.UTF8.GetString(body);
42                         var routingKey = ea.RoutingKey;
43                         Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘",
44                                           routingKey, message);
45                     }
46                 }
47             }
48         }

必须先运行Consumer,然后在运行Producer.

ReceiveDemo5.exe info warning error 或者ReceiveDemo5.exe info,ReceiveDemo5.exe warning,ReceiveDemo5.exe error

SendDemo5.exe info wyp,SendDemo5.exe warning wyp,SendDemo5.exe warning wyp

 
转:
http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html(官网)
http://blog.csdn.net/anzhsoft/article/details/19630147(翻译)

RabbitMQ消息队列(五):Routing 消息路由[转]

标签:

原文地址:http://www.cnblogs.com/qiyebao/p/4205766.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!