RabbitMQ(六)
——RPC
(转载请附上本文链接——linhxx)
一、概述
RabbitMQ的RPC模式,支持生产者和消费者不在同一个系统中,即允许远程调用的情况。通常,消费者作为服务端,放置在远程的系统中,提供接口,生产者调用接口,并发送消息。
RPC模式如下图所示:
RPC模式是一种远程调用的模式,因为需要http请求,因此速度比系统内部调用慢。而且rpc模式下,通常不易区分哪些是来自外部的请求,哪些是内部的请求,导致整体速度较慢。因此,不能滥用rpc模式。
二、回调队列(Callback queue)
要实现rpc模式,生产者需要发送回调队列。方法如下:
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$msg = new AMQPMessage(
$payload,
array(‘reply_to‘ => $queue_name));
$channel->basic_publish($msg, ‘‘, ‘rpc_queue‘);
三、AMQP消息属性
AMQP是一种消息传输协议,RabbitMQ是基于此协议的组件。其中,对消息(Message)的属性,有如下几个常用定义:
1)delivery_mode
消息的模式,当该值是2,表示消息需要持久化;当值是1,表示该消息不用持久化。
2)content_type
消息的类型,用来标记消息编码的类型,通常采用json,则此值是application/json,类似在HTML中的定义。
3)reply_to
用于定义回调队列的名字。
4)correlation_id
用于关联rpc的消息请求发送与消息响应接收。(request 和 response)
四、correlation_id
采用回调队列的方式,效率不够高,rabbitMQ还提供了一种方式,即correlation_id。对于每个请求,可以设定一个特定的、唯一的correlation_id。则当收到回复时,仍查看回复消息的此属性,则可以将请求和响应进行关联。
此时,当接收的correlation_id不存在,则表示该响应不是对该生产者发送请求的响应,生产者会丢弃接收到的消息。
采用抛弃消息,而不是报错,是因为服务端(消费者)可能存在竞争条件(race condition),因此存在可能,当服务器(消费者)刚刚发送ack回馈消息给客户端(生产者),服务器宕机。则服务器重启后,会重新发送ack给客户端。但是由于客户端在此之前,已经完成处理,则再次接收到correlation_id时,correlation_id不存在。因此,客户端对待重复的回复是采取抛弃消息的方式,而不是报错。
五、工作流程
1、生产者(Client)开始生产消息后,创建了匿名的、独一无二的回调队列。
2、生产者(Client)发送请求时,包含两个属性:reply_to,即回调队列;correlation_id,即用于标记请求的属性。
3、请求(request )被发送到rpc_queue队列。
4、消费者(The RPC worker)等待请求,收到时,其处理生产者发送的特定的reply_to的消息。
5、生产者等待消息的ack回复,当收到回复后,其校验correlation_id,如果正确则回到应用程序中。
——written by linhxx
更多最新文章,欢迎关注微信公众号“决胜机器学习”,或扫描右边二维码。