Book: Programming with POSIX® Threads

4.2 Work crew

The twelve jurors were all writing very busily on slates. "What are they doing?" Alice whispered to the Gryphon. "They ca'n't have anything to put down yet, before the trial's begun."

"They're putting down their names," the Gryphon whispered in reply, "for fear they should forget them before the end of the trial."

Lewis Carroll, Alice's Adventures in Wonderland

In a work crew, data is processed independently by a set of threads (Figure 4.2). A "parallel decomposition" of a loop generally falls into this category. A set of threads may be created, for example, each directed to process some set of rows or columns of an array. A single set of data is split between the threads, and the result is a single (filtered) set of data. Because all the threads in the work crew, in this model, are performing the same operation on different data, it is often known as SIMD parallel processing, for "single instruction, multiple data." The original use of SIMD was in an entirely different form of parallelism, and doesn't literally apply to threads — but the concept is similar.


FIGURE 4.2 Work crew
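The "same operation on different data" form of a work crew can be sketched as follows. This is only an illustration under stated assumptions, not code from crew.c: the names slice_t, double_rows, and double_matrix, and the constants ROWS, COLS, and SLICE_THREADS, are hypothetical, and the sketch assumes the row count divides evenly among the threads.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define ROWS 8
#define COLS 4
#define SLICE_THREADS 4     /* Hypothetical crew size for this sketch */

/* Hypothetical descriptor: the rows owned by one thread. */
typedef struct slice_tag {
    int (*matrix)[COLS];    /* Shared matrix */
    int first_row;          /* First row owned by this thread */
    int end_row;            /* One past the last owned row */
} slice_t;

/* Every thread runs the same operation, each on its own rows. */
static void *double_rows (void *arg)
{
    slice_t *slice = (slice_t *)arg;
    int row, col;

    for (row = slice->first_row; row < slice->end_row; row++)
        for (col = 0; col < COLS; col++)
            slice->matrix[row][col] *= 2;
    return NULL;
}

/* Split the rows among the threads; return 0 or an error number. */
int double_matrix (int matrix[ROWS][COLS])
{
    pthread_t thread[SLICE_THREADS];
    slice_t slice[SLICE_THREADS];
    int rows_each = ROWS / SLICE_THREADS;
    int created = 0, result = 0, status, i;

    assert (ROWS % SLICE_THREADS == 0);
    for (i = 0; i < SLICE_THREADS; i++) {
        slice[i].matrix = matrix;
        slice[i].first_row = i * rows_each;
        slice[i].end_row = slice[i].first_row + rows_each;
        status = pthread_create (
            &thread[i], NULL, double_rows, &slice[i]);
        if (status != 0) {
            result = status;
            break;
        }
        created++;
    }
    /* Join only the threads that were actually created. */
    for (i = 0; i < created; i++) {
        status = pthread_join (thread[i], NULL);
        if (status != 0 && result == 0)
            result = status;
    }
    return result;
}
```

Because no two threads touch the same row, the threads need no mutex; the only synchronization is the join at the end.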

The threads in a work crew don't have to use a SIMD model, though. They may perform entirely different operations on different data. The members of our work crew, for example, each remove work requests from a shared queue, and do whatever is required by that request. Each queued request packet could describe a variety of operations—but the common queue and "mission statement" (to process that queue) make them a "crew" rather than independent worker threads. This model can be compared to the original definition of MIMD parallel processing, "multiple instruction, multiple data."

Section 7.2, by the way, shows the development of a more robust and general (and more complicated) "work queue manager" package. A "work crew" and a "work queue" are related in much the same way as "invariants" and "critical sections" — it depends on how you look at what's happening. A work crew is the set of threads that independently processes data, whereas a work queue is a mechanism by which your code may request that data be processed by anonymous and independent "agents." So in this section, we develop a "work crew," whereas in Section 7.2 we develop a more sophisticated "work queue." The focus differs, but the principle is the same.

The following program, called crew.c, shows a simple work crew. Run the program with two arguments: a search string and a file path. The program will queue the file path to the work crew. A crew member will determine whether the file path is a file or a directory. If it is a file, the crew member will search the file for the string; if it is a directory, the crew member will use readdir_r to find all directories and regular files within the directory, and queue each entry as new work. Each file containing the search string will be reported on stdout.

Part 1 shows the header files and definitions used by the program.

7 The symbol CREW_SIZE determines how many threads are created for each work crew.

13-17 Each item of work is described by a work_t structure. This structure has a pointer to the next work item (set to NULL to indicate the end of the list), a pointer to the file path described by the work item, and a pointer to the string for which the program is searching. As currently constructed, all work items point to the same search string.

23-27 Each member of a work crew has a worker_t structure. This structure contains the index of the crew member in the crew vector, the thread identifier of the crew member (thread), and a pointer to the crew_t structure (crew).

33-41 The crew_t structure describes the work crew state. It records the number of members in the work crew (crew_size) and an array of worker_t structures (crew). It also has a counter of how many work items remain to be processed (work_count) and a list of outstanding work items (first points to the earliest item, and last to the latest). Finally, it contains the various Pthreads synchronization objects: a mutex to control access, a condition variable (done) to wait for the work crew to finish a task, and a condition variable on which crew members wait to receive new work (go).
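The first/last bookkeeping described for crew_t can be sketched in isolation. The following is a minimal illustration rather than the crew.c code: the names item_t, queue_t, queue_init, queue_put, and queue_take are hypothetical, and the condition variables are left out so that only the list handling is visible.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical queue entry; crew.c's work_t carries a path instead. */
typedef struct item_tag {
    struct item_tag *next;
    int value;
} item_t;

/* Hypothetical queue: "first" is the oldest entry, "last" the newest. */
typedef struct queue_tag {
    item_t *first, *last;
    pthread_mutex_t mutex;
} queue_t;

int queue_init (queue_t *queue)
{
    assert (queue != NULL);
    queue->first = NULL;
    queue->last = NULL;
    return pthread_mutex_init (&queue->mutex, NULL);
}

/* Append an item at the tail; return 0 or an error number. */
int queue_put (queue_t *queue, item_t *item)
{
    int status;

    assert (queue != NULL && item != NULL);
    status = pthread_mutex_lock (&queue->mutex);
    if (status != 0)
        return status;
    item->next = NULL;
    if (queue->first == NULL)
        queue->first = item;        /* Queue was empty */
    else
        queue->last->next = item;
    queue->last = item;
    return pthread_mutex_unlock (&queue->mutex);
}

/* Remove the oldest item, or return NULL if the queue is empty. */
item_t *queue_take (queue_t *queue)
{
    item_t *item;

    assert (queue != NULL);
    if (pthread_mutex_lock (&queue->mutex) != 0)
        return NULL;
    item = queue->first;
    if (item != NULL) {
        queue->first = item->next;
        if (queue->first == NULL)
            queue->last = NULL;     /* Queue is empty again */
    }
    pthread_mutex_unlock (&queue->mutex);
    return item;
}
```

Keeping a pointer to the last entry makes appending a constant-time operation, at the cost of having to clear that pointer whenever the queue becomes empty.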

43-44 The allowed size of a file name and path name may vary depending on the file system to which the path leads. When a crew is started, the program calculates the allowable path and file name lengths for the specified file path by calling pathconf, and stores the values in path_max and name_max, respectively, for later use.

1 #include <sys/types.h>
2 #include <pthread.h>
3 #include <sys/stat.h>
4 #include <dirent.h>
5 #include "errors.h"
6
7 #define CREW_SIZE 4
8

9 /*
10 * Queued items of work for the crew. One is queued by
11 * crew_start, and each worker may queue additional items.
12 */
13 typedef struct work_tag {
14 struct work_tag *next; /* Next work item */
15 char *path; /* Directory or file */
16 char *string; /* Search string */
17 } work_t, *work_p;
18
19 /*
20 * One of these is initialized for each worker thread in the
21 * crew. It contains the "identity" of each worker.
22 */
23 typedef struct worker_tag {
24 int index; /* Thread's index */
25 pthread_t thread; /* Thread for stage */
26 struct crew_tag *crew; /* Pointer to crew */
27 } worker_t, *worker_p;
28
29 /*
30 * The external "handle" for a work crew. Contains the
31 * crew synchronization state and staging area.
32 */
33 typedef struct crew_tag {
34 int crew_size; /* Size of array */
35 worker_t crew[CREW_SIZE];/* Crew members */
36 long work_count; /* Count of work items */
37 work_t *first, *last; /* First & last work item */
38 pthread_mutex_t mutex; /* Mutex for crew data */
39 pthread_cond_t done; /* Wait for crew done */
40 pthread_cond_t go; /* Wait for work */
41 } crew_t, *crew_p;
42
43 size_t path_max; /* Filepath length */
44 size_t name_max; /* Name length */

Part 2 shows worker_routine, the start function for crew threads. The outer loop repeats processing until the thread is told to terminate.

20-23 POSIX is a little ambiguous about the actual size of the struct dirent type. The actual requirement for readdir_r is that you pass the address of a buffer large enough to contain a struct dirent with a name member of at least NAME_MAX bytes. To ensure that we have enough space, allocate a buffer the size of the system's struct dirent plus the maximum size necessary for a file name on the file system we're using. This may be bigger than necessary, but it surely won't be too small.

33-37 This condition variable loop blocks each new crew member until work is made available.

61-65 This wait is a little different. While the work list is empty, wait for more work. The crew members never terminate—once they're all done with the current assignment, they're ready for a new assignment. (This example doesn't take advantage of that capability — the process will terminate once the single search command has completed.)
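Both waits follow the same pattern: test a predicate in a loop while holding the mutex, and call pthread_cond_wait only while the predicate says there is nothing to do. A minimal sketch of that pattern, with hypothetical names (gate_t, wait_for_go, gate_demo) and with assert standing in for the err_abort checks of crew.c, might look like this:

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical shared state: a predicate guarded by a mutex. */
typedef struct gate_tag {
    pthread_mutex_t mutex;
    pthread_cond_t go;
    long work_count;        /* The predicate: nonzero means "go" */
    long seen;              /* What the waiter observed */
} gate_t;

static void *wait_for_go (void *arg)
{
    gate_t *gate = (gate_t *)arg;
    int status;

    status = pthread_mutex_lock (&gate->mutex);
    assert (status == 0);
    /* Loop: a wakeup alone does not prove the predicate holds. */
    while (gate->work_count == 0) {
        status = pthread_cond_wait (&gate->go, &gate->mutex);
        assert (status == 0);
    }
    gate->seen = gate->work_count;
    status = pthread_mutex_unlock (&gate->mutex);
    assert (status == 0);
    return NULL;
}

/* Start a waiter, post "items" units of work, return what it saw. */
long gate_demo (long items)
{
    gate_t gate;
    pthread_t thread;

    if (items <= 0)
        return -1;
    gate.work_count = 0;
    gate.seen = 0;
    if (pthread_mutex_init (&gate.mutex, NULL) != 0)
        return -1;
    if (pthread_cond_init (&gate.go, NULL) != 0)
        return -1;
    if (pthread_create (&thread, NULL, wait_for_go, &gate) != 0)
        return -1;

    /* Change the predicate under the mutex, then signal. */
    pthread_mutex_lock (&gate.mutex);
    gate.work_count = items;
    pthread_cond_signal (&gate.go);
    pthread_mutex_unlock (&gate.mutex);

    pthread_join (thread, NULL);
    pthread_cond_destroy (&gate.go);
    pthread_mutex_destroy (&gate.mutex);
    return gate.seen;
}
```

The sketch works whether the waiter reaches the loop before or after the work is posted, because the waiter checks the predicate before it ever waits.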

73-76 Remove the first work item from the queue. If the queue becomes empty, also clear the pointer to the last entry, crew->last.

81-83 Unlock the work crew mutex, so that the bulk of the crew's work can proceed concurrently.

89 Determine what sort of file we've got in the work item's path string. We use lstat, which will return information for a symbolic link, rather than stat, which would return information for the file to which the link pointed. By not following symbolic links, we reduce the amount of work in this example, and, especially, avoid following links into other file systems where our name_max and path_max sizes may not be sufficient.
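The classification step can be sketched as a small helper built on lstat. The enumeration path_kind_t and the function classify_path are hypothetical names introduced for this illustration; unlike the listing, the sketch also checks the value returned by lstat before examining the stat buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/stat.h>

/* Hypothetical result type for this sketch. */
typedef enum path_kind_tag {
    KIND_ERROR,             /* lstat failed; errno says why */
    KIND_LINK,              /* Symbolic link (not followed) */
    KIND_DIR,               /* Directory */
    KIND_FILE,              /* Regular file */
    KIND_OTHER              /* FIFO, device, socket, and so on */
} path_kind_t;

/*
 * Classify a path without following a final symbolic link.
 * With stat instead of lstat, KIND_LINK could never be returned,
 * because stat reports on the file the link points to.
 */
path_kind_t classify_path (const char *path)
{
    struct stat filestat;

    assert (path != NULL);
    if (lstat (path, &filestat) != 0)
        return KIND_ERROR;
    if (S_ISLNK (filestat.st_mode))
        return KIND_LINK;
    if (S_ISDIR (filestat.st_mode))
        return KIND_DIR;
    if (S_ISREG (filestat.st_mode))
        return KIND_FILE;
    return KIND_OTHER;
}
```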

91-95 If the file is a link, report the name, but do nothing else with it. Note that each message includes the thread's work crew index (mine->index), so that you can easily see "concurrency at work" within the example.

96-165 If the file is a directory, open it with opendir. Find all entries in the directory by repeatedly calling readdir_r. Each directory entry is entered as a new work item.
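The directory walk can be sketched on its own. This illustration swaps in readdir for the readdir_r call used by crew.c, since recent glibc versions mark readdir_r as deprecated; it assumes that each thread reads only from a directory stream it opened itself. The names for_each_child, child_fn, and CHILD_PATH_MAX are hypothetical, and the fixed buffer size is an assumption of the sketch (crew.c sizes its buffers from path_max).

```c
#include <assert.h>
#include <dirent.h>
#include <stdio.h>
#include <string.h>

#define CHILD_PATH_MAX 4096     /* Assumed limit for this sketch */

/* Hypothetical callback: receives each child path. */
typedef void (*child_fn) (const char *child_path, void *context);

/*
 * Visit every entry of dir_path except "." and "..".
 * Returns the number of entries visited, or -1 if the
 * directory cannot be opened (errno says why).
 */
int for_each_child (const char *dir_path, child_fn visit, void *context)
{
    DIR *directory;
    struct dirent *entry;
    char child[CHILD_PATH_MAX];
    int count = 0, length;

    assert (dir_path != NULL);
    directory = opendir (dir_path);
    if (directory == NULL)
        return -1;
    while ((entry = readdir (directory)) != NULL) {
        if (strcmp (entry->d_name, ".") == 0)
            continue;
        if (strcmp (entry->d_name, "..") == 0)
            continue;
        /* Build "dir/name", skipping names that would not fit. */
        length = snprintf (
            child, sizeof (child), "%s/%s", dir_path, entry->d_name);
        if (length < 0 || (size_t)length >= sizeof (child))
            continue;
        if (visit != NULL)
            visit (child, context);
        count++;
    }
    closedir (directory);
    return count;
}
```

In a work crew, the callback would be the place to allocate a new work item and append it to the shared queue under the crew mutex.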

166-206 If the file is a regular file, open it and read all text, looking for the search string. If we find it, write a message and exit the search loop.
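The search of a regular file can be sketched as a standalone function. The name file_contains and its return convention are hypothetical. Like any loop that reads fixed-size chunks with fgets, the sketch can miss a match that straddles a chunk boundary on a line longer than the buffer.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Search a text file for a string, one fgets chunk at a time.
 * Returns 1 if found, 0 if not found, and -1 if the file
 * cannot be opened or a read error occurs.
 */
int file_contains (const char *path, const char *needle)
{
    FILE *file;
    char buffer[256];
    int found = 0;

    assert (path != NULL && needle != NULL);
    file = fopen (path, "r");
    if (file == NULL)
        return -1;
    while (fgets (buffer, sizeof (buffer), file) != NULL) {
        if (strstr (buffer, needle) != NULL) {
            found = 1;
            break;              /* One match is enough */
        }
    }
    /* fgets returns NULL at end of file and on error alike. */
    if (!found && ferror (file))
        found = -1;
    fclose (file);
    return found;
}
```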

207-218 If the file is of any other type, write a message attempting to identify the type.

232-252 Relock the work crew mutex, and report that another work item is done. If the count reaches 0, then the crew has completed the assignment, and we broadcast to awaken any threads waiting to issue a new assignment. Note that the work count is decreased only after the work item is fully processed — the count will never reach 0 if any crew member is still busy (and might queue additional directory entries).
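The ordering rule, counting an item as finished only after any work it generates has been queued, can be sketched with a miniature crew. This is an illustration under stated assumptions, not the crew.c design: the names task_t, mini_crew_t, put_locked, mini_worker, and mini_crew_run are hypothetical; a task of depth d simply queues two tasks of depth d - 1; and, unlike crew.c, the workers exit when the count reaches zero so that the caller can join them instead of waiting on a separate done condition variable.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define MINI_CREW_SIZE 4

typedef struct task_tag {
    struct task_tag *next;
    int depth;                  /* Depth > 0 queues two child tasks */
} task_t;

typedef struct mini_crew_tag {
    pthread_mutex_t mutex;
    pthread_cond_t go;          /* Work available, or crew finished */
    task_t *first, *last;
    long work_count;            /* Queued plus in-progress tasks */
    long processed;             /* Completed tasks */
} mini_crew_t;

/* Append a task; the caller must hold crew->mutex. */
static void put_locked (mini_crew_t *crew, int depth)
{
    task_t *task = malloc (sizeof (task_t));

    if (task == NULL)
        abort ();
    task->next = NULL;
    task->depth = depth;
    if (crew->first == NULL)
        crew->first = task;
    else
        crew->last->next = task;
    crew->last = task;
    crew->work_count++;
    pthread_cond_signal (&crew->go);
}

static void *mini_worker (void *arg)
{
    mini_crew_t *crew = (mini_crew_t *)arg;
    task_t *task;

    pthread_mutex_lock (&crew->mutex);
    for (;;) {
        /* An empty queue is not "done" while a busy peer may add work. */
        while (crew->first == NULL && crew->work_count > 0)
            pthread_cond_wait (&crew->go, &crew->mutex);
        if (crew->first == NULL)
            break;              /* Count is zero: everything is finished */
        task = crew->first;
        crew->first = task->next;
        if (crew->first == NULL)
            crew->last = NULL;
        pthread_mutex_unlock (&crew->mutex);
        /* Real work on the task would happen here, unlocked. */
        pthread_mutex_lock (&crew->mutex);
        if (task->depth > 0) {
            put_locked (crew, task->depth - 1);
            put_locked (crew, task->depth - 1);
        }
        free (task);
        crew->processed++;
        /* Decrement only AFTER the children have been queued. */
        if (--crew->work_count == 0)
            pthread_cond_broadcast (&crew->go);
    }
    pthread_mutex_unlock (&crew->mutex);
    return NULL;
}

/* Run one task tree; return the number of tasks processed, or -1. */
long mini_crew_run (int depth)
{
    mini_crew_t crew;
    pthread_t thread[MINI_CREW_SIZE];
    int i, created = 0;

    assert (depth >= 0);
    crew.first = crew.last = NULL;
    crew.work_count = crew.processed = 0;
    if (pthread_mutex_init (&crew.mutex, NULL) != 0)
        return -1;
    if (pthread_cond_init (&crew.go, NULL) != 0)
        return -1;
    pthread_mutex_lock (&crew.mutex);
    put_locked (&crew, depth);
    pthread_mutex_unlock (&crew.mutex);
    for (i = 0; i < MINI_CREW_SIZE; i++)
        if (pthread_create (&thread[created], NULL, mini_worker, &crew) == 0)
            created++;
    if (created == 0) {
        free (crew.first);
        return -1;
    }
    for (i = 0; i < created; i++)
        pthread_join (thread[i], NULL);
    pthread_cond_destroy (&crew.go);
    pthread_mutex_destroy (&crew.mutex);
    return crew.processed;
}
```

If the count were decremented before the children were queued, it could touch zero while work was still on its way, and idle workers would exit too early.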

crew.c part 2 worker_routine

1 /*
2 * The thread start routine for crew threads. Waits until "go"
3 * command, processes work items until requested to shut down.
4 */
5 void *worker_routine (void *arg)
6 {
7 worker_p mine = (worker_t*)arg;
8 crew_p crew = mine->crew;
9 work_p work, new_work;
10 struct stat filestat;
11 struct dirent *entry;
12 int status;
13
14 /*
15 * "struct dirent" is funny, because POSIX doesn't require
16 * the definition to be more than a header for a variable
17 * buffer. Thus, allocate a "big chunk" of memory, and use
18 * it as a buffer.
19 */
20 entry = (struct dirent*)malloc (
21 sizeof (struct dirent) + name_max);
22 if (entry == NULL)
23 errno_abort ("Allocating dirent");

24
25 status = pthread_mutex_lock (&crew->mutex);
26 if (status != 0)
27 err_abort (status, "Lock crew mutex");

28
29 /*
30 * There won't be any work when the crew is created, so wait
31 * until something's put on the queue.
32 */
33 while (crew->work_count == 0) {
34  status = pthread_cond_wait (&crew->go, &crew->mutex);
35  if (status != 0)
36  err_abort (status, "Wait for go");
37 }

38
39 status = pthread_mutex_unlock (&crew->mutex);
40 if (status != 0)
41  err_abort (status, "Unlock mutex");

42
43 DPRINTF (("Crew %d starting\n", mine->index));

44
45 /*
46 * Now, as long as there's work, keep doing it.
47 */
48 while (1) {
49 /*
50 * Wait while there is nothing to do, and
51 * the hope of something coming along later. If
52 * crew->first is NULL, there's no work. But if
53 * crew->work_count goes to zero, we're done.
54 */
55  status = pthread_mutex_lock (&crew->mutex);
56  if (status != 0)
57  err_abort (status, "Lock crew mutex");

58
59  DPRINTF (("Crew %d top: first is %#lx, count is %ld\n",
60  mine->index, crew->first, crew->work_count));
61  while (crew->first == NULL) {
62  status = pthread_cond_wait (&crew->go, &crew->mutex);
63  if (status != 0)
64  err_abort (status, "Wait for work");
65  }

66
67  DPRINTF (("Crew %d woke: %#lx, %ld\n",
68  mine->index, crew->first, crew->work_count));

69
70 /*
71 * Remove and process a work item.
72 */
73  work = crew->first;
74  crew->first = work->next;
75  if (crew->first == NULL)
76  crew->last = NULL;
77
78  DPRINTF (("Crew %d took %#lx, leaves first %#lx, last %#lx\n",
79  mine->index, work, crew->first, crew->last));

80
81  status = pthread_mutex_unlock (&crew->mutex);
82  if (status != 0)
83  err_abort (status, "Unlock mutex");
84
85 /*
86 * We have a work item. Process it, which may involve
87 * queuing new work items.
88 */
89  status = lstat (work->path, &filestat);

90
91  if (S_ISLNK (filestat.st_mode))
92  printf (
93  "Thread %d: %s is a link, skipping.\n",
94  mine->index,
95  work->path);
96  else if (S_ISDIR (filestat.st_mode)) {
97  DIR *directory;
98  struct dirent *result;

99
100 /*
101 * If the file is a directory, search it and place
102 * all files onto the queue as new work items.
103 */
104  directory = opendir (work->path);
105  if (directory == NULL) {
106  fprintf (
107  stderr, "Unable to open directory %s: %d (%s)\n",
108  work->path,
109  errno, strerror (errno));
110  continue;
111  }
112
113  while (1) {
114  status = readdir_r (directory, entry, &result);
115  if (status != 0) {
116  fprintf (
117  stderr,
118  "Unable to read directory %s: %d (%s)\n",
119  work->path,
120  status, strerror (status));
121  break;
122  }
123  if (result == NULL)
124  break; /* End of directory */

125
126 /*
127 * Ignore "." and ".." entries.
128 */
129  if (strcmp (entry->d_name, ".") == 0)
130  continue;
131  if (strcmp (entry->d_name, "..") == 0)
132  continue;
133  new_work = (work_p)malloc (sizeof (work_t));
134  if (new_work == NULL)
135  errno_abort ("Unable to allocate space");
136  new_work->path = (char*)malloc (path_max);
137  if (new_work->path == NULL)
138  errno_abort ("Unable to allocate path");
139  strcpy (new_work->path, work->path);
140  strcat (new_work->path, "/");
141  strcat (new_work->path, entry->d_name);
142  new_work->string = work->string;
143  new_work->next = NULL;
144  status = pthread_mutex_lock (&crew->mutex);
145  if (status != 0)
146  err_abort (status, "Lock mutex");
147  if (crew->first == NULL) {
148  crew->first = new_work;
149  crew->last = new_work;
150  } else {
151  crew->last->next = new_work;
152  crew->last = new_work;
153  }
154  crew->work_count++;
155  DPRINTF ((
156  "Crew %d: add %#lx, first %#lx, last %#lx, %ld\n",
157  mine->index, new_work, crew->first,
158  crew->last, crew->work_count));
159  status = pthread_cond_signal (&crew->go);
160  status = pthread_mutex_unlock (&crew->mutex);
161  if (status != 0)
162  err_abort (status, "Unlock mutex");
163  }

164
165  closedir (directory);
166  } else if (S_ISREG (filestat.st_mode)) {
167  FILE *search;
168  char buffer[256], *bufptr, *search_ptr;

169
170 /*
171 * If this is a file, not a directory, then search
172 * it for the string.
173 */
174  search = fopen (work->path, "r");
175  if (search == NULL)
176  fprintf (
177  stderr, "Unable to open %s: %d (%s)\n",
178  work->path,
179  errno, strerror (errno));
180  else {

181
182  while (1) {
183  bufptr = fgets (
184  buffer, sizeof (buffer), search);
185  if (bufptr == NULL) {
186  if (feof (search))
187  break;
188  if (ferror (search)) {
189  fprintf (
190  stderr,
191  "Unable to read %s: %d (%s)\n",
192  work->path,
193  errno, strerror (errno));
194  break;
195  }
196  }
197  search_ptr = strstr (buffer, work->string);
198  if (search_ptr != NULL) {
199  printf (
200  "Thread %d found \"%s\" in %s\n",
201  mine->index, work->string, work->path);
202  break;
203  }
204  }
205  fclose (search);
206  }
207 } else
208  fprintf (
209  stderr,
210  "Thread %d: %s is type %o (%s))\n",
211  mine->index,
212  work->path,
213  filestat.st_mode & S_IFMT,
214  (S_ISFIFO (filestat.st_mode) ? "FIFO"
215  :(S_ISCHR (filestat.st_mode) ? "CHR"
216  :(S_ISBLK (filestat.st_mode) ? "BLK"
217  :(S_ISSOCK (filestat.st_mode) ? "SOCK"
218  :"unknown")))));

219
220  free (work->path); /* Free path buffer */
221  free (work); /* We're done with this */

222
223 /*
224 * Decrement count of outstanding work items, and wake
225 * waiters (trying to collect results or start a new
226 * calculation) if the crew is now idle.
227 *
228 * It's important that the count be decremented AFTER
229 * processing the current work item. That ensures the
230 * count won't go to 0 until we're really done.
231 */
232  status = pthread_mutex_lock (&crew->mutex);
233  if (status != 0)
234  err_abort (status, "Lock crew mutex");

235
236  crew->work_count--;
237  DPRINTF (("Crew %d decremented work to %ld\n", mine->index,
238  crew->work_count));
239  if (crew->work_count <= 0) {
240  DPRINTF (("Crew thread %d done\n", mine->index));
241  status = pthread_cond_broadcast (&crew->done);
242  if (status != 0)
243  err_abort (status, "Wake waiters");
244  status = pthread_mutex_unlock (&crew->mutex);
245  if (status != 0)
246  err_abort (status, "Unlock mutex");
247  break;
248  }

249
250  status = pthread_mutex_unlock (&crew->mutex);
251  if (status != 0)
252  err_abort (status, "Unlock mutex");

253
254 }
255
256 free (entry);
257 return NULL;
258 }

Part 3 shows crew_create, the function used to create a new work crew. This simple example does not provide a way to destroy a work crew, because that is not necessary — the work crew would be destroyed only when the main program was prepared to exit, and process exit will destroy all threads and process data.

12-15 The crew_create function begins by checking the crew_size argument. The size of the crew is not allowed to exceed the size of the crew array in crew_t. If the requested size is acceptable, copy it into the structure.

16-31 Start with no work and an empty work queue. Initialize the crew's synchronization objects.

36-43 Then, for each crew member, initialize the member's worker_t data. The index of the member within the crew array is recorded, and a pointer back to the crew_t. Then the crew member thread is created, with a pointer to the member's worker_t as its argument.

crew.c part 3 crew_create

1 /*
2 * Create a work crew.
3 */
4 int crew_create (crew_t *crew, int crew_size)
5 {
6 int crew_index;
7 int status;

8
9 /*
10 * We won't create more than CREW_SIZE members.
11 */
12 if (crew_size > CREW_SIZE)
13 return EINVAL;

14
15 crew->crew_size = crew_size;
16 crew->work_count = 0;
17 crew->first = NULL;
18 crew->last = NULL;

19
20 /*
21 * Initialize synchronization objects.
22 */
23 status = pthread_mutex_init (&crew->mutex, NULL);
24 if (status != 0)
25 return status;
26 status = pthread_cond_init (&crew->done, NULL);
27 if (status != 0)
28 return status;
29 status = pthread_cond_init (&crew->go, NULL);
30 if (status != 0)
31 return status;

32
33 /*
34 * Create the worker threads.
35 */
36 for (crew_index = 0; crew_index < crew_size; crew_index++) {
37 crew->crew[crew_index].index = crew_index;
38 crew->crew[crew_index].crew = crew;
39 status = pthread_create (&crew->crew[crew_index].thread,
40 NULL, worker_routine, (void*)&crew->crew[crew_index]);
41 if (status != 0)
42 err_abort (status, "Create worker");
43 }
44 return 0;
45 }

Part 4 shows the crew_start function, which is called to assign a new path name and search string to the work crew. The function is synchronous — that is, after assigning the task it waits for the crew members to complete the task before returning to the caller. The crew_start function assumes that the crew_t structure has been previously created by calling crew_create, shown in part 3, but does not attempt to validate the structure.

20-26 Wait for the crew members to finish any previously assigned work. Although crew_start is synchronous, the crew may be processing a task assigned by another thread. On creation, the crew's work_count is set to 0, so the first call to crew_start will not need to wait.

28-43 Get the proper values of path_max and name_max for the file system specified by the file path we'll be reading. The pathconf function may return a value of -1 without setting errno, if the requested value for the file system is "unlimited." To detect this, we need to clear errno before making the call. If pathconf returns -1 without setting errno, assume reasonable values.
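The errno convention described here can be wrapped in a small helper. The function name path_limit and its fallback parameter are hypothetical; the sketch returns the limit plus one byte for the terminating null character, and returns 0 when pathconf reports a real error.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/*
 * Query a pathconf limit such as _PC_PATH_MAX or _PC_NAME_MAX.
 * Returns the buffer size needed (limit plus terminating null).
 * If the file system reports "no limit", the caller's fallback
 * is used instead. Returns 0 on error, with errno set.
 */
size_t path_limit (const char *path, int name, size_t fallback)
{
    long limit;

    assert (path != NULL);
    errno = 0;                  /* So that -1 can be interpreted */
    limit = pathconf (path, name);
    if (limit == -1) {
        if (errno != 0)
            return 0;           /* Real error */
        limit = (long)fallback; /* -1 with errno 0: "no limit" */
    }
    return (size_t)limit + 1;
}
```

A caller might use it as path_limit (filepath, _PC_PATH_MAX, 1024), which mirrors the fallback value chosen in crew_start.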

47-48 The values returned by pathconf don't include the terminating null character of a string — so add one character to both.

49-67 Allocate a work queue entry (work_t) and fill it in. Add it to the end of the request queue.

68-75 We've queued a single work request, so awaken one of the waiting work crew members by signaling the condition variable. If the attempt fails, free the work request, clear the work queue, and return with the error.

76-80 Wait for the crew to complete the task. The crew members handle all output, so when they're done we simply return to the caller.

crew.c part 4 crew_start

1 /*
2 * Pass a file path to a work crew previously created
3 * using crew_create
4 */
5 int crew_start (
6 crew_p crew,
7 char *filepath,
8 char *search)
9 {
10 work_p request;
11 int status;

12
13 status = pthread_mutex_lock (&crew->mutex);
14 if (status != 0)
15 return status;

16
17 /*
18 * If the crew is busy, wait for them to finish.
19 */
20 while (crew->work_count > 0) {
21 status = pthread_cond_wait (&crew->done, &crew->mutex);
22 if (status != 0) {
23 pthread_mutex_unlock (&crew->mutex);
24 return status;
25 }
26 }
27
28 errno = 0;
29 path_max = pathconf (filepath, _PC_PATH_MAX);
30 if (path_max == -1) {
31 if (errno == 0)
32 path_max = 1024; /* "No limit" */
33 else
34 errno_abort ("Unable to get PATH_MAX");
35 }
36 errno = 0;
37 name_max = pathconf (filepath, _PC_NAME_MAX);
38 if (name_max == -1) {
39 if (errno == 0)
40 name_max = 256; /* "No limit" */
41 else
42 errno_abort ("Unable to get NAME_MAX");
43 }
44 DPRINTF ((
45 "PATH_MAX for %s is %ld, NAME_MAX is %ld\n",
46 filepath, path_max, name_max));
47 path_max++; /* Add null byte */
48 name_max++; /* Add null byte */
49 request = (work_p)malloc (sizeof (work_t));
50 if (request == NULL)
51 errno_abort ("Unable to allocate request");
52 DPRINTF (("Requesting %s\n", filepath));
53 request->path = (char*)malloc (path_max);
54 if (request->path == NULL)
55 errno_abort ("Unable to allocate path");
56 strcpy (request->path, filepath);
57 request->string = search;
58 request->next = NULL;
59 if (crew->first == NULL) {
60 crew->first = request;
61 crew->last = request;
62 } else {
63 crew->last->next = request;
64 crew->last = request;
65 }
66
67 crew->work_count++;
68 status = pthread_cond_signal (&crew->go);
69 if (status != 0) {
70 free (crew->first);
71 crew->first = NULL;
72 crew->work_count = 0;
73 pthread_mutex_unlock (&crew->mutex);
74 return status;
75 }
76 while (crew->work_count > 0) {
77 status = pthread_cond_wait (&crew->done, &crew->mutex);
78 if (status != 0)
79 err_abort (status, "waiting for crew to finish");
80 }
81 status = pthread_mutex_unlock (&crew->mutex);
82 if (status != 0)
83 err_abort (status, "Unlock crew mutex");
84 return 0;
85 }

Part 5 shows the initial thread (main) for the little work crew sample.

10-13 The program requires three arguments: the program name, a string for which to search, and a path name. For example, "crew butenhof ~".

15-23 On a Solaris system, call thr_setconcurrency to ensure that at least one LWP (kernel execution context) is created for each crew member. The program will work without this call, but, on a uniprocessor, you would not see any concurrency. See Section 5.6.3 for more information on "many to few" scheduling models, and Section 10.1.3 for information on "set concurrency" functions.

24-30 Create a work crew, and assign to it the concurrent file search.

crew.c part 5 main

1 /*
2 * The main program to "drive" the crew...
3 */
4 int main (int argc, char *argv[])
5 {
6 crew_t my_crew;
7 char line[128], *next;
8 int status;

9
10 if (argc < 3) {
11 fprintf (stderr, "Usage: %s string path\n", argv[0]);
12 return -1;
13 }
14

15 #ifdef sun
16 /*
17 * On Solaris 2.5, threads are not timesliced. To ensure
18 * that our threads can run concurrently, we need to
19 * increase the concurrency level to CREW_SIZE.
20 */
21 DPRINTF (("Setting concurrency level to %d\n", CREW_SIZE));
22 thr_setconcurrency (CREW_SIZE);
23 #endif
24 status = crew_create (&my_crew, CREW_SIZE);
25 if (status != 0)
26 err_abort (status, "Create crew");
27
28 status = crew_start (&my_crew, argv[2], argv[1]);
29 if (status != 0)
30 err_abort (status, "Start crew");

31
32 return 0;
33 }
