Выпуск 5. Июль 2013

Введение в разработку web-приложений на PSGI/Plack. Часть 4. Асинхронность | Содержание | Обзор CPAN за июнь 2013 г.

Многопроцессовый сервер на AnyEvent

Рассмотрена универсальная параллельная обработка данных путем запуска нескольких процессов под управлением AnyEvent.

Очень часто приходится обрабатывать большое количество каких-то задач или данных. И зачастую данные эти удобно обрабатывать в несколько потоков, дабы полностью утилизировать всю мощь современных многопроцессорных компьютеров. Самый простой путь для осуществления этой цели — многопоточное приложение. Но у него есть ограничения, например — оно тяжело параллелится на несколько процессорных ядер (хотя в целом это решаемый вопрос). В данной статье рассмотрим решение такой задачи с помощью событийно-ориентированного приложения.

Итак, как будет выглядеть приложение? Напишем сервер, который будет работать с пулом процессов. Он будет создавать процессы (не обязательно перловые, это могут быть и программы на C/Java/Python/Ruby), перехватывать их STDOUT/STDERR и заниматься прочими мелкими задачами, которыми и положено заниматься такому серверу.

Создаем каркас приложения:

#!/usr/bin/env perl
use v5.12;
use AnyEvent;
use AnyEvent::Handle;
use IPC::Open3;

my $cv = AE::cv;

my $alive_timer = AnyEvent->timer(
    after    => 60,
    interval => 60,
    cb       => sub {
        say "Server alive";
    }
);

$cv->recv;

Получилось приложение, которое раз в минуту пишет в консоль о том, что оно все еще работает. Пример довольно простой, перейдем к более сложным вещам.

Итак, первым делом понадобится пул, где хранится вся информация о запущенных процессах. Это вынесем в отдельный пакет, дабы не загромождать код самого событийного механизма и улучшить читаемость.

package My::Pool;

my %commands_pool = ();
my %process_pool  = ();

sub new {
    my $self = {};
    bless $self;
}

sub update_pool {
    my $self = shift;

    return {
        command_id => int(rand(10000)),
        command    => 'test.pl',
        options    => int(rand(100))
    };
}

sub store_process {
    my $self = shift;
    my $proc = shift;

    $process_pool{$proc->{'pid'}} = $proc;
}

sub process_done {
    my $self      = shift;
    my $pid       = shift;
    my $exit_code = shift;

    my $comm_id = $process_pool{$pid}->{'command_id'};

    $command_pool{$comm_id}->{'exit_code'} = $exit_code;
    delete($process_pool{$pid});
}

sub save_command {
    my $self    = shift;
    my $comm_id = shift;

    ...

    delete($command_pool{$comm_id});

    return 1;
}

1;

Получилась простейшая обвязка для обработки пула, теперь настало время запускать комманды. Модифицируем каркас следующим образом:

my $Pool = My::Pool->new();

my $cv = AE::cv;

my $alive_timer = AnyEvent->timer(
    after    => 60,
    interval => 60,
    cb       => sub {
        say "Server alive";
    }
);

Создаем новый таймер для переодического опроса новых данных:

my $process_time = AnyEvent->timer(
    after    => 1,
    interval => 5,
    cb       => sub {

Запрашиваем новую комманду для выполнения:

        my $command = $Poll->update_pool();

С помощью IPC::Open3 создаем фоновый процесс. 0 на третьем месте в списке аргументов говорит, что STDERR надо перенаправить в STDOUT. Все дальнейшие опции аналогичны system:

        my ($chld_out, $chld_in);
        my $pid = open3($chld_in, $chld_out, 0, $command->{'command'},
            'options=' . $command->{'options'});

Создаем объект AnyEvent::Handler для обработки информации, поступающей от процесса:

        my $hdl;
        $hdl = AnyEvent::Handle->new(
            fh      => \*$chld_out,
            pid     => $pid,
            oper_id => $oper_id,
            on_eof  => sub {
                my ($hdl) = @_;

Тут можно что-то сделать при закрытии канала. Например — уничтожить хэндлер $hdl->destroy.

            },

В данном случае каждая строка от дочернего процесса будет выводиться на экран:

            on_read => sub {
                my ($hdl) = @_;
                $hdl->push_read(
                    line => sub {
                        my ($hdl, $line) = @_;
                        say "process [$pid] got line <$line>"
                    );
                }
            );
        }
    );

Создаем объект, следящий за процессом и получающий его код завершения:

    my $w = AnyEvent->child(
        pid => $pid,
        cb  => sub {
            my ($pid, $status) = @_;
            $status = $status >> 8;

Выводим на экран информацию о завершении процесса и вызываем его обработчик в пуле:

            say "process [$pid] done with code [$status]";
            $Pool->process_done($pid, $status);
        }
    );

теперь сохраним все в пуле процессов:

    $Pool->store_process(
        {pid => $pid, handler => $hdl, watcher => $w}
    );
});

$cv->recv;

На данный момент этот сервер успешно запускает комманды, которые получает из пула и выводит логи их работы на экран. Теперь можно наращивать функционал, например, добавить уничтожение зависших процессов и прочую логику. При тестировании на релаьных примерах эта конструкция (обернутая примерно в 10 КБ кода с логикой) прекрасно держит 50 одновременно запущенных процессов + еще десяток удаленных клиентов, обменивающихся с ними информацией через сокеты. Больше процессов не держит — у сервера ресурсов не хватает, процессы довольно тяжелые.

Несколько недостатков представленного решения:

  • Во первых, нужно внимательно следить за блокировками в коде.
  • Во вторых — из четырех Windows-машин заработало только на одной. Может это ошибка IPC::OpenX. Т.е. запускаться и рождать процессы модуль будет, но данных из STDOUT не будет возвращать. Но под Windows можно использовать следующий прием — заменяем my $pid = open3(...); на my $pid = system(1, $command, '> $log_file 2>$1'); и дальше навешиваем AE::Handler на этот $log_file.
  • Буферизация STDOUT. Т.е. лог работы процесса возвращается практически одновременно с его завершением. Но лог STDERR — появится мгновенно т.к. на нем нет буферизации. Стоит обратить на это внимание.

Денис Федосеев


Введение в разработку web-приложений на PSGI/Plack. Часть 4. Асинхронность | Содержание | Обзор CPAN за июнь 2013 г.
Нас уже 1393. Больше подписчиков — лучше выпуски!

Комментарии к статье