标签:
上一篇文章使用的是Direct的Exchange,但是没有指定Queue的名字,这样只能是先运行Consumer之后,Producer在运行发消息Consumer才能收到,否则先运行Producer发送消息,在运行Consumer是收不到之前Producer发送的消息,因为Queue的名字像是这样的:amq.gen-X-XSTaseUmil42zrawBVsw都是临时,如果Consumer关闭之后,这个Queue就会自动被RabbitMQ删掉。
如果想创建可以先执行Producer的Direct的Exchnage呢?因为在实际工作中我们可能需要发送端有消息就会一直发给接收端,不管接收端是否已经运行。如果我们需要指定名称的Queue,并且使用Direct的Exchange方式,我们需要使用Binding的方式。上一篇和第一篇文章中都解释了绑定的含义:绑定其实就是关联了exchange和queue。
使用指定名称的Queue和Direct的Exchange例子:
Producer.cs
1 /// SendDemo51.exe info 2 /// SendDemo51.exe warning 3 /// SendDemo51.exe error 4 /// </param> 5 static void Main(string[] args) 6 { 7 var factory = new ConnectionFactory() { HostName = "localhost" }; 8 using (var connection = factory.CreateConnection()) 9 { 10 using (var channel = connection.CreateModel()) 11 { 12 const string EXCHANGE_NAME = "direct_logs"; 13 channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。 14 15 const string QUEUE_NAME = "direct_hello";//使用我们自己指定Queue的名称 16 bool durable = true; 17 channel.QueueDeclare(QUEUE_NAME, durable, false, false, null); 18 19 var routingKey = (args.Length > 0) ? args[0] : "info"; 20 21 channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联 22 23 var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"); 24 25 var body = Encoding.UTF8.GetBytes(message); 26 27 var properties = channel.CreateBasicProperties(); 28 properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties 29 30 channel.BasicPublish(EXCHANGE_NAME, routingKey, properties, body); 31 Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message); 32 33 Console.Read(); 34 } 35 } 36 }
Consumer.cs
1 static void Main(string[] args) 2 { 3 var factory = new ConnectionFactory() { HostName = "localhost" }; 4 using (var connection = factory.CreateConnection()) 5 { 6 using (var channel = connection.CreateModel()) 7 { 8 const string EXCHANGE_NAME = "direct_logs"; 9 channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); 10 11 const string QUEUE_NAME = "direct_hello";//使用我们自己指定Queue的名称 12 bool durable = true; 13 channel.QueueDeclare(QUEUE_NAME, durable, false, false, null); 14 string queueName = QUEUE_NAME; 15 16 if (args.Length < 1) 17 { 18 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); 19 Environment.ExitCode = 1; 20 return; 21 } 22 23 foreach (var routingKey in args) 24 { 25 channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联 26 } 27 28 Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C"); 29 30 var consumer = new QueueingBasicConsumer(channel); 31 channel.BasicConsume(queueName, true, consumer); 32 33 while (true) 34 { 35 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 36 37 var body = ea.Body; 38 var message = Encoding.UTF8.GetString(body); 39 var routingKey = ea.RoutingKey; 40 Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", routingKey, message); 41 } 42 } 43 } 44 }
使用动态方式获取指定名称的Queue和Direct的Exchange例子:
Producer.cs
1 /// <summary> 2 /// 支持多个Queue,通过routingkey可以拿到多个queue里面的内容; 3 /// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息, 4 /// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。 5 /// SendDemo52.exe info custom_direct_hello1 6 /// SendDemo52.exe info custom_direct_hello2 wyp aaa 7 /// </summary> 8 /// <param name="args"></param> 9 static void Main(string[] args) 10 { 11 var factory = new ConnectionFactory() { HostName = "localhost" }; 12 using (var connection = factory.CreateConnection()) 13 { 14 using (var channel = connection.CreateModel()) 15 { 16 const string EXCHANGE_NAME = "direct_logs"; 17 channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。 18 19 string queueName = (args.Length > 1) ? args[1] : "direct_hello";//得到我们自己指定Queue的名称 20 channel.QueueDeclare(queueName, true, false, false, null); 21 22 var routingKey = (args.Length > 0) ? args[0] : "info"; 23 channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联 24 25 var message = (args.Length > 3) ? string.Join(" ", args.Skip(2).ToArray()) : "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"); 26 27 var body = Encoding.UTF8.GetBytes(message); 28 29 var properties = channel.CreateBasicProperties(); 30 properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties 31 32 channel.BasicPublish(EXCHANGE_NAME, routingKey, properties, body); 33 Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message); 34 35 Console.Read(); 36 } 37 } 38 }
Consumer.cs
1 /// <summary> 2 /// 支持多个Queue,通过routingkey可以拿到多个queue里面的内容; 3 /// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息, 4 /// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。 5 /// ReceiveDemo52.exe info custom_direct_hello1 6 /// ReceiveDemo52.exe info custom_direct_hello2 7 /// </summary> 8 class Program 9 { 10 static void Main(string[] args) 11 { 12 var factory = new ConnectionFactory() { HostName = "localhost" }; 13 using (var connection = factory.CreateConnection()) 14 { 15 using (var channel = connection.CreateModel()) 16 { 17 const string EXCHANGE_NAME = "direct_logs"; 18 channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); 19 if (args.Length < 1) 20 { 21 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); 22 Environment.ExitCode = 1; 23 return; 24 } 25 26 string queueName = (args.Length > 1) ? args[1] : "direct_hello";//得到我们自己指定Queue的名称 27 channel.QueueDeclare(queueName, true, false, false, null); 28 29 var routingKey = (args.Length > 0) ? args[0] : "info"; 30 channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称和Direct的Exchange方式进行关联 31 32 Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C"); 33 34 var consumer = new QueueingBasicConsumer(channel); 35 channel.BasicConsume(queueName, true, consumer); 36 37 while (true) 38 { 39 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 40 41 var body = ea.Body; 42 var message = Encoding.UTF8.GetString(body); 43 string routingKeyRe = ea.RoutingKey; 44 Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", routingKeyRe, message); 45 } 46 } 47 } 48 } 49 }
特别注意:
本例子是支持多个Queue名称同时发送和接收的,Consumer通过routing key可以拿到属于这个routing key里面的多个queue的内容;
如果Consumer一旦接收到消息,分两种情况:
1、接收的Consumer不是指定名称的Queue(此时这个Consumer是通过routing key得到Producer发送的消息),那么这个Consumer是不会自动从Queue中删除接收到的消息;
2、接收的Consumer是指定名称的Queue,那么这个Consumer是会自动从Queue中删除接收到的消息。
总结:只有当指定名称的Queue收到消息之后才会把Queue中的这条消息删除。
RabbitMQ消息队列(五):Routing 消息路由 2[原]
标签:
原文地址:http://www.cnblogs.com/qiyebao/p/4205851.html