Книга: Системное программирование в среде Windows

Пример: использование очередей в многоступенчатом конвейере

Модель "хозяин/рабочий", во всех ее вариациях, является одной из наиболее популярных моделей многопоточного программирования, а программа 8.2 представляет простую модель "производитель/потребитель", являющуюся частным случаем более общей конвейерной модели (pipeline model).

В другом важном частном случае имеется один главный поток, который производит единичные рабочие задания (work units) для ограниченного количества рабочих потоков и помещает их в очередь. Такая методика может оказаться полезной при создании масштабируемого сервера с большим количеством клиентов (число которых может достигать тысячи и более), когда возможность выделения независимого рабочего потока для каждого клиента весьма сомнительна. В главе 14 задача создания масштабируемого сервера обсуждается в контексте портов завершения ввода/вывода.

В конвейерной модели каждый поток или группа потоков определенным образом обрабатывает единичные задания, например, сообщения, и передает их другим потокам для дальнейшей обработки. Аналогом многопоточного конвейера может служить производственная сборочная линия. Идеальным механизмом реализации конвейера являются очереди.

В программе 10.5 (ThreeStage.c) предусмотрено создание нескольких этапов производства и потребления, на каждой из которых поддерживается очередь рабочих заданий, подлежащих обработке. Каждая очередь имеет ограниченную, конечную длину. Всего существует три конвейерных ступени, соединяющих четыре этапа обработки. Программа имеет следующую структуру:

• Производители (producers) периодически создают единичные сообщения, дополненные контрольными суммами, используя для этого ту же функцию, что и в программе 8.2, если не считать того, что в каждом сообщении содержится дополнительное поле адресата, указывающее поток потребителя (consumer), для которой предназначено это сообщение, причем каждый производитель связывается только с одним потребителем. Количество пар "производитель/потребитель" задается в виде параметра командной строки. Далее производитель посылает одиночное сообщение передающему потоку (transmitter), помещая его в очередь передачи сообщений. Если очередь заполнена, производитель ждет, пока ее состояние не изменится.

• Передающий поток объединяет имеющиеся единичные сообщения (но не более пяти за один раз) и создает одно передаваемое сообщение, которое содержит заголовок и ряд единичных сообщений. Затем передающий поток помещает каждое передаваемое сообщение в очередь приема сообщений (receiver), блокируясь, если очередь заполнена. В общем случае передатчик и приемник могут связываться между собой через сетевое соединение. Произвольно выбранное здесь значение коэффициента блокирования (blocking factor), равное 5:1, легко поддается регулировке. 

• Принимающий поток обрабатывает единичные сообщения, входящие в состав каждого передаваемого сообщения, и помещает каждое из них в соответствующую очередь потребителя, если она не заполнена.

• Каждый поток потребителя получает одиночные сообщения по мере их поступления и записывает сообщение в файл журнала регистрации.

Блок-схема системы представлена на рис. 10.1. Обратите внимание, что эта система моделирует сетевое соединение, в котором сообщения, относящиеся к различным парам "отправитель/получатель" объединяются и передаются по общему каналу связи. 


Рис. 10.1. Многоступенчатый конвейер

В программе 10.5 предложен вариант реализации, в котором используются функции очереди из программы 10.4. Функции генерации и отображения сообщений здесь не представлены, но они взяты из программы 8.1. При этом, наряду с контрольными суммами и данными, в блоки сообщений введены поля производителя и адресата.

Программа 10.5. ThreeStage.с: многоступенчатыйконвейер 

/* Глава 10. ThreeStage.с */
/* Трехступенчатая система производитель/потребитель. */
/* Использование: ThreeStage npc goal. */
/* Запустить "npc" пар потоков производителя и потребителя. */
/* Каждый производитель должен сгенерировать в общей сложности */
/* "goal" сообщений, каждое из которых снабжается меткой, указывающей */
/* потребителя, для которого оно предназначено. */
/* Сообщения отправляются "передающему потоку", который, прежде чем */
/* отправить группу сообщений "принимающему потоку", выполняет некоторую*/
/* дополнительную обработку. Наконец, принимающий поток отправляет сообщения потокам потребителя. */
#include "EvryThng.h"
#include "SynchObj.h"
#include "messages.h"
#include <time.h>
#define DELAY_COUNT 1000
#define MAX_THREADS 1024
/* Размеры и коэффициенты блокирования очередей. Эти величины являются */
/* произвольными и могут регулироваться для обеспечения оптимальной */
/* производительности. Текущие значения не являются сбалансированными. */
#define TBLOCK_SIZE 5 /*Передающий поток формирует группы из 5 сообщений.*/
#define TBLOCK_TIMEOUT 50 /*Интервал ожидания сообщений передающим потоком.*/
#define P2T_QLEN 10 /* Размер очереди "производитель/передающий поток". */
#define T2R_QLEN 4 /*Размер очереди "передающий поток/принимающий поток".*/
#define R2C_QLEN 4 /* Размер очереди "принимающий поток/потребитель" -- */
/* для каждого потребителя существует только одна очередь.*/
DWORD WINAPI producer(PVOID);
DWORD WINAPI consumer(PVOID);
DWORD WINAPI transmitter(PVOID);
DWORD WINAPI receiver(PVOID);
typedef struct _THARG {
 volatile DWORD thread_number;
 volatile DWORD work_goal; /* Используется потоками производителей. */
 volatile DWORD work_done; /* Используется потоками производителей и потребителей. */ '
 char future[8]; 
} THARG;
/* Сгруппированные сообщения, посылаемые передающим потоком потребителю.*/
typedef struct t2r_msg_tag {
 volatile DWORD num_msgs; /* Количество содержащихся сообщений. */
 msg_block_t messages[TBLOCK_SIZE];
} t2r_msg_t;
queue_t p2tq, t2rq, *r2cq_array;
static volatile DWORD ShutDown = 0;
static DWORD EventTimeout = 50;
DWORD _tmain(DWORD argc, LPTSTR * argv[]) {
 DWORD tstatus, nthread, ithread, goal, thid;
 HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;
 THARG *producer_arg, *consumer_arg;
 nthread = atoi(argv[1]);
 goal = atoi(argv[2]);
 producer_th = malloc(nthread * sizeof(HANDLE));
 producer_arg = calloc(nthread, sizeof(THARG));
 consumer_th = malloc(nthread * sizeof(HANDLE));
 consumer_arg = calloc(nthread, sizeof(THARG));
 q_initialize(&p2tq, sizeof(msg_block_t), P2T_QLEN); 
 q_initialize(&t2rq, sizeof(t2r_msg_t), T2R_QLEN);
 /* Распределить ресурсы, инициализировать очереди "принимающий поток/потребитель" для каждого потребителя. */
 r2cq_array = calloc(nthread, sizeof(queue_t));
 for (ithread = 0; ithread < nthread; ithread++) {
  /* Инициализировать очередь r2с для потока данного потребителя. */
  q_initialize(&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN);
  /* Заполнить аргументы потока. */
  consumer_arg[ithread].thread_number = ithread;
  consumer_arg[ithread].work_goal = goal;
  consumer_arg[ithread].work_done = 0;
  consumer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, consumer, (PVOID)&consumer_arg[ithread], 0, &thid);
  producer_arg[ithread].thread_number = ithread;
  producer_arg[ithread].work_goal = goal;
  producer_arg[ithread].work_done = 0;
  producer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, producer, (PVOID)&producer_arg[ithread], 0, &thid);
 }
 transraitter_th = (HANDLE)_beginthreadex(NULL, 0, transmitter, NULL, 0, &thid);
 receiver_th = (HANDLE)_beginthreadex (NULL, 0, receiver, NULL, 0, &thid);
 _tprintf(_T("ХОЗЯИН: Выполняются все потокиn"));
 /* Ждать завершения потоков производителя. */
 for (ithread = 0; ithread < nthread; ithread++) {
  WaitForSingleObject(producer_th[ithread], INFINITE);
  _tprintf(_T("ХОЗЯИН: производитель %d выработал %d единичных сообщенийn"), ithread, producer_arg[ithread].work_done);
 }
 /* Производители завершили работу. */
 _tprintf(_T("ХОЗЯИН: Все потоки производителя выполнили свою работу.n"));
 /* Ждать завершения потоков потребителя. */
 for (ithread = 0; ithread < nthread; ithread++) {
  WaitForSingleObject(consumer_th[ithread], INFINITE);
  _tprintf(_T("ХОЗЯИН: потребитель %d принял %d одиночных сообщенийn"), ithread, consumer_arg[ithread].work_done);
 }
 _tprintf(_T("ХОЗЯИН: Все потоки потребителя выполнили свою работу.n"));
 ShutDown = 1; /* Установить флаг завершения работы. */
 /* Завершить выполнение и перейти в состояние ожидания передающих и принимающих потоков. */
 /* Эта процедура завершения работает нормально, поскольку и передающий,*/
 /* и принимающий потоки не владеют иными ресурсами, кроме мьютекса, */
 /* которые они могли бы покинуть по завершении выполнения, не уступив прав владения ими. Можете ли вы улучшить эту процедуру? */ 
 TerminateThread(transmitter_th, 0);
 TerminateThread(receiver_th, 0);
 WaitForSingleObject(transmitter_th, INFINITE);
 WaitForSingleObject(receiver_th, INFINITE);
 q_destroy(&p2tq);
 q_destroy(&t2rq);
 for (ithread = 0; ithread < nthread; ithread++) q_destroy(&r2cq_array [ithread]);
 free(r2cq_array);
 free(producer_th);
 free(consumer_th);
 free(producer_arg);
 free(consumer_arg);
 _tprintf(_T("Система завершила работу. Останов системыn"));
 return 0;
}
DWORD WINAPI producer(PVOID arg) {
 THARG * parg;
 DWORD ithread, tstatus;
 msg_block_t msg;
 parg = (THARG *)arg;
 ithread = parg->thread_number;
 while (parg->work_done < parg->work_goal) {
  /* Вырабатывать единичные сообщения, пока их общее количество */
  /* не станет равным "goal". */
  /* Сообщения снабжаются адресами отправителя и адресата, которые в */
  /* нашем примере одинаковы для всех сообщений, но в общем случае */
  /* могут быть различными. */
  delay_cpu(DELAY_COUNT * rand() / RAND_MAX);
  message_fill(&msg, ithread, ithread, parg->work_done);
  /* Поместить сообщение в очередь. */
  tstatus = q_put(&p2tq, &msg, sizeof(msg), INFINITE);
  parg->work_done++;
 }
 return 0;
}
DWORD WINAPI transmitter(PVOID arg) {
 /* Получись несколько сообщений от производителя, объединяя их в одно*/
 /* составное сообщение, предназначенное для принимающего потока. */
 DWORD tstatus, im;
 t2r_msg_t t2r_msg = {0};
 msg_block_t p2t_msg;
 while (!ShutDown) {
  t2r_msg.num_msgs = 0;
  /* Упаковать сообщения для передачи принимающему потоку. */
  for (im = 0; im < TBLOCK_SIZE; im++) {
   tstatus = q_get(&p2tq, &p2t_msg, sizeof(p2t_msg), INFINITE); 
   if (tstatus != 0) break;
   memcpy(&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg));
   t2r_rasg.num_msgs++;
  }
  tstatus = q_put(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
  if (tstatus != 0) return tstatus;
 }
 return 0;
}
DWORD WINAPI receiver(PVOID arg) {
 /* Получить составные сообщения от передающего потока; распаковать */
 /* их и передать соответствующему потребителю. */
 DWORD tstatus, im, ic;
 t2r_msg_t t2r_msg;
 msg_block_t r2c_msg;
 while (!ShutDown) {
  tstatus = q_get(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
  if (tstatus != 0) return tstatus;
  /* Распределить сообщения между соответствующими потребителями. */
  for (im = 0; im < t2r_msg.num_msgs; im++) {
   memcpy(&r2c_msg, &t2r_msg.messages[im], sizeof(r2c_msg));
   ic = r2c_msg.destination; /* Конечный потребитель. */
   tstatus = q_put(&r2cq_array[ic], &r2c_msg, sizeof(r2c_msg), INFINITE);
   if (tstatus != 0) return tstatus;
  }
 }
 return 0;
}
DWORD WINAPI consumer(PVOID arg) {
 THARG * carg;
 DWORD tstatus, ithread;
 msg_block_t msg;
 queue_t *pr2cq;
 carg = (THARG *)arg;
 ithread = carg->thread_number;
 carg = (THARG *)arg;
 pr2cq = &r2cq_array[ithread];
 while (carg->work_done < carg->work_goal) {
  /* Получить и отобразить (необязательно — не показано) сообщения. */
  tstatus = q_get(pr2cq, &msg, sizeof(msg), INFINITE);
  if (tstatus != 0) return tstatus;
  carg->work_done++;
 }
 return 0;
}
 

Оглавление книги


Генерация: 0.048. Запросов К БД/Cache: 0 / 0
поделиться
Вверх Вниз