标签:连接 abort art provider 两种 程序 int 发布 mitm
现在使用.net领域使用RabbitMQ有很多解决方案,我自己使用过的有两个,一个是EasyNetQ,一个是CAP,都挺好用的,尤其是CAP,懒人推荐使用,怎么使用的文章也很多,小伙伴可以自行搜索。
最近我自己尝试根据目前手头项目的需要,自行封装一下基于RabbitMQ的使用,下面开搞,贴上我自己写的代码。
首先定义消息发布者/生产者接口:
1 using System.Threading.Tasks; 2 3 namespace fx.MQ 4 { 5 public interface IPublisher 6 { 7 /// <summary> 8 /// 释放资源。 9 /// </summary> 10 void Dispose(); 11 /// <summary> 12 /// 13 /// </summary> 14 /// <typeparam name="T"></typeparam> 15 /// <param name="message"></param> 16 void Publish<T>(T message) where T : class; 17 /// <summary> 18 /// 19 /// </summary> 20 /// <param name="message"></param> 21 /// <param name="channelName"></param> 22 void Publish(string message, string channelName); 23 /// <summary> 24 /// 25 /// </summary> 26 /// <typeparam name="T"></typeparam> 27 /// <param name="message"></param> 28 /// <returns></returns> 29 Task PublishAsync<T>(T message) where T : class; 30 } 31 }
定义订阅者/消费者接口:
1 using System; 2 using System.Threading.Tasks; 3 4 namespace fx.MQ 5 { 6 public interface ISubscriber 7 { 8 /// <summary> 9 /// 10 /// </summary> 11 void Dispose(); 12 /// <summary> 13 /// 14 /// </summary> 15 /// <typeparam name="T"></typeparam> 16 /// <param name="channelName"></param> 17 /// <returns></returns> 18 void Subscribe(string channelName, Action<string> callback); 19 /// <summary> 20 /// 21 /// </summary> 22 /// <typeparam name="T"></typeparam> 23 /// <param name="channelName"></param> 24 /// <returns></returns> 25 Task<T> SubscribeAsync<T>(string channelName) where T : class; 26 } 27 }
定义RabbmitMQProvider
1 using RabbitMQ.Client; 2 using System; 3 using System.Collections.Generic; 4 using System.Text; 5 6 namespace fx.MQ 7 { 8 public class RabbitMQProvider 9 { 10 private readonly string _ipAddress; 11 private readonly int? _port; 12 private readonly string _username; 13 private readonly string _password; 14 15 public RabbitMQProvider(string ipAddress, int? port, string username, string password) 16 { 17 _ipAddress = ipAddress ?? throw new ArgumentException("IP地址不能为空!"); 18 _port = port ?? throw new ArgumentException("端口不能为空"); 19 _username = username ?? throw new ArgumentException("用户名不能为空"); 20 _password = password ?? throw new ArgumentException("密码不能为空"); 21 22 ConnectionFactory = new ConnectionFactory//创建连接工厂对象 23 { 24 HostName = _ipAddress,//IP地址 25 Port = (int)_port,//端口号 26 UserName = _username,//用户账号 27 Password = _password//用户密码 28 }; 29 } 30 31 public IConnectionFactory ConnectionFactory { get; } 32 33 } 34 }
实现生产者:
1 using Newtonsoft.Json; 2 using RabbitMQ.Client; 3 using System; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace fx.MQ 8 { 9 /// <summary> 10 /// 消息发布者。 11 /// </summary> 12 public class RabbitMQPublisher : IPublisher 13 { 14 15 private readonly RabbitMQProvider _provider; 16 private IConnection _connection; 17 public RabbitMQPublisher(RabbitMQProvider provider) 18 { 19 _provider = provider; 20 _connection = _provider.ConnectionFactory.CreateConnection(); 21 } 22 23 public IConnection Connection 24 { 25 get 26 { 27 if (_connection != null) 28 return _connection; 29 return _connection = _provider.ConnectionFactory.CreateConnection(); 30 } 31 } 32 33 private IModel _channel; 34 public IModel Channel 35 { 36 get 37 { 38 if (_channel != null) 39 return _channel; 40 else 41 return _channel = _connection.CreateModel(); 42 } 43 } 44 45 public void Dispose() 46 { 47 if (Channel != null) 48 { 49 if (Channel.IsOpen) 50 Channel.Close(); 51 Channel.Abort(); 52 Channel.Dispose(); 53 } 54 55 if (Connection != null) 56 { 57 if (Connection.IsOpen) 58 Connection.Close(); 59 } 60 } 61 62 public void Publish<T>(T message) where T : class 63 { 64 var channelName = typeof(T).Name; 65 Channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, null); 66 67 var msgContent = JsonConvert.SerializeObject(message); 68 var msgByte = Encoding.UTF8.GetBytes(msgContent); 69 Channel.BasicPublish 70 ( 71 exchange: channelName, 72 routingKey: string.Empty, 73 mandatory: false, 74 basicProperties: null, 75 body: msgByte 76 ); 77 } 78 79 80 public void Publish(string message, string channelName) 81 { 82 Channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, null); 83 84 var msgByte = Encoding.UTF8.GetBytes(message); 85 Channel.BasicPublish 86 ( 87 exchange: channelName, 88 routingKey: string.Empty, 89 mandatory: false, 90 basicProperties: null, 91 body: msgByte 92 ); 93 } 94 95 public Task PublishAsync<T>(T message) where T : class 96 { 97 throw new NotImplementedException(); 98 } 99 } 100 }
实现消费者:
1 using RabbitMQ.Client; 2 using RabbitMQ.Client.Events; 3 using System; 4 using System.Collections.Generic; 5 using System.Text; 6 using System.Threading.Tasks; 7 8 namespace fx.MQ 9 { 10 /// <summary> 11 /// 消息订阅者/消费者。 12 /// </summary> 13 public class RabbitMQSubscriber : ISubscriber 14 { 15 private readonly RabbitMQProvider _provider; 16 private IConnection _connection; 17 public RabbitMQSubscriber(RabbitMQProvider provider) 18 { 19 _provider = provider; 20 _connection = _provider.ConnectionFactory.CreateConnection(); 21 } 22 23 public IConnection Connection 24 { 25 get 26 { 27 if (_connection != null) 28 return _connection; 29 return _connection = _provider.ConnectionFactory.CreateConnection(); 30 } 31 } 32 33 private IModel _channel; 34 public IModel Channel 35 { 36 get 37 { 38 if (_channel != null) 39 return _channel; 40 else 41 return _channel = _connection.CreateModel(); 42 } 43 } 44 45 46 public void Dispose() 47 { 48 if (_channel != null) 49 { 50 _channel.Abort(); 51 if (_channel.IsOpen) 52 _channel.Close(); 53 54 _channel.Dispose(); 55 } 56 57 if (_connection != null) 58 { 59 if (_connection.IsOpen) 60 _connection.Close(); 61 62 _connection.Dispose(); 63 } 64 } 65 66 /// <summary> 67 /// 消费消息,并执行回调。 68 /// </summary> 69 /// <param name="channelName"></param> 70 /// <param name="callback"></param> 71 public void Subscribe(string channelName, Action<string> callback) 72 { 73 //声明交换机 74 Channel.ExchangeDeclare(exchange: channelName, type: "fanout"); 75 //消息队列名称 76 var queueName = channelName + "_" + Guid.NewGuid().ToString().Replace("-", ""); 77 //声明队列 78 Channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); 79 //将队列与交换机进行绑定 80 Channel.QueueBind(queue: queueName, exchange: channelName, routingKey: ""); 81 //声明为手动确认,每次只消费1条消息。 82 Channel.BasicQos(0, 1, false); 83 //定义消费者 84 var consumer = new EventingBasicConsumer(Channel); 85 //接收事件 86 consumer.Received += (eventSender, args) => 87 { 88 var message = args.Body;//接收到的消息 89 90 callback(Encoding.UTF8.GetString(message)); 91 //返回消息确认 92 Channel.BasicAck(args.DeliveryTag, true); 93 }; 94 //开启监听 95 Channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); 96 97 } 98 99 public Task<T> SubscribeAsync<T>(string channelName) where T : class 100 { 101 throw new NotImplementedException(); 102 } 103 } 104 }
到这里为止,简单的实现消息队列的接受,发送,已经满足我自己当前项目的需要了。这里我用的exchange进行消息队列的生产消费,并且用fanout模式,就是一个生产者对应多个消费者,有点类似于消息广播,另外还有两种模式,可以根据需要修改。
下面是测试代码:
1 using System; 2 using System.Windows.Forms; 3 4 namespace fx.MQ.TestForm 5 { 6 public partial class Form1 : Form 7 { 8 private readonly RabbitMQProvider _provider; 9 private readonly RabbitMQPublisher _publisher; 10 private readonly RabbitMQSubscriber _subscriber; 11 delegate void Callback(string msg); 12 13 public Form1() 14 { 15 _provider = new RabbitMQProvider("192.168.101.199", 5672, "admin", "admin"); 16 _publisher = new RabbitMQPublisher(_provider); 17 _subscriber = new RabbitMQSubscriber(_provider); 18 //callback = new Callback(ShowMessage); 19 InitializeComponent(); 20 } 21 22 private void button1_Click(object sender, EventArgs e) 23 { 24 _publisher.Publish(textBox1.Text, "public"); 25 } 26 27 private void Form1_Load(object sender, EventArgs e) 28 { 29 30 _subscriber.Subscribe("public", c=> { 31 ShowMessage(c); 32 }); 33 } 34 35 36 private void ShowMessage(string msg) 37 { 38 if (this.richTextBox1.InvokeRequired) 39 { 40 var cb = new Callback(ShowMessage); 41 this.Invoke(cb, new object[] { msg }); 42 } 43 else 44 { 45 this.richTextBox1.Text = msg; 46 } 47 } 48 } 49 }
运行效果如图所示:
OK,没有问题。
另外注意,退出程序时消息发布者和订阅者都需要Dispose()来释放连接。
标签:连接 abort art provider 两种 程序 int 发布 mitm
原文地址:https://www.cnblogs.com/my85016629/p/12072401.html