Книга: Фундаментальные алгоритмы и структуры данных в Delphi

Алгоритм считывания-записи

Алгоритм считывания-записи

В многопоточных приложениях 32-разрядной операционной системы Windows приходится решать целый ряд проблем, которые в однопоточных программах просто не возникают. Действительно, первая проблема, с которой приходится сталкиваться - определение способа запуска и останова потоков. Но в основном она решается на уровне операционной системы: достаточно внимательно прочесть программную документацию операционной системы и правильно применить почерпнутые сведения.

Этот раздел адресован только тем программистам, которые работают в среде 32-разрядной Windows. Delphi I вообще не поддерживает многопоточную обработку, в то время как Kylix и Linux не предоставляют необходимых примитивных объектов синхронизации, с помощью которых можно было бы решить проблему считывания-записи.

Более серьезная проблема - совместное использование данных несколькими потоками, независимо от того, являются ли данные отдельным целочисленным значением или более сложной структурой данных. По существу, приходится решать вопросы параллельного доступа. Если конкретный поток обновляет часть данных, считывание этих данных в это же время другим потоком лишено смысла. В этом случае считывающий поток (обычно называемый программой считывания {reader} ) может получить частично обновленное значение, поскольку обновляющий поток (программа записи {writer} ) еще не закончил обновление, но операционная система отключилась от него.

При наличии двух или более программ записи достаточно скоро могут возникнуть значительные проблемы, если эти программы обновляют одни и те же данные. Однако никакие проблемы параллельного доступа не должны возникать в случае считывания одних и тех же данных несколькими программами считывания.

На момент написания этой книги большинство пользователей использовало однопроцессорные персональные компьютеры (ПК). В таких компьютерах операционная система осуществляет очень быстрое циклическое переключение между потоками, останавливая один поток и запуская другой. Конкретный метод выполнения этого переключения не имеет значения (нецелесообразно создавать программу для конкретной схемы, поскольку она может зависеть от операционной системы), но следует сразу уяснить, что невозможно точно определить все характеристики переключения (такие, как момент переключения, являются ли определенные операции элементарными и т.п.). Один из лучших, когда-либо слышанных мною советов состоял в том, что многопоточные приложения всегда должны быть протестированы на многопроцессорном компьютере. На таком компьютере операционная система будет действительно одновременно выполнять два или более потока. Все неприятные аспекты проблем параллельной обработки неизбежно проявятся при запуске программы на ПК с двумя или более процессорами. Даже если тестовая программа успешно выполняется на однопроцессорном ПК (возможно, потому, что переключение потоков всегда выполнятся в удачные моменты времени), на многопроцессорном компьютере код может разрушаться из-за каких-нибудь причудливых ошибок.

Поэтому требуется механизм блокировки. Программа записи должна иметь возможность "блокировать" определенные данные, чтобы во время их обновления никакая другая программа записи или считывания не могла получить к ним доступ. Аналогично, во время считывания данных программой считывания никакая программа записи не должна быть в состоянии их обновить, но другие программы считывания должны по-прежнему иметь к ним доступ.

Похоже, что в среде 32-разрядной Windows существует множество объектов, обеспечивающих синхронизацию: критический раздел, флаг синхронизации, семафор, событие, но ни один из них не подходит для решения поставленной задачи полностью. Критический раздел и флаг синхронизации подходят больше других, но они не позволят нескольким программам считывания одновременно получать доступ к одним и тем же данным.

Если для работы с многопоточными данными совместного использования применяется класс TList, Delphi 3 и последующие версии языка предоставляет класс TThreadedList. В основном, применяемая в этом классе стратегия синхронизации реализуется следующим образом: каждое обращение к TList защищается критическим разделом или флагом синхронизации. Delphi-версия класса TThreadedList предоставляет метод LockList, который выполняет вход в критический раздел и возвращает внутренний класс TList. Затем поток может свободно использовать этот объект TList до момента своего завершения, после чего подпрограмма потока должна вызвать метод UnLockList для выхода из критического раздела.

Хотя это решение работает, и притом весьма успешно, ему присущ очевидный недостаток: в любой отдельный момент времени только один поток может иметь доступ к объекту TList. Класс не делает никакого различия между доступом для считывания (в процессе которого список не изменяется) и доступом для записи (при котором он изменяется). Как уже отмечалось, в любой отдельный момент времени может существовать много программ, осуществляющих одновременное считывание объекта TList. Но может существовать только одна программа, осуществляющая запись в него. Это решение, хотя его и просто реализовать, характеризуется избыточностью. Оно не позволяет с максимальной эффективностью использовать TList для многопоточной обработки.

Определим действия, которые должен был бы выполнять объект синхронизации. Нам требуется единый объект, который мог бы использоваться для синхронизации доступа к данным как программой считывания, так и программой записи. Он должен допускать одновременное существование нескольких активных потоков считывания. В любой данный момент времени он должен допускать существование только одного активного потока записи, и, если такой поток существует, не должно существовать ни одного активного потока считывания (они могут обращаться к каким-либо данным в структуре данных, которые в данный момент обновляются).

В идеале необходимо определить также следующее поведение. Если потоку требуется выполнить запись в структуру данных, он должен иметь возможность сообщить об этом объекту. В этом случае объект заблокирует запуск любых новых потоков считывания до момента завершения всех текущих потоков считывания. Затем он позволит продолжить выполнение потока записи. Если никакого ожидающего своей очереди потока записи не существует, поток считывания должен получить беспрепятственный доступ к структуре данных. Необходимо каким-то способом обеспечить возможность постановки нескольких потоков записи в очередь. По существу, это требование означает, что объект синхронизации должен принудительно организовывать цикл использования объекта TList многими потоками считывания, затем единственным потоком записи, затем многими потоками считывания и т.д.

Из приведенного определения понятно, что должен существовать какой-то примитивный объект синхронизации, которому поток записи мог бы сигнализировать о завершении обновления, чтобы можно было запустить потоки записи. (под примитивным понимается какой-либо объект, предоставляемый самой операционной системой). И наоборот, должен существовать объект синхронизации, которому последний поток в наборе потоков считывания мог бы сигнализировать о завершении своей работы, чтобы можно было предоставить свободу действий потоку записи.

Разрабатываемый нами комплексный объект нуждается, по меньшей мере, в четырех методах. Поток считывания вызывает первый метод, чтобы начать считывание (обратите внимание, что внутри этой подпрограммы может происходить блокировка, обеспечивающая ожидание окончания работы потока записи). Иногда этот метод называют подпрограммой регистрации считывания (reader registration routine). Как только поток считывания завершает свою работу, он должен вызвать другую подпрограмму для прекращения использования объекта синхронизации и, возможно, предоставления свободы действий потоку записи (подпрограмма отмены регистрации считывания). Аналогично такие же две подпрограммы должны существовать и для потока записи. Назовем эти четыре подпрограммы, соответственно, StartReading, StopReadlng, StartWriting и StopWriting.

Описать возможную работу этого объекта достаточно легко. Сложнее его действительно реализовать. Подпрограмма StartReading выполняет несколько задач. Вначале она должна проверить существование ожидающего своей очереди потока записи. При наличии хотя бы одного такого потока, подпрограмма должна перейти в режим ожидания поступления какого-либо объекта синхронизации. Наиболее подходящие кандидаты на роль такого объекта - семафор или событие (эти объекты допускают одновременный запуск нескольких потоков при поступлении сигнала, в то время как флаг синхронизации или критический раздел не допускают этого). Если в данный момент времени поток записи действительно выполняется, подпрограмма StartReading должна выполнять блокировку таким же образом. В отсутствии выполняющегося или ожидающего потока записи подпрограмма StartReading регистрирует поток как считывающий, осуществляет выход, после чего поток получает возможность немедленно продолжить свою работу.

Метод StopReading должен выяснить, выполняется ли в данный момент последний поток считывания. Если это так, метод должен предоставить свободу действий потоку записи, передавая ему ожидаемый им сигнализирующий объект. Если ожидающий поток записи отсутствует, могут существовать ожидающие потоки считывания. Поэтому метод должен оставить объект в таком состоянии, чтобы поток считывания или записи мог быть немедленно запущен при вызове соответствующей запускающей подпрограммы.

Метод StartWriting также выполняет несколько задач. Если поток записи активен, он ожидает поступление объекта синхронизации, который будет использоваться для предоставления свободы действий следующему потоку записи. При наличии одного или более активных потоков считывания, метод действует так же. В противном случае он регистрируется как записывающий и выполняет выход, позволяя потоку записи продолжить работу.

Метод StopWriting отменяет регистрацию потока, выполняющегося в качестве записывающего, а затем проверяет существование одного или более готовых к запуску потоков считывания. Если такие потоки существуют, метод передает им ожидаемый ими объект синхронизации и завершает свою работу. Если какие-то потоки считывания отсутствуют, метод проверяет наличие ожидающего потока записи. Если такие потоки существуют, метод предоставляет одному из них свободу действий, передавая ему ожидаемый всеми этими потоками объект, а затем прекращает свою работу. Если ни одна из перечисленных ситуаций не имеет места, метод оставляет составной объект в состоянии, позволяющем немедленный запуск потока чтения или записи.

Исходя из приведенного описания, можно сделать ряд выводов. Во-первых, нам требуется переменная для хранения числа ожидающих потоков считывания. Во-вторых, требуется переменная для хранения числа ожидающих потоков записи. В-третьих, нам нужна переменная для хранения числа выполняющихся в текущий момент времени потоков считывания. В-четвертых, нам нужен булев флаг, свидетельствующий о выполнении потока записи. И, наконец, нам требуются определенные примитивные объекты синхронизации, содержащие все перечисленные компоненты.

Поскольку имеется четыре тесно связанных между собой переменных, вызовы для выполнения их считывания и обновления следует поместить внутрь критического раздела или флага синхронизации. Мы будем использовать критический раздел, поскольку эти компоненты эффективнее. Итак, это будет первым объектом синхронизации. Первым шагом выполнения каждого из четырех описанных методов будет запрос критического раздела, последним - его освобождение. Однако вспомните, что методы, которые позволяют запустить поток считывания, могут блокироваться внутри подпрограммы. Если бы этот программный блок оказался между процедурами вызова и освобождения управляющего критического раздела, возникла бы тупиковая ситуация. Поэтому необходимо обеспечить, чтобы блокировка выполнялась снаружи, после того, как критический раздел освобожден.

Поскольку одновременно только один поток записи может быть активным, может показаться целесообразным поместить объект синхронизации, который ставит потоки записи в очередь, также в критический раздел, поскольку этот раздел может принадлежать только одному потоку. Однако на практике проще воспользоваться семафором. Причина этого проста: в действительности не требуется вызов объекта синхронизации, поскольку не существует подходящего места для его освобождения. Действительно, вы убедитесь, что придется дожидаться семафора в одном потоке и освобождать его в другом. Такой подход невозможен при использовании критического раздела: поток, обращающийся к критическому разделу, владеет им.

А каким должен быть объект синхронизации для потоков считывания? Больше всего подошли бы семафор или событие сброса вручную. Как и в предыдущем случае, лучше использовать семафор, поскольку применение объекта события привело бы возникновению проблем (при получении сигнала будут освобождаться только ожидающие его прихода потоки;

в данной реализации поток может находиться в состоянии, в котором он еще не вызвал подпрограмму WaitFor).

Код интерфейса создаваемого нами класса синхронизации TtdReadWriteSync приведен в листинге 12.1. Он содержит ряд приватных полей, которые будут использоваться в четырех основных методах.

Листинг 12.1. Интерфейс класса TtdReadWriteSync

type

TtdReadWriteSync = class private

FActiveReaders : integer;

FActiveWriter : boolean;

FBlockedReaders : THandle;

{семафор}

FBlockedWriters : THandle;

{семафор}

FController : TRTLCriticalSection;

FWaitingReaders : integer;

FWaitingWriters : integer;

protected

public

constructor Create;

destructor Destroy; override;

procedure StartReading;

procedure StartWriting;

procedure StopReading;

procedure StopWriting;

end;

Приватное поле FBlockedReaders семафора предназначено для ожидающих потоков считывания, а поле FBlockedWriters - для ожидающих потоков записи. Поле FController - основной компонент, обеспечивающий последовательный доступ к объектам (к сожалению, применение подобного механизма последовательной обработки необходимо для обеспечения того, чтобы каждый поток получал целостное и неискаженное изображение всего класса).

Код метода StartReading приведен в листинге 12.2.

Листинг 12.2. Метод StartReading

procedure TtdReadWriteSync.StartReading;

var

HaveToWait : boolean;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{если существует выполняющийся поток записи или хотя бы один ожидающий своей очереди поток записи, метод добавляет себя в качестве ожидающего метода записи, обеспечивая переход в состояние ожидания}

if FActiveWriter or (FWaitingWriters <> 0) then begin

inc(FWaitingReaders);

HaveToWait :=true;

end

{в противном случае он добавляет себя в качестве еще одного выполняющегося потока считывания и обеспечивает отсутствие состояния ожидания}

else begin

inc(FActiveReaders);

HaveToWait := false;

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

{при необходимости ожидания нужно выполнить следующее}

if HaveToWait then

WaitForSingleObject(FBlockedReaders, INFINITE);

end;

Прежде всего, мы перехватываем управление критическим разделом. После этого можно осуществлять управление значениями внутренних полей. При наличии выполняющегося в текущий момент или хотя бы одного ожидающего потока записи метод увеличивает число ожидающих потоков считывания, освобождает управление критическим разделом, а затем переходит в состояние ожидания семафора "заблокированные потоки считывания". При отсутствии ожидающих или выполняющихся потоков записи метод увеличивает число выполняющихся потоков считывания и освобождает критический раздел. По выходу из этого метода программа либо освобождается от необходимости ожидать прихода семафора, либо сразу пропускает состояние ожидания. Обратите внимание, что во втором случае метод увеличил число выполняющихся потоков считывания, а в первом нет. Это может показаться программной ошибкой, но вскоре мы покажем, как можно решить возникающую при этом проблему.

Рассмотрим метод StopReading, код которого приведен в листинге 12.3.

Листинг 12.3. Метод StopReading

procedure TtdReadWriteSync.StopReading;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{считывание завершено}

dec (FActiveReaders);

{если выполняется последний поток считывания и при наличии по меньшей мере одного ожидающего потока записи ему необходимо предоставить свободу действий}

if (FActiveReaders = 0) and (FWaitingWriters <> 0) then begin

dec(FWaitingWriters);

FActiveWriter :=true;

ReleaseSemaphore(FBlockedWriters, 1, nil);

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

end;

Как обычно, прежде всего, мы перехватываем управление критическим разделом. Этот поток стремится прекратить свои действия по считыванию, поэтому он уменьшает значение счетчика выполняющихся потоков считывания. Если результирующее значение не равно нулю, это свидетельствует о наличии других активных потоков считывания. Поэтому метод просто освобождает управление критическим разделом и осуществляет выход. Однако если этот поток был последним активным потоком считывания, теперь значение счетчика равно нулю и нужно предоставить свободу действий ожидающему потоку записи (если таковой существует). Для этого метод освобождает семафор заблокированных потоков записи. Иначе говоря, метод увеличивает значение счетчика на единицу, в результате чего система предоставит свободу действий одному, и только одному, заблокированному потоку записи, после чего немедленно снова уменьшит значение счетчика до нуля, обеспечивая блокировку всех остальных потоков записи. Однако непосредственно перед тем метод StopReading уменьшает значение счетчика ожидающих потоков записи и увеличивает значение счетчика выполняющихся потоков записи. Общий результат выполнения этого кода состоит в том, что поток записи освобождается, а значения двух счетчиков потоков записи обновляются.

Перейдем к рассмотрению метода StartWriting, код которого приведен в листинге 12.4.

Вначале снова необходимо перехватить управление критическим разделом. При наличии любых выполняющихся потоков считывания или записи метод увеличивает значение счетчика ожидающих потоков записи, освобождает управление критическим разделом, а затем ожидает освобождения семафора заблокированных потоков записи.

Листинг 12.4. Метод StartWriting

procedure TtdReadWriteSync.StartWriting;

var

HaveToWait : boolean;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{при наличии еще одного запущенного потока записи или активных потоков считывания, метод добавляет себя в качестве ожидающего потока считывания и обеспечивает переход в состояние ожидания}

if FActiveWriter or (FActiveReaders <> 0) then begin

inc(FWaitingWriters);

HaveToWait := true;

end

{в противном случае метод должен добавить себя в качестве еще одного выполняющегося потока записи и обеспечить отсутствие состояния ожидания}

else begin

FActiveWriter :=true;

HaveToWait := false;

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

{при необходимости ожидания нужно выполнить следующее}

if HaveToWait then

WaitForSingleObject(FBlockedWriters, INFINITE);

end;

При отсутствии каких-либо других выполняющихся потоков можно сразу начать запись. Метод увеличивает значение счетчика выполняющихся потоков записи, освобождает управление критическим разделом и осуществляет выход из подпрограммы. В любом случае, сразу по выходу из подпрограммы значение счетчика активных потоков записи оказывается установленным равным единице (либо самим этим методом, либо методом StopReading - если помните, это происходит Непосредственно перед передачей семафора заблокированных потоков записи).

И, наконец, можно приступить к рассмотрению метода StopWriting, код которого приведен в листинге 12.5.

Как и ранее, первоначальная задача состоит в перехвате управления критическим разделом. Затем, поскольку запись завершена, метод уменьшает значение счетчика активных потоков записи. Теперь выполняется проверка количества ожидающих потоков считывания. Мы входим в цикл, который уменьшает значение счетчика активных потоков считывания и освобождает семафор. Семафор, в свою очередь, освобождает от ожидания один поток считывания. Со временем, по завершении цикла, все потоки считывания будут освобождены и смогут считаться активными (обратите внимание, что они все будут использовать соответствующее обращение к методу StartReading). Если, с другой стороны, не существует никаких ожидающих потоков считывания, метод выполняет проверку на наличие каких-либо ожидающих потоков записи. Если такие потоки существуют, метод освобождает только один поток записи таким же образом, как уже было описано при рассмотрении метода StopReading. И, наконец, независимо ни от чего, метод освобождает управление критическим разделом.

Листинг 12.5. Метод StopWriting

procedure TtdReadWriteSync.StopWriting;

var

i : integer;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{запись завершена}

FActiveWriter := false;

{если имеется хотя бы один ожидающий поток записи, освободить их всех}

if (FWaitingReaders <> 0) then begin

FActiveReaders := FWaitingReaders;

FWaitingReaders := 0;

ReleaseSemaphore(FBlockedReaders, FActiveReadersr nil);

end

{в противном случае, при наличии по меньшей мере одного ожидающего потока записи, ему необходимо предоставить свободу действий}

else

if (FWaitingWriters <> 0) then begin

dec(FWaitingWriters);

FActiveWriter :=true;

ReleaseSemaphore(FBlockedWriters, 1, nil);

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

end;

Нам осталось рассмотреть только два метода: конструктор Create и деструктор Destroy. Код реализации этих методов показан в листинге 12.6.

Листинг 12.6. Создание и уничтожение объекта синхронизации

constructor TtdReadWriteSync.Create;

var

NameZ : array [0..MAXJPATH] of AnsiChar;

begin

inherited Create;

{создать примитивные объекты синхронизации}

GetRandomObjName (NameZ, ' tdRW.BlockedReaders' );

FBlockedReaders := CreateSemaphore(nil, 0, MaxReaders, NameZ);

GetRandomObjName(NameZ, 'tdRW.BlockedWriters');

FBlockedWriters := CreateSemaphore(nil, 0, 1, NameZ);

InitializeCriticalSection(FController);

end;

destructor TtdReadWriteSyhc.Destroy;

begin

CloseHandle(FBlockedReaders);

CloseHandle(FBlockedWriters);

DeleteCriticalSection(FController);

inherited Destroy;

end;

Как видите, конструктор Create будет создавать три примитивных объекта синхронизации, а деструктор Destroy будет, соответственно, их уничтожать.

Полный исходный код класса TtdReadWriteSync можно найти на Web-сайте издательства, в разделе материалов. После выгрузки материалов отыщите среди них файл TDRWSync.pas.

Оглавление книги


Генерация: 1.788. Запросов К БД/Cache: 3 / 1
поделиться
Вверх Вниз