最简单的服务器的socket程序流程如下(面向连接的TCP连接 ):
1. WSAStartup(); 初始化网络库的使用。
2. socket(); 获得一个socket。
3. bind(); 把获得的socket绑定到一个ip 和端口。既然作为服务器, ip通常为本地IP127.0.0.1。
4. listen(); 监听已经绑定了指定端口的socket。
5. accept(); 接受一个来自客户端的连接。
accept()返回一个新的socket,该socket代表着本地服务器与某一个连接过来的客户端的链接。以该socket为参数,可以调用send函数往客户端发送数据,也可以调用recv函数接受客户端发送过来的函数。
最后服务器程序结束的时候调用closesocket()关闭socket, WSACleanup()终止网络库的使用,清理资源。
最简单的客户端的socket程序流程如下(同样是面向连接的TCP连接):
1. WSAStartup();初始化网络库的使用。
2. socket(); 获得一个socket。
3. connect(); 连接到一个 服务器。
连接成功后就可以收发数据了。收发完毕后调用closesocket()关闭socket,最后程序结束前调用 WSACleanup()清理资源。
下面直接上代码
需包含以下头文件和定义
#include <stdlib.h>
#include <stdio.h>
#include <WinSock2.h>
#pragma comment(lib,"ws2_32.lib")
#define SERVE_ADDRESS "127.0.0.1"
#define SERVE_PORT 7001
// ---------------------------- WSAStartup() ----------------------------// WSADATA wsd; int resStartup = WSAStartup(MAKEWORD(2,2),&wsd); if(0 != resStartup) { printf("failed to WSAStartup!\n"); goto Main_End; } //------------------------------------------------------------------------------// // ---------------------------- socket() ----------------------------// SOCKET serverSocket = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET == serverSocket) { printf("failed to invoke socket, the socket returned is invalid!\n"); goto Main_End; } // ------------------------------------------------------------------------------------// //---------------------------- bind() ----------------------------// // 初始化 struct sockaddr 结构体, SOCKADDR_IN就是 struct sockaddr的宏定义 SOCKADDR_IN localAddr; localAddr.sin_family = AF_INET; localAddr.sin_addr.S_un.S_addr = inet_addr(SERVE_ADDRESS); localAddr.sin_port = htons(SERVE_PORT); memset(localAddr.sin_zero,0x0,sizeof(localAddr.sin_zero)); // int resBind = bind(serverSocket,(sockaddr*)&localAddr,sizeof(SOCKADDR_IN)); if(0 != resBind) { printf("failed to bind ! \n"); goto Main_End; } //------------------------------------------------------------------------------------// //---------------------------- listen() ----------------------------// int resListen = listen(serverSocket,5); if(0 != resListen) { printf("failed to listen! \n"); goto Main_End; } printf("the server is listening now!\n"); //------------------------------------------------------------------------------------// //---------------------------- accept() ----------------------------// SOCKADDR_IN clientAddr; int addrLen = sizeof(clientAddr); SOCKET acceptedSocket = accept(serverSocket,(sockaddr*)&clientAddr,&addrLen); if(INVALID_SOCKET == acceptedSocket) { printf("accept error!\n"); goto Main_End; } printf("a client has connected to the server!\n"); //------------------------------------------------------------------------------------// char recvBuffer[256]; char sendBuffer[256]; strcpy(sendBuffer,"server:Welcome to connect !"); int sendBufLen = strlen(sendBuffer); int resSend = send(acceptedSocket,sendBuffer,sendBufLen,0); while(true) { if(resSend != sendBufLen) //发送的长度与需要发送的长度不等 { printf("send data error!!\n"); break; } int recvLen = recv(acceptedSocket,recvBuffer,sizeof(recvBuffer),0); if(0 == recvLen) { printf("a client close the socket!\n"); break; } else if(recvLen < 0) { printf("an error has happen when receiving\n"); break; } recvBuffer[recvLen] = ‘\0‘; printf("client:%s\n",recvBuffer); //在客户发过来的数据前面加上server:再发回给客户端 strcpy(sendBuffer,"server:"); strcat(sendBuffer,recvBuffer); sendBufLen = strlen(sendBuffer); resSend = send(acceptedSocket,sendBuffer,sendBufLen,0); } closesocket(acceptedSocket); closesocket(serverSocket); Main_End: WSACleanup(); system("pause"); return 0;
客户端代码:
//---------------------------- WSAStartup() ----------------------------// WSADATA wsd; int resStartup = WSAStartup(MAKEWORD(2,2),&wsd); if(0 != resStartup) { printf("failed to WSAStartup!\n"); goto Main_End; } //------------------------------------------------------------------------------------// //---------------------------- socket() ----------------------------// SOCKET connSocket = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET == connSocket) { printf("the socket returned is invalid!\n"); goto Main_End; } //------------------------------------------------------------------------------------// //---------------------------- connect() ----------------------------// //初始化struct sockaddr 结构体 SOCKADDR_IN serverAddr; serverAddr.sin_family = AF_INET; serverAddr.sin_addr.S_un.S_addr = inet_addr(SERVE_ADDRESS); serverAddr.sin_port = htons(SERVE_PORT); memset(serverAddr.sin_zero,0x0,sizeof(serverAddr.sin_zero)); //connect int resConn = connect(connSocket,(sockaddr*)&serverAddr,sizeof(serverAddr)); if(0 != resConn) { printf("failed to connect to server!!\n"); goto Main_End; } //------------------------------------------------------------------------------------// char sendBuffer[256]; char recvBuffer[256]; while(true) { int recvLen = recv(connSocket,recvBuffer,256,0); if(recvLen < 0) { printf("receive error!!\n"); break; } else if(0 == recvLen) { printf("the server close the socket!\n"); } recvBuffer[recvLen] = ‘\0‘; printf("the data recv:%s\n\n\n",recvBuffer); printf("please input what you want to send:\n"); gets(sendBuffer); if(0 == strcmp(sendBuffer,"exit")) { break; } int sendDataLen = strlen(sendBuffer); int nDataSent = send(connSocket,sendBuffer,sendDataLen,0); if(nDataSent != sendDataLen) { printf("failed to send data!!\n"); break; } } closesocket(connSocket); printf("the connection is closed!\n"); Main_End: WSACleanup(); system("pause"); return 0;
客户端连接到服务端后,每次给服务端发送一段内容,服务器在内容前面加上server:再发送给客户端。
当客户端发送的内容是exit时,客户端程序跳出循环,关闭socket断开连接。服务端发现客户端断开连接后也关闭套接字结束程序。
当然上面程序只为了演示最简单的网络编程。有若干漏洞。
1. 服务器只能接受一个客户端连接。当然加一个循环语句进去可以重复地接受客户端的连接,但是仍然是每次只处理一个客户端连接。
2.accept, connect,send,recv函数默认均是阻塞函数。当没有客户连接到服务端时,服务端阻塞在accept函数,无法退出程序。当服务器在接受客户端的数据时,如果客户端不发送数据,也不断开连接,那么服务端阻塞在recv函数,无法退出程序。
改进该程序,使得服务端随时都可以停止服务退出程序,无论有多少个用户已经在连接。
为了多个客户端可以同时连接,最容易理解的便是利用多线程。每一个连接的客户端都用一个线程去处理它的通信。
至于为了随时可以退出服务端,不能再调用永久阻塞的函数了。利用select函数,可以阻塞指定的时间,阻塞期间不占CPU。
int select( __in int nfds, __in_out fd_set*readfds, __in_out fd_set*writefds, __in_out fd_set*exceptfds, __in const struct timeval*timeout);
- nfds
-
用于兼容Berkeley sockets.不用理会,随便给个0值就OK。
- readfds
-
用于检查是否存在可读socket的的一个socket集合。可为空。
- writefds
-
用于检查是否存在可写socket的一个socket集合。可为空。
- exceptfds
-
用于检查是否存在有错误的socket的一个 socket集合,可为空。
- timeout
-
TIMEVAL结构体,用于指定该函数阻塞多长时间。
在 调用select时,当readfds不为空时,当readfds中任何一个socket就绪可读时,或者当writefds不为空且writefds中任何一个socket准备就绪可写,或者当exceptfds不为空且任何一个socket发生socket错误时,select就立即返回。否则,直到timeout指定的时间过去以后才返回。
返回值,返回准备就绪的socket的个数。如果为0,说明该函数超时了,如果大于0,说明至少有一个socket就绪。如果小于0,说明发生错误了。
fd_set 是一种集合类型。
typedef struct fd_set {
u_int fd_count; /* how many are SET? */
SOCKET fd_array[FD_SETSIZE]; /* an array of SOCKETs */
} fd_set;
记录着一个socket数组,以及里面的socket个数。
struct timeval是一个表示等待时间的结构体。
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* and microseconds */
};
tv_sec表示有多少秒,tv_usec表示有多少毫秒。
对于fd_set类型,用到几个宏定义函数。
FD_ZERO(fd_set*), 清空fd_set集合
FD_SET(SOCKET,fd_set*),把socket加入fd_set集合。
FD_ISSET(SOCKET,fd_set*),判断socket是否在集合fd_set中,并且socket准备就绪。
FD_CLR(SOCKET,fd_set*),如果fd_set存在该SOCKET,则移除它。
下面是改进后的服务端代码
typedef struct _ThreadInfo { HANDLE hThread; bool bRunning; SOCKET sock; }ThreadInfo; typedef struct _AcceptThreadParam { bool bRunning; SOCKET listeningSocket; }AcceptThreadParam; std::list<ThreadInfo*> g_threadInfoList; CRITICAL_SECTION g_csForList; DWORD WINAPI ListeningThread(LPVOID lpParameter); DWORD WINAPI CommunicationThread(LPVOID lpParameter); int _tmain(int argc, _TCHAR* argv[]) { _CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF); // ---------------------------- WSAStartup() ----------------------------// WSADATA wsd; int resStartup = WSAStartup(MAKEWORD(2,2),&wsd); if(0 != resStartup) { printf("failed to WSAStartup!\n"); return -1; } //------------------------------------------------------------------------------// InitializeCriticalSection(&g_csForList); // ---------------------------- socket() ----------------------------// SOCKET serverSocket = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET == serverSocket) { printf("failed to invoke socket, the socket returned is invalid!\n"); goto Main_End; } // ------------------------------------------------------------------------------------// //---------------------------- bind() ----------------------------// // 初始化 struct sockaddr 结构体, SOCKADDR_IN就是 struct sockaddr的宏定义 SOCKADDR_IN localAddr; localAddr.sin_family = AF_INET; localAddr.sin_addr.S_un.S_addr = inet_addr(SERVE_ADDRESS); localAddr.sin_port = htons(SERVE_PORT); memset(localAddr.sin_zero,0x0,sizeof(localAddr.sin_zero)); // int resBind = bind(serverSocket,(sockaddr*)&localAddr,sizeof(SOCKADDR_IN)); if(0 != resBind) { printf("failed to bind ! \n"); goto Main_End; } //------------------------------------------------------------------------------------// //---------------------------- listen() ----------------------------// int resListen = listen(serverSocket,5); if(0 != resListen) { printf("failed to listen! \n"); goto Main_End; } //------------------------------------------------------------------------------------// AcceptThreadParam threadParam; threadParam.bRunning = true; threadParam.listeningSocket = serverSocket; HANDLE hListeningThread = CreateThread(0,0,ListeningThread,&threadParam,0,0); if(0 == hListeningThread) { printf("failed to create the listening thread!\n"); goto Main_End; } else { printf("the server is listening now!pass any key to close the server!\n"); } while(true) { char ch = getchar(); threadParam.bRunning = false; DWORD resWait = WaitForSingleObject(hListeningThread,3000); if(WAIT_TIMEOUT == resWait) { printf("failed to wait for the listening thread exiting!\n"); } else { printf("the listening thread has exited!\n"); } break; } Main_End: if(INVALID_SOCKET != serverSocket) { closesocket(serverSocket); serverSocket = INVALID_SOCKET; } WSACleanup(); DeleteCriticalSection(&g_csForList); system("pause"); return 0; } DWORD WINAPI ListeningThread(LPVOID lpParameter) { AcceptThreadParam* pAcceptThreadParam = (AcceptThreadParam*)lpParameter; SOCKET serverSocket = pAcceptThreadParam->listeningSocket; while(pAcceptThreadParam->bRunning) { //---------------------------- accept() ----------------------------// fd_set fdAccept; FD_ZERO(&fdAccept); FD_SET(serverSocket,&fdAccept); TIMEVAL acceptTimeVal; acceptTimeVal.tv_sec = 1; acceptTimeVal.tv_usec = 0; int selRes = select(0,&fdAccept,0,0,&acceptTimeVal); if(selRes > 0) { SOCKADDR_IN clientAddr; int addrLen = sizeof(clientAddr); SOCKET acceptedSocket = accept(serverSocket,(sockaddr*)&clientAddr,&addrLen); if(INVALID_SOCKET == acceptedSocket) { printf("accept error!\n"); break; } printf("a client has connected to the server!\n"); ThreadInfo* pTI = new ThreadInfo; pTI->bRunning = true; pTI->sock = acceptedSocket; pTI->hThread = CreateThread(0,0,CommunicationThread,(LPVOID)pTI,0,0); if(0 == pTI->hThread) { printf("failed to create a thread!\n"); delete pTI; pTI = 0; } else { EnterCriticalSection(&g_csForList); g_threadInfoList.push_back(pTI); LeaveCriticalSection(&g_csForList); } } else if(selRes < 0) { printf("an error has occured when listening !\n"); break; } } std::list<ThreadInfo*> tempList; EnterCriticalSection(&g_csForList); std::list<ThreadInfo*>::iterator listIter; for(listIter = g_threadInfoList.begin(); listIter != g_threadInfoList.end(); listIter++) { (*listIter)->bRunning = false; tempList.push_back(*listIter); } g_threadInfoList.clear(); LeaveCriticalSection(&g_csForList); int nSuccessfullyExit = 0; for(listIter = tempList.begin(); listIter != tempList.end(); listIter++) { DWORD resWait = WaitForSingleObject((*listIter)->hThread,2000); if(WAIT_TIMEOUT == resWait) { printf("failed to wait for a communication thread exiting!\n"); } else { nSuccessfullyExit++; } delete (*listIter); } printf("succeed waiting for %d thread exiting!\n",nSuccessfullyExit); tempList.clear(); printf("listening thread is exiting!\n"); return 0; } DWORD WINAPI CommunicationThread(LPVOID lpParameter) { ThreadInfo* pThreadInfo = (ThreadInfo*)lpParameter; SOCKET clientSocket = pThreadInfo->sock; fd_set fdRead,fdWrite; FD_ZERO(&fdRead); FD_ZERO(&fdWrite); FD_SET(clientSocket,&fdRead); FD_SET(clientSocket,&fdWrite); TIMEVAL sendTimeVal; sendTimeVal.tv_sec = 0; sendTimeVal.tv_usec = 500; int selRes = select(0,0,&fdWrite,0,&sendTimeVal); if(selRes <= 0) { goto ThreadOver; } char recvBuffer[256]; char sendBuffer[256]; strcpy(sendBuffer,"server:Welcome to connect !"); int sendBufLen = strlen(sendBuffer); int resSend = send(clientSocket,sendBuffer,sendBufLen,0); if(resSend != sendBufLen) { printf("there are %d bytes to send, but it just succeeded sending %d bytes!\n",sendBufLen,resSend); goto ThreadOver; } while(pThreadInfo->bRunning) { FD_ZERO(&fdRead); FD_SET(pThreadInfo->sock,&fdRead); TIMEVAL recvTimeVal; recvTimeVal.tv_sec = 0; recvTimeVal.tv_usec = 500; int recvSelRes = select(0,&fdRead,0,0,&recvTimeVal); if(recvSelRes < 0) { printf("socket error when receiving!\n"); break; } else if(recvSelRes > 0) { int recvLen = recv(clientSocket,recvBuffer,sizeof(recvBuffer),0); if(0 == recvLen) { printf("a client close the socket!\n"); break; } else if(recvLen < 0) { printf("an error has happen when recving\n"); break; } else { recvBuffer[recvLen] = ‘\0‘; printf("a client:%s\n",recvBuffer); strcpy(sendBuffer,"server:"); strcat(sendBuffer,recvBuffer); sendBufLen = strlen(sendBuffer); FD_ZERO(&fdWrite); FD_SET(pThreadInfo->sock,&fdWrite); sendTimeVal.tv_sec = 0; sendTimeVal.tv_usec = 500; int sendSelRes = select(0,0,&fdWrite,0,&sendTimeVal); if(sendSelRes > 0) { int bytesSent = send(clientSocket,sendBuffer,sendBufLen,0); if(bytesSent != sendBufLen) { printf("there are %d bytes to be sent,but only %d bytes are sent!\n",sendBufLen,bytesSent); break; } } else { printf("failed to send in 500 ms!\n"); break; } } } } ThreadOver: closesocket(pThreadInfo->sock); bool bMainThreadWaiting = true; EnterCriticalSection(&g_csForList); std::list<ThreadInfo*>::iterator listIter; for(listIter = g_threadInfoList.begin(); listIter != g_threadInfoList.end(); listIter++) { if(pThreadInfo == (*listIter)) { bMainThreadWaiting = false; g_threadInfoList.erase(listIter); break; } } LeaveCriticalSection(&g_csForList); if(false == bMainThreadWaiting) { CloseHandle(pThreadInfo->hThread); delete pThreadInfo; pThreadInfo = 0; } return 0; }
前面的代码与之前的一样,改变的地方在于accept的地方。对于一个监听的socket,如果该socket可读,说明有用户连接过来了。
全局维护了一个纪录创建的线程的信息的链表,每创建一个线程都有一个标识该线程是否应该继续循环执行的bool变量。当bRunning变为false的时候,线程函数跳出循环,返回。
当需要停止服务端运行时,服务端只需要按任何一个键和回车,就会通知线程退出,并且调用WaitForSingleObject(),来确认线程已退出。还有利用了 EnterCriticalSection()和LeaveCriticalSection()临界区函数来保证只有一个线程在操作全局的链表。
使用多线程要消耗一定的资源。对于fd_set,默认最多可以容纳64个socket.所以可以用1个线程去处理64个客户端的连接。而不必每个客户端都创建一个线程。
代码如下:
typedef struct _AcceptThreadParam { bool bRunning; SOCKET listeningSocket; }AcceptThreadParam; #define SOCKET_ARRAY_SIZE 64 SOCKET g_socketArray[SOCKET_ARRAY_SIZE]; int g_socketCount = 0; CRITICAL_SECTION g_csForSocketArray; DWORD WINAPI ListeningThread(LPVOID lpParameter); DWORD WINAPI CommunicationThread(LPVOID lpParameter); int _tmain(int argc, _TCHAR* argv[]) { _CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF); // ---------------------------- WSAStartup() ----------------------------// WSADATA wsd; int resStartup = WSAStartup(MAKEWORD(2,2),&wsd); if(0 != resStartup) { printf("failed to WSAStartup!\n"); return -1; } //------------------------------------------------------------------------------// InitializeCriticalSection(&g_csForSocketArray); g_socketCount = 0; // ---------------------------- socket() ----------------------------// SOCKET serverSocket = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET == serverSocket) { printf("failed to invoke socket, the socket returned is invalid!\n"); goto Main_End; } // ------------------------------------------------------------------------------------// //---------------------------- bind() ----------------------------// // 初始化 struct sockaddr 结构体, SOCKADDR_IN就是 struct sockaddr的宏定义 SOCKADDR_IN localAddr; localAddr.sin_family = AF_INET; localAddr.sin_addr.S_un.S_addr = inet_addr(SERVE_ADDRESS); localAddr.sin_port = htons(SERVE_PORT); memset(localAddr.sin_zero,0x0,sizeof(localAddr.sin_zero)); // int resBind = bind(serverSocket,(sockaddr*)&localAddr,sizeof(SOCKADDR_IN)); if(0 != resBind) { printf("failed to bind ! \n"); goto Main_End; } //------------------------------------------------------------------------------------// //---------------------------- listen() ----------------------------// int resListen = listen(serverSocket,5); if(0 != resListen) { printf("failed to listen! \n"); goto Main_End; } //------------------------------------------------------------------------------------// AcceptThreadParam threadParam; threadParam.bRunning = true; threadParam.listeningSocket = serverSocket; bool bCommunicationThreadRunning = true; HANDLE hListeningThread = CreateThread(0,0,ListeningThread,&threadParam,0,0); HANDLE hCommunicationThread = CreateThread(0,0,CommunicationThread,&bCommunicationThreadRunning,0,0); if(0 == hListeningThread || 0 == hCommunicationThread) { printf("failed to create a thread!\n"); if(0 != hListeningThread) { threadParam.bRunning = false; WaitForSingleObject(hListeningThread,2000); CloseHandle(hListeningThread); } if(0 != hCommunicationThread) { bCommunicationThreadRunning = false; WaitForSingleObject(hCommunicationThread,2000); CloseHandle(hCommunicationThread); } goto Main_End; } else { printf("the server is listening now!pass any key to close the server!\n"); } while(true) { char ch = getchar(); threadParam.bRunning = false; bCommunicationThreadRunning = false; DWORD resWait = WaitForSingleObject(hListeningThread,3000); if(WAIT_TIMEOUT == resWait) { printf("failed to wait for the listening thread exiting!\n"); } else { printf("the listening thread has exited!\n"); } CloseHandle(hListeningThread); resWait = WaitForSingleObject(hCommunicationThread,3000); if(WAIT_TIMEOUT == resWait) { printf("failed to wait for the communication thread exiting!\n"); } else { printf("the communication thread has exited!\n"); } CloseHandle(hCommunicationThread); break; } Main_End: if(INVALID_SOCKET != serverSocket) { closesocket(serverSocket); serverSocket = INVALID_SOCKET; } WSACleanup(); DeleteCriticalSection(&g_csForSocketArray); system("pause"); return 0; } DWORD WINAPI ListeningThread(LPVOID lpParameter) { AcceptThreadParam* pAcceptThreadParam = (AcceptThreadParam*)lpParameter; SOCKET serverSocket = pAcceptThreadParam->listeningSocket; while(pAcceptThreadParam->bRunning) { //---------------------------- accept() ----------------------------// fd_set fdAccept; FD_ZERO(&fdAccept); FD_SET(serverSocket,&fdAccept); TIMEVAL acceptTimeVal; acceptTimeVal.tv_sec = 1; acceptTimeVal.tv_usec = 0; int selRes = select(0,&fdAccept,0,0,&acceptTimeVal); if(selRes > 0) { SOCKADDR_IN clientAddr; int addrLen = sizeof(clientAddr); SOCKET acceptedSocket = accept(serverSocket,(sockaddr*)&clientAddr,&addrLen); if(INVALID_SOCKET == acceptedSocket) { printf("accept error!\n"); break; } printf("a client has connected to the server!\n"); fd_set fdWrite; FD_ZERO(&fdWrite); FD_SET(acceptedSocket,&fdWrite); TIMEVAL writeTimeVal; writeTimeVal.tv_sec = 0; writeTimeVal.tv_usec = 500; int writeSelRes = select(0,0,&fdWrite,0,&writeTimeVal); if(writeSelRes > 0) { int sendBufferLen = strlen("server:Welcome to connect!"); int bytesSent = send(acceptedSocket,"server:Welcome to connect!",sendBufferLen,0); if(bytesSent == sendBufferLen) { EnterCriticalSection(&g_csForSocketArray); if(g_socketCount < 64) { g_socketArray[g_socketCount] = acceptedSocket; g_socketCount++; } else { printf("the server has accepted more than 64 clients!\n"); closesocket(acceptedSocket); } LeaveCriticalSection(&g_csForSocketArray); } else { printf("send error, there are %d bytes to be sent, but only %d bytes are sent!\n",sendBufferLen,bytesSent); closesocket(acceptedSocket); } } else { printf("select error of can not wait for sending data when select!\n"); closesocket(acceptedSocket); } } else if(selRes < 0) { printf("an error has occured when listening !\n"); break; } } printf("listening thread is exiting!\n"); return 0; } DWORD WINAPI CommunicationThread(LPVOID lpParameter) { bool* pBRunning = (bool*)lpParameter; char recvBuffer[256]; char tempBuffer[256]; while(true == *pBRunning) { int currentSocketCount = 0; EnterCriticalSection(&g_csForSocketArray); if(0 == g_socketCount) { LeaveCriticalSection(&g_csForSocketArray); Sleep(200); continue; } currentSocketCount = g_socketCount; LeaveCriticalSection(&g_csForSocketArray); fd_set fdRead; FD_ZERO(&fdRead); for(int i = 0; i < currentSocketCount; i++) { FD_SET(g_socketArray[i],&fdRead); } TIMEVAL readTimeVal; readTimeVal.tv_sec = 1; readTimeVal.tv_usec = 0; int selRes = select(0,&fdRead,0,0,&readTimeVal); if(selRes > 0) { for(int i = 0; i < currentSocketCount; i++) { if(FD_ISSET(g_socketArray[i],&fdRead) != 0) { int bytesRecv = recv(g_socketArray[i],recvBuffer,sizeof(recvBuffer),0); if(bytesRecv > 0) { recvBuffer[bytesRecv] = ‘\0‘; printf("the %d client: %s\n",i + 1,recvBuffer); sprintf(tempBuffer,"the server:%s",recvBuffer); fd_set fdWrite; FD_ZERO(&fdWrite); FD_SET(g_socketArray[i],&fdWrite); TIMEVAL writeTimeVal; writeTimeVal.tv_sec = 0; writeTimeVal.tv_usec = 500; int writeSelRes = select(g_socketArray[i],0,&fdWrite,0,&writeTimeVal); if(writeSelRes > 0) { int sendBufLen = strlen(tempBuffer); int bytesSent = send(g_socketArray[i],tempBuffer,sendBufLen,0); if(bytesSent == sendBufLen) { break; } else { printf("there are %d bytes to be sent,but only %d bytes are sent!\n",sendBufLen,bytesSent); } } else { printf("select error!\n"); } } else if(0 == bytesRecv) { printf("the %d client has closed the socket!\n",i + 1); } else { printf("recv error!\n"); } closesocket(g_socketArray[i]); EnterCriticalSection(&g_csForSocketArray); g_socketArray[i] = g_socketArray[g_socketCount - 1]; g_socketCount--; LeaveCriticalSection(&g_csForSocketArray); } } } else if(selRes < 0) { printf("select error in communication thread!\n"); } } EnterCriticalSection(&g_csForSocketArray); for(int i = 0; i < g_socketCount; i++) { closesocket(g_socketArray[i]); } LeaveCriticalSection(&g_csForSocketArray); printf("the communication thread is exiting!\n"); return 0; }
完成的功能一样。只需要一个线程就可以处理多个客户端了。
还可以用异步IO来实现该服务器,以下是用完成端口来实现同样功能的服务器。
typedef struct _RepeatAcceptingThreadParam { SOCKET listeningSocket; bool* pBRunning; }RepeatAcceptingThreadParam; typedef struct _CompletionPortWorkerThreadParam { HANDLE hCompletionPort; bool* pBRunning; }CompletionPortWorkerThreadParam; #define MESSAGE_BUF_SIZE 1024 enum OPERATION_TYPE { OPERATION_SEND, OPERATION_RECV }; typedef struct { SOCKET sock; WSAOVERLAPPED overlap; WSABUF wsaBuf; char message[1024]; DWORD bytesRecv; DWORD flags; OPERATION_TYPE operationType; }PER_IO_OPERATION_DATA; //global vector, which saves the information of the client sockets connected to the server std::vector<PER_IO_OPERATION_DATA*> g_perIoDataPointerVec; //accept sockets connected to the server‘s listening socket in a recycle - while DWORD WINAPI RepeatAcceptingThread(LPVOID lpParameter); //the worker thread that deal with the communications between the server and the clients. DWORD WINAPI CompletionPortWorkerThread(LPVOID lpParameter); int _tmain(int argc,_TCHAR* argv[]) { _CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF); // ---------------------------- WSAStartup() ----------------------------// WSADATA wsd; int resStartup = WSAStartup(MAKEWORD(2,2),&wsd); if(0 != resStartup) { printf("failed to WSAStartup!\n"); return -1; } //------------------------------------------------------------------------------// // ---------------------------- socket() ----------------------------// SOCKET serverSocket = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET == serverSocket) { printf("failed to invoke socket, the socket returned is invalid!\n"); return -1; } // ------------------------------------------------------------------------------------// //---------------------------- bind() ----------------------------// // 初始化 struct sockaddr 结构体, SOCKADDR_IN就是 struct sockaddr的宏定义 SOCKADDR_IN localAddr; localAddr.sin_family = AF_INET; localAddr.sin_addr.S_un.S_addr = inet_addr(SERVE_ADDRESS); localAddr.sin_port = htons(SERVE_PORT); memset(localAddr.sin_zero,0x0,sizeof(localAddr.sin_zero)); // int resBind = bind(serverSocket,(sockaddr*)&localAddr,sizeof(SOCKADDR_IN)); if(0 != resBind) { printf("failed to bind ! \n"); closesocket(serverSocket); return -1; } //------------------------------------------------------------------------------------// //---------------------------- listen() ----------------------------// int resListen = listen(serverSocket,5); if(0 != resListen) { printf("failed to listen! \n"); closesocket(serverSocket); return -1; } //------------------------------------------------------------------------------------// bool bRepeatAcceptingThreadRunning = true; // a bool variable that take control of terminating the RepeatAcceptingThread. //init the parameter for the RepeatAcceptingThread. RepeatAcceptingThreadParam rtp; rtp.listeningSocket = serverSocket; rtp.pBRunning = &bRepeatAcceptingThreadRunning; HANDLE hRepeatAcceptingThread = CreateThread(0,0,RepeatAcceptingThread,&rtp,0,0); if(0 == hRepeatAcceptingThread) { printf("failed to create the repeat-accepting thread!\n"); closesocket(serverSocket); return -1; } printf("the repeat-accepting thread has run!\n"); while(true) { // pass any key char ch = getchar(); bRepeatAcceptingThreadRunning = false;//to notify the RepeatAcceptingThread to exit safely DWORD waitRes = WaitForSingleObject(hRepeatAcceptingThread,3000); if(WAIT_TIMEOUT == waitRes) { printf("failed to wait for the repeatAcceptingThread exiting!\n"); } else { printf("the repeat accepting thread has exited!\n"); } CloseHandle(hRepeatAcceptingThread); break; } system("pause"); return 0; } DWORD WINAPI RepeatAcceptingThread(LPVOID lpParameter) { //get the parameters passed by the creator of the thread. RepeatAcceptingThreadParam* pParam = (RepeatAcceptingThreadParam*)lpParameter; SOCKET listeningSocket = pParam->listeningSocket; bool* pStillRun = pParam->pBRunning; // create a completion port HANDLE hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0); if(0 == hCompletionPort) { printf("failed to CreateIoCompletionPort!\n"); return -1; } // a bool variable for notifying the worker threads of exiting. bool bWorkThreadRunning = true; // a vector of HANDLEs,which will be used for synchronization of waiting the worker threads to exit. std::vector<HANDLE> threadHandlesVec; SYSTEM_INFO systemInfo; GetSystemInfo(&systemInfo); //the parameter to be passed to the worker thread. CompletionPortWorkerThreadParam cpwtp; cpwtp.pBRunning = &bWorkThreadRunning; cpwtp.hCompletionPort = hCompletionPort; for(int i = 0; i < systemInfo.dwNumberOfProcessors; i++) { HANDLE hThread = CreateThread(0,0,CompletionPortWorkerThread,&cpwtp,0,0); if(0 == hThread) { printf("failed to create a completion port worker thread!\n"); bWorkThreadRunning = false; // terminate all threads created safely. std::vector<HANDLE>::iterator vecIter; for(vecIter = threadHandlesVec.begin(); vecIter != threadHandlesVec.end(); vecIter++) { DWORD waitRes = WaitForSingleObject(*vecIter,2000); if(WAIT_TIMEOUT == waitRes) { printf("failed the wait for the completion port worker thread!\n"); } CloseHandle(*vecIter); } threadHandlesVec.clear(); CloseHandle(hCompletionPort); return -1; } else { threadHandlesVec.push_back(hThread); //add the handle to the vector } } printf("succeed creating completion port worker threads!\n"); while(true == *pStillRun) { fd_set fdAccept; FD_ZERO(&fdAccept); FD_SET(listeningSocket,&fdAccept); TIMEVAL acceptTimeVal; acceptTimeVal.tv_sec = 1; acceptTimeVal.tv_usec = 0; int selRes = select(0,&fdAccept,0,0,&acceptTimeVal); if(selRes > 0) // a client connected { SOCKADDR_IN clientAddr; int addrLen = sizeof(clientAddr); SOCKET acceptedSocket = WSAAccept(listeningSocket,(struct sockaddr*)&clientAddr,&addrLen,0,0); if(0 == acceptedSocket) { printf("failed to accept a connection!\n"); } else { printf("a clent %s:%d has connected!\n",inet_ntoa(clientAddr.sin_addr),ntohs(clientAddr.sin_port)); PER_IO_OPERATION_DATA* perIoData = new PER_IO_OPERATION_DATA; if(0 == perIoData) { closesocket(acceptedSocket); printf("failed to new a struct! there is not enough memory!\n\n"); } else { //associate the newly connected client socket with the completion port. if(0 == CreateIoCompletionPort((HANDLE)acceptedSocket,hCompletionPort,(ULONG_PTR)perIoData,0)) { printf("failed to associate the newly connected client socket with the completion port!\n"); closesocket(acceptedSocket); delete perIoData; perIoData = 0; } else { //associated successfully, Set the information of the client socket in A PER_IO_OPERATION_DATA struct. //when a IO operation is completed, we can get notified with the struct to be one of the parameters. perIoData->sock = acceptedSocket; perIoData->operationType = OPERATION_SEND; perIoData->wsaBuf.buf = perIoData->message; perIoData->overlap.hEvent = INVALID_HANDLE_VALUE; strcpy(perIoData->message,"Welcome to connect to the server!"); perIoData->wsaBuf.len = strlen(perIoData->message); int sendRes = WSASend(acceptedSocket,&(perIoData->wsaBuf),1,&(perIoData->bytesRecv),0,0,0); if(0 == sendRes) //finished immediately { // asynchronously invoke a receive operation. When the reception finished,we can get its information by // invoking GetQueuedCompletionStatus() perIoData->wsaBuf.buf = perIoData->message; perIoData->wsaBuf.len = MESSAGE_BUF_SIZE; perIoData->flags = 0; perIoData->operationType = OPERATION_RECV; ZeroMemory(&perIoData->overlap,sizeof(perIoData->overlap)); int recvRes = WSARecv(acceptedSocket,&perIoData->wsaBuf,1,&perIoData->bytesRecv,&perIoData->flags,&perIoData->overlap,0); if(0 == recvRes) //the receiving operation finished immediately , the information of the operation has been queued. { g_perIoDataPointerVec.push_back(perIoData); } else if(SOCKET_ERROR == recvRes && WSA_IO_PENDING == WSAGetLastError()) //the receiving operation will finish later { g_perIoDataPointerVec.push_back(perIoData); } else { printf("failed to WSARecv!\n"); closesocket(acceptedSocket); delete perIoData; perIoData = 0; } } else if(SOCKET_ERROR == sendRes && WSA_IO_PENDING == WSAGetLastError()) //the sending operation will finish later { g_perIoDataPointerVec.push_back(perIoData); } else { //int lastErr = WSAGetLastError(); printf("send data error!\n"); closesocket(acceptedSocket); delete perIoData; perIoData = 0; } } } } } else if(selRes < 0) { printf("select error!\n"); } } bWorkThreadRunning = false; //notifies the worker threads of exiting // terminate all threads created safely. std::vector<HANDLE>::iterator vecIter; for(vecIter = threadHandlesVec.begin(); vecIter != threadHandlesVec.end(); vecIter++) { DWORD waitRes = WaitForSingleObject(*vecIter,2000); if(WAIT_TIMEOUT == waitRes) { printf("failed the wait for the completion port worker thread!\n"); } CloseHandle(*vecIter); } threadHandlesVec.clear(); CloseHandle(hCompletionPort); //delete the structs of PER_IO_OPERATION_DATA newed for clients connected. std::vector<PER_IO_OPERATION_DATA*>::iterator pIoDataPointerIter; for(pIoDataPointerIter = g_perIoDataPointerVec.begin(); pIoDataPointerIter != g_perIoDataPointerVec.end(); pIoDataPointerIter++) { closesocket((*pIoDataPointerIter)->sock); delete (*pIoDataPointerIter); *pIoDataPointerIter = 0; } g_perIoDataPointerVec.clear(); printf(" the repeat accepting thread is exiting!\n"); return 0; } bool ReleaseIOOperationData(PER_IO_OPERATION_DATA* & pDataToBeDeleted) { bool retVal = false; std::vector<PER_IO_OPERATION_DATA*>::iterator vecIter; for(vecIter = g_perIoDataPointerVec.begin(); vecIter != g_perIoDataPointerVec.end(); vecIter++) { if(pDataToBeDeleted == (*vecIter)) { g_perIoDataPointerVec.erase(vecIter); closesocket(pDataToBeDeleted->sock); delete pDataToBeDeleted; pDataToBeDeleted = 0; retVal = true; break; } } return retVal; } DWORD WINAPI CompletionPortWorkerThread(LPVOID lpParameter) { CompletionPortWorkerThreadParam* pParam = (CompletionPortWorkerThreadParam*)lpParameter; bool* pStillRun = pParam->pBRunning; HANDLE hCompletionPort = pParam->hCompletionPort; DWORD dwBytesTransfered; PER_IO_OPERATION_DATA* pIoData; WSAOVERLAPPED* pOverlap; while(true == *pStillRun) { dwBytesTransfered = 0; pIoData = 0; pOverlap = 0; BOOL bGetStatus = GetQueuedCompletionStatus(hCompletionPort,&dwBytesTransfered,(PULONG_PTR)&pIoData,&pOverlap,500); if(FALSE == bGetStatus) { if(0 == pOverlap) //did not get a packet from the queue. { continue; } else { //get a packet for a failed I/O operation. } } if(OPERATION_SEND == pIoData->operationType) { if(0 == dwBytesTransfered) //a packet for a failed I/O operation. { printf("the client %d has close the socket!\n",pIoData->sock); ReleaseIOOperationData(pIoData); } else { // receive operation. pIoData->operationType = OPERATION_RECV; pIoData->wsaBuf.buf = pIoData->message; pIoData->wsaBuf.len = MESSAGE_BUF_SIZE; pIoData->flags = 0; ZeroMemory(&pIoData->overlap,sizeof(pIoData->overlap)); int recvRes = WSARecv(pIoData->sock,&pIoData->wsaBuf,1,&pIoData->bytesRecv,&pIoData->flags,&pIoData->overlap,0); if(0 != recvRes && WSA_IO_PENDING != WSAGetLastError()) { printf("recv error, may be the client %d has close the socket!\n",pIoData->sock); ReleaseIOOperationData(pIoData); } } } else if(OPERATION_RECV == pIoData->operationType) { if(0 == dwBytesTransfered) //a packet for a failed I/O operation. { printf("the client %d has close the socket!\n",pIoData->sock); ReleaseIOOperationData(pIoData); } else { // show the data received pIoData->message[dwBytesTransfered] = ‘\0‘; printf("the client %d:%s \n",pIoData->sock,pIoData->message); //send back the data received add a "server:" in the front char tempBuf[MESSAGE_BUF_SIZE]; sprintf(tempBuf,"server:%s",pIoData->message); strcpy(pIoData->message,tempBuf); pIoData->operationType = OPERATION_SEND; pIoData->wsaBuf.buf = pIoData->message; pIoData->wsaBuf.len = strlen(pIoData->message); int sendRes = WSASend(pIoData->sock,&pIoData->wsaBuf,1,&pIoData->bytesRecv,0,0,0); if(0 == sendRes) { pIoData->operationType = OPERATION_RECV; pIoData->wsaBuf.buf = pIoData->message; pIoData->wsaBuf.len = MESSAGE_BUF_SIZE; pIoData->flags = 0; ZeroMemory(&pIoData->overlap,sizeof(pIoData->overlap)); int recvRes = WSARecv(pIoData->sock,&pIoData->wsaBuf,1,&pIoData->bytesRecv,&pIoData->flags,&pIoData->overlap,0); if(0 != recvRes && WSA_IO_PENDING != WSAGetLastError()) { printf("recv error, may be the client %d has close the socket!\n",pIoData->sock); ReleaseIOOperationData(pIoData); } } else if(SOCKET_ERROR == sendRes && WSA_IO_PENDING == WSAGetLastError()) { } else { printf("send error, maybe the client %d has close the socket!\n",pIoData->sock); ReleaseIOOperationData(pIoData); } } } } printf("a completion port thread is exiting!\n"); return 0; }