glog另启动线程写文本日志

glog本身是很高效的,google的大牛肯定知道大规模的写日志用glog的话肯定会影响业务线程的处理,带负荷的磁盘IO谁都桑不起。比方levelDB就是默认异步写,更不用说google的三驾马车都是分布式的。之前看过其论文,简直是引领时代。

在glog的issue里有人提出了异步写的问题,可是语焉不详,只是0.33版本号已经有了接口,可是还不友好,可是全然能够实现磁盘日志的异步写。

今天算是花了点时间踩了点坑,算是基本能够搞了。稳定之后会把这个版本号和glog,g2log,mudo logging一起測试下。mudo对buffer做了些trick,内部有两个bufferptr,做了双缓冲,据说效率非常高,只是仅仅有linux平台的,只是但把它的log抽离出来也不难,陈老师封装了mutex,thread,conditional等,在gcc4.8,clang3.3,VS2010都不是问题,已经没多大必要,并且之前为之乐道的linux下的threadsafe的initonce,如今C++11x也有了支持。

glog中能够让client定制接口是:

class GOOGLE_GLOG_DLL_DECL Logger {
public:
virtual ~Logger(); // Writes "message[0,message_len-1]" corresponding to an event that
// occurred at "timestamp". If "force_flush" is true, the log file
// is flushed immediately.
//
// The input message has already been formatted as deemed
// appropriate by the higher level logging facility. For example,
// textual log messages already contain timestamps, and the
// file:linenumber header.
virtual void Write(bool force_flush,
time_t timestamp,
const char* message,
int message_len) = 0; // Flush any buffered messages
virtual void Flush() = 0; // Get the current LOG file size.
// The returned value is approximate since some
// logged data may not have been flushed to disk yet.
virtual uint32 LogSize() = 0; virtual void SetBasename(const char* basename) = 0;
virtual void SetExtension(const char* ext) = 0 ;
virtual void SetSymlinkBasename(const char* symlink_basename) = 0; };

我在里面另外加了几个接口,为了之后的方便。

用Active object模式非常好解决,就是我们通常所说的生产者消费者,在logmsg析构时就会fflush到磁盘,这次就会调用logger的write方法,此时就是我们接手的机会,把数据封装下,投递到业务线程,然后取出,实际写磁盘就好。

封装了简单的Active模式,Activer里封装了LogData用来封装打印实体,Buffer用来线程间传递数据,另外要显式设置Active的回调函数callBack.线程间传递数据用了C++11里的currentQueue,就不须要自己造*了:

/** ==========================================================================
* 2010 by KjellKod.cc. This is PUBLIC DOMAIN to use at your own risk and comes
* with no warranties. This code is yours to share, use and modify with no
* strings attached and no restrictions or obligations.
* ============================================================================
*
* Example of a Active Object, using C++11 std::thread mechanisms to make it
* safe for thread communication.
*
* This was originally published at http://sites.google.com/site/kjellhedstrom2/active-object-with-cpp0x
* and inspired from Herb Sutter's C++11 Active Object
* http://herbsutter.com/2010/07/12/effective-concurrency-prefer-using-active-objects-instead-of-naked-threads
*
* The code below uses JustSoftware Solutions Inc std::thread implementation
* http://www.justsoftwaresolutions.co.uk
*
* Last update 2012-10-10, by Kjell Hedstrom,
* e-mail: hedstrom at kjellkod dot cc
* linkedin: http://linkedin.com/se/kjellkod */ #ifndef ACTIVE_H_
#define ACTIVE_H_ #include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <memory>
#include <concurrent_queue.h>
#include "shared_queue.h" struct Buffer
{
Buffer():m_Len(0), m_pMsg(NULL){}
~Buffer()
{
if (NULL != m_pMsg)
delete []m_pMsg;
}
Buffer(int size):m_Len(size)
, m_pMsg(new char[m_Len])
{ }
int m_Len;
char* m_pMsg;
}; typedef std::function<void(Buffer*)> Callback; class Active {
private:
Active(const Active&); // c++11 feature not yet in vs2010 = delete;
Active& operator=(const Active&); // c++11 feature not yet in vs2010 = delete;
Active(); // Construction ONLY through factory createActive();
void doDone(){done_ = true;}
void run();
void setCallBack(Callback aCallBack); Concurrency::concurrent_queue<Buffer*> mq_;
std::thread thd_;
bool done_; // finished flag to be set through msg queue by ~Active
Callback callBack_; public:
virtual ~Active();
void send(Buffer* apBuffer);
static std::unique_ptr<Active> createActive(Callback aCallBack); // Factory: safe construction & thread start
}; #endif
/** ==========================================================================
* 2010 by KjellKod.cc. This is PUBLIC DOMAIN to use at your own risk and comes
* with no warranties. This code is yours to share, use and modify with no
* strings attached and no restrictions or obligations.
* ============================================================================
*
* Example of a Active Object, using C++11 std::thread mechanisms to make it
* safe for thread communication.
*
* This was originally published at http://sites.google.com/site/kjellhedstrom2/active-object-with-cpp0x
* and inspired from Herb Sutter's C++11 Active Object
* http://herbsutter.com/2010/07/12/effective-concurrency-prefer-using-active-objects-instead-of-naked-threads
*
* The code below uses JustSoftware Solutions Inc std::thread implementation
* http://www.justsoftwaresolutions.co.uk
*
* Last update 2012-10-10, by Kjell Hedstrom,
* e-mail: hedstrom at kjellkod dot cc
* linkedin: http://linkedin.com/se/kjellkod */ #include "active.h"
#include <cassert> Active::Active(): done_(false){} Active::~Active() {
Callback quit_token = std::bind(&Active::doDone, this);
thd_.join();
} // Add asynchronously a work-message to queue
void Active::send( Buffer* apBuffer )
{
if (NULL != apBuffer)
{
mq_.push(apBuffer);
}
} void Active::run() {
while (!done_) {
if (!mq_.empty())
{
Buffer* pBuffer = NULL;
mq_.try_pop(pBuffer);
if (NULL != pBuffer)
{
callBack_(pBuffer); delete pBuffer;
}
}
}
} // Factory: safe construction of object before thread start
std::unique_ptr<Active> Active::createActive(Callback aCallBack){
std::unique_ptr<Active> aPtr(new Active());
aPtr->thd_ = std::thread(&Active::run, aPtr.get());
aPtr->callBack_ = aCallBack;
return aPtr;
} void Active::setCallBack( Callback aCallBack )
{
callBack_ = aCallBack;
}

重点是在threadlogger里,实现了Logger的接口。Write函数实现真正的写逻辑,几个set函数会在内部被调用。

#pragma once
#include <glog/logging.h>
#include <mutex>
#include "active.h" using namespace std; namespace google
{ class ThreadLog : public google::base::Logger
{
public:
ThreadLog();
~ThreadLog();
virtual void Write(bool force_flush,
time_t timestamp,
const char* message,
int message_len) ;
virtual void Flush();
virtual uint32 LogSize(); // Configuration options
void SetBasename(const char* basename);
void SetExtension(const char* ext);
void SetSymlinkBasename(const char* symlink_basename);
void CallBack(Buffer* pBuffer); private:
static const uint32 kRolloverAttemptFrequency = 0x20;
mutex lock_;
bool base_filename_selected_;
string base_filename_;
string symlink_basename_;
string filename_extension_; // option users can specify (eg to add port#)
FILE* file_;
LogSeverity severity_;
uint32 bytes_since_flush_;
uint32 file_length_;
unsigned int rollover_attempt_;
int64 next_flush_time_; // cycle count at which to flush log
string hostname;
bool stopWriting;
std::unique_ptr<Active> m_pActive;
bool CreateLogfile(const string& time_pid_string);
void FlushUnlocked();
void WriteInteral(bool force_flush, time_t timestamp, const char* message, int message_len);
}; } #include "ThreadLog.h"
#include "port.h"
#include <fcntl.h>
#include <iomanip>
#include "utilities.h"
#include <functional> namespace google
{
static int GetSize(bool& force_flush, time_t& timestamp, const char* message, int& message_len)
{
return sizeof(force_flush)+sizeof(timestamp)+sizeof(message_len)+message_len;
} void ThreadLog::Write( bool force_flush, time_t timestamp, const char* message, int message_len )
{
Buffer* pBuffer = new Buffer(GetSize(force_flush, timestamp, message, message_len));
char* curData = pBuffer->m_pMsg;
memcpy(curData, &force_flush, sizeof(force_flush));
curData += sizeof(force_flush); memcpy(curData, ×tamp, sizeof(timestamp));
curData += sizeof(timestamp); memcpy(curData, &message_len, sizeof(message_len));
curData += sizeof(message_len); memcpy(curData, message, message_len);
curData += message_len; m_pActive->send(pBuffer);
} void ThreadLog::Flush()
{ } google::uint32 ThreadLog::LogSize()
{
return 0;
} void ThreadLog::SetBasename( const char* basename )
{
std::lock_guard<std::mutex> lock(lock_);
base_filename_selected_ = true;
if (base_filename_ != basename)
{
if (file_ != NULL)
{
fclose(file_);
file_ = NULL;
rollover_attempt_ = kRolloverAttemptFrequency-1;
}
base_filename_ = basename;
}
} void ThreadLog::SetExtension( const char* ext )
{
std::lock_guard<std::mutex> lock(lock_);
if (filename_extension_ != ext)
{
// Get rid of old log file since we are changing names
if (file_ != NULL)
{
fclose(file_);
file_ = NULL;
rollover_attempt_ = kRolloverAttemptFrequency-1;
}
filename_extension_ = ext;
}
} void ThreadLog::SetSymlinkBasename( const char* symlink_basename )
{
std::lock_guard<std::mutex> lock(lock_);
symlink_basename_ = symlink_basename;
} bool ThreadLog::CreateLogfile( const string& time_pid_string )
{
string string_filename = base_filename_+filename_extension_+
time_pid_string;
const char* filename = string_filename.c_str();
int fd = open(filename, O_WRONLY | O_CREAT | O_EXCL, 0664);
if (fd == -1) return false;
#ifdef HAVE_FCNTL
// Mark the file close-on-exec. We don't really care if this fails
fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif file_ = fdopen(fd, "a"); // Make a FILE*.
if (file_ == NULL) { // Man, we're screwed!
close(fd);
unlink(filename); // Erase the half-baked evidence: an unusable log file
return false;
} if (!symlink_basename_.empty()) {
// take directory from filename
const char* slash = strrchr(filename, '/');
const string linkname =
symlink_basename_ + '.' + LogSeverityNames[severity_];
string linkpath;
if ( slash ) linkpath = string(filename, slash-filename+1); // get dirname
linkpath += linkname;
unlink(linkpath.c_str()); // delete old one if it exists // We must have unistd.h.
#ifdef HAVE_UNISTD_H
// Make the symlink be relative (in the same dir) so that if the
// entire log directory gets relocated the link is still valid.
const char *linkdest = slash ? (slash + 1) : filename;
if (symlink(linkdest, linkpath.c_str()) != 0) {
// silently ignore failures
} // Make an additional link to the log file in a place specified by
// FLAGS_log_link, if indicated
if (!FLAGS_log_link.empty()) {
linkpath = FLAGS_log_link + "/" + linkname;
unlink(linkpath.c_str()); // delete old one if it exists
if (symlink(filename, linkpath.c_str()) != 0) {
// silently ignore failures
}
}
#endif
} return true; // Everything worked
} void ThreadLog::FlushUnlocked()
{
if (file_ != NULL)
{
fflush(file_);
bytes_since_flush_ = 0;
} const int64 next = (FLAGS_logbufsecs * static_cast<int64>(1000000)); // in usec
next_flush_time_ = CycleClock_Now() + UsecToCycles(next);
} ThreadLog::ThreadLog(): file_(NULL)
, bytes_since_flush_(0)
, file_length_(0)
, rollover_attempt_(0)
, next_flush_time_(0)
, stopWriting(false)
, m_pActive(Active::createActive(std::bind(&ThreadLog::CallBack, this, std::placeholders::_1)))
{
} ThreadLog::~ThreadLog()
{ } void ThreadLog::WriteInteral( bool force_flush, time_t timestamp, const char* message, int message_len )
{
if (base_filename_selected_ && base_filename_.empty())
{
return;
} if (static_cast<int>(file_length_ >> 20) >= MaxLogSize())
{
if (file_ != NULL)
fclose(file_);
file_ = NULL;
file_length_ = bytes_since_flush_ = 0;
rollover_attempt_ = kRolloverAttemptFrequency-1;
} if (file_ == NULL)
{
//if (++rollover_attempt_ != kRolloverAttemptFrequency)
// return;
//rollover_attempt_ = 0; struct ::tm tm_time;
localtime_r(×tamp, &tm_time);
ostringstream time_pid_stream;
time_pid_stream.fill('0');
time_pid_stream << 1900+tm_time.tm_year
<< setw(2) << 1+tm_time.tm_mon
<< setw(2) << tm_time.tm_mday
<< '-'
<< setw(2) << tm_time.tm_hour
<< setw(2) << tm_time.tm_min
<< setw(2) << tm_time.tm_sec
<< '.'
<< GetCurrentThreadId();
const string& time_pid_string = time_pid_stream.str(); if (base_filename_selected_)
{
if (!CreateLogfile(time_pid_string))
{
perror("Could not create log file");
fprintf(stderr, "COULD NOT CREATE LOGFILE '%s'!\n", time_pid_string.c_str());
return;
}
}
else
{
string stripped_filename(glog_internal_namespace_::ProgramInvocationShortName());
GetHostName(&hostname);
string uidname = MyUserName();
if (uidname.empty())
uidname = "invalid-user"; stripped_filename = stripped_filename+'.'+hostname+'.'+uidname+".log."+LogSeverityNames[severity_]+'.';
const vector<string> & log_dirs = GetLoggingDirectories(); bool success = false;
for (vector<string>::const_iterator dir = log_dirs.begin();dir != log_dirs.end(); ++dir)
{
base_filename_ = *dir + "/" + stripped_filename;
if ( CreateLogfile(time_pid_string) )
{
success = true;
break;
}
} if ( success == false )
{
perror("Could not create logging file");
fprintf(stderr, "COULD NOT CREATE A LOGGINGFILE %s!",
time_pid_string.c_str());
return;
}
} ostringstream file_header_stream;
file_header_stream.fill('0');
file_header_stream << "Log file created at: "
<< 1900+tm_time.tm_year << '/'
<< setw(2) << 1+tm_time.tm_mon << '/'
<< setw(2) << tm_time.tm_mday
<< ' '
<< setw(2) << tm_time.tm_hour << ':'
<< setw(2) << tm_time.tm_min << ':'
<< setw(2) << tm_time.tm_sec << '\n'
<< "Running on machine: "
<< hostname << '\n'
<< "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu "
<< "threadid file:line] msg" << '\n';
const string& file_header_string = file_header_stream.str(); const int header_len = file_header_string.size();
fwrite(file_header_string.data(), 1, header_len, file_);
file_length_ += header_len;
bytes_since_flush_ += header_len;
} if ( !stopWriting )
{
errno = 0;
fwrite(message, 1, message_len, file_);
if ( FLAGS_stop_logging_if_full_disk && errno == ENOSPC )
{ // disk full, stop writing to disk
stopWriting = true; // until the disk is
return;
}
else
{
file_length_ += message_len;
bytes_since_flush_ += message_len;
}
}
else
{
if ( CycleClock_Now() >= next_flush_time_ )
stopWriting = true; // check to see if disk has free space.
} if ( force_flush || (bytes_since_flush_ >= 1000000) || (CycleClock_Now() >= next_flush_time_) ) {
FlushUnlocked();
#ifdef OS_LINUX
if (FLAGS_drop_log_memory) {
if (file_length_ >= logging::kPageSize) {
// don't evict the most recent page
uint32 len = file_length_ & ~(logging::kPageSize - 1);
posix_fadvise(fileno(file_), 0, len, POSIX_FADV_DONTNEED);
}
}
#endif
}
} void ThreadLog::CallBack( Buffer* pBuffer )
{
char* curData = pBuffer->m_pMsg;
bool force_flush = *(bool*)curData;
curData += sizeof(force_flush);
time_t timestamp = *(time_t*)curData;
curData += sizeof(timestamp);
int message_len = *(int*)curData;
curData += sizeof(message_len);
char* message = curData;
WriteInteral(force_flush, timestamp, message, message_len);
} }

这样搞定之后,main函数能够这样使用,就能够把自己的ThreadLog类内嵌到glog里。

#define GLOG_NO_ABBREVIATED_SEVERITIES
#include <windows.h>
#include <glog/logging.h>
#include "ThreadLog.h" using namespace google;
int main(int argc, char* argv[]) {
google::InitGoogleLogging("test/testsss");
google::base::Logger* mylogger = new google::ThreadLog;
SetLogger(google::GLOG_INFO, mylogger); google::SetLogDestination(google::GLOG_INFO, "../Debug/logtestInfo");
//google::SetLogDestination(google::GLOG_ERROR, "../Debug/logtestDebug"); int num_cookies = 0; google::SetStderrLogging(google::GLOG_INFO);
//google::SetStderrLogging(google::GLOG_ERROR);
//google::LogToStderr();
for (int i = 0; i < 1000; ++i){
LOG(INFO) << "how are " << i << " cookies";
} google::ShutdownGoogleLogging();
}

当然直接用这源代码是无法编译成功的,我改动了glog内部的源代码。

整个项目地址:git@github.com:boyxiaolong/Proejcts.git

測试还有点问题,偶尔会有乱码,并且须要优化的是那个Buffer的动态申请。

只是都是后话了。

上一篇:C++ 之 简单的五子棋AI程序


下一篇:MDEV Primer