ngx_start_worker_processes位于Nginx_process_cycle.c中,主要的工作是创建子进程。
在Nginx中,master进程和worker进程是通过socketpair函数创建一对socket来实现,父进程与子进程之间的通信的。而这对socket被保存在进程结构体ngx_process中的channel[2]数组中,其中channel[0]为父进程的socket,channel[1]为子进程的socket。
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) { ngx_int_t i; ngx_channel_t ch; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes"); //传递给其他进程的命令 ch.command = NGX_CMD_OPEN_CHANNEL; for (i = 0; i < n; i++) { //创建n个子进程 ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type); //保存当前worker进程的信息 ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; //ngx_process_slot是进程信息在全局进程数组中存放的下标 ch.fd = ngx_processes[ngx_process_slot].channel[0];//channel[0]为父进程的socket,channel[1]为子进程的socket //向每一个进程的父进程发送本进程的信息 ngx_pass_open_channel(cycle, &ch); }
具体分析一下创建子进程的函数,也就是分析ngs_spawn_process:
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) { u_long on; ngx_pid_t pid; ngx_int_t s; //可创建进程的位置 if (respawn >= 0) { s = respawn; //如果类型大于0,表示该进程已经退出,可以重启该进程 } else { for (s = 0; s < ngx_last_process; s++) { //遍历所有进程,找到可用的已退出的进程 if (ngx_processes[s].pid == -1) { break; } } //超过最大进程限制会报错 if (s == NGX_MAX_PROCESSES) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned", NGX_MAX_PROCESSES); return NGX_INVALID_PID; } } if (respawn != NGX_PROCESS_DETACHED) { /* Solaris 9 still has no AF_LOCAL */ //创建socketpair用于进程间通信,master进程为每个worker创建一对socket if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; } ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]); //设置非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } //设置异步模式 on = 1; if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { //FIOASYNC异步输入/输出标志 ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {//F_SETOWN设置异步I/O的所有者 ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } //若进程执行了exec后,关闭socket if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } ngx_channel = ngx_processes[s].channel[1]; } else { ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; } ngx_process_slot = s; //创建子进程 pid = fork(); switch (pid) { case -1: ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fork() failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; case 0: ngx_pid = ngx_getpid(); proc(cycle, data); //调用ngx_worker_process_cycle()子进程循环处理事件 break; default: break; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid); ngx_processes[s].pid = pid; ngx_processes[s].exited = 0; //大于0,说明确定重启该进程 if (respawn >= 0) { return pid; } //设置进程信息 ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0; //设置状态信息 switch (respawn) { case NGX_PROCESS_NORESPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_SPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_DETACHED: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 1; break; } if (s == ngx_last_process) { ngx_last_process++; } return pid; }
接下来,看看Nginx是如何在进程间进行通信的,ngx_pass_open_channel函数:
static void ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch) { ngx_int_t i; //遍历所有进程 for (i = 0; i < ngx_last_process; i++) { //跳过本进程和退出/不能通信的进程 if (i == ngx_process_slot || ngx_processes[i].pid == -1 || ngx_processes[i].channel[0] == -1) { continue; } ngx_log_debug6(NGX_LOG_DEBUG_CORE, cycle->log, 0, "pass channel s:%d pid:%P fd:%d to s:%i pid:%P fd:%d", ch->slot, ch->pid, ch->fd, i, ngx_processes[i].pid, ngx_processes[i].channel[0]); /* TODO: NGX_AGAIN */ //把本进程的信息发送给每一个进程的父进程 ngx_write_channel(ngx_processes[i].channel[0], ch, sizeof(ngx_channel_t), cycle->log); } }
ngx_write_channel原型:
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, //ch内储存着本进程的信息,s是父进程的socket值(channel[0]) ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; if (ch->fd == -1) { msg.msg_control = NULL; msg.msg_controllen = 0; } else { msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; /* * We have to use ngx_memcpy() instead of simple * *(int *) CMSG_DATA(&cmsg.cm) = ch->fd; * because some gcc 4.4 with -O2/3/s optimization issues the warning: * dereferencing type-punned pointer will break strict-aliasing rules * * Fortunately, gcc with -O1 compiles this ngx_memcpy() * in the same simple assignment as in the code above */ ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int)); } msg.msg_flags = 0; #else if (ch->fd == -1) { msg.msg_accrights = NULL; msg.msg_accrightslen = 0; } else { msg.msg_accrights = (caddr_t) &ch->fd; msg.msg_accrightslen = sizeof(int); } #endif iov[0].iov_base = (char *) ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; n = sendmsg(s, &msg, 0);//sendmsg函数,在这里用于进程间通信 if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } ngx_log_error(NGX_LOG_ALERT, log, err, "sendmsg() failed"); return NGX_ERROR; } return NGX_OK; }
本文转自cococo点点博客园博客,原文链接:http://www.cnblogs.com/coder2012/p/3188355.html,如需转载请自行联系原作者