1 #include <stdio.h> 2 #include <unistd.h> 3 #include <stdlib.h> 4 #include <pthread.h> 5 6 typedef void* (*fun)(void*); 7 8 fun fun1, fun2; 9 10 pthread_mutex_t pmu = PTHREAD_MUTEX_INITIALIZER; 11 pthread_cond_t cond; 12 pthread_t pid1, pid2; 13 int flag = 0; 14 int gnum = 0; 15 int gsub = 100; 16 17 void * func1(void * para) 18 { 19 int k = (int)para; 20 printf("func1, ******\n"); 21 while(gnum<=100) 22 { 23 pthread_mutex_lock(&pmu); 24 printf("gnum == %d", gnum); 25 while(gnum==50) 26 { 27 printf("suspend thread1 at gnum==50 !!! \n"); 28 pthread_cond_wait(&cond, &pmu); 29 gnum++; 30 } 31 ++gnum; 32 ++flag; 33 ++k; 34 //printf("flag = %d, k = %d\n", flag, k); 35 pthread_mutex_unlock(&pmu); 36 printf("I am func1\n"); 37 } 38 pthread_exit((void*)0); 39 40 } 41 42 void * func2(void * para) 43 { 44 int f = (int)para; 45 printf("f == %d\n", f); 46 printf("pthread2 start running !\n"); 47 void * ret = NULL; 48 while(gsub>=0) 49 { 50 pthread_mutex_lock(&pmu); 51 gsub--; 52 printf("gsub= %d ", gsub); 53 if(gsub == 20) 54 { 55 printf("now gsnb ==20, and send signal\n"); 56 pthread_cond_signal(&cond); 57 } 58 ++flag; 59 ++f; 60 printf("flag = %d, f = %d\n", flag, f); 61 pthread_mutex_unlock(&pmu); 62 printf("I am func2 \n"); 63 } 64 //pthread_join(pid1, &ret); 65 pthread_exit((void*)0); 66 } 67 68 int main() 69 { 70 int id = 0; 71 void * ret = NULL; 72 int key = 5; 73 74 pthread_cond_init(&cond, NULL); //属性设置NULL默认属性 75 id = pthread_create(&pid1, NULL, func1, (void*)key); 76 if(id != 0) 77 { 78 printf("pthread_create error !\n"); 79 exit(0); 80 } 81 82 if(pthread_create(&pid2, NULL, func2, (void*)key)) 83 { 84 printf("pthread_create error ! \n"); 85 exit(0); 86 } 87 pthread_join(pid2, &ret); //等待pid2线程退出 88 pthread_join(pid1, &ret); //等待pid1线程退出
//pthread_detach(pid1); //主线程与pid1线程进行分离,一般用来实现异步返回
//pthread_detach(pid2); //同上 89 pthread_exit((void*)0); 90 }
gcc test_thread.c -lpthread
./a.out
线程池实例代码:
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <sys/types.h> 5 #include <pthread.h> 6 #include <assert.h> 7 8 typedef struct worker 9 { 10 //回调函数,任务运行时会调用此函数,也可以声明为其他形式; 11 void * (*process)(void *arg); //该函数返回值是任意类型的;参数也是任意类型的;` 12 void *arg; //回调函数的参数; 13 struct worker *next; 14 }CThread_worker; 15 16 //线程池结构 17 typedef struct 18 { 19 pthread_mutex_t queue_lock; //互斥量 20 pthread_cond_t queue_ready; //条件变量 21 22 //链表结构, 线程池中所有等待任务 23 CThread_worker *queue_head; 24 25 //是否销毁线程池 26 int shutdown; 27 pthread_t *threadid; 28 29 //线程池中允许的活动线程数目; 30 //线程池中允许的活动线程数目; 31 int max_thread_num; 32 //当前等待队列的任务数目; 33 int cur_queue_size; 34 35 }CThread_pool; 36 37 int pool_add_worker(void * (*process)(void *arg), void *arg); 38 void * thread_routine(void *arg); 39 40 static CThread_pool *pool = NULL; 41 void pool_init(int max_thread_num) 42 { 43 pool = (CThread_pool*)malloc(sizeof(CThread_pool)); 44 45 //初始化互斥量; 46 pthread_mutex_init(&(pool->queue_lock), NULL); 47 //初始化条件变量 48 pthread_cond_init(&(pool->queue_ready), NULL); 49 50 pool->queue_head = NULL; 51 52 //最大线程数目 53 pool->max_thread_num = max_thread_num; 54 //当前线程数目 55 pool->cur_queue_size = 0; 56 57 pool->shutdown = 0; 58 pool->threadid = (pthread_t*)malloc(max_thread_num * sizeof(pthread_t)); 59 int i = 0; 60 for(i=0; i<max_thread_num;i++) 61 { 62 pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); 63 } 64 } 65 66 //向线程池中加入任务 67 int pool_add_worker(void*(*process)(void *arg), void *arg) 68 { 69 //构建一个新任务 70 CThread_worker *newworker = (CThread_worker *)malloc(sizeof(CThread_worker)); 71 newworker->process = process; 72 newworker->arg = arg; 73 //别忘了置空 74 newworker->next = NULL; 75 76 //加锁互斥量 77 pthread_mutex_lock(&(pool->queue_lock)); 78 //将任务加入到等待队列中 79 CThread_worker *member = pool->queue_head; 80 if(member !=NULL) 81 { 82 while(member->next != NULL) 83 member = member->next; 84 member->next = newworker; 85 } 86 else 87 { 88 pool->queue_head = newworker; 89 } 90 91 assert(pool->queue_head != NULL); 92 pool->cur_queue_size++; 93 pthread_mutex_unlock(&(pool->queue_lock)); 94 95 //好了,等待队列中有任务了,唤醒一个等待线程; 96 // 注意如果所有线程都在忙碌,这句没有任何作用 97 pthread_cond_signal(&(pool->queue_ready)); 98 return 0; 99 } 100 101 /*销毁线程池,等待队列中的任务不会再被执行, 102 *但是正在运行的线程会一直 把任务运行完后 再退出; 103 */ 104 105 int pool_destroy() 106 { 107 if(pool->shutdown) 108 return -1; //防止两次调用 109 pool->shutdown = 1; 110 111 //唤醒所有等待线程,线程池要销毁了 112 pthread_cond_broadcast(&(pool->queue_ready)); 113 114 //阻塞等待线程退出, 否则就成僵尸了 115 int i; 116 for(i=0; i<pool->max_thread_num; i++) 117 { 118 pthread_join(pool->threadid[i], NULL); 119 } 120 121 free(pool->threadid); 122 123 //销毁等待队列 124 CThread_worker *head = NULL; 125 while(pool->queue_head != NULL) 126 { 127 head=pool->queue_head; 128 pool->queue_head = pool->queue_head->next; 129 free(head); 130 } 131 132 //条件变量和互斥量也别忘了销毁 133 pthread_mutex_destroy(&(pool->queue_lock)); 134 pthread_cond_destroy(&(pool->queue_ready)); 135 136 free(pool); 137 /*销毁后指针置空是个好习惯*/ 138 pool = NULL; 139 return 0; 140 } 141 142 void* thread_routine(void *arg) 143 { 144 printf("start thread 0x%x\n", pthread_self()); 145 while(1) 146 { 147 pthread_mutex_lock(&(pool->queue_lock)); 148 /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意 149 *pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/ 150 while(pool->cur_queue_size == 0 && !pool->shutdown) 151 { 152 printf("thread 0x%x is waiting \n", pthread_self()); 153 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); 154 } 155 156 //线程池要销毁了; 157 if(pool->shutdown) 158 { 159 //遇到break,continue,return等跳转语句,千万不要忘记先解锁*/ 160 pthread_mutex_unlock(&(pool->queue_lock)); 161 printf("thread 0x %x will exit \n", pthread_self()); 162 pthread_exit(NULL); 163 } 164 165 printf("thread 0x %x is starting to work \n", pthread_self()); 166 167 //使用断言 168 assert(pool->cur_queue_size!= 0); 169 assert(pool->queue_head!= NULL); 170 171 //等待队列长度减去1,并取出链表中的头元素 172 pool->cur_queue_size--; 173 CThread_worker *worker = pool->queue_head; 174 pool->queue_head = worker->next; 175 pthread_mutex_unlock(&(pool->queue_lock)); 176 177 //调用回调函数,执行任务 178 (*(worker->process))(worker->arg); 179 free(worker); 180 worker = NULL; 181 } 182 //这一句正常情况下是不可达的 183 pthread_exit(NULL); 184 } 185 186 //test code 187 void *myprocess(void *arg) 188 { 189 printf("threadid is 0x%x, working on task %d\n", pthread_self(), *(int*)arg); 190 sleep(1); //休息一秒,延长任务的执行时间 191 return NULL; 192 } 193 194 int main(int argc, char** argv) 195 { 196 pool_init(3); /*线程池中最多三个活动线程*/ 197 198 //连续向线程池中放入10个任务; 199 int *workingnum = (int*)malloc(sizeof(int)*10); 200 int i; 201 for(i=0; i< 10;i++) 202 { 203 workingnum[i] = i; 204 pool_add_worker(myprocess, &workingnum[i]); 205 } 206 207 sleep(5); 208 //销毁线程池; 209 pool_destroy(); 210 free(workingnum); 211 212 return 0; 213 }