Skynet 服务创建流程

根据设计综述 Skynet 是为了让服务器充分利用多核优势,将不同的业务放在独立的执行环境中处理。
Skynet 核心功能是加载一个 C 模块(动态库),模块用数字 id 标识,作为其 handle ,模块被称为服务 service 。服务间可以*发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。
每个服务是被一个个消息驱动,当无消息时,服务处于挂起状态。每个服务拥有一个属于自己的消息队列,框架中存在一个全局队列负责调度处理服务所接收到的消息。

代码层面,Skynet 服务对应于数据结构 struct skynet_context ,其中重要字段如下。

struct skynet_context {
	void * instance;			 // 模块自定义数据
	struct skynet_module * mod;	  // 框架模块数据
	void * cb_ud;				 // 传给回调函数的自定义数据
	skynet_cb cb; 				 // 回调函数
	struct message_queue * queue;  // 消息队列,用于接收发送给服务的消息
};

函数 skynet_context_new 用于创建服务,返回值表示此服务。

// 参数 name: 服务名
// 参数 param: 传递给服务的参数
struct skynet_context * skynet_context_new(const char * name, const char *param);

服务调用函数 skynet_callback 用于向框架注册回调函数,处理接收到的消息。

// 参数 context: 表示服务
// 参数 ud: user data 表示自定义数据
// 参数 cb: 表示回调函数
void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb);

// 参数 context: 表示服务
// 参数 ud: user data 由 skynet_callback 指定
// 参数 type: 消息类型
// 参数 session: 由发送方指定,标识发送的消息
// 参数 source: 表示发送方服务的地址
// 参数 msg sz: 数据
typedef int (*skynet_cb)(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz);

函数 skynet_send 用于向服务发送消息。向一个服务发送消息就是向这个服务的消息队列中添加消息。

// 参数 context: 表示服务
// 参数 source: 表示发送方服务的地址,可为 0
// 参数 destination: 表示接收方服务的地址
// 参数 type: 消息类型
// 参数 session: 用于发送方标识发送的消息,可为 0
// 参数 data sz: 数据
// 返回值 : session
int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz);

C 服务

// 参数 name: 服务名
// 参数 param: 传递给服务的 c-style 字符串,以空白字符分割
struct skynet_context * skynet_context_new(const char * name, const char *param)
{
    struct skynet_module * mod = skynet_module_query(name);
	// 执行 create 传递参数无
    void *inst = skynet_module_instance_create(mod);
    // 执行 init ,参数 param 是传递给服务的参数
    int r = skynet_module_instance_init(mod, inst, ctx, param);
}

函数 skynet_context_new 完成 C 服务的创建,一次函数调用即可完成。函数执行成功,返回的 context 便是创建的服务。创建过程中的初始化包含 create 和 init 流程,在 init 流程中调用 skynet_callback 注册回调函数。此后,服务创建完成,便可接收消息。
Lua 函数 skynet.launch 用于在 Lua 中创建 C 服务。


Lua 服务

Lua 服务本质上也是 C 服务,只是将 Lua 回调函数注册到 C 服务中,将对服务接收消息的处理移动到 Lua 中。完成此功能的 C 服务是 snlua 服务。

因此,创建 Lua 服务需要先创建 snlua 服务,然后在 Lua 中调用 skynet.start 将 Lua 回调函数注册到 C 服务中,skynet.start 函数执行完后,Lua 服务创建完成。

skynet.newservice 用于创建 Lua 服务。参数 name 是服务名,参数 ... 是传给服务的参数,需要是 Lua 中能被转换成字符串的值。

function skynet.newservice(name, ...)
	return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end

skynet.newservice 流程如下。

  1. 当前服务向 launcher 服务发送请求,请求创建服务 name 。
  2. launcher 服务创建新服务。
  3. 新服务创建完成后,skynet.newservice 返回新服务的地址。

具体的创建过程如下所示。
逻辑来到 launcher 服务 command.LAUNCH 函数(主要代码片段)。

local function launch_service(service, ...)
	local param = table.concat({...}, " ")
	local inst = skynet.launch(service, param)
	local response = skynet.response()
	if inst then -- launch 成功
		services[inst] = service .. " " .. param
		instance[inst] = response
	else -- launch 失败
		response(false)
		return
	end
	return inst
end

function command.LAUNCH(_, service, ...)
	launch_service(service, ...)
	return NORET
end

command.LAUNCH 调用 launch_service 传递的参数依次是 ("snlua", name, ...) 与 skynet.newservice 传递的参数对应。 在 launch_service 函数中 service 变量是 "snlua" 而 param 变量是要创建的服务名及其参数。调用 skynet.launch 创建 C 服务 snlua 且将 param 传递给 snlua ,返回的 inst 是创建的地址。 创建成功记录数据到 services 和 instance 变量中,services 存储通过 launcher 服务创建的服务,instance 存储 skynet.response 用于回复请求方创建结果。
创建失败调用 response(false) 回复请求方创建失败。
新服务创建完成后,发送 LAUNCHOK 到 launcher 服务。launcher 服务回复请求方新服务的地址。

逻辑来到 C 服务 snlua ,模块是 service_snlua.c 。
snlua 服务的核心工作就是将消息处理回调函数对接到 Lua 中指定的回调函数。
snlua 服务首先执行 create 函数,调用 lua_newstate 创建 Lua 虚拟机。然后在 init 函数调用 skynet_callback 向框架注册回调函数,并向自身发送第一条消息,用于进行后续初始化。注意 args 参数是要创建的服务信息。

// 参数 args: 字符串,包含由空白字符分割的多个字符串,第一个字符串是要创建的 Lua 服务名,参考 skynet.launch 的 param 参数
int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
	int sz = strlen(args);
	char * tmp = skynet_malloc(sz);
	memcpy(tmp, args, sz);
	// 1. 注册回调函数 launch_cb
	skynet_callback(ctx, l , launch_cb);
	// 2. 在第一条消息中进行后续初始化
	const char * self = skynet_command(ctx, "REG", NULL);
	uint32_t handle_id = strtoul(self+1, NULL, 16);
	// it must be first message
	skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
	return 0;
}

struct snlua *
snlua_create(void) {
	struct snlua * l = skynet_malloc(sizeof(*l));
	memset(l,0,sizeof(*l));
	l->L = lua_newstate(lalloc, l); // 创建 Lua 虚拟机
	return l;
}

问题:为何没有在 snlua_init 中直接调用 init_cb ?
乍一看,后续初始化逻辑 init_cb 可在 snlua_init 函数中完成,但这里设计成延迟到第一条消息中执行,好处是简化 snlua_init 函数逻辑,虽然增加了流程,但由于是在第一条消息中处理,整个过程是连续的,从框架整体来看,可认为此过程是“原子”的。对于这种复杂初始化流程,我也很认可这种设计,学习了。

在第一条消息中处理后续初始化。如果初始化失败,snlua 服务退出。

// 参数 msg sz: 要创建的服务信息,参考 snlua_init 中 args 参数
static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
	assert(type == 0 && session == 0);
	struct snlua *l = ud;
	skynet_callback(context, NULL, NULL); // 细节:先清空回调字段
	int err = init_cb(l, context, msg, sz);
	if (err) {
		skynet_command(context, "EXIT", NULL);
	}
	return 0;
}

Lua 服务具体的初始化逻辑如下。

// 参数 args sz: 对应 launch_cb 中 msg sz
static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
	lua_State *L = l->L;
	l->ctx = ctx;
	lua_gc(L, LUA_GCSTOP, 0); // 关闭 GC
	lua_pushboolean(L, 1);  /* signal for libraries to ignore env. vars. */
	lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");
	luaL_openlibs(L); // 打开标准库
	lua_pushlightuserdata(L, ctx); // 设置 struct skynet_context
	lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
	luaL_requiref(L, "skynet.codecache", codecache , 0);
	lua_pop(L,1);

    // 保存一些路径到如下全局变量中
    const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");
	lua_pushstring(L, path);
	lua_setglobal(L, "LUA_PATH");
	const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");
	lua_pushstring(L, cpath);
	lua_setglobal(L, "LUA_CPATH");
	const char *service = optstring(ctx, "luaservice", "./service/?.lua");
	lua_pushstring(L, service);
	lua_setglobal(L, "LUA_SERVICE");
	const char *preload = skynet_command(ctx, "GETENV", "preload");
	lua_pushstring(L, preload);
	lua_setglobal(L, "LUA_PRELOAD");

    // 设置 traceback 函数
	lua_pushcfunction(L, traceback);
	assert(lua_gettop(L) == 1);

    // 加载 Lua 服务入口脚本
	const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
	int r = luaL_loadfile(L,loader);
	if (r != LUA_OK) {
		skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
		report_launcher_error(ctx);
		return 1;
	}
	lua_pushlstring(L, args, sz);
	r = lua_pcall(L,1,0,1);
	if (r != LUA_OK) {
		skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
		report_launcher_error(ctx);
		return 1;
	}
	lua_settop(L,0);
	
	// 处理 skynet.memlimit 设置的内存限制
	if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {
		size_t limit = lua_tointeger(L, -1);
		l->mem_limit = limit;
		skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));
		lua_pushnil(L);
		lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");
	}
	lua_pop(L, 1);

	lua_gc(L, LUA_GCRESTART, 0); // 重启 GC
	return 0;
}

函数 init_cb 的核心作用就是调用 lua_pcall 函数执行 Lua 服务入口脚本,脚本文件由 args sz 指定,具体实现有一些细节。

  • 上述整个加载过程是关闭 Lua GC 的,完成后才重新启动 GC 。猜测是为了加快加载速度。
  • skynet.codecache 用于在 Lua 虚拟机之间共享代码。
  • 调用 lua_pcall 前设置了 traceback 函数。
  • 通过 lualoader 加载 lualoader 脚本,并传递 args sz 参数
  • snlua 服务中调用 skynet_callback(context, NULL, NULL); 删除回调函数后,未再发现注册回调函数的 C 代码,此注册是在 Lua 中完成的。

执行 lua_pcall 函数,逻辑来到框架提供的 lualoader 脚本,位于 ./lualib/loader.lua 。

-- ... 就是 C 函数 init_cb 中的 args sz
local args = {} -- Lua 服务名及参数
for word in string.gmatch(..., "%S+") do
	table.insert(args, word)
end
SERVICE_NAME = args[1] -- 服务名

-- 定位并加载 Lua 服务脚本文件
local main, pattern -- 分别表示加载后的 Lua chunk 和 Lua 服务脚本文件路径
local err = {}
for pat in string.gmatch(LUA_SERVICE, "([^;]+);*") do
	local filename = string.gsub(pat, "?", SERVICE_NAME)
	local f, msg = loadfile(filename)
	if not f then
		table.insert(err, msg)
	else
		pattern = pat
		main = f
		break
	end
end

-- 预处理,可选
if LUA_PRELOAD then
	local f = assert(loadfile(LUA_PRELOAD))
	f(table.unpack(args))
	LUA_PRELOAD = nil
end
-- 执行 Lua 服务入口脚本
main(select(2, table.unpack(args)))

前面提到,需要在脚本调用 skynet.start 向框架注册 Lua 回调函数,完成创建 Lua 服务。lualoader 中调用 main 函数,执行 Lua 服务入口脚本,于是 skynet.start 函数被调用,Lua 服务创建完成。
skynet.start 函数中 c.callback(skynet.dispatch_message) 完成 Lua 回调函数的注册
如下代码,skynet.start 执行完毕后,注册一个 0 秒定时器回调,那时调用 start_func 执行上层业务初始化,并根据初始化结果发送消息到 launcher 服务,告知创建成功与否。

function skynet.start(start_func)
	c.callback(skynet.dispatch_message) -- 注册 Lua 回调函数
    -- 注册定时器回调处理上层业务初始化
	init_thread = skynet.timeout(0, function()
		skynet.init_service(start_func)
		init_thread = nil
	end)
end

function skynet.init_service(start)
    -- 初始化上层业务,并告知 launcher 创建结果
	local ok, err = skynet.pcall(start)
	if not ok then
		skynet.error("init service failed: " .. tostring(err))
		skynet.send(".launcher","lua", "ERROR")
		skynet.exit()
	else
		skynet.send(".launcher","lua", "LAUNCHOK")
	end
end

注意,skynet.start 函数执行完后,在 Skynet 框架层面 Lua 服务已创建完成,可对外提供服务。
而 skynet.init_service 是在业务层面完成初始化,然后才通知 launcher 服务。

问题:为何没有在 skynet.start 中直接调用 skynet.init_service ?
先看调用链 snlua - init_cb() -> lua_pcall -> lualoader - main() -> skynet.start() 。skynet.start 调用堆栈是从 init_cb 函数触发,而 skynet.init_service 会调用上层业务初始化 start_func 函数,start_func 函数可能会很复杂,比如有 RPC ,在 skynet.start 函数中将 skynet.init_service 放到定时器回调中执行,可以将业务层初始化和框架层初始化分离,简化 init_cb 函数触发到脚本中的逻辑。学习了。


理解:在 Skynet 框架层面,创建服务的“原子”性。
对于 C 服务,一次 C 函数 skynet_context_new 调用完成创建,框架内部处理具体过程中的多线程临界区,但从框架层面来看,给这个服务发送消息,要么未查询到服务,要么查询到服务且此时服务是可接收消息的。
对于 Lua 服务,需要一次 C 函数 skynet_context_new 调用和第一条消息完成创建,虽然带有流程,但第一条消息保证了连续性,从框架层面来看,也满足给这个服务发送消息,要么未查询到服务,要么查询到服务且此时服务是可接收消息的。但 Lua 服务中,skynet.init_service 是在定时器回调中调用的,假设在此函数执行前,消息队列中已经存在其它消息,此时在 Lua raw_dispatch_message 函数中,如下代码片段,若 p == nil 则进行如下处理。

local p = proto[prototype]
if p == nil then
    if session ~= 0 then
        c.send(source, skynet.PTYPE_ERROR, session, "")
    else
        unknown_request(session, source, msg, sz, prototype)
    end
    return
end

理解:函数 skynet.register_protocol 和 skynet.dispatch 调用时机。
此函数指定用于业务的消息处理函数。而 skynet.start 的参数 start_func 调用之前,可能已经接收到了消息。
因此,如果某类消息依赖于 start_func 进行初始化,则应该在 start_func 中才指定用于业务的消息处理函数。
如果某消息不依赖于 start_func ,则可和 skynet.start 的调用时机一样,指定消息处理函数。

理解:服务与 worker 线程。
Skynet 在启动 worker 线程之前,就创建了服务,并且可向此服务发送消息,只是 worker 线程开始工作后,才开始调度执行服务接收到的消息。

上一篇:clion调试


下一篇:## 不重复使用github clone,根据现有的skynet文件创建新的skynet项目