码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ消息队列(五):Routing 消息路由 2[原]

时间:2015-01-06 15:21:53      阅读:166      评论:0      收藏:0      [点我收藏+]

标签:

上一篇文章使用的是Direct的Exchange,但是没有指定Queue的名字,这样只能是先运行Consumer之后,Producer在运行发消息Consumer才能收到,否则先运行Producer发送消息,在运行Consumer是收不到之前Producer发送的消息,因为Queue的名字像是这样的:amq.gen-X-XSTaseUmil42zrawBVsw都是临时,如果Consumer关闭之后,这个Queue就会自动被RabbitMQ删掉。

如果想创建可以先执行Producer的Direct的Exchnage呢?因为在实际工作中我们可能需要发送端有消息就会一直发给接收端,不管接收端是否已经运行。如果我们需要指定名称的Queue,并且使用Direct的Exchange方式,我们需要使用Binding的方式。上一篇和第一篇文章中都解释了绑定的含义:绑定其实就是关联了exchange和queue。

使用指定名称的Queue和Direct的Exchange例子:

Producer.cs

技术分享
 1 /// SendDemo51.exe info
 2         /// SendDemo51.exe warning
 3         /// SendDemo51.exe error
 4         /// </param>
 5         static void Main(string[] args)
 6         {
 7             var factory = new ConnectionFactory() { HostName = "localhost" };
 8             using (var connection = factory.CreateConnection())
 9             {
10                 using (var channel = connection.CreateModel())
11                 {
12                     const string EXCHANGE_NAME = "direct_logs";
13                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。
14 
15                     const string QUEUE_NAME = "direct_hello";//使用我们自己指定Queue的名称
16                     bool durable = true;
17                     channel.QueueDeclare(QUEUE_NAME, durable, false, false, null);
18 
19                     var routingKey = (args.Length > 0) ? args[0] : "info";
20 
21                     channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联
22 
23                     var message = (args.Length > 1)  ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
24 
25                     var body = Encoding.UTF8.GetBytes(message);
26 
27                     var properties = channel.CreateBasicProperties();
28                     properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties
29 
30                     channel.BasicPublish(EXCHANGE_NAME, routingKey, properties, body);
31                     Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message);
32 
33                     Console.Read();
34                 }
35             }
36         }
Producer.cs

Consumer.cs

技术分享
 1 static void Main(string[] args)
 2         {
 3             var factory = new ConnectionFactory() { HostName = "localhost" };
 4             using (var connection = factory.CreateConnection())
 5             {
 6                 using (var channel = connection.CreateModel())
 7                 {
 8                     const string EXCHANGE_NAME = "direct_logs";
 9                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
10 
11                     const string QUEUE_NAME = "direct_hello";//使用我们自己指定Queue的名称
12                     bool durable = true;
13                     channel.QueueDeclare(QUEUE_NAME, durable, false, false, null);
14                     string queueName = QUEUE_NAME;
15 
16                     if (args.Length < 1)
17                     {
18                         Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]);
19                         Environment.ExitCode = 1;
20                         return;
21                     }
22 
23                     foreach (var routingKey in args)
24                     {
25                         channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联
26                     }
27 
28                     Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");
29 
30                     var consumer = new QueueingBasicConsumer(channel);
31                     channel.BasicConsume(queueName, true, consumer);
32 
33                     while (true)
34                     {
35                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
36 
37                         var body = ea.Body;
38                         var message = Encoding.UTF8.GetString(body);
39                         var routingKey = ea.RoutingKey;
40                         Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", routingKey, message);
41                     }
42                 }
43             }
44         }
Consumer.cs

 使用动态方式获取指定名称的Queue和Direct的Exchange例子:

Producer.cs

 

技术分享
 1 /// <summary>
 2         /// 支持多个Queue,通过routingkey可以拿到多个queue里面的内容;
 3         /// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息,
 4         /// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。
 5         /// SendDemo52.exe info custom_direct_hello1
 6         /// SendDemo52.exe info custom_direct_hello2 wyp aaa
 7         /// </summary>
 8         /// <param name="args"></param>
 9         static void Main(string[] args)
10         {
11             var factory = new ConnectionFactory() { HostName = "localhost" };
12             using (var connection = factory.CreateConnection())
13             {
14                 using (var channel = connection.CreateModel())
15                 {
16                     const string EXCHANGE_NAME = "direct_logs";
17                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。
18 
19                     string queueName = (args.Length > 1) ? args[1] : "direct_hello";//得到我们自己指定Queue的名称
20                     channel.QueueDeclare(queueName, true, false, false, null);
21 
22                     var routingKey = (args.Length > 0) ? args[0] : "info";
23                     channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联
24 
25                     var message = (args.Length > 3) ? string.Join(" ", args.Skip(2).ToArray()) : "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
26 
27                     var body = Encoding.UTF8.GetBytes(message);
28 
29                     var properties = channel.CreateBasicProperties();
30                     properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties
31 
32                     channel.BasicPublish(EXCHANGE_NAME, routingKey, properties, body);
33                     Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message);
34 
35                     Console.Read();
36                 }
37             }
38         }
Producer.cs

 

 

Consumer.cs

 

技术分享
 1 /// <summary>
 2     /// 支持多个Queue,通过routingkey可以拿到多个queue里面的内容;
 3     /// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息,
 4     /// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。
 5     /// ReceiveDemo52.exe info custom_direct_hello1
 6     /// ReceiveDemo52.exe info custom_direct_hello2
 7     /// </summary>
 8     class Program
 9     {
10         static void Main(string[] args)
11         {
12             var factory = new ConnectionFactory() { HostName = "localhost"  };
13             using (var connection = factory.CreateConnection())
14             {
15                 using (var channel = connection.CreateModel())
16                 {
17                     const string EXCHANGE_NAME = "direct_logs";
18                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
19                     if (args.Length < 1)
20                     {
21                         Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]);
22                         Environment.ExitCode = 1;
23                         return;
24                     }
25 
26                     string queueName = (args.Length > 1) ? args[1] : "direct_hello";//得到我们自己指定Queue的名称
27                     channel.QueueDeclare(queueName, true, false, false, null);
28 
29                     var routingKey = (args.Length > 0) ? args[0] : "info";
30                     channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联
31 
32                     Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");
33 
34                     var consumer = new QueueingBasicConsumer(channel);
35                     channel.BasicConsume(queueName, true, consumer);
36 
37                     while (true)
38                     {
39                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
40 
41                         var body = ea.Body;
42                         var message = Encoding.UTF8.GetString(body);
43                         string routingKeyRe = ea.RoutingKey;
44                         Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", routingKeyRe, message);
45                     }
46                 }
47             }
48         }
49     }
Consumer.cs

 

特别注意:

本例子是支持多个Queue名称同时发送和接收的,Consumer通过routing key可以拿到属于这个routing key里面的多个queue的内容;
如果Consumer一旦接收到消息,分两种情况:

1、接收的Consumer不是指定名称的Queue(此时这个Consumer是通过routing key得到Producer发送的消息),那么这个Consumer是不会自动从Queue中删除接收到的消息;

2、接收的Consumer是指定名称的Queue,那么这个Consumer是会自动从Queue中删除接收到的消息。

总结:只有当指定名称的Queue收到消息之后才会把Queue中的这条消息删除。

 

RabbitMQ消息队列(五):Routing 消息路由 2[原]

标签:

原文地址:http://www.cnblogs.com/qiyebao/p/4205851.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!