标签:.exe llb strong 建立 创建 list bit icp line
Remote procedure call (RPC)
在第二个教程second tutorial 中我们已经了解到了工作队列如何将耗时任务分配给多个workers。
但是假如我们需要在远端机器上面运行一个函数并且等待结果返回呢?这通常叫做RPC,即远端过程调用。
这里我们将用RabbitMQ构造一个RPC系统,客户端请求调用服务端的计算斐波纳契数列值得一个函数,并等待计算结果。
首先看一下客户端接口,我们定义一个RPC调用类,其中提供了一个叫做Call的接口,这个接口内部所做的事情就是将调用服务端计算斐波那契数列的请求(包含参数)发送到指定的消息队列,然后再另一个临时队列阻塞等待服务端将计算结果放入到这个临时队列。
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got ‘{0}‘", response);
rpcClient.Close();
下面代码是Call接口内部实现的一部分,为了等待服务端RPC调用的结果,我们需要告诉服务端将计算结果放到哪个队列中,这里props参数就已经制定了计算结果的存放队列名称,同时还附上了每个RPC请求的ID,方便读取response的时候能够知道对应于哪个请求:
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "rpc_queue", props, messageBytes);
// ... then code to read a response message from the callback_queue ...
很显然我们不可能为每个RPC调用都创建一个存放调用结果的回调队列,我们可以为每个client端都创建一个。
至于每个RPC请求发出去之后,收到回应时如何知道这个response是对应于哪个RPC请求,就需要用到 correlationId 属性.
这个ID值可以有多重方法生成,比如客户端IP+计数值,或者一个唯一的GUID等都可以。
RPC调用工作流程:
斐波那契数列计算函数:
private static int fib(int n)
{
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
RPC服务端代码 RPCServer.cs :
class RPCServer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("rpc_queue", false, false, false, null);
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("rpc_queue", false, consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
while (true)
{
string response = null;
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes);
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
}
private static int fib(int n