码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitmq系列——(3 优先级 )

时间:2021-01-19 12:12:57      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:队列   contain   load   icp   bytes   tin   stat   with   tag   

  (1)消费消息失败后,重新加入队列并优先级处理;

  (2)根据消息的内容筛选出优先级高的进行设置,并发送

1. 生产者

using RabbitMQMsgProducer.MessageProducer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;

namespace RabbitMQMsgProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                {
                    // 优先级
                    // 1. 消费消息失败后,重新加入队列并优先级处理
                    PriorityMsg.Send01();
                }
                {
                    // 优先级
                    // 2. 根据消息的内容筛选出优先级高的进行设置,并发送
                    //PriorityMsg.Send02();
                }
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgProducer.MessageProducer
{
    public class PriorityMsg
    {
        /// <summary>
        /// 消费消息失败后,重新加入队列并优先级处理
        /// </summary>
        public static void Send01()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";
            connectionFactory.UserName = "guest";
            connectionFactory.Password = "guest";
            string queueName = "PriorityMsgQueue";
            string exchangeName = "PriorityMsgExchange";
            string routingKeyName = "PriorityKey";
            using (IConnection connection = connectionFactory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // 声明exchange
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    // 声明队列
                    // x-max-priority 指定队列的优先级设置,必须的
                    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>()
                    {
                        {"x-max-priority",10 }
                    });
                    // 绑定
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName);

                    IBasicProperties props = channel.CreateBasicProperties();
                    props.DeliveryMode = 2;
                    int i = 1;
                    while (true)
                    {
                        props.Priority = 1; // 设置优先级
                        string msg = $"the message is {i}";
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: props, body: Encoding.UTF8.GetBytes(msg));
                        Console.WriteLine($"{msg} is send.");
                        i++;
                        if (i > 29)
                        {
                            break;
                        }
                    }
                    Console.WriteLine("press [enter] exit.");
                    Console.Read();
                }
            }
        }

        /// <summary>
        /// 根据消息的内容筛选出优先级高的进行设置,并发送
        /// </summary>
        public static void Send02()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";
            connectionFactory.UserName = "guest";
            connectionFactory.Password = "guest";
            string queueName = "PriorityMsgQueue";
            string exchangeName = "PriorityMsgExchange";
            string routingKeyName = "PriorityKey";
            using (IConnection connection = connectionFactory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // 声明exchange
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    // 声明队列
                    // x-max-priority 指定队列的优先级设置,必须的
                    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>()
                    {
                        {"x-max-priority",10 }
                    });
                    // 绑定
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKeyName);

                    string[] msgList =
                    {
                        "顶头上司 给你一个任务1",
                        "1部门领导 约你沟通问题",
                        "2部门领导 约你沟通问题",
                        "3部门领导 约你沟通问题",
                        "人力 谈谈涨薪",
                        "顶头上司 给你一个任务2",
                    };
                    IBasicProperties props = channel.CreateBasicProperties();
                    foreach (string msg in msgList)
                    {
                        ////没有优先级这样写
                        //channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: null, body: Encoding.UTF8.GetBytes(msg));
                        if (msg.StartsWith("顶头上司"))
                        {
                            props.Priority = 10;
                        }
                        else if (msg.Contains("涨薪"))
                        {
                            props.Priority = 9;
                        }
                        else
                        {
                            props.Priority = 1;
                        }
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: props, body: Encoding.UTF8.GetBytes(msg));
                        Console.WriteLine($"{msg} is send.");
                    }
                    Console.WriteLine("press [enter] exit.");
                    Console.Read();
                }
            }
        }
    }
}

 

2. 消费者

using RabbitMQMsgProducer.MessageProducer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;

namespace RabbitMQMsgProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                {
                    // 优先级
                    // 1. 消费消息失败后,重新加入队列并优先级处理
                    PriorityMsg.Send01();
                }
                {
                    // 优先级
                    // 2. 根据消息的内容筛选出优先级高的进行设置,并发送
                    //PriorityMsg.Send02();
                }
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer001.MessageConsumer
{
    public class PriorityMsg
    {
        /// <summary>
        /// 消费消息失败后,重新加入队列并优先级处理
        /// </summary>
        public static void Receive01()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";
            connectionFactory.UserName = "guest";
            connectionFactory.Password = "guest";
            string queueName = "PriorityMsgQueue";
            string exchangeName = "PriorityMsgExchange";
            string routingKeyName = "PriorityKey";
            using (IConnection connection = connectionFactory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        string msg = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"the consumer receive {msg}");
                        if(msg.Equals("the message is 1"))
                        {
                            // 这里重新删除消息队列中消息,并重新设置消息优先级,写入队列
                            // 消息默认队列优先级是1,重新写入设置为10。
                            //否定:告诉Broker,这个消息我没有正常消费;  requeue: true:重新写入到队列里去; false:你还是删除掉;
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);

                            ///设置消息优先级最高 重新写入到队列中去
                            IBasicProperties props = channel.CreateBasicProperties();
                            props.Priority = 10;
                            channel.BasicPublish(exchange: exchangeName, routingKey: routingKeyName, basicProperties: props, body: Encoding.UTF8.GetBytes(msg + "double.."));
                        }
                        else
                        {
                            //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                    };
                    Console.WriteLine("the consumer is ready.");
                    //处理消息 
                    //autoAck: false  显示确认;
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }

        /// <summary>
        /// 根据消息的内容筛选出优先级高的进行设置,并发送
        /// </summary>
        public static void Receive02()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";
            connectionFactory.UserName = "guest";
            connectionFactory.Password = "guest";
            string queueName = "PriorityMsgQueue";
            string exchangeName = "PriorityMsgExchange";
            string routingKeyName = "PriorityKey";
            using (IConnection connection = connectionFactory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        string msg = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"the consumer receive {msg}");
                        //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    Console.WriteLine("the consumer is ready.");
                    //处理消息 
                    //autoAck: false  显示确认;
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

 

3. 结果

  消费消息失败后,重新加入队列并优先级处理;

技术图片

 

 4. 结果

   根据消息的内容筛选出优先级高的进行设置,并发送

技术图片

 

rabbitmq系列——(3 优先级 )

标签:队列   contain   load   icp   bytes   tin   stat   with   tag   

原文地址:https://www.cnblogs.com/Fletcher/p/14172806.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!