目录
RPC
RPC: 同一个项目内的不同服务进程之间的交互方式。为不同的进程服务提供了 call()
(同步) 和 cast()
(异步) 两种调用方式。
问题 1: 在一个 Openstack 项目中拥有多个不同的进程服务,EG. API Service/Manage Service。 当我们通过 Client 发送 API Request 去调用 Manage Service 执行一个操作任务时,我们会希望这个调用的结果是能够快速响应到 Client 的(保证用户体验)。
问题 2: 而且进程服务之间的调用我们还需要考虑如何有效的避免进程服务之间调用的阻塞问题。EG. API Service 调用 Manage Service 时,如果不能及时的将 API Service 释放掉,那么 API Request 就会因为被占用,而无法处理新的请求。
对于上面两个问题,我们可以通过将具体的执行过程和响应过程分离来达到理想的效果。这也是 RPC 和 API 存在的原因之一。
一个通过 HTTP Request 调用操作函数的 RPC 实现样例
包含了下列两个过程实现:
- 接收 HTTP Request
- RPC 调用操作函数
环境
Devstack 用的 L 版,样例在自定义的 Openstack 项目中实现,详见自动化生成 Openstack 新项目开发框架。
接收 HTTP Request
-
定义 HTTP Request URL 路由转发的 Resource
详见Openstack Restful API 开发框架 Paste + PasteDeploy + Routes + WebOb 和
Openstack Paste.ini 文件详解
# project/api/v1/router.py
from project.api.v1 import new_resource
class APIRouter(octopunch.api.openstack.APIRouter):
"""Routes requests on the API to the appropriate controller and method."""
ExtensionManager = extensions.ExtensionManager
def _setup_routes(self, mapper, ext_mgr):
self.resources['versions'] = versions.create_resource()
mapper.connect("versions", "/",
controller=self.resources['versions'],
action='show')
mapper.redirect("", "/")
self.resources['new_resource'] = new_resource.create_resource(ext_mgr)
mapper.resource('new_resource', 'new_resource',
controller=self.resources['new_resource'])
- 将 Resource 和 HTTP 方法绑定到 Controller 的 Action 函数中
# project/api/v1/new_resource.py
class NewResourceController(wsgi.Controller):
def __init__(self, ext_mgr):
self.ext_mgr = ext_mgr
super(NewResourceController, self).__init__()
# Create() 对应了 HTTP 的 POST 方法
@wsgi.serializers()
def create(self, req, body):
"""Create a NewResource."""
context = req.environ['project.context']
# Sync the project database.
self.new_resource_api.db_sync(context)
def create_resource(ext_mgr):
"""project resource factory method."""
return wsgi.Resource(NewResourceController(ext_mgr))
上述两个文件实现了通过 HTTP Request 的 POST 方法来执行指定的 Create() 函数。
RPC 调用具体的操作函数
- 通过 API 调用 RPC-API
# project/new_resource/api.py
# import rpcapi module
from project.new_resource import rpcapi as new_resource_rpcapi
class API(base.Base):
"""API for interacting with the new_resource manager."""
def __init__(self, db_driver=None, image_service=None):
self.new_resource_rpcapi = new_resource_rpcapi.NewResourceAPI()
super(API, self).__init__(db_driver)
# 定义调用 rpcapi 的接口函数
def db_sync(self, context):
"""Call rpc api to start db sync for new_resource database."""
self.new_resource_rpcapi.db_sync(context)
API 的存在是为了能够快速的响应请求,至于之后的执行过程交由 RPC-API 和 Manager 来处理
-
rpc-api.py 调用 manager.py
在 rpcapi.py 定义的 RPC 接口函数会自动的映射到 manager.py 中指定的处理函数。
# project/new_resource/rpcapi.py
class NewResourceAPI(object):
def __init__(self):
super(NewResourceAPI, self).__init__()
target = messaging.Target(topic=CONF.manage_topic,
version=self.RPC_API_VERSION)
serializer = objects_base.ProjectObjectSerializer()
self.client = rpc.get_client(target, version_cap='1.8',
serializer=serializer)
# 定义 rpcapi 函数,使用 cast 异步调用方式
def db_sync(self, context):
cctxt = self.client.prepare()
# 指定 rpcapi 的调用方式,和指定映射到 manager.py 的处理函数
cctxt.cast(context, 'db_sync')
RPC-API 的存在是为了快速的响应进程服务之间的调用请求。
# project/new_resource/manager.py
class NewResourceManager(manager.Manager):
RPC_API_VERSION = '1.8'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, service_name=None, *args, **kwargs):
super(NewResourceManager, self).__init__(*args, **kwargs)
self._startup_delay = True
def init_host_with_rpc(self):
eventlet.sleep(CONF.periodic_interval)
self._startup_delay = False
def db_sync(self, context):
print "这里是具体的 RPC 操作函数"
小结:
Openstack 的 PRC 调用的过程为: api.py ⇒ rpcapi.py ⇒ manager.py
详见:Openstack Nova 源码分析 — RPC 远程调用过程
测试
- 启动 API 服务
project-api --config-file /etc/project/proname.conf
- 启动 Manager 服务
project-manager --config-file /etc/project/project.conf
- 发送 HTTP 请求
curl -i 'http://<Service_host_ip>:<service_port>/v1/<project_id>/<ResourceUrl>' -X POST -H "Content-Type: application/json" -H "X-Auth-Project-Id: admin" -H "X-Auth-Token: <token_id>" -d '<body_content_dict>'
注意: 样例需要根据自身开发环境进行调整,