RpcProvider类(负责发布RPC Service)

RpcProvider类


RpcProvider类是框架提供的专门发布rpc服务的网络对象类。

mprpcprovider.h

#pragma once

#include "google/protobuf/service.h"
#include <functional>
#include <google/protobuf/descriptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpServer.h>
#include <string>
#include <unordered_map>

//框架提供的专门发布rpc服务的网络对象类
class RpcProvider
{
public:
    //这个函数是框架提供给外部发布rpc方法的接口
    //框架是可以接收各种RPC服务的,不能依赖具体的某一个业务
    //基类service指针指向servie的子类对象
    void NotifyService(google::protobuf::Service *service);

    //启动rpc服务节点,开始提供rpc远程网络调用服务
    void Run();

private:
    //muduo的事件循环EventLoop类的对象
    muduo::net::EventLoop m_eventloop;

    //一个结构类型,里面存储了service的信息
    struct ServiceInfo
    {
        google::protobuf::Service *m_service;                                              //service对象
        std::unordered_map<std::string, const google::protobuf::MethodDescriptor *> m_methodMap; //保存service对象的所有rpc方法
    };

    //存储注册成功的service对象和其信息
    std::unordered_map<std::string, ServiceInfo> m_serviceMap;

    //muduo库TCP连接建立时执行的回调函数
    void
    OnConnection(const muduo::net::TcpConnectionPtr &);

    //muduo库TCP连接上有消息到来(消息读写)时执行的回调函数
    void OnMessage(const muduo::net::TcpConnectionPtr &, muduo::net::Buffer *, muduo::Timestamp);

    //Closure的回调操作,用于序列化rpc的响应和网络发送
    void SendRpcResponse(const muduo::net::TcpConnectionPtr &, google::protobuf::Message *);
};

mprpcprovider.cc

#include "mprpcprovider.h"
#include "logger.h"
#include "mprpcapplication.h"
#include "rpcheader.pb.h"
#include "zookeeperutil.h"
#include <string>

//这个函数是框架提供给外部发布rpc方法的接口
void RpcProvider::NotifyService(google::protobuf::Service *service)
{
    ServiceInfo service_info;

    //获取service的信息,如名字,rpc方法总数等
    const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();

    //获取service的名字
    std::string service_name = pserviceDesc->name();
    //获取service中rpc方法的数量
    int methodCnt = pserviceDesc->method_count();

    LOG_INFO("service_name: %s", service_name.c_str());

    //记录service对象所有的rpc方法
    for (int i = 0; i < methodCnt; i++)
    {
        //获取service指定下标rpc方法的描述
        const google::protobuf::MethodDescriptor *pmethodDesc = pserviceDesc->method(i);
        std::string method_name = pmethodDesc->name();
        service_info.m_methodMap.insert({method_name, pmethodDesc});

        LOG_INFO("method_name: %s", method_name.c_str());
    }
    //记录service对象
    service_info.m_service = service;
    //记录service及其信息
    m_serviceMap.insert({service_name, service_info});
}

//启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run()
{
    //读取配置文件,获取rpcserver的ip地址和端口号
    std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    uint16_t port = stoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport"));
    muduo::net::InetAddress address(ip, port);

    //创建TcpServer对象
    muduo::net::TcpServer server(&m_eventloop, address, "RpcProvider");

    //为新建的服务器绑定连接回调和消息回调函数
    server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
    server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

    //设置muduo库的线程数量为4,1个是I/O线程,3个是工作线程
    server.setThreadNum(4);

    //把当前rpc节点上要发布的服务全部注册到zookeeper上面,让rpc client可以从zookeeper上发现服务
    //session timeout   30s  zookeeper client 网络I/O线程  1/3 * timeout 时间发送ping消息
    ZkClient zkCli;
    zkCli.Start();                //连接zookeeper server
    for (auto &sp : m_serviceMap) //service_name为永久性节点   method_name为临时性节点
    {
        // /service_name   /UserServiceRpc
        std::string service_path = "/" + sp.first;
        zkCli.Create(service_path.c_str(), nullptr, 0);
        for (auto &mp : sp.second.m_methodMap)
        {
            // /service_name/method_name   /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和port
            std::string method_path = service_path + "/" + mp.first;
            char method_path_data[128] = {0};
            sprintf(method_path_data, "%s:%d", ip.c_str(), port);
            zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL); //ZOO_EPHEMERAL表示znode是一个临时性节点,和zkserver断了,就是表示不提供这个RPC服务了,所以ZK自动删掉就好啦。
        }
    }

    //rpc服务端准备启动,打印信息
    std::cout << "RpcProvider start service at ip:" << ip << " port:" << port << std::endl;

    //启动网络服务
    server.start();
    //相当于启动了epoll_wait,阻塞,等待远程连接
    m_eventloop.loop();
}

//rpc的请求是短连接的,请求一次完了,服务端返回rpc的方法的响应,之后主动关闭连接。
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{
    if (!conn->connected())
    {
        //断开连接
        conn->shutdown();
    }
}

/*
在框架内部,RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型
怎么商量呢? 
包含:service_name  method_name   args   
对应:16UserService   Login    zhang san123456   
我们在框架中定义proto的message类型,进行数据头的序列化和反序列化
service_name method_name args_size(防止粘包的问题) 

怎么去区分哪个是service_name, method_name, args
我们用消息头表示 service_name, method_name以及args_size(参数字符串的长度)。

消息的格式是:
header_size + service_name method_name args_size + args
即:
header_size(4个字节) + header_str + args_str
header_str 是service_name,method_name以及args_size。 
为了防止粘包,我们还要用args_str记录表示参数的字符串。 
我们统一:一开始读4个字节,数据头的长度,也就是服务名字,方法名字 和参数字符串的长度;之后再根据参数的字符串的长度,读取参数字符串
10 "10"
10000 "1000000"
std::string   insert和copy方法 
*/
//已建立连接用户的读写事件回调,如果远程有一个rpc服务的调用请求,那么OnMessage方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
                            muduo::net::Buffer *buffer,
                            muduo::Timestamp)
{
    //记录从网络上接收的远程rpc调用请求的字符流,包含了RPC方法的名字(如Login)和参数args
    std::string recv_buf = buffer->retrieveAllAsString();

    //从字符流中读取前4个字节的内容
    uint32_t header_size = 0;
    recv_buf.copy((char *)&header_size, 4, 0); //从0下标位置拷贝4个字节的内容到header_size

    //从recv_buf的第5个字节,读取service_name,method_name,args_size
    std::string rpc_header_str = recv_buf.substr(4, header_size);

    //根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
    mprpc::RpcHeader rpcHeader;
    std::string service_name;
    std::string method_name;
    uint32_t args_size;
    if (rpcHeader.ParseFromString(rpc_header_str))
    {
        //数据头反序列化成功
        service_name = rpcHeader.service_name();
        method_name = rpcHeader.method_name();
        args_size = rpcHeader.args_size();
    }
    else
    {
        //数据头反序列化失败
        std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
        return;
    }

    //获取rpc方法参数的字符流数据
    std::string args_str = recv_buf.substr(4 + header_size, args_size);

    //打印调试信息
    std::cout << "============================================" << std::endl;
    std::cout << "header_size: " << header_size << std::endl;
    std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
    std::cout << "service_name: " << service_name << std::endl;
    std::cout << "method_name: " << method_name << std::endl;
    std::cout << "args_str: " << args_str << std::endl;
    std::cout << "============================================" << std::endl;

    //消息到来时获取service对象和method对象,寻找该service是否有客户端所要求的rpc方法
    auto it = m_serviceMap.find(service_name);
    if (it == m_serviceMap.end())
    {
        std::cout << service_name << " is not exist!" << std::endl;
        return;
    }
    auto mit = it->second.m_methodMap.find(method_name);
    if (mit == it->second.m_methodMap.end())
    {
        std::cout << service_name << " : " << method_name << " is not exist!" << std::endl;
        return;
    }

    google::protobuf::Service *service = it->second.m_service;      //获取service对象 ,如new UserService这种
    const google::protobuf::MethodDescriptor *method = mit->second; //获取rpcmethod对象,如Login这种

    //生成rpc方法调用的参数(即request message和response message中的各个变量)
    //在框架以抽象的方式表示。new生成新对象,传给userservice
    google::protobuf::Message *request = service->GetRequestPrototype(method).New(); //获取调用rpc方法的request message,如LoginRequest

    if (!request->ParseFromString(args_str)) //解析参数,与rpc方法的请求message中的各个变量对应,如LoginRequest里的name和pwd
    {
        std::cout << "request parse error , content: " << args_str << std::endl;
        return;
    }
    google::protobuf::Message *response = service->GetResponsePrototype(method).New(); //获取调用rpc方法的响应message,如LoginResponse

    //CallMethod需要closure参数,构造一个closure类型的done给下面的CallMethod()绑定一个Closure的回调函数
    //在使用callmethod()调用完本地的rpc方法后,回调done中绑定的函数SendRpcResponse()
    //SendRpcResponse()的主要作用就是检查 response 正确性,序列化,打包,发送响应等逻辑
    google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
                                                                    const muduo::net::TcpConnectionPtr &,
                                                                    google::protobuf::Message *>(this,
                                                                                                 &RpcProvider::SendRpcResponse,
                                                                                                 conn,
                                                                                                 response);

    //在框架上根据远端rpc request,调用当前rpc节点上发布的method
    //CallMethod底层调用了rpc节点上发布的method,如 Login()
    service->CallMethod(method, nullptr, request, response, done); //做完本地业务,根据结果写好reponse给框架,框架再给调用方
}

//Closure的回调操作,用于序列化服务器对rpc的response,网络发送,检查 response 正确性,序列化,打包,发送响应等逻辑
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response)
{
    std::string response_str;
    //对response_str进行序列化
    if (response->SerializeToString(&response_str))
    {
        //序列化成功后,通过muudo的conn把rpc方法执行的结果发送回rpc的调用方
        conn->send(response_str);
    }
    else
    {
        //序列化失败
        std::cout << "serialize response_str error!" << std::endl;
    }

    //rpc的请求是短连接的,由rpcprovider主动断开连接,给更多的rpc调用方提供服务
    conn->shutdown();
}

上一篇:RPC


下一篇:flink clickhouse-jdbc和flink-connector 写入数据到clickhouse因为jar包冲突导致的60 seconds.Please check if the reque