Книга: Введение в QNX/Neutrino 2. Руководство по программированию приложений реального времени в QNX Realtime Platform
Флаги канала
Разделы на этой странице:
Флаги канала
Когда мы вначале книги изучали сервер (в параграфе «Сервер»), мы упомянули, что функция ChannelCreate() принимает параметр flags (флаги); правда, тогда мы вместо этого параметра передавали нуль.
Теперь пришло время более подробно изучить назначение параметра flags. Рассмотрим только некоторые из возможных его значений:
_NTO_CHF_FIXED_PRIORITY
Принимающий поток не изменит приоритет в зависимости от приоритета отправителя. (Мы поговорим о проблемах приоритетов более подробно в разделе «Наследование приоритетов»). Обычно (то есть если этот флаг не установлен) приоритет принимающего сообщение потока изменяется на приоритет потока- отправителя.
_NTO_CHF_UNBLOCK
Ядро посылает импульс всякий раз, когда поток клиента пытается разблокироваться. Чтобы клиент мог разблокироваться, сервер должен ему ответить. Мы обсудим это ниже, потому что это имеет некоторые интересные последствия — как для клиента, так и для сервера.
_NTO_CHF_THREAD_DEATH
Ядро посылает импульс всякий раз, когда блокированный на этом канале поток «умирает». Это полезно для серверов, которые желают поддержать фиксированную популяцию потоков в пуле, т. е. количество потоков, доступных для обслуживания запросов.
_NTO_CHF_DISCONNECT
Ядро посылает импульс всякий раз после того, как уничтожается последнее из имевшихся соединений сервера с некоторым клиентом.
_NTO_CHF_SENDER_LEN
Ядро доставляет серверу, наряду с остальной информацией, размер клиентского сообщения.
Флаг _NTO_CHF_UNBLOCK
Присмотримся к флагу _NTO_CHF_UNBLOCK. Этот флаг имеет несколько особенностей при его применении, интересных и для клиента, и для сервера.
Обычно (то есть когда сервер не устанавливает флаг _NTO_CHF_UNBLOCK), когда клиент хочет разблокироваться от MsgSend() (или MsgSendv(), MsgSendvs() или другой функции этого семейства), клиент просто берет и разблокируется. Клиент может пожелать разблокироваться по приему сигнала или по тайм-ауту ядра (см. функцию TimerTimeout() в Справочном руководстве по Си-библиотеке, а также главу «Часы, таймеры и периодические уведомления»). Неприятный аспект этого заключается в том, что сервер понятия не имеет, что клиент уже разблокирован и больше не ожидает ответа.
Давайте предположим, что у вас многопоточный сервер, и все потоки заблокированы с помощью функции MsgReceive(). Клиент посылает сообщение серверу, и один из потоков сервера принимает его. Клиент блокируется, поток же сервера активно обрабатывает запрос. Но прежде, чем поток сервера сможет ответить клиенту, клиент разблокируется из своего MsgSend() (предположим, что по причине приема сигнала).
Не забывайте: поток сервера по-прежнему обрабатывает поступивший от клиента запрос. Но так как клиент теперь разблокирован (например, его вызов MsgSend() возвратил EINTR), он теперь может послать серверу другой запрос. Вследствие особенности архитектуры серверов в QNX/Neutrino, очередное сообщение от этого клиента принял бы другой поток сервера, но идентификатор отправителя-то остается тем же самым! Сервер не сумеет различить эти два запроса, и когда первый поток сервера завершает обработку первого запроса и отвечает клиенту, фактически он отвечает на второе сообщение, а не на первое. Итак, первый поток сервера отвечает на второе сообщение клиента.
Это плохо уже само по себе; но давайте заглянем еще на шаг вперед. Теперь второй поток завершает свою работу по обработке запроса и пробует ответить клиенту. Но поскольку первый поток сервера уже ответил этому клиенту, а значит, этот клиент уже разблокирован, то попытка второго потока сервера ответить клиенту возвратится с ошибкой.
Эта проблема встречается только в многопоточных серверах, потому что в однопоточном сервере его единственный поток был бы по-прежнему занят обработкой первого запроса клиента. Это означает, что даже если бы клиент разблокировался и снова послал сообщение серверу, он перешел бы в SEND- блокированное состояние (а не в REPLY-блокированное состояние), позволив тем самым серверу закончить обработку первого запроса, ответить клиенту (что привело бы к ошибке, потому что клиент более не находится в REPLY-блокированном состоянии) и лишь затем принять второе сообщение. Здесь реальная проблема состоит в том, что сервер выполняет лишнюю операцию — обработку первого запроса. Операция же эта является абсолютно бесполезной, поскольку клиент больше не ожидает ее результатов.
Решение данной проблемы (в случае многопоточного сервера) заключается в том, что сервер должен при создании канала указать вызову ChannelCreate() флаг _NTO_CHF_UNBLOCK. Этот флаг скажет ядру: «Сообщи мне импульсом, когда клиент попробует разблокироваться, но не позволяй ему это делать! Я разблокирую клиента сам».
Ключевым моментом здесь является то, что этот флаг сервера изменяет поведение клиентов, не позволяя им разблокироваться до тех пор, пока им это не разрешит сервер.
В однопоточном сервере происходит следующее:
Действие | Состояние клиента | Состояние сервера |
---|---|---|
Клиент посылает запрос серверу | Блокирован | Обработка |
Клиент получает сигнал | Блокирован | Обработка |
Ядро передает импульс серверу | Блокирован | Обработка (первого сообщения) |
Сервер завершает обработку первого запроса и отвечает клиенту | Разблокирован, получены корректные данные | Обработка (импульса) |
Это не помогло клиенту разблокироваться, когда он должен был это сделать, но зато обеспечило, чтобы сервер не запутался. В подобном примере сервер мог вообще проигнорировать импульс, отправленный ему ядром. Это нормально — поскольку сделано предположение, что позволить клиенту быть заблокированным до тех пор, пока сервер не подготовит данные для него, безопасно.
Если вы хотите, чтобы сервер среагировал каким-то действием на посланный ядром импульс, то существует два способа реализации этого:
• Создать еще один поток в сервере, который «слушал» бы канал на предмет импульсов от ядра. Этот второй поток будет отвечать за отмену операции, выполняемой первым потоком. Отвечать клиенту может любой из этих двух потоков.
• Не выполнять задание клиента в потоке непосредственно, а поставить его в очередь заданий. Это обычно делается в приложениях, где сервер целенаправленно направляет задания клиента в очередь, и сервер является управляемым по событиям. Обычно одно из получаемых сервером сообщений указывает на то, что работа клиента завершена, и что пора отвечать. Когда в этом случае приходит импульс от ядра, сервер отменяет выполняемую для данного клиента работу и отвечает.
Какой из методов вам выбирать — это будет зависеть от типа работы, которую выполняет сервер. В первом случае сервер активно выполняет работу для клиента, так что у вас просто не будет иного выбора, чем применить второй поток, который слушал бы импульсы от ядра, сообщающие о разблокировании (далее — «импульсы разблокирования» — прим. ред.). Конечно, вы могли бы также организовать программный опрос в пределах потока для проверки, не пришел ли импульс, но программный опрос обычно удручает.
Во втором случае работу делает не сам сервер, а кто-то другой — возможно, оборудование, которому приказано «сходи и набери данных». При таком варианте поток сервера будет в любом случае блокирован по функции MsgReceive(), ожидая от оборудования признака завершения операции.
В обоих случаях сервер обязан ответить клиенту, иначе клиент останется заблокированным.
Проблема синхронизации
Но даже если вы используете флаг _NTO_CHF_UNBLOCK, как это описано выше, остается еще одна проблема синхронизации. Предположим, что несколько потоков вашего сервера заблокированы по функции MsgReceive() в ожиданий сообщения или импульса, и клиент посылает серверу сообщение. Один из потоков разблокируется и начнет обрабатывать запрос клиента. В процессе этого клиент вдруг пожелает разблокироваться, и ядро сгенерирует предупреждающий об этом импульс (импульс разблокирования). Другой поток сервера примет этот импульс. Фактически здесь мы имеем гонки потоков — первый поток на момент получения вторым импульса мог быть уже почти готов ответить клиенту. Если ответит второй поток (тот, который получил импульс), то есть шанс, что клиент разблокируется и передаст серверу еще одно сообщение. При этом первый поток сервера получает шанс завершить работу по первому запросу и ответить полученными данными на второй запрос:
Путаница в многопоточном сервере.
Также возможен такой вариант: поток, получивший импульс, готовится ответить клиенту, а в это время отвечает первый поток. Получается то же самое, что и раньше — первый поток разблокирует клиента, клиент передает второй запрос, второй поток (тот, который получил импульс) разблокирует клиента по второму запросу.
Здесь мы имеем ситуацию с двумя параллельными потоками обработки (один вызван сообщением клиента и один — импульсом). Обычно в таких ситуациях применяются мутексы.
К сожалению, это привело бы к проблеме — мутекс нужно было бы захватить немедленно после вызова MsgReceive() и освободить перед вызовом MsgReply(). Это, конечно, будет работать, но это войдет в противоречие с самим предназначением импульса разблокирования! (Сервер мог бы либо получить сообщение и игнорировать импульс разблокирования, пока не ответит клиенту, либо получить импульс разблокирования и отменить второй запрос клиента.)
Многообещающим решением (но в конечном счете все равно обреченным на провал) выглядит применение «мелкозернистого» мутекса, то есть мутекса, который захватывается и освобождается только в небольших областях потока управления (как, собственно, и предполагается использовать мутекс — вместо блокирования целого раздела, как это было предложено выше). Можно было бы организовать в сервере флаг «Мы уже ответили?», сбрасывая его при приеме сообщения и снова устанавливая после ответа на него. Непосредственно перед ответом на сообщение проверяется состояние этого флага. Если флаг указывает на то, что ответ уже произведен, то отвечать не надо. Мутекс при этом следовало бы захватывать и освобождать только в областях проверки и установки/сброса флага.
К сожалению, это не будет работать, потому что мы далеко не всегда имеем дело с двумя потоками управления — не всегда же клиент будет получать сигнал в процессе обработки запроса, порождая тем самым импульс разблокирования. Ниже приведен сценарий, где предложенная схема не сработает:
• Клиент передает сообщение серверу; после этого клиент блокируется, а сервер переключается в режим обработки.
• Поскольку сервер принял запрос от клиента, флаг ответа сбрасывается в 0, указывая этим, что мы все еще должны ответить.
• Сервер отвечает клиенту в нормальном режиме (потому что флаг был установлен в 0) и устанавливает флаг в 1, указывая этим, что если прибудет импульс разблокирования, то его следует игнорировать.
• (Проблемы начинаются здесь.) Клиент посылает серверу второе сообщение и почти немедленно после этого получает сигнал; ядро передает серверу импульс разблокирования.
• Поток сервера, который принял сообщение, намеревался захватить мутекс для проверки состояния флага, но еще не успел это сделать, поскольку был вытеснен.
• Другой поток сервера получает импульс разблокирования, и поскольку флаг по-прежнему находится в состоянии 1 с момента последней установки, игнорирует этот импульс.
• Первый поток сервера захватывает мутекс и сбрасывает флаг.
• К этому моменту событие разблокирования клиента потеряно.
Даже если вы усовершенствуете флаг, задав для него большее количество состояний (таких как «импульс получен», «дан ответ на импульс», «сообщение получено», «дан ответ на сообщение»), у вас по-прежнему останется проблема гонок при синхронизации, потому что не существует атомарной операции, связывавшей бы флаг ответа и функции приема сообщения и ответа на него. (Именно это и определяет суть проблемы — небольшие окна во времени, одно после MsgReceive(), но до сброса флага, и второе — после того, как флаг установлен, но до вызова MsgReply().) Единственный способ обойти проблему состоит в том, чтобы переложить работу по отслеживанию состояния флага на ядро.
Применение флага _NTO_MI_UNBLOCK_REQ
К счастью, ядро отслеживает для нас состояние этого флага — под это отведен один бит в информационной структуре сообщения (это структура типа struct _msg_infо
, которая передается функции MsgReceive() в качестве последнего параметра, и которую можно также впоследствии получить по идентификатору отправителя, вызвав функцию MsgInfo()).
Этот флаг называется _NTO_MI_UNBLOCK_REQ и устанавливается в 1, если клиент желает разблокироваться (например, получив сигнал).
Это означает, что в многопоточном сервере у вас будет поток- обработчик, выполняющий работу для клиента, и еще один поток, который будет получать сообщения разблокирования (или другие; но сконцентрируемся пока только на сообщениях разблокирования). Когда от клиента приходит сообщение разблокирования, установите для себя флаг, чтобы дать вашей программе знать о желании потока разблокироваться.
Существуют две ситуации, которые необходимо рассмотреть:
• поток-обработчик заблокирован;
• поток-обработчик активен (выполняется).
Если поток-обработчик блокирован (например, в ожидании ресурса), то поток, получивший сообщение разблокирования, должен его разбудить. Когда поток-обработчик активизируется, он должен проверить состояние флага _NTO_MI_UNBLOCK_REQ и, если флаг установлен, дать ответ о ненормальном завершении. Если флаг сброшен, то поток может спокойно выполнять все, что ему необходимо для нормальной обработки запроса.
В противном случае, если поток-обработчик активен, он должен периодически проверять «флаг, выставляемый в его отношении» потоком, принимающим сообщение разблокирования, и если флаг установлен в 1, он должен ответить клиенту с кодом ошибки. Заметьте, что это всего-навсего оптимизация: в неоптимизированном случае поток-обработчик постоянно вызывал бы функцию MsgInfo() по идентификатору отправителя и проверял бит _NTO_MI_UNBLOCK_REQ самостоятельно.
- Сообщение канала
- 5.12.2 Открытие поименованного канала
- 9.5.8. Ограничение канала
- 10.1.4. Режим канала данных
- Листинг 5.8. (dup2.c) Перенаправление выходного потока канала с помощью функции dup2()
- Часть III. Как не сесть на мель в канале продаж: принципы организации цепочки торгового канала и управления ею
- Подключаемость канала
- Флаги форматирования строк .NET
- Регистры для управления каналами захвата
- Стимулирование сбыта в торговых каналах
- Флаги gfp_mask
- Флаги областей VMA