7.2 Work queue manager

I've already briefly outlined the various models of thread cooperation. These include pipelines, work crews, client/servers, and so forth. In this section, I present the development of a "work queue," a set of threads that accepts work requests from a common queue, processing them (potentially) in parallel.

The work queue manager could also be considered a work crew manager, depending on your reference point. If you think of it as a way to feed work to a set of threads, then "work crew" might be more appropriate. I prefer to think of it as a queue that magically does work for you in the background, since the presence of the work crew is almost completely invisible to the caller.

When you create the work queue, you can specify the maximum level of parallelism that you need. The work queue manager interprets that as the maximum number of "engine" threads that it may create to process your requests. Threads will be started and stopped as required by the amount of work. A thread that finds nothing to do will wait a short time and then terminate. The optimal "short time" depends on how expensive it is to create a new thread on your system, the cost in system resources to keep a thread going that's not doing anything, and how likely it is that you'll need the thread again soon. I've chosen two seconds, which is probably much too long.
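
As a preview, the wait-with-timeout at the heart of this policy looks like the following sketch; the real code appears later in workq_server. The constant IDLE_TIMEOUT_SEC and the helper wait_for_work are names invented here for illustration only.

#include <errno.h>
#include <pthread.h>
#include <time.h>

#define IDLE_TIMEOUT_SEC 2  /* the "short time" an idle engine thread waits */

/*
 * Wait (with the mutex already locked) until the queue head becomes
 * non-NULL or the idle timeout expires. A return of ETIMEDOUT means
 * the thread found nothing to do and may terminate.
 */
static int wait_for_work (
    pthread_cond_t *cv, pthread_mutex_t *mutex, void **queue_head)
{
    struct timespec timeout;
    int status = 0;

    clock_gettime (CLOCK_REALTIME, &timeout);  /* absolute expiration time */
    timeout.tv_sec += IDLE_TIMEOUT_SEC;
    while (*queue_head == NULL && status == 0)
        status = pthread_cond_timedwait (cv, mutex, &timeout);
    return status;
}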

The header file workq.h and the C source file workq.c demonstrate an implementation of a work queue manager. Part 1 shows the two structure types used by the work queue package. The workq_t type is the external representation of a work queue, and the workq_ele_t is an internal representation of work items that have been queued.

6-9 The workq_ele_t structure is used to maintain a linked list of work items. It has a link element (called next) and a data value, which is stored when the work item is queued and passed to the caller's "engine function" with no interpretation.

14-16 Of course, there's a mutex to serialize access to the workq_t, and a condition variable (cv) on which the engine threads wait for work to be queued.

17 The attr member is a thread attributes object, used when creating new engine threads. The attributes object could instead have been a static variable within workq.c, but I chose to add a little memory overhead to each work queue, rather than add the minor complexity of one-time initialization of a static data item.

18 The first member points to the first item on the work queue. As an optimization to make it easier to queue new items at the end of the queue, the last member points to the last item on the queue.

19-24 These members record assorted information about the work queue. The valid member is a magic number that's set when the work queue is initialized, as we've seen before in barriers and read/write locks. (In this case, the magic number is the month and year of my daughter's birthday.) The quit member is a flag that allows the "work queue manager" to tell engine threads to terminate as soon as the queue is empty. The parallelism member records how many threads the creator chose to allow the work queue to utilize, counter records the number of threads created, and idle records the current number of threads that are waiting for work. The engine member is the user's "engine function," supplied when the work queue was created. As you can see, the engine function takes an "untyped" (void *) argument, and has no return value.

workq.h part 1 workq_t

1 #include <pthread.h>

2
3 /*
4 * Structure to keep track of work queue requests.
5 */
6 typedef struct workq_ele_tag {
7  struct workq_ele_tag *next;
8  void *data;
9 } workq_ele_t;

10
11 /*
12 * Structure describing a work queue.
13 */
14 typedef struct workq_tag {
15 pthread_mutex_t mutex; /* control access to queue */
16 pthread_cond_t cv; /* wait for work */
17 pthread_attr_t attr; /* create detached threads */
18 workq_ele_t *first, *last; /* work queue */
19 int valid; /* valid */
20 int quit; /* workq should quit */
21 int parallelism; /* maximum threads */
22 int counter; /* current threads */
23 int idle; /* number of idle threads */
24 void (*engine)(void *arg); /* user engine */
25 } workq_t;
26
27 #define WORKQ_VALID 0xdec1992
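
Had the attributes object been a static variable within workq.c instead, it would need one-time initialization. A minimal sketch of that alternative, using pthread_once, follows; the names workq_attr, attr_once, and workq_get_shared_attr are hypothetical and do not appear in the actual implementation.

#include <pthread.h>

static pthread_attr_t workq_attr;          /* shared attributes object */
static int attr_status;                    /* saved initialization result */
static pthread_once_t attr_once = PTHREAD_ONCE_INIT;

/* One-time initialization: set up a detached-thread attributes object. */
static void attr_init_once (void)
{
    attr_status = pthread_attr_init (&workq_attr);
    if (attr_status == 0)
        attr_status = pthread_attr_setdetachstate (
            &workq_attr, PTHREAD_CREATE_DETACHED);
}

/* workq_init would call this instead of initializing wq->attr. */
static int workq_get_shared_attr (pthread_attr_t **attrp)
{
    int status = pthread_once (&attr_once, attr_init_once);

    if (status == 0)
        status = attr_status;
    if (status == 0)
        *attrp = &workq_attr;
    return status;
}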

Part 2 shows the interfaces we'll create for our work queue. We need to create and destroy work queue managers, so we'll define workq_init and workq_destroy. Both take a pointer to a workq_t structure. In addition, the initializer needs the maximum number of threads the manager is allowed to create to service the queue, and the engine function. Finally, the program needs to be able to queue work items for processing — we'll call the interface for this workq_add. It takes a pointer to the workq_t and the argument that should be passed to the engine function.

workq.h part 2 interfaces

1 /*
2 * Define work queue functions.
3 */
4 extern int workq_init (
5  workq_t *wq,
6  int threads, /* maximum threads */
7  void (*engine)(void *)); /* engine routine */
8 extern int workq_destroy (workq_t *wq);
9 extern int workq_add (workq_t *wq, void *data);
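
Before walking through the implementation, here is a brief sketch of how a program might use these three interfaces. The engine function my_engine, the limit of four threads, and the integer requests are arbitrary choices for illustration; the complete sample program, workq_main.c, appears at the end of this section.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include "workq.h"
#include "errors.h"

/* Hypothetical engine function: process (and free) one queued request. */
static void my_engine (void *arg)
{
    int *request = (int*)arg;

    printf ("processing request %d\n", *request);
    free (request);
}

int main (int argc, char *argv[])
{
    workq_t queue;
    int status, i;

    status = workq_init (&queue, 4, my_engine);  /* at most 4 engine threads */
    if (status != 0)
        err_abort (status, "Init work queue");
    for (i = 0; i < 10; i++) {
        int *request = (int*)malloc (sizeof (int));

        if (request == NULL)
            errno_abort ("Allocate request");
        *request = i;
        status = workq_add (&queue, (void*)request); /* runs asynchronously */
        if (status != 0)
            err_abort (status, "Add to work queue");
    }
    status = workq_destroy (&queue);  /* waits until all queued work is done */
    if (status != 0)
        err_abort (status, "Destroy work queue");
    return 0;
}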

The file workq.c contains the implementation of our work queue. The following examples break down each of the functions used to implement the workq.h interfaces.

Part 1 shows the workq_init function, which initializes a work queue. We create the Pthreads synchronization objects that we need, and fill in the remaining members.

14-22 Initialize the thread attributes object attr so that the engine threads we create will run detached. That means we do not need to keep track of their thread identifier values, or worry about joining with them.

34-40 We're not ready to quit yet (we've hardly started!), so clear the quit flag. The parallelism member records the maximum number of threads we are allowed to create, which is the workq_init parameter threads. The counter member will record the current number of active engine threads, initially 0, and idle will record the number of active threads waiting for more work. And of course, finally, we set the valid member.

workq.c part 1 workq_init

1 #include <pthread.h>
2 #include <stdlib.h>
3 #include <time.h>
4 #include "errors.h"
5 #include "workq.h"

6
7 /*
8 * Initialize a work queue.
9 */
10 int workq_init (workq_t *wq, int threads, void (*engine)(void *arg))
11 {
12 int status;
13
14 status = pthread_attr_init (&wq->attr);
15 if (status != 0)
16 return status;
17 status = pthread_attr_setdetachstate (
18  &wq->attr, PTHREAD_CREATE_DETACHED);
19 if (status != 0) {
20  pthread_attr_destroy (&wq->attr);
21  return status;
22 }
23 status = pthread_mutex_init (&wq->mutex, NULL);
24 if (status != 0) {
25  pthread_attr_destroy (&wq->attr);
26  return status;
27 }
28 status = pthread_cond_init (&wq->cv, NULL);
29 if (status != 0) {
30  pthread_mutex_destroy (&wq->mutex);
31  pthread_attr_destroy (&wq->attr);
32  return status;
33 }
34 wq->quit = 0; /* not time to quit */
35 wq->first = wq->last = NULL; /* no queue entries */
36 wq->parallelism = threads; /* max servers */
37 wq->counter = 0; /* no server threads yet */
38 wq->idle = 0; /* no idle servers */
39 wq->engine = engine;
40 wq->valid = WORKQ_VALID;
41 return 0;
42 }

Part 2 shows the workq_destroy function. The procedure for shutting down a work queue is a little different from the others we've seen. Remember that the Pthreads mutex and condition variable destroy functions fail, returning EBUSY, when you try to destroy an object that is in use. We used the same model for barriers and read/write locks. But we cannot do the same for work queues — the calling program cannot know whether the work queue is in use, because the caller only queues requests that are processed asynchronously.

The work queue manager will accept a request to shut down at any time, but it will wait for all existing engine threads to complete their work and terminate. Only when the last work queue element has been processed and the last engine thread has exited will workq_destroy return successfully.

24 If the work queue has no threads, either it was never used or all threads have timed out and shut down since it was last used. That makes things easy, and we can skip all the shutdown complication.

25-33 If there are engine threads, they are asked to shut down by setting the quit flag in the workq_t structure and broadcasting the condition variable to awaken any waiting (idle) engine threads. Each engine thread will eventually run and see this flag. When they see it and find no more work, they'll shut themselves down.

44-50 The last thread to shut down will wake up the thread that's waiting in workq_destroy, and the shutdown will complete. Instead of creating a condition variable that's used only to wake up workq_destroy, the last thread will signal the same condition variable used to inform idle engine threads of new work. At this point, all waiters have already been awakened by a broadcast, and they won't wait again because the quit flag is set. Shutdown occurs only once during the life of the work queue manager, so there's little point to creating a separate condition variable for this purpose.

workq.c part 2 workq_destroy

1 /*
2 * Destroy a work queue.
3 */
4 int workq_destroy (workq_t *wq)
5 {
6 int status, status1, status2;
7
8 if (wq->valid != WORKQ_VALID)
9  return EINVAL;
10 status = pthread_mutex_lock (&wq->mutex);
11 if (status != 0)
12  return status;
13 wq->valid = 0; /* prevent any other operations */

14
15 /*
16 * Check whether any threads are active, and run them down:
17 *
18 * 1. set the quit flag
19 * 2. broadcast to wake any servers that may be asleep
20 * 3. wait for all threads to quit (counter goes to 0)
21 * Because we don't use join, we don't need to worry
22 * about tracking thread IDs.
23 */
24 if (wq->counter > 0) {
25  wq->quit = 1;
26 /* if any threads are idling, wake them. */
27 if (wq->idle > 0) {
28  status = pthread_cond_broadcast (&wq->cv);
29  if (status != 0) {
30  pthread_mutex_unlock (&wq->mutex);
31  return status;
32  }
33 }

34
35 /*
36 * Just to prove that every rule has an exception, I'm
37 * using the "cv" condition for two separate predicates
38 * here. That's OK, since the case used here applies
39 * only once during the life of a work queue — during
40 * rundown. The overhead is minimal and it's not worth
41 * creating a separate condition variable that would
42 * wait and be signaled exactly once!
43 */
44 while (wq->counter > 0) {
45  status = pthread_cond_wait (&wq->cv, &wq->mutex);
46  if (status != 0) {
47  pthread_mutex_unlock (&wq->mutex);
48  return status;
49  }
50 }
51 }
52 status = pthread_mutex_unlock (&wq->mutex);
53 if (status != 0)
54  return status;
55 status = pthread_mutex_destroy (&wq->mutex);
56 status1 = pthread_cond_destroy (&wq->cv);
57 status2 = pthread_attr_destroy (&wq->attr);
58 return (status ? status : (status1 ? status1 : status2));
59 }

Part 3 shows workq_add, which accepts work for the queue manager system.

16-35 It allocates a new work queue element and initializes it from the parameters. It queues the element, updating the first and last pointers as necessary.

40-45 If there are idle engine threads, which were created but ran out of work, signal the condition variable to wake one.

46-59 If there are no idle engine threads, and the value of parallelism allows for more, create a new engine thread. If there are no idle threads and it can't create a new engine thread, workq_add returns, leaving the new element for the next thread that finishes its current assignment.

workq.c part 3 workq_add

1 /*
2 * Add an item to a work queue.
3 */
4 int workq_add (workq_t *wq, void *element)
5 {
6 workq_ele_t *item;
7 pthread_t id;
8 int status;

9
10 if (wq->valid != WORKQ_VALID)
11  return EINVAL;

12
13 /*
14 * Create and initialize a request structure.
15 */
16 item = (workq_ele_t *)malloc (sizeof (workq_ele_t));
17 if (item == NULL)
18  return ENOMEM;
19 item->data = element;
20 item->next = NULL;
21 status = pthread_mutex_lock (&wq->mutex);
22 if (status != 0) {
23  free (item);
24  return status;
25 }

26
27 /*
28 * Add the request to the end of the queue, updating the
29 * first and last pointers.
30 */
31 if (wq->first == NULL)
32 wq->first = item;
33 else
34 wq->last->next = item;
35 wq->last = item;

36
37 /*
38 * if any threads are idling, wake one.
39 */
40 if (wq->idle > 0) {
41  status = pthread_cond_signal (&wq->cv);
42  if (status != 0) {
43  pthread_mutex_unlock (&wq->mutex);
44  return status;
45  }
46 } else if (wq->counter < wq->parallelism) {
47 /*
48 * If there were no idling threads, and we're allowed to
49 * create a new thread, do so.
50 */
51  DPRINTF (("Creating new worker\n"));
52  status = pthread_create (
53  &id, &wq->attr, workq_server, (void*)wq);
54  if (status != 0) {
55  pthread_mutex_unlock (&wq->mutex);
56  return status;
57  }
58 wq->counter++;
59 }
60 pthread_mutex_unlock (&wq->mutex);
61 return 0;
62 }

That takes care of all the external interfaces, but we will need one more function, the start function for the engine threads. The function, shown in part 4, is called workq_server. Although we could start a thread running the caller's engine with the appropriate argument for each request, this is more efficient. The workq_server function will dequeue the next request and pass it to the engine function, then look for new work. It will wait if necessary and shut down only when a certain period of time passes without any new work appearing, or when told to shut down by workq_destroy.

Notice that the server begins by locking the work queue mutex, and the "matching" unlock does not occur until the engine thread is ready to terminate. Despite this, the thread spends most of its life with the mutex unlocked, either waiting for work in the condition variable wait or within the caller's engine function.

29-62 When a thread completes the condition wait loop, either there is work to be done or the work queue is shutting down (wq->quit is nonzero).

67-80 First, we check for work and process the work queue element if there is one. There could still be work queued when workq_destroy is called, and it must all be processed before any engine thread terminates.

The user's engine function is called with the mutex unlocked, so that the user's engine can run a long time, or block, without affecting the execution of other engine threads. That does not necessarily mean that engine functions can run in parallel — the caller-supplied engine function is responsible for ensuring whatever synchronization is needed to allow the desired level of concurrency or parallelism. Ideal engine functions would require little or no synchronization and would run in parallel.
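
For example, an engine function that updates data shared by all requests would need to protect that data itself, since the work queue's mutex is not held while the engine runs. A minimal sketch, assuming a shared running total; the names summing_engine, total, and total_mutex are hypothetical (the sample engine at the end of this section uses thread-specific data instead):

#include <pthread.h>
#include <stdlib.h>

static pthread_mutex_t total_mutex = PTHREAD_MUTEX_INITIALIZER;
static long total = 0;               /* state shared by all engine threads */

/*
 * Engine function: called by the work queue with its mutex unlocked, so
 * concurrent calls must serialize access to the shared total themselves.
 */
static void summing_engine (void *arg)
{
    long *value = (long*)arg;

    pthread_mutex_lock (&total_mutex);
    total += *value;                 /* brief critical section */
    pthread_mutex_unlock (&total_mutex);
    free (value);
}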

86-104 When there is no more work and the queue is being shut down, the thread terminates, awakening workq_destroy if this was the last engine thread to shut down.

110-114 Finally we check whether the engine thread timed out looking for work, which would mean the engine has waited long enough. If there's still no work to be found, the engine thread exits.

workq.c part 4 workq_server

1 /*
2 * Thread start routine to serve the work queue.
3 */
4 static void *workq_server (void *arg)
5 {
6 struct timespec timeout;
7 workq_t *wq = (workq_t *)arg;
8 workq_ele_t *we;
9 int status, timedout;
10
11 /*
12 * We don't need to validate the workq_t here... we don't
13 * create server threads until requests are queued (the
14 * queue has been initialized by then!) and we wait for all
15 * server threads to terminate before destroying a work
16 * queue.
17 */
18 DPRINTF (("A worker is starting\n"));
19 status = pthread_mutex_lock (&wq->mutex);
20 if (status != 0)
21  return NULL;

22
23 while (1) {
24  timedout = 0;
25  DPRINTF (("Worker waiting for work\n"));
26  clock_gettime (CLOCK_REALTIME, &timeout);
27  timeout.tv_sec += 2;

28
29  while (wq->first == NULL && !wq->quit) {
30 /*
31 * Server threads time out after spending 2 seconds
32 * waiting for new work, and exit.
33 */
34  status = pthread_cond_timedwait (
35  &wq->cv, &wq->mutex, &timeout);
36  if (status == ETIMEDOUT) {
37  DPRINTF (("Worker wait timed out\n"));
38  timedout = 1;
39  break;
40  } else if (status != 0) {
41 /*
42 * This shouldn't happen, so the work queue
43 * package should fail. Because the work queue
44 * API is asynchronous, that would add
45 * complication. Because the chances of failure
46 * are slim, I choose to avoid that
47 * complication. The server thread will return,
48 * and allow another server thread to pick up
49 * the work later. Note that if this were the
50 * only server thread, the queue wouldn't be
51 * serviced until a new work item is
52 * queued. That could be fixed by creating a new
53 * server here.
54 */
55  DPRINTF ((
56  "Worker wait failed, %d (%s)\n",
57  status, strerror (status)));
58  wq->counter--;
59  pthread_mutex_unlock (&wq->mutex);
60  return NULL;
61  }
62  }
63  DPRINTF (("Work queue: %#lx, quit: %d\n",
64  wq->first, wq->quit));
65  we = wq->first;

66
67  if (we != NULL) {
68  wq->first = we->next;
69  if (wq->last == we)
70  wq->last = NULL;
71  status = pthread_mutex_unlock (&wq->mutex);
72  if (status != 0)
73  return NULL;
74  DPRINTF (("Worker calling engine\n"));
75  wq->engine (we->data);
76  free (we);
77  status = pthread_mutex_lock (&wq->mutex);
78  if (status != 0)
79  return NULL;
80 }

81
82 /*
83 * If there are no more work requests, and the servers
84 * have been asked to quit, then shut down.
85 */
86 if (wq->first == NULL && wq->quit) {
87  DPRINTF (("Worker shutting down\n"));
88  wq->counter--;

89
90 /*
91 * NOTE: Just to prove that every rule has an
92 * exception, I'm using the "cv" condition for two
93 * separate predicates here. That's OK, since the
94 * case used here applies only once during the life
95 * of a work queue — during rundown. The overhead
96 * is minimal and it's not worth creating a separate
97 * condition variable that would wait and be
98 * signaled exactly once!
99 */
100  if (wq->counter == 0)
101  pthread_cond_broadcast (&wq->cv);
102  pthread_mutex_unlock (&wq->mutex);
103  return NULL;
104 }

105
106 /*
107 * If there's no more work, and we wait for as long as
108 * we're allowed, then terminate this server thread.
109 */
110 if (wq->first == NULL && timedout) {
111  DPRINTF (("engine terminating due to timeout.\n"));
112 wq->counter--;
113 break;
114 }
115 }
116
117 pthread_mutex_unlock (&wq->mutex);
118 DPRINTF (("Worker exiting\n"));
119 return NULL;
120 }

Finally, workq_main.c is a sample program that uses our work queue manager. Two threads queue work elements to the work queue in parallel. The engine function is designed to gather some statistics about engine usage. To accomplish this, it uses thread-specific data. When the sample run completes, main collects all of the thread-specific data and reports some statistics.

15-19 Each engine thread has an engine_t structure associated with the thread-specific data key engine_key. The engine function gets the calling thread's value of this key, and if the current value is NULL, creates a new engine_t structure and assigns it to the key. The calls member of the engine_t structure records the number of calls to the engine function within each thread.

29-37 The thread-specific data key's destructor function, destructor, adds the terminating thread's engine_t to a list (engine_list_head), where main can find it later to generate the final report.

43-68 The engine function's work is relatively boring. The argument is a pointer to a power_t structure, containing the members value and power. It uses a trivial loop to multiply value by itself power times. The result is discarded in this example, and the power_t structure is freed.

73-98 A thread is started, by main, running the thread_routine function. In addition, main calls thread_routine. The thread_routine function loops for some number of iterations, determined by the macro ITERATIONS, creating and queuing work queue elements. The value and power members of the power_t structure are determined semirandomly using rand_r. The function sleeps for a random period of time, from zero to four seconds, to occasionally allow engine threads to time out and terminate. Typically when you run this program you would expect to see summary messages reporting some small number of engine threads, each of which processed some number of calls — which total 50 calls (25 each from the two threads).

workq_main.c

1 #include <pthread.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <time.h>
5 #include "workq.h"
6 #include "errors.h"
7
8 #define ITERATIONS 25
9
10 typedef struct power_tag {
11 int value;
12 int power;
13 } power_t;
14
15 typedef struct engine_tag {
16 struct engine_tag *link;
17 pthread_t thread_id;
18 int calls;
19 } engine_t;
20
21 pthread_key_t engine_key; /* Keep track of active engines */
22 pthread_mutex_t engine_list_mutex = PTHREAD_MUTEX_INITIALIZER;
23 engine_t *engine_list_head = NULL;
24 workq_t workq;
25
26 /*
27 * Thread-specific data destructor routine for engine_key.
28 */
29 void destructor (void *value_ptr)
30 {
31 engine_t *engine = (engine_t*)value_ptr;
32
33 pthread_mutex_lock (&engine_list_mutex);
34 engine->link = engine_list_head;
35 engine_list_head = engine;
36 pthread_mutex_unlock (&engine_list_mutex);
37 }
38
39 /*
40 * This is the routine called by the work queue servers to
41 * perform operations in parallel.
42 */
43 void engine_routine (void *arg)
44 {
45 engine_t *engine;
46 power_t *power = (power_t*)arg;
47 int result, count;
48 int status;
49
50 engine = pthread_getspecific (engine_key);
51 if (engine == NULL) {
52 engine = (engine_t*)malloc (sizeof (engine_t));
53 status = pthread_setspecific (
54 engine_key, (void*)engine);
55 if (status != 0)
56 err_abort (status, "Set tsd");
57 engine->thread_id = pthread_self ();
58 engine->calls = 1;
59 } else
60 engine->calls++;
61 result = 1;
62 printf (
63 "Engine: computing %d^%d\n",
64 power->value, power->power);
65 for (count = 1; count <= power->power; count++)
66 result *= power->value;
67 free (arg);
68 }
69
70 /*
71 * Thread start routine that issues work queue requests.
72 */
73 void *thread_routine (void *arg)
74 {
75 power_t *element;
76 int count;
77 unsigned int seed = (unsigned int)time (NULL);
78 int status;

79
80 /*
81 * Loop, making requests.
82 */
83 for (count = 0; count < ITERATIONS; count++) {
84 element = (power_t*)malloc (sizeof (power_t));
85 if (element == NULL)
86 errno_abort ("Allocate element");
87 element->value = rand_r (&seed) % 20;
88 element->power = rand_r (&seed) % 7;
89 DPRINTF ((
90 "Request: %d^%d\n",
91 element->value, element->power));
92 status = workq_add (&workq, (void*)element);
93 if (status != 0)
94 err_abort (status, "Add to work queue");
95 sleep (rand_r (&seed) % 5);
96 }
97 return NULL;
98 }
99
100 int main (int argc, char *argv[])
101 {
102 pthread_t thread_id;
103 engine_t *engine;
104 int count = 0, calls = 0;
105 int status;

106
107 status = pthread_key_create (&engine_key, destructor);
108 if (status != 0)
109 err_abort (status, "Create key");
110 status = workq_init (&workq, 4, engine_routine);
111 if (status != 0)
112 err_abort (status, "Init work queue");
113 status = pthread_create (&thread_id, NULL, thread_routine, NULL);
114 if (status != 0)
115 err_abort (status, "Create thread");
116 (void)thread_routine (NULL);
117 status = pthread_join (thread_id, NULL);
118 if (status != 0)
119 err_abort (status, "Join thread");
120 status = workq_destroy (&workq);
121 if (status != 0)
122 err_abort (status, "Destroy work queue");
123
124 /*
125 * By now, all of the engine_t structures have been placed
126 * on the list (by the engine thread destructors), so we
127 * can count and summarize them.
128 */
129 engine = engine_list_head;
130 while (engine != NULL) {
131 count++;
132 calls += engine->calls;
133 printf ("engine %d: %d calls\n", count, engine->calls);
134 engine = engine->link;
135 }
136 printf ("%d engine threads processed %d calls\n",
137 count, calls);
138 return 0;
139 }
