Книга: UNIX: взаимодействие процессов

Функция mq_send

Функция mq_send

В листинге 5.25 приведен текст первой половины нашей функции mqsend.

Инициализация

14-29 Мы получаем указатели на используемые структуры и блокируем взаимное исключение для данной очереди. Проверяем, не превышает ли размер сообщения максимально допустимый для данной очереди.

Проверка очереди на пустоту и отправка уведомления

30-38 Если мы помещаем первое сообщение в пустую очередь, нужно проверить, не зарегистрирован ли какой-нибудь процесс на уведомление об этом событии и нет ли потоков, заблокированных в вызове mq_receive. Для проверки второго условия мы воспользуемся сохраняемым функцией mq_receive счетчиком mqh_nwait, содержащим количество потоков, заблокированных в вызове mq_receive. Если этот счетчик имеет ненулевое значение, мы не отправляем уведомление зарегистрированному процессу. Для отправки сигнала SIGEV_SIGNAL используется функция sigqueue. Затем процесс снимается с регистрации.

ПРИМЕЧАНИЕ

Вызов sigqueue для отправки сигнала приводит к передаче сигнала SI_QUEUE обработчику сигнала в структуре типа siginfo_t (раздел 5.7), что неправильно. Отправка правильного значения si_code (а именно SI_MESGQ) из пользовательского процесса осуществляется в зависимости от реализации. На с. 433 стандарта IEEE 1996 [8] отмечается, что для отправки этого сигнала из пользовательской библиотеки необходимо воспользоваться скрытым интерфейсом механизма отправки сигналов. 

Проверка заполненности очереди

39-48 Если очередь переполнена и установлен флаг O_NONBLOCK, мы возвращаем ошибку с кодом EAGAIN. В противном случае мы ожидаем сигнала по условной переменной mqh_wait, который, как мы увидим, отправляется функцией mq_receive при считывании сообщения из переполненной очереди.

ПРИМЕЧАНИЕ

Наша реализация упрощает ситуацию с возвращением ошибки EINTR при прерывании вызова mq_send сигналом, перехватываемым вызвавшим процессом. Проблема в том, что функция pthread_cond_wait не возвращает ошибки при возврате из обработчика сигнала: она может вернуть либо 0 (что рассматривается как ложное пробуждение), либо вообще не завершить работу. Все эти проблемы можно обойти, но это непросто. 

В листинге 5.26 приведена вторая половина функции mq_send. К моменту ее выполнения мы уже знаем о наличии в очереди свободного места для нашего сообщения.

Листинг 5.25. Функция mq_send: первая половина

//my_pxmsg_mmap/mq_send.с
1  #include "unpipc.h"
2  #include "mqueue.h"
3  int
4  mymq_send(mymqd_t mqd, const char *ptr, size_t len, unsigned int prio)
5  {
6   int n;
7   long index, freeindex;
8   int8_t *mptr;
9   struct sigevent *sigev;
10  struct mymq_hdr *mqhdr;
11  struct mymq_attr *attr;
12  struct mymsg_hdr *msghdr, *nmsghdr, *pmsghdr;
13  struct mymq_info *mqinfo;
14  mqinfo = mqd;
15  if (mqinfo->mqi_magic != MQI_MAGIC) {
16   errno = EBADF;
17   return(-1);
18  }
19  mqhdr = mqinfo->mqi_hdr; /* указатель типа struct */
20  mptr = (int8_t *) mqhdr; /* указатель на байт */
21  attr = &mqhdr->mqh_attr;
22  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
23   errno = n;
24   return(-1);
25  }
26  if (len > attr->mq_msgsize) {
27   errno = EMSGSIZE;
28   goto err;
29  }
30  if (attr->mq_curmsgs == 0) {
31   if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) {
32    sigev = &mqhdr->mqh_event;
33    if (sigev->sigev_notify == SIGEV_SIGNAL) {
34     sigqueue(mqhdr->mqh_pid, sigev->sigev_signo,
35      sigev->sigev_value);
36    }
37    mqhdr->mqh_pid = 0; /* снятие с регистрации */
38   }
39  } else if (attr->mq_curmsgs >= attr->mq_maxmsg) {
40   /* 4queue is full */
41   if (mqinfo->mqi_flags & O_NONBLOCK) {
32    errno = EAGAIN;
43    goto err;
44   }
45   /* ожидание освобождения места в очереди */
46   while (attr->mq_curmsgs >= attr->mq_maxmsg)
47    pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
48  }

Листинг 5.25. Функция mq_send: вторая половина

//my_pxmsg_mmap/mq_send.с
49  /* nmsghdr будет указывать на новое сообщение*/
50  if ((freeindex = mqhdr->mqh_free) == 0)
51   err_dump("mymq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
52  nmsghdr = (struct mymsg_hdr *) &mptr[freeindex];
53  nmsghdr->msg_prio = prio;
54  nmsghdr->msg_len = len;
55  memcpy(nmsghdr + 1, ptr, len); /* копирование сообщения в очередь */
56  mqhdr->mqh_free = nmsghdr->msg_next; /* новое начало списка пустых сообщений */
57  /* поиск места в списке для нового сообщения */
58  index = mqhdr->mqh_head;
59  pmsghdr = (struct mymsg_hdr *) &(mqhdr->mqh_head);
60  while (index != 0) {
61   msghdr = (struct mymsg_hdr *) &mptr[index];
62   if (prio > msghdr->msg_prio) {
63    nmsghdr->msg_next = index;
64    pmsghdr->msg_next = freeindex;
65    break;
66   }
67   index = msghdr->msg_next;
68   pmsghdr = msghdr;
69  }
70  if (index == 0) {
71   /* очередь была пуста или новое письмо добавлено к концу списка */
72   pmsghdr->msg_next = freeindex;
73   nmsghdr->msg_next = 0;
74  }
75  /* запускаем любой из процессов, заблокированных в mq_receive */
76  if (attr->mq_curmsgs == 0)
77   pthread_cond_signal(&mqhdr->mqh_wait);
78  attr->mq_curmsgs++;
79  pthread_mutex_unlock(&mqhdr->mqh_lock);
80  return(0);
81 err:
82  pthread_mutex_unlock(&mqhdr->mqh lock);
83  return(-1);
84 }

Получение индекса свободного блока

50-52 Поскольку количество свободных сообщений при создании очереди равно mq_maxmsg, ситуация, в которой mq_curmsgs будет меньше mq_maxmsg для пустого списка свободных сообщений, возникнуть не может.

Копирование сообщения

53-56 Указатель nmsghdr хранит адрес области памяти, в которую помещается сообщение. Приоритет и длина сообщения сохраняются в структуре msg_hdr, а затем в память копируется содержимое сообщения, переданного вызвавшим процессом.

Помещение нового сообщения в соответствующее место связного списка

57-74 Порядок сообщений в нашем списке зависит от их приоритета: они расположены в порядке его убывания. При добавлении нового сообщения мы проверяем, существуют ли сообщения с тем же приоритетом; в этом случае сообщение добавляется после последнего из них. Используя такой метод упорядочения, мы гарантируем, что mq_receive всегда будет возвращать старейшее сообщение с наивысшим приоритетом. По мере продвижения по списку мы сохраняем в pmsghdr адрес предыдущего сообщения, поскольку именно это сообщение будет хранить индекс нового сообщения в поле msg_next.

ПРИМЕЧАНИЕ

Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета. 

Пробуждение любого процесса, заблокированного в вызове mq_receive

75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.

78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.

Оглавление книги


Генерация: 0.644. Запросов К БД/Cache: 3 / 0
поделиться
Вверх Вниз