rabbitmq (五)RPC

Remote Procedure Call or RPC(远程函数调用)

当我们需要在远程计算机上运行一个函数,并且等待结果的时候,我们用到RPC

在rabbitmq客户端使用call函数,发送RPC请求并阻塞等待结果返回.

提示:虽然RPC是一个很好计算处理的常见模式,但是有时程序员无法判断

一个函数调用时一个本地调用还是一个缓慢的RPC调用.所以有很多错误的不可预知的结果.

并且增加调试的复杂性.

有三个建议:

1.确定函数调用时本地还是远程调用.

2.给系统添加文档,确定各个组件之间的依赖

3.处理每个RPC调用过程中的异常,

callback queue

因为通常订阅/发布是一个单向过程,只需要一个队列,但是在RPC调用的时候需要制定两个队列,一个发送,一个接受结果.所以用ReplyTo属性指定接受结果队列名称.

correlation Id

为每一个RPC调用创建一个callback queue是低效的,所以可以使用一个公用的callback queue使用关联id来标识每个请求.

源码:

客户端

public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props; public RpcClient()
{
var factory = new ConnectionFactory() { HostName = "your ip", UserName = "your name", Password = "your pwd" }; connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName; consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
} public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes); channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true); return respQueue.Take(); ;
} public void Close()
{
connection.Close();
}
}
class Program
{
static void Main(string[] args)
{
var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call(""); Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
}
}

服务端:

public static void Main()
{
var factory = new ConnectionFactory() { HostName = "your ip", UserName = "your name", Password = "your pwd" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(, , false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) =>
{
string response = null; var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId; try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
}; Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
} /// /// Assumes only valid positive integer input.
/// Don't expect this one to work for big numbers, and it's
/// probably the slowest recursive implementation possible.
/// private static int fib(int n)
{
if (n == || n == )
{
return n;
} return fib(n - ) + fib(n - );
}
上一篇:《RabbitMQ Tutorial》译文 第 6 章 远程过程调用(RPC)


下一篇:CPUFreq驱动