Wrapping RabbitMQ Message Queue Handling in C#

Date: 2019-12-20 13:47:09

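There are many ways to use RabbitMQ from .NET today. I have used two of them, EasyNetQ and CAP, and both work well. CAP in particular is my recommendation if you want something that works with minimal effort. There are plenty of articles on how to use them, so you can search for those yourself.

Recently I tried writing my own wrapper around RabbitMQ, tailored to the needs of my current project. Let's get started; the code I wrote is below.

First, define the message publisher/producer interface: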

using System;
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Message publisher (producer). Dispose() releases the underlying resources.
    /// </summary>
    public interface IPublisher : IDisposable
    {
        /// <summary>
        /// Publishes a message to the exchange named after the message type.
        /// </summary>
        /// <typeparam name="T">Message type; its name is used as the exchange name.</typeparam>
        /// <param name="message">The message to publish.</param>
        void Publish<T>(T message) where T : class;
        /// <summary>
        /// Publishes a text message to the named exchange.
        /// </summary>
        /// <param name="message">The message text.</param>
        /// <param name="channelName">The exchange name.</param>
        void Publish(string message, string channelName);
        /// <summary>
        /// Asynchronous variant of Publish.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">The message to publish.</param>
        /// <returns>A task that completes when the message has been published.</returns>
        Task PublishAsync<T>(T message) where T : class;
    }
}

Define the subscriber/consumer interface:

using System;
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Message subscriber (consumer). Dispose() releases the underlying resources.
    /// </summary>
    public interface ISubscriber : IDisposable
    {
        /// <summary>
        /// Subscribes to the named exchange and invokes the callback for each message.
        /// </summary>
        /// <param name="channelName">The exchange name.</param>
        /// <param name="callback">Called with the text of each received message.</param>
        void Subscribe(string channelName, Action<string> callback);
        /// <summary>
        /// Asynchronous variant of Subscribe.
        /// </summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="channelName">The exchange name.</param>
        /// <returns>A task that yields a received message.</returns>
        Task<T> SubscribeAsync<T>(string channelName) where T : class;
    }
}

Define RabbitMQProvider:

using RabbitMQ.Client;
using System;

namespace fx.MQ
{
    /// <summary>
    /// Holds the broker connection settings and exposes a connection factory.
    /// </summary>
    public class RabbitMQProvider
    {
        private readonly string _ipAddress;
        private readonly int _port;
        private readonly string _username;
        private readonly string _password;

        public RabbitMQProvider(string ipAddress, int? port, string username, string password)
        {
            _ipAddress = ipAddress ?? throw new ArgumentNullException(nameof(ipAddress), "The IP address must not be null.");
            _port = port ?? throw new ArgumentNullException(nameof(port), "The port must not be null.");
            _username = username ?? throw new ArgumentNullException(nameof(username), "The user name must not be null.");
            _password = password ?? throw new ArgumentNullException(nameof(password), "The password must not be null.");

            ConnectionFactory = new ConnectionFactory // create the connection factory
            {
                HostName = _ipAddress, // broker host or IP address
                Port = _port,          // broker port
                UserName = _username,  // account name
                Password = _password   // account password
            };
        }

        public IConnectionFactory ConnectionFactory { get; }
    }
}

Implement the producer:

using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Message publisher.
    /// </summary>
    public class RabbitMQPublisher : IPublisher
    {
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;
        private IModel _channel;

        public RabbitMQPublisher(RabbitMQProvider provider)
        {
            _provider = provider ?? throw new ArgumentNullException(nameof(provider));
            _connection = _provider.ConnectionFactory.CreateConnection();
        }

        // Returns the current connection, creating one if none exists.
        public IConnection Connection
        {
            get { return _connection ?? (_connection = _provider.ConnectionFactory.CreateConnection()); }
        }

        // Returns the current channel, creating one if none exists.
        public IModel Channel
        {
            get { return _channel ?? (_channel = Connection.CreateModel()); }
        }

        public void Dispose()
        {
            // Use the fields, not the properties: the property getters would
            // create a new channel or connection just to close it again.
            if (_channel != null)
            {
                if (_channel.IsOpen)
                    _channel.Close();
                _channel.Dispose();
                _channel = null;
            }

            if (_connection != null)
            {
                if (_connection.IsOpen)
                    _connection.Close();
                _connection.Dispose();
                _connection = null;
            }
        }

        // Serializes the message to JSON and publishes it to the exchange
        // named after the message type.
        public void Publish<T>(T message) where T : class
        {
            Publish(JsonConvert.SerializeObject(message), typeof(T).Name);
        }

        // Publishes a UTF-8 encoded text message to a fanout exchange.
        public void Publish(string message, string channelName)
        {
            Channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, arguments: null);

            var msgByte = Encoding.UTF8.GetBytes(message);
            Channel.BasicPublish
            (
                exchange: channelName,
                routingKey: string.Empty, // ignored by fanout exchanges
                mandatory: false,
                basicProperties: null,
                body: msgByte
            );
        }

        // Not implemented yet.
        public Task PublishAsync<T>(T message) where T : class
        {
            throw new NotImplementedException();
        }
    }
}

Implement the consumer:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Message subscriber/consumer.
    /// </summary>
    public class RabbitMQSubscriber : ISubscriber
    {
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;
        private IModel _channel;

        public RabbitMQSubscriber(RabbitMQProvider provider)
        {
            _provider = provider ?? throw new ArgumentNullException(nameof(provider));
            _connection = _provider.ConnectionFactory.CreateConnection();
        }

        // Returns the current connection, creating one if none exists.
        public IConnection Connection
        {
            get { return _connection ?? (_connection = _provider.ConnectionFactory.CreateConnection()); }
        }

        // Returns the current channel, creating one if none exists.
        public IModel Channel
        {
            get { return _channel ?? (_channel = Connection.CreateModel()); }
        }

        public void Dispose()
        {
            if (_channel != null)
            {
                // Close gracefully if still open, then release the channel.
                if (_channel.IsOpen)
                    _channel.Close();
                _channel.Dispose();
                _channel = null;
            }

            if (_connection != null)
            {
                if (_connection.IsOpen)
                    _connection.Close();
                _connection.Dispose();
                _connection = null;
            }
        }

        /// <summary>
        /// Consumes messages and runs the callback for each one.
        /// </summary>
        /// <param name="channelName">The exchange name.</param>
        /// <param name="callback">Called with the text of each received message.</param>
        public void Subscribe(string channelName, Action<string> callback)
        {
            // Declare the exchange.
            Channel.ExchangeDeclare(exchange: channelName, type: "fanout");
            // Give each subscriber its own uniquely named queue.
            var queueName = channelName + "_" + Guid.NewGuid().ToString("N");
            // Declare the queue. autoDelete: true lets the broker remove this
            // randomly named queue once its consumer is gone; with false, every
            // run would leave another queue behind on the broker.
            Channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: true, arguments: null);
            // Bind the queue to the exchange.
            Channel.QueueBind(queue: queueName, exchange: channelName, routingKey: "");
            // Deliver at most one unacknowledged message at a time.
            Channel.BasicQos(0, 1, false);
            // Define the consumer.
            var consumer = new EventingBasicConsumer(Channel);
            // Handle received messages. The handler runs on a client library
            // thread, not on the caller's thread.
            consumer.Received += (eventSender, args) =>
            {
                // args.Body is a byte[] in RabbitMQ.Client 5.x; from 6.0 on it is
                // a ReadOnlyMemory<byte>, so use args.Body.ToArray() there.
                var message = args.Body;

                callback(Encoding.UTF8.GetString(message));
                // Acknowledge only this delivery (multiple: false).
                Channel.BasicAck(args.DeliveryTag, false);
            };
            // Start consuming with manual acknowledgement (autoAck: false).
            Channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }

        // Not implemented yet.
        public Task<T> SubscribeAsync<T>(string channelName) where T : class
        {
            throw new NotImplementedException();
        }
    }
}
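
Publish<T> sends the message as JSON to an exchange named after the type, while the subscriber only exposes a string callback. A typed counterpart could look roughly like the sketch below. It assumes the ISubscriber interface from this article and Newtonsoft.Json; the names SubscriberExtensions and SubscribeTo are hypothetical and not part of the original code.

```csharp
using System;
using Newtonsoft.Json;

namespace fx.MQ
{
    // Hypothetical helper for illustration; not part of the original code.
    public static class SubscriberExtensions
    {
        // Subscribes to the exchange named after T (the same naming convention
        // Publish<T> uses) and deserializes each JSON payload before invoking
        // the typed callback.
        public static void SubscribeTo<T>(this ISubscriber subscriber, Action<T> callback)
            where T : class
        {
            if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
            if (callback == null) throw new ArgumentNullException(nameof(callback));

            subscriber.Subscribe(typeof(T).Name, json =>
            {
                var message = JsonConvert.DeserializeObject<T>(json);
                // DeserializeObject returns null for the JSON literal "null";
                // such payloads are skipped here.
                if (message != null)
                    callback(message);
            });
        }
    }
}
```

A caller would then write something like `subscriber.SubscribeTo<OrderCreated>(order => Handle(order));`, where OrderCreated and Handle are placeholders for your own message type and handler.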

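At this point we have a simple implementation of sending and receiving messages, which already meets the needs of my current project. Messages are published and consumed through an exchange of type fanout: one producer feeds many consumers, much like a broadcast, because every queue bound to the exchange receives a copy of each message. RabbitMQ also provides the direct, topic, and headers exchange types, so you can change the type as needed.

Here is the test code: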
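
To illustrate how another exchange type differs, here is a minimal sketch of a direct exchange, where a queue receives only the messages whose routing key exactly matches its binding key. It assumes the RabbitMQ.Client 5.x API used in this article (IModel, byte[] bodies); the class DirectExchangeSketch and its method names are made up for this example.

```csharp
using System.Text;
using RabbitMQ.Client;

namespace fx.MQ
{
    // Hypothetical helper for illustration; not part of the original code.
    public static class DirectExchangeSketch
    {
        // Binds a queue so that it receives only messages published with
        // a routing key equal to routingKey.
        public static void BindQueue(IModel channel, string exchange, string queue, string routingKey)
        {
            channel.ExchangeDeclare(exchange: exchange, type: ExchangeType.Direct);
            channel.QueueDeclare(queue: queue, durable: false, exclusive: false, autoDelete: true, arguments: null);
            channel.QueueBind(queue: queue, exchange: exchange, routingKey: routingKey);
        }

        // Publishes a UTF-8 text message; the broker routes it to the queues
        // whose binding key matches routingKey exactly.
        public static void Publish(IModel channel, string exchange, string routingKey, string message)
        {
            channel.ExchangeDeclare(exchange: exchange, type: ExchangeType.Direct);
            channel.BasicPublish(
                exchange: exchange,
                routingKey: routingKey,
                mandatory: false,
                basicProperties: null,
                body: Encoding.UTF8.GetBytes(message));
        }
    }
}
```

With a fanout exchange the routing key is ignored, which is why the wrapper passes an empty string. A topic exchange works like direct, except that binding keys may contain the wildcards `*` (exactly one word) and `#` (zero or more words).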

using System;
using System.Windows.Forms;

namespace fx.MQ.TestForm
{
    public partial class Form1 : Form
    {
        private readonly RabbitMQProvider _provider;
        private readonly RabbitMQPublisher _publisher;
        private readonly RabbitMQSubscriber _subscriber;
        delegate void Callback(string msg);

        public Form1()
        {
            _provider = new RabbitMQProvider("192.168.101.199", 5672, "admin", "admin");
            _publisher = new RabbitMQPublisher(_provider);
            _subscriber = new RabbitMQSubscriber(_provider);
            InitializeComponent();
        }

        // Publish the text box content to the "public" exchange.
        private void button1_Click(object sender, EventArgs e)
        {
            _publisher.Publish(textBox1.Text, "public");
        }

        // Start listening on the "public" exchange when the form loads.
        private void Form1_Load(object sender, EventArgs e)
        {
            _subscriber.Subscribe("public", c =>
            {
                ShowMessage(c);
            });
        }

        // The callback runs on a non-UI thread, so marshal the update
        // onto the UI thread when required.
        private void ShowMessage(string msg)
        {
            if (this.richTextBox1.InvokeRequired)
            {
                var cb = new Callback(ShowMessage);
                this.Invoke(cb, new object[] { msg });
            }
            else
            {
                this.richTextBox1.Text = msg;
            }
        }
    }
}

The result is shown in the screenshot:

[Screenshot: the test form running]

OK, it works as expected.

Also note that both the publisher and the subscriber need to call Dispose() when the program exits, to release their connections.
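
One possible place for that cleanup in the test form is an OnFormClosed override. The sketch below assumes the Form1 class and its _publisher and _subscriber fields from the test code; it is written as a second part of the partial class, and where to put the cleanup is a design choice rather than something the original code prescribes.

```csharp
using System.Windows.Forms;

namespace fx.MQ.TestForm
{
    // Additional part of the partial Form1 class from the test code.
    public partial class Form1
    {
        // Runs after the form has closed; release the RabbitMQ channels
        // and connections before the process exits.
        protected override void OnFormClosed(FormClosedEventArgs e)
        {
            _subscriber.Dispose();
            _publisher.Dispose();
            base.OnFormClosed(e);
        }
    }
}
```

Disposing the subscriber first stops message delivery before the publisher's connection is torn down.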

Original article: https://www.cnblogs.com/my85016629/p/12072401.html
