muduo网络库base源码详细分析

1 muduo网络库的目录结构

/muduo$ tree ./ -L 2
./
├── BUILD.bazel
├── build.sh
├── ChangeLog
├── ChangeLog2
├── CMakeLists.txt
├── contrib
│?? ├── CMakeLists.txt
│?? ├── hiredis
│?? └── thrift
├── muduo
├── README
└── WORKSPACE
……

muduo库主体代码

├── muduo
│   ├── base
# base与网络无关的基础代码,包括线程库::muduo namespace 
# ::muduo::net namespace 
│   └── net
        ├── inspect
            ├── poller
            ├── http
            ├── protobuf
            ├── protorpc
# 关于网络模块和rpc的代码

base目录

chen@ecs-213609:~/muduo/muduo/base$ tree ./ -L 1
./
├── AsyncLogging.cc	# 异步的日志
├── AsyncLogging.h
├── Atomic.h		# 原子操作
├── BlockingQueue.h	# 无阻塞队列
├── BoundedBlockingQueue.h
├── BUILD.bazel
├── CMakeLists.txt
├── Condition.cc		# 条件变量
├── Condition.h
├── copyable.h			# 默认可以拷贝的类,空基类
├── CountDownLatch.cc 	# 倒计时门闩同步作用
├── CountDownLatch.h	
├── CurrentThread.cc	# 线程
├── CurrentThread.h
├── Date.cc		
├── Date.h
├── Exception.cc
├── Exception.h
├── FileUtil.cc
├── FileUtil.h
├── GzipFile.h			# 压缩文件
├── LogFile.cc			# 日志文件等
├── LogFile.h		
├── Logging.cc
├── Logging.h
├── LogStream.cc
├── LogStream.h
├── Mutex.h				# 互斥
├── noncopyable.h
├── ProcessInfo.cc
├── ProcessInfo.h
├── Singleton.h
├── StringPiece.h
├── tests
├── Thread.cc
├── Thread.h
├── ThreadLocal.h
├── ThreadLocalSingleton.h
├── ThreadPool.cc	# 线程池
├── ThreadPool.h
├── Timestamp.cc
├── Timestamp.h
├── TimeZone.cc
├── TimeZone.h
├── Types.h
└── WeakCallback.h

base目录的类

muduo网络库base源码详细分析

base都是可以提取出来直接使用的一些工具类

base/tests下是一些测试用例

执行./build.sh会在上层目录生成一个build/Debug目录

有需要可以把shell脚本给改掉

$ chmod +x build.sh # 添加可执行权限

set -x

SOURCE_DIR=`pwd` # 设置当前路径变量
BUILD_DIR=${BUILD_DIR:-./build} # 设置输出路径为 ./build
BUILD_NO_EXAMPLES=${BUILD_NO_EXAMPLES:-0}

mkdir -p $BUILD_DIR/ \
  && cd $BUILD_DIR/ \
  && cmake $SOURCE_DIR \
  && make $*
rm CMakeCache.txt
rm CMakeFiles -r 
rm cmake_install.cmake

删掉一些冗余文件和Debug目录的一些信息,只做最简单的应用输出

build/release-cpp11/bin下会生成可执行文件

tree ./ -L 1
./
├── CMakeCache.txt
├── CMakeFiles
├── cmake_install.cmake
├── dep.dot
├── dep.dot.muduo_base
├── dep.dot.muduo_base.dependers
├── lib
├── Makefile
└── muduo

单独编译Timestamp.cc会生成muduo_base静态库

set(base_SRCS
  Timestamp.cc
  )

add_library(muduo_base ${base_SRCS})
target_link_libraries(muduo_base pthread rt)

install(TARGETS muduo_base DESTINATION lib)
file(GLOB HEADERS "*.h")
install(FILES ${HEADERS} DESTINATION include/muduo/base)

2 Timestamp时间处理

copyable.h //默认可以拷贝的类,空基类,值语义

#ifndef MUDUO_BASE_COPYABLE_H
#define MUDUO_BASE_COPYABLE_H
namespace muduo {
class copyable {
 protected:
  copyable() = default;
  ~copyable() = default;
};
} 
#endif  // MUDUO_BASE_COPYABLE_H                    

比如 Timestamp 类 处理时间的

class Timestamp : public muduo::copyable,
                  public boost::less_than_comparable<Timestamp>
{
   inline bool operator<(Timestamp lhs, Timestamp rhs)
    {
      return lhs.microSecondsSinceEpoch() < rhs.microSecondsSinceEpoch();
    }                   
}

继承了该类,要求实现了<,可以自动实现>,<=,>= 的运算符重载,模板元编程的思想。

static const int muduo::Timestamp::microSecondsSinceEpoch_ 
priavte : int64_t muduo::Timestamp::kMicroSecondsPerSecond 
    
    
muduo::Timestamp::valid muduo::Timestamp::toString()const
muduo::Timestamp::toFormattedString() const
muduo::Timestamp::Timestamp muduo::Timestamp::Timestamp()
muduo::Timestamp::swap muduo::Timestamp::secondsSinceEpoch()const
muduo::Timestamp::now()
muduo::Timestamp(int64_t microSecondsSinceEpoch)
muduo::Timestamp::invalid()
void swap(Timestamp& that)
  {
    std::swap(microSecondsSinceEpoch_, that.microSecondsSinceEpoch_);
  }
// 用于交换两个时间戳,&引用传递
inline double timeDifference(Timestamp high, Timestamp low)
// 返回时间戳的微秒数
    
inline Timestamp addTime(Timestamp timestamp, double seconds)
// 两个时间戳相加
Timestamp Timestamp::now()
// 距离1970年的微秒数
string Timestamp::toString() const
{
      // 乘以100W,得到微秒数,用一个结构体tm_time来获取当前距离1970年经过的秒数
      char buf[32] = {0};
      time_t seconds = static_cast<time_t>(microSecondsSinceEpoch_ / kMicroSecondsPerSecond);
      int microseconds = static_cast<int>(microSecondsSinceEpoch_ % kMicroSecondsPerSecond);
      struct tm tm_time;
      gmtime_r(&seconds, &tm_time); // 线程安全函数

      snprintf(buf, sizeof(buf), "%4d%02d%02d %02d:%02d:%02d.%06d",
          tm_time.tm_year + 1900, tm_time.tm_mon + 1, tm_time.tm_mday,
          tm_time.tm_hour, tm_time.tm_min, tm_time.tm_sec,
          microseconds);
    // 拼接到buf里面,获取当前的年月日,时分秒,微妙
  return buf;

}

toString()

string Timestamp::toString() const
{
  // int64_t PRId64用来表示64位整数,跨平台的打印法是PRld64
  char buf[32] = {0};
  int64_t seconds = microSecondsSinceEpoch_ / kMicroSecondsPerSecond;
  int64_t microseconds = microSecondsSinceEpoch_ % kMicroSecondsPerSecond;
  snprintf(buf, sizeof(buf)-1, "%"  ".%06" PRId64 "", seconds, microseconds);
  return buf;
}

Timestamp_unittest.cc

#include <muduo/base/Timestamp.h>
#include <vector>
#include <stdio.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <ctype.h>
using muduo::Timestamp;

void passByConstReference(const Timestamp& x)
{
  printf("%s\n", x.toString().c_str());
}
/**
 * @brief 把微妙以跨平台得方式打印出来
 * 
 * @param x 
 */
void passByValue(Timestamp x)
{
    int a = 0;
    printf("%s\n", x.toString().c_str());
}

void benchmark()
{
  const int kNumber = 1000*1000;

  std::vector<Timestamp> stamps;
  stamps.reserve(kNumber);  // 预分配空间,100万个对象空间
  for (int i = 0; i < kNumber; ++i)
  {
    // 一百万个时间对象,now静态函数 系统调用计算微妙的
    stamps.push_back(Timestamp::now());
  }
  printf("%s\n", stamps.front().toString().c_str());
  printf("%s\n", stamps.back().toString().c_str());
  // 计算一百万次的时间差
  printf("%f\n", timeDifference(stamps.back(), stamps.front()));

  int increments[100] = { 0 };
  int64_t start = stamps.front().microSecondsSinceEpoch(); // 第一个时间的微秒数
  for (int i = 1; i < kNumber; ++i)
  {
    int64_t next = stamps[i].microSecondsSinceEpoch();    // 相近两个时间的时间差
    int64_t inc = next - start;
    start = next;
    if (inc < 0)          
    {
      // 时间逆转了,一般不可能出现这种问题
      printf("reverse!\n");
    }
    else if (inc < 100)
    {
      ++increments[inc]; // 有几个时间差是小于100
    }
    else
    {
      printf("big gap %d\n", static_cast<int>(inc));
      // 大于一百微妙的时间差 
    }
  }

  for (int i = 0; i < 100; ++i)
  {
    printf("%2d: %d\n", i, increments[i]);
  }
}

int main()
{
  Timestamp now(Timestamp::now());
  printf("%s\n", now.toString().c_str()); // 输出当前时间
  passByValue(now);
  passByConstReference(now);
  benchmark(); // 度量时间的函数
}

在包含的type.h里面,提供了两个类型转换的函数

隐式转换和向下转换

template<typename To, typename From>
inline To implicit_cast(From const &f) {
  return f;
}
template<typename To, typename From>     
inline To down_cast(From* f) {
     if (false) {
    implicit_cast<From*, To>(0);
  }
#if !defined(NDEBUG) && !defined(GOOGLE_PROTOBUF_NO_RTTI) // 判断运行时类型识别才能转型RTTI
  assert(f == NULL || dynamic_cast<To>(f) != NULL);  // c++类型转换,专门父类转子类的运算符,前提是基类指针指向派生类对象才能
#endif
  return static_cast<To>(f);
}

3 GCC的CAS

GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, …)

type __sync_val_compare_and_swap (type *ptr, type oldval type newval, …)

AtomicIntegerT一个整数得原子性操作

volatile T value_ 系统总是重新从它所在得内存读取数据,而不是使用保存在寄存器中得备份。即使他前一条指令刚刚读取过数据被保存。防止编译器对该操作优化,需要每次精准读值。

T get()
  {
    return __sync_val_compare_and_swap(&value_, 0, 0);
    // 比较一下当前value_得值是否为0,返回value_修改之前得值
  }
T getAndAdd(T x)
  {
    return __sync_fetch_and_add(&value_, x);
    // 返回没有修改得值,再把value_+x
  }
T incrementAndGet()
  {
    return addAndGet(1);
	// 自加+  
  }
T getAndSet(T newValue)
  {
    return __sync_lock_test_and_set(&value_, newValue);
    // 返回原来得值,再获取新的值
  }
typedef detail::AtomicIntegerT<int32_t> AtomicInt32;
typedef detail::AtomicIntegerT<int64_t> AtomicInt64;
// 32 64位整数

4 Exception 类实现

用于保存栈帧地址

Exception::Exception(const char* msg)
  : message_(msg)
{
  fillStackTrace();
}
void Exception::fillStackTrace()
{
  const int len = 200;
  void* buffer[len];
  int nptrs = ::backtrace(buffer, len); // man 3 是个把调用信息返回的函数,当前程序活动中的程序调用,返回到buffer里面
  char** strings = ::backtrace_symbols(buffer, nptrs);
    //对应返回task frame的解析,将地址转换成函数符号,二级指针,指向了一个指针数组
  if (strings)
  {
    for (int i = 0; i < nptrs; ++i)
    {
      
      stack_.append(strings[i]);
      stack_.push_back('\n');
    }
    free(strings);
  }
}

Exception_test.cc使用实例

能够把异常的调用堆栈打印出来

int main()
{
  try
  {
    foo();
  }
  catch (const muduo::Exception& ex)
  {
    printf("reason: %s\n", ex.what());
    printf("stack trace: %s\n", ex.stackTrace());
  }
}

5 Thread类的一些细节

thread_test.cc

int main()
{
  printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
}
1 打印当前工作线程的时候

CurrentThread::tid()

// ……
extern __thread int t_cachedTid; // 线程局部存储
// ……
inline int tid()
  {
    if (t_cachedTid == 0)
    {
      cacheTid(); // 如果已经获取过缓存就不会再获取,而是直接返回,减少系统调用
    }
    return t_cachedTid;
  }
void CurrentThread::cacheTid()
{
  if (t_cachedTid == 0)
  {
    t_cachedTid = detail::gettid();
    int n = snprintf(t_tidString, sizeof t_tidString, "%5d ", t_cachedTid);
    assert(n == 6); (void) n; 
      // 由于n是编译时断言,加上(void),防止release verison因为该变量没有使用而出现警告
  }
}

pid_t gettid()

pid_t gettid()
{
  return static_cast<pid_t>(::syscall(SYS_gettid));
    // 调用系统调用获取真实的tid
}

由于用的是2013年提交的,现在似乎编译已经不支持在命名空间这样声明了,我自己改了一下,怎么改都是错的,只好在自己项目里改成了静态变量。


	/**
 * @brief 
 * @param __thread 是gcc下的线程局部存储,如果用__修饰,每个线程的私有的全局变量 
 */
	namespace CurrentThread
	{
		// __thread修饰的变量是线程局部存储的。
		__thread int t_cachedTid = 0;							 // 线程真实pid(tid)的缓存,如果每次都用系统调用去获取pid,效率较低
																 // 是为了减少::syscall(SYS_gettid)系统调用的次数
																 // 提高获取tid的效率
		__thread char t_tidString[32];							 // 这是tid的字符串表示形式
		__thread const char *t_threadName = "unknown";			 // 线程名称
		const bool sameType = boost::is_same<int, pid_t>::value; // 表示是否相同类型
		BOOST_STATIC_ASSERT(sameType);							 // 编译断言,判断类型
	}

修改后的

namespace muduo
{
    namespace CurrentThread
    {
        // internal
        static __thread int t_cachedTid;
        inline int tid()
        {
            if (t_cachedTid == 0)
                t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
            return t_cachedTid;
        }
    }
}
2 需要注意一下ThreadNameInitializer该类的初始化在

namespace muduo里面,意味着他的执行要在main函数之前。

class ThreadNameInitializer
{
 public:
  ThreadNameInitializer()
  {
    muduo::CurrentThread::t_threadName = "main";
    CurrentThread::tid();
    pthread_atfork(NULL, NULL, &afterFork);
  }
};
ThreadNameInitializer init;

pthread_atfork三个函数指针

内部创建子进程前父进程会调用参数1,创建后,父进程调用参数2,子进程调用参数3

3 示例代码
#include <stdio.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/wait.h>
void prepare(void)
{
	printf("pid = %d prepare ...\n", static_cast<int>(getpid()));
}

void parent(void)
{
	printf("pid = %d parent ...\n", static_cast<int>(getpid()));
}

void child(void)
{
	printf("pid = %d child ...\n", static_cast<int>(getpid()));
}


int main(void)
{
	printf("pid = %d Entering main ...\n", static_cast<int>(getpid()));

	pthread_atfork(prepare, parent, child); // 内部创建子进程前父进程会调用prepare ,创建后,父进程调用Parent,子进程调用child
	int pid;
	int c_pid;   
	if((pid =fork()))
	{
		 c_pid = pid;
		// 父进程
		int staus;
		if ((pid = wait(&staus)) != -1 && pid == c_pid)
		{
			printf("回收子进程\n");
			fflush(stdin);
		}
	}
	printf("pid = %d Exiting main ...\n",static_cast<int>(getpid()));
}

运行结果

pid = 23777 Entering main …
pid = 23777 prepare …
pid = 23777 parent …
pid = 23778 child …
pid = 23778 Exiting main …
回收子进程
pid = 23777 Exiting main …

6 MutexLock和MutexLockGuard

他们属于一种关联关系,MutexLockGuard不负责MutexLock的生命周期,只负责帮他解锁

class MutexLock : boost::noncopyable{/*……*/};
class MutexLockGuard : boost::noncopyable
{
  public:
		explicit MutexLockGuard(MutexLock &mutex)
			: mutex_(mutex)
		{
			mutex_.lock();
		}
		~MutexLockGuard()
		{
			mutex_.unlock();
		}

	private:
		MutexLock &mutex_;  
};
#define MutexLockGuard(x) error "Missing guard object name"

作者还在最后一句,设置了宏不准用临时对象来拥有锁,防止滥用。

1 示例代码

关于加锁的效率问题

MutexLock g_mutex;
vector<int> g_vec;
const int kCount = 10 * 1000 * 1000; // 全局常量1000w
void threadFunc()
{
	for (int i = 0; i < kCount; ++i)
	{
		MutexLockGuard lock(g_mutex);
		g_vec.push_back(i);
	}
}

int main()
{
	const int kMaxThreads = 8;			 // 最多八个线程
	g_vec.reserve(kMaxThreads * kCount); // 预留八千万个空间
	
	Timestamp start(Timestamp::now());   // 获取时间
	for (int i = 0; i < kCount; ++i)
	{
		g_vec.push_back(i);
	}

	printf("single thread without lock %f\n", timeDifference(Timestamp::now(), start)); // 两个时间戳相减
	
	start = Timestamp::now();
	threadFunc();
	printf("single thread with lock %f\n", timeDifference(Timestamp::now(), start));	// 计算加锁之后的时间

	for (int nthreads = 1; nthreads < kMaxThreads; ++nthreads)
	{
		boost::ptr_vector<Thread> threads; // ptr_vector在销毁的时候,能够释放Thread对象
		g_vec.clear();
		start = Timestamp::now();
		for (int i = 0; i < nthreads; ++i)
		{
			threads.push_back(new Thread(&threadFunc));
			threads.back().start();
		}
		for (int i = 0; i < nthreads; ++i)
		{
			threads[i].join();
		}
		printf("%d thread(s) with lock %f\n", nthreads, timeDifference(Timestamp::now(), start));
	}
}
2 条件变量

该类即可以用于所有子线程等待主线程发起start也可以用于等待子线程初始化完毕才工作

为了方便展示设置doxygen

$sudo apt install graphviz   # 用于生成代码关系图 
$sudo apt install doxygen
$ cd CODE_DIR
$ doxygen -g Doxygen.config   # 生成配置文件 
$ vim Doxygen.config          # 修改配置文件

RECURSIVE              = YES 
$ doxygen Doxygen.config      # 根据代码生成文档

设置config显示私有和静态变量

muduo::CountDownLatch类 参考

类 muduo::CountDownLatch 继承关系图:

muduo::CountDownLatch 的协作图:

muduo网络库base源码详细分析

[图例]

Public 成员函数
CountDownLatch (int count)
void wait ()
void countDown ()
int getCount () const
Private 属性
MutexLock mutex_ // 互斥量
Condition condition_ // 条件变量
int count_ // 引用计数

该类的文档由以下文件生成:

class CountDownLatch : boost::noncopyable
{
 public:
  explicit CountDownLatch(int count);
  void wait();
  void countDown();
  int getCount() const;
 private:
  mutable MutexLock mutex_; // 变化的
  Condition condition_; // 一个条件变量的类是一个聚合关系,控制着他的生命周期
  int count_; // 
};
~Condition()
  {
    // 条件变量的虚析构函数,会检测释放条件变量
    MCHECK(pthread_cond_destroy(&pcond_));
  }

防止虚假唤醒的标准写法

void CountDownLatch::wait()
{
  MutexLockGuard lock(mutex_);
  while (count_ > 0) {
    condition_.wait();
  }
}

又用到了MutexLockGuard类的使用,可以自动加锁解锁

int CountDownLatch::getCount() const
{
  MutexLockGuard lock(mutex_); // const成员函数,因为前面声明了mutable关键字,因为需要在内部改变它的状态
  return count_; // 
}

7 有界队列和*队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vhTyevAO-1641279265269)(https://gitee.com/ak47chen111/cloud-library.git/https://gitee.com/ak47chen111/cloud-library.git/image-20220104145403436.png)]

Public 成员函数
void put (const T &x)
T take ()
size_t size () const
Private 属性
MutexLock mutex_
Condition notEmpty_
std::deque< T > queue_

该类的文档由以下文件生成:

取出元素

T take()
  {
    MutexLockGuard lock(mutex_);
    while (queue_.empty())   // 防止虚假唤醒的标准写法,先加锁,然后陷入等待条件变量的激活
    {
      notEmpty_.wait();
    }
    assert(!queue_.empty()); // 断言队列不为空
    T front(queue_.front()); // 取出第一个元素
    queue_.pop_front();
    return front;
  }
1 示例代码

统计入队出队耗时出现的次数

class Bench
{
public:
	Bench(int numThreads)
		: latch_(numThreads),
		  threads_(numThreads)
	{
		for (int i = 0; i < numThreads; ++i)
		{
			char name[32];
			snprintf(name, sizeof name, "work thread %d", i);
			threads_.push_back(new muduo::Thread(
				boost::bind(&Bench::threadFunc, this), muduo::string(name)));
		}
		for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1));
	}

	void run(int times)
	{
		printf("waiting for count down latch\n");
		latch_.wait();
		printf("all threads started\n");
		for (int i = 0; i < times; ++i)
		{
			muduo::Timestamp now(muduo::Timestamp::now());
			queue_.put(now);
			usleep(1000);
			// 生产产品的时间戳
		}
	}

	void joinAll()
	{
		for (size_t i = 0; i < threads_.size(); ++i)
		{
			queue_.put(muduo::Timestamp::invalid()); // 给每个线程生产一个非法时间,用于结束
		}

		for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));
	}

private:
	void threadFunc()
	{
		printf("tid=%d, %s started\n",
			   muduo::CurrentThread::tid(),
			   muduo::CurrentThread::name());

		std::map<int, int> delays;
		latch_.countDown();
		bool running = true;
		while (running)
		{
			muduo::Timestamp t(queue_.take());  // 从队列中取出存放时间的元素
			muduo::Timestamp now(muduo::Timestamp::now());
			if (t.valid()) // 判断时间是否合法
			{
				int delay = static_cast<int>(timeDifference(now, t) * 1000000);
				++delays[delay]; // 统计时间片出现的次数
			}
			running = t.valid(); // 是个非法时间才能跳出循环
		}

		printf("tid=%d, %s stopped\n",
			   muduo::CurrentThread::tid(),
			   muduo::CurrentThread::name());
		for (std::map<int, int>::iterator it = delays.begin();
			 it != delays.end(); ++it)
		{
			printf("tid = %d, delay = %d, count = %d\n",
				   muduo::CurrentThread::tid(),
				   it->first, it->second);
		}
	}

	muduo::BlockingQueue<muduo::Timestamp> queue_; // 队列
	muduo::CountDownLatch latch_;			   //  倒计时
	boost::ptr_vector<muduo::Thread> threads_; // 线程容器
};
int main(int argc, char *argv[])
{
	int threads = argc > 1 ? atoi(argv[1]) : 1;
	
	Bench t(threads);
	t.run(10000); // 生产10000个产品
	t.joinAll();  // 主线程生产一个非法时间,用于所有线程结束
}

8 线程池对象

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aSKAxx7I-1641279265270)(https://gitee.com/ak47chen111/cloud-library.git/https://gitee.com/ak47chen111/cloud-library.git/image-20220104114947374.png)]

Public 类型
typedef boost::function< void()> Task
注册一个void类型的函数,相当于函数指针
Public 成员函数
ThreadPool (const string &name=string())
void start (int numThreads)
void stop ()
void run (const Task &f)
Private 成员函数
void runInThread ()
Task take ()
Private 属性
MutexLock mutex_
Condition cond_
string name_
boost::ptr_vector< muduo::Thread > threads_
std::deque< Task > queue_
bool running_

该类的文档由以下文件生成:

boost::ptr_vector<muduo::Thread> threads_;

// 线程对象的容器,这里之所以是muduo::Thread而不是指针 muduo::Thread*

// 可能是作者不想设置分离属性,而是让线程池对象把持线程对象的生命周期

1 重点是start函数
void ThreadPool::start(int numThreads)
{
    assert(threads_.empty());       // 断言线程池是否为空
    running_ = true;
    threads_.reserve(numThreads);   // 预留空间
    for (int i = 0; i < numThreads; ++i)
    {
        char id[32];
        snprintf(id, sizeof id, "%d", i);
        threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_ + id)); 
        					 // 创建线程对象,并且设置线程的名字
        threads_[i].start(); // 线程对象创建线程
    }
}

runInThread是启动线程时分配的任务

void ThreadPool::runInThread()
{
    try
    {

        while (running_)
        {
            Task task(take()); // ThreadPool::Task ThreadPool::take()
            if (task)
            {
                task();
            }
        }
    }
	// ……
}

threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_ + id));

threads_[i].start();是核心关键,尤为精髓,是muduo线程池的妙笔

再回头看Thread类的细节

muduo网络库base源码详细分析

删除掉一些冗余的信息

Public 类型
typedef boost::function< void()> ThreadFunc
Private 属性
bool started_
bool joined_
pthread_t pthreadId_
boost::shared_ptr< pid_t > tid_
ThreadFunc func_
string name_

该类的文档由以下文件生成:

ThreadFunc注册了一个类型,是void返回值的在该类的构造函数时就需要初始化了

Thread::Thread(const ThreadFunc &func, const string &n)
    : started_(false),
      pthreadId_(0),
      tid_(0),
      func_(func),
      name_(n)
{
    numCreated_.increment();
}

调用方法

muduo::Thread *th = new muduo::Thread(boost::bind(&ThreadPool::runInThread,this),"test001"+id);

该线程类的func由ThreadPool去赋予任务,可以绑定不同的任务,普通函数,成员函数,静态成员函数

threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_ + id));就是创建线程对象的时候赋予不同的任务类型。比如读写分离,读的线程专门去干读的任务,写的线程专门去干写的事儿,涉及到数据库操作的可以统一放到db类型的任务中去。

线程启动start

 assert(!started_);
    started_ = true;
    errno = pthread_create(&pthreadId_, NULL, &startThread, this);
    if (errno != 0)
    {
        //LOG_SYSFATAL << "Failed in pthread_create";
    }
2 实例代码
#include <muduo/base/ThreadPool.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/CurrentThread.h>

#include <boost/bind.hpp>
#include <stdio.h>

void print()
{
	printf("tid=%d\n", muduo::CurrentThread::tid());
}

void printString(const std::string &str)
{
	printf("tid=%d, str=%s\n", muduo::CurrentThread::tid(), str.c_str());
}

int main()
{
	muduo::ThreadPool pool("MainThreadPool");  // 创建线程池
	pool.start(5);

	pool.run(print);	// 添加了两个print普通成员函数任务运行
	pool.run(print);
	for (int i = 0; i < 100; ++i)
	{
		char buf[32];
		snprintf(buf, sizeof buf, "task %d", i);
		pool.run(boost::bind(printString, std::string(buf)));
		// 添加一百个普通的函数
	}

	muduo::CountDownLatch latch(1);
	pool.run(boost::bind(&muduo::CountDownLatch::countDown, &latch));
	// 添加一个成员函数需要添加this指针
	latch.wait();
	pool.stop();
}

pool.run

如果线程队列为空,那么就直接执行任务,否则就往任务队列里面塞任务进去。

void ThreadPool::run(const Task &task)
{
    if (threads_.empty())
    {
        task(); 
    }
    else
    {
        MutexLockGuard lock(mutex_);
        queue_.push_back(task);
        cond_.notify();
    }
}

#include <muduo/base/ThreadPool.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/CurrentThread.h>

#include <boost/bind.hpp>
#include <stdio.h>

void print()
{
	printf("tid=%d\n", muduo::CurrentThread::tid());
}

void printString(const std::string &str)
{
	printf("tid=%d, str=%s\n", muduo::CurrentThread::tid(), str.c_str());
}

int main()
{
	muduo::ThreadPool pool("MainThreadPool");  // 创建线程池
	pool.start(5);

	pool.run(print);	// 添加了两个print普通成员函数任务运行
	pool.run(print);
	for (int i = 0; i < 100; ++i)
	{
		char buf[32];
		snprintf(buf, sizeof buf, "task %d", i);
		pool.run(boost::bind(printString, std::string(buf)));
		// 添加一百个普通的函数
	}

	muduo::CountDownLatch latch(1);
	pool.run(boost::bind(&muduo::CountDownLatch::countDown, &latch));
	// 添加一个成员函数需要添加this指针
	latch.wait();
	pool.stop();
}

pool.run

如果线程队列为空,那么就直接执行任务,否则就往任务队列里面塞任务进去。

void ThreadPool::run(const Task &task)
{
    if (threads_.empty())
    {
        task(); 
    }
    else
    {
        MutexLockGuard lock(mutex_);
        queue_.push_back(task);
        cond_.notify();
    }
}
上一篇:一段将多张图片生成AVI视频的C++源码


下一篇:Ubuntu Server Arm64安装Arm32运行环境