Êíèãà: Programming with POSIX® Threads

7.1.1 Barriers

7.1.1 Barriers

A barrier is a way to keep the members of a group together. If our intrepid "bailing programmers" washed up on a deserted island, for example, and they ventured into the jungle to explore, they would want to remain together, for the illusion of safety in numbers, if for no other reason (Figure 7.1). Any exploring programmer finding himself very far in front of the others would therefore wait for them before continuing.

A barrier is usually employed to ensure that all threads cooperating in some parallel algorithm reach a specific point in that algorithm before any can pass. This is especially common in code that has been decomposed automatically by creating fine-grained parallelism within compiled source code. All threads may execute the same code, with threads processing separate portions of a shared data set (such as an array) in some areas and processing private data in parallel


FIGURE 7.1 Barrier analogy

in other areas. Still other areas must be executed by only one thread, such as setup or cleanup for the parallel regions. The boundaries between these areas are often implemented using barriers. Thus, threads completing a matrix computation may wait at a barrier until all have finished. One may then perform setup for the next parallel segment while the others skip ahead to another barrier. When the setup thread reaches that barrier, all threads begin the next parallel region.

Figure 7.2 shows the operation of a barrier being used to synchronize three threads, called thread 1, thread 2, and thread 3. The figure is a sort of timing diagram, with time increasing from left to right. Each of the lines beginning at the labels in the upper left designates the behavior of a specific thread — solid for thread 1, dotted for thread 2, and dashed for thread 3. When the lines drop within the rounded rectangle, they are interacting with the barrier. If the line drops below the center line, it shows that the thread is blocked waiting for other threads to reach the barrier. The line that stops above the center line represents the final thread to reach the barrier, awakening all waiters.

In this example, thread 1 and then thread 2 wait on the barrier. At a later time, thread 3 waits on the barrier, finds that the barrier is now full, and awakens all the waiters. All three threads then return from the barrier wait.

The core of a barrier is a counter. The counter is initialized to the number of threads in the "tour group," the number of threads that must wait on a barrier before all the waiters return. I'll call that the "threshold," to give it a simple one-word name. When each thread reaches the barrier, it decreases the counter. If the value hasn't reached 0, it waits. If the value has reached 0, it wakes up the waiting threads.


FIGURE 7.2 Barrier operation

Because the counter will be modified by multiple threads, it has to be protected by a mutex. Because threads will be waiting for some event (a counter value of 0), the barrier needs to have a condition variable and a predicate expression. When the counter reaches 0 and the barrier drops open, we need to reset the counter, which means the barrier must store the original threshold.

The obvious predicate is to simply wait for the counter to reach 0, but that complicates the process of resetting the barrier. When can we reset the count to the original value? We can't reset it when the counter reaches 0, because at that point most of the threads are waiting on the condition variable. The counter must be 0 when they wake up, or they'll continue waiting. Remember that condition variable waits occur in loops that retest the predicate.

The best solution is to add a separate variable for the predicate. We will use a "cycle" variable that is logically inverted each time some thread determines that one cycle of the barrier is complete. That is, whenever the counter value is reset, before broadcasting the condition variable, the thread inverts the cycle flag. Threads wait in a loop as long as the cycle flag remains the same as the value seen on entry, which means that each thread must save the initial value.

The header file barrier.h and the C source file barrier.c demonstrate an implementation of barriers using standard Pthreads mutexes and condition variables. This is a portable implementation that is relatively easy to understand. One could, of course, create a much more efficient implementation for any specific system based on knowledge of nonportable hardware and operating system characteristics.

6-13 Part 1 shows the structure of a barrier, represented by the type barrier_t. You can see the mutex (mutex) and the condition variable (cv). The threshhold member is the number of threads in the group, whereas counter is the number of threads that have yet to join the group at the barrier. And cycle is the flag discussed in the previous paragraph. It is used to ensure that a thread awakened from a barrier wait will immediately return to the caller, but will block in the barrier if it calls the wait operation again before all threads have resumed execution.

15 The BARRIER_VALID macro defines a "magic number," which we store into the valid member and then check to determine whether an address passed to other barrier interfaces is "reasonably likely to be" a barrier. This is an easy, quick check that will catch the most common errors.[7]

? barrier.h  part 1 barrier_t

1 #include <pthread.h>

2
3 /*
4 * Structure describing a barrier.
5 */
6 typedef struct barrier_tag {
7 pthread_mutex_t mutex; /* Control access to barrier * /
8 pthread_cond_t cv; /* wait for barrier */
9 int valid; /* set when valid */
10 int threshold; /* number of threads required */
11 int counter; /* current number of threads * /
12 int cycle; /* alternate cycles (0 or 1) * /
13 } barrier_t;

14
15 #define BARRIER_VALID 0xdbcafe

Part 2 shows definitions and prototypes that allow you to do something with the barrier_t structure. First, you will want to initialize a new barrier. 4-6 You can initialize a static barrier at compile time by using the macro BARRIER_ INITIALIZER. You can instead dynamically initialize a barrier by calling the function barrier_init.

11-13 Once you have initialized a barrier, you will want to be able to use it. and the main thing to be done with a barrier is to wait on it. When we're done with a barrier, it would be nice to be able to destroy the barrier and reclaim the resources it used. We'll call these operations barrier_init, barrier_wait, and barrier_ destroy. All the operations need to specify upon which barrier they will operate. Because barriers are synchronization objects, and contain both a mutex and a condition variable (neither of which can be copied), we always pass a pointer to a barrier. Only the initialization operation requires a second parameter, the number of waiters required before the barrier opens.

To be consistent with Pthreads conventions, the functions all return an integer value, representing an error number defined in <errno.h>. The value 0 represents success.

? barrier.h part 2 interfaces

1 /*
2 * Support static initialization of barriers.
3 */
4 #define BARRIER_INITIALIZER(cnt)
5 {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
6 BARRIER_VALID, cnt, cnt, 0}

7
8 /*
9 * Define barrier functions
10 */
11 extern int barrier_init (barrier_t *barrier, int count);
12 extern int barrier_destroy (barrier_t *barrier);
13 extern int barrier_wait (barrier_t *barrier);

Now that you know the interface definition, you could write a program using barriers. But then, the point of this section is not to tell you how to use barriers, but to help improve your understanding of threaded programming by showing how to build a barrier. The following examples show the functions provided by barrier .c, to implement the interfaces we've just seen in barrier.h.

Part 1 shows barrier_init, which you would call to dynamically initialize a barrier, for example, if you allocate a barrier with malloc.

12 Both the counter and threshold are set to the same value. The counter is the "working counter" and will be reset to threshold for each barrier cycle.

14-16 If mutex initialization fails, barrier_init returns the failing status to the caller.

17-21 If condition variable (cv) initialization fails, barrier_init destroys the mutex it had already created and returns the failure status — the status of pthread_mutex_destroy is ignored because the failure to create the condition variable is more important than the failure to destroy the mutex.

22 The barrier is marked valid only after all initialization is complete. This does not completely guarantee that another thread erroneously trying to wait on that barrier will detect the invalid barrier rather than failing in some less easily diag-nosable manner, but at least it is a token attempt.

? barrier.c part 1 barrier_init

1 #include <pthread.h>
2 #include "errors.h"
3 #include "barrier.h"
4
5 /*
6 * Initialize a barrier for use.
7 */
8 int barrier_init (barrier_t *barrier, int count)
9 {
10 int status; 11
12 barrier->threshold = barrier->counter = count;
13 barrier->cycle = 0;
14 status = pthread_mutex_init (&barrier->mutex, NULL);
15 if (status != 0)
16  return status;
17 status = pthread_cond_init (&barrier->cv, NULL);
18 if (status != 0) {
19  pthread_mutex_destroy (&barrier->mutex);
20  return status;
21 }
22 barrier->valid = BARRIER_VALID;
23 return 0;
24 }

Part 2 shows the barrier_destroy function, which destroys the mutex and condition variable (cv) in the barrier_t structure. If we had allocated any additional resources for the barrier, we would need to release those resources also. 8-9 First check that the barrier appears to be valid, and initialized, by looking at the valid member. We don't lock the mutex first, because that will fail, possibly with something nasty like a segmentation fault, if the mutex has been destroyed or hasn't been initialized. Because we do not lock the mutex first, the validation check is not entirely reliable, but it is better than nothing, and will only fail to detect some race conditions where one thread attempts to destroy a barrier while another is initializing it, or where two threads attempt to destroy a barrier at nearly the same time.

19-22 If any thread is currently waiting on the barrier, return EBUSY.

24-27 At this point, the barrier is "destroyed"—all that's left is cleanup. To minimize the chances of confusing errors should another thread try to wait on the barrier before we're done, mark the barrier "not valid" by clearing valid, before changing any other state. Then, unlock the mutex, since we cannot destroy it while it is locked.

33-35 Destroy the mutex and condition variable. If the mutex destruction fails return the status; otherwise, return the status of the condition variable destruc-tion. Or, to put it another way, return an error status if either destruction failed otherwise, return success.

barrier.c part 2 barrier_destroy

1 /*
2 * Destroy a barrier when done using it.
3 */
4 int barrier_destroy (barrier_t *barrier)
5 {
6 int status, status2;

7
8 if (barrier->valid != BARRIER_VALID)
9  return EINVAL;

10
11 status = pthread_mutex_lock (&barrier->mutex);
12 if (status != 0)
13  return status;

14
15 /*
16 * Check whether any threads are known to be waiting; report
17 * "BUSY" if so.
18 */
19 if (barrier->counter != barrier->threshold) {
20  pthread_mutex_unlock (&barrier->mutex);
21  return EBUSY;
22 }

23
24 barrier->valid = 0;
25 status = pthread_mutex_unlock (&barrier->mutex);
26 if (status != 0)
27 return status;

28
29 /*
30 * If unable to destroy either 1003.1c synchronization
31 * object, return the error status.
32 */
33 status = pthread_mutex_destroy (&barrier->mutex);
34 status2 = pthread_cond_destroy (&barrier->cv);
35 return (status == 0 ? status : status2);
36 }

Finally, part 3 shows the implementation ofbarrier_wait.

10-11 First we verify that the argument barrier appears to be a valid barrier_t. We

perform this check before locking the mutex, so that barrier_destroy can safely destroy the mutex once it has cleared the valid member. This is a simple attempt to minimize the damage if one thread attempts to wait on a barrier while another thread is simultaneously either initializing or destroying that barrier.

We cannot entirely avoid problems, since without the mutex, barrier_wait has no guarantee that it will see the correct (up-to-date) value of valid. The valid check may succeed when the barrier is being made invalid, or fail when the barrier is being made valid. Locking the mutex first would do no good, because the mutex may not exist if the barrier is not fully initialized, or if it is being destroyed. This isn't a problem as long as you use the barrier correctly — that is, you initialize the barrier before any thread can possibly try to use it, and do not destroy the barrier until you are sure no thread will try to use it again.

17 Copy the current value of the barrier's cycle into a local variable. The comparison of our local cycle against the barrier_t structure's cycle member becomes our condition wait predicate. The predicate ensures that all currently waiting threads will return from barrier_wait when the last waiter broadcasts the condition variable, but that any thread that calls barrier_wait again will wait for the next broadcast. (This is the "tricky part" of correctly implementing a barrier.)

19-22 Now we decrease counter, which is the number of threads that are required but haven't yet waited on the barrier. When counter reaches 0, no more threads are needed — they're all here and waiting anxiously to continue to the next attraction. Now all we need to do is tell them to wake up. We advance to the next cycle, reset the counter, and broadcast the barrier's condition variable.

28-29 Earlier, I mentioned that a program often needs one thread to perform some cleanup or setup between parallel regions. Each thread could lock a mutex and check a flag so that only one thread would perform the setup. However, the setup may not need additional synchronization, for example, because the other threads will wait at a barrier for the next parallel region, and, in that case, it would be nice to avoid locking an extra mutex.

The barrier_wait function has this capability built into it. One and only one thread will return with the special value of -1 while the others return 0. In this particular implementation, the one that waits last and wakes the others will take the honor, but in principle it is "unspecified" which thread returns -1. The thread that receives -1 can perform the setup, while others race ahead. If you do not need the special return status, treat -1 as another form of success. The proposed POSIX.1j standard has a similar capability — one (unspecified) thread completing a barrier will return the status BARRIER_SERIAL_THREAD.

35 Any threaded code that uses condition variables should always either support deferred cancellation or disable cancellation. Remember that there are two distinct types of cancellation: deferred and asynchronous. Code that deals with asynchronous cancellation is rare. In general it is difficult or impossible to support asynchronous cancellation in any code that acquires resources (including locking a mutex). Programmers can't assume any function supports asynchronous cancellation unless its documentation specifically says so. Therefore we do not need to worry about asynchronous cancellation.

We could code barrier_wait to deal with deferred cancellation, but that raises difficult questions. How, for example, will the barrier wait ever be satisfied if one of the threads has been canceled? And if it won't be satisfied, what happens to all the other threads that have already waited (or are about to wait) on that barrier? There are various ways to answer these questions. One would be for barrier_wait to record the thread identifiers of all threads waiting on the barrier, and for any thread that's canceled within the wait to cancel all other waiters.

Or we might handle cancellation by setting some special error flag and broadcasting the condition variable, and modifying barrier_wait to return a special error when awakened in that way. However, it makes little sense to cancel one thread that's using a barrier. We're going to disallow it, by disabling cancellation prior to the wait, and restoring the previous state of cancellation afterward. This is the same approach taken by the proposed POSIX.1j standard, by the way—barrier waits are not cancellation points. 42-46 If there are more threads that haven't reached the barrier, we need to wait for them. We do that by waiting on the condition variable until the barrier has advanced to the next cycle — that is, the barrier's cycle no longer matches the local copy.

? barrier.c part 3 barrier_wait

1 /*
2 * Wait for all members of a barrier to reach the barrier. When
3 * the count (of remaining members) reaches 0, broadcast to wake
4 * all threads waiting.
5 */
6 int barrier_wait (barrier_t *barrier)
7 {
8 int status, cancel, tmp, cycle;

9
10 if (barrier->valid != BARRIER_VALID)
11 return EINVAL;

12
13 status = pthread_mutex_lock (&barrier->mutex);
14 if (status != 0)
15 return status;

16
17 cycle = barrier->cycle; /* Remember which cycle we're on */
18
19 if (--barrier->counter == 0) {
20  barrier->cycle = !barrier->cycle;
21  barrier->counter = barrier->threshold;
22  status = pthread_cond_broadcast (&barrier->cv);
23 /*
24 * The last thread into the barrier will return status
25 * -1 rather than 0, so that it can be used to perform
26 * some special serial code following the barrier.
27 */
28  if (status == 0)
29  status = -1;
30 } else {
31 /*
32 * Wait with cancellation disabled, because barrier_wait
33 * should not be a cancellation point.
34 */
35 pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, &cancel); 36
37 /*
38 * Wait until the barrier's cycle changes, which means
39 * that it has been broadcast, and we don't want to wait
40 * anymore.
41 */
42 while (cycle == barrier->cycle) {
43  status = pthread_cond_wait (
44  &barrier->cv, &barrier->mutex);
45  if (status != 0) break;
46 }
47
48 pthread_setcancelstate (cancel, &tmp);
49 }
50 /*
51 * Ignore an error in unlocking. It shouldn't happen, and
52 * reporting it here would be misleading — the barrier wait
53 * completed, after all, whereas returning, for example,
54 * EINVAL would imply the wait had failed. The next attempt
55 * to use the barrier *will* return an error, or hang, due
56 * to whatever happened to the mutex.
57 */
58 pthread_mutex_unlock (&barrier->mutex);
59 return status; /* error, -1 for waker, or 0 */
60 }

Finally, barrier_main.c is a simple program that uses barriers. Each thread

loops on calculations within a private array. 35,47 At the beginning and end of each iteration, the threads, running function

thread_routine, all wait on a barrier to synchronize the operation. 56-61 At the end of each iteration, the "lead thread" (the one receiving a -1 result

from barrier_wait) will modify the data of all threads, preparing them for the next iteration. The others go directly to the top of the loop and wait on the barrier at line 35.

? barrier_main.c

1 #include <pthread.h>
2 #include "barrier.h"
3 #include "errors.h"
4
5 #define THREADS 5
6 #define ARRAY 6
7 #define INLOOPS 1000
8 #define OUTLOOPS 10

9
10 /*
11 * Keep track of each thread.
12 */
13 typedef struct thread_tag {
14  pthread_t thread_id;
15  int number;
16  int increment;
17  int array[ARRAY];
18 } thread_t;

19
20 barrier_t barrier;
21 thread_t thread[THREADS];

22
23 /*
24 * Start routine for threads.
25 */
26 void *thread_routine (void *arg)
27 {
28 thread_t *self = (thread_t*)arg; /* Thread's thread_t */
29 int in_loop, out_loop, count, status;
30
31 /*
32 * Loop through OUTLOOPS barrier cycles.
33 */
34 for (out_loop = 0; out_loop < OUTLOOPS; out_loop++) {
35 status = barrier_wait (&barrier);
36 if (status > 0)
37 err_abort (status, "Wait on barrier");
38
39 /*
40 * This inner loop just adds a value to each element in
41 * the working array.
42 */
43 for (in_loop = 0; in_loop < INLOOPS; in_loop++)
44  for (count = 0; count < ARRAY; count++)
45  self->array[count] += self->increment; 46
47 status = barrier_wait (&barrier);
48 if (status > 0)
49  err_abort (status, "Wait on barrier");

50
51 /*
52 * The barrier causes one thread to return with the
53 * special return status -1. The thread receiving this
54 * value increments each element in the shared array.
55 */
56 if (status == -1) {
57  int thread_num;
58
59  for (thread_num = 0; thread_num < THREADS; thread_num++)
60  thread[thread_num].increment += 1;
61 }
62 }
63 return NULL;
64 }

65
66 int main (int arg, char *argv[])
67 {
68 int thread_count, array_count;
69 int status;
70
71 barrier_init (&barrier, THREADS);
72
73 /*
74 * Create a set of threads that will use the barrier.
75 */
76 for (thread_count = 0; thread_count < THREADS; thread_count++) { 7 7 thread[thread_count].increment = thread_count;
78 thread[thread_count].number = thread_count;
79
80 for (array_count = 0; array_count < ARRAY; array_count++)
81  thread[thread_count].array[array_count] = array_count + 1;

82
83 status = pthread_create (&thread[thread_count].thread_id,
84 NULL, thread_routine, (void*)&thread[thread_count]);
85 if (status != 0)
86  err_abort (status, "Create thread");
87 }

88
89 /*
90 * Now join with each of the threads.
91 */
92 for (thread_count = 0; thread_count < THREADS; thread_count++) {
93 status = pthread_join (thread[thread_count].thread_id, NULL);
94 if (status != 0)
95  err_abort (status, "Join thread");

96
97 printf ("%02d: (%d) ",
98 thread_count, thread[thread_count].increment);
99
100 for (array_count = 0; array_count < ARRAY; array_count++)
101 printf ("%010u ",
102 thread[thread_count].array[array_count]);
103 printf ("n");
104 }

105
106 /*
107 * To be thorough, destroy the barrier.
108 */
109 barrier_destroy (&barrier);
110 return 0;
111 }

Îãëàâëåíèå êíèãè

Îãëàâëåíèå ñòàòüè/êíèãè

Ãåíåðàöèÿ: 1.565. Çàïðîñîâ Ê ÁÄ/Cache: 3 / 0
ïîäåëèòüñÿ
Ââåðõ Âíèç