原文来自 RabbitMQ 英文官网的教程(6.Remote procedure call - RPC),其示例代码采取了 .NET C# 措辞。
在第二篇教程中,我们学习了如何利用事情行列步队在多个事情单元之间分配耗时的任务。
但是如果我们须要运行一个在远程电脑上的函数并等待其结果将会若何呢?好吧,这将是一个完备不同的故事,这个模式被普遍认为叫远程过程调用或者简称 RPC。

在本教程中我们即将利用 RabbitMQ 来构建一个 RPC 系统:一个客户端和一个可伸缩的 RPC 做事器。由于我们并没有任何耗时的任务能拿来分配,那就创建一个返回斐波纳契数列的虚拟 RPC 做事吧。
客户端接口(Client interface)为相识释如何利用 RPC 做事我们来创建一个大略的客户端类。我会公开一个名叫 call 的方法,该方法用以发送一个 RPC 要求并保持壅塞状态,直至吸收到应答为止。
var rpcClient = new RPCClient();
Console.WriteLine(\公众 [x] Requesting fib(30)\"大众);
var response = rpcClient.Call(\"大众30\公众);
Console.WriteLine(\"大众 [.] Got '{0}'\"大众, response);
rpcClient.Close();
关于 RPC
只管 RPC 是一个很常见的打算模式,也时常遭受批评。当程序员不知道针对 call 函数的调用是本地的还是很慢的 RPC 时就会涌现问题,像这样的困惑每每会导致不可预测的系统(问题)以及徒增不必要的调试繁芜性。与简化软件有所不同的是,误用 RPC 会导致难以掩护的意大利面条式代码。
记住以上问题,并考虑以下建议:
确保可以明显区分哪一个函数是调用本地的,哪一个是远程的。
为系统编写文档,确保组件之间的依赖很明确。
处理缺点环境,当 RPC 做事端停机很永劫光时,客户端会若何应对?
当有疑问时先避免利用 RPC,如果可以,考虑利用一个异步管道 - 它类似于 RPC 的壅塞,会通过异步的办法将结果推送到下一个打算场景。
回调行列步队(Callback queue)一样平常而言,基于 RabbitMQ 来利用 RPC 是很大略的,即客户端发送一个要求,然后做事端利用一个相应作为应答。为了能得到一个相应,我们须要在要求过程中发送一个“callback”行列步队地址。
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: \公众\"大众, routingKey: \"大众rpc_queue\"大众, basicProperties: props, body: messageBytes);
// ... then code to read a response message from the callback_queue ...
属性
AMQP 0-9-1 协议会在中预定义包含有 14 个属性的凑集,大部分的属性用得都比较少,除了以下几项之外:
deliveryMode:将标记为持久的(值为2),或者瞬时的(其他值),想必你在第二篇教程中还记得这个属性。
contentType:常常用来描述编码的 mime 类型,比如在常见的 JSON 编码中一个好的实践便是设置该属性为:application/json。
replyTo:常日用来为回调行列步队命名。
correlationId:用以将 RPC 相应与要求关联起来。
Correlation Id在上面呈现的方法中我们建议为每一个 RPC 要求创建一个回调行列步队,不过这很低效,幸运的是我们有更好的办法 - 让我们为每一个客户端创建一个单独的回调。
这就又会涌现一个问题,即在收到相应的行列步队中,并不清楚哪个要求从属于该相应,这便是 correlationId 属性所用之处。我们将会对每一个要求设置 correlationId 为唯一值,然后,当我们在回调行列步队中吸收到时会查看这个属性,在该属性的根本上,我们可以让要求与相应进行匹配。如果我们创造有未知的 correlationId 值,则可以放心地丢弃这些并不属于我们的要求的。
你可能会问,我们为什么该当在回调行列步队中忽略未知的,而不是(直接)返回缺点?这可能是由于做事端存在竞态条件。只管不太可能,但是针对一个要求,RPC 做事器很可能在发送完应答后中止,而不是在发送确认之前。如果确实发生,重启的 RPC 做事将再一次处理这个要求,这便是为什么我们在客户端须要优雅地处理重复的相应,以及该当(保持)空想地幂等性。
总结RPC 会像如下这样运作:
当客户端启动时,它将创建一个匿名的独占回调行列步队。
针对一个 RPC 要求,客户端会发送一个基于两个属性的:一个是指向回调行列步队的 replyTo,另一个是为每一个要求标记唯一值的 correlationId。
要求将发送至 rpc_queue 行列步队。
RPC 事情单元(或者叫做事端)会在行列步队中持续等待要求。当要求涌现时,RPC 将完成事情,同时利用来自 replyTo 字段(所指代)的行列步队来发送携带着结果的返回至客户端。
客户端在回调行列步队上等待着数据,当一个涌现时,客户端会检讨 correlationId 属性,如果该值与当前要求的值相匹配,则把相应返回给运用程序。
领悟一起斐波纳契任务(函数)
private static int fib(int n){
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
我们声明了斐波纳契函数,并假定只(许可)输入正整数。(不要期望输入过大的数字,由于很可能这个递归实现会非常慢)
针对我们的 RPC 做事端,RPCServer.cs 类文件的代码看起来如下:
做事真个代码是相称大略的。
像往常一样,我们先建立连接、信道以及声明行列步队。
我们可能想运行不但一个做事端处理程序,为了能通过多台做事器均匀地分担负载,我们须要设定 channel.basicQos 中 prefetchCount 的值。
我们利用 basicConsume 来访问行列步队,然后注册一个递送程序,在这个程序中我们实行事情并返回相应。
针对我们的 RPC 客户端,RPCClient.cs 类文件的代码如下:
客户真个代码轻微多一些:
我们建立连接和信道,以及针对答复(相应)声明一个独占的“callback”行列步队。
我们订阅这个“callback”行列步队,以便可以吸收到 RPC 相应。
我们的 call 方法将发起一个实际的 RPC 要求。
在此,我们首先天生一个唯一的 correlationId 编号并保存好它,由于 while 循环会利用该值来捕获匹配的相应。
接下来,我们发布要求,它包含了两个属性:replyTo 和 correlationId。
此时,我们可以轻微等待一下直到指定的相应到来。
while 循环所做的事情非常大略,对付每一个相应,它都会检讨 correlationId 是否为我们正在探求的那一个,如果是就保存该相应。
终极,我们将相应返回给用户。
客户端要求
var rpcClient = new RPCClient();
Console.WriteLine(\公众 [x] Requesting fib(30)\公众);
var response = rpcClient.Call(\"大众30\公众);
Console.WriteLine(\公众 [.] Got '{0}'\公众, response);
rpcClient.Close();
现在是时候来看一下 RPCClient.cs 和 RPCServer.cs 完全的示例代码了(包含了基本的非常处理)。
像往常一下创建(可参考第一篇):
我们的 RPC 做事已经就绪,现可以开启做事端:
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
运行客户端来要求一个斐波纳契数:
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
目前所呈现的设计不仅仅是 RPC 做事的可能实现,而且还有一些主要优点:
如果 RPC 做事很慢,你可以通过运行另一个来横向扩展,也便是考试测验在新的掌握台中运行第二个 RPCServer。
在客户端,RPC 只能发送和吸收一条,必需像 queueDeclare 那样进行非同步式调用。因此,RPC 客户端只须要单次要求的一次网络来回。
我们的代码仍旧很大略,也并没有考试测验去办理更繁芜(但很主要的)问题,比如就像:
如果做事端没有运行,那么客户端将如何应对?
客户端针对 RPC 是否该当有某种超时(应对方法)?
如果做事端涌现故障并引发非常,它是否该当转发给客户端?
在处理之前防备无效的传入(比如检讨边界和类型)。