linux多线程示例

 1 #include <stdio.h>
 2 #include <unistd.h>
 3 #include <stdlib.h>
 4 #include <pthread.h>
 5 
 6 typedef void* (*fun)(void*);
 7 
 8 fun fun1, fun2;
 9 
10 pthread_mutex_t pmu = PTHREAD_MUTEX_INITIALIZER;
11 pthread_cond_t cond;
12 pthread_t pid1, pid2;
13 int flag = 0;
14 int gnum = 0;
15 int gsub = 100;
16 
17 void *  func1(void * para)
18 {
19     int k = (int)para;
20     printf("func1, ******\n");
21     while(gnum<=100)
22     {   
23         pthread_mutex_lock(&pmu);
24         printf("gnum == %d", gnum);
25         while(gnum==50)
26         {   
27             printf("suspend thread1 at gnum==50 !!! \n");
28             pthread_cond_wait(&cond, &pmu);
29             gnum++;
30        }
31         ++gnum;
32         ++flag;
33         ++k;
34         //printf("flag = %d, k = %d\n", flag, k);
35         pthread_mutex_unlock(&pmu);
36         printf("I am func1\n");
37     }
38             pthread_exit((void*)0);
39 
40 }
41 
42 void * func2(void * para)
43 {
44     int f = (int)para;
45     printf("f == %d\n", f);
46     printf("pthread2 start running !\n");
47     void * ret = NULL;
48     while(gsub>=0)
49     {
50         pthread_mutex_lock(&pmu);
51         gsub--;
52         printf("gsub= %d ", gsub);
53         if(gsub == 20)
54         {
55             printf("now gsnb ==20, and send signal\n");
56             pthread_cond_signal(&cond);
57         }
58         ++flag;
59        ++f;
60         printf("flag = %d, f = %d\n", flag, f);
61         pthread_mutex_unlock(&pmu);
62         printf("I am func2 \n");
63     }
64     //pthread_join(pid1, &ret);
65     pthread_exit((void*)0);
66 }
67 
68 int main()
69 {
70     int id = 0;
71     void * ret = NULL;
72     int key = 5;
73 
74     pthread_cond_init(&cond, NULL);  //属性设置NULL默认属性
75     id = pthread_create(&pid1, NULL, func1, (void*)key);
76     if(id != 0)
77     {
78         printf("pthread_create error !\n");
79         exit(0);
80     }
81 
82     if(pthread_create(&pid2, NULL, func2, (void*)key))
83     {
84         printf("pthread_create error ! \n");
85         exit(0);
86     }
87     pthread_join(pid2, &ret);      //等待pid2线程退出
88     pthread_join(pid1, &ret);      //等待pid1线程退出

    //pthread_detach(pid1);        //主线程与pid1线程进行分离,一般用来实现异步返回
    //pthread_detach(pid2);
        //同上
89 pthread_exit((void*)0); 90 }

gcc test_thread.c -lpthread
./a.out

线程池实例代码:

  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <unistd.h>
  4 #include <sys/types.h>
  5 #include <pthread.h>
  6 #include <assert.h>
  7 
  8 typedef struct worker
  9 {
 10     //回调函数,任务运行时会调用此函数,也可以声明为其他形式;
 11     void * (*process)(void *arg);   //该函数返回值是任意类型的;参数也是任意类型的;`
 12     void *arg;  //回调函数的参数;
 13     struct worker *next;
 14 }CThread_worker;
 15 
 16 //线程池结构
 17 typedef struct
 18 {
 19     pthread_mutex_t queue_lock;     //互斥量
 20     pthread_cond_t queue_ready;     //条件变量
 21 
 22     //链表结构, 线程池中所有等待任务
 23     CThread_worker *queue_head;
 24     
 25     //是否销毁线程池
 26     int shutdown;
 27     pthread_t *threadid;
 28     
 29     //线程池中允许的活动线程数目;
 30     //线程池中允许的活动线程数目;
 31     int max_thread_num;
 32     //当前等待队列的任务数目;
 33     int  cur_queue_size;
 34 
 35 }CThread_pool;
 36 
 37 int pool_add_worker(void * (*process)(void *arg), void *arg);
 38 void * thread_routine(void *arg);
 39 
 40 static CThread_pool *pool = NULL;
 41 void pool_init(int max_thread_num)
 42 {
 43     pool = (CThread_pool*)malloc(sizeof(CThread_pool));
 44 
 45 //初始化互斥量;
 46     pthread_mutex_init(&(pool->queue_lock), NULL);
 47 //初始化条件变量
 48     pthread_cond_init(&(pool->queue_ready), NULL);
 49 
 50     pool->queue_head = NULL;
 51     
 52 //最大线程数目
 53     pool->max_thread_num = max_thread_num;
 54 //当前线程数目
 55     pool->cur_queue_size = 0;
 56     
 57     pool->shutdown = 0;
 58     pool->threadid = (pthread_t*)malloc(max_thread_num * sizeof(pthread_t));
 59     int i = 0;
 60     for(i=0; i<max_thread_num;i++)
 61     {
 62         pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
 63     }
 64 }
 65 
 66 //向线程池中加入任务
 67 int pool_add_worker(void*(*process)(void *arg), void *arg)
 68 {
 69     //构建一个新任务
 70     CThread_worker *newworker = (CThread_worker *)malloc(sizeof(CThread_worker));
 71     newworker->process = process;
 72     newworker->arg = arg;
 73     //别忘了置空
 74     newworker->next = NULL;
 75 
 76 //加锁互斥量
 77     pthread_mutex_lock(&(pool->queue_lock));
 78     //将任务加入到等待队列中
 79     CThread_worker *member = pool->queue_head;
 80     if(member !=NULL)
 81     {
 82         while(member->next != NULL)
 83             member = member->next;
 84         member->next = newworker;
 85     }
 86     else
 87     {
 88         pool->queue_head = newworker;
 89     }
 90     
 91     assert(pool->queue_head != NULL);
 92     pool->cur_queue_size++;
 93     pthread_mutex_unlock(&(pool->queue_lock));
 94     
 95     //好了,等待队列中有任务了,唤醒一个等待线程;
 96     //     注意如果所有线程都在忙碌,这句没有任何作用
 97     pthread_cond_signal(&(pool->queue_ready));
 98     return 0;
 99 }
100 
101 /*销毁线程池,等待队列中的任务不会再被执行,
102 *但是正在运行的线程会一直 把任务运行完后 再退出;
103 */
104 
105 int pool_destroy()
106 {
107     if(pool->shutdown)
108         return -1;   //防止两次调用
109     pool->shutdown = 1;
110     
111     //唤醒所有等待线程,线程池要销毁了 
112     pthread_cond_broadcast(&(pool->queue_ready));
113     
114     //阻塞等待线程退出, 否则就成僵尸了
115     int i;
116     for(i=0; i<pool->max_thread_num; i++)
117     {
118         pthread_join(pool->threadid[i], NULL);
119     }
120 
121     free(pool->threadid);
122 
123     //销毁等待队列
124     CThread_worker *head = NULL;
125     while(pool->queue_head != NULL)
126     {
127         head=pool->queue_head;
128         pool->queue_head = pool->queue_head->next;
129         free(head);
130     }
131 
132     //条件变量和互斥量也别忘了销毁
133     pthread_mutex_destroy(&(pool->queue_lock));
134     pthread_cond_destroy(&(pool->queue_ready));
135 
136     free(pool);
137      /*销毁后指针置空是个好习惯*/
138     pool = NULL;
139     return 0;
140 }
141 
142 void* thread_routine(void *arg)
143 {
144                    printf("start thread 0x%x\n", pthread_self());
145     while(1)
146     {
147         pthread_mutex_lock(&(pool->queue_lock));
148 /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
149  *pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/
150         while(pool->cur_queue_size == 0 && !pool->shutdown)
151         {
152             printf("thread 0x%x is waiting \n", pthread_self());
153             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
154         }
155 
156         //线程池要销毁了;
157         if(pool->shutdown)
158         {
159             //遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
160             pthread_mutex_unlock(&(pool->queue_lock));
161             printf("thread 0x %x will exit \n", pthread_self());
162             pthread_exit(NULL);
163         }
164 
165         printf("thread 0x %x is starting to work \n", pthread_self());
166 
167         //使用断言
168         assert(pool->cur_queue_size!= 0);
169         assert(pool->queue_head!= NULL);
170 
171         //等待队列长度减去1,并取出链表中的头元素
172         pool->cur_queue_size--;
173        CThread_worker *worker = pool->queue_head;
174         pool->queue_head = worker->next;
175         pthread_mutex_unlock(&(pool->queue_lock));
176 
177         //调用回调函数,执行任务
178         (*(worker->process))(worker->arg);
179         free(worker);
180         worker = NULL;
181     }
182     //这一句正常情况下是不可达的
183     pthread_exit(NULL);
184 }
185 
186 //test code
187 void *myprocess(void *arg)
188 {
189     printf("threadid is 0x%x, working on task %d\n", pthread_self(), *(int*)arg);
190     sleep(1);  //休息一秒,延长任务的执行时间
191     return NULL;
192 }
193 
194 int main(int argc, char** argv)
195 {
196     pool_init(3); /*线程池中最多三个活动线程*/
197 
198     //连续向线程池中放入10个任务;
199     int *workingnum = (int*)malloc(sizeof(int)*10);
200     int i;
201     for(i=0; i< 10;i++)
202     {
203         workingnum[i] = i;
204         pool_add_worker(myprocess, &workingnum[i]);
205     }
206 
207     sleep(5);
208 //销毁线程池;  
209     pool_destroy();
210     free(workingnum);
211 
212     return 0;
213 }

 

上一篇:高性能 MobX 模式(part 2)- 响应变化


下一篇:tidb IO 高问题排查