Книга: UNIX: взаимодействие процессов

13.6. Отправка сообщений на сервер

13.6. Отправка сообщений на сервер

Изменим наше решение задачи производителей и потребителей следующим образом. Сначала запускается сервер, создающий объект разделяемой памяти, в который клиенты записывают свои сообщения. Сервер просто выводит содержимое этих сообщений, хотя задачу можно и обобщить таким образом, чтобы он выполнял действия, аналогичные демону syslog, который описан в главе 13 [24]. Мы называем группу отправляющих сообщения процессов клиентами, потому что по отношению к нашему серверу они ими и являются, однако эти клиенты могут являться серверами по отношению к другим приложениям. Например, сервер Telnet является клиентом демона syslog, когда отправляет ему сообщения для занесения их в системный журнал.

Вместо передачи сообщений одним из описанных ранее методов (часть 2) будем хранить сообщения в разделяемой памяти. Это, разумеется, потребует какой-либо формы синхронизации действий клиентов, помещающих сообщения, и сервера, читающего их. На рис. 13.2 приведена схема приложения в целом.


Рис. 13.2. Несколько клиентов отправляют сообщения серверу через разделяемую память

Перед нами взаимодействие нескольких производителей (клиентов) и одного потребителя (сервер). Разделяемая память отображается в адресное пространство сервера и каждого из клиентов.

В листинге 13.8 приведен текст заголовочного файла cliserv2.h, в котором определена структура объекта, хранимого в разделяемой памяти.

Листинг 13.8. Заголовочный файл, определяющий содержимое разделяемой памяти

//pxshm/cliserv2.h
1  #include "unpipc.h"
2  #define MESGSIZE 256 /* максимальный размер сообщения в байтах, включая завершающий ноль */
3  #define NMESG 16 /* максимальное количество сообщений */
4  struct shmstruct { /* структура, хранящаяся в разделяемой памяти */
5   sem_t mutex; /* три семафора Posix, размещаемые в памяти */
6   sem_t nempty;
7   sem_t nstored;
8   int nput; /* индекс для следующего сообщения */
9   long noverflow; /* количество переполнений */
10  sem_t noverflowmutex; /* взаимное исключение для счетчика переполнений */
11  long msgoff[NMESG]; /* сдвиг для каждого из сообщений */
12  char msgdata[NMESG * MESGSIZE]; /* сами сообщения */
13 };

Основные семафоры и переменные

5-8 Три семафора Posix, размещаемых в памяти, используются для того же, для чего семафоры использовались в задаче производителей и потребителей в разделе 10.6. Их имена mutex, nempty, nstored. Переменная nput хранит индекс следующего помещаемого сообщения. Поскольку одновременно работают несколько производителей, эта переменная защищена взаимным исключением и хранится в разделяемой памяти вместе со всеми остальными.

Счетчик переполнений

9-10 Существует вероятность того, что клиент не сможет отправить сообщение из-за отсутствия свободного места для него. Если программа-клиент представляет собой сервер для других приложений (например, сервер FTP или HTTP), она не должна блокироваться в ожидании освобождения места для сообщения. Поэтому программа-клиент будет написана таким образом, чтобы она не блокировалась, но увеличивала счетчик переполнений (noverflow). Поскольку этот счетчик также является общим для всех процессов, он также должен быть защищен взаимным исключением, чтобы его значение не было повреждено.

Сдвиги сообщений и их содержимое

11-12 Массив msgoff содержит сдвиги сообщений в массиве msgdata, в котором сообщения хранятся подряд. Таким образом, сдвиг первого сообщения msgoff[0] = 0, msgoff [1] = 256 (значение MESGSIZE), msgoff [2] = 512 и т. д.

Нужно понимать, что при работе с разделяемой памятью использовать сдвиг в таких случаях необходимо, поскольку объект разделяемой памяти может быть отображен в разные области адресного пространства процесса (может начинаться с разных физических адресов). Возвращаемое mmap значение для каждого процесса может быть индивидуальным. Поэтому при работе с объектами разделяемой памяти нельзя использовать указатели, содержащие реальные адреса переменных в этом объекте.

В листинге 13.9 приведен текст программы-сервера, которая ожидает помещения сообщений в разделяемую память, а затем выводит их.

Листинг 13.9. Сервер, считывающий сообщения из разделяемой памяти

//pxshm/server2.c
1  #include "cliserv2.h"
2  int
3  main(int argc, char **argv)
4  {
5   int fd, index, lastnoverflow, temp;
6   long offset;
7   struct shmstruct *ptr;
8   if (argc != 2)
9    err_quit("usage: server2 <name>");
10  /* создание объекта разделяемой памяти, установка размера, отображение в память, закрытие дескриптора */
11  shm_unlink(Px_ipc_name(argv[1])); /* ошибка игнорируется */
12  fd = Shm_open(Px_ipc_name(argv[1]), O_RDWR | O_CREAT | O_EXCL, FILE_MODE);
13  ptr = Mmap(NULL, sizeof(struct shmstruct), PROT_READ | PROT_WRITE,
14   MAP_SHARED, fd, 0);
15  Ftruncate(fd, sizeof(struct shmstruct));
16  Close(fd);
17  /* инициализация массива сдвигов */
18  for (index = 0; index < NMESG; index++)
19   ptr->msgoff[index] = index * MESGSIZE;
20  /* инициализация семафоров в разделяемой памяти */
21  Sem_init(&ptr->mutex, 1, 1);
22  Sem_init(&ptr->nempty, 1, NMESG);
23  Sem_init(&ptr->nstored, 1, 0);
24  Sem_init(&ptr->noverflowmutex, 1, 1);
25  /* программа-потребитель */
26  index = 0;
27  lastnoverflow = 0;
28  for (;;) {
29   Sem_wait(&ptr->nstored);
30   Sem_wait(&ptr->mutex);
31   offset = ptr->msgoff[index];
32   printf("index = %d: %sn", index, &ptr->msgdata[offset]);
33   if (++index >= NMESG)
34    index =0; /* циклический буфер */
35   Sem_post(&ptr->mutex);
36   Sem_post(&ptr->nempty);
37   Sem_wait(&ptr->noverflowmutex);
38   temp = ptr->noverflow; /* не выводим, пока не снимем блокировку */
39   Sem_post(&ptr->noverflowmutex);
40   if (temp != lastnoverflow) {
41    printf("noverflow = %dn", temp);
42    lastnoverflow = temp;
43   }
44  }
45  exit(0);
46 }

Создание объекта разделяемой памяти

10-16 Сначала делается вызов shm_unlink, чтобы удалить объект с тем же именем, который мог остаться после другого приложения. Затем объект разделяемой памяти создается вызовом shm_open и отображается в адресное пространство процесса вызовом mmap, после чего дескриптор объекта закрывается.

Инициализация массива сдвигов

17-19 Массив сдвигов инициализируется сдвигами сообщений.

Инициализация семафоров

20-24 Инициализируются четыре семафора, размещаемые в объекте разделяемой памяти. Второй аргумент sem_init всегда делается ненулевым, поскольку семафоры будут использоваться совместно несколькими процессами.

Ожидание сообщения, вывод его содержимого

25-36 Первая половина цикла for написана по стандартному алгоритму потребителя: ожидание изменения семафора nstored, установка блокировки для семафора mutex, обработка данных, увеличение значения семафора nempty.

Обработка переполнений

37-43 При каждом проходе цикла мы проверяем наличие возникших переполнений. Сравнивается текущее значение noverflows с предыдущим. Если значение изменилось, оно выводится на экран и сохраняется. Обратите внимание, что значение считывается с заблокированным взаимным исключением noverflowmutex, но блокировка снимается перед сравнением и выводом значения. Идея в том, что нужно всегда следовать общему правилу минимизации количества операций, выполняемых с заблокированным взаимным исключением. В листинге 13.10 приведен текст программы-клиента.

Листинг 13.10. Клиент, помещающий сообщения в разделяемую память

//pxshm/client2.c
1  #include "cliserv2.h"
2  int
3  main(int argc, char **argv)
4  {
5   int fd, i, nloop, nusec;
6   pid_t pid;
7   char mesg[MESGSIZE];
8   long offset;
9   struct shmstruct *ptr;
10  if (argc != 4)
11   err_quit("usage: client2 <name> <#loops> <#usec>");
12  nloop = atoi(argv[2]);
13  nusec = atoi(argv[3]);
14  /* открытие и отображение объекта разделяемой памяти, созданного сервером заранее */
15  fd = Shm_open(Px_ipc_name(argv[1]), O_RDWR, FILE_MODE);
16  ptr = Mmap(NULL, sizeof(struct shmstruct), PROT_READ | PROT_WRITE,
17   MAP_SHARED, fd, 0);
18  Close(fd);
19  pid = getpid();
20  for (i = 0; i < nloop; i++) {
21   Sleep_us(nusec);
22   snprintf(mesg, MESGSIZE, "pid %ld; message %d", (long) pid, i);
23   if (sem_trywait(&ptr->nempty) == –1) {
24    if (errno == EAGAIN) {
25     Sem_wait(&ptr->noverflowmutex);
26     ptr->noverflow++;
27     Sem_post(&ptr->noverflowmutex);
28     continue;
29    } else
30     err_sys("sem_trywait error");
31   }
32   Sem_wait(&ptr->mutex);
33   offset = ptr->msgoff[ptr->nput];
34   if (++(ptr->nput) >= NMESG)
35    ptr->nput = 0; /* циклический буфер */
36   Sem_post(&ptr->mutex);
37   strcpy(&ptr->msgdata[offset], mesg);
38   Sem_post(&ptr->nstored);
39  }
40  exit(0);
41 }

Аргументы командной строки

10-13 Первый аргумент командной строки задает имя объекта разделяемой памяти; второй — количество сообщений, которые должны быть отправлены серверу данным клиентом. Последний аргумент задает паузу перед отправкой очередного сообщения (в микросекундах). Мы сможем получить ситуацию переполнения, запустив одновременно несколько экземпляров клиентов и указав небольшое значение для этой паузы. Таким образом мы сможем убедиться, что сервер корректно обрабатывает ситуацию переполнения.

Открытие и отображение разделяемой памяти

14-18 Мы открываем объект разделяемой памяти, предполагая, что он уже создан и проинициализирован сервером, а затем отображаем его в адресное пространство процесса. После этого дескриптор может быть закрыт.

Отправка сообщений

19-31 Клиент работает по простому алгоритму программы-производителя, но вместо вызова sem_wait(nempty), который приводил бы к блокированию клиента в случае отсутствия места в буфере для следующего сообщения, мы вызываем sem_trywait — эта функция не блокируется. Если значение семафора нулевое, возвращается ошибка EAGAIN. Мы обрабатываем эту ошибку, увеличивая значение счетчика переполнений.

ПРИМЕЧАНИЕ

sleep_us — функция из листингов С.9 и С.10 [21]. Она приостанавливает выполнение программы на заданное количество микросекунд. Реализуется вызовом select или poll. 

32-37 Пока заблокирован семафор mutex, мы можем получить значение сдвига (offset) и увеличить счетчик nput, но мы снимаем блокировку с этого семафора перед операцией копирования сообщения в разделяемую память. Когда семафор заблокирован, должны выполняться только самые необходимые операции.

Сначала запустим сервер в фоновом режиме, а затем запустим один экземпляр программы-клиента, указав 50 сообщений и нулевую паузу между ними: 

solaris % server2 serv2 &
[2] 27223
solaris % client2 serv250 0
index = 0: pid 27224: message 0
index = 1: pid 27224: message 1
index = 2: pid 27224: message 2
…                                продолжает в том же духе
index = 15: pid 27224: message 47
index = 0: pid 27224: message 48
index = 1: pid 27224: message 49 нет утерянных сообщений

Но если мы запустим программу-клиент еще раз, то мы увидим возникновение переполнений.

solaris % client2 serv250 0
index = 2: pid 27228: message 0
index = 3: pid 27228: message 1
…              пока все в порядке
index = 10: pid 27228: message 8
index = 11: pid 27228: message 9
noverflow = 25 утеряно 25 сообщений
index = 12: pid 27228: message 10
index = 13: pid 27228: message 11
…              нормально обрабатываются сообщения 12-22
index = 9: pid 27228: message 23
index = 10: pid 27228: message 24

На этот раз клиент успешно отправил сообщения 0-9, которые были получены и выведены сервером. Затем клиент снова получил управление и поместил сообщения 10-49, но места хватило только для первых 15, а последующие 25 (с 25 по 49) не были сохранены из-за переполнения:

Очевидно, что в этом примере переполнение возникло из-за того, что мы потребовали от клиента отправлять сообщения так часто, как только возможно, не делая между ними пауз. В реальном мире такое случается редко. Целью этого примера было продемонстрировать обработку ситуаций, в которых места для очередного сообщения нет, но клиент не должен блокироваться. Такая ситуация может возникнуть, разумеется, не только при использовании разделяемой памяти, но и при использовании очередей сообщений, именованных и неименованных каналов.

ПРИМЕЧАНИЕ

Переполнение приемного буфера данными встречается не только в этом примере. В разделе 8.13 [24] обсуждалась такая ситуация в связи с дейтаграммами UDP и приемным буфером сокета UDP. В разделе 18.2 [23] подробно рассказывается о том, как доменные сокеты Unix возвращают отправителю ошибку ENOBUFS при переполнении приемного буфера получателя. Это отличает доменные сокеты от протокола UDP. Программа-клиент из листинга 13.10 узнает о переполнении буфера, поэтому если этот код поместить в функцию общего назначения, которую затем будут использовать другие программы, такая функция сможет возвращать ошибку вызывающему процессу при переполнении буфера сервера.

Оглавление книги


Генерация: 1.856. Запросов К БД/Cache: 3 / 0
поделиться
Вверх Вниз