Выпуск 4. Июнь 2013

Введение в разработку web-приложений на PSGI/Plack. Часть 3. Starman. | Содержание | Что нового в Perl 5.18.0

AnyEvent и fork

Довольно часто возникает задача выполнения некоторых действий в отдельном процессе, например, для исполнения блокирующихся операций или запуска внешних программ. fork отлично выполняет свою задачу, но если ваше приложение построено на базе AnyEvent, необходимо знать о некоторых нюансах, прежде чем начинать создавать новые процессы.

Fork

Классическим способом создания новых процессов в UNIX-подобных системах является выполнение системного вызова fork, который создаёт копию выполняемого процесса и начинает выполнять оба процесса, начиная с последующей инструкции. Интерпретатор Perl просто выполняет системный вызов, если он доступен, а в случае его отсутствия — выполняет эмуляцию, которая в общем случае абсолютно прозрачна для программиста (с небольшими оговорками, смотрите perlfork). В современных операционных системах fork оптимизирован и не выполняет копирования всего адресного пространства процесса, выполняя копирование лишь при изменении данных в процессе-потомке (механизм COW — копирование-при-записи). Так или иначе, оба процесса имеют в своём распоряжении все данные, располагавшиеся в родительском процессе, что в том числе касается и открытых файловых дескрипторов и директорий. Если выполняемый родительский процесс являлся многопоточной программой и в ней работали несколько нитей, то копируется только нить, которая запустила системный вызов, остальные нити в дочернем процессе бесследно исчезают.

Для запуска нового приложения применяется системный вызов из семейства exec, который замещает образ текущего процесса новым. По этой причине для параллельного запуска новой программы совместно с работающей используют последовательность вызовов fork и затем exec в процессе-потомке. При выполнении exec все используемые данные старого процесса освобождаются, но новый процесс наследует те же самые pid и ppid старого процесса, и, кроме того, в новый процесс копируются все открытые файловые дескрипторы (если они не были открыты с флагом O_CLOEXEC).

Оба варианта запуска новых процессов могут быть использованы в Perl для выполнения параллельных задач.

Проблемы использования fork

Описанное поведение fork может оказывать нежелательные побочные эффекты на работу приложения. Рассмотрим проблемы, которые могут возникнуть в общем случае, и проблемы, которые специфичны для AnyEvent-приложений.

  1. Медленное создание копии крупного процесса

Если объём памяти, занимаемой приложением, исчисляется несколькими гигабайтами, то создание копии процесса может занять ощутимое время. Несмотря на наличие механизма COW, создание процесса по-прежнему требует инициации структуры нового процесса в ядре, копирования таблиц страниц памяти родительского процесса, которые могут занимать существенный объём для огромного процесса. Это также может усугубляться тем, что единственной требуемой операцией в дочернем процессе будет создание нового процесса через вызов exec. Это приводит к напрасной трате ресурсов и большим накладным расходам для создания новых процессов.

  1. Копирование бесполезных для дочернего процесса данных

Данные, копируемые в дочерний процесс, могут быть абсолютно бесполезны для дочернего процесса. Благодаря механизму COW они не занимают дополнительного объёма памяти, но это так, пока родительский процесс тоже их не изменяет. В тот самый момент, когда родительский процесс освободит память под уже ненужные данные или обновит их, старая версия этих данных перетечёт в дочерний процесс. Получим парадоксальную ситуацию, когда освобождённая память не освобождается, а просто утекает в другой процесс.

  1. Fork может сделать невозможным работу дочернего процесса

Программа может использовать нити через модуль threads или загружать модули, которые используют POSIX threads неявно, например модуль IO::AIO. Специфика клонирования многопоточного процесса может привести к тому, что полученный дочерний процесс окажется в неконсистентном состоянии, препятствующем его дальнейшей корректной работе. Такая ситуация возможна, даже если вы не используете нити, а вызываете fork внутри обработчика сигнала, который может выполниться в произвольной точке программы, разорвав, например, критичную транзакцию.

  1. Обработка событий в дочернем процессе

При создании дочернего процесса в него копируются все действующие объекты, наблюдающие за событиями. Не все событийные библиотеки, работающие в бэкенде AnyEvent, способны корректно начать работать в дочернем процессе. Поэтому обработка событий в дочернем процессе может оказаться невозможной. Даже если библиотека обработки событий гарантирует работоспособность после fork, в дочернем процессе могут начать запускаться таймеры или обработчики сигналов, которые имели смысл только в родительском процессе.

  1. Fork+exec нового процесса интерпретатора Perl может быть не так прост

Не всегда легко обнаружить правильный путь к интерпретатору Perl, в переменной ^X может содержаться совсем не интерпретатор.

  1. Fork+exec нового процесса может оказаться неудачным

Часто процесс может выполняться очень длительное время, за которое в окружении могут произойти перемены, например, обновиться модули или даже сам интерпретатор. Попытка fork+exec нового процесса может завершиться ошибкой, например, из-за проблемы в обновлённом стороннем модуле.

AnyEvent::Fork

Недавно вышедший модуль AnyEvent::Fork Марка Леменна (Marc Lehmann) пытается решить все указанные выше проблемы использования fork в AnyEvent-приложениях, предоставляя безопасный и гибкий инструмент создания и управления процессами.

Особенность модуля в том, что он создаёт новые процессы путём запуска нового интерпретатора perl или посредством клонирования существующего “шаблонного” процесса. Такой подход позволяет решить половину указанных проблем, связанных с разделением общих данных в процессах (2, 3 и 4).

Для решения проблемы со скоростью работы fork+exec в нём используется модуль Proc::FastSpawn, который при наличии поддержки на платформе применяет системные вызовы vfork+exec. Вызов vfork аналогичен fork с той лишь разницей, что не происходит копирования структур данных родительского процесса в процесс-потомок. При этом родительский процесс останавливается до тех пор, пока дочерний процесс не завершится или не выполнит вызов exec. Это значительно ускоряет запуск нового процесса. На платформе win32 и других, где нет вызова vfork, по возможности используется spawn или происходит откат на обычный fork+exec.

Модуль также пытается обнаружить нужный интерпретатор perl, анализируя глобальную переменную ^X, и в случае проблем откатывается на использование $Config::Config{perlpath}. Кроме того, существует возможность переопределить путь к интерпретатору через глобальную переменную $AnyEvent::Fork::PERL.

Чтобы избежать проблем с обновлённым окружением для долгоработающих процессов, AnyEvent::Fork предлагает использование шаблонного процесса, который запускается в самом начале работы программы и позже при необходимости используется для создания новых процессов через fork. Кроме того, это решает и проблему, появившуюся с использованием exec, когда требуется каждый раз загружать модули, используемые программой, — использование шаблонного процесса позволяет загрузить необходимые модули один раз.

Использование AnyEvent::Fork

Рассмотрим типичный пример использования:

use AnyEvent::Fork;

AnyEvent::Fork
    ->new
    ->require ("MyModule")
    ->run ("MyModule::server", my $cv = AE::cv);

my $fh = $cv->recv;

Объект AnyEvent::Fork создаётся вызовом метода new(). Метод new() помимо создания объекта производит запуск нового процесса интерпретатора perl, который становится “шаблонным” процессом. Из этого шаблона в дальнейшем и формируются все новые процессы посредством вызова fork.

Вызов метода require() позволяет выполнить загрузку модуля в процессе потомка. Метод run() выполняет подпрограмму в процессе-потомке. Первым параметром указывается имя подпрограммы, вторым параметром передаётся колбэк-функция, которая выполняется после того, как будет запущен процесс потомок.

Первым параметром, который получают запускаемая в новом процессе подпрограмма и колбэк-функция в родительском процессе, — это дескриптор сокета, открытого между процессами. Если использование сокета не требуется, его можно закрыть в этих подпрограммах.

Передача параметров и файловых дескрипторов в дочерний процесс

Для того, чтобы передать дополнительные параметры в подпрограмму, вызываемую в дочернем процессе, используется метод send_arg():

use AnyEvent::Fork;

AnyEvent::Fork
    ->new
    ->eval('
        sub runner {
            my ($fh, @params) = @_;
            ...
        }
    ')
    ->send_arg(@params)
    ->run("runner", my $cv = AE::cv);

my $fh = $cv->recv;

Метод передаёт только строковые параметры, а для передачи сложных структур данных требуется использовать сериализацию.

Также существует возможность передавать файловые дескрипторы с помощью метода send_fh():

open my $log, '>', '/var/log/file' or die $!;

AnyEvent::Fork
    ->new
    ->eval('
        sub runner {
            my ($fh, $log, @params) = @_;
            ...
        }
    ')
    ->send_fh($log)
    ->send_arg(@params)
    ->run("runner", my $cv = AE::cv);

my $fh = $cv->recv;

Это открывает широкие возможности для коммуникации между процессами родителя и потомка.

Создание и управление пулом процессов

Метод fork() используется для клонирования нужного шаблонного процесса:

my $pool =
  AnyEvent::Fork
    ->new
    ->require ("MyModule");

my @pool;

for my $id (1..10) {
    push @pool,
      $pool
        ->fork
        ->send_arg($id)
        ->run("MyModule::foo", sub {

            my ($fh) = @_;
            print "Process id $id\n";
            ...
          });
}

Программа создаст пул из десяти процессов из заданного шаблонного процесса.

Работа модуля при отсутствии возможности запуска нового интерпретатора

В некоторых ситуациях бывает невозможно запустить новый интерпретатор perl. Например, он может быть встроенным в другое приложение, которое сейчас выполняет этот код, или выполнение fork может испортить работу какого-либо уже загруженного модуля.

Для этих случаев в состав дистрибутива AnyEvent::Fork включены два модуля: AnyEvent::Fork::Early и AnyEvent::Fork::Template.

AnyEvent::Fork::Early должен быть загружен в самом начале вашего приложения до загрузки каких-либо других библиотек. В этом случае происходит форк текущего процесса, который затем и используется для создания новых процессов:

#!/usr/bin/perl
use AnyEvent::Fork::Early;

# some other code

Действие, которое выполняет AnyEvent::Fork::Template, — практически то же самое, только перед созданием шаблонного процесса есть возможность загрузить некоторые дополнительные модули, которые станут доступны во всех вновь создаваемых процессах:

use Some::Harmless::Module;
use My::Worker::Module;

use AnyEvent::Fork::Template;

use Gtk2 -init;

$AnyEvent::Fork::Template->fork->run ("My::Worker::Module::run_worker", sub { ... });

В данном примере после создания шаблона загружается модуль Gtk2, в котором могут возникнуть проблемы при использовании fork. В последней строке происходит создание нового процесса из шаблонного процесса, в котором не загружен “опасный” модуль Gtk2.

AnyEvent::Fork::RPC

В состав модуля AnyEvent::Fork не встроено никаких средств для создания очереди заданий или протокола двустороннего обмена между процессом-потомком и родительским процессом в виде запросов/ответов. Поэтому был создан модуль AnyEvent::Fork::RPC, который предназначен именно для подобных целей. На текущий момент его API имеет статус экспериментального, поэтому вполне вероятны изменения.

Пример использования:

use AnyEvent;
use AnyEvent::Fork::RPC;

my $done = AE::cv;

my $rpc = AnyEvent::Fork
    ->new
    ->eval(do { local $/; <DATA> })
    ->AnyEvent::Fork::RPC::run ("worker",
        on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
        on_event   => sub { warn "$_[0] requests handled\n" },
        on_destroy => $done,
      );

for my $id (1..100) {
    $rpc->( "foo" => "bar", sub {
        $_[0] or warn "$id: $_[1]\n";
    });
}

undef $rpc;

$done->recv;

__DATA__

my $count;

sub worker {
    my ($command, $data) = @_;

    AnyEvent::Fork::RPC::event ($count) unless ++$count % 10;

    my $status = ...

    $status or (0, "$!")
}

После обычного создания объекта нового процесса выполняется метод AnyEvent::Fork::RPC::run, возвращающий объект, который можно будет использовать для многократного выполнения вызовов подпрограммы в дочернем процессе. Существует возможность регистрации функций-колбэков на различные события, которые будут происходить при запуске подпрограммы: ошибки, уничтожение объекта и т.п.

В данном примере происходит последовательное заполнение очереди команд на запуск подпрограммы и регистрации функций-колбеков для получения результатов выполнения команды в родительском процессе. После формирования очереди объект $rpc освобождается присвоением переменной значения undef. Это приведёт к тому, что после выполнения всей очереди заданий процесс потомок будет завершён. Это в свою очередь приведёт к вызову колбэка уничтожения объекта (on_destroy).

AnyEvent::Fork::Pool

Если в вашем приложении требуется более тонкое управление пулом процессов, то для этих целей можно использовать модуль AnyEvent::Fork::Pool. Так же как и предыдущий модуль, API данного модуля сейчас находится в альфа-стадии и может активно изменяться.

Пример использования:

use AnyEvent;
use AnyEvent::Fork::Pool;

my $pool = AnyEvent::Fork
    ->new
    ->require ("MyWorker")
    ->AnyEvent::Fork::Pool::run (
        "MyWorker::run", # the worker function

        # pool management
        max        => 4,   # absolute maximum # of processes
        idle       => 0,   # minimum # of idle processes
        load       => 2,   # queue at most this number of jobs per process
        start      => 0.1, # wait this many seconds before starting a new process
        stop       => 10,  # wait this many seconds before stopping an idle process
        on_destroy => (my $finish = AE::cv), # called when object is destroyed

        # parameters passed to AnyEvent::Fork::RPC
        async      => 0,
        on_error   => sub { die "FATAL: $_[0]\n" },
        on_event   => sub { my @ev = @_ },
        init       => "MyWorker::init",
        serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
      );

for (1..10) {
    $pool->(doit => $_, sub {
        print "MyWorker::run returned @_\n";
    });
}

undef $pool;

$finish->recv;

В данном примере создаётся пул процессов, которые выполняют функцию MyWorker::run. В соответствии с заданными настройками AnyEvent::Fork::Pool автоматически регулирует количество необходимых для выполнения заданий процессов, время их работы и необходимость уничтожения.

Параметры для управления пулом:

  • idle — указывает минимальное число простаивающих процессов; если количество свободных процессов снижается меньше этого уровня, происходит запуск новых процессов;
  • max — максимальное число процессов в пуле;
  • load — максимальное количество задач, отправляемых на один рабочий процесс;
  • start — задержка запуска нового процесса, в случае, если в соответствии с настройками требуется запуск нового процесса;
  • stop — время, через которое рабочий процесс будет остановлен, если он простаивает.

Заключение

Модуль AnyEvent::Fork и вспомогательные модули на его основе позволяют безопасно и гибко управлять созданием новых процессов для решения задач по параллельной обработке данных. Успешно решены проблемы с побочными эффектами fork для асинхронных приложений и реализовано универсальное и кроссплатформенное решение.

Возможно, ещё преждевременно рекомендовать модули для использования в промышленной эксплуатации, поскольку API модулей ещё не стабилизированы, но определённо есть смысл начать изучать и экспериментировать. Удачных опытов!

Владимир Леттиев


Введение в разработку web-приложений на PSGI/Plack. Часть 3. Starman. | Содержание | Что нового в Perl 5.18.0
Нас уже 1393. Больше подписчиков — лучше выпуски!

Комментарии к статье