Книга: UNIX: взаимодействие процессов

10.10. Несколько производителей, несколько потребителей

10.10. Несколько производителей, несколько потребителей

Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.

1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.

ПРИМЕЧАНИЕ

Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков. 

2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.

В листинге 10.15 приведены глобальные переменные программы.

Листинг 10.15. Глобальные переменные

//pxsem/prodcons4.с
1  #include "unpipc.h"
2  #define NBUFF 10
3  #define MAXNTHREADS 100
4  int nitems, nproducers, nconsumers; /* только для чтения */
5  struct { /* общие данные производителей и потребителей */
6   int buff[NBUFF];
7   int nput; /* номер объекта: 0, 1. 2, … */
8   int nputval; /* сохраняемое в buff[] значение */
9   int nget; /* номер объекта: 0, 1, 2, … */
10  int ngetval; /* получаемое из buff[] значение */
11  sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
12 } shared;
13 void *produce(void *), *consume(void *);

Глобальные переменные и общая структура

4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.

Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.

19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.

24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.

Листинг 10.16. Функция main для версии с несколькими производителями и потребителями

//pxsem/prodcons4.с
14 int
15 main(int argc, char **argv)
16 {
17  int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
18  pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
19  if (argc != 4)
20   err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");
21  nitems = atoi(argv[1]);
22  nproducers = min(atoi(argv[2]), MAXNTHREADS);
23  nconsumers = min(atoi(argv[3]), MAXNTHREADS);
24  /* инициализация трех семафоров */
25  Sem_init(&shared.mutex, 0, 1);
26  Sem_init(&shared.nempty, 0, NBUFF);
27  Sem_init(&shared.nstored, 0, 0);
28  /* создание производителей и потребителей */
29  Set_concurrency(nproducers + nconsumers);
30  for (i = 0; i < nproducers; i++) {
31   prodcount[i] = 0;
32   Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
33  }
34  for (i = 0; i < nconsumers; i++) {
35   conscount[i] = 0;
36   Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
37  }
38  /* ожидание завершения всех производителей и потребителей */
39  for (i = 0; i < nproducers: i++) {
40   Pthread_join(tid_produce[i], NULL);
41   printf("producer count[%d] = %dn", i, prodcount[i]);
42  }
43  for (i = 0; i < nconsumers; i++) {
44   Pthread_join(tid_consume[i], NULL);
45   printf("consumer count[%d] = %dn", i, conscount[i]);
46  }
47  Sem_destroy(&shared.mutex);
48  Sem_destroy(&shared.nempty);
49  Sem_destroy(&shared.nstored);
50  exit(0);
51 }

Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:

 if (shared.nput >= nitems) {
+ Sem_post(&shared.nstored); /* даем возможность потребителям завершить работу */
  Sem_post(&shared.nempty);
  Sem_post(&shared.mutex);
  return(NULL); /* готово */
 }

Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове

Sem_wait(&shared.nstored); /* Ожидание помещения объекта в буфер */

Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа завершена. Функция consume приведена в листинге 10.17. 

Листинг 10.17. Функция, выполняемая всеми потоками-потребителями

//pxsem/prodcons4.c
72 void *
73 consume(void *arg)
74 {
75  int i;
76  for (;;) {
77   Sem_wait(&shared.nstored); /* ожидание помещения объекта в буфер */
78   Sem_wait(&shared.mutex);
79   if (shared.nget >= nitems) {
80    Sem_post(&shared.nstored);
81    Sem_post(&shared.mutex);
82    return(NULL); /* готово */
83   }
84   i = shared.nget % NBUFF;
85   if (shared.buff[i] != shared.ngetval)
86    printf("error: buff[%d] = %dn", i, shared.buff[i]);
87   shared.nget++;
88   shared.ngetval++;
89   Sem_post(&shared.mutex);
90   Sem_post(&shared.nempty); /* освобождается место для элемента */
91   *((int *) arg) += 1;
92  }
93 }

Завершение потоков-потребителей

79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.

Оглавление книги


Генерация: 1.087. Запросов К БД/Cache: 3 / 0
поделиться
Вверх Вниз