/************************************************************************* > File Name: pipe.c > Author: likeyi > Mail: likeyiyy@sina.com > Created Time: Thu 17 Apr 2014 03:53:25 PM CST ************************************************************************/ #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <string.h> typedef struct buffer { unsigned char * buffer; unsigned long length; }buffer_t; typedef struct stage_tag { pthread_mutex_t mutex; pthread_cond_t avail; pthread_cond_t ready; int busy; int data_ready; buffer_t * data; pthread_t thread; long total; /* 这个用来计数,看一个线程被调用多少次,当没有空闲的线程时,就用这个来比较。*/ }stage_t; static stage_t * get_tcp_thread(); static stage_t * get_ip_thread(); static stage_t * get_eth_thread(); static stage_t * tcp_thread_t,* ip_thread_t,*eth_thread_t; static unsigned char * tcp_hdr = "tcp header"; static unsigned char * ip_hdr = "ip header"; static unsigned char * eth_hdr = "eth header"; void * pipe_add_tcp_header(void * arg) { /* 每次读取文件的一行,作为数据,发出去。*/ pthread_detach(pthread_self()); FILE * fp; unsigned char data[4096]; stage_t * ip_thread = get_ip_thread(); if((fp = fopen("data","r")) == NULL) { printf("Error When open data file\n"); pthread_exit(NULL); /*NOTICE: pthread_exit 不应该是局部变量。*/ } int i = 0; int size = strlen(tcp_hdr); while(1) { pthread_mutex_lock(&ip_thread->mutex); /* * 什么IP 正忙?哈哈,那么我只有等了。 * 是的data_ready的状态说明正忙。 * */ while(ip_thread->data_ready) { pthread_cond_wait(&ip_thread->ready,&ip_thread->mutex); } /* * 开始准备数据。 * */ if(fgets(data,sizeof(data),fp) == NULL) { break; } int size2 = strlen(data); unsigned char * next_data = malloc(size + size2 + 1); strcpy(next_data,tcp_hdr); strcpy(next_data + size,data); /* 数据封装完毕 */ ip_thread->data->buffer = next_data; ip_thread->data->length = size + size2; ip_thread->data_ready = 1; pthread_cond_signal(&ip_thread->avail); /* * 呵呵,某种意义上,我这个时候是不是也应该告诉我的上级,我也做好准备了? * 但是,我没有上级,哈哈哈。 * */ pthread_mutex_unlock(&ip_thread->mutex); } printf("tcp_thread stoped\n"); } void * pipe_add_ip_header(void * arg) { stage_t * ip_stage = get_ip_thread(); pthread_mutex_lock(&ip_stage->mutex); int size = strlen(ip_hdr); while(1) { /* 数据没准备好 * 看起来用data_ready来表示是否busy很科学的样子,我这样做反而不好了。 * */ while(ip_stage->data_ready != 1) { pthread_cond_wait(&ip_stage->avail,&ip_stage->mutex); } /* * * */ int size2 = ip_stage->data->length; unsigned char * next_data = malloc(size + size2 + 2); strcpy(next_data,ip_hdr); *(next_data + size) = ‘ ‘; strcpy(next_data + size + 1,ip_stage->data->buffer); /* 数据封装完毕 */ /*********************************************************************/ stage_t * eth_thread = get_eth_thread(); pthread_mutex_lock(ð_thread->mutex); while(eth_thread->data_ready) { pthread_cond_wait(ð_thread->ready,ð_thread->mutex); } eth_thread->data->buffer = next_data; eth_thread->data->length = size + size2; eth_thread->data_ready = 1; pthread_cond_signal(ð_thread->avail); pthread_mutex_unlock(ð_thread->mutex); /*********************************************************************/ /* * 我自己呢?data_ready = 0,并且释放信号。 * */ ip_stage->data_ready = 0; pthread_cond_signal(&ip_stage->ready); } } void * pipe_add_ethdr(void * arg) { pthread_detach(pthread_self()); stage_t * eth_stage = get_eth_thread(); pthread_mutex_lock(ð_stage->mutex); while(1) { while(eth_stage->data_ready != 1) { pthread_cond_wait(ð_stage->avail,ð_stage->mutex); } printf("Data is :%s\n",eth_stage->data->buffer); eth_stage->data_ready = 0; pthread_cond_signal(ð_stage->ready); } } static stage_t * get_tcp_thread() { return tcp_thread_t; } static stage_t * get_ip_thread() { return ip_thread_t; } static stage_t * get_eth_thread() { return eth_thread_t; } int main(int argc, char ** argv) { eth_thread_t = malloc(sizeof(stage_t)); ip_thread_t = malloc(sizeof(stage_t)); tcp_thread_t = malloc(sizeof(stage_t)); eth_thread_t->data = malloc(sizeof(buffer_t)); ip_thread_t->data = malloc(sizeof(buffer_t)); tcp_thread_t->data = malloc(sizeof(buffer_t)); pthread_mutex_init(ð_thread_t->mutex,NULL); pthread_mutex_init(&ip_thread_t->mutex,NULL); pthread_mutex_init(&tcp_thread_t->mutex,NULL); pthread_cond_init(ð_thread_t->avail,NULL); pthread_cond_init(&ip_thread_t->avail,NULL); pthread_cond_init(&tcp_thread_t->avail,NULL); pthread_cond_init(ð_thread_t->ready,NULL); pthread_cond_init(&ip_thread_t->ready,NULL); pthread_cond_init(&tcp_thread_t->ready,NULL); pthread_create(ð_thread_t->thread,NULL,pipe_add_ethdr,NULL); pthread_create(&ip_thread_t->thread,NULL,pipe_add_ip_header,NULL); pthread_create(&tcp_thread_t->thread,NULL,pipe_add_tcp_header,NULL); pthread_exit(NULL); }
线程工作有三个基本模式:流水线,工作组,客户/服务器模型。
三个基本模式可以相互组合,比如流水线和工作组就可以组合到一块。
书上的例子还是可以看的,我也第一次有实体这个概念,
通过结构体把线程,互斥量,条件变量,谓词分装到一块真是个不错的注意。
我简化了书上的列子(或者更复杂),模拟了TCP到ether层传输的过程,每经过一层,它们都打上自己的标志,然后传给下一层。
假设流水线上有A-BC-D四个等级,由于A是流水线的开头,D是流水线的结尾,所以它们两个有所不同,但是流水线中间的工作方式都是类似的。
A: 它检测下一级流水线是否做好准备,假如做好准备了,就把数据发给下一层,并且告诉下一层数据准备好了。
注意这里的概念,上级要知道下级做好准备了才会发送数据,但是发送了数据之后又必须告诉下级数据准备好了。
看起来,上级看下级准备好了通知下级,下级一直处于接受状态,并不需啊哟上级告诉下级数据准备好了就可以,因为隐私的说,下级准备好了就可以接受数据了。
其实不可以,因为你下级准备好,我上级不一定准备好,你不能自己取数据,你可能取得错误的数据,必须,我知道你准备好了,我这边开始准备,然后再通知你。
那么,上级先准备好,然后再等下级可不可以呢?看似可以,但是上级的数据从哪里来的呢?上级也是流水线中的一级。
A的执行流是:
wait for next not busy.
process
tell next is ok.
当然,由于是流水线的头,它可能更灵活一些。
B. B首先检测数据是否可用,可用,则处理数据,然后看看C是否准备好了吗,假如C没有准备好就一直等,注意B等C的时候,B也不接受新的数据了。
当C可用后,B把数据发给C,告诉C准备好了,然后告诉A你又可以给我发数据了。
wait for prev send me data.
.... (I am in busy mode in fact)
wait for next ready.
tell next is ok
tell pev is not busy now.
D:tail的处理模式是,
wait for prev send me data
....(I am in busy mode)
tell prev is not busy now.
总结:
流水线的上一级还是修改了下一级的数据结构,这个很不好,但是假如不修改的话,应该是上一级告诉下一级数据好了的时候,是不是应该告诉下一级去哪里取数据?