每一个worker代表一个nodejs主进程,通过nodejs的spawn 方法创建一个c++子进程,主进程与子进程之间通过unix socket进行通信,nodejs代码中的channel就是对unix socket的一个封装。
通过channel的request方法发送 “worker.createRouter”指令,并附加参数routerId,到c++子进程,查看worker.ts代码的createRouter方法:
const internal = { routerId: uuidv4() };
await this.#channel.request('worker.createRouter', internal);
channel的request方法中把指令包装为{ id, method, internal, data } 的JSON格式的数据,
id:请求id.每次请求递增
method: 即为“worker.createRouter”指令
internal: 此处为 {routerId}
data: 此处为null
然后通过producerSocket发送给c++层面,看如下代码:
/**
* @private
*/
async request(method: string, internal?: object, data?: any): Promise<any>
{
this.#nextId < 4294967295 ? ++this.#nextId : (this.#nextId = 1);
const id = this.#nextId;
logger.debug('request() [method:%s, id:%s]', method, id);
if (this.#closed)
throw new InvalidStateError('Channel closed');
const request = { id, method, internal, data };
const payload = JSON.stringify(request);
if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('Channel request too big');
// This may throw if closed or remote side ended.
this.#producerSocket.write(
Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this.#producerSocket.write(payload);
........后面省略
}
在c++层面ChannelRequest.cpp代码中,定义了指令到方法id之间的映射关系std::unordered_map:
std::unordered_map<std::string, ChannelRequest::MethodId> ChannelRequest::string2MethodId =
{
{ "worker.close", ChannelRequest::MethodId::WORKER_CLOSE },
..................
{ "worker.createRouter", ChannelRequest::MethodId::WORKER_CREATE_ROUTER
......................
}
从中可以看出createRouter对应的方法MethodId为 ChannelRequest::MethodId::WORKER_CREATE_ROUTER 。我们再看一下MethodId的定义如下,可以看出WORKER_CREATE_ROUTER的值为 5 :
class ChannelRequest
{
public:
enum class MethodId
{
WORKER_CLOSE = 1,
WORKER_DUMP,
WORKER_GET_RESOURCE_USAGE,
WORKER_UPDATE_SETTINGS,
WORKER_CREATE_ROUTER,
................
}
};
C++层面在void ConsumerSocket::UserOnUnixStreamRead()中收到消息时,在子类方法ChannelSocket::OnConsumerSocketMessage中对JOSN格式的消息进行解析,并创建ChannelRequest::ChannelRequest类实例,实例中就包含了解析后的{ id, method, internal, data }数据,看下ChannelRequest的成员变量:
public:
// Passed by argument.
Channel::ChannelSocket* channel{ nullptr };
uint32_t id{ 0u };
std::string method;
MethodId methodId;
json internal;
json data;
// Others.
bool replied{ false };
最后通过c++层面的 Worker::OnChannelRequest对消息进行处理:
inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
switch (request->methodId)
{
....................
case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:
{
std::string routerId;
try
{
SetNewRouterIdFromInternal(request->internal, routerId);
}
catch (const MediaSoupError& error)
{
MS_THROW_ERROR("%s [method:%s]", error.buffer, request->method.c_str());
}
auto* router = new RTC::Router(routerId);
this->mapRouters[routerId] = router;
MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());
request->Accept();
break;
}
...........................
}
// Any other request must be delivered to the corresponding Router.
default:
{
try
{
RTC::Router* router = GetRouterFromInternal(request->internal);
router->HandleRequest(request);
}
catch (const MediaSoupTypeError& error)
{
MS_THROW_TYPE_ERROR("%s [method:%s]", error.buffer, request->method.c_str());
}
catch (const MediaSoupError& error)
{
MS_THROW_ERROR("%s [method:%s]", error.buffer, request->method.c_str());
}
break;
}
}
其中我们看到了 auto* router = new RTC::Router(routerId); 即是创建了一个router.