TCP测试用客户程序
#include "unp.h" #define MAXN 16384 /* max # bytes to request from server */ int
main(int argc, char **argv)
{
int i, j, fd, nchildren, nloops, nbytes;
pid_t pid;
ssize_t n;
char request[MAXLINE], reply[MAXN]; if (argc != )
err_quit("usage: client <hostname or IPaddr> <port> <#children> "
"<#loops/child> <#bytes/request>"); nchildren = atoi(argv[]);
nloops = atoi(argv[]);
nbytes = atoi(argv[]);
snprintf(request, sizeof(request), "%d\n", nbytes); /* newline at end */ for (i = ; i < nchildren; i++) {
if ( (pid = Fork()) == ) { /* child */
for (j = ; j < nloops; j++) {
fd = Tcp_connect(argv[], argv[]); Write(fd, request, strlen(request)); if ( (n = Readn(fd, reply, nbytes)) != nbytes)
err_quit("server returned %d bytes", n); Close(fd); /* TIME_WAIT on client, not server */
}
printf("child %d done\n", i);
exit();
}
/* parent loops around to fork() again */
} while (wait(NULL) > ) /* now parent waits for all children */
;
if (errno != ECHILD)
err_sys("wait error"); exit();
}
每次运行本客户程序时,我们指定了
1.服务器的主机名或IP地址
2.服务器的端口
3.由客户fork的子进程数(以允许客户并发地向同一个服务器发起多个连接)
4.每个子进程发送给服务器的请求数
5.每个请求要求服务器反送的数据字节数
大写字母开头的函数(如Write是对原始write进行过错误处理的包裹函数)
tcp_connect函数以及后面的tcp_listen函数可以查看第十一章的笔记 http://www.cnblogs.com/runnyu/p/4663514.html
TCP迭代服务器程序
第一章我们给出的服务器程序就是迭代服务器。
迭代TCP服务器总是在完全处理某个客户的请求后才转向下一个客户的。
下面给出的迭代TCP服务器只是下一节并发服务器程序的少许修改,部分函数代码在下一节将演示。
/* include serv00 */
#include "unp.h" int
main(int argc, char **argv)
{
int listenfd, connfd;
void sig_int(int), web_child(int);
socklen_t clilen, addrlen;
struct sockaddr *cliaddr; if (argc == )
listenfd = Tcp_listen(NULL, argv[], &addrlen);
else if (argc == )
listenfd = Tcp_listen(argv[], argv[], &addrlen);
else
err_quit("usage: serv00 [ <host> ] <port#>");
cliaddr = Malloc(addrlen); Signal(SIGINT, sig_int); for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen); web_child(connfd); /* process the request */ Close(connfd); /* parent closes connected socket */
}
}
/* end serv00 */
运行结果
在8888端口运行该服务器程序
运行测试用客户程序
终止服务器程序。由于服务器是迭代的,这就让我们测量出服务器处理如此数目客户所需CPU时间的一个基准值,从其他服务器的实测CPU时间中减去该值就能得到他们的进程控制时间
TCP并发服务器程序:每个客户一个子进程
传统上并发服务器调用fork派生一个子进程来处理每个客户。这使得服务器能够同时为多个客户服务,每个进程一个客户。
并发服务器的问题在于为每个客户现场fork一个子进程比较耗费CPU时间。
下面是一个并发服务器程序的例子,绝大多数服务器程序也按照这个范式编写
main函数
/* include serv01 */
#include "unp.h" int
main(int argc, char **argv)
{
int listenfd, connfd;
pid_t childpid;
void sig_chld(int), sig_int(int), web_child(int);
socklen_t clilen, addrlen;
struct sockaddr *cliaddr; if (argc == )
listenfd = Tcp_listen(NULL, argv[], &addrlen);
else if (argc == )
listenfd = Tcp_listen(argv[], argv[], &addrlen);
else
err_quit("usage: serv01 [ <host> ] <port#>");
cliaddr = Malloc(addrlen); Signal(SIGCHLD, sig_chld);
Signal(SIGINT, sig_int); for ( ; ; ) {
clilen = addrlen;
if ( (connfd = accept(listenfd, cliaddr, &clilen)) < ) {
if (errno == EINTR)
continue; /* back to for() */
else
err_sys("accept error");
} if ( (childpid = Fork()) == ) { /* child process */
Close(listenfd); /* close listening socket */
web_child(connfd); /* process request */
exit();
}
Close(connfd); /* parent closes connected socket */
}
}
/* end serv01 */
信号处理函数。在客户运行完毕之后我们键入Ctrl+C进入SIGINT信号处理函数,以显示服务器程序所需的CPU时间
void
sig_int(int signo)
{
void pr_cpu_time(void); pr_cpu_time();
exit();
}
SIGINT信号处理函数调用的pr_cup_time函数
#include "unp.h"
#include <sys/resource.h> #ifndef HAVE_GETRUSAGE_PROTO
int getrusage(int, struct rusage *);
#endif void
pr_cpu_time(void)
{
double user, sys;
struct rusage myusage, childusage; if (getrusage(RUSAGE_SELF, &myusage) < )
err_sys("getrusage error");
if (getrusage(RUSAGE_CHILDREN, &childusage) < )
err_sys("getrusage error"); user = (double) myusage.ru_utime.tv_sec +
myusage.ru_utime.tv_usec/1000000.0;
user += (double) childusage.ru_utime.tv_sec +
childusage.ru_utime.tv_usec/1000000.0;
sys = (double) myusage.ru_stime.tv_sec +
myusage.ru_stime.tv_usec/1000000.0;
sys += (double) childusage.ru_stime.tv_sec +
childusage.ru_stime.tv_usec/1000000.0; printf("\nuser time = %g, sys time = %g\n", user, sys);
}
下面是处理每个客户请求的web_child函数
#include "unp.h" #define MAXN 16384 /* max # bytes client can request */ void
web_child(int sockfd)
{
int ntowrite;
ssize_t nread;
char line[MAXLINE], result[MAXN]; for ( ; ; ) {
if ( (nread = Readline(sockfd, line, MAXLINE)) == )
return; /* connection closed by other end */ /* 4line from client specifies #bytes to write back */
ntowrite = atol(line);
if ((ntowrite <= ) || (ntowrite > MAXN))
err_quit("client request for %d bytes", ntowrite); Writen(sockfd, result, ntowrite);
}
}
TCP预先派生子进程服务器程序:accept无上锁保护
传统意义的并发服务器像上一节那样为每个客户现场派生一个子进程,本节将使用成为预先派生子进程的技术:
在启动阶段预先派生一定数量的子进程,当各个客户连接到达时,这些子进程立即就能为它们服务。
这种技术的优点在于无须引入父进程执行fork的开销就能处理新到的客户。缺点是父进程必须在服务器启动阶段猜测需要预先派生多少子进程。
下面给出我们预先派生子进程服务器程序第一个main函数
/* include serv02 */
#include "unp.h" static int nchildren;
static pid_t *pids; int
main(int argc, char **argv)
{
int listenfd, i;
socklen_t addrlen;
void sig_int(int);
pid_t child_make(int, int, int); if (argc == )
listenfd = Tcp_listen(NULL, argv[], &addrlen);
else if (argc == )
listenfd = Tcp_listen(argv[], argv[], &addrlen);
else
err_quit("usage: serv02 [ <host> ] <port#> <#children>");
nchildren = atoi(argv[argc-]);
pids = Calloc(nchildren, sizeof(pid_t)); for (i = ; i < nchildren; i++)
pids[i] = child_make(i, listenfd, addrlen); /* parent returns */ Signal(SIGINT, sig_int); for ( ; ; )
pause(); /* everything done by children */
}
/* end serv02 */
增设一个命令行参数供用户指定预先派生的子进程个数。分配一个存放各个子进程ID的数组。
下面给出SIGINT信号处理函数。在调用pr_cpu_time之前不需终止所有子进程来获得子进程的资源利用统计
void
sig_int(int signo)
{
int i;
void pr_cpu_time(void); /* 4terminate all children */
for (i = ; i < nchildren; i++)
kill(pids[i], SIGTERM);
while (wait(NULL) > ) /* wait for all children */
;
if (errno != ECHILD)
err_sys("wait error"); pr_cpu_time();
exit();
}
下面给出child_make函数,它由main调用以派生各个子进程
pid_t
child_make(int i, int listenfd, int addrlen)
{
pid_t pid;
void child_main(int, int, int); if ( (pid = Fork()) > )
return(pid); /* parent */ child_main(i, listenfd, addrlen); /* never returns */
}
调用fork派生子进程后只有父进程返回。子进程调用下面给出的child_main函数,它是个无限循环
void
child_main(int i, int listenfd, int addrlen)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr; cliaddr = Malloc(addrlen); printf("child %ld starting\n", (long) getpid());
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen); web_child(connfd); /* process the request */
Close(connfd);
}
}
每个子进程调用accept返回一个已连接套接字,然后调用web_child处理客户请求,最后关闭连接。子进程一直循环,知道被父进程终止。
这个程序有一个性能上的问题:
服务器再启动阶段派生N个子进程,它们各自调用accept并因而均被内核投入睡眠,
当第一个客户连接到达时,所有N歌子进程均被唤醒,然后只有最先运行的子进程获得能够客户连接,
这就是有时候成为惊群(thundering herd)的问题。只是每当一个连接准备好被接收时唤醒太多进程的做法会导致性能受损
TCP预先派生子进程服务器程序:accept使用文件上锁保护
在不同的系统上accept函数的实现会有所不同。
事实上如果我们在基于SVR4的Solaris 2.5内核上运行上一节的服务器程序,那么客户开始连接到该服务器后不久,某个子进程的accept就会返回EPROTO错误
解决办法是让应用程序在调用accept前后安置某种形式的锁(lock),这样任意时刻只有一个子进程阻塞在accept调用中,其他子进程则阻塞在试图获取用于保护accept的锁上
我们有多种方法可用于提供上锁功能。本节我们使用以fcntl函数呈现的POSIX文件上锁功能
main函数唯一改动是在派生子进程的循环之前增加一个对我们的my_lock_init函数的调用
my_lock_init("/tmp/lock.XXXXXX"); /* one lock file for all children */
for (i = ; i < nchildren; i++)
pids[i] = child_make(i, listenfd, addrlen); /* parent returns */
child_main的唯一改动是在调用accept之前获取文件锁,在accept返回之后释放文件锁
for ( ; ; ) {
clilen = addrlen;
my_lock_wait();
connfd = Accept(listenfd, cliaddr, &clilen);
my_lock_release(); web_child(connfd); /* process the request */
Close(connfd);
}
下面给出使用POSIX文件上锁功能的my_lock_init函数
#include "unp.h" static struct flock lock_it, unlock_it;
static int lock_fd = -;
/* fcntl() will fail if my_lock_init() not called */ void
my_lock_init(char *pathname)
{
char lock_file[]; /* 4must copy caller's string, in case it's a constant */
strncpy(lock_file, pathname, sizeof(lock_file));
lock_fd = Mkstemp(lock_file); Unlink(lock_file); /* but lock_fd remains open */ lock_it.l_type = F_WRLCK;
lock_it.l_whence = SEEK_SET;
lock_it.l_start = ;
lock_it.l_len = ; unlock_it.l_type = F_UNLCK;
unlock_it.l_whence = SEEK_SET;
unlock_it.l_start = ;
unlock_it.l_len = ;
}
mkstemp函数在系统中以唯一的文件名创建一个文件并打开。关于fcntl记录锁可以查看apue的笔记 http://www.cnblogs.com/runnyu/p/4645754.html
下面给出用于上锁和解锁文件的两个函数
void
my_lock_wait()
{
int rc; while ( (rc = fcntl(lock_fd, F_SETLKW, &lock_it)) < ) {
if (errno == EINTR)
continue;
else
err_sys("fcntl error for my_lock_wait");
}
} void
my_lock_release()
{
if (fcntl(lock_fd, F_SETLKW, &unlock_it) < )
err_sys("fcntl error for my_lock_release");
}
TCP预先派生子进程服务器程序:accept使用线程上锁保护
上一节使用的POSIX文件上锁方法可移植到所有POSIX兼容系统,不过它涉及文件系统操作,可能比较耗时。
本节我们改用线程上锁保护accept,这种方法不能适用于同一进程内各线程之间的上锁,而且适用于不同进程之间的上锁。
我们需要改动的只是3个上锁好熟。在不同进程之间使用线程上锁要求:
1.互斥锁变量必须存放在由所有进程共享的内存区中
2.必须告知线程函数库这是在不同进程之间共享的互斥锁
关于互斥量可以查看apue的笔记 http://www.cnblogs.com/runnyu/p/4643363.html 关于互斥量属性可以查看 http://www.cnblogs.com/runnyu/p/4643764.html
我们有多种方法可用于不同进程之间共享内存空间。在本节例子中我们使用mmap函数以及/dev/zero设备。
下面是新版本的my_lock_init函数
#include "unpthread.h"
#include <sys/mman.h> static pthread_mutex_t *mptr; /* actual mutex will be in shared memory */ void
my_lock_init(char *pathname)
{
int fd;
pthread_mutexattr_t mattr; fd = Open("/dev/zero", O_RDWR, ); mptr = Mmap(, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, );
Close(fd); Pthread_mutexattr_init(&mattr);
Pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
Pthread_mutex_init(mptr, &mattr);
}
下面是新版本的my_lock_wait和my_lock_relesease函数
void
my_lock_wait()
{
Pthread_mutex_lock(mptr);
} void
my_lock_release()
{
Pthread_mutex_unlock(mptr);
}
TCP预先派生子进程服务器程序:传递描述符
这个版本是只让父进程调用accept,然后把所有接受的已连接套接字“传递”给某个子进程。
TCP并发服务器程序:每个客户一个线程
在本节中我们的服务器程序将为每个客户创建一个线程来取代为每个客户派生一个子进程
main函数
#include "unpthread.h" int
main(int argc, char **argv)
{
int listenfd, connfd;
void sig_int(int);
void *doit(void *);
pthread_t tid;
socklen_t clilen, addrlen;
struct sockaddr *cliaddr; if (argc == )
listenfd = Tcp_listen(NULL, argv[], &addrlen);
else if (argc == )
listenfd = Tcp_listen(argv[], argv[], &addrlen);
else
err_quit("usage: serv06 [ <host> ] <port#>");
cliaddr = Malloc(addrlen); Signal(SIGINT, sig_int); for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen); Pthread_create(&tid, NULL, &doit, (void *) connfd);
}
} void *
doit(void *arg)
{
void web_child(int); Pthread_detach(pthread_self());
web_child((int) arg);
Close((int) arg);
return(NULL);
}
主线程大部分时间阻塞在一个accept调用之中,每当它返回一个客户连接时,就调用pthread_create创建一个新线程。新线程执行的函数是doit。
doit函数先让自己脱离,使得主线程不必等待它,然后调用web_child函数。该函数返回后关闭已连接套接字。
TCP预先创建线程服务器程序:每个线程各自accept
跟预先派生子进程服务器程序一样,我们可以在服务器启动阶段预先创建一个线程池以取代为每个客户现场创建一个线程的做法来提高性能。
本服务器的基本设计是预先创建一个线程池,并让每个线程各自调用accept,使用互斥锁以保证任何时刻只有一个线程调用accept。
我们在pthread07.h中定义了用于维护关于每个线程若干信息的Thread结构,并声明了一些全局变量
typedef struct {
pthread_t thread_tid; /* thread ID */
long thread_count; /* # connections handled */
} Thread;
Thread *tptr; /* array of Thread structures; calloc'ed */ int listenfd, nthreads;
socklen_t addrlen;
pthread_mutex_t mlock;
main函数
/* include serv07 */
#include "unpthread.h"
#include "pthread07.h" pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER; int
main(int argc, char **argv)
{
int i;
void sig_int(int), thread_make(int); if (argc == )
listenfd = Tcp_listen(NULL, argv[], &addrlen);
else if (argc == )
listenfd = Tcp_listen(argv[], argv[], &addrlen);
else
err_quit("usage: serv07 [ <host> ] <port#> <#threads>");
nthreads = atoi(argv[argc-]);
tptr = Calloc(nthreads, sizeof(Thread)); for (i = ; i < nthreads; i++)
thread_make(i); /* only main thread returns */ Signal(SIGINT, sig_int); for ( ; ; )
pause(); /* everything done by threads */
}
/* end serv07 */ void
sig_int(int signo)
{
int i;
void pr_cpu_time(void); pr_cpu_time(); for (i = ; i < nthreads; i++)
printf("thread %d, %ld connections\n", i, tptr[i].thread_count); exit();
}
thread_make和thread_main函数
#include "unpthread.h"
#include "pthread07.h" void
thread_make(int i)
{
void *thread_main(void *); Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
return; /* main thread returns */
} void *
thread_main(void *arg)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr; cliaddr = Malloc(addrlen); printf("thread %d starting\n", (int) arg);
for ( ; ; ) {
clilen = addrlen;
Pthread_mutex_lock(&mlock);
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_mutex_unlock(&mlock);
tptr[(int) arg].thread_count++; web_child(connfd); /* process request */
Close(connfd);
}
}
TCP预先创建线程服务器程序:主线程统一accept
与上一节服务器程序比较,本节的服务器程序只让主线程调用accept并把每个客户连接传递给线程池中某个可用线程。
我们可以使用描述符进行传递,因为所有线程和所有描述符都在同一个进程之内,接收线程只需知道这个已连接套接字描述符的值。
在pthread08.h头文件中定义了Thread结构和声明了一些全局变量
typedef struct {
pthread_t thread_tid; /* thread ID */
long thread_count; /* # connections handled */
} Thread;
Thread *tptr; /* array of Thread structures; calloc'ed */ #define MAXNCLI 32
int clifd[MAXNCLI], iget, iput;
pthread_mutex_t clifd_mutex;
pthread_cond_t clifd_cond;
我们定义了一个clifd数组,由主线程往中存入已接受的已连接套接字描述符,并由线程池中的可用线程从中取出一个以服务响应的客户
iput是主线程将往该数组中存入的下一个元素的下标,iget是线程池中某个线程将从该数组中取出的下一个元素的下标。我们使用互斥锁和条件变量把这些数据保护起来
main函数
/* include serv08 */
#include "unpthread.h"
#include "pthread08.h" static int nthreads;
pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clifd_cond = PTHREAD_COND_INITIALIZER; int
main(int argc, char **argv)
{
int i, listenfd, connfd;
void sig_int(int), thread_make(int);
socklen_t addrlen, clilen;
struct sockaddr *cliaddr; if (argc == )
listenfd = Tcp_listen(NULL, argv[], &addrlen);
else if (argc == )
listenfd = Tcp_listen(argv[], argv[], &addrlen);
else
err_quit("usage: serv08 [ <host> ] <port#> <#threads>");
cliaddr = Malloc(addrlen); nthreads = atoi(argv[argc-]);
tptr = Calloc(nthreads, sizeof(Thread));
iget = iput = ; /* 4create all the threads */
for (i = ; i < nthreads; i++)
thread_make(i); /* only main thread returns */ Signal(SIGINT, sig_int); for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen); Pthread_mutex_lock(&clifd_mutex);
clifd[iput] = connfd;
if (++iput == MAXNCLI)
iput = ;
if (iput == iget)
err_quit("iput = iget = %d", iput);
Pthread_cond_signal(&clifd_cond);
Pthread_mutex_unlock(&clifd_mutex);
}
}
/* end serv08 */ void
sig_int(int signo)
{
int i;
void pr_cpu_time(void); pr_cpu_time(); for (i = ; i < nthreads; i++)
printf("thread %d, %ld connections\n", i, tptr[i].thread_count); exit();
}
下面是thread_make和thread_main函数
#include "unpthread.h"
#include "pthread08.h" void
thread_make(int i)
{
void *thread_main(void *); Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
return; /* main thread returns */
} void *
thread_main(void *arg)
{
int connfd;
void web_child(int); printf("thread %d starting\n", (int) arg);
for ( ; ; ) {
Pthread_mutex_lock(&clifd_mutex);
while (iget == iput)
Pthread_cond_wait(&clifd_cond, &clifd_mutex);
connfd = clifd[iget]; /* connected socket to service */
if (++iget == MAXNCLI)
iget = ;
Pthread_mutex_unlock(&clifd_mutex);
tptr[(int) arg].thread_count++; web_child(connfd); /* process request */
Close(connfd);
}
}