Книга: Системное программирование в среде Windows

Пример: система "производитель/потребитель"

Пример: система "производитель/потребитель"

В этом примере возможности программы 8.1 расширяются таким образом, чтобы потребитель мог дожидаться момента, когда появится доступное сообщение. Тем самым устраняется одна из проблем, связанная с тем, что в предыдущем варианте программы потребитель должен был непрерывно повторять попытки получения новых сообщений. Результирующая программа (программа 8.2) называется eventPC.

Заметьте, что в предлагаемом решении вместо объектов CRITICAL_SECTION используются мьютексы; единственной причиной для этого послужило лишь желание проиллюстрировать применение мьютексов. В то же время, использование автоматически сбрасываемого события и функции SetEvent в потоке потребителя является весьма существенным для работы программы, поскольку это гарантирует освобождение только одного потока.

Также обратите внимание на способ связывания мьютекса и события со структурой данных блока сообщения. Мьютекс активизирует критический участок кода для доступа к объекту структуры данных, тогда как событие используется для уведомления о том, что появилось новое сообщение. Обобщая, можно сказать, что мьютекс гарантирует сохранение инвариантов объекта, а событие сигнализирует о нахождении объекта в заданном состоянии. Эта базовая методика широко применяется в последующих главах.

Программа 8.2. eventPC: система "производитель/потребитель", использующая сигналы 

/* Глава 8. eventPC.с */
/* Поддерживает два потока — производителя и потребителя. */
/* Производитель периодически создает буферные данные с контрольными */
/* суммами, или "блоки сообщений", сигнализирующие потребителю о готовности*/
/* сообщения. Поток потребителя отображает информацию в ответ на запрос.*/
#include "EvryThng.h"
#include <time.h>
#define DATA_SIZE 256
typedef struct msg_block_tag { /* Блок сообщения. */
 volatile DWORD f_ready, f_stop; /* Флаги готовности и прекращения сообщений. */
 volatile DWORD sequence; /* Порядковый номер блока сообщения. */
 volatile DWORD nCons, nLost; time_t timestamp;
 HANDLE mguard; /* Мьютекс, защищающий структуру блока сообщения. */ 
 HANDLE mready; /* Событие "Сообщение готово". */
 DWORD checksum; /* Контрольная сумма сообщения. */
 DWORD data[DATA_SIZE]; /* Содержимое сообщения. */
} MSG_BLOCK;
/* … */
DWORD _tmain(DWORD argc, LPTSTR argv[]) {
 DWORD Status, ThId;
 HANDLE produce_h, consume_h;
 /* Инициализировать мьютекс и событие (автоматически сбрасываемое) в блоке сообщения. */
 mblock.mguard = CreateMutex(NULL, FALSE, NULL);
 mblock.mready = CreateEvent(NULL, FALSE, FALSE, NULL);
 /* Создать потоки производителя и потребителя; ожидать их завершения.*/
 /* … Как в программе 9.1 … */
 CloseHandle(mblock.mguard);
 CloseHandle(mblock.mready);
 _tprintf(_T("Потоки производителя и потребителя завершили выполнениеn"));
 _tprintf(_T("Отправлено: %d, Получено: %d, Известные потери: %dn"), mblock.sequence, mblock.nCons, mblock.nLost);
 return 0;
}
DWORD WINAPI produce(void *arg)
/* Поток производителя — создание новых сообщений через случайные */
/* интервалы времени. */
{
 srand((DWORD)time(NULL)); /* Создать начальное число для генератора случайных чисел. */
 while(!mblock.f_stop) {
  /* Случайная задержка. */
  Sleep(rand() / 10); /* Длительный период ожидания следующего сообщения. */
  /* Получить и заполнить буфер. */
  WaitForSingleObject(mblock.mguard, INFINITE);
  __try {
   if (!mblock.f_stop) {
    mblock.f_ready = 0;
    MessageFill(&mblock);
    mblock.f_ready = 1;
    mblock.sequence++;
    SetEvent(mblock.mready); /* Сигнал "Сообщение готово". */
   }
  } __finally { ReleaseMutex (mblock.mguard); }
 }
 return 0;
}
DWORD WINAPI consume (void *arg) {
 DWORD ShutDown = 0;
 CHAR command, extra;
 /* Принять ОЧЕРЕДНОЕ сообщение по запросу пользователя. */
 while (!ShutDown) { /* Единственный поток, получающий доступ к стандартным устройствам ввода/вывода. */
  _tprintf(_T("n** Введите 'с' для приема; 's' для прекращения работы: "));
  _tscanf("%c%c", &command, &extra);
  if (command == 's') {
   WaitForSingleObject(mblock.mguard, INFINITE);
   ShutDown = mblock.f_stop = 1;
   ReleaseMutex(mblock.mguard);
  } else if (command == 'c') {
   /* Получить новый буфер принимаемых сообщений. */
   WaitForSingleObject(mblock.mready, INFINITE);
   WaitForSingleObject(mblock.mguard, INFINITE);
   __try {
    if (!mblock.f_ready) _leave;
    /* Ожидать наступление события, указывающего на готовность сообщения. */
    MessageDisplay(&mblock);
    mblock.nCons++;
    mblock.nLost = mblock.sequence – mblock.nCons;
    mblock.f_ready = 0; /* Новые готовые сообщения отсутствуют. */
   } __finally { ReleaseMutex (mblock.mguard); }
  } else {
   _tprintf(_T("Недопустимая команда. Повторите попытку.n"));
  }
 }
 return 0;
}

Примечание

Существует вероятность того, что поток потребителя, уведомленный о готовности сообщения, в действительности не успеет обработать текущее сообщение до того, как поток производителя сгенерирует еще одно сообщение до захвата мьютекса потоком потребителя. В результате такого поведения программы поток потребителя может обработать одно и то же сообщение дважды, если бы не проверка, предусмотренная в начале try-блока потребителя. Эта и другие аналогичные проблемы обсуждаются в главе 10.

Оглавление книги


Генерация: 5.138. Запросов К БД/Cache: 3 / 1
поделиться
Вверх Вниз