Book: Programming with POSIX® Threads
4.1 Pipeline
"I want a clean cup," interrupted the Hatter: "let's all move one place on." He moved on as he spoke, and the Dormouse followed him: the March Hare moved into the Dormouse's place, and Alice rather unwillingly took the place of the March Hare. The Hatter was the only one who got any advantage from the change; and Alice was a good deal worse off than before, as the March Hare had just upset the milk-jug into his plate.
In pipelining, a stream of "data items" is processed serially by an ordered set of threads (Figure 4.1). Each thread performs a specific operation on each item in sequence, passing the data on to the next thread in the pipeline.
For example, the data might be a scanned image, and thread A might process an image array, thread B might search the processed data for a specific set of features, and thread C might collect the serial stream of search results from thread B into a report. Or each thread might perform a single step in some sequence of modifications on the data.
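Setting the threads aside for a moment, a pipeline applies an ordered sequence of operations to each item. The minimal sketch below models that sequentially in C, which makes it easy to predict what a threaded pipeline should produce. The names stage_fn, run_stages, and add_one are hypothetical and do not appear in the book's code; add_one assumes the add-one-per-stage operation used by the example program in this section.

```c
#include <assert.h>

/* Hypothetical type: one pipeline "operation" on a data item. */
typedef long (*stage_fn) (long);

/* Apply each operation to the item in order, as the pipeline
 * threads would, but sequentially in a single thread. */
long run_stages (long value, const stage_fn *stages, int count)
{
    int i;

    for (i = 0; i < count; i++)
        value = stages[i] (value);
    return value;
}

/* The operation each stage of the example program performs. */
long add_one (long x)
{
    return x + 1;
}
```

With ten add_one stages, an input of 5 comes out as 15. A threaded pipeline computes the same per-item result; what the threads add is that different items can occupy different stages at the same time.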
The following program, called pipe.c, shows the pieces of a simple pipeline program. Each thread in the pipeline increments its input value by 1 and passes it to the next thread. The main program reads a series of "command lines" from stdin. A command line is either a number, which is fed into the beginning of the pipeline, or the character "=," which causes the program to read the next result from the end of the pipeline and print it to stdout.
FIGURE 4.1 Pipelining
1 #include <pthread.h>
2 #include "errors.h"
3
4 /*
5 * Internal structure describing a "stage" in the
6 * pipeline. One for each thread, plus a "result
7 * stage" where the final thread can stash the value.
8 */
9 typedef struct stage_tag {
10 pthread_mutex_t mutex; /* Protect data */
11 pthread_cond_t avail; /* Data available */
12 pthread_cond_t ready; /* Ready for data */
13 int data_ready; /* Data present */
14 long data; /* Data to process */
15 pthread_t thread; /* Thread for stage */
16 struct stage_tag *next; /* Next stage */
17 } stage_t;
18
19 /*
20 * External structure representing the entire
21 * pipeline.
22 */
23 typedef struct pipe_tag {
24 pthread_mutex_t mutex; /* Mutex to protect pipe */
25 stage_t *head; /* First stage */
26 stage_t *tail; /* Final stage */
27 int stages; /* Number of stages */
28 int active; /* Active data elements */
29 } pipe_t;
9-17 Each stage of a pipeline is represented by a variable of type stage_t. stage_t contains a mutex to synchronize access to the stage. The avail condition variable is used to signal a stage that data is ready for it to process, and each stage signals its own ready condition variable when it is ready for new data. The data_ready flag records whether the stage currently holds an item, the data member is the data passed from the previous stage, thread is the thread operating this stage, and next is a pointer to the following stage.
23-29 The pipe_t structure describes a pipeline. It provides pointers to the first and last stages of a pipeline. The first stage, head, represents the first thread in the pipeline. The last stage, tail, is a special stage_t that has no thread: it is a place to store the final result of the pipeline.
Part 2 shows pipe_send, a utility function used to start data along a pipeline; each stage also calls it to pass data to the next stage.
17-23 It begins by waiting on the specified pipeline stage's ready condition variable until it can accept new data.
28-30 Store the new data value, and then tell the stage that data is available.
pipe.c part 2 pipe_send
1 /*
2 * Internal function to send a "message" to the
3 * specified pipe stage. Threads use this to pass
4 * along the modified data item.
5 */
6 int pipe_send (stage_t *stage, long data)
7 {
8 int status;
9
10 status = pthread_mutex_lock (&stage->mutex);
11 if (status != 0)
12 return status;
13 /*
14 * If there's data in the pipe stage, wait for it
15 * to be consumed.
16 */
17 while (stage->data_ready) {
18 status = pthread_cond_wait (&stage->ready, &stage->mutex);
19 if (status != 0) {
20 pthread_mutex_unlock (&stage->mutex);
21 return status;
22 }
23 }
24
25 /*
26 * Send the new data
27 */
28 stage->data = data;
29 stage->data_ready = 1;
30 status = pthread_cond_signal (&stage->avail);
31 if (status != 0) {
32 pthread_mutex_unlock (&stage->mutex);
33 return status;
34 }
35 status = pthread_mutex_unlock (&stage->mutex);
36 return status;
37 }
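pipe_send, together with the code that empties a stage, amounts to a one-slot mailbox: the data_ready flag says whether the slot is full, avail wakes a thread waiting to take an item, and ready wakes a thread waiting to put one. The sketch below isolates that protocol for one producer thread and one consumer. It assumes hypothetical names (slot_t, slot_init, slot_put, slot_take, producer, slot_demo) and, unlike pipe.c, ignores the return status of the Pthreads calls for brevity.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical one-slot mailbox (not part of pipe.c) using the same
 * protocol as stage_t. Error checking omitted for brevity. */
typedef struct slot_tag {
    pthread_mutex_t mutex;
    pthread_cond_t  avail;      /* Signaled when the slot fills */
    pthread_cond_t  ready;      /* Signaled when the slot empties */
    int             data_ready; /* 1 while the slot holds an item */
    long            data;
} slot_t;

void slot_init (slot_t *s)
{
    pthread_mutex_init (&s->mutex, NULL);
    pthread_cond_init (&s->avail, NULL);
    pthread_cond_init (&s->ready, NULL);
    s->data_ready = 0;
}

/* Same steps as pipe_send: wait until empty, store, signal avail. */
void slot_put (slot_t *s, long value)
{
    pthread_mutex_lock (&s->mutex);
    while (s->data_ready)
        pthread_cond_wait (&s->ready, &s->mutex);
    s->data = value;
    s->data_ready = 1;
    pthread_cond_signal (&s->avail);
    pthread_mutex_unlock (&s->mutex);
}

/* Same steps as pipe_result's handling of the tail stage:
 * wait until full, copy, mark empty, signal ready. */
long slot_take (slot_t *s)
{
    long value;

    pthread_mutex_lock (&s->mutex);
    while (!s->data_ready)
        pthread_cond_wait (&s->avail, &s->mutex);
    value = s->data;
    s->data_ready = 0;
    pthread_cond_signal (&s->ready);
    pthread_mutex_unlock (&s->mutex);
    return value;
}

static void *producer (void *arg)
{
    slot_t *s = arg;
    long i;

    for (i = 1; i <= 100; i++)
        slot_put (s, i);
    return NULL;
}

/* A producer thread puts 1..100 while the caller takes them.
 * Returns the sum, or -1 if any item arrived out of order. */
long slot_demo (void)
{
    slot_t slot;
    pthread_t thread;
    long sum = 0, i;
    int in_order = 1;

    slot_init (&slot);
    pthread_create (&thread, NULL, producer, &slot);
    for (i = 1; i <= 100; i++) {
        long v = slot_take (&slot);
        if (v != i)
            in_order = 0;
        sum += v;
    }
    pthread_join (thread, NULL);
    pthread_cond_destroy (&slot.ready);
    pthread_cond_destroy (&slot.avail);
    pthread_mutex_destroy (&slot.mutex);
    return in_order ? sum : -1;
}
```

Because each put waits until the slot is empty, items cannot overtake one another, so the consumer should see 1 through 100 in order and slot_demo should return 5050.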
Part 3 shows pipe_stage, the start function for each thread in the pipeline. The thread's argument is a pointer to its stage_t structure.
16-27 The thread loops forever, processing data. Because the mutex is locked outside the loop, the thread appears to have the pipeline stage's mutex locked all the time. However, it spends most of its time waiting for new data on the avail condition variable. Remember that a thread automatically unlocks the mutex associated with a condition variable while it waits on that condition variable, and relocks it before the wait returns. In reality, therefore, the thread spends most of its time with the mutex unlocked.
22-26 When given data, the thread passes its data value, incremented by one, to the next stage. The thread then records that the stage no longer has data by clearing the data_ready flag, and signals the ready condition variable to wake any thread that might be waiting for this pipeline stage.
pipe.c part 3 pipe_stage
1 /*
2 * The thread start routine for pipe stage threads.
3 * Each will wait for a data item passed from the
4 * caller or the previous stage, modify the data
5 * and pass it along to the next (or final) stage.
6 */
7 void *pipe_stage (void *arg)
8 {
9 stage_t *stage = (stage_t*)arg;
10 stage_t *next_stage = stage->next;
11 int status;
12
13 status = pthread_mutex_lock (&stage->mutex);
14 if (status != 0)
15 err_abort (status, "Lock pipe stage");
16 while (1) {
17 while (stage->data_ready != 1) {
18 status = pthread_cond_wait (&stage->avail, &stage->mutex);
19 if (status != 0)
20 err_abort (status, "Wait for previous stage");
21 }
22 pipe_send (next_stage, stage->data + 1);
23 stage->data_ready = 0;
24 status = pthread_cond_signal (&stage->ready);
25 if (status != 0)
26 err_abort (status, "Wake next stage");
27 }
28 /*
29 * Notice that the routine never unlocks the stage->mutex.
30 * The call to pthread_cond_wait implicitly unlocks the
31 * mutex while the thread is waiting, allowing other threads
32 * to make progress. Because the loop never terminates, this
33 * function has no need to unlock the mutex explicitly.
34 */
35 }
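The claim that the stage thread spends most of its time with the mutex unlocked rests on pthread_cond_wait releasing the mutex while it waits. The small, hypothetical demonstration below (demo_mutex, demo_cond, demo_flag, setter, and wait_releases_mutex are not from pipe.c) makes that observable: the caller locks the mutex before creating setter and holds it until it waits, so setter can only acquire the mutex while the caller is blocked in pthread_cond_wait. If the wait did not release the mutex, the function would never return.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical demo objects, not part of pipe.c. */
static pthread_mutex_t demo_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cond  = PTHREAD_COND_INITIALIZER;
static int             demo_flag  = 0;

/* Can only lock demo_mutex while the caller is blocked in
 * pthread_cond_wait, the only time the caller releases it. */
static void *setter (void *arg)
{
    (void) arg;
    pthread_mutex_lock (&demo_mutex);
    demo_flag = 1;
    pthread_cond_signal (&demo_cond);
    pthread_mutex_unlock (&demo_mutex);
    return NULL;
}

/* Returns 1 once setter has run; it would hang forever if
 * pthread_cond_wait kept the mutex locked while waiting. */
int wait_releases_mutex (void)
{
    pthread_t thread;
    int flag;

    pthread_mutex_lock (&demo_mutex);
    demo_flag = 0;
    pthread_create (&thread, NULL, setter, NULL);
    while (!demo_flag)          /* Predicate loop, as in pipe_stage */
        pthread_cond_wait (&demo_cond, &demo_mutex);
    flag = demo_flag;
    pthread_mutex_unlock (&demo_mutex);
    pthread_join (thread, NULL);
    return flag;
}
```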
Part 4 shows pipe_create, the function that creates a pipeline. It can create a pipeline of any number of stages, linking them together in a list.
18-34 For each stage, it allocates a new stage_t structure and initializes the members. Notice that one additional "stage" is allocated and initialized to hold the final result of the pipeline.
36-37 The next member of the final stage is set to NULL (through the link pointer) to terminate the list, and the pipeline's tail is set to point at the final stage. The tail pointer allows pipe_result to easily find the final product of the pipeline, which is stored into the final stage.
52-59 After all the stage data is initialized, pipe_create creates a thread for each stage. The extra "final stage" does not get a thread: the for loop continues only while the current stage's next link is not NULL, so it stops before reaching the final stage.
pipe.c part 4 pipe_create
1 /*
2 * External interface to create a pipeline. All the
3 * data is initialized and the threads created. They'll
4 * wait for data.
5 */
6 int pipe_create (pipe_t *pipe, int stages)
7 {
8 int pipe_index;
9 stage_t **link = &pipe->head, *new_stage, *stage;
10 int status;
11
12 status = pthread_mutex_init(&pipe->mutex, NULL);
13 if (status != 0)
14 err_abort (status, "Init pipe mutex");
15 pipe->stages = stages;
16 pipe->active = 0;
17
18 for (pipe_index = 0; pipe_index <= stages; pipe_index++) {
19 new_stage = (stage_t*)malloc (sizeof (stage_t));
20 if (new_stage == NULL)
21 errno_abort ("Allocate stage");
22 status = pthread_mutex_init (&new_stage->mutex, NULL);
23 if (status != 0)
24 err_abort (status, "Init stage mutex");
25 status = pthread_cond_init (&new_stage->avail, NULL);
26 if (status != 0)
27 err_abort (status, "Init avail condition");
28 status = pthread_cond_init (&new_stage->ready, NULL);
29 if (status != 0)
30 err_abort (status, "Init ready condition");
31 new_stage->data_ready = 0;
32 *link = new_stage;
33 link = &new_stage->next;
34 }
35
36 *link = (stage_t*)NULL; /* Terminate list */
37 pipe->tail = new_stage; /* Record the tail */
38
39 /*
40 * Create the threads for the pipe stages only after all
41 * the data is initialized (including all links). Note
42 * that the last stage doesn't get a thread, it's just
43 * a receptacle for the final pipeline value.
44 *
45 * At this point, proper cleanup on an error would take up
46 * more space than worthwhile in a "simple example," so
47 * instead of cancelling and detaching all the threads
48 * already created, plus the synchronization object and
49 * memory cleanup done for earlier errors, it will simply
50 * abort.
51 */
52 for ( stage = pipe->head;
53 stage->next != NULL;
54 stage = stage->next) {
55 status = pthread_create (
56 &stage->thread, NULL, pipe_stage, (void*)stage);
57 if (status != 0)
58 err_abort (status, "Create pipe stage");
59 }
60 return 0;
61 }
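pipe_create builds its list with a pointer-to-pointer (link) that always addresses the field to fill next: first pipe->head, then each new stage's next member. The sketch below repeats that idiom on a hypothetical node_t (build_chain, count_threaded, and free_chain are illustrative names, not part of pipe.c) and counts how many nodes the thread-creation loop would visit, assuming the same stage->next != NULL condition.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for stage_t. */
typedef struct node_tag {
    int              index;
    struct node_tag *next;
} node_t;

/* Build stages + 1 nodes with the pointer-to-pointer idiom used
 * by pipe_create. Returns the head; stores the last node in *tail. */
node_t *build_chain (int stages, node_t **tail)
{
    node_t *head = NULL, *node = NULL;
    node_t **link = &head;
    int i;

    for (i = 0; i <= stages; i++) {
        node = malloc (sizeof (node_t));
        if (node == NULL)
            abort ();
        node->index = i;
        *link = node;           /* Fill head, or previous node's next */
        link = &node->next;     /* Next store goes into this node */
    }
    *link = NULL;               /* Terminate list */
    *tail = node;
    return head;
}

/* Nodes the thread-creation loop would visit: every node whose
 * next pointer is non-NULL, so the final node is skipped. */
int count_threaded (const node_t *head)
{
    const node_t *n;
    int count = 0;

    for (n = head; n->next != NULL; n = n->next)
        count++;
    return count;
}

void free_chain (node_t *head)
{
    while (head != NULL) {
        node_t *next = head->next;
        free (head);
        head = next;
    }
}
```

For 10 stages this builds 11 nodes, of which 10 would get threads; the last node plays the role of the tail stage.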
Part 5 shows pipe_start and pipe_result. The pipe_start function pushes an item of data into the beginning of the pipeline and then returns immediately without waiting for a result. The pipe_result function allows the caller to wait for the final result, whenever the result might be needed.
19-22 The pipe_start function sends data to the first stage of the pipeline. The function increments a count of "active" items in the pipeline, which allows pipe_result to detect that there are no more active items to collect, and to return immediately instead of blocking. You would not always want a pipeline to behave this way; it makes sense for this example because a single thread alternately "feeds" and "reads" the pipeline, and the application would hang forever if the user inadvertently tried to read one more item than had been fed.
23-47 The pipe_result function first checks whether there is an active item in the pipeline. If not, it unlocks the pipeline mutex and returns 0 to indicate that the pipeline is empty.
48-55 If there is another item in the pipeline, pipe_result locks the tail (final) stage and waits for it to receive data. It copies the data, resets the stage so that it can receive the next item, and returns 1. Remember that the final stage does not have a thread and so cannot reset itself.
pipe.c part 5 pipe_start, pipe_result
1 /*
2 * External interface to start a pipeline by passing
3 * data to the first stage. The routine returns while
4 * the pipeline processes in parallel. Call the
5 * pipe_result routine to collect the final stage values
6 * (note that the pipe will stall when each stage fills,
7 * until the result is collected).
8 */
9 int pipe_start (pipe_t *pipe, long value)
10 {
11 int status;
12
13 status = pthread_mutex_lock (&pipe->mutex);
14 if (status != 0)
15 err_abort (status, "Lock pipe mutex");
16 pipe->active++;
17 status = pthread_mutex_unlock (&pipe->mutex);
18 if (status != 0)
19 err_abort (status, "Unlock pipe mutex");
20 pipe_send (pipe->head, value);
21 return 0;
22 }
23
24 /*
25 * Collect the result of the pipeline. Wait for a
26 * result if the pipeline hasn't produced one.
27 */
28 int pipe_result (pipe_t *pipe, long *result)
29 {
30 stage_t *tail = pipe->tail;
31 long value;
32 int empty = 0;
33 int status;
34
35 status = pthread_mutex_lock (&pipe->mutex);
36 if (status != 0)
37 err_abort (status, "Lock pipe mutex");
38 if (pipe->active <= 0)
39 empty = 1;
40 else
41 pipe->active--;
42
43 status = pthread_mutex_unlock (&pipe->mutex);
44 if (status != 0)
45 err_abort (status, "Unlock pipe mutex");
46 if (empty)
47 return 0;
48
49 pthread_mutex_lock (&tail->mutex);
50 while (!tail->data_ready)
51 pthread_cond_wait (&tail->avail, &tail->mutex);
52 *result = tail->data;
53 tail->data_ready = 0;
54 pthread_cond_signal (&tail->ready);
55 pthread_mutex_unlock (&tail->mutex);
56 return 1;
57 }
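The active count is what lets pipe_result report an empty pipeline instead of blocking. The following sketch models only that bookkeeping, under the assumption that a hypothetical counter_t stands in for the mutex and active members of pipe_t; note_start and claim_result mirror the first parts of pipe_start and pipe_result and are not part of pipe.c.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical model of pipe_t's "active" bookkeeping only. */
typedef struct counter_tag {
    pthread_mutex_t mutex;
    int             active;     /* Items fed but not yet collected */
} counter_t;

void counter_init (counter_t *c)
{
    pthread_mutex_init (&c->mutex, NULL);
    c->active = 0;
}

/* Mirrors pipe_start: count the item before sending it. */
void note_start (counter_t *c)
{
    pthread_mutex_lock (&c->mutex);
    c->active++;
    pthread_mutex_unlock (&c->mutex);
}

/* Mirrors the check in pipe_result: claims an outstanding item and
 * returns 1, or returns 0 at once if nothing has been fed. */
int claim_result (counter_t *c)
{
    int claimed = 0;

    pthread_mutex_lock (&c->mutex);
    if (c->active > 0) {
        c->active--;
        claimed = 1;
    }
    pthread_mutex_unlock (&c->mutex);
    return claimed;
}
```

Decrementing the count before waiting on the tail stage means pipe_result only waits when at least one fed item has not yet been collected.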
Part 6 shows the main program that drives the pipeline. It creates a pipeline, and then loops reading lines from stdin. If the line is a single "=" character, it pulls a result from the pipeline and prints it. Otherwise, it converts the line to an integer value, which it feeds into the pipeline.
pipe.c part 6 main
1 /*
2 * The main program to "drive" the pipeline...
3 */
4 int main (int argc, char *argv[])
5 {
6 pipe_t my_pipe;
7 long value, result;
8 int status;
9 char line[128];
10
11 pipe_create (&my_pipe, 10);
12 printf ("Enter integer values, or \"=\" for next result\n");
13
14 while (1) {
15 printf ("Data> ");
16 if (fgets (line, sizeof (line), stdin) == NULL) exit (0);
17 if (strlen (line) <= 1) continue;
18 if (strlen (line) <= 2 && line[0] == '=') {
19 if (pipe_result (&my_pipe, &result))
20 printf ("Result is %ld\n", result);
21 else
22 printf ("Pipe is empty\n");
23 } else {
24 if (sscanf (line, "%ld", &value) < 1)
25 fprintf (stderr, "Enter an integer value\n");
26 else
27 pipe_start (&my_pipe, value);
28 }
29 }
30 }
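The main loop's decision about each input line can be factored into a pure function, which makes the rules easy to check in isolation. The sketch below is a hypothetical helper (classify_line and line_kind_t are not in pipe.c) that applies the same tests as main: a line of length 1 or less is skipped, a line of at most two characters starting with "=" requests a result, and anything else must parse as a long with sscanf.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical classification of one input line, mirroring main. */
typedef enum { LINE_SKIP, LINE_RESULT, LINE_VALUE, LINE_ERROR } line_kind_t;

line_kind_t classify_line (const char *line, long *value)
{
    size_t len = strlen (line);

    if (len <= 1)                       /* Empty line, or just "\n" */
        return LINE_SKIP;
    if (len <= 2 && line[0] == '=')     /* "=" plus newline */
        return LINE_RESULT;
    if (sscanf (line, "%ld", value) < 1)
        return LINE_ERROR;              /* Not an integer */
    return LINE_VALUE;                  /* *value holds the integer */
}
```

main could then switch on the returned value, calling pipe_result for LINE_RESULT and pipe_start for LINE_VALUE.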