Книга: Системное программирование в среде Windows

Пример: простая система "производитель/потребитель"

Программа 8.1 иллюстрирует, насколько полезными могут быть объекты CS. Кроме того, эта программа демонстрирует, как создаются защищенные структуры данных для хранения состояний объектов, и знакомит с понятием инварианта (invariant) — свойства состояния объекта, относительно которого гарантируется (путем соответствующей реализации программы), что оно будет истинным за пределами критического участка кода. 


Рис. 8.2. Разделение общей памяти синхронизированными потоками

Описание задачи приводится ниже.

• Имеются два потока, производитель (producer) и потребитель (consumer), работающие в полностью асинхронном режиме.

• Производитель периодически создает сообщения, содержащие таблицу чисел, например, таблицу биржевых котировок, которая периодически обновляется.

• По требованию пользователя потребитель отображает текущие данные. Требуется, чтобы отображаемые данные представляли собой самый последний полный набор данных, но никакие данные не должны отображаться дважды.

• Данные не должны отображаться в те промежутки времени, когда они обновляются производителем; устаревшие данные также не должны отображаться. Обратите внимание на то, что многие сообщения вообще никогда не используются и, таким образом, "теряются". Этот пример является частным случаем конвейерной модели, в которой данные передаются из одного потока в другой.

• В качестве средства контроля целостности данных производитель вычисляет простую контрольную сумму[28] данных таблицы, которая далее сравнивается с аналогичной суммой, вычисленной потребителем, дабы удостовериться в том, что данные не были повреждены при их передаче из одного потока в другой. Данные, полученные при обращении к таблице в моменты ее обновления, будут недействительными; использование объектов CS гарантирует, что этого никогда не произойдет. Инвариантом блока сообщения (message block invariant) является корректность контрольной суммы для содержимого текущего сообщения.

• Обоими потоками поддерживается статистика суммарного количества отправленных, полученных и утерянных сообщений.

Программа 8.1.simplePC: простая система "производитель/потребитель"

/* Глава 8. simplePC.с */
/* Поддерживает два потока — производителя и потребителя. */
/* Производитель периодически создает буферные данные с контрольными */
/* суммами, или "блоки сообщений", отображаемые потребителем по запросу */
/* пользователя. */
#include "EvryThng.h"
#include <time.h>
#define DATA_SIZE 256
typedef struct msg_block_tag { /* Блок сообщения. */
 volatile DWORD f_ready, f_stop; /* Флаги готовности и прекращения сообщений. */
 volatile DWORD sequence; /* Порядковый номер блока сообщения. */
 volatile DWORD nCons, nLost;
 time_t timestamp;
 CRITICAL_SECTION mguard; /* Структура защиты блока сообщения. */
 DWORD checksum; /* Контрольная сумма содержимого сообщения. */
 DWORD data[DATA_SIZE]; /* Содержимое сообщения. */
} MSG_BLOCK;
/* Одиночный блок, подготовленный к заполнению новым сообщением. */
MSG_BLOCK mblock = { 0, 0, 0, 0, 0 };
DWORD WINAPI produce(void*);
DWORD WINAPI consume(void*);
void MessageFill(MSG_BLOCK*);
void MessageDisplay(MSG_BLOCK*);
DWORD _tmain(DWORD argc, LPTSTR argv[]) {
 DWORD Status, ThId;
 HANDLE produce h, consume_h;
 /* Инициализировать критический участок блока сообщения. */
 InitializeCriticalSection (&mblock.mguard);
 /* Создать два потока. */
 produce_h = (HANDLE)_beginthreadex(NULL, 0, produce, NULL, 0, &ThId);
 consume_h = (HANDLE)_beginthreadex (NULL, 0, consume, NULL, 0, &ThId); 
 /* Ожидать завершения потоков производителя и потребителя. */
 WaitForSingleObject(consume_h, INFINITE);
 WaitForSingleObject(produce_h, INFINITE);
 DeleteCriticalSection(&mblock.mguard);
 _tprintf(_T("Потоки производителя и потребителя завершили выполнениеn"));
 _tprintf(_T("Отправлено: %d, Получено: %d, Известные потери: %dn"), mblock.sequence, mblock.nCons, mblock.nLost);
 return 0;
}
DWORD WINAPI produce(void *arg)
/* Поток производителя — создание новых сообщений через случайные */
/* интервалы времени. */
{
 srand((DWORD)time(NULL)); /* Создать начальное число для генератора случайных чисел. */
 while (!mblock.f_stop) {
  /* Случайная задержка. */
  Sleep(rand() / 100);
  /* Получить и заполнить буфер. */
  EnterCriticalSection(&mblock.mguard);
  __try {
   if (!mblock.f_stop) {
    mblock.f_ready = 0;
    MessageFill(&mblock);
    mblock.f_ready = 1;
    mblock.sequence++;
   }
  } __finally { LeaveCriticalSection (&mblock.mguard); }
 }
 return 0;
}
DWORD WINAPI consume (void *arg) {
 DWORD ShutDown = 0;
 CHAR command, extra;
 /* Принять ОЧЕРЕДНОЕ сообщение по запросу пользователя. */
 while (!ShutDown) { /* Единственный поток, получающий доступ к стандартным устройствам ввода/вывода. */
  _tprintf(_T("n**Введите 'с' для приема; 's' для прекращения работы: "));
  _tscanf("%c%c", &command, &extra);
  if (command == 's') {
   EnterCriticalSection(&mblock.mguard);
   ShutDown = mblock.f_stop = 1;
   LeaveCriticalSection(&mblock.mguard);
  } else if (command == 'c') { /* Получить новый буфер для принимаемых сообщений. */ 
   EnterCriticalSection(&mblock.mguard);
   __try {
    if (mblock.f_ready == 0) _tprintf(_T("Новые сообщения отсутствуют. Повторите попытку.n"));
    else {
     MessageDisplay(&mblock);
     mblock.nCons++;
     mblock.nLost = mblock.sequence – mblock.nCons;
     mblock.f_ready = 0; /* Новые сообщения отсутствуют. */
    }
   } __finally { LeaveCriticalSection (&mblock.mguard); }
  } else {
   tprintf(_T("Такая команда отсутствует. Повторите попытку.n"));
  }
 }
 return 0;
}
void MessageFill(MSG_BLOCK *mblock) {
 /* Заполнить буфер сообщения содержимым, включая контрольную сумму и отметку времени. */
 DWORD i;
 mblock->checksum = 0;
 for (i = 0; i < DATA_SIZE; i++) {
  mblock->data[i] = rand();
  mblock->checksum ^= mblock->data[i];
 }
 mblock->timestamp = time(NULL);
 return;
}
void MessageDisplay(MSG_BLOCK *mblock) {
 /* Отобразить буфер сообщения, отметку времени и контрольную сумму. */
 DWORD i, tcheck = 0;
 for (i = 0; i < DATA_SIZE; i++) tcheck ^= mblock->data[i];
 _tprintf(_T("nВремя генерации сообщения № %d: %s"), mblock->sequence, _tctime(&(mblock->timestamp)));
 _tprintf(_T("Первая и последняя записи: %х %хn"), mblock->data[0], mblock->data[DATA_SIZE – 1]);
 if (tcheck == mblock->checksum) _tprintf(_T("УСПЕШНАЯ ОБРАБОТКА –>Контрольная сумма совпадает.n"));
 else tprintf(_T("СБОЙ –>Несовпадение контрольной суммы. Сообщение запорчено.n"));
 return;
}

Оглавление книги


Генерация: 6.665. Запросов К БД/Cache: 3 / 1
поделиться
Вверх Вниз