客户端代码:
namespace RabbitMQDemo
{
    /// <summary>
    /// RPC client form: publishes the text typed by the user to the "rpc_queue"
    /// queue and appends the server's reply to the result text box.
    /// </summary>
    public partial class RPC : Form
    {
        private static readonly RPC _RPC;

        // NOTE(review): never assigned or invoked in the visible part of this
        // partial class - confirm whether it is still needed.
        Action<string, TextBox> SetText;

        static RPC()
        {
            _RPC = new RPC();
        }

        /// <summary>
        /// Singleton instance of the form.
        /// </summary>
        public static RPC SingleForm
        {
            get { return _RPC; }
        }

        private RPC()
        {
            // NOTE(review): disables WinForms cross-thread access checks; nothing in
            // the visible code touches controls from another thread - confirm whether
            // this is still required.
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();
        }

        /// <summary>
        /// Sends the content of the publisher text box as an RPC request and
        /// appends the response to the client text box.
        /// </summary>
        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            string message = txtPublisher.Text;
            if (message.Trim().Length <= 0)
            {
                MessageBox.Show("请输入要发送的消息");
                // Nothing to send: stop here instead of publishing an empty request.
                return;
            }

            RpcClient client = new RpcClient();
            try
            {
                var response = client.Call(message);
                txtRpcClient.Text += string.Format("{0}\r\n", response);
            }
            finally
            {
                // Always release the connection, even when the call throws.
                client.Close();
            }
        }

        /// <summary>
        /// Minimal RabbitMQ RPC client: publishes a request to "rpc_queue" and waits
        /// for the matching reply on its own server-named reply queue.
        /// </summary>
        private class RpcClient
        {
            #region Fields
            /// <summary>
            /// RabbitMQ connection.
            /// </summary>
            private readonly IConnection connection;
            /// <summary>
            /// Channel opened on the connection.
            /// </summary>
            private readonly IModel channel;
            /// <summary>
            /// Name of the reply queue associated with this client.
            /// </summary>
            private readonly string replyQueueName;
            /// <summary>
            /// Consumer that receives replies.
            /// </summary>
            private readonly EventingBasicConsumer consumer;
            /// <summary>
            /// Hands replies from the consumer callback over to <see cref="Call"/>.
            /// </summary>
            private readonly BlockingCollection<string> resQueue = new BlockingCollection<string>();
            /// <summary>
            /// Message properties (correlation id and reply-to) attached to every request.
            /// </summary>
            private readonly IBasicProperties props;
            #endregion

            /// <summary>
            /// Connects to the broker on localhost, declares the reply queue and
            /// starts consuming replies from it.
            /// </summary>
            public RpcClient()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new EventingBasicConsumer(channel);

                // Tie request, response and reply queue together.
                props = channel.CreateBasicProperties();
                var correlationID = Guid.NewGuid().ToString();
                props.CorrelationId = correlationID;
                props.ReplyTo = replyQueueName;

                consumer.Received += (model, ea) =>
                {
                    var response = Encoding.UTF8.GetString(ea.Body);
                    // Only accept replies that belong to a request sent by this client.
                    if (ea.BasicProperties.CorrelationId == correlationID)
                    {
                        resQueue.Add(response);
                    }
                };

                // Subscribe exactly once here rather than on every Call, so calling
                // Call repeatedly does not register the consumer again.
                channel.BasicConsume(
                    consumer: consumer,
                    queue: replyQueueName,
                    noAck: true);
            }

            /// <summary>
            /// Publishes <paramref name="msg"/> to "rpc_queue" and blocks until a reply
            /// with the matching correlation id arrives (there is no timeout).
            /// </summary>
            /// <param name="msg">Request text; sent UTF-8 encoded.</param>
            /// <returns>The reply body decoded as UTF-8.</returns>
            public string Call(string msg)
            {
                var msgBytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(
                    exchange: "",
                    routingKey: "rpc_queue",
                    basicProperties: props,
                    body: msgBytes);

                return resQueue.Take();
            }

            /// <summary>
            /// Closes the underlying connection.
            /// </summary>
            public void Close()
            {
                connection.Close();
            }
        }//class
    }
}
服务端代码:
namespace RpcServer
{
    /// <summary>
    /// RPC server form: consumes requests from "rpc_queue", shows each one in the
    /// text box and publishes a reply to the queue named in the request's ReplyTo.
    /// </summary>
    public partial class RpcServer : Form
    {
        private static readonly RpcServer _RpcServer;

        // Delegate used to append received text to a text box via Control.Invoke.
        Action<string, TextBox> SetText;

        static RpcServer()
        {
            _RpcServer = new RpcServer();
        }

        /// <summary>
        /// Singleton instance of the form.
        /// </summary>
        public static RpcServer SingleForm
        {
            get { return _RpcServer; }
        }

        private RpcServer()
        {
            // NOTE(review): disables WinForms cross-thread access checks; the visible
            // code already marshals through box.Invoke - confirm whether this is
            // still required.
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();

            // Wire the display delegate BEFORE consumption starts, so a message that
            // is delivered immediately never sees a null SetText.
            SetText += OnSetText;
            ReceiveMsg(txtRpcServer);//server side
        }

        /// <summary>
        /// Starts consuming RPC requests from "rpc_queue" on the broker at localhost.
        /// Each request is displayed in <paramref name="box"/>, answered on the
        /// request's reply queue and then acknowledged manually.
        /// Setup failures are reported in a message box.
        /// </summary>
        /// <param name="box">Text box that displays received messages.</param>
        private void ReceiveMsg(TextBox box)
        {
            try
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                // The connection and channel are not closed in this method; the
                // consumer callback below keeps using the channel.
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();

                // Declare the request queue.
                channel.QueueDeclare(
                    queue: "rpc_queue",
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                // At most one unacknowledged message per consumer: nothing new is
                // delivered until the previous message has been acked.
                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    string response = null;
                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var msg = Encoding.UTF8.GetString(body);
                        // Show the request on the server form.
                        box.Invoke(SetText, msg, box);
                        response = "我将给你回复:已收到消息-" + msg;
                    }
                    catch (Exception ex)
                    {
                        // A processing/display failure must not leave the request
                        // unanswered and unacked; reply with an empty body instead.
                        System.Diagnostics.Trace.WriteLine(ex);
                        response = "";
                    }
                    finally
                    {
                        // Skip the reply when the sender did not ask for one.
                        if (!string.IsNullOrEmpty(props.ReplyTo))
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(
                                exchange: "",
                                routingKey: props.ReplyTo,
                                basicProperties: replyProps,
                                body: responseBytes);
                        }

                        // Manually acknowledge so the broker delivers the next request.
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                };

                channel.BasicConsume(
                    queue: "rpc_queue",
                    noAck: false,//manual acknowledgement
                    consumer: consumer);
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
            }
        }

        /// <summary>
        /// Appends <paramref name="text"/> followed by a line break to <paramref name="box"/>.
        /// </summary>
        private void OnSetText(string text, TextBox box)
        {
            box.Text += string.Format("{0}\r\n", text);
        }
    }
}
界面:
大概流程:
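客户端把请求发送到 rpc_queue 队列(消息属性中带上 ReplyTo 回复队列名和 CorrelationId),服务端从 rpc_queue 消费该请求,并把响应发送到 ReplyTo 指定的回复队列,客户端再从回复队列消费响应,并通过 CorrelationId 确认它属于本次请求(新建 2 个 WinForm 程序测试,一个客户端,一个服务端)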
在 Visual Studio 中同时启动两个 WinForm 程序:在解决方案上单击右键 - 属性 - 启动项目 - 选择"多个启动项目" - 把客户端和服务端两个项目的"操作"都改为"启动" - 确定即可
测试结果: