一个生产者多消费者的应用 , 代码里目前就先简单用了单生成单消费,
每一局麻将都需要记录:每一次的发牌,抓牌,出牌 ,吃碰杠胡听;
每一局麻将生成一个key , 可随意自己组合
下面代码思路仅供参考, 还未测试过; 可修改成多线程日志
主线程唯一需要在开始的时候调用一次 start , 记录用push;
4种模式可选, 一种不关闭文件handle, 直到一局游戏结束,再关闭
一种每次打开,写入,关闭;
BIN 还没写
DB 还没写
class ErrorInfo{
static const SIZE_T ERR_MSG_SIZE = 1 << 13;
static CRITICAL_SECTION err_cs;
static std::once_flag oneflag;
public:
static void init(){
std::call_once(ErrorInfo::oneflag,&ErrorInfo::init_cs);
}
static void init_cs(){
InitializeCriticalSectionAndSpinCount(&ErrorInfo::err_cs, 4000);
}
static std::string getError(DWORD err){
EnterCriticalSection(&err_cs);
static HANDLE g_heap = HeapCreate(0, ERR_MSG_SIZE, 0);
static char *buf = (char*)HeapAlloc(g_heap, 0, ERR_MSG_SIZE);
static std::string errstr;
static std::stringstream ss;
ss.str("");
ss.clear();
errstr.clear();
DWORD syslocale = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT);
DWORD ret = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, err, syslocale, buf, ERR_MSG_SIZE, NULL);
if (!ret){
static HMODULE hDll = LoadLibraryEx(TEXT("netmsg.dll"), NULL, DONT_RESOLVE_DLL_REFERENCES);
if (hDll){
//如果在dll中查找,FORMAT_MESSAGE_FROM_HMODULE 添加上去, 第2个参数填写句柄
ret = FormatMessageA(FORMAT_MESSAGE_FROM_HMODULE | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
hDll, err, syslocale, buf, ERR_MSG_SIZE, NULL);
}
}
if (ret && buf){
buf[ret] = 0;
errstr = buf;
LeaveCriticalSection(&err_cs);
return errstr;
}
ss << err;
ss >> errstr;
LeaveCriticalSection(&err_cs);
return errstr;
}
};
struct FileHandleData{
HANDLE hFile;
volatile bool inUse;
std::string filepath;
FileHandleData(HANDLE file = INVALID_HANDLE_VALUE, bool inUse = false):hFile(file),inUse(inUse){}
};
struct storeData{
std::string key;
std::string data;
storeData(const char * key, const char *data):key(key),data(data){}
storeData(storeData && obj) throw() : key(std::move(obj.key)),data(std::move(obj.data)){}
storeData & operator=(storeData && obj) throw(){
if(this != &obj){
key = std::move(obj.key);
data = std::move(obj.data);
}
return *this;
}
};
class RecordFile
{
CRITICAL_SECTION write_cs;
CRITICAL_SECTION prepare_cs;
CRITICAL_SECTION errlog_cs;
CONDITION_VARIABLE write_cond;
HANDLE hSem;
deque<storeData> prepare_queue;
deque<std::string> wait_for_close_queue;
deque<storeData> write_queue;
std::unordered_map<std::string,FileHandleData> write_map;
HANDLE hThreadProducer;
unsigned tid_producer;
HANDLE hThreadConsumers[4];
unsigned tid_consumers[4];
bool bStopPrepare;
bool bStopProcess;
bool bAllDone;
bool bStarted;
int m_operate;
std::string cur_dir_name;
HANDLE hErrorLog;
std::string errorLogPath;
public:
/*
OPENFILE 只创建一次文件HANDLE, 不关闭
CLOSEFILE 每次写完数据就关闭文件句柄
BIN 和 DB 还没写
*/
enum eOperate { OPENFILE = 1, CLOSEFILE = 2, BIN = 3 , DB = 4};
enum eSpinCount { SpinCountMin = 1000, SpinCountNormal = 2000, SpinCountMax = 4000};
enum eSemaphoreCount{ SemMin = 200 , SemNormal = 1000 , SemMax = 2000};
RecordFile(int operate =CLOSEFILE):bAllDone(false),
bStarted(false),
bStopProcess(false),
bStopPrepare(false),
m_operate(operate) {
cur_dir_name = std::string(typeid(*this).name()).substr(6);
errorLogPath = cur_dir_name + "/errorlog.log";
ErrorInfo::init();
}
void push(const char * key, const char * data){
EnterCriticalSection(&prepare_cs);
prepare_queue.emplace_back(key,data);
LeaveCriticalSection(&prepare_cs);
ReleaseSemaphore(&hSem,1,0);
}
void start(){
bStarted = true;
bAllDone = false;
bStopProcess = false;
bStopPrepare = false;
hThreadProducer = NULL;
memset(hThreadConsumers,0,sizeof(hThreadConsumers));
InitializeCriticalSectionAndSpinCount(&write_cs , SpinCountMax);
InitializeCriticalSectionAndSpinCount(&prepare_cs , SpinCountMax);
InitializeCriticalSectionAndSpinCount(&errlog_cs , SpinCountMax);
InitializeConditionVariable(&write_cond);
hSem = CreateSemaphoreW(0,0,SemMax,NULL);
if(!RecordFile::pathExists(cur_dir_name.c_str())){
CreateDirectoryA(cur_dir_name.c_str(),NULL);
}
hErrorLog = CreateFileA(errorLogPath.c_str(),GENERIC_WRITE|GENERIC_READ,FILE_SHARE_READ ,
NULL,OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL,NULL);
if(hErrorLog != INVALID_HANDLE_VALUE){
setAppendFileHandle(hErrorLog);
}
if(OPENFILE == m_operate || CLOSEFILE == m_operate || BIN == m_operate ){
hThreadProducer = (HANDLE)_beginthreadex(0,0,&RecordFile::mem_func_prepare,(void*)this,0,&tid_producer);
hThreadConsumers[0] = (HANDLE)_beginthreadex(0,0,&RecordFile::mem_func_process ,(void*)this,0,&tid_consumers[0]);
}
}
virtual ~RecordFile(){
}
void log_error(const std::string & str){
if(hErrorLog == INVALID_HANDLE_VALUE){
return;
}
EnterCriticalSection(&errlog_cs);
WriteFile(hErrorLog,str.c_str(),str.size(),0,0);
FlushFileBuffers(hErrorLog);
LeaveCriticalSection(&errlog_cs);
}
static bool pathExists(const char * path){
WIN32_FILE_ATTRIBUTE_DATA attr = { 0 };
return 0 != GetFileAttributesExA(path, GetFileExInfoStandard, &attr);
}
void stop(){
if(bAllDone || !bStarted)
return;
bAllDone = true;
bStarted = false;
EnterCriticalSection(&prepare_cs);
bStopPrepare = true;
LeaveCriticalSection(&prepare_cs);
ReleaseSemaphore(&hSem,1,0);
EnterCriticalSection(&write_cs);
bStopProcess = true;
LeaveCriticalSection(&write_cs);
WakeAllConditionVariable(&write_cond);
if(hThreadProducer){
WaitForSingleObject(hThreadProducer,INFINITE);
CloseHandle(hThreadProducer);
hThreadProducer = 0;
}
if(hThreadConsumers[0]){
WaitForSingleObject(hThreadConsumers[0] , INFINITE);
CloseHandle(hThreadConsumers[0]);
hThreadConsumers[0] = 0;
}
CloseHandle(hSem);
hSem = NULL;
DeleteCriticalSection(&write_cs);
DeleteCriticalSection(&prepare_cs);
DeleteCriticalSection(&errlog_cs);
}
bool setAppendFileHandle(HANDLE fileHandle){
static LARGE_INTEGER pos = {0};
if(fileHandle != INVALID_HANDLE_VALUE){
return SetFilePointerEx(fileHandle,pos,0,FILE_END);
}
return false;
}
static unsigned int mem_func_prepare(void * arg){
RecordFile * p = (RecordFile * )arg;
if( OPENFILE == p->m_operate)
return p->__prepareOpenFileMode__();
else if(CLOSEFILE == p->m_operate)
return p->__prepareCloseFileEachTime__();
}
static unsigned int mem_func_process(void * arg){
RecordFile * p = (RecordFile * )arg;
if(OPENFILE == p->m_operate)
return p->__processOpenFileMode__();
else if(CLOSEFILE == p->m_operate)
return p->__processCloseFileEachTime__();
}
std::string getFileName(const char * key){
char filepath[MAX_PATH] = {0};
SYSTEMTIME lt = {0};
GetLocalTime(<);
int len = cur_dir_name.size();
memcpy(filepath,cur_dir_name.c_str(),len* sizeof(char));
sprintf_s(filepath+len,MAX_PATH - len,"/%04d.%02d.%02d %02d.%02d.%02d.%04d-%s.txt",
lt.wYear,lt.wMonth,lt.wDay,lt.wHour,lt.wMinute,lt.wSecond,lt.wMilliseconds,
key);
return filepath;
}
virtual unsigned int __prepareCloseFileEachTime__(){
std::string filepath;
bool bInsert_ok = false;
std::string errmsg;
while(true){
bInsert_ok = false;
WaitForSingleObject(hSem,INFINITE);
EnterCriticalSection(&prepare_cs);
storeData data = std::move(prepare_queue.front());
prepare_queue.pop_front();
LeaveCriticalSection(&prepare_cs);
EnterCriticalSection(&write_cs);
auto iter_find = write_map.find(data.key);
auto iter_end = write_map.end();
if(iter_find == iter_end){
filepath = getFileName(data.key.c_str());
FileHandleData fileInfo;
fileInfo.filepath = filepath;
bInsert_ok = write_map.insert(std::make_pair(data.key,fileInfo)).second;
if(bInsert_ok){
write_queue.push_back(std::move(data));
LeaveCriticalSection(&write_cs);
WakeConditionVariable(&write_cond);
}
else{
LeaveCriticalSection(&write_cs);
errmsg = __FUNCTION__;
errmsg += " insert " + data.key + " failed\r\n";
log_error(errmsg);
}
}
else{
write_queue.push_back(std::move(data));
LeaveCriticalSection(&write_cs);
WakeConditionVariable(&write_cond);
}
}
return 0;
}
virtual unsigned int __processCloseFileEachTime__(){
std::string errmsg;
bool bFind = false;
std::string filepath;
DWORD lastError = 0;
bool hasError = false;
while(true){
bFind = false;
hasError = false;
EnterCriticalSection(&write_cs);
while(!bStopProcess && write_queue.empty()){
SleepConditionVariableCS(&write_cond,&write_cs,INFINITE);
}
storeData data = std::move(write_queue.front());
write_queue.pop_front();
auto iter_find = write_map.find(data.key);
bFind = iter_find == write_map.end();
if(!bFind){
LeaveCriticalSection(&write_cs);
errmsg = __FUNCTION__;
errmsg += " write_map find error\r\n";
log_error(errmsg);
continue;
}
volatile bool * pInUse = &iter_find->second.inUse;
if(*pInUse == true){
write_queue.push_back(std::move(data));
LeaveCriticalSection(&write_cs);
WakeConditionVariable(&write_cond);
continue;
}
*pInUse = true;
filepath = iter_find->second.filepath;
LeaveCriticalSection(&write_cs);
HANDLE hFile = CreateFileA(filepath.c_str(),GENERIC_READ|GENERIC_WRITE,
FILE_SHARE_READ,0,OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL,
NULL);
if(hFile != INVALID_HANDLE_VALUE){
setAppendFileHandle(hFile);
if(WriteFile(hFile,data.data.c_str(),data.data.size() * sizeof(char),0,0)){
FlushFileBuffers(hFile);
CloseHandle(hFile);
}
else{
lastError = GetLastError();
hasError = true;
}
}
else{
lastError = GetLastError();
hasError = true;
}
EnterCriticalSection(&write_cs);
*pInUse = false;
LeaveCriticalSection(&write_cs);
if(hasError){
errmsg = ErrorInfo::getError(lastError);
log_error(errmsg);
}
}
return 0;
}
virtual unsigned int __prepareOpenFileMode__(){
bool bFind = false;
std::string filepath;
bool bInsert_ok = false;
std::string errmsg;
DWORD errorNo =0;
while(1){
bFind = false;
bInsert_ok = false;
WaitForSingleObject(hSem,INFINITE);
EnterCriticalSection(&prepare_cs);
storeData data = std::move(prepare_queue.front());
prepare_queue.pop_front();
LeaveCriticalSection(&prepare_cs);
EnterCriticalSection(&write_cs);
auto iter = write_map.find(data.key);
if(iter != write_map.end()){
bFind = true;
}
if(!bFind){
filepath = getFileName(data.key.c_str());
HANDLE hFile = CreateFileA(filepath.c_str(),GENERIC_READ|GENERIC_WRITE,
FILE_SHARE_READ,0,OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL,
NULL);
if(hFile != INVALID_HANDLE_VALUE){
FileHandleData fileinfo;
fileinfo.hFile = hFile;
fileinfo.filepath = filepath;
fileinfo.inUse = false;
bInsert_ok = write_map.insert(make_pair(data.key , fileinfo)).second;
if(bInsert_ok){
write_queue.push_back(std::move(data));
LeaveCriticalSection(&write_cs);
WakeConditionVariable(&write_cond);
}
else{
LeaveCriticalSection(&write_cs);
errmsg = __FUNCTION__;
errmsg += " write_map insert failed \r\n";
log_error(errmsg);
}
}
else{
errorNo = GetLastError();
LeaveCriticalSection(&write_cs);
errmsg = ErrorInfo::getError(errorNo);
log_error(errmsg);
}
}
else{
write_queue.push_back(std::move(data));
LeaveCriticalSection(&write_cs);
WakeConditionVariable(&write_cond);
}
}
return 0;
}
virtual unsigned int __processOpenFileMode__(){
bool bFind = false;
std::string errmsg;
DWORD errorNo = 0;
bool hasError = false;
while (true) {
bFind = false;
hasError = false;
EnterCriticalSection(&write_cs);
while(!bStopProcess && write_queue.empty()){
SleepConditionVariableCS(&write_cond,&write_cs,INFINITE);
}
storeData data = std::move(write_queue.front());
write_queue.pop_front();
auto iter = write_map.find(data.key);
if(iter != write_map.end())
bFind = true;
if(!bFind){
errmsg = __FUNCTION__;
errmsg += " write_map find error \r\n";
LeaveCriticalSection(&write_cs);
log_error(errmsg);
continue;
}
if(iter->second.inUse){
write_queue.push_back(std::move(data));
LeaveCriticalSection(&write_cs);
WakeConditionVariable(&write_cond);
continue;
}
volatile bool *pInUse = &iter->second.inUse;
HANDLE hFile = iter->second.hFile;
LeaveCriticalSection(&write_cs);
if( INVALID_HANDLE_VALUE == hFile){
errmsg = __FUNCTION__;
errmsg += " hFile : INVALID_HANDLE_VALUE\r\n";
log_error(errmsg);
continue;
}
if(WriteFile(hFile,data.data.c_str() , data.data.size() * sizeof(char),0,0)){
FlushFileBuffers(hFile);
}
else{
hasError= true;
errorNo = GetLastError();
}
EnterCriticalSection(&write_cs);
*pInUse = false;
LeaveCriticalSection(&write_cs);
if(hasError){
errmsg = __FUNCTION__;
errmsg += ErrorInfo::getError(errorNo);
errmsg += "\r\n";
log_error(errmsg);
}
}
return 0;
}
private:
RecordFile(const RecordFile&);
RecordFile& operator=(const RecordFile &);
};