Выпуск 5. Июль 2013
← Введение в разработку web-приложений на PSGI/Plack. Часть 4. Асинхронность | Содержание | Обзор CPAN за июнь 2013 г. →Многопроцессовый сервер на AnyEvent
Рассмотрена универсальная параллельная обработка данных путем запуска нескольких процессов под управлением AnyEvent.
Очень часто приходится обрабатывать большое количество каких-то задач или данных. И зачастую данные эти удобно обрабатывать в несколько потоков, дабы полностью утилизировать всю мощь современных многопроцессорных компьютеров. Самый простой путь для осуществления этой цели — многопоточное приложение. Но у него есть ограничения, например — оно тяжело параллелится на несколько процессорных ядер (хотя в целом это решаемый вопрос). В данной статье рассмотрим решение такой задачи с помощью событийно-ориентированного приложения.
Итак, как будет выглядеть приложение? Напишем сервер, который будет работать с пулом процессов. Он будет создавать процессы (не обязательно перловые, это могут быть и программы на C/Java/Python/Ruby), перехватывать их STDOUT/STDERR и заниматься прочими мелкими задачами, которыми и положено заниматься такому серверу.
Создаем каркас приложения:
#!/usr/bin/env perl
use v5.12;
use AnyEvent;
use AnyEvent::Handle;
use IPC::Open3;
my $cv = AE::cv;
my $alive_timer = AnyEvent->timer(
after => 60,
interval => 60,
cb => sub {
say "Server alive";
}
);
$cv->recv;
Получилось приложение, которое раз в минуту пишет в консоль о том, что оно все еще работает. Пример довольно простой, перейдем к более сложным вещам.
Итак, первым делом понадобится пул, где хранится вся информация о запущенных процессах. Это вынесем в отдельный пакет, дабы не загромождать код самого событийного механизма и улучшить читаемость.
package My::Pool;
my %commands_pool = ();
my %process_pool = ();
sub new {
my $self = {};
bless $self;
}
sub update_pool {
my $self = shift;
return {
command_id => int(rand(10000)),
command => 'test.pl',
options => int(rand(100))
};
}
sub store_process {
my $self = shift;
my $proc = shift;
$process_pool{$proc->{'pid'}} = $proc;
}
sub process_done {
my $self = shift;
my $pid = shift;
my $exit_code = shift;
my $comm_id = $process_pool{$pid}->{'command_id'};
$command_pool{$comm_id}->{'exit_code'} = $exit_code;
delete($process_pool{$pid});
}
sub save_command {
my $self = shift;
my $comm_id = shift;
...
delete($command_pool{$comm_id});
return 1;
}
1;
Получилась простейшая обвязка для обработки пула, теперь настало время запускать комманды. Модифицируем каркас следующим образом:
my $Pool = My::Pool->new();
my $cv = AE::cv;
my $alive_timer = AnyEvent->timer(
after => 60,
interval => 60,
cb => sub {
say "Server alive";
}
);
Создаем новый таймер для переодического опроса новых данных:
my $process_time = AnyEvent->timer(
after => 1,
interval => 5,
cb => sub {
Запрашиваем новую комманду для выполнения:
my $command = $Poll->update_pool();
С помощью IPC::Open3
создаем фоновый процесс. 0 на третьем месте в списке аргументов говорит, что STDERR
надо перенаправить в STDOUT
. Все дальнейшие опции аналогичны system
:
my ($chld_out, $chld_in);
my $pid = open3($chld_in, $chld_out, 0, $command->{'command'},
'options=' . $command->{'options'});
Создаем объект AnyEvent::Handler
для обработки информации, поступающей от процесса:
my $hdl;
$hdl = AnyEvent::Handle->new(
fh => \*$chld_out,
pid => $pid,
oper_id => $oper_id,
on_eof => sub {
my ($hdl) = @_;
Тут можно что-то сделать при закрытии канала. Например — уничтожить хэндлер $hdl->destroy
.
},
В данном случае каждая строка от дочернего процесса будет выводиться на экран:
on_read => sub {
my ($hdl) = @_;
$hdl->push_read(
line => sub {
my ($hdl, $line) = @_;
say "process [$pid] got line <$line>"
);
}
);
}
);
Создаем объект, следящий за процессом и получающий его код завершения:
my $w = AnyEvent->child(
pid => $pid,
cb => sub {
my ($pid, $status) = @_;
$status = $status >> 8;
Выводим на экран информацию о завершении процесса и вызываем его обработчик в пуле:
say "process [$pid] done with code [$status]";
$Pool->process_done($pid, $status);
}
);
теперь сохраним все в пуле процессов:
$Pool->store_process(
{pid => $pid, handler => $hdl, watcher => $w}
);
});
$cv->recv;
На данный момент этот сервер успешно запускает комманды, которые получает из пула и выводит логи их работы на экран. Теперь можно наращивать функционал, например, добавить уничтожение зависших процессов и прочую логику. При тестировании на релаьных примерах эта конструкция (обернутая примерно в 10 КБ кода с логикой) прекрасно держит 50 одновременно запущенных процессов + еще десяток удаленных клиентов, обменивающихся с ними информацией через сокеты. Больше процессов не держит — у сервера ресурсов не хватает, процессы довольно тяжелые.
Несколько недостатков представленного решения:
- Во первых, нужно внимательно следить за блокировками в коде.
- Во вторых — из четырех Windows-машин заработало только на одной. Может это ошибка
IPC::OpenX
. Т.е. запускаться и рождать процессы модуль будет, но данных изSTDOUT
не будет возвращать. Но под Windows можно использовать следующий прием — заменяемmy $pid = open3(...);
наmy $pid = system(1, $command, '> $log_file 2>$1');
и дальше навешиваемAE::Handler
на этот$log_file
. - Буферизация
STDOUT
. Т.е. лог работы процесса возвращается практически одновременно с его завершением. Но логSTDERR
— появится мгновенно т.к. на нем нет буферизации. Стоит обратить на это внимание.
← Введение в разработку web-приложений на PSGI/Plack. Часть 4. Асинхронность | Содержание | Обзор CPAN за июнь 2013 г. →