一、环境准备:
Windows,VS2015
MySQL使用官方C语言版本客户端(mysql-connector-c-6.1.10-win32.zip),不使用C++库,因为C++库依赖Boost库
https://downloads.mysql.com/archives/c-c/
库函数和头文件使用方法参见http://blog.csdn.net/libaineu2004/article/details/79607432
Redis使用开源的cpp_redis库,该库由C++语言实现
https://github.com/Cylix/cpp_redis v4.3.1
https://github.com/Cylix/tacopie v3.2.0
https://cylix.github.io/cpp_redis/html/classcpp__redis_1_1client.html
库函数和头文件使用方法参见http://blog.csdn.net/libaineu2004/article/details/79582185
二、完整的工程源码下载地址:https://download.csdn.net/download/libaineu2004/10300060
源码涉及线程池和连接池的实现,使用C++11语法,请重点关注线程安全退出的方法
ThreadPool.cpp
#include "stdafx.h"
#include <stdlib.h>
#include <windows.h>
#include <chrono>
#include <new>
#include "ThreadPool.h"

///////////////////////////////////////////////////////////////////////////////
// CWorkerThread: one detached worker thread with its own task queue.
///////////////////////////////////////////////////////////////////////////////

/// Creates an idle worker. No thread is launched until Start() is called.
/// Both state flags are initialised explicitly so that IsRunning() never
/// reads an indeterminate value on a worker that was never started.
CWorkerThread::CWorkerThread()
    : m_thread_idx(0)
    , m_task_cnt(0)
    , m_bExit(false)
    , m_bRunning(false)
{
    m_task_list.clear();
}

CWorkerThread::~CWorkerThread()
{
}

/// Launches the detached worker thread.
///
/// m_bRunning is raised BEFORE the thread is created: the thread is detached,
/// so the only way CThreadPool::Destory() can tell that it is still alive is
/// this flag. If the flag were only set from inside the new thread, Destory()
/// could observe "not running" before the thread got scheduled and free the
/// worker while the thread is about to use it.
void CWorkerThread::Start()
{
    m_bExit = false;
    m_bRunning = true;
    try {
        std::thread t(StartRoutine, this);
        t.detach();
    } catch (...) {
        // Thread creation failed: nothing is running, so do not make
        // Destory() wait forever on this worker.
        m_bRunning = false;
        throw;
    }
}

/// Thread entry point.
/// @param arg  Pointer to the CWorkerThread that owns this thread.
/// @return Always a null pointer.
void* CWorkerThread::StartRoutine(void* arg)
{
    CWorkerThread* pThread = static_cast<CWorkerThread*>(arg);
    pThread->m_bRunning = true;
    pThread->Execute();
    // Last access to *pThread: once this flag drops, the pool may delete it.
    pThread->m_bRunning = false;
    return nullptr;
}

/// Asks the worker to exit and discards every task that has not started yet.
/// The worker deletes each task after running it (see Execute()), so the
/// queue owns its tasks; the discarded ones are deleted here instead of
/// being leaked.
void CWorkerThread::Stop(void)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_bExit = true;
    for (CTask* pPending : m_task_list) {
        delete pPending;
    }
    m_task_list.clear();
}

/// Worker loop: runs queued tasks one by one, polling every millisecond while
/// the queue is empty, and returns once the queue is empty and Stop() has
/// been requested.
void CWorkerThread::Execute()
{
    while (true) {
        CTask* pTask = nullptr;
        {
            // The emptiness check and the pop must happen under the same lock;
            // otherwise Stop() can clear the list in between and front()
            // would be called on an empty list.
            std::lock_guard<std::mutex> lock(m_mutex);
            if (!m_task_list.empty()) {
                pTask = m_task_list.front();
                m_task_list.pop_front();
            } else if (m_bExit) {
                return;
            }
        }

        if (pTask == nullptr) {
            // Nothing to do yet; equivalent to Sleep(1).
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }

        // Run the task outside the lock so producers are never blocked by it.
        pTask->run();
        delete pTask;

        m_task_cnt++;
        printf("thread%u have the execute %u task\n", m_thread_idx, m_task_cnt);
        // std::this_thread::get_id() does not return an integer, so it cannot
        // be passed to printf directly.
    }
}

/// Appends a task to this worker's queue. The worker deletes the task after
/// running it. A null task is ignored, because Execute() would dereference it.
void CWorkerThread::PushTask(CTask* pTask)
{
    if (pTask == nullptr) {
        return;
    }
    std::lock_guard<std::mutex> lock(m_mutex);
    m_task_list.push_back(pTask);
}

///////////////////////////////////////////////////////////////////////////////
// CThreadPool: fixed-size array of workers; tasks are spread randomly.
///////////////////////////////////////////////////////////////////////////////

CThreadPool::CThreadPool()
{
    m_worker_size = 0;
    m_worker_list = nullptr;
}

CThreadPool::~CThreadPool()
{
    Destory();
}

/// Allocates and starts the workers.
/// @param worker_size  Number of worker threads to create.
/// @return 0 on success; 1 if the pool is already initialised (call Destory()
///         first) or the worker array could not be allocated.
int CThreadPool::Init(uint32_t worker_size)
{
    if (m_worker_list) {
        // Re-initialising would leak the existing workers and their threads.
        return 1;
    }

    // nothrow makes the null check below meaningful: a plain new[] reports
    // failure by throwing, never by returning null.
    m_worker_list = new (std::nothrow) CWorkerThread[worker_size];
    if (!m_worker_list) {
        m_worker_size = 0;
        return 1;
    }
    m_worker_size = worker_size;

    for (uint32_t i = 0; i < m_worker_size; i++) {
        m_worker_list[i].SetThreadIdx(i);
        m_worker_list[i].Start();
    }

    return 0;
}

/// Stops every worker, waits until each worker thread has left its loop, and
/// frees the worker array. Safe to call repeatedly or before Init().
void CThreadPool::Destory()
{
    if (!m_worker_list) {
        return;
    }

    // Signal all workers first so that they wind down in parallel.
    for (uint32_t i = 0; i < m_worker_size; i++) {
        m_worker_list[i].Stop();
    }

    // The threads are detached, so poll until each one reports it has exited.
    for (uint32_t i = 0; i < m_worker_size; i++) {
        while (m_worker_list[i].IsRunning()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    delete [] m_worker_list;
    m_worker_list = nullptr;
    m_worker_size = 0;  // keeps AddTask() from indexing a freed array
}

/// Hands a task to a randomly chosen worker.
///
/// A random worker is used rather than the least-loaded one, because finding
/// the latter would mean scanning every worker and locking each queue.
/// If the pool has no workers (not initialised, initialised with size 0, or
/// already destroyed) the task cannot run; it is deleted, matching what the
/// workers do with tasks they own. A null task is ignored.
void CThreadPool::AddTask(CTask* pTask)
{
    if (pTask == nullptr) {
        return;
    }
    if (!m_worker_list || m_worker_size == 0) {
        // rand() % 0 would be undefined behaviour.
        delete pTask;
        return;
    }

    uint32_t thread_idx = rand() % m_worker_size;
    m_worker_list[thread_idx].PushTask(pTask);
}
ThreadPool.h
#ifndef THREADPOOL_H_ #define THREADPOOL_H_ #include <stdio.h> #include <stdint.h> #include <string> #include <string.h> #include "Task.h" #include <list> #include <thread> #include <mutex> using namespace std; class CWorkerThread { public: CWorkerThread(); ~CWorkerThread(); static void* StartRoutine(void* arg); void Start(void); void Stop(void); void Execute(void); void PushTask(CTask* pTask); void SetThreadIdx(uint32_t idx) { m_thread_idx = idx; } bool IsRunning(void) { return m_bRunning; } private: uint32_t m_thread_idx; uint32_t m_task_cnt; //thread m_thread; mutex m_mutex; list<CTask*> m_task_list; bool m_bExit; bool m_bRunning; }; class CThreadPool { public: CThreadPool(); virtual ~CThreadPool(); int Init(uint32_t worker_size); void AddTask(CTask* pTask); void Destory(); private: uint32_t m_worker_size; CWorkerThread* m_worker_list; }; #endif /* THREADPOOL_H_ */
三、欢迎访问姊妹篇
mysql,redis客户端连接池和线程池的Linux C编程实现(★firecat推荐★)