远程过程调用,简称为RPC,是一个计算机通信协议,它允许运行于一台计算机的程序调用另一台计算机的子程序,而无需额外地为这个交互作用编程。
RPC与传统的HTTP对比
优点:
1. 传输效率高(二进制传输)
2. 发起调用的一方无需知道RPC的具体实现,如同调用本地函数般调用
缺点:
1. 通用性不如HTTP好(HTTP是标准协议)
总结:RPC适合内部服务间的通信调用;HTTP适合面向用户与服务间的通信调用
RPC调用过程如下:
1. 调用者(客户端Client)以本地调用的方式发起调用;
2. Client stub(客户端存根)收到调用后,负责将被调用的方法名、参数等打包编码成特定格式的能进行网络传输的消息体;
3. Client stub将消息体通过网络发送给服务端;
4. Server stub(服务端存根)收到通过网络接收到消息后按照相应格式进行拆包解码,获取方法名和参数;
5. Server stub根据方法名和参数进行本地调用;
6. 被调用者(Server)本地调用执行后将结果返回给server stub;
7. Server stub将返回值打包编码成消息,并通过网络发送给客户端;
8. Client stub收到消息后,进行拆包解码,返回给Client;
9. Client得到本次RPC调用的最终结果。
对于RPC调用流程的实现,抛开调用方与被调用方,其核心主要是:消息协议和传输控制的实现
(1). RPC消息协议:客户端调用的参数和服务端的返回值这些在网络上传输的数据以何种方式打包编码和拆包解码
RPC的消息协议在设计时主要要考虑,消息转换及传输的效率,为解决消息转换及传输的效率,可以以二进制的方式传输消息,使用原始的二进制传输可以省去中间转换的环节并减少传输的数据量。
在Python中可以使用struct模块对二进制进行编码和解码:
struct.pact将其它类型转换为二进制,通常用于消息长度的转换:
import struct # 将整数2转换成适用网络传输的无符号的4个字节整数
>>> struct.pack('!I', 2)
'\x00\x00\x00\x02'
struct.unpack将二进制类型转换为其它类型:
byte_data = '\x00\x00\x00\x02'
# 将2对应的二进制 转换为十进制整数 返回结果是个元组
>>> struct.unpack('!I',byte_data)
(2,)
(2). RPC传输控制
对于消息数据的传输,主要有HTTP传输和TCP传输,鉴于TCP传输的可靠性,RPC的传输一般使用TCP作为传输协议
在TCP传输中客户端与服务端通过socket进行通信,通信流程:
RPC使用TCP进行传输控制的实现
import socket class Channel(object):
"""
与客户端建立网络连接
""" def __init__(self, host, port):
self.host = host # 服务器地址
self.port = port # 服务器端口 def get_connection(self):
"""
获取一个tcp连接
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
return sock class Server(object):
def __init__(self, host, port, handlers):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
self.sock.bind((host, port))
self.handlers = handlers def serve(self):
"""
开启服务器运行,提供RPC服务
"""
# 开启服务监听,等待客户端连接
self.sock.listen(128)
print("开始监听")
while True:
# 接收客户端的连接请求
conn, addr = self.sock.accept()
print("建立连接{}".format(str(addr))) # 创建ServerStub对象,完成客户端具体的RPC调用
stub = ServerStub(conn, self.handlers)
try:
while True:
stub.process()
except EOFError:
# 表示客户端关闭了连接
print("客户端关闭连接")
# 关闭服务端连接
conn.close()
通过Socket使得RPC客户端与服务器进行通信
TCP服务端
sock = socket.socket() # 创建一个套接字
sock.bind() # 绑定端口
sock.listen() # 监听连接
sock.accept() # 接受新连接
sock.close() # 关闭服务器套接字 TCP客户端
sock = socket.socket() # 创建一个套接字
sock.connect() # 连接远程服务器
sock.recv() # 读
sock.send() # 写
sock.sendall() # 完全写
sock.close() # 关闭
RPC服务的实现:为了能让RPC服务器同时处理多个客户端的请求,提升性能,可以采用多线程、多进程方式实现,下面分别介绍这两种方式实现的RPC服务
多线程RPC服务:
class ThreadServer(object):
"""
多线程RPC服务器
""" def __init__(self, host, port, handlers):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
self.sock.bind((host, port))
self.handlers = handlers def serve(self):
"""
开始服务
"""
self.sock.listen(128)
print("开始监听")
while True:
conn, addr = self.sock.accept()
print("建立链接{}".format(str(addr)))
t = threading.Thread(target=self.handle, args=(conn,))
t.start() def handle(self, client):
stub = ServerStub(client, self.handlers)
try:
while True:
stub.process()
except EOFError:
print("客户端关闭连接") client.close()
多进程RPC服务:
class MultiProcessServer(object):
"""
多进程服务器
""" def __init__(self, host, port, handlers):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
self.sock.bind((host, port))
self.handlers = handlers def serve(self):
"""
开始服务
"""
self.sock.listen(128)
print("开始监听")
while True:
conn, addr = self.sock.accept()
print("建立链接{}".format(str(addr)))
t = Process(target=self.handle, args=(conn,))
t.start() def handle(self, client):
stub = ServerStub(client, self.handlers)
try:
while True:
stub.process()
except EOFError:
print("客户端关闭连接") client.close()