项目:使用线程池实现大目录拷贝
创建一个线程的综合资源,让需要执行的任务挂载在线程池中。如果线程池有空闲的线程,就可以安排线程去执行任务,如果线程池没有空闲的线程,就安排任务等待,直到有线程空闲出来。
线程池步骤:
- 初始化线程池资源
- 向线程池中加入线程
- 向线程池中添加任务 ---如果有空闲的线程,空闲线程可以自动获取任务并运行
- 销毁线程池。
源码分享:thread_pool.c
/*
设计思路:
1、预处理板块:构建任务结点结构体、线程池结构体,文件路径结构体
2、void handler(void *arg) //解除死锁,防止死锁
3、void *routine(void *arg)//提取调用任务链表的任务结点
4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg) //添加任务
7、void *copyfile(void * arg) //复制文件函数
8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
9、bool destroy_pool(thread_pool *pool) //摧毁线程池
main()
{
if-判断输入命令是否规范规范
初始化线程池
创建对象文件,存储源文件和复制文件路径
判断文件属性,执行拷贝操作
销毁线程池
}
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>
//********************预处理板块**************************
#define MAX_WAITING_TASKS 1000 //等待任务最大数目
#define MAX_ACTIVE_THREADS 200 //最大线程数
//任务结构体
typedef struct task
{
void *(*task)(void *arg);//定义任务函数指针以及参数
void *arg; //函数参数(指针传参)
struct task *next; //结点内当然有指向下一结点的指针
/*
任务链表的结点内包含三个元素:
1、函数指针,指向任务函数(属于data)
2、函数所对应的参数,这里用指针传参
3、结点内指向下一结点的指针 next
*/
}task;
typedef struct thread_pool
{
/* data */
pthread_mutex_t lock; //互斥锁
pthread_cond_t cond; //条件变量 跟互斥锁是搭配使用
task *task_list; //一个任务节点
pthread_t *tids; //线程号指针变量
unsigned waiting_tasks; //等待任务
unsigned active_threads; //执行线程
bool shutdown; //一个线程池销毁开关
}thread_pool;
typedef struct file_path
{
/* 用于存储文件路径的结构体 */
char source_file[4096]; //字符数组,用于存储源文件路径
char copy_file[4096]; //字符数组,用于存放复制后的文件路径
}file_path;
//********************预处理板块**************************
//解除死锁,防止死锁 handler函数会被routine调用
void handler(void *arg)
{
pthread_mutex_unlock((pthread_mutex_t *)arg);
}
//提取调用任务链表的任务结点
void *routine(void *arg) //这里的arg是task结构体的一项参数
{
thread_pool *pool = (thread_pool *)arg; //
task *p; //定义一个任务结点
while (1)
{
/* code */
pthread_cleanup_push(handler, (void *)&pool->lock);
//调用hand函数,防止死锁
//做一个判断,当没有任务,并且pool没有销毁时,执行休眠操作
if (pool->waiting_tasks == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->cond, &pool->lock); //休眠
}
//判断是否需要要关闭线程
if(pool->waiting_tasks == 0 && pool->shutdown == true)
{
pthread_mutex_unlock(&pool->lock); //先执行解锁操作
pthread_exit(NULL); //退出线程
}
p = pool->task_list->next; //从线程池中任务链表取出(复制)一个任务结点给P
pool->task_list->next = p->next; //然后被取出的任务被其他任务覆盖
pool->waiting_tasks--; //线程池中等待的任务减少一项
pthread_mutex_unlock(&pool->lock); //一次任务取出成功,解除互斥锁资源,方便下一次调用任务结点使用
pthread_cleanup_pop(0); //
//注意!
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //强制性的阻塞 任何进程都不能取消,我想老老实实把这个线程完成,其他的都先排队
(p->task)(p->arg);
//他运行的是add_task( pool, copfile, tmpfile);
//传过来的参数
//p->task 等于copyfile
//p->arg 等于 tmpfile
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);//关闭强制阻塞
free(p->arg); //释放在检索目录时 在内存开辟的空间
free(p); //释放掉完成任务的节点
}
pthread_exit(NULL);
//注意!没有这一段,copyfile无法执行
}
//初始化线程池
bool pool_init(thread_pool *pool, unsigned int threads_number)
{
/* 线程池结构体元素
pthread_mutex_t lock; //互斥锁
pthread_cond_t cond; //条件变量 跟互斥锁是搭配使用
task *task_list; //一个任务节点
pthread_t *tids; //线程号指针变量
unsigned waiting_tasks; //等待任务 任务数量
unsigned active_threads; //执行线程 线程数量
bool shutdown; //一个线程池销毁开关
*/
pthread_mutex_init(&pool->lock,NULL); //线程池互斥锁初始化,这是个初始化函数
pthread_cond_init(&pool->cond,NULL); //线程池条件变量初始化,这是个初始化函数
//先分配内存
pool->shutdown = false; //线程池销毁为假,即不销毁(刚申请一个线程池,初始化肯定不能销毁呀!)
pool->task_list = malloc(sizeof(task)); //线程池中的任务链表初始化分配内存空间
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//给进程分配空间
if (pool->task_list == NULL || pool->tids == NULL)
{
perror("任务链表 或 线程分配 内存失败!");
return false;
}
//在把数量清零
pool->task_list->next = NULL; //只有一个节点,还未连城任务链表
pool->waiting_tasks = 0; //暂时没有等待中的任务,要等后面添加任务结点
pool->active_threads = threads_number; //thread_number是参数,是传入的线程数量,赋值过来
int i;
for (i = 0; i < pool->active_threads; i++)
{
if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0)
//返回值为0表示创建成功,返回不为0表示创建失败
{
perror("线程创建失败");//根据传入的线程数,逐一创建线程,存入tid[]
return false;
}
}
return true;
}
//添加线程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
if(additional_threads == 0) //传入添加的线程数
{
return 0; //没有添加线程,啥都干不了,只能退出
}
unsigned total_threads = pool->active_threads + additional_threads;
//计算先从总数 = 线程池中的线程 + 新添加的线程
int i, actual_increment = 0;
for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
{
if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0)
{
perror("添加线程失败!");
if(actual_increment == 0)
{
return -1;
}
break; //创建失败的话,直接break,不再执行后面的++
}//创建失败
actual_increment++; //实际上添加成功的线程数
}
pool->active_threads += actual_increment;
return actual_increment;
}
//添加任务
bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)
{
// pool->task_list = malloc(sizeof(task)); //之前初始化的是线程池中的链表结点内存
struct task *new_task = malloc(sizeof(task)); // 任务结点new_task分配内存
if(new_task == NULL)
{
perror("任务结点分配内存失败!");
return false;
}
//初始化结点参数
new_task->task = task;
new_task->arg = arg;
new_task->next = NULL;
pthread_mutex_lock(&pool->lock); //上锁
if(pool->waiting_tasks >= MAX_WAITING_TASKS) //判断任务是否超出上限,必要的操作
{
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "任务过多!\n");
free(new_task); //释放任务空间
return false;
}
struct task *tmp = pool->task_list;
while(tmp->next != NULL) //从任务结点取到不为空,就是取到任务
{
tmp = tmp->next; //把这个任务取出来,拿到tmp上
}
tmp->next = new_task;
pool->waiting_tasks++;
pthread_mutex_unlock(&pool->lock); //释放锁资源
pthread_cond_signal(&pool->cond); //唤醒一个休眠的线程
return true;
}
//复制文件函数
void *copyfile(void * arg)
{
/*
把要打开的文件路径放入source_file变量下(路径)
打开源文件--------可读 判断打开是否成功
创建目标文件,即复制文件 判断打开是否结束
利用read函数和write函数,实现文件读写
关闭打开的两个文件
*/
file_path *dofile = (file_path *)arg; //arg是文件参数
struct stat file_stat; //这个结构体来自#include <sys/stat.h>
stat(dofile->source_file, &file_stat);
//通过文件名 获取文件的路径把他存放到结构体的sourc_file里面
int source_fd,copy_fd;
source_fd = open(dofile->source_file,O_RDONLY);//打开源路径,只读模式
if(source_fd == -1 )
{
printf("打开文件 %s\n 失败!\n",dofile->source_file);
//打开文件失败
return NULL;
}
copy_fd = open(dofile->copy_file,O_CREAT | O_TRUNC | O_RDWR,file_stat.st_mode);
if( copy_fd == -1)
{
printf("打开文件 %s 失败!\n",dofile->copy_file);
return NULL;
}
int nread;
char buf[100];
while((nread = read(source_fd,buf,100)) > 0) //读取源文件的内容
{
if( write(copy_fd,buf,nread) == -1) //把读到的全部写进目标文件
{
break;
}
}
printf("读取文件: %s\n",dofile->source_file);
printf("写入文件: %s\n",dofile->copy_file);
close(source_fd); //关闭源文件
close(copy_fd); //关闭复制文件
return NULL;
}
//复制目录函数
int copydir( file_path *dofile,thread_pool *pool)
{
struct stat file_stat;
stat(dofile->source_file,&file_stat); //获取文件的属性
mkdir(dofile->copy_file,file_stat.st_mode); //以源目录的类型和目录来创建一个目录
DIR *sourcedir = opendir(dofile->source_file); //打开源目录
struct dirent *dp;
while( (dp = readdir(sourcedir))!=NULL ) //获取文件夹内文件的信息
{
if(dp->d_name[0] == '.') //如果文件为. 或者 .. 则跳过
{
continue;
}
//对本目录下的所有文件进行拷贝操作
file_path *tmpfile = malloc(sizeof(file_path)); //为文件结构体开辟内存空间
memset(tmpfile,0,sizeof(file_path)); //对内存清零
sprintf(tmpfile->source_file,"%s/%s",dofile->source_file,dp->d_name);//拼凑源文件路径
sprintf(tmpfile->copy_file,"%s/%s",dofile->copy_file,dp->d_name);//拼凑目标文件路径
struct stat tmpstat;
stat(tmpfile->source_file,&tmpstat);
if(S_ISREG(tmpstat.st_mode)) //如果为普通文件,则拷贝
{
printf("tmpfile->source_file = %s\n",tmpfile->source_file);
printf("tmpfile->copy_file = %s\n", tmpfile->copy_file);
printf("\n");
add_task( pool, copyfile, tmpfile); //把复制的任务丢到任务链表
}
else if(S_ISDIR(tmpstat.st_mode))//如果为目录,则递归
{
copydir(tmpfile,pool);
}
}
return 0;
}
//摧毁线程池
bool destroy_pool(thread_pool *pool)
{
pool->shutdown = true;
pthread_cond_broadcast(&pool->cond);
sleep(1);//等待空闲线程响应完毕
int i;
for(i=0; i<pool->active_threads; i++)
{
errno = pthread_join(pool->tids[i], NULL);
if(errno != 0)
{
printf("copy tids[%d] error: %s\n",i, strerror(errno));
}
else
printf("[%u] is copyed\n", (unsigned)pool->tids[i]);
}
free(pool->task_list);
free(pool->tids);
free(pool);
return true;
}
//函数1:初始化线程池 pool_init(pool,100);
int main(int argc, char const *argv[])
{
//第一步: if-判断输入命令是否规范规范
if(argc != 3)
{
printf("你的命令格式错误!\n");
printf("请按右侧格式run: ./%s 源文件路径 生成文件路径 \n",argv[0]);
return -1; //命令异常退出
}
//第二步: 初始化线程池
thread_pool *pool = malloc(sizeof(thread_pool));//定义线程池的poor指针,初始化内存空间
/*调用线程初始化函数------pool_init()*/
pool_init(pool,100); //调用函数初始化线程,分配内存(初始化线程函数会调用routine)
//第三步: 创建对象文件,存储源文件和复制文件路径
file_path dofile; //创建一个路径实例dofile,存储源文件和复制文件的路径
strcpy(dofile.source_file,argv[1]); //strcpy是函数 #include <string.h>
strcpy(dofile.copy_file,argv[2]); // char *strcpy(char *dest, const char *src);
struct stat copestat;//这个结构体来自#include <sys/stat.h>
stat(dofile.source_file,&copestat); //stat是函数,将dofile.source_file的信息,获取到&copestat
/*
#include<sys/stat.h>
int stat(const char *restrict pathname,struct stat *restrict buf);
int fstat(int fields,struct stat *buf);
int lstat(const char *restrict pathname,struct stat *restrict buf);
*/
//第四步: 判断文件属性,执行拷贝操作
if(S_ISREG(copestat.st_mode))//如果为普通文件,则拷贝
{
copyfile(&dofile); //直接拷贝,不用添加到任务链表
}
else if(S_ISDIR(copestat.st_mode))//如果为目录,则递归
{
copydir(&dofile,pool);
}
//第五步: 销毁线程池
destroy_pool(pool); //销毁pool destroy破坏
return 0;
}
/*
详细设计步骤:
1、预处理板块:构建
任务结点结构体、
线程池结构体,
文件路径结构体
2、void handler(void *arg) //解除死锁,防止死锁
pthread_mutex_unlock((pthread_mutex_t *)arg);
3、void *routine(void *arg)//提取调用任务链表的任务结点 被pool_init和add_thread调用过
传入pool 定义一个任务结点p(task结构体的实例)
while循环
{
首先调用线程清理函数,清理掉残留的线程,解放锁资源
当没有任务情况下,线程执行休眠操作还是直接销毁(由shutdown决定)
当有任务情况下,提取任务结点(拿出来给p,任务结点数量减少),任务取出后,解放对应锁资源
}
4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
(传入pool)
加锁,条件变量
给pool的各成员变量初始化分配内存
判断是否成功
pool各成员变量初始化赋值为空
for循环逐一创建线程:(返回为0表示创建成功,不为0表示创建失败)
(注意:thread:线程标识符 attr:线程属性设置 start_routine:线程函数起始地址 arg:传递参数)
5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
创建线程,增加线程数,计算可用线程
6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg) //添加任务
先定于一个new_task指针,申请内存,初始化结构体成员变量
内存申请成功?
给互斥锁资源
判断等待任务结点是否超过上限?
定义一个tmp结点指针指向任务链表
把任务结点拿出来,组织起来,形成等待任务链表
任务链表串完了,解除互斥锁哦并且唤醒一个进程
7、void *copyfile(void * arg) //复制文件函数
把要打开的文件路径放入source_file变量下(路径)
打开源文件--------可读 判断打开是否成功
创建目标文件,即复制文件 判断打开是否结束
利用read函数和write函数,实现文件读写
关闭打开的两个文件
8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
普通文件 调用add_task函数复制
目录文件 mkdir创建目录(tmpfile内的文件夹命名) 如果是目录不停递归
10、bool destroy_pool(thread_pool *pool) //摧毁线程池
main()
{
第一步:if-判断输入命令是否规范规范(三个字符串命令:./%s 源文件路径 生成文件路径)
第二步:初始化线程池
thread_pool *pool = malloc(sizeof(thread_pool));
//定义线程池的poor指针,初始化内存空间
//调用线程初始化函数------pool_init()
pool_init(pool,100);
//调用函数初始化线程,分配内存(初始化线程函数会调用routine)
第三步:创建对象文件,存储源文件和复制文件路径
创建一个dofile,存储源文件和复制文件路径
(dofile是file_path结构体的一个实例)dofile.source 源文件路径
(copeatat:提取文件队列,作为判断文件是普通文件还是目录文件的参数)
第四步:判断文件属性,执行拷贝操作
使用st_mode判断文件类型
执行copyfile和copydir操作(复制文件和目录)
第五步:销毁线程池
直接调用destroy_pool函数销毁线程池
}
*/