2021SC@SDUSC
本周主要研究server的启动。
在构造了一个server之后,调用start来启动服务:
// Resolves `host` and `port` with getaddrinfo() and starts the server on the
// resulting address through the sockaddr-based start() overload.
// Excerpt: each `...` marks code the author elided.
int WFServerBase::start(int family, const char *host, unsigned short port,
const char *cert_file, const char *key_file)
{
struct addrinfo hints = {
.ai_flags = AI_PASSIVE, // key: ask for an address suitable for bind()
.ai_family = family,
.ai_socktype = SOCK_STREAM,
};
struct addrinfo *addrinfo;
char port_str[PORT_STR_MAX + 1];
int ret;
...
// getaddrinfo() takes the service as a string, so format the port first.
snprintf(port_str, PORT_STR_MAX + 1, "%d", port);
// NOTE(review): the getaddrinfo() result is not checked in this excerpt, yet
// `addrinfo` is dereferenced right below -- confirm against the full source.
getaddrinfo(host, port_str, &hints, &addrinfo);
// Only the first entry of the returned address list is used.
start(addrinfo->ai_addr, (socklen_t)addrinfo->ai_addrlen,
cert_file, key_file);
// Release the list allocated by getaddrinfo().
freeaddrinfo(addrinfo);
...
}
此时内部调用的是start的另一个重载(参数为sockaddr):
int ::start(const struct sockaddr *bind_addr, socklen_t addrlen,
const char *cert_file, const char *key_file)
{
...
this->init(bind_addr, addrlen, cert_file, key_file);
this->scheduler->bind(this);
...
}
这里先init了WFServerBase,然后调用scheduler的bind,进入server的基本流程。
bind部分代码:
/**
 * Creates the listening socket for `service` and registers it with the
 * poller as a PD_OP_LISTEN entry.
 *
 * @param service  Service to listen for; its `listen_fd` and `ref` are set
 *                 here once the socket exists.
 * @return 0 on success, -1 if the socket could not be created or could not
 *         be added to the poller.
 */
int Communicator::bind(CommService *service)
{
    struct poller_data data;
    int sockfd;

    sockfd = this->nonblock_listen(service);
    if (sockfd < 0)
        return -1;

    service->listen_fd = sockfd;
    service->ref = 1;

    data.operation = PD_OP_LISTEN;
    data.fd = sockfd;
    // Callback the poller invokes for every accepted connection.
    data.accept = Communicator::accept;
    data.context = service;
    data.result = NULL;
    if (mpoller_add(&data, service->listen_timeout, this->mpoller) < 0)
    {
        // NOTE(review): the listening descriptor is still open at this point
        // and is only reachable through service->listen_fd -- confirm who
        // closes it on this failure path.
        return -1;
    }

    return 0;
}
从代码中可以看出,bind和listen操作打包到一起了,相关代码如下:
/**
 * Creates the service's listening socket, switches it to non-blocking mode
 * and binds/listens on the service's address.
 *
 * @param service  Supplies create_listen_fd(), bind_addr and addrlen.
 * @return The listening descriptor, or the negative value returned by
 *         create_listen_fd() when no socket could be created.
 */
int Communicator::nonblock_listen(CommService *service)
{
    int sockfd = service->create_listen_fd();

    if (sockfd < 0)
        return sockfd;

    // NOTE(review): the results of the two helpers below are not checked in
    // this excerpt; a failure should release `sockfd` and report an error --
    // confirm against the full source.
    __set_fd_nonblock(sockfd);
    __bind_and_listen(sockfd, service->bind_addr, service->addrlen);

    return sockfd;
}
// Binds `sockfd` to `addr` and then puts it into listening state with a
// backlog of SOMAXCONN. Returns whatever listen() returns.
// Excerpt: each `...` marks code the author elided.
static int __bind_and_listen(int sockfd, const struct sockaddr *addr,
socklen_t addrlen)
{
...
// NOTE(review): the bind() result is not checked in the lines shown here.
bind(sockfd, addr, addrlen);
...
return listen(sockfd, SOMAXCONN);
}
然后将listen fd加入epoll监听(即上面bind中的mpoller_add),poller线程中对应的处理如下:
// Poller thread entry point. Excerpt: only the listen case is shown and each
// `...` marks code the author elided.
static void *__poller_thread_routine(void *arg)
{
...
// An entry registered with PD_OP_LISTEN is dispatched to the listen handler.
case PD_OP_LISTEN:
__poller_handle_listen(node, poller);
break;
...
}
epoll检测到listen fd上有事件(新连接到来)时,执行:
// Handles an event on a listening descriptor: accepts connections in a loop,
// runs the registered accept callback for each one and passes the outcome to
// the poller callback.
// Excerpt: each `...` marks code the author elided (including the
// declarations of `res`, `ss`, `len`, `sockfd` and `p`).
static void __poller_handle_listen(struct __poller_node *node,
poller_t *poller)
{
...
while (1)
{
...
// 1. accept() the pending connection on the listening descriptor.
sockfd = accept(node->data.fd, (struct sockaddr *)&ss, &len);
// data.accept = Communicator::accept;
// 2. Invoke the accept callback stored in the node (Communicator::accept,
//    see the line above) to initialise the new connection.
p = node->data.accept((const struct sockaddr *)&ss, len,
sockfd, node->data.context);
// Fill a result node: copy the listen node's data and attach the object
// returned by the accept callback.
res->data = node->data;
res->data.result = p;
res->error = 0;
res->state = PR_ST_SUCCESS;
// .callback = Communicator::callback,
/*
void Communicator::callback(struct poller_result *res, void *context)
{
Communicator *comm = (Communicator *)context;
msgqueue_put(res, comm->queue);
}
*/
// Hand the result to the poller callback, which (per the snippet above)
// puts it on the msgqueue.
poller->cb((struct poller_result *)res, poller->ctx);
// `res` has been handed off, so allocate a fresh node for the next
// accepted connection and store it on the listen node.
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
// Leave the loop when the allocation fails.
if (!res)
break;
}
// NOTE(review): what happens after the loop depends on elided code; only the
// early return on a non-zero __poller_remove_node() result is visible here.
if (__poller_remove_node(node, poller))
return;
...
}
epoll检测到listen事件时,先执行该事件注册的accept回调,再通过poller的callback把结果写入msgqueue中。接下来进入消费者(handler线程)流程:
// Handler (consumer) thread routine. Excerpt: only the listen case is shown;
// the surrounding switch and the definitions of `comm` and `res` are elided.
void Communicator::handler_thread_routine(void *context)
{
// A result produced for a PD_OP_LISTEN entry goes to handle_listen_result().
case PD_OP_LISTEN:
comm->handle_listen_result(res);
break;
}
// Consumes a listen result: on success it builds a connection entry for the
// accepted socket and registers that socket with the poller for reading.
// Excerpt: each `...` marks code the author elided (including the enclosing
// switch/loop and the declarations of `target`, `entry` and `timeout`).
void Communicator::handle_listen_result(struct poller_result *res)
{
// The context of a listen entry is the service (set in Communicator::bind).
CommService *service = (CommService *)res->data.context;
...
case PR_ST_SUCCESS:
// data.result carries the object returned by the accept callback.
target = (CommServiceTarget *)res->data.result;
entry = this->accept_conn(target, service);
// Reuse the result's poller data for a read registration.
res->data.operation = PD_OP_READ;
res->data.message = NULL;
timeout = target->response_timeout;
...
// True once the operation has been switched to PD_OP_READ above.
if (res->data.operation != PD_OP_LISTEN)
{
// Point the poller data at the accepted connection.
res->data.fd = entry->sockfd;
res->data.ssl = entry->ssl;
res->data.context = entry;
if (mpoller_add(&res->data, timeout, this->mpoller) >= 0)
{
// If stop_flag is set, remove the descriptor that was just added.
if (this->stop_flag)
mpoller_del(res->data.fd, this->mpoller);
break;
}
}
...
}
整个流程产生CommConnEntry,然后把read事件放进epoll进行监听,因为连接建立之后,需要等待对方发送消息。
以下代码产生CommConnEntry,并将连接信息保存下来。
/**
 * Builds the connection entry for a freshly accepted socket.
 *
 * @param target   Accepted peer; its `sockfd` is made non-blocking and
 *                 recorded in the entry.
 * @param service  Service that owns the listening socket; it creates the
 *                 connection object via new_connection().
 * @return The new entry, or NULL if the socket could not be made
 *         non-blocking, the allocation failed, or no connection object
 *         could be created.
 */
struct CommConnEntry *Communicator::accept_conn(CommServiceTarget *target,
                                                CommService *service)
{
    struct CommConnEntry *entry;
    size_t size;

    // assumes __set_fd_nonblock() returns a negative value on failure --
    // TODO confirm against its definition.
    if (__set_fd_nonblock(target->sockfd) < 0)
        return NULL;

    // Only the part of the struct that precedes `mutex` is allocated, so
    // `mutex` and any member after it must not be touched on this entry.
    size = offsetof(struct CommConnEntry, mutex);
    entry = (struct CommConnEntry *)malloc(size);
    if (!entry)
        return NULL;

    entry->conn = service->new_connection(target->sockfd);
    if (!entry->conn)
    {
        free(entry);
        return NULL;
    }

    entry->seq = 0;
    entry->mpoller = this->mpoller;
    entry->service = service;
    entry->target = target;
    entry->ssl = NULL;
    entry->sockfd = target->sockfd;
    entry->state = CONN_STATE_CONNECTED;
    entry->ref = 1;

    return entry;
}