iocp-socket 服务(借鉴别人的,根据自己的需要改的)未完待续

#pragma once
#include <WinSock2.h>
#include <MSWSock.h>
#include <Windows.h>
#pragma comment(lib,"ws2_32.lib") #define BUFFER_SIZE (1024*8) // 8KB
#define BUFFER_SIZE_DATA (3*BUFFER_SIZE ) #define NOTIFY_MSG_ACCEPT 0xa1
#define NOTIFY_MSG_CONNECT 0xa2
#define NOTIFY_MSG_DISCONNECT 0xa3
#define NOTIFY_MSG_READ 0xa4
#define NOTIFY_MSG_WRITE 0xa5 struct PER_IO_BUFFER
{
WSAOVERLAPPED ol;
SOCKET sClient ; //the socket of client use by AcceptEx
LPBYTE lpBuffer; // the pointer of buffer
DWORD dwBufferSize; // the size of buffer
DWORD dwTrans ; // the size of io-trans
BYTE opType; // opteion type
#define OP_ACCEPT 6
#define OP_CONNECT 7
#define OP_WRITE 8
#define OP_READ 9
PER_IO_BUFFER* pNext; // next buffer }; struct PER_HANDLE_DATA
{
SOCKET s;
SOCKADDR_IN saddr;
BOOL bConnect;
BYTE readBytes[BUFFER_SIZE_DATA];
DWORD dwBufferOffSet;
HANDLE m_hWriteComplete ;
//DWORD dwBufferSize;
PER_HANDLE_DATA* pNext;
}; typedef void (__stdcall* PNOTIFYPROC)(PER_HANDLE_DATA*,PER_IO_BUFFER* ,DWORD );
class Ciocp
{
private:
HANDLE m_hIocp; // iocp handle
SOCKET m_sListen; // the socket of listen
SOCKET m_sConnect; // the socket of connect
DWORD m_dwProt; // the port of listen
DWORD m_dwMaxConns; // the max count of connectios
DWORD m_dwMaxFreeBuffers; // the max count of freebuffers
DWORD m_dwMaxFreeContexts;// the max count of freecontexts
DWORD m_dwInitOp; // the count of init-op
ULONGLONG m_ulWriteBytes; // the total of write-bytes
ULONGLONG m_ulReadBytes; // the total of read-bytes DWORD m_dwWorkThreadCount ; // the count of worker thread
DWORD m_dwCurWorkThreadCount ;
CRITICAL_SECTION m_csWorkLock ;
CRITICAL_SECTION m_csIoLock ; CRITICAL_SECTION m_csBuffersListLock; // the cs-lock of free-buffers-list
CRITICAL_SECTION m_csContextsListLock; // the cs-lock of free-count
DWORD m_dwBuffersCount; // the count of cur buffers
DWORD m_dwContextsCount; // the count of cur contexts PER_IO_BUFFER* m_pFreeBuffersList; // the list of free-buffers
PER_HANDLE_DATA* m_pFreeContextsList; // the list of free-contexts SOCKADDR_IN m_siRemoteAddr ;
bool m_bStarted ; // the status of socket server LPFN_ACCEPTEX m_lpfnAcceptEx ;
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;
LPFN_CONNECTEX m_lpfnConnectEx ;
PNOTIFYPROC m_pNotifyProc; BOOL PostAccept();
BOOL PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize);
BOOL PostRead(PER_HANDLE_DATA* pContext);
BOOL PostConnect();
static unsigned int WINAPI _WorkerThreadProc(LPVOID lpParam);
PER_IO_BUFFER* AllocBuffer(DWORD dwSize = BUFFER_SIZE);
PER_HANDLE_DATA* AllocContext(SOCKET s);
void ReleaseContext(PER_HANDLE_DATA* pContext);
void ReleaseIoBuffer(PER_IO_BUFFER* pBuffer);
void CreateWokerThreads();
void CreateIocp();
void AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext=0);
void DecCurWorkCount();
void HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans);
void NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg);
void ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
BOOL SetKeepAlive(SOCKET s);
void ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
void ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
public:
Ciocp(void);
~Ciocp(void);
BOOL Start(PNOTIFYPROC pNotifyProc, DWORD dwPort=8080,DWORD dwMaxConns= 2000,DWORD dwMaxFreeBuffers = 100,DWORD dwMaxFreeContexts =100 ,DWORD dwInitOp = 5 );
void Shutdown();
BOOL Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp = "127.0.0.1",DWORD dwPort=443 );
void GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite);
void Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize);
void ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans); };

  

#include "StdAfx.h"
#include "iocp.h"
#include <process.h>
#include <mstcpip.h> Ciocp::Ciocp(void)
{ m_hIocp = INVALID_HANDLE_VALUE ;
m_sListen = INVALID_SOCKET ;
m_sConnect = INVALID_SOCKET ;
m_dwInitOp =5;
m_dwMaxConns = 2000;
m_dwMaxFreeContexts = 100;
m_dwMaxFreeBuffers = 100 ;
m_ulWriteBytes =0;
m_ulReadBytes = 0;
// get max worker count
SYSTEM_INFO sys_info ;
GetSystemInfo(&sys_info);
m_dwWorkThreadCount = sys_info.dwNumberOfProcessors *2 ;
m_dwCurWorkThreadCount = 0;
InitializeCriticalSection(& m_csBuffersListLock);
InitializeCriticalSection(& m_csContextsListLock);
InitializeCriticalSection(& m_csWorkLock);
InitializeCriticalSection((& m_csIoLock));
m_dwBuffersCount = 0;
m_dwContextsCount = 0; m_pFreeBuffersList = NULL ;
m_pFreeContextsList = NULL ; m_lpfnAcceptEx = NULL ;
m_lpfnConnectEx = NULL ;
m_lpfnGetAcceptExSockAddrs = NULL ;
m_bStarted = false ; m_dwProt = 0;
WSADATA wsaData ;
WORD sockVerSion = MAKEWORD(2,2);
WSAStartup(sockVerSion,&wsaData);
} Ciocp::~Ciocp(void)
{
DeleteCriticalSection(& m_csBuffersListLock);
DeleteCriticalSection(& m_csContextsListLock);
DeleteCriticalSection(& m_csWorkLock);
DeleteCriticalSection(& m_csIoLock);
if (m_sListen != INVALID_SOCKET)
{
closesocket(m_sListen);
} if (m_sConnect != INVALID_SOCKET)
{
closesocket(m_sConnect);
}
} BOOL Ciocp::Start(PNOTIFYPROC pNotifyProc, DWORD dwPort/*=8080*/,DWORD dwMaxConns/*= 2000*/,DWORD dwMaxFreeBuffers /*= 100*/,DWORD dwMaxFreeContexts /*=100 */,DWORD dwInitOp /*= 5 */)
{
m_pNotifyProc = pNotifyProc ;
m_dwProt = dwPort;
m_dwMaxConns = dwMaxConns;
m_dwMaxFreeBuffers = dwMaxFreeBuffers;
m_dwMaxFreeContexts = dwMaxFreeContexts;
m_dwInitOp = dwInitOp ; m_sListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
SOCKADDR_IN saddr ;
saddr.sin_family = AF_INET;
saddr.sin_port = ntohs(m_dwProt);
saddr.sin_addr.S_un.S_addr = INADDR_ANY ;
m_bStarted = true ; if (bind(m_sListen,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
{
m_bStarted = false ;
return FALSE ;
} listen(m_sListen,m_dwMaxConns); CreateIocp(); GUID guidAcceptEx = WSAID_ACCEPTEX ;
DWORD dwBytes ;
WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidAcceptEx,sizeof(guidAcceptEx),&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),&dwBytes,NULL,NULL); GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS ; WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidGetAcceptExSockAddrs,sizeof(guidGetAcceptExSockAddrs),&m_lpfnGetAcceptExSockAddrs,sizeof(m_lpfnGetAcceptExSockAddrs),&dwBytes,NULL,NULL); AddSocketToIocp(m_sListen);
//create worker
CreateWokerThreads(); // post accept for(int i=0;i < m_dwInitOp;i++)
{ PostAccept();
} } BOOL Ciocp::PostAccept()
{
// set io type
PER_IO_BUFFER* pBuffer = NULL ;
pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE);
pBuffer->opType = OP_ACCEPT ; // post io
DWORD dwBytes ;
DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ;
pBuffer->sClient = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
// BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
// pBuffer->lpBuffer,pBuffer->dwBufferSize-dwAddrSize*2
// ,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
pBuffer->lpBuffer,0
,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
if (!b && WSAGetLastError() != WSA_IO_PENDING)
{
return FALSE ;
} return TRUE; } BOOL Ciocp::PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize)
{
PER_IO_BUFFER* pBuffer = AllocBuffer();
pBuffer->opType = OP_WRITE; // post i/o
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = (char*)lpBuffer;
buf.len = dwSize ;
if(::WSASend(pContext->s, &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{ return FALSE;
}
} return TRUE ; } BOOL Ciocp::PostRead(PER_HANDLE_DATA* pContext)
{
PER_IO_BUFFER* pBuffer = AllocBuffer();
pBuffer->opType = OP_READ; // post i/o
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = (char*)pBuffer->lpBuffer;
buf.len = pBuffer->dwBufferSize ;
if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{ return FALSE;
}
} return TRUE ;
} BOOL Ciocp::PostConnect()
{
PER_IO_BUFFER* pBuffer = NULL ; pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE); SOCKADDR_IN saddr ;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(0);
saddr.sin_addr.s_addr = htonl(ADDR_ANY); if (m_sConnect != INVALID_SOCKET)
{
closesocket(m_sConnect);
m_sConnect = INVALID_SOCKET ;
}
m_sConnect = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); if (bind(m_sConnect,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
{
m_bStarted = false ;
return FALSE ;
} PER_HANDLE_DATA *pContext = AllocContext(m_sConnect);
pContext->bConnect = TRUE ;
AddSocketToIocp(m_sConnect,pContext);
//create worker if (m_lpfnConnectEx == NULL )
{
GUID guidConnectex = WSAID_CONNECTEX ;
DWORD dwBytes ;
WSAIoctl(m_sConnect,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidConnectex,sizeof(guidConnectex),&m_lpfnConnectEx,sizeof(m_lpfnConnectEx),&dwBytes,NULL,NULL);
} DWORD dwSend = 0;
pBuffer->opType = OP_CONNECT ;
strcpy((LPSTR)pBuffer->lpBuffer,"hello kid");
pBuffer->dwBufferSize = 5;
bool b = m_lpfnConnectEx(m_sConnect,(SOCKADDR*)&m_siRemoteAddr,sizeof(m_siRemoteAddr),
pBuffer->lpBuffer,pBuffer->dwBufferSize,&dwSend,& pBuffer->ol); if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
{
return FALSE;
} return TRUE;
} PER_IO_BUFFER* Ciocp::AllocBuffer(DWORD dwSize)
{
OutputDebugString(L"Buffer ++ \r\n ");
PER_IO_BUFFER* pBuffer = NULL;
if (dwSize > BUFFER_SIZE)
{
return NULL ;
} EnterCriticalSection(&m_csBuffersListLock);
if (m_pFreeBuffersList == NULL )
{
// 2) HeapAlloc buffer
pBuffer = (PER_IO_BUFFER*) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_IO_BUFFER)+ BUFFER_SIZE );
}else
{
// 1) check pFreeBuffersList
pBuffer = m_pFreeBuffersList ;
m_pFreeBuffersList = m_pFreeBuffersList->pNext ;
pBuffer->pNext = 0;
m_dwBuffersCount -- ;
}
LeaveCriticalSection(&m_csBuffersListLock);
//
if (pBuffer!= NULL )
{
pBuffer->dwBufferSize = dwSize ;
pBuffer->lpBuffer =(LPBYTE) (pBuffer+1 ); }
return pBuffer ; } unsigned int WINAPI Ciocp::_WorkerThreadProc(LPVOID lpParam)
{
Sleep(1000);
Ciocp* pThis = (Ciocp*)lpParam ;
DWORD dwTrans = 0;
DWORD dwKey = 0;
LPOVERLAPPED lpol;
PER_IO_BUFFER* pBuffer = NULL ;
while (pThis->m_bStarted )
{
BOOL bOk = GetQueuedCompletionStatus(pThis->m_hIocp,&dwTrans,&dwKey,&lpol,WSA_INFINITE);
if (dwTrans == -1 )
{
pThis->DecCurWorkCount();
_endthreadex(0);
return 0;
}
pBuffer = CONTAINING_RECORD(lpol,PER_IO_BUFFER,ol);
if (!bOk)
{
if (pBuffer->opType == OP_CONNECT)
{
DWORD dwError = WSAGetLastError();
if (dwError != ERROR_IO_PENDING)
{ pThis->PostConnect();
pThis->ReleaseContext((PER_HANDLE_DATA*)dwKey);
}else
{
OutputDebugString(L"Connect Pending .... ");
}
}// (pBuffer->opType == OP_CONNECT) pThis->ReleaseIoBuffer(pBuffer);
}else //(!bOk)
{
pThis->HandleIoOp((PER_HANDLE_DATA*)dwKey,pBuffer,dwTrans);
}
}
pThis->DecCurWorkCount();
return WSAGetLastError();
} void Ciocp::CreateWokerThreads()
{ for (int i= 0;i< m_dwWorkThreadCount-m_dwCurWorkThreadCount ;i++)
{
unsigned threadid;
_beginthreadex(NULL,0,_WorkerThreadProc,this,0,&threadid);
m_dwCurWorkThreadCount ++ ;
}
} void Ciocp::CreateIocp()
{
if (m_hIocp == INVALID_HANDLE_VALUE)
{
m_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
}
} void Ciocp::AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext/*=0*/)
{
CreateIoCompletionPort((HANDLE)s,m_hIocp,(DWORD)pContext,0);
} BOOL Ciocp::Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp /*= "127.0.0.1"*/,DWORD dwPort/*=443 */)
{
m_pNotifyProc = pNotifyProc ; m_siRemoteAddr.sin_family = AF_INET;
m_siRemoteAddr.sin_port = htons(dwPort);
m_siRemoteAddr.sin_addr.S_un.S_addr = inet_addr(lpstrIp); //listen(m_sListen,m_dwMaxConns); CreateIocp(); m_bStarted = true ;
CreateWokerThreads(); // post accept return PostConnect(); } PER_HANDLE_DATA* Ciocp::AllocContext(SOCKET s)
{
PER_HANDLE_DATA *pContext = NULL ;
OutputDebugString(L"Context ++ \r\n"); EnterCriticalSection(&m_csContextsListLock);
if (m_pFreeContextsList== NULL )
{
pContext = (PER_HANDLE_DATA*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_HANDLE_DATA));
}else
{
pContext = m_pFreeContextsList ;
m_pFreeContextsList = pContext->pNext ;
pContext->pNext = 0;
m_dwContextsCount -- ;
}
LeaveCriticalSection(& m_csContextsListLock); if (pContext != NULL)
{
pContext->s = s ;
pContext->m_hWriteComplete = CreateEvent(NULL,true,TRUE,NULL);
}
return pContext ;
} void Ciocp::DecCurWorkCount()
{
EnterCriticalSection(&m_csWorkLock);
m_dwCurWorkThreadCount -- ;
LeaveCriticalSection(&m_csWorkLock);
} void Ciocp::HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans)
{
EnterCriticalSection(&m_csIoLock); switch(pBuffer->opType)
{
case OP_ACCEPT:
//OutputDebugString(L"Accept \r\n");
ProcessIoAccept(pContext,pBuffer,dwTrans);
break;
case OP_CONNECT: ProcessIoConnect(pContext,pBuffer,dwTrans); break;
case OP_WRITE:
//OutputDebugString(L"Write \r\n");
ProcessIoWrite(pContext,pBuffer,dwTrans);
break;
case OP_READ:
//OutputDebugString(L"Read \r\n");
ProcessIoRead(pContext,pBuffer,dwTrans);
break;
default:
OutputDebugString(L"HandleIoOp Default... \r\n");
break;
}
// release pBuffer
//ReleaseIoBuffer(pBuffer); LeaveCriticalSection(&m_csIoLock);
} void Ciocp::ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
if (dwTrans ==0 ) // read error
{
NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT); if (pContext->bConnect) // is connect
{ PostConnect(); }else
{ }
ReleaseContext(pContext); }else
{
// read ok notify main thread
pBuffer->dwTrans = dwTrans ;
if (pContext->dwBufferOffSet+pBuffer->dwTrans > BUFFER_SIZE_DATA )
{
closesocket(pContext->s);
if (pContext->bConnect)
{
PostConnect();
}
return ;
}
memcpy(pContext->readBytes+pContext->dwBufferOffSet,pBuffer->lpBuffer,pBuffer->dwTrans);
pContext->dwBufferOffSet+= pBuffer->dwTrans ; NotifyMsg(pContext,pBuffer,NOTIFY_MSG_READ);
PostRead(pContext); } ReleaseIoBuffer(pBuffer); } void Ciocp::NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg)
{
if (m_pNotifyProc == NULL )
{
OutputDebugString(L"NotifyMsg m_pNotifyProc is NUll \r\n");
}
if(!IsBadCodePtr((FARPROC)m_pNotifyProc))
{
m_pNotifyProc(pContext,pBuffer,dwMsg);
}else
{
OutputDebugString(L" m_pNotifyProc is badcodeptr \r\n");
}
} void Ciocp::ReleaseContext(PER_HANDLE_DATA* pContext)
{
OutputDebugString(L"Context -- \r\n");
EnterCriticalSection(& m_csContextsListLock);
CloseHandle(pContext->m_hWriteComplete); ZeroMemory(pContext,sizeof(PER_HANDLE_DATA));
if (m_dwContextsCount > m_dwMaxFreeContexts)
{
HeapFree(GetProcessHeap(),0,pContext);
LeaveCriticalSection(& m_csContextsListLock);
return ;
}else
{ pContext->pNext = m_pFreeContextsList;
m_pFreeContextsList = pContext ;
m_dwContextsCount ++ ; }
LeaveCriticalSection(& m_csContextsListLock);
} void Ciocp::ReleaseIoBuffer(PER_IO_BUFFER* pBuffer)
{
OutputDebugString(L"Buffer -- \r\n");
EnterCriticalSection(& m_csBuffersListLock);
ZeroMemory(pBuffer,sizeof(PER_IO_BUFFER)+pBuffer->dwBufferSize);
if (m_dwBuffersCount > m_dwMaxFreeBuffers)
{
HeapFree(GetProcessHeap(),0,pBuffer);
LeaveCriticalSection(& m_csBuffersListLock);
return ;
}else
{
pBuffer->pNext = m_pFreeBuffersList ;
m_pFreeBuffersList =pBuffer ;
m_dwBuffersCount ++ ;
}
LeaveCriticalSection(& m_csBuffersListLock); } void Ciocp::ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
if (dwTrans == 0)
{
NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);
ReleaseContext(pContext);
}else
{
int nAddrLen = sizeof(SOCKADDR_IN);
getsockname(pContext->s,(SOCKADDR*)&pContext->saddr,&nAddrLen);
SetKeepAlive(pContext->s);
NotifyMsg(pContext,pBuffer,NOTIFY_MSG_CONNECT);
//OutputDebugString(L"Connect \r\n");
PostRead(pContext);
}
ReleaseIoBuffer(pBuffer);
} void Ciocp::Shutdown()
{
m_bStarted = false ; if (m_sListen != INVALID_SOCKET )
{
closesocket(m_sListen);
m_sListen = INVALID_SOCKET ; }
if (m_sConnect != INVALID_SOCKET)
{
closesocket(m_sConnect);
m_sConnect = INVALID_SOCKET ;
} while (m_dwCurWorkThreadCount > 0)
{
::PostQueuedCompletionStatus(m_hIocp, -1, 0, NULL);
Sleep(100);
} PER_IO_BUFFER* pBuffer = m_pFreeBuffersList ;
while(pBuffer)
{
m_pFreeBuffersList = pBuffer->pNext ;
HeapFree(GetProcessHeap(),0,pBuffer);
pBuffer = m_pFreeBuffersList ;
}
m_dwBuffersCount = 0;
PER_HANDLE_DATA* pContext = m_pFreeContextsList ;
while(pContext)
{
m_pFreeContextsList = pContext->pNext ;
HeapFree(GetProcessHeap(),0,pContext);
pContext = m_pFreeContextsList ;
}
m_dwContextsCount = 0 ; } void Ciocp::ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{ LPSOCKADDR lpLocalAddr,lpRemoteAddr;
int nLocalAddr,nRemoteAddr;
DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ; PER_HANDLE_DATA* pContext1 = AllocContext(pBuffer->sClient); m_lpfnGetAcceptExSockAddrs(pBuffer->lpBuffer,pBuffer->dwBufferSize- 2*dwAddrSize,dwAddrSize,dwAddrSize,&lpLocalAddr,&nLocalAddr,&lpRemoteAddr,&nRemoteAddr);
memcpy(& (pContext1->saddr),lpRemoteAddr,nRemoteAddr); SetKeepAlive(pBuffer->sClient); AddSocketToIocp(pBuffer->sClient,pContext1); NotifyMsg(pContext1,pBuffer,NOTIFY_MSG_ACCEPT); PostRead(pContext1); PostAccept(); ReleaseIoBuffer(pBuffer);
} BOOL Ciocp::SetKeepAlive(SOCKET s)
{
BOOL bKeepAlive = TRUE;
int nRet = ::setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeepAlive, sizeof(bKeepAlive));
if (nRet == SOCKET_ERROR)
{
return false ;
}else
{
tcp_keepalive alive_in = {0};
tcp_keepalive alive_out = {0}; alive_in.keepalivetime = 5000;
alive_in.keepaliveinterval = 1000;
alive_in.onoff = TRUE ;
unsigned long ulBytesReturn = 0 ;
nRet = WSAIoctl(s,SIO_KEEPALIVE_VALS,&alive_in,sizeof(alive_in),&alive_out,sizeof(alive_out),&ulBytesReturn,NULL,NULL);
if (nRet == SOCKET_ERROR)
{
return FALSE ;
} } return TRUE ;
} void Ciocp::GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite)
{
*pulWrite = m_ulWriteBytes ;
*pulRead = m_ulReadBytes ;
} void Ciocp::Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize)
{
WaitForSingleObject(pContext->m_hWriteComplete,INFINITE);
PostWrite(pContext,lpBuffer,dwSize);
} void Ciocp::ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
if (dwTrans == 0)
{
NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT); if (pContext->bConnect) // is connect
{ PostConnect(); }else
{ }
ReleaseContext(pContext);
}
else
{
SetEvent(pContext->m_hWriteComplete);
} ReleaseIoBuffer(pBuffer);
}

  

IOCP - UDP

//NOTE
//
//This code taken from Mr. Bob Quinn's article
//titled 'Internet Multicasting'
//published in Dr. Dobb's Journal dated Oct 1997 //I have modified the original code to illustrate
//the use I/O completion ports with UDP. //If you have any comments email me : shapall@hotmail.com #include "StdAfx.h"
#include <winsock2.h>
#include <ws2tcpip.h>
#include "Stdio.h" #define BUFSIZE 1024 //max size of incoming data buffer
#define MAXADDRSTR 16 #define DEFAULT_GROUP_ADDRESS "239.254.1.2"
#define DEFAULT_PORT 7125 LONG nCount = ;
HANDLE g_hCompletionPort;
DWORD WINAPI WorkerThread( LPVOID WorkContext ); BOOL HandleIncomingData( UCHAR* pBuf);
BOOL CreateNetConnections( VOID );
BOOL CreateWorkers( UINT );
void InitWinsock2();
void UnInitWinsock2(); HANDLE g_hReadEvent;
SOCKET g_hSocket;
UCHAR achInBuf [BUFSIZE];
char achMCAddr[MAXADDRSTR] = DEFAULT_GROUP_ADDRESS;
u_short nPort = DEFAULT_PORT; OVERLAPPED Overlapped; //-----------------------------------------------------------------
void InitWinsock2()
{
WSADATA data;
WORD version;
int ret = ; version = (MAKEWORD(, ));
ret = WSAStartup(version, &data);
if (ret != )
{
ret = WSAGetLastError(); if (ret == WSANOTINITIALISED)
{
printf("not initialised");
}
}
} //-----------------------------------------------------------------
void UnInitWinsock2()
{
WSACleanup();
} //-----------------------------------------------------------------
BOOL CreateNetConnections (void)
{
DWORD nbytes;
BOOL b;
BOOL fFlag = TRUE;
int nRet=; SOCKADDR_IN stLclAddr;
struct ip_mreq stMreq; // Multicast interface structure // Get a datagram socket
g_hSocket = socket(AF_INET, SOCK_DGRAM,); if (g_hSocket == INVALID_SOCKET)
{
printf ("socket() failed, Err: %d\n", WSAGetLastError());
return FALSE;
} nRet = setsockopt(g_hSocket,SOL_SOCKET,
SO_REUSEADDR, (char *)&fFlag, sizeof(fFlag));
if (nRet == SOCKET_ERROR)
{
printf ("setsockopt() SO_REUSEADDR failed,
Err: %d\n",WSAGetLastError());
} // Name the socket (assign the local port number to receive on)
stLclAddr.sin_family = AF_INET;
stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY);
stLclAddr.sin_port = htons(nPort); nRet = bind(g_hSocket,(struct sockaddr*) &stLclAddr,sizeof(stLclAddr));
if (nRet == SOCKET_ERROR)
{
printf ("bind() port: %d failed, Err: %d\n",
nPort,WSAGetLastError());
}
// Join the multicast group so we can receive from it
stMreq.imr_multiaddr.s_addr = inet_addr(achMCAddr);
stMreq.imr_interface.s_addr = INADDR_ANY;
nRet = setsockopt(g_hSocket,IPPROTO_IP,
IP_ADD_MEMBERSHIP,(char *)&stMreq,sizeof(stMreq)); if (nRet == SOCKET_ERROR)
{
printf("setsockopt() IP_ADD_MEMBERSHIP address %s failed,
Err: %d\n",achMCAddr,
WSAGetLastError());
} //
//note the 10 says how many concurrent cpu bound threads to allow thru
//this should be tunable based on the requests. CPU bound requests will
// really really honor this.
// g_hCompletionPort = CreateIoCompletionPort (INVALID_HANDLE_VALUE,
NULL,,);
if (!g_hCompletionPort)
{
fprintf (stdout, "g_hCompletionPort Create Failed\n");
return FALSE;
}
//Associate this socket to this I/O completion port
CreateIoCompletionPort((HANDLE)g_hSocket,g_hCompletionPort,
(DWORD)g_hSocket,); //
// Start off an asynchronous read on the socket.
//
Overlapped.hEvent = g_hReadEvent;
Overlapped.Internal = ;
Overlapped.InternalHigh = ;
Overlapped.Offset = ;
Overlapped.OffsetHigh = ;
b = ReadFile ((HANDLE)g_hSocket,&achInBuf,
sizeof(achInBuf),&nbytes,&Overlapped); if (!b && GetLastError () != ERROR_IO_PENDING)
{
fprintf (stdout, "ReadFile Failed\n");
return FALSE;
} return TRUE;
}
//----------------------------------------------------------------- BOOL CreateWorkers (UINT dwNumberOfWorkers)
{
DWORD ThreadId;
HANDLE ThreadHandle;
DWORD i; for (i = ; i < dwNumberOfWorkers; i++)
{
ThreadHandle = CreateThread (NULL,,
WorkerThread,NULL,,&ThreadId);
if (!ThreadHandle)
{
fprintf (stdout, "Create Worker Thread Failed\n");
return FALSE;
} CloseHandle (ThreadHandle);
}
return TRUE;
} //-----------------------------------------------------------------
DWORD WINAPI WorkerThread (LPVOID WorkContext)
{
DWORD nSocket;
BOOL b;
OVERLAPPED ovl;
LPOVERLAPPED lpo=&ovl;
DWORD nBytesRead=;
DWORD nBytesToBeRead;
UCHAR ReadBuffer[BUFSIZE];
LPVOID lpMsgBuf; memset(&ReadBuffer,,BUFSIZE);
for (;;)
{
b = GetQueuedCompletionStatus(g_hCompletionPort,
&nBytesToBeRead,&nSocket,&lpo,INFINITE);
if (b || lpo)
{
if (b)
{
//
// Determine how long a response was desired by the client.
// OVERLAPPED ol;
ol.hEvent = g_hReadEvent;
ol.Offset = ;
ol.OffsetHigh = ; b = ReadFile ((HANDLE)nSocket,&ReadBuffer,
nBytesToBeRead,&nBytesRead,&ol);
if (!b )
{
DWORD dwErrCode = GetLastError();
if( dwErrCode != ERROR_IO_PENDING )
{
// something has gone wrong here...
printf("Something has gone
wrong:Error code - %d\n",dwErrCode ); FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, dwErrCode,
MAKELANGID(LANG_NEUTRAL,
SUBLANG_DEFAULT),// Default language
(LPTSTR) &lpMsgBuf, , NULL); OutputDebugString((LPCTSTR)lpMsgBuf);
//Free the buffer. LocalFree(lpMsgBuf );
}
else if( dwErrCode == ERROR_IO_PENDING )
{
// I had to do this for my UDP sample
//Never did for my TCP servers
WaitForSingleObject(ol.hEvent,INFINITE); HandleIncomingData(ReadBuffer);
}
}
else
{
HandleIncomingData(ReadBuffer);
}
continue;
}
else
{
fprintf (stdout, "WorkThread Wait Failed\n");
//exit (1);
}
}
return ;
}
}
//-----------------------------------------------------------------
BOOL HandleIncomingData( UCHAR* pBuf)
{
InterlockedIncrement(&nCount);
SYSTEMTIME *lpstSysTime; lpstSysTime = (SYSTEMTIME *)(pBuf);
printf("[%d]UTC Time %02d:%02d:%02d:%03d on %02d-%02d-%d \n",nCount,
lpstSysTime->wHour, lpstSysTime->wMinute,
lpstSysTime->wSecond, lpstSysTime->wMilliseconds,
lpstSysTime->wMonth, lpstSysTime->wDay, lpstSysTime->wYear);
memset(&pBuf,,BUFSIZE);
//just making sure that i am not showing stale data return TRUE;
}
//-----------------------------------------------------------------
main ()
{
//You can modify your program to take some arguments for port number
//and multicast group address here printf("\n***************************************\n");
printf("Group IP address: %s\n",achMCAddr);
printf("Port number : %d\n",nPort);
printf("\n***************************************\n"); //Initialize winsock 2
InitWinsock2(); //We want to keep the main thread running
HANDLE hWait2Exit = CreateEvent(NULL,FALSE,TRUE,"MCLIENT");
ResetEvent(hWait2Exit ); //This OVERLAPPED event
g_hReadEvent = CreateEvent(NULL,TRUE,TRUE,NULL); //
// try to get timing more accurate... Avoid context
// switch that could occur when threads are released
// SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_TIME_CRITICAL);
if (!CreateNetConnections ())
{
printf("Error condition @ CreateNetConnections , exiting\n");
return ;
} if (!CreateWorkers ())
{
printf("Error condition @CreateWorkers, exiting\n");
return ;
} WaitForSingleObject(hWait2Exit,INFINITE);
UnInitWinsock2();
return ;
}

转 实现UDP IOCP心得-zt

http://www.cnblogs.com/BeginGame/archive/2011/09/18/2180241.html

上一篇:蚂蚁运输(ant)


下一篇:html中offsetTop、clientTop、scrollTop、offsetTop