Книга: UNIX: взаимодействие процессов

Функция mq_open

Функция mq_open

В листинге 5.17 приведен текст первой части функции mq_open, создающей новую очередь сообщений или открывающей существующую.

Листинг 5.17. Функция mq_open: первая часть

//my_pxmsg._mmap/mq_open. с
1  #include "unpipc.h"
2  #include "mqueue.h"
3  #include <stdarg.h>
4  #define MAX_TRIES 10
5  struct mymq_attr defattr =
6   { 0, 128, 1024, 0 };
7  mymqd_t
8  mymq_open(const char *pathname, int oflag, …)
9  {
10  int i, fd, nonblock, created, save_errno;
11  long msgsize, filesize, index;
12  va_list ap;
13  mode_t mode;
14  int8_t *mptr;
15  struct stat statbuff;
16  struct mymq_hdr *mqhdr;
17  struct mymsg_hdr *msghdr;
18  struct mymq_attr *attr;
19  struct mymq_info *mqinfo;
20  pthread_mutexattr_t mattr;
21  pthread_condattr_t cattr;
22  created = 0;
23  nonblock = oflag & O_NONBLOCK;
24  oflag &= ~O_NONBLOCK;
25  mptr = (int8_t *) MAP_FAILED;
26  mqinfo = NULL;
27 again:
28  if (oflag & O_CREAT) {
29   va_start(ap, oflag); /* ар инициализируется последним аргументом */
30   mode = va_arg(ap, va_mode_t) & ~S_IXUSR;
31   attr = va_arg(ap, struct mymq_attr *);
32   va_end(ap);
33   /* открытие с установкой бита user-execute */
34   fd = open (pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
35   if (fd < 0) {
36    if (errno == EEXIST && (oflag & O_EXCL) == 0)
37     goto exists; /* уже существует, OK */
38    else
39     return((mymqd_t) –1);
40   }
41   created = 1;
42   /* при создании файла он инициализируется */
43   if (attr == NULL)
44    attr = &defattr;
45   else {
46    if (attr->mq_maxmsg <– 0 || attr->mq_msgsize <= 0) {
47     errno = EINVAL;
48     goto err;
49    }
50   }

Обработка списка аргументов переменного размера

29-32 Функция может быть вызвана либо с двумя, либо с четырьмя аргументами в зависимости от того, указан ли флаг O_CREAT. Если флаг указан, третий аргумент имеет тип mode_t, а это простой системный тип, являющийся одним из целых типов. При этом мы столкнемся с проблемой в BSD/OS, где этот тип данных определен как unsigned short (16 бит). Поскольку целое в этой реализации занимает 32 бита, компилятор С увеличивает аргумент этого типа с 16 до 32 бит, потому что все короткие целые в списке аргументов увеличиваются до обычных целых. Но если мы укажем mode_t при вызове va_arg, он пропустит 16 бит аргумента в стеке, если этот аргумент был увеличен до 32 бит. Следовательно, мы должны определить свой собственный тип данных, va_mode_t, который будет целым в BSD/OS и типом mode_t в других системах. Эту проблему с переносимостью решают приведенные ниже строки нашего заголовка unpipc.h (листинг В.1):

#ifdef __bsdi__
#define va_mode_t int
#else
#define va_mode_t mode_t
#endif

30 Мы сбрасываем бит user-execute (S_IXUSR) в переменной mode по причинам, которые будут вскоре раскрыты.

Создание новой очереди сообщений

33-34 Создается обычный файл с именем, указанным при вызове функции, и устанавливается бит user-execute. 

Обработка потенциальной ситуации гонок

35-40 Если бы при указании флага O_CREAT мы просто открыли файл, отобразили его содержимое в память и проинициализировали отображенный файл (как будет описано ниже), у нас возникла бы ситуация гонок. Очередь сообщений инициализируется mq_open только в том случае, если вызывающий процесс указывает флаг O_CREAT и очередь сообщений еще не существует. Это означает, что нам нужно каким-то образом определять, существует она или нет. Для этого при открытии файла для последующего отображения в память мы всегда указываем флаг O_EXCL. Возвращение ошибки EEXIST функцией open является ошибкой для mq_open только в том случае, если при вызове был указан флаг O_EXCL. В противном случае при возвращении функцией open ошибки EEXIST мы делаем вывод, что файл уже существует, и переходим к листингу 5.19, как если бы флаг O_CREAT вовсе не был указан.

Ситуация гонок может возникнуть потому, что использование отображаемого в память файла для реализации очереди сообщений требует двух шагов при инициализации очереди: сначала файл должен быть создан функцией open, а затем его содержимое должно быть проинициализировано. Проблема возникает, если два потока (одного или различных процессов) вызывают mq_open приблизительно одновременно. Один из потоков может создать файл, после чего управление будет передано системой второму потоку, прежде чем первый завершит инициализацию файла. Второй поток обнаружит, что файл уже существует (вызвав open с флагом O_EXCL), и приступит к использованию очереди сообщений.

Мы используем бит user-execute для указания того, был ли проинициализирован файл с очередью сообщений. Этот бит устанавливается только тем потоком, который создает этот файл (флаг O_EXCL позволяет определить этот поток); этот поток инициализирует файл с очередью сообщений, а затем сбрасывает бит user-execute.

Аналогичная ситуация может возникнуть в листингах 10.28 и 10.37.

Проверка атрибутов

42-50 Если при вызове в качестве последнего аргумента передан нулевой указатель, очередь сообщений инициализируется со значениями атрибутов по умолчанию: 128 сообщений в очереди и 1024 байта на сообщение. Если атрибуты указаны явно, мы проверяем, что mq_maxmsg и mq_msgsize имеют положительные значения.

Вторая часть функции mq_open приведена в листинге 5.18. Она завершает инициализацию новой очереди сообщений.

Листинг 5.18. Вторая часть функции mq_open: инициализация новой очереди

//my_pxmsg_mmap/mq_open.с
51    /* вычисление и установка размера файла */
52    msgsize = MSGSIZE(attr->mq_msgsize);
53    filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *
54     (sizeof(struct mymsg_hdr) + msgsize));
55    if (lseek(fd, filesize – 1, SEEK_SET) == –1)
56     goto err;
57    if (write(fd, "", 1) == –1)
58     goto err;
59    /* отображение файла в память */
60    mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
61     MAP_SHARED, fd, 0);
62    if (mptr == MAP_FAILED)
63     goto err;
64    /* выделение структуры mymq_info{} для очереди */
65    if ((mqinfo = mallос (sizeof (struct mymq_info))) == NULL)
66     goto err;
67    mqinfo->mqi_hdr = mqhdr = (struct mymq_hdr *) mptr;
68    mqinfo->mqi_magic = MQI_MAGIC;
69    mqinfo->mqi_flags = nonblock;
70    /* инициализация заголовка в начале файла */
71    /* создание списка пустых сообщений */
72    mqhdr->mqh_attr.mq_flags = 0;
73    mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
74    mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
75    mqhdr->mqh_attr.mq_curmsgs = 0;
76    mqhdr->mqh_nwait = 0;
77    mqhdr->mqh_pid = 0;
78    mqhdr->mqh_head = 0;
79    index = sizeof(struct mymq_hdr);
80    mqhdr->mqh_free = index;
81    for (i = 0; i < attr->mq_maxmsg – 1; i++) {
82     msghdr = (struct mymsg_hdr *) &mptr[index];
83     index += sizeof(struct mymsg_hdr) + msgsize;
84     msghdr->msg_next = index;
85    }
86    msghdr = (struct mymsg_hdr *) &mptr[index];
87    msghdr->msg_next = 0; /* конец списка пустых сообщений */
88    /* инициализация взаимного исключения и условной переменной */
89    if ((i = pthread_mutexattr_init(&mattr)) != 0)
90     goto pthreaderr;
91    pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
92    i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
93    pthread_mutexattr_destroy(&mattr); /* обязательно нужно удалить */
94    if (i != 0)
95     goto pthreaderr:
96    if ((i = pthread_condattr_init(&cattr)) != 0)
97     goto pthreaderr;
98    pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
99    i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
100   pthread_condattr_destroy(&cattr); /* обязательно нужно удалить */
101   if (i != 0)
102    goto pthreaderr;
103   /* инициализация завершена, снимаем бит user-execute */
104   if (fchmod(fd, mode) == –1)
105    goto err;
106   close(fd);
107   return((mymqd_t) mqinfo);
108  }

Установка размера файла

51-58 Вычисляется размер сообщения, который затем округляется до кратного размеру длинного целого. Также в файле отводится место для структуры mq_hdr в начале файла и msghdr в начале каждого сообщения (рис. 5.2). Размер вновь созданного файла устанавливается функцией lseek и записью одного байта со значением 0. Проще было бы вызвать ftruncate (раздел 13.3), но у нас нет гарантий, что это сработало бы для увеличения размера файла.

Отображение файла в память

59-63 Файл отображается в память функцией mmap.

Выделение памяти под структуру mq_info

64-66 При каждом вызове mq_open создается отдельный экземпляр mq_infо. Эта структура после создания инициализируется.

Инициализация структуры mq_hdr

67-87 Инициализируется структура mq_hdr. Заголовок связного списка сообщений (mqh_head) инициализируется нулем, а все сообщения в очереди добавляются к списку свободных (mqh_frее).

Инициализация взаимного исключения и условной переменной

88-102 Поскольку очереди сообщений Posix могут использоваться совместно произвольным количеством процессов, которые знают имя очереди и имеют соответствующие разрешения, нам нужно инициализировать взаимное исключение и условную переменную с атрибутом PTHREAD_PROCESS_SHARED. Для этого мы сначала инициализируем атрибуты вызовом pthread_mutexattr_init, а затем устанавливаем значение атрибута совместного использования процессами, вызвав pthread_mutexattr_setpshared. После этого взаимное исключение инициализируется вызовом pthread_mutex_init. Аналогичные действия выполняются для условной переменной. Мы должны аккуратно удалить взаимное исключение и условную переменную даже при возникновении ошибки, поскольку вызовы pthread_ mutexattr_init и pthread_condattr_init выделяют под них память (упражнение 7.3).

Сброс бита user-execute

103-107 После инициализации очереди сообщений мы сбрасываем бит user-execute. Это говорит другим процессам о том, что очередь была проинициализирована. Мы также закрываем файл вызовом close, поскольку он был успешно отображен в память и держать его открытым больше нет необходимости.

В листинге 5.19 приведен конец функции mq_open, в котором осуществляется открытие существующей очереди сообщений.

Листинг 5.19. Третья часть функции mq_open: открытие существующей очереди сообщений

//my_pxmsg_mmap/mq_open.с
109 exists:
110  /* открытие файла и отображение его в память */
111  if ((fd = open(pathname, O_RDWR)) < 0) {
112   if (errno == ENOENT && (oflag & O_CREAT))
113    goto again;
114  goto err;
115  }
116  /* проверяем, что инициализация завершена */
117  for (i = 0; i < MAX TRIES; i++) {
118   if (stat(pathname, &statbuff) == –1) {
119    if (errno == ENOENT && (oflag & O_CREAT)) {
120     close(fd);
121     goto again;
122    }
123    goto err;
124   }
125   if ((statbuff.st_mode & S_IXUSR) == 0)
126   break;
127   sleep(1);
128  }
129  if (i == MAX_TRIES) {
130   errno = ETIMEDOUT;
131   goto err;
132  }
133  filesize = statbuff.st_size;
134  mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
135  if (mptr == MAP_FAILED)
136   goto err;
137  close(fd);
138  /* выделяем одну mymq_info{} для каждого вызова open */
139  if ((mqinfo = malloc(sizeof(struct mymq_info))) == NULL)
140   goto err;
141  rnqinfo->mqi_hdr = (struct mymq_hdr *) mptr;
142  mqinfo->mqi_magic = MQI_MAGIC;
143  mqinfo->mqi_flags = nonblock;
144  return((mymqd_t) mqinfo);
145 pthreaderr:
146  errno = i;
147 err:
148  /* не даем следующим вызовам изменить errno */
149  save_errno = errno;
150  if (created)
151   unlink(pathname);
152  if (mptr != MAP_FAILED)
153   munmap(mptr, filesize);
154  if (mqinfo != NULL)
155   free(mqinfo);
156  close(fd);
157  errno = save_errno;
158  return((mymqd_t) –1);
159 }

Открытие существующей очереди сообщений

109-115 Здесь мы завершаем работу, если флаг O_CREAT не был указан или если он был указан, но очередь уже существовала. В любом случае, мы открываем существующую очередь сообщений. Для этого мы открываем для чтения и записи файл, в котором она содержится, функцией open и отображаем его содержимое в адресное пpocтрaнcтвo процесса (mmap).

ПРИМЕЧАНИЕ

Наша реализация сильно упрощена в том, что касается режима открытия файла. Даже если вызвавший процесс указывает флаг O_RDONLY, мы должны дать возможность доступа для чтения и записи при открытии файла командой open и при отображении его в память командой mmap, поскольку невозможно считать сообщение из очереди, не изменив содержимое файла. Аналогично невозможно записать сообщение в очередь, не имея доступа на чтение. Обойти эту проблему можно, сохранив режим открытия (O_RDONLY, O_WRONLY, O_RDWR) в структуре mq_info и проверяя этот режим в каждой из функций. Например, mq_receive должна возвращать ошибку, если в mq_info хранится значение O_WRONLY.

Проверка готовности очереди

116-132 Нам необходимо дождаться, когда очередь будет проинициализирована (в случае, если несколько потоков сделают попытку открыть ее приблизительно одновременно). Для этого мы вызываем stat и проверяем разрешения доступа к файлу (поле st_mode структуры stat). Если бит user-execute сброшен, очередь уже проинициализирована.

Этот участок кода обрабатывает другую возможную ситуацию гонок. Предположим, что два потока разных процессов попытаются открыть очередь приблизительно одновременно. Первый поток создает файл и блокируется при вызове lseek в листинге 5.18. Второй поток обнаруживает, что файл уже существует, и переходит к метке exists, где он вновь открывает файл функцией open и при этом блокируется. Затем продолжается выполнение первого потока, но его вызов mmap в листинге 5.18 не срабатывает (возможно, он превысил лимит использования памяти), поэтому он переходит на метку err и удаляет созданный файл вызовом unlink. Продолжается выполнение второго потока, но если бы мы вызывали fstat вместо stat, он бы вышел по тайм-ауту в цикле for, ожидая инициализации файла. Вместо этого мы вызываем stat, которая возвращает ошибку, если файл не существует, и, если флаг O_CREAT был указан при вызове mq_open, мы переходим на метку again (листинг 5.17) для повторного создания файла. Эта ситуация гонок заставляет нас также проверять, не возвращается ли при вызове open ошибка ENOENT.

Отображение файла в память; создание и инициализация структуры mq_info

133-144 Файл отображается в память, после чего его дескриптор может быть закрыт. Затем мы выделяем место под структуру mq_infо и инициализируем ее. Возвращаемое значение представляет собой указатель на эту структуру.

Обработка ошибок

145-148 При возникновении ошибок происходит переход к метке err, а переменной errno присваивается значение, которое должно быть возвращено функцией mq_open. Мы аккуратно вызываем функции для очистки памяти от выделенных объектов, чтобы переменная errno не изменила свое значение в случае возникновения ошибки в этих функциях.

Оглавление книги


Генерация: 0.553. Запросов К БД/Cache: 2 / 0
поделиться
Вверх Вниз