Книга: UNIX: взаимодействие процессов

Пример: одна очередь для каждого клиента

Пример: одна очередь для каждого клиента

Изменим теперь предыдущий пример таким образом, чтобы все запросы клиентов передавались по одной очереди, но для отправки ответов использовалась бы отдельная очередь для каждого клиента. На рис. 6.3 изображена схема такого приложения. 


Рис. 6.3. Одна очередь для сервера и по одной для каждого клиента

Ключ очереди сервера должен быть известен клиентам, а сами клиенты создают свои очереди с ключом IPC_PRIVATE. Вместо передачи серверу идентификатора процесса клиенты сообщают ему идентификатор своей очереди, в которую сервер направляет свой ответ. Этот сервер является параллельным: для каждого нового клиента порождается отдельный процесс.

ПРИМЕЧАНИЕ

При такой схеме может возникнуть проблема в случае «гибели» клиента, потому что тогда сообщения останутся в его очереди навсегда (по крайней мере до перезагрузки ядра или явного удаления очереди другим процессом).

Нижеследующие заголовочные файлы и функции не претерпевают изменений по сравнению с предыдущими версиями:

? mesg.h (листинг 4.12);

? svmsg.h (листинг 6.7);

? функция main сервера (листинг 6.12);

? функция mesg_send (листинг 4.13).

Функция main клиента приведена в листинге 6.16; она слегка изменилась по сравнению с листингом 6.14. Мы открываем очередь сервера с известным ключом (MQ_KEY1) и создаем нашу собственную очередь с ключом IPC_PRIVATE. Два идентификатора этих очередей становятся аргументами функции client (листинг 6.17). После завершения работы клиента его персональная очередь удаляется.

Листинг 6.16. Функция main клиента

//svmsgmpxnq/client_main.с
1  #include "svmsg.h"
2  void client(int, int);
3  int
4  main(int argc, char **argv)
5  {
6   int readid, writeid;
7   /* сервер должен создать свою очередь */
8   writeid = Msgget(MQ_KEY1, 0);
9   /* мы создаем свою собственную очередь */
10  readid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
11  client(readid, writeid);
12  /* и удаляем нашу собственную очередь */
13  Msgctl(readid, IPC_RMID, NULL);
14  exit(0);
15 }

Листинг 6.17. Функция client

//svmsgmpxnq/client.с
1  #include "mesg.h"
2  void
3  client(int readid, int writeid)
4  {
5   size_t len;
6   ssize_t n;
7   char *ptr;
8   struct mymesg mesg;
9   /* инициализируем буфер идентификатором очереди и пробелом */
10  snprintf(mesg.mesg_data, MAXMESGDATA, "%d ", readid);
11  len = strlen(mesg.mesg_data);
12  ptr = mesg.mesg_data + len;
13  /* считываем имя файла */
14  Fgets(ptr, MAXMESGDATA – len, stdin);
15  len = strlen(mesg.mesg_data);
16  if (mesg.mesg_data[len-1] == 'n')
17   len--; /* удаляем перевод строки fgets() */
18  mesg.mesg_len = len;
19  mesg.mesg_type = 1;
20  /* отправляем идентификатор очереди и имя файла серверу */
21  Mesg_send(writeid, &mesg);
22  /* считываем ответ из нашей очереди и записываем его в stdout */
23  while ((n = Mesg_recv(readid, &mesg)) > 0)
24   Write(STDOUT_FILENO, mesg.mesg_data, n);
25 }

В листинге 6.17 приведен текст функции client. Эта функция практически идентична функции из листинга 6.15, но вместо передачи идентификатора процесса клиента на сервер направляется идентификатор очереди клиента. Тип сообщения в структуре mesg остается равным 1, поскольку это значение устанавливается для сообщений, передаваемых в обе стороны.

В листинге 6.19 приведена функция server. Главное отличие от листинга 6.13 в том, что эта функция представляет собой бесконечный цикл, в котором для каждого нового клиента вызывается fork.

Установка обработчика сигнала для SIGCHLD

10 Поскольку для каждого клиента порождается отдельный процесс, нужно позаботиться о процессах-зомби. В разделах 5.9 и 5.10 [24] об этом говорится подробно. Здесь мы просто установим обработчик для сигнала SIGCHLD, и наша функция sig_chld (листинг 6.18) будет вызываться при завершении работы дочернего процесса.

12-18 Породивший процесс сервера блокируется в вызове mesg_recv, ожидая появления сообщения от очередного клиента.

25-45 Вызовом fork порождается новый процесс, который производит попытку открыть запрошенный файл и отправляет клиенту либо сообщение об ошибке, либо содержимое файла. Мы преднамеренно поместили вызов fopen в дочерний процесс, а не в родительский, поскольку если файл находится в удаленной файловой системе, его открытие может занять довольно много времени в случае наличия проблем с сетью.

Функция-обработчик для SIGCHLD приведена в листинге 6.18. Она скопирована с листинга 5.11 [24].

Листинг 6.18. Обработчик сигнала SIGCHLD, вызывающий waitpid

//svmsgmpxnq/sigchldwaitpid.с
1 #include "unpipc.h"
2 void
3 sig_chld(int signo)
4 {
5  pid_t pid;
6  int stat;
7  while ((pid = waitpid(-1, &stat, WNOHANG)) > 0);
8  return;
9 }

Каждый раз при вызове обработчика происходит циклический вызов waitpid для получения статуса завершения работы всех дочерних процессов, которые могли завершить работу. Затем происходит возврат из обработчика сигнала. При этом может возникнуть проблема, поскольку родительский процесс проводит большую часть времени в заблокированном состоянии (при вызове mesg_recv, листинг 6.9). При возвращении из обработчика этот вызов msgrcv прерывается. Функция возвращает ошибку с кодом EINTR, как рассказывается в разделе 5.9 [24]. 

Нам нужно обработать такой возврат из вызванной функции, поэтому мы пишем новую функцию-обертку Mesg_recv, приведенную в листинге 6.20. Эта программа допускает возвращение ошибки с кодом EINTR функцией mesg_recv (которая просто вызывает msgrcv), и, если это происходит, мы просто еще раз вызываем mesg_recv.

Листинг 6.19. Функция server

//svmsgmpxnq/server.c
1  #include "mesg.h"
2  void
3  server(int readid, int writeid)
4  {
5   FILE *fp;
6   char *ptr;
7   ssize_t n;
8   struct mymesg mesg;
9   void sig_chld(int);
10  Signal(SIGCHLD, sig_chld);
11  for (;;) {
12   /* считывание имени файла из очереди */
13   mesg.mesg_type = 1;
14   if ((n = Mesg_recv(readid, &mesg)) == 0) {
15    err_msg("pathname missing");
16    continue;
17   }
18   mesg.mesg_data[n] = 40'; /* имя файла */
19   if ((ptr = strchr(mesg.mesg_data, ' ')) = NULL) {
20    err_msg("bogus request: %s", mesg.mesg_data);
21    continue;
22   }
23   *ptr++ = 0; /* ptr = имя файла */
24   writeid = atoi(mesg.mesg_data);
25   if (Fork() == 0) { /* дочерний процесс */
26    if ((fp = fopen(ptr, "r")) == NULL) {
27     /* ошибка: нужно сообщить клиенту */
28     snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) – n,
29      ": can't open, %sn", strerror(errno));
30     mesg.mesg_len = strlen(ptr);
31     memmove(mesg.mesg_data, ptr, mesg.mesg_len);
32     Mesg_send(writeid, &mesg);
33    } else {
34     /* файл открыт, копируем клиенту */
35     while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {
36      mesg.mesg_len = strlen(mesg.mesg_data);
37      Mesg_send(writeid, &mesg);
38     }
39     Fclose(fp);
40    }
41    /* отправка сообщения нулевой длины, указывающего конец файла */
42    mesg.mesg_len = 0;
43    Mesg_send(writeid, &mesg);
44    exit(0); /* завершение дочернего процесса */
45   }
46   /* родительский процесс просто зациклен */
47  }
48 }

Листинг 6.20. Функция-обертка Mesg_recv, обрабатывающая прерванный системный вызов

//svmsgmpxnq/mesg_recv.с
10 ssize_t
11 Mesg_recv(int id, struct mymesg *mptr)
12 {
13  ssize_t n;
14  do {
15   n = mesg_recv(id, mptr);
16  } while (n == –1 && errno == EINTR);
17  if (n == –1)
18   err_sys("mesg_recv error");
19  return(n);
20 }

Оглавление книги


Генерация: 1.335. Запросов К БД/Cache: 3 / 1
поделиться
Вверх Вниз