标签:out 编码 并保存 python 情况 exchange read arguments 持久化
在远程计算机上运行一个函数并等待结果,我们通常叫这种模式为远程过程调用或者RPC.
通过 RabbitMQ 进行 RPC 很容易,客户端发送请求消息,服务器回复响应消息.为了接收响应,我们需要发送带有“回调”队列地址的请求.
同时,这里面涉及到几个比较重要的消息属性:
在上面介绍的参数中,可以看 ReplyTo 属性可以定义该消息的回调队列,也就是说我们可以为每个RPC请求创建一个回调队列。但这是非常低效的,更好的方法是为每个客户端(多个消费者)创建一个回调队列。
这引发了一个新问题,在该队列中收到响应后,不清楚响应属于哪个请求。
这时候, CorrelationId 属性就发挥它的作用了 。
我们为每个请求的 CorrelationId 属性设置为唯一值。然后,当我们在回调队列中收到消息时,我们将查看此属性,并根据该属性,我们将能够将响应与请求进行匹配。如果我们看到未知的 CorrelationId 值,我们可以安全地丢弃该消息,因为它不属于我们的请求。
为什么我们应该忽略回调队列中的未知消息,而不是因为错误而失败?
这是由于服务器端存在竞争条件的可能性。尽管不太可能,但是在向我们发送答案之后,发送请求的确认消息之前,RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理请求。这就是为什么在客户端上我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。
RPC工作流程:
在进行RPC通信时,我们不再叫"生产者","消费者"了,而是改叫"客户端","服务器".因为在 RPC 中,
客户端即是一个生产者,因为它要发送请求消息给服务器,同时,它也是一个消费者,因为它还要接收服务器发送过来的响应消息.
而服务器即是一个消费者,因为它要接收客户端发送过来的请求消息,同时,它也是一个生产者,因为它执行完函数后,还需要发送响应消息给客户端.
我们把上面的图一分为二来看:
internal class Program { private const string RequestQueueName = "rpc_queue"; private static void Main(string[] args) { using (RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection()) using (RabbitMQ.Client.IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: RequestQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: RequestQueueName, autoAck: false, consumerTag: "", noLocal: false, exclusive: false, arguments: null, consumer: consumer); Console.WriteLine("server 开始等待 RPC 请求"); consumer.Received += (s, e) => { string response = null; byte[] bytes = e.Body; RabbitMQ.Client.IBasicProperties pros = e.BasicProperties;//拿到这条请求消息的属性 RabbitMQ.Client.IBasicProperties replyPros = channel.CreateBasicProperties();//创建响应消息的属性 replyPros.CorrelationId = pros.CorrelationId;//将请求消息的id赋值给响应消息,这个id就相当于请求消息的身份证 try { string msg = Encoding.UTF8.GetString(bytes); int n = int.Parse(msg); Console.WriteLine($"执行函数 Fib(int n) , 入参为 {msg}"); response = Fib(n).ToString();//运行函数,拿到结果 } catch (Exception exception) { Console.WriteLine(exception); response = string.Empty; } finally { byte[] responseBytes = Encoding.UTF8.GetBytes(response);//创建响应消息的字节码 //将响应消息发送到请求消息的属性中指定的响应队列 channel.BasicPublish(exchange: "", routingKey: pros.ReplyTo, mandatory: false, basicProperties: replyPros, body: responseBytes); //发送响应消息后,手动确认已经收到请求消息 channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); } }; Console.WriteLine("按 enter 退出"); Console.ReadLine(); } } /// <summary> /// 服务器的函数 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int Fib(int n) { if (n == 0 || n == 1) { return n; } return Fib(n - 1) + Fib(n - 2); } }
服务器代码非常简单:
public class MyClient { private readonly IConnection connection; private readonly IModel channel; private readonly IBasicProperties pros;//请求消息属性 private readonly EventingBasicConsumer consumer; private readonly string replyQueueName;//响应队列名称 private const string requestQueueName = "rpc_queue";//请求队列名称 private readonly BlockingCollection<string> responseQueue = new BlockingCollection<string>();//存储响应消息 public MyClient() { connection = ConnectionHelper.GetConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName;//声明一个随机的,独占的,自动删除的,非持久化的响应队列 consumer = new EventingBasicConsumer(channel);//创建一个消费者 pros = channel.CreateBasicProperties(); string correlationId = Guid.NewGuid().ToString();//创建一个"身份证" pros.CorrelationId = correlationId; pros.ReplyTo = replyQueueName;//设置回调队列 consumer.Received += (s, e) => { string response = Encoding.UTF8.GetString(e.Body);//拿到响应消息 if (e.BasicProperties.CorrelationId.Equals(correlationId))//确认身份 { responseQueue.Add(response); } }; } /// <summary> /// 发起请求 /// </summary> /// <param name="msg">请求消息</param> /// <returns>请求的结果</returns> public string Call(string msg) { byte[] bytes = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "", routingKey: requestQueueName, basicProperties: pros, body: bytes);//向请求队列发送请求消息. //在发送请求消息(发起请求)后,再定义客户端需要消费的回复队列,并且设置应答模式为 自动应答.因为RPC中,服务器不用关心客户端是否收到了响应 channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer); return responseQueue.Take();//返回本次请求的结果 } /// <summary> /// 关闭客户端 /// </summary> public void Close() { channel.Close(); connection.Close(); } } internal class Program { private static void Main(string[] args) { MyClient client = new MyClient(); while (true) { Console.WriteLine("请输入您要发送的请求消息 : "); string request = Console.ReadLine(); if (string.IsNullOrWhiteSpace(request)) { continue; } if (request.ToLower().Equals("q")) { break; } string response = client.Call(request); Console.WriteLine("请求的结果 : " + response); } client.Close(); } }
客户端代码稍微复杂一些:
此处介绍的设计并不是RPC服务的唯一可能实现,但它具有一些重要优势:
我们的代码仍然相当简单,并不试图解决更复杂(但重要)的问题,例如:
标签:out 编码 并保存 python 情况 exchange read arguments 持久化
原文地址:https://www.cnblogs.com/refuge/p/10356126.html