标签:static bytes consumer 声明 body 消息 运行 并行处理 simple
原文:RabbitMQ(四):RPC的实现RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。有很多方式可以实现,譬如UNIX RPC、REST API、WCF和SOAP。这些传统的RPC实现方法有共同之处:那就是客户端和服务器端紧密相连,客户端直接连接上服务器,发送一个请求,然后就停下来等待服务器的应答。
这种点对点的性质模式有很多好处,它使得在小范围内的拓扑变得简单。但是当有众多服务器的时候,客户端如何发现在那台服务器上可以找到其他想要的服务就变的麻烦,SOAP和大多数的企业RPC已经采用复杂的补充协议和服务目录,但也带来了额外的复杂度和众多故障点。
但是,用RabbitMQ来实现RPC可以无需关心由那台服务器来处理,也不必担心服务器奔溃,只需要简单的发送消息,然后等待响应即可。一般接触RabbitMQ的都是用发后即忘模型,用于发送邮件等通知或者处理其他并行处理事件,也就是AMQP的消息是单向的。如何才能让服务器将处理结果返回给原始的客户端呢?
RabbitMQ有一个优雅的解决方案:使用消息来发回应答。在每个AMQP消息头里有个字段reply_to.消息的生产者可以通过该字段来确定队列的名称,并监听应答队列等待应答。然后接收消息的RPC服务器能偶检查reply_to字段,并创建包含应答内容的新的消息,并以队列名称为路由键,通过应答队列将处理结果发回给生产者。这里我们不需要创建应答队列的名字也不需要将应答队列绑定到交换器上,这是因为没有声明队列的名称RabbitMQ会自动申明,消息发布到RabbitMQ在没有指名交换器的时候,RabbitMQ就会让位目的地是应答队列,而路由键就是应答队列名称。
所以RabbitMQ实现RPC需要比一般的消息通信多以下几个步骤:
其实简单的讲就是生产者在发送消息后接收消息,消费者在接受消息后发送消息,生产者多了一步接收处理消息,消费者多了一步发送消息。我这里简化了一些操作,争取用最少的代码实现,具体代码如下:
生产者:
private static void MySelfRPCProducer() { var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672}; using (var conn = conn_factory.CreateConnection()) { using (var channel = conn.CreateModel()) { IBasicProperties pro = channel.CreateBasicProperties(); pro.ReplyTo = channel.QueueDeclare().QueueName;//创建应答队列并返回队列名称,这个方法创建的队列exclusive和auto_delete都是true,这样可以确保没有人能窃取信息 pro.ContentType = "text/plain"; string corrId = Guid.NewGuid().ToString(); pro.CorrelationId = corrId; channel.BasicPublish("", "rpc_queue", pro, Encoding.UTF8.GetBytes("小黄")); var consumer = new EventingBasicConsumer(channel); consumer.Received += (ea, ch) => { //比较CorrelationId确认是返回的我们的消息 if (ch.BasicProperties.CorrelationId == corrId) { //处理返回结果 string msg = Encoding.UTF8.GetString(ch.Body); Console.WriteLine(msg); } }; string consumer_tag = channel.BasicConsume(pro.ReplyTo, true, consumer);//监听应答队列 channel.BasicCancel(consumer_tag); } } Console.ReadLine(); }
消费者:
private static void MySelfRPCCousmer() { var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672}; using (var conn = conn_factory.CreateConnection()) { using (var channel = conn.CreateModel()) { channel.QueueDeclare("rpc_queue", false, false, false, null); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(0, 1, false); consumer.Received += (ea, ch) => { string msg = Encoding.UTF8.GetString(ch.Body); Console.WriteLine("接收到消息:" + msg); //发送处理结果 channel.BasicPublish("", ch.BasicProperties.ReplyTo, ch.BasicProperties, Encoding.UTF8.GetBytes(msg + "给我回电话了")); channel.BasicAck(ch.DeliveryTag, false); }; string consumer_tag = channel.BasicConsume("rpc_queue", false, consumer); Console.ReadLine();//这里先停止运行下面的代码,因为需要持续监听,信道断开就监听不了了 channel.BasicCancel(consumer_tag); } } }
其实RabbitMQ已经封装好了RPC相应的对象,分别是SimpleRpcClient和SimpleRpcServer。客户端在初始化SimpleRpcClient后主要可以通过Call方法发送消息并返回服务端处理结果。服务端的SimpleRpcServer内部定义了很多虚方法,具体的消息处理是我们自己决定的,所以需要继承SimpleRpcServer后实现相应方法,通过实现重写HandleSimpleCall方法可以返回给客户端数据。具体代码如下所示:
客户端:
private static void RabbitMQRPCProducer() { var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 }; using (var conn = conn_factory.CreateConnection()) { using (var channel = conn.CreateModel()) { //创建client的rpc SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "rpc_queue")); bool flag = true; var sendmsg = ""; while (flag) { Console.WriteLine("请输入要发送的消息"); sendmsg = Console.ReadLine(); if (string.IsNullOrWhiteSpace(sendmsg)) { Console.Write("请输入消息"); continue; } var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg)); Console.WriteLine(Encoding.UTF8.GetString(msg)); } Console.ReadKey(); } } }
服务端:
private static void RabbitMQRPCCousmer() { var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 }; using (var conn = conn_factory.CreateConnection()) { //创建返回一个新的频道 using (var channel = conn.CreateModel()) { channel.QueueDeclare("rpc_queue", false, false, false, null);//创建一个rpc queue SimpleRpcServer rpc = new MySimpleRpcServer(new Subscription(channel, "rpc_queue")); Console.WriteLine("服务端启动成功"); rpc.MainLoop(); Console.ReadKey(); } } }
继承实现方法:
class MySimpleRpcServer : SimpleRpcServer { public MySimpleRpcServer(Subscription subscription) : base(subscription) { } /// <summary> /// 执行完成后进行回调 /// </summary> public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties) { replyProperties = null; return Encoding.UTF8.GetBytes($"给{Encoding.UTF8.GetString(body)}发送短信成功"); } }
以上就是RabbitMQ对于RPC的最简单的实现,与大家共勉。
标签:static bytes consumer 声明 body 消息 运行 并行处理 simple
原文地址:https://www.cnblogs.com/lonelyxmas/p/10693402.html