搜狗workflow项目研究(十)http server(2)

2021SC@SDUSC

本周主要研究server的启动。

在构造了一个server后,start:

// start() overload taking an address family, host and port. It formats the
// port as a string, resolves host/port with getaddrinfo(), forwards the first
// resolved address to the sockaddr-based start() overload, and then releases
// the getaddrinfo() result.
// Excerpt: the `...` lines stand for code omitted by the article.
int WFServerBase::start(int family, const char *host, unsigned short port,
						const char *cert_file, const char *key_file)
{
	struct addrinfo hints = {
		.ai_flags		=	AI_PASSIVE,    // key: request an address suitable for bind()
		.ai_family		=	family,
		.ai_socktype	=	SOCK_STREAM,
	};
	struct addrinfo *addrinfo;
	char port_str[PORT_STR_MAX + 1];
	int ret;
    ...
	// getaddrinfo() takes the service/port as a string, so format it first.
	snprintf(port_str, PORT_STR_MAX + 1, "%d", port);
	getaddrinfo(host, port_str, &hints, &addrinfo);
	// Delegate to the overload that takes a ready-made socket address.
    start(addrinfo->ai_addr, (socklen_t)addrinfo->ai_addrlen,
                cert_file, key_file);
	// The resolved list is no longer needed once start() has been called.
    freeaddrinfo(addrinfo);
    ...
}

此时调用的是start的另一个重载:

// start() overload taking a ready-made bind address: initializes the server
// with the address and certificate/key files, then binds it through the
// scheduler.
// NOTE(review): the class qualifier is abbreviated in this excerpt; presumably
// this is WFServerBase::start -- confirm against the project source.
// Excerpt: the `...` lines stand for code omitted by the article.
int ::start(const struct sockaddr *bind_addr, socklen_t addrlen,
						const char *cert_file, const char *key_file)
{
    ...
	// Initialize this server object from the bind address and cert/key files.
	this->init(bind_addr, addrlen, cert_file, key_file);
	// Pass this server to the scheduler's bind().
    this->scheduler->bind(this);
    ...
}

这里init了WFServerBase,然后bind,进入server的基本流程。

bind部分代码:

// Creates the listening socket for `service` via nonblock_listen(), records it
// on the service, and hands a PD_OP_LISTEN poller_data entry to mpoller_add().
// Excerpt: the declaration of `sockfd`, error handling and the return
// statement are not shown in the article.
int Communicator::bind(CommService *service)
{
	struct poller_data data;

	sockfd = this->nonblock_listen(service);

	// Remember the listening descriptor on the service and set its ref to 1.
    service->listen_fd = sockfd;
    service->ref = 1;
	// Describe the listen operation for the poller.
    data.operation = PD_OP_LISTEN;
    data.fd = sockfd;
    data.accept = Communicator::accept;	// accept hook stored with the entry
    data.context = service;				// the service travels along as context
    data.result = NULL;
	// Add the entry to the mpoller with the service's listen_timeout.
    mpoller_add(&data, service->listen_timeout, this->mpoller);
}

从代码中可以看出,bind和listen操作打包到一起了,相关代码如下:

// Creates the listening descriptor for `service`, switches it to non-blocking
// mode, then binds it to the service's address and starts listening.
// Returns the listening descriptor, which the caller stores and registers
// with the poller.
// Simplified excerpt: the results of the individual steps are not checked here.
int Communicator::nonblock_listen(CommService *service)
{
	int sockfd = service->create_listen_fd();

    __set_fd_nonblock(sockfd);

    __bind_and_listen(sockfd, service->bind_addr,
                            service->addrlen);

	// The function is declared to return int and callers use the result as
	// the listening descriptor, so hand it back explicitly.
    return sockfd;
}
// Binds `sockfd` to `addr`/`addrlen` and then puts it into listening mode.
// Returns the result of listen().
// Excerpt: the `...` lines stand for code omitted by the article.
static int __bind_and_listen(int sockfd, const struct sockaddr *addr,
							 socklen_t addrlen)
{
    ...
	bind(sockfd, addr, addrlen);
    ...
	// SOMAXCONN is passed as the backlog argument.
	return listen(sockfd, SOMAXCONN);
}

然后将listen操作加入epoll监听

// Poller thread entry point. Only the PD_OP_LISTEN branch of its dispatch is
// shown: a listen-type node is passed to __poller_handle_listen().
// Excerpt: the `...` lines stand for the surrounding code the article omits.
static void *__poller_thread_routine(void *arg)
{
    ...
    case PD_OP_LISTEN:
    __poller_handle_listen(node, poller);
    break;
    ...
}

epoll检测到listen事件时:

// Handles a listen node: in a loop, accepts a connection on the node's
// descriptor, runs the node's accept hook on it, fills in a result node and
// passes that result to the poller callback. A fresh result node is allocated
// for the next iteration; the loop stops if that allocation fails.
// Excerpt: the `...` lines stand for code omitted by the article.
static void __poller_handle_listen(struct __poller_node *node,
								   poller_t *poller)
{
    ...
	while (1)
	{
        ...
        // 1. accept() is called here to establish the connection
		sockfd = accept(node->data.fd, (struct sockaddr *)&ss, &len);

        // data.accept = Communicator::accept;
        // 2. Invoke the accept hook (Communicator::accept) to initialize
		p = node->data.accept((const struct sockaddr *)&ss, len,
							  sockfd, node->data.context);
		// Copy the node's poller data into the result and record the
		// object returned by the accept hook.
		res->data = node->data;
		res->data.result = p;
		res->error = 0;
		res->state = PR_ST_SUCCESS;
        // .callback			=	Communicator::callback,
        /*
            void Communicator::callback(struct poller_result *res, void *context)
            {
                Communicator *comm = (Communicator *)context;
                msgqueue_put(res, comm->queue);
            }
        */
        // Deliver the result through the callback, which puts it on the msgqueue
		poller->cb((struct poller_result *)res, poller->ctx);

		// Allocate the result node for the next accepted connection.
		res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
		node->res = res;
		if (!res)
			break;
	}

	if (__poller_remove_node(node, poller))
		return;
    ...
}

epoll检测到listen事件时,执行该事件附带的callback,并将结果写入msgqueue中。接下来进入消费者流程:

// Handler (consumer) thread entry point. Only the PD_OP_LISTEN branch is
// shown: a listen result is passed to handle_listen_result().
// Excerpt: the surrounding loop/switch and the origin of `comm` and `res` are
// not shown in the article.
void Communicator::handler_thread_routine(void *context)
{
    case PD_OP_LISTEN:
        comm->handle_listen_result(res);
        break;
}
// Processes a listen result. On PR_ST_SUCCESS it builds a connection entry for
// the accepted target via accept_conn(), rewrites the result's poller data
// into a PD_OP_READ operation on the new connection, and adds it to the
// mpoller.
// Excerpt: the `...` lines stand for code omitted by the article.
void Communicator::handle_listen_result(struct poller_result *res)
{
	// The result's context is the CommService.
	CommService *service = (CommService *)res->data.context;

	...
	case PR_ST_SUCCESS:
		// The result field holds the CommServiceTarget for the new connection.
		target = (CommServiceTarget *)res->data.result;
		entry = this->accept_conn(target, service);

		// Reuse the poller data as a read operation on the new connection.
		res->data.operation = PD_OP_READ;
		res->data.message = NULL;
		timeout = target->response_timeout;
		...

		if (res->data.operation != PD_OP_LISTEN)
		{
			// Point the poller data at the connection entry.
			res->data.fd = entry->sockfd;
			res->data.ssl = entry->ssl;
			res->data.context = entry;
			if (mpoller_add(&res->data, timeout, this->mpoller) >= 0)
			{
				// If stop_flag is set, remove the descriptor again right away.
				if (this->stop_flag)
					mpoller_del(res->data.fd, this->mpoller);
				break;
			}
		}
		...
}

整个流程产生CommConnEntry,然后把read事件放进epoll进行监听:连接建立之后,需要等待对方发消息。

以下代码产生CommConnEntry,并将信息保存下来。

// Builds a CommConnEntry for an accepted connection: sets the target's socket
// to non-blocking, allocates the entry, creates the connection object through
// the service, and fills in the entry's bookkeeping fields.
// Excerpt: local declarations, error handling and the return statement are
// not shown in the article.
struct CommConnEntry *Communicator::accept_conn(CommServiceTarget *target,
												CommService *service)
{
	__set_fd_nonblock(target->sockfd);

	// Allocate only the bytes that precede the `mutex` member of the struct.
	size = offsetof(struct CommConnEntry, mutex);
	entry = (struct CommConnEntry *)malloc(size);

	// The service creates the connection object for this socket.
	entry->conn = service->new_connection(target->sockfd);

	// Initial state of the entry: sequence 0, no ssl, marked connected, ref 1.
	entry->seq = 0;
	entry->mpoller = this->mpoller;
	entry->service = service;
	entry->target = target;
	entry->ssl = NULL;
	entry->sockfd = target->sockfd;
	entry->state = CONN_STATE_CONNECTED;
	entry->ref = 1;
}

 

 

 

 

 

 

上一篇:卷一 第四章:基本TCP套接字编程


下一篇:B2. Character Swap (Hard Version)