1.什么是RPC
RPC全称Remote Procedure Call,中文译为远程过程调用,简单理解就是 一种解决方案。
业务场景:
举一个大部分phper都接触过的商城开发,一般商城都有以下几个模块
- 商品模块
- 订单模块
- 会员模块
- XX模块
在常见架构中的体现是:
那么在RPC架构中每个模块就是一个服务提供者,架构体现:
在这套架构中业务机的职责就是把一个请求 ,拆分成N个小请求,分发到各个服务里面,再整合各个服务的结果,返回给用户。
例如在某次下单请求中,那么大概 发送的逻辑如下:
1. 业务机接受请求
2. 业务机提取用户参数,请求用户服务,获取用户余额等信息,等待结果
3. 业务机提取商品参数,请求商品服务,获取商品剩余库存和价格等信息,等待结果。
4. 业务机融合用户服务、商品服务的返回结果,进行下一步调用(假设满足购买条件)
5. 业务机调用用户服务进行扣款,调用商品服务进行库存扣减,调用订单服务进行下单(事务逻辑和撤回可以用请求id保证,或者自己实现其他逻辑调度)
6. 业务机根据处理响应用户
而在以上发生的行为,就称为远程过程调用。而调用过程实现的通讯协议可以有很多,比如常见的HTTP、TCP协议。
服务熔断
某个服务故障或者异常时直接熔断整个服务,而不是一直等到此服务超时
服务降级
当某个服务熔断之后,服务器将不再被调用,此时客户端可以自己准备一个本地的fallback回掉,返回一个缺省值 ,这样做,虽然服务水平下降,但好歹,比直接挂掉要强。 服务降级处理是在客户端实现完成的,与服务端没有关系
服务限流
例如某个服务器最多同时仅能处理100个请求, 或者是cpu负载达到百分之80的时候, 为了保护服务的稳定性,则不在希望继续收到 新的连接。那么此时就要求客户端不再对其发起请求,例如 你可以以任何的形式来监控你的服务,当触发某个条件时(CPU负载80%)下线此服务,业务机动态获取服务节点时就可以知道此服务已限流则响应用户[网络繁忙,请稍后再试] 或者此服务有多台机提供则其他机可继续提供服务,等被下线的机子恢复后又上线
2.Php Tcp通讯
源码
https://github.com/ar414-com/RpcDemo
开发环境要求
- 保证 PHP 版本大于等于 7.2
- 保证 Swoole 拓展版本大于等于 4.3.5
- 使用 Linux / FreeBSD / MacOS 这三类操作系统
作者开发环境
- PHP 7.2
- Swoole 4.3.5
- CentOS 7.2
创建一个最基本的TCP服务器
<?php
//创建Server对象,监听 0.0.0.0:20001端口
$serv = new Swoole\Server("0.0.0.0", 20001);
$serv->on('Start', function ($serv) {
echo "服务已启动,主进程PID:{$serv->master_pid}\n";
});
//监听连接进入事件
$serv->on('Connect', function ($serv, $fd) {
echo "Client: Connect.\n";});
//监听数据接收事件
$serv->on('Receive', function ($serv, $fd, $from_id, $data) {
echo "接收客户端数据:{$data}\n";
$serv->send($fd, "Server: ".$data);
});
//监听连接关闭事件
$serv->on('Close', function ($serv, $fd) {
echo "Client: Close.\n";});
//启动服务器
$serv->start();
<?php
//建立连接
$fp = stream_socket_client('tcp://127.0.0.1:20001');
//发送数据
fwrite($fp, 'Test');
//主动获取响应
$data = fread($fp, 65533);
echo "服务端响应数据:{$data}\n";
//断开连接
fclose($fp);
客户端
服务端
3.客户端调用与服务端处理(提供思路)
客户端与服务器的数据传输约定
客户端请求Rpc服务(以下并非完整代码)
- 场景:例如在一个商场系统中,我们将商品库和用户库两个服务切分开到不同的服务器当中
- 当用户打开商场首页的时候, 我们希望App向某个网关发起请求,
- 该网关可以自动的帮我们请求商品列表和用户信息等数据
//商品列表
$data = [
'service' => 'Goods', //服务名称
'action' => 'getList', //具体方法
'arg' => ['page' => 1] //请求参数
];
//用户信息
$data = [
'service' => 'User', //服务名称
'action' => 'getUserInfoForToken', //具体方法
'arg' => ['token' => '6aa62603ef82b70597a90d93af04b542'] //请求参数
];
//打包数据
$dataStr = serialize($data);
$dataStr = pack('N', strlen($str)).$str;
请求API网关 API网关自动根据Service参数查询出对应服务IP、PORT并进行调用返回
本示例为了方便将Rpc服务配置写入.env文件 例:
//.env
RPC_GOODS_HOST=10.0.0.1
RPC_GOODS_PORT=8899
RPC_USER_HOST=10.0.0.2
RPC_USER_PORT=8899
服务端处理请求(完整代码)
//接受请求数据并解包
$data = substr($request,'4');
$data = unserialize($data);
//TODO 检测必须参数 service action
//检测服务是否存在
//$controllerNameSpace是你的控制器命名空间
$service = ucfirst($data['service']);
$class = "{$controllerNameSpace}\\{$service}";
if(!class_exists($class))
{
//TODO 服务不存在
//设置响应状态错误码(需自行封装)
$response->setStatus(Response::STATUS_SERVICE_SERVICE_NOT_FOUND); //响应客户端(需自行封装)
goto response;}
//检测方法是否存在
$class = new \ReflectionClass($class);
$action = $data['action'];
if(!$class->hasMethod($action))
{
//action不存在
//重新组装参数
//如果方法则调用魔术方法 比如调用一些PDO方法,如果无则调用时返回方法不存在
$request->proxyActionAssemblyArg(); $method = $class->getMethod('__call');}
else
{
$method = $class->getMethod($action);}
//调用
$instance = $class->newInstance($request,$response);
$ret = $method->invokeArgs($instance,$request->getArg());
$response->setMessage($ret);
//响应客户端(需自行封装)
goto response;
//作者的响应封装(仅供参考):
response:{
if ($server->exist($fd))
{
$message = $response->getMessage();
$responseData = [
'status' => $response->getStatus(),
'data' => $message
];
$responseData = serialize($responseData);
$responseData = Request::pack($responseData);
$server->send($fd,$responseData);
//判断客户端是否需要长连接
if(!$request->getIsKeep())
{
$server->close($fd);
}
}
}