Книга: UNIX: взаимодействие процессов

7.3. Схема производитель-потребитель

7.3. Схема производитель-потребитель

Одна из классических задач на синхронизацию называется задачей производителя и потребителя. Она также известна как задача ограниченного буфера. Один или несколько производителей (потоков или процессов) создают данные, которые обрабатываются одним или несколькими потребителями. Эти данные передаются между производителями и потребителями с помощью одной из форм IPC.

С этой задачей мы регулярно сталкиваемся при использовании каналов Unix. Команда интерпретатора, использующая канал

grep pattern chapters.* | wc -l

является примером такой задачи. Программа grep выступает как производитель (единственный), a wc — как потребитель (тоже единственный). Канал используется как форма IPC. Требуемая синхронизация между производителем и потребителем обеспечивается ядром, обрабатывающим команды write производителя и read покупателя. Если производитель опережает потребителя (канал переполняется), ядро приостанавливает производителя при вызове write, пока в канале не появится место. Если потребитель опережает производителя (канал опустошается), ядро приостанавливает потребителя при вызове read, пока в канале не появятся данные.

Такой тип синхронизации называется неявным; производитель и потребитель не знают о том, что синхронизация вообще осуществляется. Если бы мы использовали очередь сообщений Posix или System V в качестве средства IPC между производителем и потребителем, ядро снова взяло бы на себя обеспечение синхронизации.

При использовании разделяемой памяти как средства IPC производителя и потребителя, однако, требуется использование какого-либо вида явной синхронизации. Мы продемонстрируем это на использовании взаимного исключения. Схема рассматриваемого примера изображена на рис. 7.1.

В одном процессе у нас имеется несколько потоков-производителей и один поток-потребитель. Целочисленный массив buff содержит производимые и потребляемые данные (данные совместного пользования). Для простоты производители просто устанавливают значение buff[0] в 0, buff [1] в 1 и т.д. Потребитель перебирает элементы массива, проверяя правильность записей.

В этом первом примере мы концентрируем внимание на синхронизации между отдельными потоками-производителями. Поток-потребитель не будет запущен, пока все производители не завершат свою работу. В листинге 7.1 приведена функция main нашего примера.


Рис. 7.1. Производители и потребитель

Листинг 7.1.[1] Функция main

//mutex/prodcons2.с
1  #include "unpipc.h"
2  #define MAXNITEMS 1000000
3  #define MAXNTHREADS 100
4  int nitems; /* только для чтения потребителем и производителем */
5  struct {
6   pthread_mutex_t mutex;
7   int buff[MAXNITEMS];
8   int nput;
9   int nval;
10 } shared = {
11  PTHREAD_MUTEX_INITIALIZER
12 };
13 void *produce(void *), *consume(void *);
14 int
15 main(int argc, char **argv)
16 {
17  int i, nthreads, count[MAXNTHREADS];
18  pthread_t tid_produce[MAXNTHREADS], tid_consume;
19  if (argc != 3)
20   err_quit("usage: prodcons2 <#items> <#threads>");
21  nitems = min(atoi(argv[1]), MAXNITEMS);
22  nthreads = min(atoi(argv[2]), MAXNTHREADS);
23  Set_concurrency(nthreads);
24  /* запуск всех потоков-производителей */
25  for (i = 0; i < nthreads; i++) {
26   count[i] = 0;
27   Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
28  }
29  /* ожидание завершения всех производителей */
30  for (i = 0; i < nthreads; i++) {
31   Pthread_join(tid_produce[i], NULL);
32   printf("count[%d] = %dn", i, count[i]);
33  }
34  /* запуск и ожидание завершения потока-потребителя */
35  Pthread_create(&tid_consume, NULL, consume, NULL);
36  Pthread_join(tid_consume, NULL);
37  exit(0);
38 }

Совместное использование глобальных переменных потоками

4-12 Эти переменные совместно используются потоками. Мы объединяем их в структуру с именем shared вместе с взаимным исключением, чтобы подчеркнуть, что доступ к ним можно получить только вместе с ним. Переменная nput хранит индекс следующего элемента массива buff, подлежащего обработке, a nval содержит следующее значение, которое должно быть в него помещено (0, 1, 2 и т.д.). Мы выделяем память под эту структуру и инициализируем взаимное исключение, используемое для синхронизации потоков-производителей.

ПРИМЕЧАНИЕ

Мы всегда будем стараться размещать совместно используемые данные вместе со средствами синхронизации, к ним относящимися (взаимными исключениями, условными переменными, семафорами), в одной структуре, как мы сделали в этом примере. Это хороший стиль программирования. Однако во многих случаях совместно используемые данные являются динамическими, представляя собой, например, связный список. Мы, наверное, сможем поместить в структуру первый элемент списка вместе со средствами синхронизации (как в структуре mq_hdr в листинге 5.16), но оставшаяся часть списка в структуру не попадет. Следовательно, это решение не всегда является идеальным.

Аргументы командной строки

19-22 Первый аргумент командной строки указывает количество элементов, которые будут произведены производителями, а второй — количество запускаемых потоков-производителей.

Установка уровня параллельности

23 Функция set_concurrency (наша собственная) указывает подсистеме потоков количество одновременно выполняемых потоков. В Solaris 2.6 она просто вызывает thr_setconcurrency, причем ее запуск необходим, если мы хотим, чтобы у нескольких процессов-производителей была возможность начать выполняться. Если мы не сделаем этого вызова в системе Solaris, будет запущен только первый поток. В Digital Unix 4.0B наша функция set_concurrency не делает ничего, поскольку в этой системе по умолчанию все потоки процесса имеют равные права на вычислительные ресурсы.

ПРИМЕЧАНИЕ

Unix 98 требует наличия функции pthread_setconcurrency, выполняющей это же действие. Эта функция требуется для тех реализаций, которые мультиплексируют пользовательские потоки (создаваемые функцией pthread_create) на небольшое множество выполняемых потоков ядра. Такие реализации часто называются «многие-к-немногим» (many-to-few), «двухуровневые» (two-level) или «М-на-N» (M-to-N). В разделе 5.6 книги [3] отношения между пользовательскими потоками и потоками ядра рассматриваются более подробно.

Создание процессов-производителей

24-28 Создаются потоки-производители, каждый из которых вызывает функцию produce. Идентификаторы потоков хранятся в массиве tid_produce. Аргументом каждого потока-производителя является указатель на элемент массива count. Счетчики инициализируются значением 0, и каждый поток увеличивает значение своего счетчика на 1 при помещении очередного элемента в буфер. Содержимое массива счетчиков затем выводится на экран, так что мы можем узнать, сколько элементов было помещено в буфер каждым из потоков.

Ожидание завершения работы производителей, запуск потребителя

29-36 Мы ожидаем завершения работы всех потоков-производителей, выводя содержимое счетчика для каждого потока, а затем запускаем единственный процесс-потребитель. Таким образом (на данный момент) мы исключаем необходимость синхронизации между потребителем и производителями. Мы ждем завершения работы потребителя, а затем завершаем работу процесса. В листинге 7.2 приведен текст функций produce и consume.

Листинг 7.2. Функции produce и consume

//mutex/prodcons2.с
39 void *
40 produce(void *arg)
41 {
42  for (;;) {
43   Pthread_mutex_lock(&shared.mutex);
44   if (shared.nput >= nitems) {
45    Pthread_mutex_unlock(&shared.mutex);
46    return(NULL); /* массив полный, готово */
47   }
48   shared.buff[shared.nput] = shared.nval;
49   shared.nput++;
50   shared.nval++;
51   Pthread_mutex_unlock(&shared.mutex);
52   *((int *) arg) += 1;
53  }
54 }
55 void *
56 consume(void *arg)
57 {
58  int i;
59  for (i = 0; i < nitems; i++) {
60   if (shared.buff[i] != i)
61    printf("buff[%d] = %dn", i, shared.buff[i]);
62  }
63  return(NULL);
64 }

Формирование данных

42-53 Критическая область кода производителя состоит из проверки на достижение конца массива (завершение работы)

if (shared.nput >= nitems)

и трех строк, помещающих очередное значение в массив:

shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;

Мы защищаем эту область с помощью взаимного исключения, не забыв разблокировать его после завершения работы. Обратите внимание, что увеличение элемента count (через указатель arg) не относится к критической области, поскольку у каждого потока счетчик свой (массив count в функции main). Поэтому мы не включаем эту строку в блокируемую взаимным исключением область. Один из принципов хорошего стиля программирования заключается в минимизации объема кода, защищаемого взаимным исключением.

Потребитель проверяет содержимое массива

59-62 Потребитель проверяет правильность значений всех элементов массива и выводит сообщение в случае обнаружения ошибки. Как уже говорилось, эта функция запускается в единственном экземпляре и только после того, как все потоки-производители завершат свою работу, так что надобность в синхронизации отсутствует.

При запуске только что описанной программы с пятью процессами-производителями, которые должны вместе создать один миллион элементов данных, мы получим следующий результат:

solaris % prodcons2 1000000 5
count[0] = 167165
count[1] = 249891
count[2] = 194221
count[3] = 191815
count[4] = 196908

Как мы отмечали ранее, если убрать вызов set_concurrency, в системе Solaris 2.6 значение count[0] будет 1000000, а все остальные счетчики будут нулевыми.

Если убрать из этого примера блокировку с помощью взаимного исключения, он перестанет работать, как и предполагается. Потребитель обнаружит множество элементов buff[i], значения которых будут отличны от i. Также мы можем убедиться, что удаление блокировки ничего не изменит, если будет выполняться только один поток.

Оглавление книги


Генерация: 1.744. Запросов К БД/Cache: 3 / 0
поделиться
Вверх Вниз