Pthread线程使用模型之二工作组(Work crew)
场景
1.一些耗时的任务,比如分析多个类型的数据, 是独立的任务, 并不像 pipeline那样有序的依赖关系, 这时候pipeline就显得不合适了,因为它不能同时处理这些任务. 当然有些任务A可能依赖任务B的输出, 这可能就嵌套了pipeline模型了.
2.复杂的计算,可以分开独立的逻辑单独处理, 之后再合并结果.
说明
1.在工作组里, 数据是被一组线程独立处理的, 这意味着有一个“parallel decomposition” 循环。一个数据集被多个线程分割, 同时结果也是单个数据集.
2.在工作组里的线程对不同的数据执行完全不同的操作. 我们工作组里的成员, 比如, 每个移除工作从共享队列里获取请求, 同时做请求所要求做的事情. 每个队列请求数据包能描述多样化的操作—- 但是这个通用的队列和”任务描述”(用来处理队列)让他们成为一个”组”而不是独立的工作线程. 这个模型类似于 MIMD 并行处理的原始定义 “multiple instruction, multiple data.”
3.在后边的篇幅会介绍我们开发的更加工业级和更加通用(也是更加复杂)的工作队列管理器(work queue manager). 一个工作组(work crew)和一个工作队列(work queue)相对于“不可变”和“临界区”这两个特性来说是很相似的. 一个工作组是一组线程的集合独立处理数据; 然而一个工作队列是一种机制, 你的代码可以通过这种机制请求被匿名和独立的代理(agents)处理过的数据.
例子
1.以下的程序, crew.c, 显示了一个简单的工作组. 运行程序加两个参数, 搜索字符串A和文件路径. 程序会把文件路径放入工作组里. 一个工作组成员会决定是否文件或目录 – 如果是文件, 它会搜索文件里出现的字符串A; 如果是目录,它会使用readdir_r 查找该目录下的所有的目录和普通的文件同时排列每个入口作为新的work. 每个文件里包含搜索字符串的都会被打印出来.
2.每个work由一个work_t结构体进行描述. 这个结构体有一个指向下一个work项的指针, 一个指向文件路径的指针, 一个指向需要搜索的字符串. 在当前的构造里, 所有的work都指向同一个字符串.
3.每个work crew都有一个成员worker_t结构体. 这个结构体包含了crew 数组成员的索引, crew成员的线程标识, 和一个指向crew_t结构体的指针.
4.crew_t结构体描述了work crew的状态. 它记录了work crew成员的数量(crew_size)和一个数组worker_t. 它同时也有一个计数器表明剩余多少个work item需要处理(work_count),并且有一个未做完的work item链表. 最后. 它包含了各种pthreads同步对象: 一个mutex来控制访问, 一个condition变量(done)来等待work crew来完成任务, 还有一个condition变量(go)是给crew的成员worker_t来等待接收新的任务的.
* Special notes: On a Solaris 2.5 uniprocessor, this test will
* not produce interleaved output unless extra LWPs are created
* by calling thr_setconcurrency(), because threads are not
* timesliced.
*/
#include <sys/types.h>
#include <pthread.h>
#include <sys/stat.h>
#include <dirent.h>
#include "errors.h"
#define CREW_SIZE 4
/*
* Queued items of work for the crew. One is queued by
* crew_start, and each worker may queue additional items.
*/
typedef struct work_tag {
struct work_tag *next; /* Next work item */
char *path; /* Directory or file */
char *string; /* Search string */
} work_t, *work_p;
/*
* One of these is initialized for each worker thread in the
* crew. It contains the "identity" of each worker.
*/
typedef struct worker_tag {
int index; /* Thread's index */
pthread_t thread; /* Thread for stage */
struct crew_tag *crew; /* Pointer to crew */
} worker_t, *worker_p;
/*
* The external "handle" for a work crew. Contains the
* crew synchronization state and staging area.
*/
typedef struct crew_tag {
int crew_size; /* Size of array */
worker_t crew[CREW_SIZE];/* Crew members */
long work_count; /* Count of work items */
work_t *first, *last; /* First & last work item */
pthread_mutex_t mutex; /* Mutex for crew data */
pthread_cond_t done; /* Wait for crew done */
pthread_cond_t go; /* Wait for work */
} crew_t, *crew_p;
size_t path_max; /* Filepath length */
size_t name_max; /* Name length */
5.worker_routine函数是crew线程的起始函数. 外围的循环会一直重复处理直到线程被告知终止.条件变量循环阻塞每个新的crew member直到收到新的work. 等待稍微有点不同. 当work 列表为空时, 等待更多的work. crew成员不会终止– 一旦他们完成呢个当前的作业, 他们就会等待新的作业.
6.如果文件是一个链接, 输出这个名字. 注意每个消息包含了县城的work crew index(mine->index), 一边你能看到并发的任务; 如果文件是目录, 打开目录,查找该目录下的所有目录项(文件,目录,链接), 之后每个目录项作为一个新任务加入到crew的 work链表末尾. 注意此时的lock, 并且 crew->work_count++ 会递增; 如果文件是普通文件, 打开它,并搜索指定的字符串. 如果找到, 写一个消息并退出循环; 如果是其他类型, 我们会把她的类型报告出来.
7.处理完work后重新锁定mutex, 同时报告work item已经完成. 如果计数达到0, 那么这个crew已经完成这次作业, 同时广播去唤醒等待新的作业的线程(注,如果crew_start被多线程调用). 注意,work_count被递减仅当work item被完全处理后– count不会为0, 如果任何crew 成员仍然busy(同时也许被添加了额外的目录项到队列里).
/*
* The thread start routine for crew threads. Waits until "go"
* command, processes work items until requested to shut down.
*/
void *worker_routine (void *arg)
{
worker_p mine = (worker_t*)arg;
crew_p crew = mine->crew;
work_p work, new_work;
struct stat filestat;
struct dirent *entry;
int status;
/*
* "struct dirent" is funny, because POSIX doesn't require
* the definition to be more than a header for a variable
* buffer. Thus, allocate a "big chunk" of memory, and use
* it as a buffer.
*/
entry = (struct dirent*)malloc (
sizeof (struct dirent) + name_max);
if (entry == NULL)
errno_abort ("Allocating dirent");
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock crew mutex");
/*
* There won't be any work when the crew is created, so wait
* until something's put on the queue.
*/
while (crew->work_count == 0) {
status = pthread_cond_wait (&crew->go, &crew->mutex);
if (status != 0)
err_abort (status, "Wait for go");
}
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
DPRINTF (("Crew %d starting\n", mine->index));
/*
* Now, as long as there's work, keep doing it.
*/
while (1) {
/*
* Wait while there is nothing to do, and
* the hope of something coming along later. If
* crew->first is NULL, there's no work. But if
* crew->work_count goes to zero, we're done.
*/
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock crew mutex");
DPRINTF (("Crew %d top: first is %#lx, count is %d\n",
mine->index, crew->first, crew->work_count));
while (crew->first == NULL) {
status = pthread_cond_wait (&crew->go, &crew->mutex);
if (status != 0)
err_abort (status, "Wait for work");
}
DPRINTF (("Crew %d woke: %#lx, %d\n",
mine->index, crew->first, crew->work_count));
/*
* Remove and process a work item
*/
work = crew->first;
crew->first = work->next;
if (crew->first == NULL)
crew->last = NULL;
DPRINTF (("Crew %d took %#lx, leaves first %#lx, last %#lx\n",
mine->index, work, crew->first, crew->last));
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
/*
* We have a work item. Process it, which may involve
* queuing new work items.
*/
status = lstat (work->path, &filestat);
if (S_ISLNK (filestat.st_mode))
printf (
"Thread %d: %s is a link, skipping.\n",
mine->index,
work->path);
else if (S_ISDIR (filestat.st_mode)) {
DIR *directory;
struct dirent *result;
/*
* If the file is a directory, search it and place
* all files onto the queue as new work items.
*/
directory = opendir (work->path);
if (directory == NULL) {
fprintf (
stderr, "Unable to open directory %s: %d (%s)\n",
work->path,
errno, strerror (errno));
continue;
}
while (1) {
status = readdir_r (directory, entry, &result);
if (status != 0) {
fprintf (
stderr,
"Unable to read directory %s: %d (%s)\n",
work->path,
status, strerror (status));
break;
}
if (result == NULL)
break; /* End of directory */
/*
* Ignore "." and ".." entries.
*/
if (strcmp (entry->d_name, ".") == 0)
continue;
if (strcmp (entry->d_name, "..") == 0)
continue;
new_work = (work_p)malloc (sizeof (work_t));
if (new_work == NULL)
errno_abort ("Unable to allocate space");
new_work->path = (char*)malloc (path_max);
if (new_work->path == NULL)
errno_abort ("Unable to allocate path");
strcpy (new_work->path, work->path);
strcat (new_work->path, "/");
strcat (new_work->path, entry->d_name);
new_work->string = work->string;
new_work->next = NULL;
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock mutex");
if (crew->first == NULL) {
crew->first = new_work;
crew->last = new_work;
} else {
crew->last->next = new_work;
crew->last = new_work;
}
crew->work_count++;
DPRINTF ((
"Crew %d: add work %#lx, first %#lx, last %#lx, %d\n",
mine->index, new_work, crew->first,
crew->last, crew->work_count));
status = pthread_cond_signal (&crew->go);
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
}
closedir (directory);
} else if (S_ISREG (filestat.st_mode)) {
FILE *search;
char buffer[256], *bufptr, *search_ptr;
/*
* If this is a file, not a directory, then search
* it for the string.
*/
search = fopen (work->path, "r");
if (search == NULL)
fprintf (
stderr, "Unable to open %s: %d (%s)\n",
work->path,
errno, strerror (errno));
else {
while (1) {
bufptr = fgets (
buffer, sizeof (buffer), search);
if (bufptr == NULL) {
if (feof (search))
break;
if (ferror (search)) {
fprintf (
stderr,
"Unable to read %s: %d (%s)\n",
work->path,
errno, strerror (errno));
break;
}
}
search_ptr = strstr (buffer, work->string);
if (search_ptr != NULL) {
flockfile (stdout);
printf (
"Thread %d found \"%s\" in %s\n",
mine->index, work->string, work->path);
#if 0
printf ("%s\n", buffer);
#endif
funlockfile (stdout);
break;
}
}
fclose (search);
}
} else
fprintf (
stderr,
"Thread %d: %s is type %o (%s))\n",
mine->index,
work->path,
filestat.st_mode & S_IFMT,
(S_ISFIFO (filestat.st_mode) ? "FIFO"
: (S_ISCHR (filestat.st_mode) ? "CHR"
: (S_ISBLK (filestat.st_mode) ? "BLK"
: (S_ISSOCK (filestat.st_mode) ? "SOCK"
: "unknown")))));
free (work->path); /* Free path buffer */
free (work); /* We're done with this */
/*
* Decrement count of outstanding work items, and wake
* waiters (trying to collect results or start a new
* calculation) if the crew is now idle.
*
* It's important that the count be decremented AFTER
* processing the current work item. That ensures the
* count won't go to 0 until we're really done.
*/
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock crew mutex");
crew->work_count--;
DPRINTF (("Crew %d decremented work to %d\n", mine->index,
crew->work_count));
if (crew->work_count <= 0) {
DPRINTF (("Crew thread %d done\n", mine->index));
status = pthread_cond_broadcast (&crew->done);
if (status != 0)
err_abort (status, "Wake waiters");
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
break;
}
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
}
free (entry);
return NULL;
}
8.crew_start是同步函数, 也就是说, 在分配任务后, 它等待crew成员完成任务才返回到调用者. 虽然它是同步的, 但是crew也可以被另一个线程分配新的任务.(注, 只有在等待work_count为0时才会去赋值新的path和string给crew开始新的作业). 刚开始调用crew_create时, work_count是0, 所以第一次调用crew_start不需要等待.
/*
* Create a work crew.
*/
int crew_create (crew_t *crew, int crew_size)
{
int crew_index;
int status;
/*
* We won't create more than CREW_SIZE members
*/
if (crew_size > CREW_SIZE)
return EINVAL;
crew->crew_size = crew_size;
crew->work_count = 0;
crew->first = NULL;
crew->last = NULL;
/*
* Initialize synchronization objects
*/
status = pthread_mutex_init (&crew->mutex, NULL);
if (status != 0)
return status;
status = pthread_cond_init (&crew->done, NULL);
if (status != 0)
return status;
status = pthread_cond_init (&crew->go, NULL);
if (status != 0)
return status;
/*
* Create the worker threads.
*/
for (crew_index = 0; crew_index < CREW_SIZE; crew_index++) {
crew->crew[crew_index].index = crew_index;
crew->crew[crew_index].crew = crew;
status = pthread_create (&crew->crew[crew_index].thread,
NULL, worker_routine, (void*)&crew->crew[crew_index]);
if (status != 0)
err_abort (status, "Create worker");
}
return 0;
}
/*
* Pass a file path to a work crew previously created
* using crew_create
*/
int crew_start (
crew_p crew,
char *filepath,
char *search)
{
work_p request;
int status;
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
return status;
/*
* If the crew is busy, wait for them to finish.
*/
while (crew->work_count > 0) {
status = pthread_cond_wait (&crew->done, &crew->mutex);
if (status != 0) {
pthread_mutex_unlock (&crew->mutex);
return status;
}
}
errno = 0;
path_max = pathconf (filepath, _PC_PATH_MAX);
if (path_max == -1) {
if (errno == 0)
path_max = 1024; /* "No limit" */
else
errno_abort ("Unable to get PATH_MAX");
}
errno = 0;
name_max = pathconf (filepath, _PC_NAME_MAX);
if (name_max == -1) {
if (errno == 0)
name_max = 256; /* "No limit" */
else
errno_abort ("Unable to get NAME_MAX");
}
DPRINTF ((
"PATH_MAX for %s is %ld, NAME_MAX is %ld\n",
filepath, path_max, name_max));
path_max++; /* Add null byte */
name_max++; /* Add null byte */
request = (work_p)malloc (sizeof (work_t));
if (request == NULL)
errno_abort ("Unable to allocate request");
DPRINTF (("Requesting %s\n", filepath));
request->path = (char*)malloc (path_max);
if (request->path == NULL)
errno_abort ("Unable to allocate path");
strcpy (request->path, filepath);
request->string = search;
request->next = NULL;
if (crew->first == NULL) {
crew->first = request;
crew->last = request;
} else {
crew->last->next = request;
crew->last = request;
}
crew->work_count++;
status = pthread_cond_signal (&crew->go);
if (status != 0) {
free (crew->first);
crew->first = NULL;
crew->work_count = 0;
pthread_mutex_unlock (&crew->mutex);
return status;
}
while (crew->work_count > 0) {
status = pthread_cond_wait (&crew->done, &crew->mutex);
if (status != 0)
err_abort (status, "waiting for crew to finish");
}
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock crew mutex");
return 0;
}
/*
* The main program to "drive" the crew...
*/
int main (int argc, char *argv[])
{
crew_t my_crew;
char line[128], *next;
int status;
if (argc < 3) {
fprintf (stderr, "Usage: %s string path\n", argv[0]);
return -1;
}
#ifdef sun
/*
* On Solaris 2.5, threads are not timesliced. To ensure
* that our threads can run concurrently, we need to
* increase the concurrency level to CREW_SIZE.
*/
DPRINTF (("Setting concurrency level to %d\n", CREW_SIZE));
thr_setconcurrency (CREW_SIZE);
#endif
status = crew_create (&my_crew, CREW_SIZE);
if (status != 0)
err_abort (status, "Create crew");
status = crew_start (&my_crew, argv[2], argv[1]);
if (status != 0)
err_abort (status, "Start crew");
return 0;
}
参考
- << Programming with POSIX Threads >>