线程池实现文件复制操作

项目:使用线程池实现大目录拷贝

       创建一个线程的综合资源,让需要执行的任务挂载在线程池中。如果线程池有空闲的线程,就可以安排线程去执行任务,如果线程池没有空闲的线程,就安排任务等待,直到有线程空闲出来。

 

线程池步骤:

  1. 初始化线程池资源
  2. 向线程池中加入线程
  3. 向线程池中添加任务               ---如果有空闲的线程,空闲线程可以自动获取任务并运行
  4. 销毁线程池。

源码分享:thread_pool.c

/*
    设计思路:
        1、预处理板块:构建任务结点结构体、线程池结构体,文件路径结构体
        2、void handler(void *arg)	//解除死锁,防止死锁
        3、void *routine(void *arg)//提取调用任务链表的任务结点
        4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
        5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
        6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)  //添加任务
        7、void *copyfile(void * arg)  //复制文件函数
        8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
        9、bool destroy_pool(thread_pool *pool)	 //摧毁线程池

        main()
        {
            if-判断输入命令是否规范规范
            初始化线程池
            创建对象文件,存储源文件和复制文件路径
            判断文件属性,执行拷贝操作    
            销毁线程池

        }




*/




#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>

//********************预处理板块**************************
#define MAX_WAITING_TASKS	1000  //等待任务最大数目
#define MAX_ACTIVE_THREADS	200	  //最大线程数

//任务结构体
typedef struct task
{
    void *(*task)(void *arg);//定义任务函数指针以及参数
	void *arg;			 	 //函数参数(指针传参)
	struct task *next;       //结点内当然有指向下一结点的指针
    /*
         任务链表的结点内包含三个元素:
            1、函数指针,指向任务函数(属于data)
            2、函数所对应的参数,这里用指针传参
            3、结点内指向下一结点的指针 next
     */
}task;

typedef struct thread_pool
{
    /* data */
    pthread_mutex_t lock;		//互斥锁
	pthread_cond_t  cond;		//条件变量  跟互斥锁是搭配使用
	
    task *task_list;		    //一个任务节点
	pthread_t *tids;			//线程号指针变量
 
	unsigned waiting_tasks;		//等待任务
	unsigned active_threads;	//执行线程
	
	bool shutdown;				//一个线程池销毁开关
}thread_pool;

typedef struct file_path
{
    /* 用于存储文件路径的结构体 */
    char source_file[4096];  //字符数组,用于存储源文件路径
    char copy_file[4096];    //字符数组,用于存放复制后的文件路径
}file_path;


//********************预处理板块**************************


//解除死锁,防止死锁     handler函数会被routine调用
void handler(void *arg)	
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

//提取调用任务链表的任务结点
void *routine(void *arg)    //这里的arg是task结构体的一项参数
{
    thread_pool *pool = (thread_pool *)arg;    //
    task *p;      //定义一个任务结点

    while (1)
    {
        /* code */
        pthread_cleanup_push(handler, (void *)&pool->lock);
        //调用hand函数,防止死锁

        //做一个判断,当没有任务,并且pool没有销毁时,执行休眠操作
        if (pool->waiting_tasks == 0 && !pool->shutdown)
        {
            pthread_cond_wait(&pool->cond, &pool->lock); //休眠
        }

        //判断是否需要要关闭线程
        if(pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);	//先执行解锁操作
			pthread_exit(NULL);	//退出线程
		}
         
		p = pool->task_list->next;			//从线程池中任务链表取出(复制)一个任务结点给P
		pool->task_list->next = p->next;	//然后被取出的任务被其他任务覆盖
		pool->waiting_tasks--;				//线程池中等待的任务减少一项

        pthread_mutex_unlock(&pool->lock);	//一次任务取出成功,解除互斥锁资源,方便下一次调用任务结点使用
		pthread_cleanup_pop(0);             //
 
//注意!

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //强制性的阻塞 任何进程都不能取消,我想老老实实把这个线程完成,其他的都先排队
		(p->task)(p->arg);		
					
									//他运行的是add_task( pool, copfile, tmpfile);	
									//传过来的参数 
									//p->task 等于copyfile
									//p->arg 等于  tmpfile
		
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);//关闭强制阻塞
		
		free(p->arg);							//释放在检索目录时 在内存开辟的空间
		free(p);								//释放掉完成任务的节点
	}
 
	pthread_exit(NULL);

 //注意!没有这一段,copyfile无法执行


}

//初始化线程池
bool pool_init(thread_pool *pool, unsigned int threads_number)
{
/*  线程池结构体元素
    pthread_mutex_t lock;		//互斥锁
	pthread_cond_t  cond;		//条件变量  跟互斥锁是搭配使用
	
    task *task_list;		    //一个任务节点
	pthread_t *tids;			//线程号指针变量
 
	unsigned waiting_tasks;		//等待任务   任务数量
	unsigned active_threads;	//执行线程   线程数量
	
	bool shutdown;				//一个线程池销毁开关
*/

    pthread_mutex_init(&pool->lock,NULL);   //线程池互斥锁初始化,这是个初始化函数
    pthread_cond_init(&pool->cond,NULL);   //线程池条件变量初始化,这是个初始化函数


//先分配内存
    pool->shutdown = false;  //线程池销毁为假,即不销毁(刚申请一个线程池,初始化肯定不能销毁呀!)
    pool->task_list = malloc(sizeof(task));  //线程池中的任务链表初始化分配内存空间
    pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//给进程分配空间

    if (pool->task_list == NULL || pool->tids == NULL)
    {
        perror("任务链表 或 线程分配 内存失败!");
        return false;
    }

//在把数量清零
    pool->task_list->next = NULL; //只有一个节点,还未连城任务链表
    pool->waiting_tasks = 0;   //暂时没有等待中的任务,要等后面添加任务结点
    pool->active_threads = threads_number;   //thread_number是参数,是传入的线程数量,赋值过来

    int i;
    for (i = 0; i < pool->active_threads; i++)
    {
        if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0)  
		//返回值为0表示创建成功,返回不为0表示创建失败
		{
			perror("线程创建失败");//根据传入的线程数,逐一创建线程,存入tid[]
			return false;
		}
    }
    return true;
}




//添加线程
int add_thread(thread_pool *pool, unsigned additional_threads)		
{
    if(additional_threads == 0)     //传入添加的线程数
    {
	    return 0;         //没有添加线程,啥都干不了,只能退出
    }
	unsigned total_threads = pool->active_threads + additional_threads;
    //计算先从总数 = 线程池中的线程 + 新添加的线程

	int i, actual_increment = 0;
	for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
	{
		if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0)
		{
			perror("添加线程失败!");
			if(actual_increment == 0)
            {
				return -1;
            }
			break;  //创建失败的话,直接break,不再执行后面的++
		}//创建失败

		actual_increment++; //实际上添加成功的线程数
	}
 
	pool->active_threads += actual_increment;
	return actual_increment;

}


//添加任务
bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)
{
	
    // pool->task_list = malloc(sizeof(task));  //之前初始化的是线程池中的链表结点内存
	struct task *new_task = malloc(sizeof(task));  // 任务结点new_task分配内存

	if(new_task == NULL)
	{
		perror("任务结点分配内存失败!");
		return false;
	}

    //初始化结点参数
	new_task->task = task;   
	new_task->arg = arg;
	new_task->next = NULL;
	
	pthread_mutex_lock(&pool->lock);   //上锁
	
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)  //判断任务是否超出上限,必要的操作
	{
		pthread_mutex_unlock(&pool->lock);
 
		fprintf(stderr, "任务过多!\n");
		free(new_task);  //释放任务空间
		return false;
	}
	
	struct task *tmp = pool->task_list;
	while(tmp->next != NULL)  //从任务结点取到不为空,就是取到任务
    {
        tmp = tmp->next;  //把这个任务取出来,拿到tmp上
    }

	tmp->next = new_task;
	pool->waiting_tasks++;
 
 
	pthread_mutex_unlock(&pool->lock);   //释放锁资源
	pthread_cond_signal(&pool->cond);	//唤醒一个休眠的线程

	return true;
}


//复制文件函数
void *copyfile(void * arg)
{

/*
    把要打开的文件路径放入source_file变量下(路径)
    打开源文件--------可读   判断打开是否成功
    创建目标文件,即复制文件    判断打开是否结束
    利用read函数和write函数,实现文件读写
    关闭打开的两个文件
*/

	file_path *dofile = (file_path *)arg; //arg是文件参数
	struct stat file_stat; //这个结构体来自#include <sys/stat.h>	

	stat(dofile->source_file, &file_stat);
    //通过文件名 获取文件的路径把他存放到结构体的sourc_file里面
	int source_fd,copy_fd;					
	source_fd = open(dofile->source_file,O_RDONLY);//打开源路径,只读模式

	if(source_fd == -1 )
	{
		printf("打开文件 %s\n 失败!\n",dofile->source_file);
        //打开文件失败
		return NULL;
	}
	
	copy_fd = open(dofile->copy_file,O_CREAT | O_TRUNC | O_RDWR,file_stat.st_mode);
	
	if( copy_fd == -1)
	{
		printf("打开文件 %s 失败!\n",dofile->copy_file);
		return NULL;
	}
 
	int nread;
	char buf[100];
	while((nread = read(source_fd,buf,100)) > 0)  //读取源文件的内容
	{
		if( write(copy_fd,buf,nread) == -1)	  //把读到的全部写进目标文件
		{
			break;
		}
	}
	printf("读取文件: %s\n",dofile->source_file);
	printf("写入文件: %s\n",dofile->copy_file);

	close(source_fd);  //关闭源文件
	close(copy_fd);    //关闭复制文件
	return NULL;

}

//复制目录函数
int copydir( file_path *dofile,thread_pool *pool)
{
	struct stat file_stat;				
	stat(dofile->source_file,&file_stat); //获取文件的属性
	mkdir(dofile->copy_file,file_stat.st_mode); //以源目录的类型和目录来创建一个目录
 
	DIR *sourcedir = opendir(dofile->source_file); //打开源目录
	struct dirent *dp;
	
	
	
	while( (dp = readdir(sourcedir))!=NULL )    //获取文件夹内文件的信息
	{
		
	
		if(dp->d_name[0] == '.') //如果文件为. 或者 .. 则跳过
		{						 
			continue;
		}
		//对本目录下的所有文件进行拷贝操作
		file_path *tmpfile = malloc(sizeof(file_path)); //为文件结构体开辟内存空间
		
		memset(tmpfile,0,sizeof(file_path));				//对内存清零 
		
		sprintf(tmpfile->source_file,"%s/%s",dofile->source_file,dp->d_name);//拼凑源文件路径
		sprintf(tmpfile->copy_file,"%s/%s",dofile->copy_file,dp->d_name);//拼凑目标文件路径
 
		struct stat tmpstat;
		stat(tmpfile->source_file,&tmpstat);	     			
 
		if(S_ISREG(tmpstat.st_mode))						//如果为普通文件,则拷贝
		{
			printf("tmpfile->source_file = %s\n",tmpfile->source_file);
			printf("tmpfile->copy_file = %s\n", tmpfile->copy_file);
			printf("\n");
			add_task( pool, copyfile, tmpfile);			//把复制的任务丢到任务链表
		}
		else if(S_ISDIR(tmpstat.st_mode))//如果为目录,则递归
		{
			copydir(tmpfile,pool);
		}
 
	}
	return 0;
}


//摧毁线程池
bool destroy_pool(thread_pool *pool)	
{
	pool->shutdown = true;
	pthread_cond_broadcast(&pool->cond);
	
	sleep(1);//等待空闲线程响应完毕

	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		errno = pthread_join(pool->tids[i], NULL);
		if(errno != 0)
		{
			printf("copy tids[%d] error: %s\n",i, strerror(errno));
		}
		else
			printf("[%u] is copyed\n", (unsigned)pool->tids[i]);
		
	}
 
	free(pool->task_list);
	free(pool->tids);
	free(pool);
 
	return true;
}











//函数1:初始化线程池   pool_init(pool,100);


int main(int argc, char const *argv[])
{

    //第一步:   if-判断输入命令是否规范规范
    if(argc != 3)
    {
        printf("你的命令格式错误!\n");
        printf("请按右侧格式run: ./%s 源文件路径  生成文件路径 \n",argv[0]);
        return -1; //命令异常退出
    }


    //第二步:   初始化线程池
    thread_pool *pool = malloc(sizeof(thread_pool));//定义线程池的poor指针,初始化内存空间
    /*调用线程初始化函数------pool_init()*/
    pool_init(pool,100);          //调用函数初始化线程,分配内存(初始化线程函数会调用routine)


    //第三步:   创建对象文件,存储源文件和复制文件路径
    file_path dofile;     //创建一个路径实例dofile,存储源文件和复制文件的路径
    strcpy(dofile.source_file,argv[1]);   //strcpy是函数   #include <string.h>
    strcpy(dofile.copy_file,argv[2]);    // char *strcpy(char *dest, const char *src);

    struct stat copestat;//这个结构体来自#include <sys/stat.h>
    stat(dofile.source_file,&copestat);  //stat是函数,将dofile.source_file的信息,获取到&copestat
    /*
        #include<sys/stat.h>
        int stat(const char *restrict pathname,struct stat *restrict buf);
        int fstat(int fields,struct stat *buf);
        int lstat(const char *restrict pathname,struct stat *restrict buf);
    */


    //第四步:   判断文件属性,执行拷贝操作    
    if(S_ISREG(copestat.st_mode))//如果为普通文件,则拷贝
	{
		copyfile(&dofile);  //直接拷贝,不用添加到任务链表
		
	}
	else if(S_ISDIR(copestat.st_mode))//如果为目录,则递归
	{
		copydir(&dofile,pool);
	}


    //第五步:   销毁线程池
    destroy_pool(pool);    //销毁pool   destroy破坏
    return 0;
}


/*


    详细设计步骤:
        1、预处理板块:构建
				任务结点结构体、
				线程池结构体,
				文件路径结构体
        
        2、void handler(void *arg)	//解除死锁,防止死锁
				pthread_mutex_unlock((pthread_mutex_t *)arg);
				
        3、void *routine(void *arg)//提取调用任务链表的任务结点  被pool_init和add_thread调用过
				传入pool  定义一个任务结点p(task结构体的实例)
				while循环
				{
					首先调用线程清理函数,清理掉残留的线程,解放锁资源
					当没有任务情况下,线程执行休眠操作还是直接销毁(由shutdown决定)
					当有任务情况下,提取任务结点(拿出来给p,任务结点数量减少),任务取出后,解放对应锁资源
					
					
					
				}
				
        
        4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
				(传入pool)
				加锁,条件变量
				给pool的各成员变量初始化分配内存
						判断是否成功
				pool各成员变量初始化赋值为空
				for循环逐一创建线程:(返回为0表示创建成功,不为0表示创建失败)
		(注意:thread:线程标识符     attr:线程属性设置      start_routine:线程函数起始地址   arg:传递参数)
				
        5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
				创建线程,增加线程数,计算可用线程
        
        6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)  		//添加任务
				先定于一个new_task指针,申请内存,初始化结构体成员变量
							内存申请成功?
				给互斥锁资源
				判断等待任务结点是否超过上限?
				定义一个tmp结点指针指向任务链表		
				把任务结点拿出来,组织起来,形成等待任务链表
				任务链表串完了,解除互斥锁哦并且唤醒一个进程
				
        7、void *copyfile(void * arg)  //复制文件函数
				把要打开的文件路径放入source_file变量下(路径)
				打开源文件--------可读   判断打开是否成功
				创建目标文件,即复制文件    判断打开是否结束
				利用read函数和write函数,实现文件读写
				关闭打开的两个文件
        
        8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
				普通文件	调用add_task函数复制
				目录文件	mkdir创建目录(tmpfile内的文件夹命名)  如果是目录不停递归
				
        10、bool destroy_pool(thread_pool *pool)	 //摧毁线程池

        main()
        {
            第一步:if-判断输入命令是否规范规范(三个字符串命令:./%s 源文件路径  生成文件路径)
            第二步:初始化线程池
					  thread_pool *pool = malloc(sizeof(thread_pool));
					  //定义线程池的poor指针,初始化内存空间
					  
					  //调用线程初始化函数------pool_init()
					  pool_init(pool,100);          
					  //调用函数初始化线程,分配内存(初始化线程函数会调用routine)
							

            第三步:创建对象文件,存储源文件和复制文件路径
						创建一个dofile,存储源文件和复制文件路径
								(dofile是file_path结构体的一个实例)dofile.source 源文件路径
						(copeatat:提取文件队列,作为判断文件是普通文件还是目录文件的参数)
						
            第四步:判断文件属性,执行拷贝操作
						使用st_mode判断文件类型
								执行copyfile和copydir操作(复制文件和目录)
            第五步:销毁线程池
						直接调用destroy_pool函数销毁线程池
        }

*/
上一篇:一文了解OOM及解决方案,附相关架构及资料


下一篇:Java程序员必须掌握的8大排序算法