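Neutron Software Architecture: Analysis and Implementation
Neutron's software architecture is not complicated. We describe it here through three diagrams.
Diagram 1: Neutron is a master/worker distributed application made up of several service processes (e.g. neutron-server.service, l3_agent.service) that communicate asynchronously. It is split into the Neutron Server, which acts as the central controller (accepts northbound API requests, runs the control logic, and dispatches tasks), and the Agents, which act as local executors (carry out tasks and report results). Each side is both producer and consumer for the other, and they communicate over a message queue (MQ).
Diagram 2: To work with a wide variety of underlying physical and virtual network elements, Neutron Server implements a Plugin mechanism that uses the capabilities of those elements to support the upper-layer logic. Neutron Server can therefore be subdivided into:
- The API layer, which accepts northbound RESTful API requests
- The Plugin layer, which interfaces with the network elements of different vendors
Diagram 3: To balance stability (the core resource model, a functional subset backed by the default network elements) and extensibility (the extended resource models backed by diverse network elements), Neutron Server further splits the API layer into the Core API and the Extension API, and the Plugin layer into Core Plugins and Service Plugins (also called extension plugins). Network vendors can extend Neutron's feature set to suit their needs, and without any extension Neutron still provides a complete solution by itself. This is the key reason Neutron adopted the Core & Plugin architecture.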
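In short, Neutron's software architecture is not particularly unconventional. It follows the usual design style of OpenStack projects and has the following characteristics:
- Distributed: multiple service processes
- RESTful API: a unified northbound interface
- Plugin: compatibility with heterogeneous backends
- Asynchronous message queue: MQ
- Agents: Workers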
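The server/agent split over a message queue can be illustrated with a small standard-library sketch. This is not Neutron code: the two in-process queues stand in for the real message bus, and the names `agent_loop`, `server_dispatch`, and the task names are made up for illustration.

```python
import queue
import threading

# Hypothetical stand-ins: two in-process queues play the role of the MQ.
tasks = queue.Queue()    # server -> agent
results = queue.Queue()  # agent -> server


def agent_loop():
    """Agent side: consume tasks, do the work, report the result back."""
    while True:
        task = tasks.get()
        if task is None:  # sentinel value: shut the agent down
            break
        results.put({"task": task, "status": "done"})


def server_dispatch(task_names):
    """Server side: publish tasks, then collect one report per task."""
    for name in task_names:
        tasks.put(name)
    return [results.get(timeout=5) for _ in task_names]


worker = threading.Thread(target=agent_loop)
worker.start()
reports = server_dispatch(["create_port", "delete_port"])
tasks.put(None)
worker.join()
```

Each side produces on one queue and consumes from the other, which is the sense in which the server and the agents are producer and consumer for each other.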
Neutron Server Startup Flow
NOTE: All code below is taken from the OpenStack Rocky release.
neutron-server — Accepts and routes API requests to the appropriate OpenStack Networking plug-in for action.
The service process for Neutron Server is neutron-server.service. It contains the Web Server, Plugins (Core Plugins, Extension Plugins), RPC Client/Server, DB ORM, and other functional modules.
Startup command for neutron-server.service:
neutron-server --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/api-paste.ini
As usual, we start reading from the startup script of the neutron-server.service process.
# /opt/stack/neutron/setup.cfg
[entry_points]
...
console_scripts =
...
neutron-server = neutron.cmd.eventlet.server:main
Locate the program's entry function:
# /opt/stack/neutron/neutron/cmd/eventlet/server/__init__.py
def main():
    server.boot_server(wsgi_eventlet.eventlet_wsgi_server)

# /opt/stack/neutron/neutron/server/wsgi_eventlet.py
def eventlet_wsgi_server():
    # Get the WSGI Application
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    # Start the API and RPC services
    start_api_and_rpc_workers(neutron_api)
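In outline, the startup flow of neutron-server.service is simple:
- Initialize configuration (load and parse the config files)
- Get the WSGI Application
- Start the API and RPC services
NOTE: Step one, initializing configuration, uses the oslo.config library to load neutron.conf and parse its contents. oslo.config was covered in《OpenStack 实现技术分解 (7) 通用库 — oslo_config》and is not repeated here. We focus on steps two and three.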
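Getting the WSGI Application
The WSGI Application belongs to Neutron's Web Server module. Python web servers usually follow the WSGI protocol, which divides a web server into three parts: the WSGI Server, WSGI Middleware, and the WSGI Application.
NOTE: The WSGI protocol was introduced in《Python Web 开发规范 — WSGI》and is not repeated here.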
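As a reminder of how the three WSGI roles fit together, here is a minimal standard-library sketch. It is unrelated to Neutron's actual classes: `application`, `RequestIdMiddleware`, and `call_app` are illustrative names, and `call_app` plays the server's part for a single in-memory request.

```python
from wsgiref.util import setup_testing_defaults


def application(environ, start_response):
    """WSGI application: produces the response body."""
    body = b"hello from the app"
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]


class RequestIdMiddleware:
    """WSGI middleware: wraps an app and adds a response header."""

    def __init__(self, app, request_id="req-demo"):
        self.app = app
        self.request_id = request_id

    def __call__(self, environ, start_response):
        def patched_start_response(status, headers, exc_info=None):
            headers = list(headers) + [("X-Request-Id", self.request_id)]
            return start_response(status, headers, exc_info)
        return self.app(environ, patched_start_response)


def call_app(app):
    """Play the WSGI server's role for one in-memory request."""
    environ = {}
    setup_testing_defaults(environ)
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = dict(headers)

    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], body


status, headers, body = call_app(RequestIdMiddleware(application))
```

The middleware is itself a callable with the application signature, which is why any number of them can be stacked around one application.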
The key code for getting the WSGI Application is:
# /opt/stack/neutron/neutron/server/wsgi_eventlet.py
neutron_api = service.serve_wsgi(service.NeutronApiService)

# /opt/stack/neutron/neutron/common/config.py
def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    # oslo_service wrapper: parses api-paste.ini and loads the apps it defines
    loader = wsgi.Loader(cfg.CONF)
    # Log the values of registered opts
    if cfg.CONF.debug:
        cfg.CONF.log_opt_values(LOG, logging.DEBUG)
    # The app_name argument here is `neutron`
    app = loader.load_app(app_name)
    return app
Related log output:
DEBUG oslo.service.wsgi [-] Loading app neutron from /etc/neutron/api-paste.ini
Here api-paste.ini is the configuration file of the Paste library. Paste was covered in《Openstack Restful API 开发框架 Paste + PasteDeploy + Routes + WebOb》and is not repeated here. The configuration is:
# /etc/neutron/api-paste.ini
[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors osprofiler extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors osprofiler authtoken keystonecontext extensions neutronapiapp_v2_0
[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory
[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory
After Paste finishes its processing, control flows into the pipeline_factory function.
# /opt/stack/neutron/neutron/auth.py
def pipeline_factory(loader, global_conf, **local_conf):
    """Create a paste pipeline based on the 'auth_strategy' config option."""
    # The auth_strategy option selects whether Keystone authentication is used
    pipeline = local_conf[cfg.CONF.auth_strategy]
    pipeline = pipeline.split()
    # Load the filters for all WSGI middleware
    filters = [loader.get_filter(n) for n in pipeline[:-1]]
    # Load the WSGI Application; the argument here is neutronapiapp_v2_0
    app = loader.get_app(pipeline[-1])
    filters.reverse()
    # Wrap the WSGI Application with the filters in reverse order, so that
    # the first filter named in the pipeline ends up outermost
    for filter in filters:
        app = filter(app)
    return app
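The effect of reversing the filter list before wrapping is easy to miss, so here is a small sketch of the same loop with toy filters. Everything in it is hypothetical (`make_filter`, `build_pipeline`, and the trace list are not part of Paste or Neutron); only the wrap-in-reverse pattern mirrors the function above.

```python
def make_filter(name, trace):
    """Return a toy filter factory that records when its filter runs."""
    def filter_factory(app):
        def wrapped(request):
            trace.append(name)
            return app(request)
        return wrapped
    return filter_factory


def build_pipeline(names, trace):
    """Mimic the wrapping loop: the last name is the app, the rest filters."""
    def app(request):
        trace.append(names[-1])
        return "response to " + request

    filters = [make_filter(n, trace) for n in names[:-1]]
    filters.reverse()
    for f in filters:
        app = f(app)
    return app


trace = []
pipeline = build_pipeline(["cors", "request_id", "extensions", "app"], trace)
response = pipeline("GET /v2.0/networks")
```

After the call, `trace` lists the names in the same left-to-right order as the pipeline string, because the innermost wrapper was applied first and the outermost last.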
# /opt/stack/neutron/neutron/api/v2/router.py
def _factory(global_config, **local_config):
    return pecan_app.v2_factory(global_config, **local_config)

# /opt/stack/neutron/neutron/pecan_wsgi/app.py
def v2_factory(global_config, **local_config):
    # Processing Order:
    #   As request enters lower priority called before higher.
    #   Response from controller is passed from higher priority to lower.
    app_hooks = [
        hooks.UserFilterHook(),  # priority 90
        hooks.ContextHook(),  # priority 95
        hooks.ExceptionTranslationHook(),  # priority 100
        hooks.BodyValidationHook(),  # priority 120
        hooks.OwnershipValidationHook(),  # priority 125
        hooks.QuotaEnforcementHook(),  # priority 130
        hooks.NotifierHook(),  # priority 135
        hooks.QueryParametersHook(),  # priority 139
        hooks.PolicyHook(),  # priority 140
    ]
    # The root "/" controller of the REST API is root.V2Controller
    app = pecan.make_app(root.V2Controller(),
                         debug=False,
                         force_canonical=False,
                         hooks=app_hooks,
                         guess_content_type_from_ext=True)
    # Initialize Neutron Server
    startup.initialize_all()
    return app
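At this point we have the final WSGI Application, and we have also found the root "/" of the API. The code shows that Neutron now uses Pecan as its web framework (A WSGI object-dispatching web framework, designed to be lean and fast with few dependencies.) rather than the PPRW stack (Paste + PasteDeploy + Routes + WebOb) of older releases. Pecan's object-dispatch routing makes the route mapping and view functions of a WSGI Application simpler to implement, without the large amount of code unrelated to business logic that PPRW required. Pecan has been adopted as the web framework of many OpenStack projects.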
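The "Processing Order" comment in v2_factory describes how hook priorities determine call order. The following sketch illustrates that ordering rule only; it is not Pecan's implementation, and `Hook` and `run_with_hooks` are hypothetical names.

```python
class Hook:
    """Toy hook: only a name and a priority, no real behaviour."""

    def __init__(self, name, priority):
        self.name = name
        self.priority = priority


def run_with_hooks(hooks, controller, log):
    """Run hooks around a controller call, following the ordering rule:

    on the way in, lower priority values run first;
    on the way out, the response passes from higher priority to lower.
    """
    ordered = sorted(hooks, key=lambda h: h.priority)
    for hook in ordered:
        log.append("before:" + hook.name)
    result = controller()
    for hook in reversed(ordered):
        log.append("after:" + hook.name)
    return result


log = []
hooks = [Hook("policy", 140), Hook("context", 95), Hook("quota", 130)]
result = run_with_hooks(hooks, lambda: "ok", log)
```

With these three toy hooks the request passes through context, quota, policy, and the response passes back through policy, quota, context.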
Core API & Extension API
Neutron's root controller root.V2Controller() provides endpoints that list the refs (resource model references) of all Core APIs and Extension APIs.
Get the Core API refs:
[root@localhost ~]# curl -i "http://172.18.22.200:9696/v2.0/" \
> -X GET \
> -H 'Content-type: application/json' \
> -H 'Accept: application/json' \
> -H "X-Auth-Project-Id: admin" \
> -H 'X-Auth-Token:gAAAAABchg8IRf8aMdYbm-7-vPJsFCoSecCJz9GZcPgrS0UirgSpbxIaF1f5duFsrkwRePBP6duTmVhV3GSIrHLqZ3RT21GQ1oDipTwCe8RktCnkEg5kXrUuQfAXmvjltRm5_0w5XbltJahVY0c3QXlrpP9G-IBdBWI7mpvyoP6h0x94000Ux20'
HTTP/1.1 200 OK
Content-Length: 516
Content-Type: application/json
X-Openstack-Request-Id: req-7c8aa1e6-1a18-433e-8ff5-95e59028cce5
Date: Mon, 11 Mar 2019 07:36:17 GMT
{
"resources": [{
"links": [{
"href": "http://172.18.22.200:9696/v2.0/subnets",
"rel": "self"
}],
"name": "subnet",
"collection": "subnets"
}, {
"links": [{
"href": "http://172.18.22.200:9696/v2.0/subnetpools",
"rel": "self"
}],
"name": "subnetpool",
"collection": "subnetpools"
}, {
"links": [{
"href": "http://172.18.22.200:9696/v2.0/networks",
"rel": "self"
}],
"name": "network",
"collection": "networks"
}, {
"links": [{
"href": "http://172.18.22.200:9696/v2.0/ports",
"rel": "self"
}],
"name": "port",
"collection": "ports"
}]
}
Get the Extension API refs:
[root@localhost ~]# curl -i "http://172.18.22.200:9696/v2.0/extensions/" \
> -X GET \
> -H 'Content-type: application/json' \
> -H 'Accept: application/json' \
> -H "X-Auth-Project-Id: admin" \
> -H 'X-Auth-Token:gAAAAABchg8IRf8aMdYbm-7-vPJsFCoSecCJz9GZcPgrS0UirgSpbxIaF1f5duFsrkwRePBP6duTmVhV3GSIrHLqZ3RT21GQ1oDipTwCe8RktCnkEg5kXrUuQfAXmvjltRm5_0w5XbltJahVY0c3QXlrpP9G-IBdBWI7mpvyoP6h0x94000Ux20'
HTTP/1.1 200 OK
Content-Length: 9909
Content-Type: application/json
X-Openstack-Request-Id: req-4dad9963-57c2-4b3e-a4d5-bc6fea5e78e8
Date: Mon, 11 Mar 2019 07:37:25 GMT
{
"extensions": [{
"alias": "default-subnetpools",
"updated": "2016-02-18T18:00:00-00:00",
"name": "Default Subnetpools",
"links": [],
"description": "Provides ability to mark and use a subnetpool as the default."
}, {
"alias": "availability_zone",
"updated": "2015-01-01T10:00:00-00:00",
"name": "Availability Zone",
"links": [],
"description": "The availability zone extension."
}, {
"alias": "network_availability_zone",
"updated": "2015-01-01T10:00:00-00:00",
"name": "Network Availability Zone",
"links": [],
"description": "Availability zone support for network."
}, {
"alias": "auto-allocated-topology",
"updated": "2016-01-01T00:00:00-00:00",
"name": "Auto Allocated Topology Services",
"links": [],
"description": "Auto Allocated Topology Services."
}, {
"alias": "ext-gw-mode",
"updated": "2013-03-28T10:00:00-00:00",
"name": "Neutron L3 Configurable external gateway mode",
"links": [],
"description": "Extension of the router abstraction for specifying whether SNAT should occur on the external gateway"
}, {
"alias": "binding",
"updated": "2014-02-03T10:00:00-00:00",
"name": "Port Binding",
"links": [],
"description": "Expose port bindings of a virtual port to external application"
}, {
"alias": "agent",
"updated": "2013-02-03T10:00:00-00:00",
"name": "agent",
"links": [],
"description": "The agent management extension."
}, {
"alias": "subnet_allocation",
"updated": "2015-03-30T10:00:00-00:00",
"name": "Subnet Allocation",
"links": [],
"description": "Enables allocation of subnets from a subnet pool"
}, {
"alias": "dhcp_agent_scheduler",
"updated": "2013-02-07T10:00:00-00:00",
"name": "DHCP Agent Scheduler",
"links": [],
"description": "Schedule networks among dhcp agents"
}, {
"alias": "external-net",
"updated": "2013-01-14T10:00:00-00:00",
"name": "Neutron external network",
"links": [],
"description": "Adds external network attribute to network resource."
}, {
"alias": "standard-attr-tag",
"updated": "2017-01-01T00:00:00-00:00",
"name": "Tag support for resources with standard attribute: subnet, trunk, router, network, policy, subnetpool, port, security_group, floatingip",
"links": [],
"description": "Enables to set tag on resources with standard attribute."
}, {
"alias": "flavors",
"updated": "2015-09-17T10:00:00-00:00",
"name": "Neutron Service Flavors",
"links": [],
"description": "Flavor specification for Neutron advanced services."
}, {
"alias": "net-mtu",
"updated": "2015-03-25T10:00:00-00:00",
"name": "Network MTU",
"links": [],
"description": "Provides MTU attribute for a network resource."
}, {
"alias": "network-ip-availability",
"updated": "2015-09-24T00:00:00-00:00",
"name": "Network IP Availability",
"links": [],
"description": "Provides IP availability data for each network and subnet."
}, {
"alias": "quotas",
"updated": "2012-07-29T10:00:00-00:00",
"name": "Quota management support",
"links": [],
"description": "Expose functions for quotas management per tenant"
}, {
"alias": "revision-if-match",
"updated": "2016-12-11T00:00:00-00:00",
"name": "If-Match constraints based on revision_number",
"links": [],
"description": "Extension indicating that If-Match based on revision_number is supported."
}, {
"alias": "l3-port-ip-change-not-allowed",
"updated": "2018-10-09T10:00:00-00:00",
"name": "Prevent L3 router ports IP address change extension",
"links": [],
"description": "Prevent change of IP address for some L3 router ports"
}, {
"alias": "availability_zone_filter",
"updated": "2018-06-22T10:00:00-00:00",
"name": "Availability Zone Filter Extension",
"links": [],
"description": "Add filter parameters to AvailabilityZone resource"
}, {
"alias": "l3-ha",
"updated": "2014-04-26T00:00:00-00:00",
"name": "HA Router extension",
"links": [],
"description": "Adds HA capability to routers."
}, {
"alias": "filter-validation",
"updated": "2018-03-21T10:00:00-00:00",
"name": "Filter parameters validation",
"links": [],
"description": "Provides validation on filter parameters."
}, {
"alias": "multi-provider",
"updated": "2013-06-27T10:00:00-00:00",
"name": "Multi Provider Network",
"links": [],
"description": "Expose mapping of virtual networks to multiple physical networks"
}, {
"alias": "quota_details",
"updated": "2017-02-10T10:00:00-00:00",
"name": "Quota details management support",
"links": [],
"description": "Expose functions for quotas usage statistics per project"
}, {
"alias": "address-scope",
"updated": "2015-07-26T10:00:00-00:00",
"name": "Address scope",
"links": [],
"description": "Address scopes extension."
}, {
"alias": "extraroute",
"updated": "2013-02-01T10:00:00-00:00",
"name": "Neutron Extra Route",
"links": [],
"description": "Extra routes configuration for L3 router"
}, {
"alias": "net-mtu-writable",
"updated": "2017-07-12T00:00:00-00:00",
"name": "Network MTU (writable)",
"links": [],
"description": "Provides a writable MTU attribute for a network resource."
}, {
"alias": "empty-string-filtering",
"updated": "2018-05-01T10:00:00-00:00",
"name": "Empty String Filtering Extension",
"links": [],
"description": "Allow filtering by attributes with empty string value"
}, {
"alias": "subnet-service-types",
"updated": "2016-03-15T18:00:00-00:00",
"name": "Subnet service types",
"links": [],
"description": "Provides ability to set the subnet service_types field"
}, {
"alias": "floatingip-pools",
"updated": "2018-03-21T10:00:00-00:00",
"name": "Floating IP Pools Extension",
"links": [],
"description": "Provides a floating IP pools API."
}, {
"alias": "port-mac-address-regenerate",
"updated": "2018-05-03T10:00:00-00:00",
"name": "Neutron Port MAC address regenerate",
"links": [],
"description": "Network port MAC address regenerate"
}, {
"alias": "standard-attr-timestamp",
"updated": "2016-09-12T10:00:00-00:00",
"name": "Resource timestamps",
"links": [],
"description": "Adds created_at and updated_at fields to all Neutron resources that have Neutron standard attributes."
}, {
"alias": "provider",
"updated": "2012-09-07T10:00:00-00:00",
"name": "Provider Network",
"links": [],
"description": "Expose mapping of virtual networks to physical networks"
}, {
"alias": "service-type",
"updated": "2013-01-20T00:00:00-00:00",
"name": "Neutron Service Type Management",
"links": [],
"description": "API for retrieving service providers for Neutron advanced services"
}, {
"alias": "l3-flavors",
"updated": "2016-05-17T00:00:00-00:00",
"name": "Router Flavor Extension",
"links": [],
"description": "Flavor support for routers."
}, {
"alias": "port-security",
"updated": "2012-07-23T10:00:00-00:00",
"name": "Port Security",
"links": [],
"description": "Provides port security"
}, {
"alias": "extra_dhcp_opt",
"updated": "2013-03-17T12:00:00-00:00",
"name": "Neutron Extra DHCP options",
"links": [],
"description": "Extra options configuration for DHCP. For example PXE boot options to DHCP clients can be specified (e.g. tftp-server, server-ip-address, bootfile-name)"
}, {
"alias": "port-security-groups-filtering",
"updated": "2018-01-09T09:00:00-00:00",
"name": "Port filtering on security groups",
"links": [],
"description": "Provides security groups filtering when listing ports"
}, {
"alias": "standard-attr-revisions",
"updated": "2016-04-11T10:00:00-00:00",
"name": "Resource revision numbers",
"links": [],
"description": "This extension will display the revision number of neutron resources."
}, {
"alias": "pagination",
"updated": "2016-06-12T00:00:00-00:00",
"name": "Pagination support",
"links": [],
"description": "Extension that indicates that pagination is enabled."
}, {
"alias": "sorting",
"updated": "2016-06-12T00:00:00-00:00",
"name": "Sorting support",
"links": [],
"description": "Extension that indicates that sorting is enabled."
}, {
"alias": "security-group",
"updated": "2012-10-05T10:00:00-00:00",
"name": "security-group",
"links": [],
"description": "The security groups extension."
}, {
"alias": "l3_agent_scheduler",
"updated": "2013-02-07T10:00:00-00:00",
"name": "L3 Agent Scheduler",
"links": [],
"description": "Schedule routers among l3 agents"
}, {
"alias": "fip-port-details",
"updated": "2018-04-09T10:00:00-00:00",
"name": "Floating IP Port Details Extension",
"links": [],
"description": "Add port_details attribute to Floating IP resource"
}, {
"alias": "router_availability_zone",
"updated": "2015-01-01T10:00:00-00:00",
"name": "Router Availability Zone",
"links": [],
"description": "Availability zone support for router."
}, {
"alias": "rbac-policies",
"updated": "2015-06-17T12:15:12-00:00",
"name": "RBAC Policies",
"links": [],
"description": "Allows creation and modification of policies that control tenant access to resources."
}, {
"alias": "standard-attr-description",
"updated": "2016-02-10T10:00:00-00:00",
"name": "standard-attr-description",
"links": [],
"description": "Extension to add descriptions to standard attributes"
}, {
"alias": "ip-substring-filtering",
"updated": "2017-11-28T09:00:00-00:00",
"name": "IP address substring filtering",
"links": [],
"description": "Provides IP address substring filtering when listing ports"
}, {
"alias": "router",
"updated": "2012-07-20T10:00:00-00:00",
"name": "Neutron L3 Router",
"links": [],
"description": "Router abstraction for basic L3 forwarding between L2 Neutron networks and access to external networks via a NAT gateway."
}, {
"alias": "allowed-address-pairs",
"updated": "2013-07-23T10:00:00-00:00",
"name": "Allowed Address Pairs",
"links": [],
"description": "Provides allowed address pairs"
}, {
"alias": "binding-extended",
"updated": "2017-07-17T10:00:00-00:00",
"name": "Port Bindings Extended",
"links": [],
"description": "Expose port bindings of a virtual port to external application"
}, {
"alias": "project-id",
"updated": "2016-09-09T09:09:09-09:09",
"name": "project_id field enabled",
"links": [],
"description": "Extension that indicates that project_id field is enabled."
}, {
"alias": "dvr",
"updated": "2014-06-1T10:00:00-00:00",
"name": "Distributed Virtual Router",
"links": [],
"description": "Enables configuration of Distributed Virtual Routers."
}]
}
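These two API calls show that Neutron has only four Core API Resources, all related to the large layer-2 network: networks, subnets, subnetpools, and ports. Everything else is an Extension API Resource. The Core API is Neutron's foundation: its smallest, stable core feature set. The Extension API, by contrast, is how Neutron "makes its living": a well-designed extensible API is what gives Neutron a healthy open-source ecosystem.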
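To work with such a response programmatically, the body can be reduced to a name-to-collection mapping. The sketch below parses a trimmed copy of the Core API response shown earlier; `core_collections` is an illustrative helper, and no HTTP request is made.

```python
import json

# Trimmed copy of the "/v2.0/" response body shown above (links omitted).
sample_body = """
{"resources": [
  {"name": "subnet", "collection": "subnets", "links": []},
  {"name": "subnetpool", "collection": "subnetpools", "links": []},
  {"name": "network", "collection": "networks", "links": []},
  {"name": "port", "collection": "ports", "links": []}
]}
"""


def core_collections(body):
    """Map each core resource name to its collection name."""
    doc = json.loads(body)
    return {item["name"]: item["collection"] for item in doc["resources"]}


mapping = core_collections(sample_body)
```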
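If you read the source of the root controller V2Controller you may be puzzled: why does V2Controller only implement the ExtensionsController sub-controller, and why does ExtensionsController do nothing more than list the Extension API Resources? Where, then, are the Controllers for all the resource objects described in the official Neutron documentation (Networking API v2.0) implemented? The answer lies in the startup.initialize_all() function. Before describing how the Controllers are implemented, though, we need to understand how Neutron Plugins are loaded, because API Resources, Controllers, and Plugins are closely tied to one another.
Core Plugins & Service Plugins
Matching the API classification, Plugins are divided into the Core Plugin and Service (Extension) Plugins. The concrete implementations are selected through configuration options, e.g.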
[DEFAULT]
...
# The core plugin Neutron will use (string value)
core_plugin = ml2
# The service plugins Neutron will use (list value)
service_plugins = neutron.services.l3_router.l3_router_plugin.L3RouterPlugin
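The option types show that there can be only one Core Plugin (ml2 in this example, which is the usual choice), while several Service Plugins can be specified at once, for example L3RouterPlugin, FWaaSPlugin, LBaaSPlugin, VPNaaSPlugin, and so on.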
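The difference between a single string value and a list value can be shown with a short sketch. Neutron reads these options through oslo.config; the code below substitutes the standard-library configparser purely for illustration, and the sample values `router,qos` and the helper `read_plugin_options` are assumptions.

```python
import configparser

# Illustrative config text; the service plugin values are example aliases.
SAMPLE = """
[DEFAULT]
core_plugin = ml2
service_plugins = router,qos
"""


def read_plugin_options(text):
    """Return (core_plugin, [service_plugins]) from config text."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    defaults = parser["DEFAULT"]
    core = defaults["core_plugin"]               # string value: exactly one
    raw = defaults.get("service_plugins", "")    # list value: comma-separated
    services = [item.strip() for item in raw.split(",") if item.strip()]
    return core, services


core_plugin, service_plugins = read_plugin_options(SAMPLE)
```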
The code that loads the Plugins is:
# /opt/stack/neutron/neutron/manager.py
def init():
    """Call to load the plugins (core+services) machinery."""
    # Load all plugins (core plugin + extension service plugins).
    # `directory` is the registry of plugins, implemented in
    # neutron_lib/plugins/directory.py. It maintains and manages the list
    # of plugins (e.g. add_plugin, get_plugins, get_unique_plugins, is_loaded).
    if not directory.is_loaded():
        # This gets NeutronManager's own instance, a singleton, so that
        # one unified list of plugins is maintained.
        NeutronManager.get_instance()


class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    """
    ...
    def __init__(self, options=None, config_file=None):
        ...
        # Read the core plugin from the config file, e.g. ml2
        plugin_provider = cfg.CONF.core_plugin
        LOG.info("Loading core plugin: %s", plugin_provider)
        # NOTE(armax): keep hold of the actual plugin object
        # Instantiate the core plugin class, e.g. Ml2Plugin
        plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                           plugin_provider)
        # Register the core plugin in the plugins directory
        directory.add_plugin(lib_const.CORE, plugin)
        ...
        # load services from the core plugin first
        # First load the extension plugins the core plugin itself supports
        self._load_services_from_core_plugin(plugin)
        # Then load the extension plugins named in the config file
        self._load_service_plugins()
        ...

    def _load_services_from_core_plugin(self, plugin):
        """Puts core plugin in service_plugins for supported services."""
        LOG.debug("Loading services supported by the core plugin")
        # supported service types are derived from supported extensions
        # Extension plugins supported by the core plugin by default
        # (e.g. lbaas, fwaas, vpnaas, router, qos)
        for ext_alias in getattr(plugin, "supported_extension_aliases", []):
            if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
                service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
                # Register the extension plugin in the plugins directory
                directory.add_plugin(service_type, plugin)
                LOG.info("Service %s is supported by the core plugin",
                         service_type)

    def _load_service_plugins(self):
        """Loads service plugins.

        Starts from the core plugin and checks if it supports
        advanced services then loads classes provided in configuration.
        """
        # Read the extension plugins from the config file
        plugin_providers = cfg.CONF.service_plugins
        # If the core plugin uses Neutron's native datastore, this returns
        # some default extension plugins (e.g. tag, timestamp, flavors, revisions)
        plugin_providers.extend(self._get_default_service_plugins())
        LOG.debug("Loading service plugins: %s", plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue
            LOG.info("Loading Plugin: %s", provider)
            # Instantiate the extension plugin class
            plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                    provider)
            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            # TODO(armax): simplify this by moving the conditional into the
            # directory itself.
            plugin_type = plugin_inst.get_plugin_type()
            if directory.get_plugin(plugin_type):
                raise ValueError(_("Multiple plugins for service "
                                   "%s were configured") % plugin_type)
            # Register the extension plugin in the plugins directory
            directory.add_plugin(plugin_type, plugin_inst)
            # search for possible agent notifiers declared in service plugin
            # (needed by agent management extension)
            # Get the core plugin instance
            plugin = directory.get_plugin()
            if (hasattr(plugin, 'agent_notifiers') and
                    hasattr(plugin_inst, 'agent_notifiers')):
                # Merge the extension plugin's agent notifiers into the
                # core plugin's agent notifiers dict
                plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
            # disable incompatible extensions in core plugin if any
            utils.disable_extension_by_service_plugin(plugin, plugin_inst)
            LOG.debug("Successfully loaded %(type)s plugin. "
                      "Description: %(desc)s",
                      {"type": plugin_type,
                       "desc": plugin_inst.get_plugin_description()})
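At this point, the Core and Extension Plugin classes that Neutron currently supports have all been instantiated and registered in the Plugins Directory. The Plugins Directory is an important utility module: it shows up wherever code needs to get hold of the loaded plugins.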
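The registry idea behind the Plugins Directory, including the "one plugin per service type" rule enforced during loading, can be sketched in a few lines. This is a simplified stand-in, not the neutron_lib implementation: `PluginDirectory`, `load_service_plugin`, and the string plugin objects are hypothetical.

```python
class PluginDirectory:
    """Toy registry mapping a plugin type (alias) to a plugin object."""

    def __init__(self):
        self._plugins = {}

    def add_plugin(self, alias, plugin):
        self._plugins[alias] = plugin

    def get_plugin(self, alias="CORE"):
        # Mirrors the convention that asking without an alias means "core"
        return self._plugins.get(alias)

    def is_loaded(self):
        return bool(self._plugins)


def load_service_plugin(directory, plugin_type, plugin):
    """Register a service plugin, refusing a second one of the same type."""
    if directory.get_plugin(plugin_type):
        raise ValueError(
            "Multiple plugins for service %s were configured" % plugin_type)
    directory.add_plugin(plugin_type, plugin)


directory = PluginDirectory()
directory.add_plugin("CORE", "ml2-plugin-object")
load_service_plugin(directory, "L3_ROUTER_NAT", "l3-router-plugin-object")

try:
    load_service_plugin(directory, "L3_ROUTER_NAT", "another-l3-plugin")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```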
Once the Plugins are registered, the worker processes or green threads belonging to them are started as part of starting the neutron-server.service process.
# /opt/stack/neutron/neutron/server/wsgi_eventlet.py
# Runs when neutron-server.service starts
def start_api_and_rpc_workers(neutron_api):
    try:
        # Get the launcher for all RPC and plugin workers
        worker_launcher = service.start_all_workers()
        # Create a green thread pool
        pool = eventlet.GreenPool()
        # Run the WSGI Application in a green thread
        api_thread = pool.spawn(neutron_api.wait)
        # Run the RPC and plugin workers in a green thread
        plugin_workers_thread = pool.spawn(worker_launcher.wait)
        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())
        pool.waitall()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")
        neutron_api.wait()
# /opt/stack/neutron/neutron/service.py
def _get_rpc_workers():
    # Get the core plugin instance from the plugins directory
    plugin = directory.get_plugin()
    # Get the list of service plugins (core plugin included) from the
    # plugins directory
    service_plugins = directory.get_plugins().values()
    ...
    # passing service plugins only, because core plugin is among them
    # Create the RpcWorker instance for all plugins (core + service)
    rpc_workers = [RpcWorker(service_plugins,
                             worker_process_count=cfg.CONF.rpc_workers)]
    if (cfg.CONF.rpc_state_report_workers > 0 and
            plugin.rpc_state_report_workers_supported()):
        rpc_workers.append(
            RpcReportsWorker(
                [plugin],
                worker_process_count=cfg.CONF.rpc_state_report_workers
            )
        )
    return rpc_workers


class RpcWorker(neutron_worker.BaseWorker):
    """Wraps a worker to be handled by ProcessLauncher"""
    start_listeners_method = 'start_rpc_listeners'

    def __init__(self, plugins, worker_process_count=1):
        super(RpcWorker, self).__init__(
            worker_process_count=worker_process_count
        )
        self._plugins = plugins
        self._servers = []

    def start(self):
        super(RpcWorker, self).start()
        for plugin in self._plugins:
            if hasattr(plugin, self.start_listeners_method):
                try:
                    # Call each plugin's method that starts its RPC listeners
                    servers = getattr(plugin, self.start_listeners_method)()
                except NotImplementedError:
                    continue
                self._servers.extend(servers)
    ...


class RpcReportsWorker(RpcWorker):
    start_listeners_method = 'start_rpc_state_reports_listener'
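The difference between RpcWorker and RpcReportsWorker is that the latter only handles RPC state reports, while the former is the real RPC worker for Neutron's business logic. The option rpc_state_report_workers controls whether the RPC state report workers are enabled.
How the RPC workers are started depends on the options rpc_workers and rpc_state_report_workers. If an option's (integer) value is less than 1, the corresponding RPC workers start as green threads inside the neutron-server process. If it is 1 or greater, new processes are forked and the workers start as green threads inside those processes.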
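The process-versus-green-thread decision can be sketched as a simple partition on the worker count. This is an assumption-laden illustration rather than the code in neutron/service.py: `Worker` and `split_workers` are hypothetical, and the threshold (a count of 1 or more means separate processes) is the reading described above.

```python
from collections import namedtuple

# Hypothetical worker description: a name plus its configured process count.
Worker = namedtuple("Worker", ["name", "worker_process_count"])


def split_workers(workers):
    """Partition workers by how they would be launched.

    Assumption: a count of 1 or more means forked worker processes,
    anything below 1 means running in-process as a green thread.
    """
    process_workers = [w for w in workers if w.worker_process_count > 0]
    thread_workers = [w for w in workers if w.worker_process_count < 1]
    return process_workers, thread_workers


workers = [Worker("rpc", 2), Worker("rpc_state_report", 0)]
process_workers, thread_workers = split_workers(workers)
```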
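Core Controller & Extension Controller
The Controller is a very important concept in Pecan. It encapsulates the URL routes, view functions, and HTTP methods of a WSGI Application, together with the mapping among the three, and is arguably the core object of the web framework. For more about Controllers, see the official Pecan website. Here we focus on how Neutron implements its Controllers.
Besides building and returning the WSGI Application object, the factory function for neutronapiapp_v2_0 also executes startup.initialize_all(). Its job is to prepare everything neutron-server.service needs before it can start: loading the Plugins, instantiating the API Resource Controllers, and establishing the mapping among API Resources, Controllers, and Plugins.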
# /opt/stack/neutron/neutron/pecan_wsgi/startup.py
# List of core resources
RESOURCES = {'network': 'networks',
             'subnet': 'subnets',
             'subnetpool': 'subnetpools',
             'port': 'ports'}


def initialize_all():
    # Load the plugins, as described above
    manager.init()
    # PluginAwareExtensionManager loads extensions from the configured
    # extension path and provides common management functions for extension
    # plugins, e.g. add_extension, extend_resources, get_resources
    ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
    # Add the core resources to the extension resources list
    ext_mgr.extend_resources("2.0", attributes.RESOURCES)
    # At this stage we have a fully populated resource attribute map;
    # build Pecan controllers and routes for all core resources
    # Get the core plugin instance
    plugin = directory.get_plugin()
    # Loop over the core resources
    for resource, collection in RESOURCES.items():
        # Keeps track of Neutron resources for which quota limits are enforced.
        resource_registry.register_resource_by_name(resource)
        # Wrap the core resource and core plugin in a new_controller instance
        new_controller = res_ctrl.CollectionsController(collection, resource,
                                                        plugin=plugin)
        # Store new_controller on NeutronManager as resource_name: new_controller
        manager.NeutronManager.set_controller_for_resource(
            collection, new_controller)
        # Store plugin on NeutronManager as resource_name: plugin
        manager.NeutronManager.set_plugin_for_resource(collection, plugin)

    pecanized_resources = ext_mgr.get_pecan_resources()
    for pec_res in pecanized_resources:
        manager.NeutronManager.set_controller_for_resource(
            pec_res.collection, pec_res.controller)
        manager.NeutronManager.set_plugin_for_resource(
            pec_res.collection, pec_res.plugin)

    # Now build Pecan Controllers and routes for all extensions
    # Get the ResourceExtension objects for the extension resources
    resources = ext_mgr.get_resources()
    # Extensions controller is already defined, we don't need it.
    resources.pop(0)
    # Loop over the ResourceExtension objects
    for ext_res in resources:
        path_prefix = ext_res.path_prefix.strip('/')
        collection = ext_res.collection
        # Retrieving the parent resource.  It is expected the format of
        # the parent resource to be:
        # {'collection_name': 'name-of-collection',
        #  'member_name': 'name-of-resource'}
        # collection_name does not appear to be used in the legacy code
        # inside the controller logic, so we can assume we do not need it.
        parent = ext_res.parent or {}
        parent_resource = parent.get('member_name')
        collection_key = collection
        if parent_resource:
            collection_key = '/'.join([parent_resource, collection])
        collection_actions = ext_res.collection_actions
        member_actions = ext_res.member_actions
        if manager.NeutronManager.get_controller_for_resource(collection_key):
            # This is a collection that already has a pecan controller, we
            # do not need to do anything else
            continue
        legacy_controller = getattr(ext_res.controller, 'controller',
                                    ext_res.controller)
        new_controller = None
        if isinstance(legacy_controller, base.Controller):
            resource = legacy_controller.resource
            plugin = legacy_controller.plugin
            attr_info = legacy_controller.attr_info
            member_actions = legacy_controller.member_actions
            pagination = legacy_controller.allow_pagination
            sorting = legacy_controller.allow_sorting
            # NOTE(blogan): legacy_controller and ext_res both can both have
            # member_actions.  the member_actions for ext_res are strictly for
            # routing, while member_actions for legacy_controller are used for
            # handling the request once the routing has found the controller.
            # They're always the same so we will just use the ext_res
            # member_action.
            # Re-wrap the extension resource, the extension plugin, and some
            # attributes of the original extension controller in new_controller
            new_controller = res_ctrl.CollectionsController(
                collection, resource, resource_info=attr_info,
                parent_resource=parent_resource, member_actions=member_actions,
                plugin=plugin, allow_pagination=pagination,
                allow_sorting=sorting, collection_actions=collection_actions)
            # new_controller.collection has replaced hyphens with underscores
            manager.NeutronManager.set_plugin_for_resource(
                new_controller.collection, plugin)
            if path_prefix:
                manager.NeutronManager.add_resource_for_path_prefix(
                    collection, path_prefix)
        else:
            new_controller = utils.ShimCollectionsController(
                collection, None, legacy_controller,
                collection_actions=collection_actions,
                member_actions=member_actions,
                action_status=ext_res.controller.action_status,
                collection_methods=ext_res.collection_methods)
        # Store new_controller on NeutronManager as resource_name: new_controller
        manager.NeutronManager.set_controller_for_resource(
            collection_key, new_controller)

    # Certain policy checks require that the extensions are loaded
    # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
    # properly initialized. This can only be claimed with certainty
    # once this point in the code has been reached. In the event
    # that the policies have been initialized before this point,
    # calling reset will cause the next policy check to
    # re-initialize with all of the required data in place.
    policy.reset()
In short, the initialize_all function first loads all the Plugins, then wraps each pairing (Core Plugin + Core Resources, Extension Plugins + Extension Resources) in a CollectionsController instance and registers them uniformly in the NeutronManager instance attributes self.resource_plugin_mappings and self.resource_controller_mappings.
From the initial separation of the Core and Extension APIs to the final unification of the Core and Extension Controllers, it has been a long road.
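The registration step can be reduced to two dictionaries keyed by collection name. The sketch below is a simplified stand-in for that bookkeeping: `ResourceRegistry`, `register_core_resources`, and the dict used as a controller are hypothetical, and only the two mapping attribute names come from the text above.

```python
class ResourceRegistry:
    """Toy stand-in for the two mapping dicts kept on the manager."""

    def __init__(self):
        self.resource_plugin_mappings = {}
        self.resource_controller_mappings = {}

    def set_controller_for_resource(self, collection, controller):
        self.resource_controller_mappings[collection] = controller

    def set_plugin_for_resource(self, collection, plugin):
        self.resource_plugin_mappings[collection] = plugin

    def get_controller_for_resource(self, collection):
        return self.resource_controller_mappings.get(collection)


CORE_RESOURCES = {"network": "networks", "subnet": "subnets",
                  "subnetpool": "subnetpools", "port": "ports"}


def register_core_resources(registry, core_plugin):
    """Bind every core collection to one controller and the core plugin."""
    for resource, collection in CORE_RESOURCES.items():
        # A plain dict stands in for a controller object here
        controller = {"resource": resource, "collection": collection,
                      "plugin": core_plugin}
        registry.set_controller_for_resource(collection, controller)
        registry.set_plugin_for_resource(collection, core_plugin)


registry = ResourceRegistry()
register_core_resources(registry, core_plugin="ml2-plugin-object")
```

Extension resources would be added to the same two dictionaries, each pointing at its own service plugin, which is what makes the later lookup uniform.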
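Core API Request Handling
The root controller V2Controller does not explicitly declare Controllers for the Core Resources. In fact, all Core API requests are handled in the method def _lookup(self, collection, *remainder): (note: every URL path that is not explicitly defined is routed to the _lookup method).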
@utils.expose()
def _lookup(self, collection, *remainder):
    # if collection exists in the extension to service plugins map then
    # we are assuming that collection is the service plugin and
    # needs to be remapped.
    # Example: https://neutron.endpoint/v2.0/lbaas/loadbalancers
    if (remainder and
            manager.NeutronManager.get_resources_for_path_prefix(
                collection)):
        collection = remainder[0]
        remainder = remainder[1:]
    # The collection argument is a core resource such as networks,
    # subnets, subnetpools, or ports.
    # Get the Controller instance for the resource from NeutronManager
    controller = manager.NeutronManager.get_controller_for_resource(
        collection)
    if not controller:
        LOG.warning("No controller found for: %s - returning response "
                    "code 404", collection)
        pecan.abort(404)
    # Store resource and collection names in pecan request context so that
    # hooks can leverage them if necessary. The following code uses
    # attributes from the controller instance to ensure names have been
    # properly sanitized (eg: replacing dashes with underscores)
    request.context['resource'] = controller.resource
    request.context['collection'] = controller.collection
    # NOTE(blogan): initialize a dict to store the ids of the items walked
    # in the path for example: /networks/1234 would cause uri_identifiers
    # to contain: {'network_id': '1234'}
    # This is for backwards compatibility with legacy extensions that
    # defined their own controllers and expected kwargs to be passed in
    # with the uri_identifiers
    request.context['uri_identifiers'] = {}
    return controller, remainder
During the initialize_all stage, all Core Controllers were already registered in the NeutronManager instance attribute self.resource_controller_mappings; here they are fetched back from that attribute according to the type of the API Request (e.g. networks, subnets).
(Pdb) controller
<neutron.pecan_wsgi.controllers.resource.CollectionsController object at 0x7f0fc2b60e10>
(Pdb) controller.resource
'network'
(Pdb) controller.plugin
<weakproxy at 0x7f0fc2b69cb0 to Ml2Plugin at 0x7f0fc3343fd0>
(Pdb) controller.plugin_lister
<bound method Ml2Plugin.get_networks of <neutron.plugins.ml2.plugin.Ml2Plugin object at 0x7f0fc3343fd0>>
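Printing the instance attributes of the networks controller above (a CollectionsController instance) shows that the networks Resource is bound to the Core Plugin ml2, and that the Plugin class implements the "real" view function for this Resource. For example, the view function for the API request GET /v2.0/networks is Ml2Plugin.get_networks. In fact, all Core Resources are bound to the same Core Plugin, whereas Extension Resources are bound to the corresponding Service Plugins according to their type. This is how Neutron wraps the call from the Neutron API layer into the Neutron Plugin layer.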
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
@db_api.retry_if_session_inactive()
def get_networks(self, context, filters=None, fields=None,
                 sorts=None, limit=None, marker=None, page_reverse=False):
    # NOTE(ihrachys) use writer manager to be able to update mtu
    # TODO(ihrachys) remove in Queens when mtu is not nullable
    with db_api.CONTEXT_WRITER.using(context):
        nets_db = super(Ml2Plugin, self)._get_networks(
            context, filters, None, sorts, limit, marker, page_reverse)
        # NOTE(ihrachys) pre Pike networks may have null mtus; update them
        # in database if needed
        # TODO(ihrachys) remove in Queens+ when mtu is not nullable
        net_data = []
        for net in nets_db:
            if net.mtu is None:
                net.mtu = self._get_network_mtu(net, validate=False)
            net_data.append(self._make_network_dict(net, context=context))
        self.type_manager.extend_networks_dict_provider(context, net_data)
        nets = self._filter_nets_provider(context, net_data, filters)
    return [db_utils.resource_fields(net, fields) for net in nets]
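The lookup-and-dispatch flow described in this section can be sketched in plain Python. This is a minimal illustration under stated assumptions, not Neutron's code: FakeCorePlugin, CollectionController and ControllerRegistry are hypothetical stand-ins for the Core Plugin, the per-resource controller and the NeutronManager mappings.

```python
# Hypothetical, simplified stand-ins; none of these names are Neutron's API.
class FakeCorePlugin:
    def get_networks(self):
        return [{"id": "net-1", "name": "private"}]


class CollectionController:
    """Binds one resource collection to the plugin method that lists it."""

    def __init__(self, collection, resource, plugin_lister):
        self.collection = collection
        self.resource = resource
        self.plugin_lister = plugin_lister

    def index(self):
        return {self.collection: self.plugin_lister()}


class ControllerRegistry:
    """Maps collection names to controllers, like a tiny resource map."""

    def __init__(self):
        self._controllers = {}
        self._path_prefixes = set()

    def register(self, controller, path_prefix=None):
        self._controllers[controller.collection] = controller
        if path_prefix:
            self._path_prefixes.add(path_prefix)

    def lookup(self, collection, *remainder):
        # A service-plugin prefix (e.g. /lbaas/loadbalancers) is skipped so
        # that the next path segment becomes the collection.
        if remainder and collection in self._path_prefixes:
            collection, remainder = remainder[0], remainder[1:]
        controller = self._controllers.get(collection)
        if controller is None:
            raise LookupError("404: no controller for %s" % collection)
        return controller, remainder


registry = ControllerRegistry()
plugin = FakeCorePlugin()
registry.register(
    CollectionController("networks", "network", plugin.get_networks))
controller, rest = registry.lookup("networks")
print(controller.index())
```

The point of the sketch is that the API layer never names a plugin method directly; it only resolves a collection name to a controller, and the controller already holds the bound plugin method.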
Extension API request handling
The Extension API is essentially a WSGI Middleware rather than a WSGI Application.
# /opt/stack/neutron/neutron/api/extensions.py
import routes


def plugin_aware_extension_middleware_factory(global_config, **local_config):
    """Paste factory."""
    def _factory(app):
        ext_mgr = PluginAwareExtensionManager.get_instance()
        # ExtensionMiddleware wraps the Extensions middleware for WSGI
        # (route mapping, view functions); it receives and handles
        # Extension Resource requests.
        return ExtensionMiddleware(app, ext_mgr=ext_mgr)
    return _factory


class ExtensionMiddleware(base.ConfigurableMiddleware):
    """Extensions middleware for WSGI."""

    def __init__(self, application,
                 ...
        # extended resources
        for resource in self.ext_mgr.get_resources():
            ...
            # Custom actions
            for action, method in resource.collection_actions.items():
                conditions = dict(method=[method])
                path = "/%s/%s" % (resource.collection, action)
                with mapper.submapper(controller=resource.controller,
                                      action=action,
                                      path_prefix=path_prefix,
                                      conditions=conditions) as submap:
                    submap.connect(path_prefix + path, path)
                    submap.connect(path_prefix + path + "_format",
                                   "%s.:(format)" % path)
            # Custom methods
            for action, method in resource.collection_methods.items():
                conditions = dict(method=[method])
                path = "/%s" % resource.collection
                with mapper.submapper(controller=resource.controller,
                                      action=action,
                                      path_prefix=path_prefix,
                                      conditions=conditions) as submap:
                    submap.connect(path_prefix + path, path)
                    submap.connect(path_prefix + path + "_format",
                                   "%s.:(format)" % path)
            # Map the resource collection, its controller and its
            # member actions together
            mapper.resource(resource.collection, resource.collection,
                            controller=resource.controller,
                            member=resource.member_actions,
                            parent_resource=resource.parent,
                            path_prefix=path_prefix)
        ...
As the code above shows, although the Core API uses the Pecan framework, the Extension API still relies on routes to maintain its Mapper.
(Pdb) resource.collection
'routers'
# Custom Method and Member Action mappings
(Pdb) resource.member_actions
{'remove_router_interface': 'PUT', 'add_router_interface': 'PUT'}
(Pdb) resource.controller.__class__
<class 'webob.dec.wsgify'>
(Pdb) resource.controller.controller
<neutron.api.v2.base.Controller object at 0x7f81fd694ed0>
(Pdb) resource.controller.controller.plugin
<weakproxy at 0x7f81fd625158 to L3RouterPlugin at 0x7f81fd6c09d0>
The Plugin behind the Extension Resource routers is L3RouterPlugin, and the real view function for the API request GET /v2.0/routers is neutron.services.l3_router.l3_router_plugin:L3RouterPlugin.get_routers.
# /opt/stack/neutron/neutron/db/l3_db.py
# L3RouterPlugin inherits from the parent class L3_NAT_dbonly_mixin
@db_api.retry_if_session_inactive()
def get_routers(self, context, filters=None, fields=None,
                sorts=None, limit=None, marker=None,
                page_reverse=False):
    marker_obj = lib_db_utils.get_marker_obj(
        self, context, 'router', limit, marker)
    return model_query.get_collection(context, l3_models.Router,
                                      self._make_router_dict,
                                      filters=filters, fields=fields,
                                      sorts=sorts,
                                      limit=limit,
                                      marker_obj=marker_obj,
                                      page_reverse=page_reverse)
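The difference between a WSGI Middleware and a WSGI Application described in this section can be illustrated with the standard library alone. The sketch below is an assumption-laden toy, not the real ExtensionMiddleware: core_app, ExtensionRoutingMiddleware and call are hypothetical names, and a plain dict replaces the routes Mapper.

```python
def core_app(environ, start_response):
    """Stand-in for the Core API WSGI application."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"core:" + environ["PATH_INFO"].encode()]


class ExtensionRoutingMiddleware:
    """Answers paths it has a route for; passes everything else down."""

    def __init__(self, app):
        self.app = app
        self.routes = {}  # (HTTP method, path) -> view function

    def connect(self, method, path, view):
        self.routes[(method, path)] = view

    def __call__(self, environ, start_response):
        key = (environ["REQUEST_METHOD"], environ["PATH_INFO"])
        view = self.routes.get(key)
        if view is None:
            # Not an extension route: delegate to the wrapped application.
            return self.app(environ, start_response)
        start_response("200 OK", [("Content-Type", "text/plain")])
        return [view().encode()]


def call(app, method, path):
    """Drive a WSGI callable once and return (status, body)."""
    captured = {}

    def start_response(status, headers):
        captured["status"] = status

    body = b"".join(app(
        {"REQUEST_METHOD": method, "PATH_INFO": path}, start_response))
    return captured["status"], body.decode()


stack = ExtensionRoutingMiddleware(core_app)
stack.connect("GET", "/v2.0/routers",
              lambda: "routers from the L3 plugin stand-in")
print(call(stack, "GET", "/v2.0/routers"))
print(call(stack, "GET", "/v2.0/networks"))
```

A middleware wraps another WSGI callable, so extension routes can be layered in front of the core application without the core application knowing about them.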
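Neutron Server summary
The Neutron Server startup process:
- Load (instantiate) the Core WSGI Application and the Extension WSGI Middleware
- Load (instantiate) the Core & Extension Plugins
- Start the Web Server service
- Start the Plugins RPC service
Developers familiar with OpenStack will notice that, compared with other projects (e.g. Nova, Cinder), Neutron's code is not written in a conventional way, so even experienced developers struggle to grasp its essentials quickly; this is why Neutron is hard to get started with. That is clearly not a good approach, but it is easier to accept once you recall who open-sourced Neutron (Quantum) in the first place.
The Neutron API falls into two broad classes, Core API and Extension API, which at the Web Server level (WSGI Server, WSGI Middleware, WSGI Application) correspond to a WSGI Application and a WSGI Middleware respectively. Whether Core API or Extension API, each wraps one kind of Resource in a Controller class; the difference is that the former uses the Pecan framework while the latter still uses the routes library to map URL routes, view functions and HTTP methods to one another. Although the code lives in different places and is implemented in different ways, the end result is the same: the Request is passed from the API layer to the Plugin layer, and the Plugin layer then uses the RPC protocol to pass it asynchronously through the MQ to the Agent service processes that actually carry out the task.
NOTE: not every request is passed asynchronously to the Agent service processes; some are completed in the Plugins layer, for example fetching networks resource information.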
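Plug-ins and Agents
Neutron Plugins are part of the Neutron Server, but they are singled out here because Plugins and Agents are tightly coupled. As the "relay" layer for Neutron's internal calls, Neutron Plugins sit between the Neutron API layer above and the Neutron Agents layer below, and the bridge in between is naturally the RPC protocol and the MQ.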
OpenStack Networking plug-ins and agents — Plug and unplug ports, create networks or subnets, and provide IP addressing. These plug-ins and agents differ depending on the vendor and technologies used in the particular cloud. OpenStack Networking ships with plug-ins and agents for Cisco virtual and physical switches, NEC OpenFlow products, Open vSwitch, Linux bridging, and the VMware NSX product.
The common agents are L3 (layer 3), DHCP (dynamic host IP addressing), and a plug-in agent.
Messaging queue — Used by most OpenStack Networking installations to route information between the neutron-server and various agents. Also acts as a database to store networking state for particular plug-ins.
Plugin RPC
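The Plugin layer wraps the RPC protocol, and a Plugin acts as both an RPC Producer and an RPC Consumer.
- RPC Producer: the Plugin sends messages to the Agent
- RPC Consumer: the Plugin receives messages sent by the Agent
First, for a Plugin to become a Consumer it has to register with the RPC Server; I call this registration process Registered Endpoints. Through Registered Endpoints, the Plugin registers the endpoints (call interfaces) it uses to communicate with the corresponding Agents.
The code logic of Registered Endpoints: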
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
# Taking the Core Plugin's RPC listener startup method as the example
class Ml2Plugin(...):
    ...
    @log_helpers.log_method_call
    def start_rpc_listeners(self):
        """Start the RPC loop to let the plugin communicate with agents."""
        # Set up the endpoints ML2Plugin uses to communicate with Agents
        self._setup_rpc()
        self.topic = topics.PLUGIN
        self.conn = n_rpc.Connection()
        # Register the endpoints with an RPC Consumer and create the
        # RPC Consumer instance
        self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
        self.conn.create_consumer(
            topics.SERVER_RESOURCE_VERSIONS,
            [resources_rpc.ResourcesPushToServerRpcCallback()],
            fanout=True)
        # process state reports despite dedicated rpc workers
        self.conn.create_consumer(topics.REPORTS,
                                  [agents_db.AgentExtRpcCallback()],
                                  fanout=False)
        # Start the RPC server instances for the endpoints in threads
        return self.conn.consume_in_threads()

    def start_rpc_state_reports_listener(self):
        self.conn_reports = n_rpc.Connection()
        self.conn_reports.create_consumer(topics.REPORTS,
                                          [agents_db.AgentExtRpcCallback()],
                                          fanout=False)
        return self.conn_reports.consume_in_threads()

    def _setup_rpc(self):
        """Initialize components to support agent communication."""
        # List of Agent-facing endpoints
        self.endpoints = [
            rpc.RpcCallbacks(self.notifier, self.type_manager),
            securitygroups_rpc.SecurityGroupServerRpcCallback(),
            dvr_rpc.DVRServerRpcCallback(),
            dhcp_rpc.DhcpRpcCallback(),
            agents_db.AgentExtRpcCallback(),
            metadata_rpc.MetadataRpcCallback(),
            resources_rpc.ResourcesPullRpcCallback()
        ]
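The idea of registering a list of endpoints on a topic can be pictured with a small dispatcher. This is only a sketch under assumptions: Consumer, PortCallbacks, StateReportCallbacks and the topic string are hypothetical, and the "first endpoint that exposes the method wins" rule is a simplification chosen for illustration rather than a description of the messaging library's internals.

```python
class PortCallbacks:
    """Hypothetical endpoint answering port-related requests."""

    def get_device_details(self, context, device):
        return {"device": device, "status": "ACTIVE"}


class StateReportCallbacks:
    """Hypothetical endpoint answering agent state reports."""

    def report_state(self, context, agent_state):
        return "alive:" + agent_state["host"]


class Consumer:
    """Holds the endpoints registered for one topic."""

    def __init__(self, topic, endpoints):
        self.topic = topic
        self.endpoints = endpoints

    def dispatch(self, context, method, **kwargs):
        # Hand the message to the first endpoint exposing that method.
        for endpoint in self.endpoints:
            handler = getattr(endpoint, method, None)
            if callable(handler):
                return handler(context, **kwargs)
        raise AttributeError("no endpoint implements %r" % method)


consumer = Consumer("plugin-topic", [PortCallbacks(), StateReportCallbacks()])
print(consumer.dispatch({}, "report_state", agent_state={"host": "node-1"}))
```

Seen this way, an endpoint is just an object whose public methods are the remotely callable interface for that topic.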
The two most important functions, start_rpc_listeners and start_rpc_state_reports_listener, are called from the RpcWorker and RpcReportsWorker worker classes mentioned earlier, which is how the RPC Workers are loaded and run.
Printing self.endpoints:
(Pdb) pp self.endpoints
[<neutron.plugins.ml2.rpc.RpcCallbacks object at 0x7f17fcd9f350>,
<neutron.api.rpc.handlers.securitygroups_rpc.SecurityGroupServerRpcCallback object at 0x7f17fcd9f390>,
<neutron.api.rpc.handlers.dvr_rpc.DVRServerRpcCallback object at 0x7f17fcd9f3d0>,
<neutron.api.rpc.handlers.dhcp_rpc.DhcpRpcCallback object at 0x7f17fcd9f410>,
<neutron.db.agents_db.AgentExtRpcCallback object at 0x7f17fcd9f450>,
<neutron.api.rpc.handlers.metadata_rpc.MetadataRpcCallback object at 0x7f17fcd9f5d0>,
<neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcCallback object at 0x7f17fcd9f610>]
An example of calling an RPC function in the Create Port workflow:
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
class Ml2Plugin(...):
    ...
    def create_port(self, context, port):
        ...
        return self._after_create_port(context, result, mech_context)

    def _after_create_port(self, context, result, mech_context):
        ...
        try:
            bound_context = self._bind_port_if_needed(mech_context)
        except ml2_exc.MechanismDriverError:
            ...
        return bound_context.current

    @db_api.retry_db_errors
    def _bind_port_if_needed(self, context, allow_notify=False,
                             need_notify=False, allow_commit=True):
        ...
        if not try_again:
            if allow_notify and need_notify:
                self._notify_port_updated(context)
            return context
        ...
        return context

    def _notify_port_updated(self, mech_context):
        port = mech_context.current
        segment = mech_context.bottom_bound_segment
        if not segment:
            # REVISIT(rkukura): This should notify agent to unplug port
            network = mech_context.network.current
            LOG.debug("In _notify_port_updated(), no bound segment for "
                      "port %(port_id)s on network %(network_id)s",
                      {'port_id': port['id'], 'network_id': network['id']})
            return
        self.notifier.port_update(mech_context._plugin_context, port,
                                  segment[api.NETWORK_TYPE],
                                  segment[api.SEGMENTATION_ID],
                                  segment[api.PHYSICAL_NETWORK])

# /opt/stack/neutron/neutron/plugins/ml2/rpc.py
class AgentNotifierApi(...):
    ...
    def port_update(self, context, port, network_type, segmentation_id,
                    physical_network):
        # Build the RPC Client
        cctxt = self.client.prepare(topic=self.topic_port_update,
                                    fanout=True)
        # Send the RPC message
        cctxt.cast(context, 'port_update', port=port,
                   network_type=network_type, segmentation_id=segmentation_id,
                   physical_network=physical_network)
Finally, the RPC message sent by ML2Plugin is received by the Agents (RPC consumers) subscribed to that Target, which carry out the final task.
(Pdb) self.client.target
<Target topic=q-agent-notifier, version=1.0>
For example, the OvS Agent receives the message:
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
def port_update(self, context, **kwargs):
    port = kwargs.get('port')
    # Put the port identifier in the updated_ports set.
    # Even if full port details might be provided to this call,
    # they are not used since there is no guarantee the notifications
    # are processed in the same order as the relevant API requests
    self.updated_ports.add(port['id'])
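The one-way, fanout nature of the cast above can be mimicked with an in-memory bus. This is a toy model under stated assumptions: FanoutBus, ToyAgent and the topic string are hypothetical, and delivery is synchronous here, whereas a real message queue delivers asynchronously.

```python
from collections import defaultdict


class FanoutBus:
    """In-memory stand-in for a message queue with fanout delivery."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, endpoint):
        self._subscribers[topic].append(endpoint)

    def cast(self, topic, method, **kwargs):
        # cast is one-way: every subscriber gets the message and the
        # sender receives no result back.
        for endpoint in self._subscribers[topic]:
            getattr(endpoint, method)(**kwargs)


class ToyAgent:
    def __init__(self):
        self.updated_ports = set()

    def port_update(self, port):
        # Only the id is recorded, mirroring the handler shown above.
        self.updated_ports.add(port["id"])


bus = FanoutBus()
agents = [ToyAgent(), ToyAgent()]
for agent in agents:
    bus.subscribe("port-update", agent)
result = bus.cast("port-update", "port_update", port={"id": "p1"})
print(result, [sorted(a.updated_ports) for a in agents])
```

Because the sender gets nothing back, any result the server needs from an agent has to arrive through a separate message travelling in the other direction.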
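The Plugins' Callback System
Besides calling RPC functions (call, cast) directly as in the example above, Plugins also implement a Callback System mechanism; see the official documents "Neutron Messaging Callback System" and "Neutron Callback System".
Like RPC, the Callback System exists for communication. The difference is that RPC passes task messages between separate service processes, neutron-server and the agents, whereas the Callback System handles in-process communication between core and service components. It delivers the Lifecycle Events of a Resource (e.g. before creation, before deletion, etc.) so that Core and Services, and different Services among themselves, can observe state changes of a particular Resource. For example, when a Neutron Network Resource is associated with several Services (VPN, Firewall and Load Balancer), each Service needs to determine the Network's current, correct state when operating on it.
To illustrate what the Callback System is for: Services A, B, and C all need to know about the router creation event. Without an intermediary that notifies these Services through messages, A would have to call B and C directly when performing a router creation and tell them: "I am creating a router." With an intermediary X (the Callback System), the flow becomes:
- B and C subscribe with X to A's create router event
- A finishes creating the router
- A calls X (A holds a handle to X)
- X notifies B and C of A's created router event (X -> notify)
Throughout the process A, B, and C never communicate directly, which decouples the Services A, B, and C from one another. This is what is meant by a Callback.
The Callback System is used extensively in Neutron, and the implementations all resemble the following example: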
# /opt/stack/neutron/neutron/plugins/ml2/plugin.py
class Ml2Plugin(...):
    ...
    def create_network(self, context, network):
        self._before_create_network(context, network)
        ...

    def _before_create_network(self, context, network):
        net_data = network[net_def.RESOURCE_NAME]
        # Notify the Services subscribed to this event type: resource type
        # Network, event Before Create
        registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
                        context=context, network=net_data)
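From the Callback System's point of view there are two kinds of roles: event handlers and event publishers. An event handler subscribes to an event through the registry.subscribe API, and an event publisher notifies an event through the registry.notify API. The official Neutron documentation contains many examples of the concrete implementation and module usage, so they are not repeated here.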
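The subscribe/notify pattern, together with the A/B/C example from this section, can be reduced to a few lines. The following is a minimal in-process sketch, assuming a hypothetical CallbackRegistry class and string constants; it imitates the shape of the pattern and is not the neutron-lib implementation.

```python
from collections import defaultdict

# Hypothetical constants standing in for resource and event names.
ROUTER = "router"
AFTER_CREATE = "after_create"


class CallbackRegistry:
    """The intermediary X: keeps callbacks per (resource, event) pair."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def subscribe(self, callback, resource, event):
        self._callbacks[(resource, event)].append(callback)

    def notify(self, resource, event, trigger, **kwargs):
        for callback in self._callbacks[(resource, event)]:
            callback(resource, event, trigger, **kwargs)


callback_registry = CallbackRegistry()
seen = []


def service_b(resource, event, trigger, **kwargs):
    seen.append(("B", resource, event, kwargs["name"]))


def service_c(resource, event, trigger, **kwargs):
    seen.append(("C", resource, event, kwargs["name"]))


# B and C subscribe with X; A only ever talks to X.
callback_registry.subscribe(service_b, ROUTER, AFTER_CREATE)
callback_registry.subscribe(service_c, ROUTER, AFTER_CREATE)
callback_registry.notify(ROUTER, AFTER_CREATE, "service_a", name="r1")
print(seen)
```

Service A never imports or references B or C, so another subscriber can be added without touching A.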
Agents
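The Neutron deployment architecture shows that Neutron has a large number of Networking Agent service processes. These Agents are deployed across various kinds of nodes, and what they configure are the physical/virtual network elements on those nodes (e.g. DHCP, Linux Bridge, Open vSwitch, Router). Agents provide Neutron with management and execution services for all kinds of network element functions. By combining Agents in different ways, users can flexibly build the network topology they want.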
[root@localhost ~]# openstack network agent list
+--------------------------------------+--------------------+-----------------------+-------------------+-------+-------+---------------------------+
| ID | Agent Type | Host | Availability Zone | Alive | State | Binary |
+--------------------------------------+--------------------+-----------------------+-------------------+-------+-------+---------------------------+
| 2698f558-6b20-407c-acf5-950e707432ed | Metadata agent | localhost.localdomain | None | :-) | UP | neutron-metadata-agent |
| 7804fb5a-fe22-4f02-8e4c-5689744bb0aa | Open vSwitch agent | localhost.localdomain | None | :-) | UP | neutron-openvswitch-agent |
| a7b30a22-0a8a-4d31-bf20-9d96dbe420bc | DHCP agent | localhost.localdomain | nova | :-) | UP | neutron-dhcp-agent |
| eb1da27b-3fa2-4304-965a-f6b15c475419 | L3 agent | localhost.localdomain | nova | :-) | UP | neutron-l3-agent |
+--------------------------------------+--------------------+-----------------------+-------------------+-------+-------+---------------------------+
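The abstract architecture of an Agent can be divided into three layers:
- Northbound, it exposes an RPC interface for Neutron Server to call
- Southbound, it configures the corresponding Neutron VNF (virtual network function, virtual network element) through the CLI protocol stack
- In between, it converts between the two models: from the RPC model to the CLI model
For example, when Neutron creates and binds a Port for a virtual machine, the Linux Bridge Agent and the OvS Agent execute the following commands to support Neutron's network implementation model on the compute node.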
# Port UUID: 15c7b577-89f5-46f6-8111-5f4e0c8ebaa1
# VM UUID: 80996760-0c30-4e2a-847a-b9d882182df
brctl addbr qbr15c7b577-89
brctl setfd qbr15c7b577-89 0
brctl stp qbr15c7b577-89 off
brctl setageing qbr15c7b577-89 0
ip link add qvb15c7b577-89 type veth peer name qvo15c7b577-89
ip link set qvb15c7b577-89 up
ip link set qvb15c7b577-89 promisc on
ip link set qvb15c7b577-89 mtu 1450
ip link set qvo15c7b577-89 up
ip link set qvo15c7b577-89 promisc on
ip link set qvo15c7b577-89 mtu 1450
ip link set qbr15c7b577-89 up
brctl addif qbr15c7b577-89 qvb15c7b577-89
ovs-vsctl -- --may-exist add-br br-int -- set Bridge br-int datapath_type=system
ovs-vsctl --timeout=120 -- --if-exists del-port qvo15c7b577-89 -- add-port br-int qvo15c7b577-89 -- set Interface qvo15c7b577-89 external-ids:iface-id=15c7b577-89f5-46f6-8111-5f4e0c8ebaa1 external-ids:iface-status=active external-ids:attached-mac=fa:16:3e:d0:f6:a4 external-ids:vm-uuid=80996760-0c30-4e2a-847a-b9d882182df
ip link set qvo15c7b577-89 mtu 1450
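The middle layer, which converts an RPC-level description of a port into CLI commands, can be sketched as a pure function that only builds command lists and executes nothing. The helper names device_names and plug_commands are hypothetical; the naming rule (a three-letter prefix plus the first 11 characters of the port UUID) is inferred from the command listing above, and only a subset of those commands is reproduced.

```python
def device_names(port_id):
    """Derive device names from the first 11 characters of the port UUID."""
    suffix = port_id[:11]
    return {
        "bridge": "qbr" + suffix,
        "veth_br": "qvb" + suffix,
        "veth_ovs": "qvo" + suffix,
    }


def plug_commands(port_id, mtu=1450):
    """Build (not run) a subset of the commands that plug one port."""
    names = device_names(port_id)
    qbr, qvb, qvo = names["bridge"], names["veth_br"], names["veth_ovs"]
    return [
        ["brctl", "addbr", qbr],
        ["ip", "link", "add", qvb, "type", "veth", "peer", "name", qvo],
        ["ip", "link", "set", qvb, "mtu", str(mtu)],
        ["ip", "link", "set", qvo, "mtu", str(mtu)],
        ["brctl", "addif", qbr, qvb],
        ["ovs-vsctl", "--", "--if-exists", "del-port", qvo,
         "--", "add-port", "br-int", qvo],
    ]


for command in plug_commands("15c7b577-89f5-46f6-8111-5f4e0c8ebaa1"):
    print(" ".join(command))
```

Keeping command construction separate from execution makes the RPC-to-CLI translation easy to inspect and test without touching the host network.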
The entry point of a Neutron Agent is likewise defined in setup.cfg. Taking the OvS Agent as the example, we focus on the startup flow of the neutron-openvswitch-agent.service service process.
Starting the OvS Agent
# /opt/stack/neutron/setup.cfg
neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
Locate the entry function of the service process:
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/main.py
_main_modules = {
    'ovs-ofctl': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
                 'ovs_ofctl.main',
    'native': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
              'native.main',
}


def main():
    common_config.init(sys.argv[1:])
    driver_name = cfg.CONF.OVS.of_interface
    mod_name = _main_modules[driver_name]
    mod = importutils.import_module(mod_name)
    mod.init_config()
    common_config.setup_logging()
    profiler.setup("neutron-ovs-agent", cfg.CONF.host)
    mod.main()
Here we can see that the OvS Agent has two startup modes, ovs-ofctl and native, selected by the configuration option of_interface.
# openvswitch_agent.ini
of_interface -- OpenFlow interface to use.
Type: string
Default: native
Valid Values: ovs-ofctl, native
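It is commonly set to ovs-ofctl, which means the flow tables are manipulated with Open vSwitch's ovs-ofctl command; note that the default listed above is native.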
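The selection mechanism in main(), a map from option value to module path followed by a dynamic import, can be tried out with the standard library. In this sketch the map and load_driver are hypothetical, and the json and pickle modules merely stand in for the two OpenFlow driver modules.

```python
import importlib

# Hypothetical map: option value -> module path. Standard-library modules
# stand in for the real driver modules.
_DRIVER_MODULES = {
    "text": "json",
    "binary": "pickle",
}


def load_driver(name):
    """Import and return the module registered under the given name."""
    try:
        module_path = _DRIVER_MODULES[name]
    except KeyError:
        raise ValueError("unknown driver %r, valid values: %s"
                         % (name, ", ".join(sorted(_DRIVER_MODULES))))
    return importlib.import_module(module_path)


driver = load_driver("text")
encoded = driver.dumps({"of_interface": "native"})
print(driver.__name__, encoded)
```

Only the selected module is imported, so an unused driver never has to be loadable on the host.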
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/main.py
def main():
    # Three OvS Bridge class definitions, corresponding to br-int,
    # br-ethX and br-tun
    bridge_classes = {
        'br_int': br_int.OVSIntegrationBridge,
        'br_phys': br_phys.OVSPhysicalBridge,
        'br_tun': br_tun.OVSTunnelBridge,
    }
    ovs_neutron_agent.main(bridge_classes)
# /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
def main(bridge_classes):
    ...
    try:
        # Instantiate the agent application object
        agent = OVSNeutronAgent(bridge_classes, ext_mgr, cfg.CONF)
        capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent)
    except (RuntimeError, ValueError) as e:
        LOG.error("%s Agent terminated!", e)
        sys.exit(1)
    # Start the Agent daemon loop
    agent.daemon_loop()


class OVSNeutronAgent(...):
    ...
    def __init__(self, bridge_classes, ext_manager, conf=None):
        ...
        # Create the RPC Consumers
        self.setup_rpc()
        ...

    def setup_rpc(self):
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        # allow us to receive port_update/delete callbacks from the cache
        self.plugin_rpc.register_legacy_notification_callbacks(self)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerAPIShim(
            self.plugin_rpc.remote_resource_cache)
        self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Made a simple RPC call to Neutron Server.
        while True:
            try:
                self.state_rpc.has_alive_neutron_server(self.context)
            except oslo_messaging.MessagingTimeout as e:
                LOG.warning('l2-agent cannot contact neutron server. '
                            'Check connectivity to neutron server. '
                            'Retrying... '
                            'Detailed message: %(msg)s.', {'msg': e})
                continue
            break
        # Define the listening consumers for the agent
        consumers = [[constants.TUNNEL, topics.UPDATE],
                     [constants.TUNNEL, topics.DELETE],
                     [topics.DVR, topics.UPDATE]]
        if self.l2_pop:
            consumers.append([topics.L2POPULATION, topics.UPDATE])
        self.connection = agent_rpc.create_consumers([self],
                                                     topics.AGENT,
                                                     consumers,
                                                     start_listening=False)
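To make the consumers list more concrete, the sketch below expands [table, operation] pairs into per-consumer topic names. Everything here is an assumption made for illustration: build_topic_names is a hypothetical helper, the "prefix-table-operation" format and the literal table and operation strings are assumed rather than taken from the code above, and only the q-agent-notifier prefix comes from the Target printed earlier.

```python
def build_topic_names(prefix, consumers):
    """Return one topic name per [table, operation] pair (assumed format)."""
    return ["%s-%s-%s" % (prefix, table, operation)
            for table, operation in consumers]


# The literal values below are assumptions standing in for the constants
# referenced in setup_rpc.
consumers = [["tunnel", "update"], ["tunnel", "delete"], ["dvr", "update"]]
l2_pop_enabled = True
if l2_pop_enabled:
    consumers.append(["l2population", "update"])

names = build_topic_names("q-agent-notifier", consumers)
for name in names:
    print(name)
```

Under this assumption, each pair in the list corresponds to one queue the agent listens on, which is why enabling an optional feature simply appends another pair.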
Once the RPC Consumers have been created, the RPC consumer functions defined by the OvS Agent can "consume" the messages sent from Neutron Server through the MQ, e.g.
def port_update(self, context, **kwargs):
    port = kwargs.get('port')
    # Put the port identifier in the updated_ports set.
    # Even if full port details might be provided to this call,
    # they are not used since there is no guarantee the notifications
    # are processed in the same order as the relevant API requests
    self.updated_ports.add(port['id'])

def port_delete(self, context, **kwargs):
    port_id = kwargs.get('port_id')
    self.deleted_ports.add(port_id)
    self.updated_ports.discard(port_id)

def network_update(self, context, **kwargs):
    network_id = kwargs['network']['id']
    for port_id in self.network_ports[network_id]:
        # notifications could arrive out of order, if the port is deleted
        # we don't want to update it anymore
        if port_id not in self.deleted_ports:
            self.updated_ports.add(port_id)
    LOG.debug("network_update message processed for network "
              "%(network_id)s, with ports: %(ports)s",
              {'network_id': network_id,
               'ports': self.network_ports[network_id]})
...
NOTE: the OvS Agent only listens for UPDATE and DELETE RPC messages and does not listen for CREATE, because creating a Neutron Port is not done by the OvS Agent but by the L3 Agent and the DHCP Agent.
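The handlers above only record port ids in sets, and the real work happens later. A minimal sketch of that buffering idea follows; PortEventBuffer and its drain step are hypothetical, and in particular the rule that a delete wins over a late update is an assumption chosen for illustration, not a description of the agent's main loop.

```python
class PortEventBuffer:
    """Collects port ids from notifications until the next processing pass."""

    def __init__(self):
        self.updated_ports = set()
        self.deleted_ports = set()

    def port_update(self, port_id):
        self.updated_ports.add(port_id)

    def port_delete(self, port_id):
        self.deleted_ports.add(port_id)
        self.updated_ports.discard(port_id)

    def drain(self):
        """Hand over the pending work and start a fresh batch."""
        # Assumption: an update that arrives after a delete is ignored.
        updated = self.updated_ports - self.deleted_ports
        deleted = self.deleted_ports
        self.updated_ports, self.deleted_ports = set(), set()
        return updated, deleted


buf = PortEventBuffer()
buf.port_update("p1")
buf.port_update("p2")
buf.port_delete("p1")
buf.port_update("p1")  # late, out-of-order notification
updated, deleted = buf.drain()
print(sorted(updated), sorted(deleted))
```

Recording ids in sets makes repeated notifications for the same port idempotent, and the order in which they arrive stops mattering once a batch is drained.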