标签:
上篇文章中,我们构建了一个简单的日志系统。接下来,我们将丰富它:能够使用不同的severity(严重程度)来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
上篇文章中我们是这么做的绑定:
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//const string ROUTING_KEY = "";
绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。
实际上,绑定可以带routing key这个参数。其实这个参数的名称和basic_publish的参数名是相同了。
为了避免混淆,我们把这个routing key称为binding key(即在Exchange中的routing key)
使用一个binding key来创建binding :
channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//string routingKey = "指定RoutingKey的名称";
上一篇文章我们讲的是使用fanout类型的exchange,对于fanout的exchange来说,这个参数是被忽略的。
Direct exchange的路由算法非常简单:通过binding key的完全匹配,可以通过下图来说明。
首先是我们要创建一个direct的exchange:
const string EXCHANGE_NAME = "direct_logs"; channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
我们将使用log的severity(严重级别)作为routing key,这样Consumer可以针对不同severity(严重级别)的log进行不同的处理。
channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body);
我们使用三种severity(严重级别):‘info‘, ‘warning‘, ‘error‘.
对于queue,我们需要绑定severity(严重级别):
const string EXCHANGE_NAME = "direct_logs"; channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); string queueName = channel.QueueDeclare(); channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);
1 /// SendDemo5.exe info 2 /// SendDemo5.exe warning 3 /// SendDemo5.exe error 4 /// </param> 5 static void Main(string[] args) 6 { 7 var factory = new ConnectionFactory() { HostName = "localhost" }; 8 using (var connection = factory.CreateConnection()) 9 { 10 using (var channel = connection.CreateModel()) 11 { 12 const string EXCHANGE_NAME = "direct_logs"; 13 channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。 14 //Exchange在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。 15 var routingKey = (args.Length > 0) ? args[0] : "info"; 16 17 var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; 18 var body = Encoding.UTF8.GetBytes(message); 19 20 channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body); 21 Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message); 22 } 23 } 24 }
Consumer.cs
1 /// <summary> 2 /// 接收端创建临时queue 3 /// </summary> 4 /// <param name="args"> 5 /// ReceiveDemo5.exe info 6 /// ReceiveDemo5.exe warning 7 /// ReceiveDemo5.exe error 8 /// </param> 9 static void Main(string[] args) 10 { 11 var factory = new ConnectionFactory() { HostName = "localhost" }; 12 using (var connection = factory.CreateConnection()) 13 { 14 using (var channel = connection.CreateModel()) 15 { 16 const string EXCHANGE_NAME = "direct_logs"; 17 channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); 18 string queueName = channel.QueueDeclare();//得到Queue的名字 19 if (args.Length < 1) 20 { 21 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); 22 Environment.ExitCode = 1; 23 return; 24 } 25 26 foreach (var routingKey in args) 27 { 28 channel.QueueBind(queueName, EXCHANGE_NAME, routingKey); 29 } 30 31 Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C"); 32 33 var consumer = new QueueingBasicConsumer(channel); 34 channel.BasicConsume(queueName, true, consumer); 35 36 while (true) 37 { 38 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 39 40 var body = ea.Body; 41 var message = Encoding.UTF8.GetString(body); 42 var routingKey = ea.RoutingKey; 43 Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", 44 routingKey, message); 45 } 46 } 47 } 48 }
必须先运行Consumer,然后在运行Producer.
ReceiveDemo5.exe info warning error 或者ReceiveDemo5.exe info,ReceiveDemo5.exe warning,ReceiveDemo5.exe error
SendDemo5.exe info wyp,SendDemo5.exe warning wyp,SendDemo5.exe warning wyp
RabbitMQ消息队列(五):Routing 消息路由[转]
标签:
原文地址:http://www.cnblogs.com/qiyebao/p/4205766.html