Book: Programming with POSIX® Threads
4.2 Work crew
The twelve jurors were all writing very busily on slates. "What are they doing?" Alice whispered to the Gryphon.
"They ca'n't have anything to put down yet, before the trial's begun." "They're putting down their names," the Gryphon whispered in reply, "for fear they should forget them before the end of the trial."
In a work crew, data is processed independently by a set of threads (Figure 4.2). A "parallel decomposition" of a loop generally falls into this category. A set of threads may be created, for example, each directed to process some set of rows or columns of an array. A single set of data is split between the threads, and the result is a single (filtered) set of data. Because all the threads in the work crew, in this model, are performing the same operation on different data, it is often known as SIMD parallel processing, for "single instruction, multiple data." The original use of SIMD was in an entirely different form of parallelism, and doesn't literally apply to threads — but the concept is similar.
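To make the SIMD-style decomposition concrete, here is a minimal sketch, separate from crew.c, in which each thread applies the same scaling operation to its own block of rows. The names ROWS, COLS, NTHREADS, slice_t, scale_rows, and scale_matrix are invented for this illustration, and the sketch assumes the rows divide evenly among the threads.

```c
#include <assert.h>
#include <pthread.h>

#define ROWS     8
#define COLS     4
#define NTHREADS 4              /* Hypothetical crew size */

/* The rows assigned to one crew member (hypothetical type). */
typedef struct slice_tag {
    double  (*matrix)[COLS];    /* Shared array */
    int     first_row;          /* First row in this slice */
    int     row_count;          /* Number of rows in this slice */
    double  factor;             /* Same operation for every member */
} slice_t;

/* Thread start routine: scale every element in this member's rows. */
static void *scale_rows (void *arg)
{
    slice_t *slice = (slice_t*)arg;
    int row, col;

    for (row = slice->first_row;
         row < slice->first_row + slice->row_count; row++)
        for (col = 0; col < COLS; col++)
            slice->matrix[row][col] *= slice->factor;
    return NULL;
}

/*
 * Split the matrix by rows, start one thread per slice, and wait
 * for all of them. Returns 0, or the first error number seen.
 */
int scale_matrix (double matrix[ROWS][COLS], double factor)
{
    pthread_t thread[NTHREADS];
    slice_t slice[NTHREADS];
    int rows_each = ROWS / NTHREADS;
    int created = 0, result = 0, status, i;

    assert (ROWS % NTHREADS == 0);  /* Keep the split even in this sketch */
    for (i = 0; i < NTHREADS; i++) {
        slice[i].matrix = matrix;
        slice[i].first_row = i * rows_each;
        slice[i].row_count = rows_each;
        slice[i].factor = factor;
        status = pthread_create (
            &thread[i], NULL, scale_rows, &slice[i]);
        if (status != 0) {
            result = status;
            break;
        }
        created++;
    }
    for (i = 0; i < created; i++) {
        status = pthread_join (thread[i], NULL);
        if (status != 0 && result == 0)
            result = status;
    }
    return result;
}
```

Because no two threads touch the same row, the array itself needs no mutex. The call to pthread_join is the only synchronization, and it also makes each member's results visible to the caller.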
FIGURE 4.2 Work crew
The threads in a work crew don't have to use a SIMD model, though. They may perform entirely different operations on different data. The members of our work crew, for example, each remove work requests from a shared queue, and do whatever is required by that request. Each queued request packet could describe a variety of operations—but the common queue and "mission statement" (to process that queue) make them a "crew" rather than independent worker threads. This model can be compared to the original definition of MIMD parallel processing, "multiple instruction, multiple data."
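The MIMD-style crew can be sketched in the same spirit. The following is an illustration under simplifying assumptions, not the crew.c code developed in this section: the queue is filled before the threads start, so no condition variable is needed, and members simply exit when the queue is empty. The request kinds REQ_SUM and REQ_MAX, the types request_t and queue_t, and the functions check, process_request, crew_member, and run_crew are all hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>

#define MAX_MEMBERS 4           /* Hypothetical upper bound */

typedef enum { REQ_SUM, REQ_MAX } req_kind_t;

/* A request packet: the operation code selects the work. */
typedef struct request_tag {
    struct request_tag  *next;  /* Next queued request */
    req_kind_t          kind;   /* Which operation to perform */
    const int           *data;  /* Input values */
    size_t              count;  /* Number of values (at least 1) */
    long                result; /* Filled in by a crew member */
} request_t;

typedef struct queue_tag {
    pthread_mutex_t     mutex;  /* Protects first */
    request_t           *first; /* Head of the request list */
} queue_t;

/* Stand-in for the book's err_abort: give up on any error. */
static void check (int status)
{
    if (status != 0)
        abort ();
}

/* Do whatever the packet asks for; runs without the mutex held. */
static void process_request (request_t *req)
{
    long value;
    size_t i;

    assert (req->count > 0);
    value = req->data[0];
    for (i = 1; i < req->count; i++) {
        if (req->kind == REQ_SUM)
            value += req->data[i];
        else if (req->data[i] > value)
            value = req->data[i];
    }
    req->result = value;
}

/* Crew member: take requests from the shared queue until it is empty. */
static void *crew_member (void *arg)
{
    queue_t *queue = (queue_t*)arg;
    request_t *req;

    for (;;) {
        check (pthread_mutex_lock (&queue->mutex));
        req = queue->first;
        if (req != NULL)
            queue->first = req->next;
        check (pthread_mutex_unlock (&queue->mutex));
        if (req == NULL)
            break;
        process_request (req);
    }
    return NULL;
}

/* Run "members" threads over a prefilled list of requests. */
int run_crew (request_t *requests, int members)
{
    queue_t queue;
    pthread_t thread[MAX_MEMBERS];
    int i;

    if (members < 1 || members > MAX_MEMBERS)
        return -1;
    check (pthread_mutex_init (&queue.mutex, NULL));
    queue.first = requests;
    for (i = 0; i < members; i++)
        check (pthread_create (&thread[i], NULL, crew_member, &queue));
    for (i = 0; i < members; i++)
        check (pthread_join (thread[i], NULL));
    check (pthread_mutex_destroy (&queue.mutex));
    return 0;
}
```

The mutex is held only long enough to unlink a packet, so members performing different operations do not serialize on each other's work.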
Section 7.2, by the way, shows the development of a more robust and general (and more complicated) "work queue manager" package. A "work crew" and a "work queue" are related in much the same way as "invariants" and "critical sections" — it depends on how you look at what's happening. A work crew is the set of threads that independently processes data, whereas a work queue is a mechanism by which your code may request that data be processed by anonymous and independent "agents." So in this section, we develop a "work crew," whereas in Section 7.2 we develop a more sophisticated "work queue." The focus differs, but the principle is the same.
The following program, called crew.c, shows a simple work crew. Run the program with two arguments: a search string and a file path. The program will queue the file path to the work crew. A crew member will determine whether the file path is a file or a directory. If it is a file, the crew member will search the file for the string; if it is a directory, the crew member will use readdir_r to find the entries within the directory, and queue each entry as new work. Each file containing the search string will be reported on stdout.
Part 1 shows the header files and definitions used by the program.
7 The symbol CREW_SIZE determines how many threads are created for each work crew.
13-17 Each item of work is described by a work_t structure. This structure has a pointer to the next work item (set to NULL to indicate the end of the list), a pointer to the file path described by the work item, and a pointer to the string for which the program is searching. As currently constructed, all work items point to the same search string.
23-27 Each member of a work crew has a worker_t structure. This structure contains the index of the crew member in the crew vector, the thread identifier of the crew member (thread), and a pointer to the crew_t structure (crew).
33-41 The crew_t structure describes the work crew state. It records the number of members in the work crew (crew_size) and an array of worker_t structures (crew). It also has a counter of how many work items remain to be processed (work_count) and a list of outstanding work items (first points to the earliest item, and last to the latest). Finally, it contains the various Pthreads synchronization objects: a mutex to control access, a condition variable (done) to wait for the work crew to finish a task, and a condition variable on which crew members wait to receive new work (go).
43-44 The allowed size of a file name and path name may vary depending on the file system to which the path leads. When a crew is started, the program calculates the allowable path and file name lengths for the specified file path by calling pathconf, and stores the values in path_max and name_max, respectively, for later use.
1 #include <sys/types.h>
2 #include <pthread.h>
3 #include <sys/stat.h>
4 #include <dirent.h>
5 #include "errors.h"
6
7 #define CREW_SIZE 4
8
9 /*
10 * Queued items of work for the crew. One is queued by
11 * crew_start, and each worker may queue additional items.
12 */
13 typedef struct work_tag {
14 struct work_tag *next; /* Next work item */
15 char *path; /* Directory or file */
16 char *string; /* Search string */
17 } work_t, *work_p;
18
19 /*
20 * One of these is initialized for each worker thread in the
21 * crew. It contains the "identity" of each worker.
22 */
23 typedef struct worker_tag {
24 int index; /* Thread's index */
25 pthread_t thread; /* Thread for stage */
26 struct crew_tag *crew; /* Pointer to crew */
27 } worker_t, *worker_p;
28
29 /*
30 * The external "handle" for a work crew. Contains the
31 * crew synchronization state and staging area.
32 */
33 typedef struct crew_tag {
34 int crew_size; /* Size of array */
35 worker_t crew[CREW_SIZE];/* Crew members */
36 long work_count; /* Count of work items */
37 work_t *first, *last; /* First & last work item */
38 pthread_mutex_t mutex; /* Mutex for crew data */
39 pthread_cond_t done; /* Wait for crew done */
40 pthread_cond_t go; /* Wait for work */
41 } crew_t, *crew_p;
42
43 size_t path_max; /* Filepath length */
44 size_t name_max; /* Name length */
Part 2 shows worker_routine, the start function for crew threads. The outer loop repeats processing until the thread is told to terminate.
20-23 POSIX is a little ambiguous about the actual size of the struct dirent type. The actual requirement for readdir_r is that you pass the address of a buffer large enough to contain a struct dirent with a name member of at least NAME_MAX bytes. To ensure that we have enough space, allocate a buffer the size of the system's struct dirent plus the maximum size necessary for a file name on the file system we're using. This may be bigger than necessary, but it surely won't be too small.
33-37 This condition variable loop blocks each new crew member until work is made available.
61-65 This wait is a little different. While the work list is empty, wait for more work. The crew members never terminate—once they're all done with the current assignment, they're ready for a new assignment. (This example doesn't take advantage of that capability — the process will terminate once the single search command has completed.)
73-76 Remove the first work item from the queue. If the queue becomes empty, also clear the pointer to the last entry, crew->last.
81-83 Unlock the work crew mutex, so that the bulk of the crew's work can proceed concurrently.
89 Determine what sort of file we've got in the work item's path string. We use lstat, which will return information for a symbolic link, rather than stat, which would return information for the file to which the link pointed. By not following symbolic links, we reduce the amount of work in this example, and, especially, avoid following links into other file systems where our name_max and path_max sizes may not be sufficient.
91-95 If the file is a link, report the name, but do nothing else with it. Note that each message includes the thread's work crew index (mine->index), so that you can easily see "concurrency at work" within the example.
96-165 If the file is a directory, open it with opendir. Find all entries in the directory by repeatedly calling readdir_r. Each directory entry is entered as a new work item.
166-206 If the file is a regular file, open it and read all text, looking for the search string. If we find it, write a message and exit the search loop.
207-218 If the file is of any other type, write a message attempting to identify the type.
232-252 Relock the work crew mutex, and report that another work item is done. If the count reaches 0, then the crew has completed the assignment, and we broadcast to awaken any threads waiting to issue a new assignment. Note that the work count is decreased only after the work item is fully processed — the count will never reach 0 if any crew member is still busy (and might queue additional directory entries).
crew.c part 2 worker_routine
1 /*
2 * The thread start routine for crew threads. Waits until "go"
3 * command, processes work items until requested to shut down.
4 */
5 void *worker_routine (void *arg)
6 {
7 worker_p mine = (worker_t*)arg;
8 crew_p crew = mine->crew;
9 work_p work, new_work;
10 struct stat filestat;
11 struct dirent *entry;
12 int status;
13
14 /*
15 * "struct dirent" is funny, because POSIX doesn't require
16 * the definition to be more than a header for a variable
17 * buffer. Thus, allocate a "big chunk" of memory, and use
18 * it as a buffer.
19 */
20 entry = (struct dirent*)malloc (
21 sizeof (struct dirent) + name_max);
22 if (entry == NULL)
23 errno_abort ("Allocating dirent");
24
25 status = pthread_mutex_lock (&crew->mutex);
26 if (status != 0)
27 err_abort (status, "Lock crew mutex");
28
29 /*
30 * There won't be any work when the crew is created, so wait
31 * until something's put on the queue.
32 */
33 while (crew->work_count == 0) {
34 status = pthread_cond_wait (&crew->go, &crew->mutex);
35 if (status != 0)
36 err_abort (status, "Wait for go");
37 }
38
39 status = pthread_mutex_unlock (&crew->mutex);
40 if (status != 0)
41 err_abort (status, "Unlock mutex");
42
43 DPRINTF (("Crew %d starting\n", mine->index));
44
45 /*
46 * Now, as long as there's work, keep doing it.
47 */
48 while (1) {
49 /*
50 * Wait while there is nothing to do, and
51 * the hope of something coming along later. If
52 * crew->first is NULL, there's no work. But if
53 * crew->work_count goes to zero, we're done.
54 */
55 status = pthread_mutex_lock (&crew->mutex);
56 if (status != 0)
57 err_abort (status, "Lock crew mutex");
58
59 DPRINTF (("Crew %d top: first is %#lx, count is %d\n",
60 mine->index, crew->first, crew->work_count));
61 while (crew->first == NULL) {
62 status = pthread_cond_wait (&crew->go, &crew->mutex);
63 if (status != 0)
64 err_abort (status, "Wait for work");
65 }
66
67 DPRINTF (("Crew %d woke: %#lx, %d\n",
68 mine->index, crew->first, crew->work_count));
69
70 /*
71 * Remove and process a work item.
72 */
73 work = crew->first;
74 crew->first = work->next;
75 if (crew->first == NULL)
76 crew->last = NULL;
77
78 DPRINTF (("Crew %d took %#lx, leaves first %#lx, last %#lx\n",
79 mine->index, work, crew->first, crew->last));
80
81 status = pthread_mutex_unlock (&crew->mutex);
82 if (status != 0)
83 err_abort (status, "Unlock mutex");
84
85 /*
86 * We have a work item. Process it, which may involve
87 * queuing new work items.
88 */
89 status = lstat (work->path, &filestat);
90
91 if (S_ISLNK (filestat.st_mode))
92 printf (
93 "Thread %d: %s is a link, skipping.\n",
94 mine->index,
95 work->path);
96 else if (S_ISDIR (filestat.st_mode)) {
97 DIR *directory;
98 struct dirent *result;
99
100 /*
101 * If the file is a directory, search it and place
102 * all files onto the queue as new work items.
103 */
104 directory = opendir (work->path);
105 if (directory == NULL) {
106 fprintf (
107 stderr, "Unable to open directory %s: %d (%s)\n",
108 work->path,
109 errno, strerror (errno));
110 continue;
111 }
112
113 while (1) {
114 status = readdir_r (directory, entry, &result);
115 if (status != 0) {
116 fprintf (
117 stderr,
118 "Unable to read directory %s: %d (%s)\n",
119 work->path,
120 status, strerror (status));
121 break;
122 }
123 if (result == NULL)
124 break; /* End of directory */
125
126 /*
127 * Ignore "." and ".." entries.
128 */
129 if (strcmp (entry->d_name, ".") == 0)
130 continue;
131 if (strcmp (entry->d_name, "..") == 0)
132 continue;
133 new_work = (work_p)malloc (sizeof (work_t));
134 if (new_work == NULL)
135 errno_abort ("Unable to allocate space");
136 new_work->path = (char*)malloc (path_max);
137 if (new_work->path == NULL)
138 errno_abort ("Unable to allocate path");
139 strcpy (new_work->path, work->path);
140 strcat (new_work->path, "/");
141 strcat (new_work->path, entry->d_name);
142 new_work->string = work->string;
143 new_work->next = NULL;
144 status = pthread_mutex_lock (&crew->mutex);
145 if (status != 0)
146 err_abort (status, "Lock mutex");
147 if (crew->first == NULL) {
148 crew->first = new_work;
149 crew->last = new_work;
150 } else {
151 crew->last->next = new_work;
152 crew->last = new_work;
153 }
154 crew->work_count++;
155 DPRINTF ((
156 "Crew %d: add %#lx, first %#lx, last %#lx, %d\n",
157 mine->index, new_work, crew->first,
158 crew->last, crew->work_count));
159 status = pthread_cond_signal (&crew->go);
160 status = pthread_mutex_unlock (&crew->mutex);
161 if (status != 0)
162 err_abort (status, "Unlock mutex");
163 }
164
165 closedir (directory);
166 } else if (S_ISREG (filestat.st_mode)) {
167 FILE *search;
168 char buffer[256], *bufptr, *search_ptr;
169
170 /*
171 * If this is a file, not a directory, then search
172 * it for the string.
173 */
174 search = fopen (work->path, "r");
175 if (search == NULL)
176 fprintf (
177 stderr, "Unable to open %s: %d (%s)\n",
178 work->path,
179 errno, strerror (errno));
180 else {
181
182 while (1) {
183 bufptr = fgets (
184 buffer, sizeof (buffer), search);
185 if (bufptr == NULL) {
186 if (feof (search))
187 break;
188 if (ferror (search)) {
189 fprintf (
190 stderr,
191 "Unable to read %s: %d (%s)\n",
192 work->path,
193 errno, strerror (errno));
194 break;
195 }
196 }
197 search_ptr = strstr (buffer, work->string);
198 if (search_ptr != NULL) {
199 printf (
200 "Thread %d found \"%s\" in %s\n",
201 mine->index, work->string, work->path);
202 break;
203 }
204 }
205 fclose (search);
206 }
207 } else
208 fprintf (
209 stderr,
210 "Thread %d: %s is type %o (%s)\n",
211 mine->index,
212 work->path,
213 filestat.st_mode & S_IFMT,
214 (S_ISFIFO (filestat.st_mode) ? "FIFO"
215 :(S_ISCHR (filestat.st_mode) ? "CHR"
216 :(S_ISBLK (filestat.st_mode) ? "BLK"
217 :(S_ISSOCK (filestat.st_mode) ? "SOCK"
218 :"unknown")))));
219
220 free (work->path); /* Free path buffer */
221 free (work); /* We're done with this */
222
223 /*
224 * Decrement count of outstanding work items, and wake
225 * waiters (trying to collect results or start a new
226 * calculation) if the crew is now idle.
227 *
228 * It's important that the count be decremented AFTER
229 * processing the current work item. That ensures the
230 * count won't go to 0 until we're really done.
231 */
232 status = pthread_mutex_lock (&crew->mutex);
233 if (status != 0)
234 err_abort (status, "Lock crew mutex");
235
236 crew->work_count--;
237 DPRINTF (("Crew %d decremented work to %d\n", mine->index,
238 crew->work_count));
239 if (crew->work_count <= 0) {
240 DPRINTF (("Crew thread %d done\n", mine->index));
241 status = pthread_cond_broadcast (&crew->done);
242 if (status != 0)
243 err_abort (status, "Wake waiters");
244 status = pthread_mutex_unlock (&crew->mutex);
245 if (status != 0)
246 err_abort (status, "Unlock mutex");
247 break;
248 }
249
250 status = pthread_mutex_unlock (&crew->mutex);
251 if (status != 0)
252 err_abort (status, "Unlock mutex");
253
254 }
255
256 free (entry);
257 return NULL;
258 }
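Two details of part 2 deserve care on current systems: the listing does not test the status returned by lstat before examining filestat, and readdir_r has since been marked deprecated by some C libraries (glibc, for example) in favor of readdir. The following sketch, which is not part of crew.c, shows one way a directory scan might handle both. The function name count_entries is hypothetical, and the sketch assumes that each thread reads through its own DIR stream, which POSIX does not strictly promise is safe for readdir, although common implementations allow it.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <dirent.h>
#include <errno.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Count the entries in a directory, ignoring "." and "..".
 * Returns the count, or -1 with errno set if the path cannot be
 * examined, is not a directory, or cannot be read.
 */
long count_entries (const char *path)
{
    struct stat filestat;
    struct dirent *entry;
    DIR *directory;
    long count = 0;
    int saved_errno;

    assert (path != NULL);

    /* Check the lstat status before trusting filestat. */
    if (lstat (path, &filestat) != 0)
        return -1;
    if (!S_ISDIR (filestat.st_mode)) {
        errno = ENOTDIR;
        return -1;
    }

    directory = opendir (path);
    if (directory == NULL)
        return -1;

    for (;;) {
        errno = 0;              /* Tell end of directory from error */
        entry = readdir (directory);
        if (entry == NULL)
            break;
        if (strcmp (entry->d_name, ".") == 0
            || strcmp (entry->d_name, "..") == 0)
            continue;
        count++;
    }
    saved_errno = errno;        /* closedir may change errno */
    closedir (directory);
    if (saved_errno != 0) {
        errno = saved_errno;
        return -1;
    }
    return count;
}
```

With readdir the caller no longer needs to size a struct dirent buffer from name_max, because the library owns the storage for each returned entry.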
Part 3 shows crew_create, the function used to create a new work crew. This simple example does not provide a way to destroy a work crew, because that is not necessary — the work crew would be destroyed only when the main program was prepared to exit, and process exit will destroy all threads and process data.
12-15 The crew_create function begins by checking the crew_size argument. The size of the crew is not allowed to exceed the size of the crew array in crew_t. If the requested size is acceptable, copy it into the structure.
16-31 Start with no work and an empty work queue. Initialize the crew's synchronization objects.
36-43 Then, for each crew member, initialize the member's worker_t data. The index of the member within the crew array is recorded, along with a pointer back to the crew_t. Then the crew member thread is created, with a pointer to the member's worker_t as its argument.
crew.c part 3 crew_create
1 /*
2 * Create a work crew.
3 */
4 int crew_create (crew_t *crew, int crew_size)
5 {
6 int crew_index;
7 int status;
8
9 /*
10 * We won't create more than CREW_SIZE members.
11 */
12 if (crew_size > CREW_SIZE)
13 return EINVAL;
14
15 crew->crew_size = crew_size;
16 crew->work_count = 0;
17 crew->first = NULL;
18 crew->last = NULL;
19
20 /*
21 * Initialize synchronization objects.
22 */
23 status = pthread_mutex_init (&crew->mutex, NULL);
24 if (status != 0)
25 return status;
26 status = pthread_cond_init (&crew->done, NULL);
27 if (status != 0)
28 return status;
29 status = pthread_cond_init (&crew->go, NULL);
30 if (status != 0)
31 return status;
32
33 /*
34 * Create the worker threads.
35 */
36 for (crew_index = 0; crew_index < crew_size; crew_index++) {
37 crew->crew[crew_index].index = crew_index;
38 crew->crew[crew_index].crew = crew;
39 status = pthread_create (&crew->crew[crew_index].thread,
40 NULL, worker_routine, (void*)&crew->crew[crew_index]);
41 if (status != 0)
42 err_abort (status, "Create worker");
43 }
44 return 0;
45 }
Part 4 shows the crew_start function, which is called to assign a new path name and search string to the work crew. The function is synchronous — that is, after assigning the task it waits for the crew members to complete the task before returning to the caller. The crew_start function assumes that the crew_t structure has been previously created by calling crew_create, shown in part 3, but does not attempt to validate the structure.
20-26 Wait for the crew members to finish any previously assigned work. Although crew_start is synchronous, the crew may be processing a task assigned by another thread. On creation, the crew's work_count is set to 0, so the first call to crew_start will not need to wait.
28-43 Get the proper values of path_max and name_max for the file system specified by the file path we'll be reading. The pathconf function may return a value of -1 without setting errno, if the requested value for the file system is "unlimited." To detect this, we need to clear errno before making the call. If pathconf returns -1 without setting errno, assume reasonable values.
47-48 The values returned by pathconf don't include the terminating null character of a string — so add one character to both.
49-67 Allocate a work queue entry (work_t) and fill it in. Add it to the end of the request queue.
68-75 We've queued a single work request, so awaken one of the waiting work crew members by signaling the condition variable. If the attempt fails, free the work request, clear the work queue, and return with the error.
76-80 Wait for the crew to complete the task. The crew members handle all output, so when they're done we simply return to the caller.
crew.c part 4 crew_start
1 /*
2 * Pass a file path to a work crew previously created
3 * using crew_create
4 */
5 int crew_start (
6 crew_p crew,
7 char *filepath,
8 char *search)
9 {
10 work_p request;
11 int status;
12
13 status = pthread_mutex_lock (&crew->mutex);
14 if (status != 0)
15 return status;
16
17 /*
18 * If the crew is busy, wait for them to finish.
19 */
20 while (crew->work_count > 0) {
21 status = pthread_cond_wait (&crew->done, &crew->mutex);
22 if (status != 0) {
23 pthread_mutex_unlock (&crew->mutex);
24 return status;
25 }
26 }
27
28 errno = 0;
29 path_max = pathconf (filepath, _PC_PATH_MAX);
30 if (path_max == -1) {
31 if (errno == 0)
32 path_max = 1024; /* "No limit" */
33 else
34 errno_abort ("Unable to get PATH_MAX");
35 }
36 errno = 0;
37 name_max = pathconf (filepath, _PC_NAME_MAX);
38 if (name_max == -1) {
39 if (errno == 0)
40 name_max = 256; /* "No limit" */
41 else
42 errno_abort ("Unable to get NAME_MAX");
43 }
44 DPRINTF ((
45 "PATH_MAX for %s is %ld, NAME_MAX is %ld\n",
46 filepath, path_max, name_max));
47 path_max++; /* Add null byte */
48 name_max++; /* Add null byte */
49 request = (work_p)malloc (sizeof (work_t));
50 if (request == NULL)
51 errno_abort ("Unable to allocate request");
52 DPRINTF (("Requesting %s\n", filepath));
53 request->path = (char*)malloc (path_max);
54 if (request->path == NULL)
55 errno_abort ("Unable to allocate path");
56 strcpy (request->path, filepath);
57 request->string = search;
58 request->next = NULL;
59 if (crew->first == NULL) {
60 crew->first = request;
61 crew->last = request;
62 } else {
63 crew->last->next = request;
64 crew->last = request;
65 }
66
67 crew->work_count++;
68 status = pthread_cond_signal (&crew->go);
69 if (status != 0) {
70 free (crew->first);
71 crew->first = NULL;
72 crew->work_count = 0;
73 pthread_mutex_unlock (&crew->mutex);
74 return status;
75 }
76 while (crew->work_count > 0) {
77 status = pthread_cond_wait (&crew->done, &crew->mutex);
78 if (status != 0)
79 err_abort (status, "waiting for crew to finish");
80 }
81 status = pthread_mutex_unlock (&crew->mutex);
82 if (status != 0)
83 err_abort (status, "Unlock crew mutex");
84 return 0;
85 }
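The pathconf convention described for lines 28-43 can be isolated in a small helper. This is a sketch rather than the book's code: the name buffer_size_for and its fallback parameter are hypothetical. It keeps the result in a long, the type pathconf returns, whereas the listing stores it in a size_t and relies on the conversion of -1 for its comparison.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <errno.h>
#include <unistd.h>

/*
 * Return the buffer size needed for a pathconf limit such as
 * _PC_PATH_MAX or _PC_NAME_MAX, including room for the terminating
 * null character. If the file system reports "no limit", use the
 * caller's fallback guess instead. Returns -1 with errno set if
 * pathconf fails.
 */
long buffer_size_for (const char *path, int name, long fallback)
{
    long limit;

    assert (path != NULL);
    errno = 0;                  /* So "no limit" can be told from failure */
    limit = pathconf (path, name);
    if (limit == -1) {
        if (errno != 0)
            return -1;          /* Real error; errno says why */
        limit = fallback;       /* "No limit": fall back to a guess */
    }
    return limit + 1;           /* Add null byte */
}
```

A caller might write buffer_size_for (filepath, _PC_NAME_MAX, 256) and treat a return value of -1 as an error, which mirrors the fallback of 256 that crew_start uses.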
Part 5 shows the initial thread (main) for the little work crew sample.
10-13 The program requires three arguments: the program name, a string for which to search, and a path name. For example, "crew butenhof ~".
15-23 On a Solaris system, call thr_setconcurrency to ensure that at least one LWP (kernel execution context) is created for each crew member. The program will work without this call, but, on a uniprocessor, you would not see any concurrency. See Section 5.6.3 for more information on "many to few" scheduling models, and Section 10.1.3 for information on "set concurrency" functions.
24-30 Create a work crew, and assign to it the concurrent file search.
crew.c part 5 main
1 /*
2 * The main program to "drive" the crew...
3 */
4 int main (int argc, char *argv[])
5 {
6 crew_t my_crew;
7 char line[128], *next;
8 int status;
9
10 if (argc < 3) {
11 fprintf (stderr, "Usage: %s string path\n", argv[0]);
12 return -1;
13 }
14
15 #ifdef sun
16 /*
17 * On Solaris 2.5, threads are not timesliced. To ensure
18 * that our threads can run concurrently, we need to
19 * increase the concurrency level to CREW_SIZE.
20 */
21 DPRINTF (("Setting concurrency level to %d\n", CREW_SIZE));
22 thr_setconcurrency (CREW_SIZE);
23 #endif
24 status = crew_create (&my_crew, CREW_SIZE);
25 if (status != 0)
26 err_abort (status, "Create crew");
27
28 status = crew_start (&my_crew, argv[2], argv[1]);
29 if (status != 0)
30 err_abort (status, "Start crew");
31
32 return 0;
33 }