Выпуск 11. Январь 2014

Про опыт разработки на Perl под Raspberry PI | Содержание | Обход дерева директорий на Perl и Haskell (часть 1)

Асинхронное программирование с IO::Async

Наряду с популярным AnyEvent для Perl существуют другие модули для событийно-ориентированного программирования (СОП), и, в том числе, модуль IO::Async, автором которого является Paul Evans. Рассмотрим чем же интересен данный швейцарский нож и какими уникальными лезвиями он обладает.

IO::Async

IO::Async содержит обширный набор модулей, которые позволяют решать разные небольшие задачи, такие как отслеживание состояния дескрипторов ввода/вывода (возможность записать или прочесть данные), запуск таймеров, перехват сигналов, управление процессами-потомками, наблюдение за изменениями в файловой системе и даже управление потоками выполнения (сопрограммы).

Важная особенность IO::Async — это активное использование так называемых обещаний ( Promises ), которые в терминологии IO::Async именуются объектам Future из одноимённого модуля. Данный объект представляет собой некоторую выполняемую задачу, которая ещё не завершилась. Future появились в IO::Async не так давно и ещё активно прорабатывается, но тенденция к повсеместному их использованию чётко прослеживается.

IO::Async может оказаться особенно удобным при использовании ООП подхода при разработке приложений, поскольку позволяет наследовать свои базовые классы и задавать обработку событий в методах ваших модулей.

IO::Async::Loop

IO::Async::Loop — это базовый модуль, который задаёт цикл обработки событий в программе. Типичная создаваемая программа создаёт один цикл и добавляет в него обработчики различных событий, после чего запускает цикл в работу.

use IO::Async::Loop;

# Создание цикла
my $loop = IO::Async::Loop->new;

# Добавляем какой-либо обработчик
$loop->add(...);

# Запускаем главный цикл обработки событий
$loop->run

Класс IO::Async::Loop является абстрактным и под его капотом может работать какая-либо из существующих реализаций мультиплексора событий, например, идущие в поставке IO::Async::Loop::Select и IO::Async::Loop::Poll, соответственно использующие системные вызовы select и poll. На CPAN также могут быть найдены реализации с использованием специфический системных вызовов, например, kqueue и epoll, или библиотек EV и UV.

Приоритетом выбора того или иного бэкенда можно управлять, например, через переменную окружения IO_ASYNC_LOOP или переменную $IO::Async::Loop::LOOP, в которых задаётся разделённая запятой последовательность субклассов:

$IO::Async::Loop::LOOP = 'EV,Epoll,Poll'

Или непосредственно создав цикл из нужного субкласса:

use IO::Async::Loop::Poll;

my $loop = IO::Async::Loop::Poll->new;

Уведомители IO::Async::Notifier

Для каждого наблюдаемого события создаётся свой так называемый уведомитель — объект базового класса IO::Async::Notifier, который и выполняет все низкоуровневые операции по наблюдению.

  • IO::Async::Handle — наблюдение за событиями ввода/вывода файлового дескриптора
  • IO::Async::Stream — подкласс для работы с потоковыми протоколами обмена
  • IO::Async::Socket — подкласс для работы с датаграммами или сокетом напрямую
  • IO::Async::Timer — базовый класс для таймеров
  • IO::Async::Timer::Absolute — вызов процедуры в заданное время
  • IO::Async::Timer::Countdown — вызов процедуры через заданный промежуток времени
  • IO::Async::Timer::Countdown — периодический вызов процедуры
  • IO::Async::Signal — наблюдение и обработка получаемых процессом сигналов
  • IO::Async::Process — запуск и наблюдение за дочерними процессами
  • IO::Async::PID — отслеживание завершения дочернего процесса
  • IO::Async::File — отслеживание изменений файла (поля в выводе stat())
  • IO::Async::Function и IO::Async::Routine — асинхронный вызов функции (вызов кода в дочернем процессе/треде)

Операции с вводом/выводом

На примере создания сетевого сервера, рассмотрим использование IO::Async::Handle и IO::Async::Stream. Сервер будет принимать соединения на порт 1234, читать ввод построчно и возвращать перевёрнутую строку в ответ:

use IO::Socket::INET;
use IO::Async::Handle;
use IO::Async::Stream;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# Создание сокета, ожидающего соединения на порт 1234
my $socket = IO::Socket::INET->new( LocalPort => 1234, Listen => 1, ReuseAddr=>1 );

# Создание хендла для наблюдения за сокетом
my $handle = IO::Async::Handle->new(
    handle => $socket,
    on_read_ready  => sub {

        # Новое подключение. Вызов accept() на сокете
        my $client = $socket->accept or die $!;

        # Создание потокового сервера
        my $stream; $stream = IO::Async::Stream->new(
            handle => $client,
            close_on_read_eof => 0,

            # Читаем полученные данные
            on_read => sub {
              my ( $self, $buffref, $eof ) = @_;

              # Читаем данные из буфера построчно
              # Пишем данные ответа в выходной буфер потока
              while( $$buffref =~ s/^(.*)\n// ) {
                 $stream->write(scalar reverse "\n".$1);
              }

              # Если получен EOF, то закрываем соединение,
              # как только все данные будут отправлены
              $stream->close_when_empty() if $eof;

              # не вызывать повторно эту функцию после EOF
              return 0;
           },
        );

        # Добавляем поток в цикл
        $loop->add( $stream );
    },

);

# Добавление хендла в цикл
$loop->add( $handle );

# Запуск цикла
$loop->run

Каждое новое соединение обрабатывается с помощью класса IO::Async::Stream, который оптимизирован для работы с потоковыми данными и автоматически формирует буфер для чтения и записи, которые передаются в виде ссылки в соответствующую функцию-колбек.

Операции ввода/вывода и объекты Future

Создание сетевого сервера и клиента достаточно распространённая задача, поэтому данный функционал был добавлен в IO::Async::Loop. Предыдущий пример можно переписать следующим образом:

use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# Создание сокета, прослушивающего порт 1234
$loop->listen(
    family   => "inet",
    socktype => "stream",
    service  => 1234,
    on_resolve_error => sub { die "Cannot resolve - $_[0]\n"; },
    on_listen_error  => sub { die "Cannot listen\n"; },

    # Обработка подключений
    on_stream => sub {
        my $stream = shift;

        # Конфигурируем поток
        $stream->configure( on_read => sub { 0 }, close_on_read_eof => 0 );

        # Добавляем поток в цикл
        $loop->add( $stream );

        # Возвращается Future-объект для операции чтения потока до EOF
        my $f = $stream->read_until_eof;

        # Функция-колбек при завершении операции
        $f->on_done(sub {
                my ($buf, $eof) = @_;

                while( $buf =~ s/^(.*)\n// ) {
                    $stream->write(scalar reverse "\n".$1);
                }

                $stream->close_when_empty() if $eof;
                return 0;
            });
    }
);

$loop->run

В данном примере при обработке клиентских подключений автоматически выполняется accept() и в функцию-колбэк on_stream передаётся готовый объект IO::Async::Stream. С помощью метода configure существует возможность сконфигурировать поток точно также, как это было сделано в первом примере. Но в данном случае продемонстрирован новый подход по работе с асинхронными функциями: использование Future-объектов.

Метод read_until_eof() возврашает объект Futureобещание, т.е. некоторое задание, которое находится в процессе выполнения. Само задание — вычитать поток до EOF. С помощью метода on_done мы устанавливаем функцию, которая будет вызвана, когда задание будет успешно выполнено.

С первого взгляда не видно никаких преимуществ использования Future-объектов. Попробуем усложнить задачу сервера. Теперь на каждую строку, начинающуюся с http:// выполнять загрузку указанного url и возвращать клиенту его содержимое.

Без использования Future-объектов это можно реализовать так:


sub on_read {
    my ( $self, $buffref, $eof ) = @_;

    # Учёт количества запросов
    my $req = 0;

    while( $$buffref =~ s/^(.*)\n// ) {
        my $url = $1;
        if ($url =~ m{^http\://} ) {
            $req++;

            # http-запрос
            my $http = Net::Async::HTTP->new();
            $loop->add( $http );
            $http->do_request(
                uri => $url,
                on_response => sub {
                    my ($response) = @_;
                    $stream->write($response->content);
                    $req--;
                },
                on_error => sub {
                    $req--;
                }
             )
         }
    }

    if ($eof and $req == 0) {
        $stream->close_when_empty();
    }
    return 0;
}

Как видно мы получаем вложенную функцию-колбек on_response, которая будет собирать результаты http-запросов и отправлять клиенту их результат. Если бы нам потребовалось выполнить ещё несколько асинхронных действий после успешного http-запроса, это привело бы к дополнительным вложенным колбекам, что усложняло бы восприятие кода.

Рассмотрим вариант с Future-объектами:

sub on_stream {
    my $stream = shift;
    $stream->configure( on_read => sub { 0 }, close_on_read_eof => 0 );
    $loop->add( $stream );

    my $f = $stream
        ->read_until_eof
        ->then(sub {
            my ($buf, $eof) = @_;

            my @req = ();
            while( $buf =~ s/^(.*)\n// ) {
                my $url = $1;
                next if $url !~ m{http://};
                my $http = Net::Async::HTTP->new();
                $loop->add( $http );

                # do_request() возвращает Future-объект
                push @req, $http->do_request( uri => $url );
            }

            return Future->wait_all( @req );
        })
        ->on_done( sub {
            $stream->write($_->get->content) for @_;
            $stream->close_when_empty();
        })
}

В данном примере выполнение операций выстраивается в последовательную цепочку:

  • Вычитывается поток.
  • Выполняется создание http-запросов по заданным url и возвращается Future-объект, который выполнится при условии завершения всех http-запросов.
  • Финальный результат отправляется клиенту, и соединение завершается.

Если бы потребовалось добавить какие-то дополнительные операции после выполнения http-запроса, то они бы последовательно добавлялись в данную цепочку, ясно и чётко демонстрируя логику программы.

Датаграммы и прямая работа с сокетом

В отличии от IO::Async::Stream, IO::Async::Socket предназначен для работы с датаграммами, т.е. дискретными сообщениями.

use IO::Async::Socket;

use IO::Async::Loop;
my $loop = IO::Async::Loop->new;

# Подключение по протоколу UDP на порт 1234
$loop->connect(
    host     => "127.0.0.1",
    service  => "1234",
    socktype => 'dgram',

    # Успешное подключение
    on_connected => sub {
        my ( $sock ) = @_;

        # Непосредственная работа с сокетом
        my $socket = IO::Async::Socket->new(
            handle => $sock,

            # получен пакет
            on_recv => sub {
                my ( $self, $dgram, $addr ) = @_;

                print "Получена датаграмма: $dgram\n",
                $loop->stop;
            },

            # ошибка при отправке в сокет
            on_recv_error => sub {
                my ( $self, $errno ) = @_;
                die "Ошибка - $errno\n";
            },
        );

        $loop->add( $socket );

        # Отправка сообщения в сокет
        $socket->send( "Привет, мир!" );
    },

    on_resolve_error => sub { die "Cannot resolve - $_[0]\n"; },
    on_connect_error => sub { die "Cannot connect\n"; },
);

$loop->run;

В указанном примере создаётся объект класса IO::Async::Socket, который становится наблюдателем за сокетом соединения. Функция-колбек on_recv вызывается каждый раз при получение порции данных (как правило с приходом каждого пакета). IO::Async::Socket можно использовать и для работы с сокетом типа SOCK_STREAM, в этом случае поток данных обрабатывается дискретно, т.е. по мере поступления порций данных в сокет.

Таймеры

Таймеры позволяют запустить некоторую операцию в заданный момент времени. В примере сетевого сервера таймер мог быть полезен для отслеживания висящих без дела клиентов.

sub on_stream {
    my $stream = shift;

    # Запускаем сторожевой таймер на 5 секунд
    my $watchdog = IO::Async::Timer::Countdown->new(
        delay => 5,

        # Закрываем поток по истечении времени
        on_expire => sub {
            $stream->close;
        }
    );
    $watchdog->start();
    $loop->add( $watchdog );

    $stream->configure(
        on_read => sub {
            my ( $self, $buffref, $eof ) = @_;

            # Сброс таймера, если данные появились
            $watchdog->reset;
            ...
        },
        on_closed => sub {

            # Остановить таймер, если поток закрылся
            $watchdog->stop;
        }
     );
     $loop->add( $stream );
     ...
}

Помимо IO::Async::Timer::Countdown существуют таймеры IO::Async::Timer::Absolute (для запуска задачи в указанное время) и IO::Async::Timer::Periodic (для периодического запуска задачи).

Сигналы

С помощью IO::Async::Signal существует возможность асинхронно обрабатывать поступающие сигналы, выполняя заданный код обработчика сигнала. При обычном способе задания обработки сигнала, через установку $SIG{NAME}, может произойти ситуация, когда сигнал придёт в середине выполнения какой-либо важной транзакции, и код обработчика потенциально способен нарушить её. Реализация IO::Async::Signal гарантирует выполнение кода обработчика после завершения текущего такта цикла, т.о. происходит синхронизация относительно основного цикла событий.

Допускается установка множества обработчиков для одного сигнала. В этом случае каждый из них будет выполнен, но без какого-либо фиксированного порядка.

Пример обработки сигнала SIGHUP:

use IO::Async::Signal;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

my $signal = IO::Async::Signal->new(
   name => "HUP",

   # Завершить цикл при получении сигнала SIGHUP
   on_receipt => sub {
       print "Stop loop after SIGHUP\n";
       $loop->stop;
   },
);

$loop->add( $signal );

$loop->run;

Возможен также альтернативный вариант создания обработчиков сигналов, через метод attach_signal в IO::Async::Loop:

use IO::Async::Loop;

my $loop = IO::Async::Loop->new;
my $sig_id = $loop->attach_signal( "SIGHUP", sub {
    print "Stop loop after SIGHUP\n";
    $loop->stop;
});

$loop->run;

Метод detach_signal позволяет удалить обработчик:

$loop->detach_signal($sig_id);

Управление процессами

Одно из важных условий корректной работы цикла обработки событий — неблокируемый код и отсутствие процессороёмких вычислений. В тех случаях, когда избежать этого невозможно, можно прибегнуть к выполнению таких операций в отдельном процессе или треде.

В IO::Async существуют несколько способов для запуска и управления дочерними процессами.

IO::Async::Process

IO::Async::Process позволяет запустить внешнюю программу или код. Модуль выполняет последовательные вызовы fork() и exec(), если это внешняя команда, или fork() и eval() в скалярном контексте, если вызывается код.

В случае выполнения внешней программы, существует множество способов для получения stdout, stderr запущенного процесса или передачи данных на stdin дочернего процесса.

Например, простой пример для захвата stdout дочернего процесса:

use IO::Async::Loop;
use IO::Async::Process;

my $loop = IO::Async::Loop->new;

my $stdout;
my $process = IO::Async::Process->new(
   command => [ "writing-program", "arguments" ],
   stdout => { into => \$stdout },
   on_finish => sub {
      print "The process has finished, and wrote:\n";
      print $stdout;
   }
);

$loop->add( $process );
$loop->run

Пример посложнее, для вычисления с помощью консольного калькулятора bc. Выражение передаётся на stdin процесса, а результаты забираются из stdout:


use IO::Async::Loop;
use IO::Async::Process;

my $loop = IO::Async::Loop->new;

my $process = IO::Async::Process->new(
   command => [ "bc", "-q" ],

   # Формируется pipe, дочерний процесс читает stdin и пишет на stdout
   stdio => {
       via => "pipe_rdwr"
   },

   on_finish => sub {
      $loop->stop;
   },
);

# Чтение данных в родительском процессе
$process->stdio->configure(
    on_read => sub {
         my ( $stream, $buffref ) = @_;
         while( $$buffref =~ s/^(.*\n)// ) {
            print "Answer is $1";
         }
         return 0;
   },
);

$loop->add( $process );

# Отправка выражений для вычислений в bc
$process->stdio->write("$_\n") for ("2+2", "2+2*2", "(2+2)*2" );

$loop->run

На выходе программы получим такой результат:

Answer is 4
Answer is 6
Answer is 8

Выполнение кода позволяет получить только код завершения процесса:

my $process = IO::Async::Process->new(
    code => sub {
        return 42
    },
    on_finish => sub {
        my ($self, $code) = @_;
        printf "Exit code: %d\n", $code>>8;
    },
);
...

Методы IO::Async::Loop

Как и во многих других случаях, создание дочерних процессов продублировано в реализации IO::Async::Loop. Например, метод open_child() является обёрткой вокруг IO::Async::Process:

$pid = $loop->open_child(
    command => [...],
    on_finish => sub { ... }
);

Кроме того, присутствует метод run_child, который является упрощённым способом вызова внешних процессов, если требуется получить только их вывод:

$loop->run_child(
   command => [ "ls", "-1" ]
   on_finish => sub {
      my ( $pid, $exitcode, $stdout, $stderr ) = @_;
      my $status = ( $exitcode >> 8 );
      ...
   },
);

IO::Async::PID

IO::Async::PID позволяет отслеживать завершение работы дочернего процесса. Может быть полезен, если по каким-то причинам вас не устроили все предыдущие методы запуска внешних процессов.

use IO::Async::PID;
use IO::Async::Loop;
my $loop = IO::Async::Loop->new;

# Запуск дочернего процесса
my $kid = $loop->fork(
   code => sub {
      print "Сон..\n";
      sleep 10;
      print "Выход\n";
      return 20;
   },
);

print "Запущен дочерний процесс $kid\n";

my $pid = IO::Async::PID->new(
   pid => $kid,

   on_exit => sub {
      my ( $self, $exitcode ) = @_;
      printf "Дочерний процесс %d завершился с кодом %d\n",
         $self->pid, $exitcode>>8;
   },
);

$loop->add( $pid );

$loop->run;

Асинхронный запуск функций с помощью IO::Async::Function и IO::Async::Routine

В разделе, посвящённом IO::Async::Process упоминалось, что дочерний процесс может выполнять и фрагмент кода, но функционал этот достаточно бедный, т.к. позволяет вернуть только код завершения процесса из процесса-потомка.

Для асинхронного выполнения функций с возможностью получения результатов выполнения в IO::Async созданы специальные классы: IO::Async::Function и IO::Async::Routine.

IO::Async::Function

Объект IO::Async::Function представляет собой функцию, которая будет выполняться параллельно основному процессу программы. Существует возможность выбора модели запуска — это либо fork нового процесса, либо создание нового треда. При первом старте или вызове такой функции происходит запуск одного или нескольких (в зависимости от настроек) процессов/нитей — рабочих. Эти рабочие содержат код выполняемой функции и могут многократно вызывать её и возвращать результат обратно в основной процесс, поэтому важно, чтобы функция обладала свойством реентерабельности, т.е. не изменяла своих внутренних структур, что могло бы изменить её поведение при повторных вызовах. Помимо количества рабочих можно задавать и таймаут, после которого неиспользуемый рабочий будет остановлен для экономии ресурсов.

Внутренне IO::Async::Function построен на базе IO::Async::Routine, о котором будет рассказано ниже. Основной процесс запускает процесс/нить рабочего и формирует каналы для ввода/вывода. Через входной канал рабочего передаются аргументы функции, а через выходной — возвращаемые данные. Для сериализации/десериализации данных используются функции модуля Storable.

Пример функции:

use IO::Async::Function;
use IO::Async::Loop;
use Crypt::CBC;

my $loop = IO::Async::Loop->new;

my $cipher = Crypt::CBC->new(-key => 'secret', -cipher => 'Blowfish');

my $function = IO::Async::Function->new(

    # Код шифровщика
    code => sub {
        $cipher->encrypt(shift);
    },
    model => 'fork',    # форк
    max_workers => 10,  # максимальное число рабочих
    min_workers => 2,   # минимальное число рабочих
    idle_timeout => 10, # выключение неиспользуемых рабочих
                        # по таймауту
);

$loop->add( $function );

# Вызов функции
$function->call(

   # Аргументы
   args => [ "Plain text" ],
   on_return => sub {
      my $ciphertext = shift;
      print "'Plain text' encrypted to '". $ciphertext ."'\n";
   },
   on_error => sub {
      warn "Все CPU перегреты\n";
   },
);

$loop->run;

Основное назначение таких функций: выполнение каких-либо интенсивных вычислений или блокирующихся операций, которым не требуется хранить информацию о состоянии между вызовами. Т.е. в идеале функции, результат которых зависит только от входных аргументов.

IO::Async::Routine

IO::Async::Routine также как и IO::Async::Function позволяет выполнять код во внешнем процессе/нити. Отличие состоит в том, что создаваемый рабочий запускается один раз и производит обмен данными с основным процессом через специальные каналы IO::Async::Channel. Такой рабочий может иметь сложную логику, хранить информацию о состоянии в процессе обмена с основным процессом и хорошо подходит для организации потоковой обработки данных.

Рассмотрим пример программы:

use IO::Async::Routine;
use IO::Async::Channel;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# Каналы для обмена данными
my $input  = IO::Async::Channel->new;
my $output = IO::Async::Channel->new;

# Функция рабочего процесса
my $routine = IO::Async::Routine->new(
    channels_in  => [ $input  ],
    channels_out => [ $output ],

    code => sub {
        my @nums = @{ $input->recv };
        my $ret = 0; $ret += $_ for @nums;
        $output->send( \$ret );
    },

    on_finish => sub {
        print "Рабочий завершил работу - $_[-1]\n";
        $loop->stop;
   },
);

$loop->add( $routine );

# Отправка данных рабочему
$input->send( [ 10, 20, 30 ] );

# Функция-колбек для приёма данных
$output->recv(
    on_recv => sub {
        my ( $ch, $totalref ) = @_;
        print "10 + 20 + 30 = $$totalref\n";
        $loop->stop;
    }
);

$loop->run;

В примере создаются два канала IO::Async::Channel для обмена данными с рабочим. Данные объекты ориентированы на использование исключительно совместно с IO::Async::Routine, поэтому их не требуется вручную добавлять в цикл обработки событий, их управлением займётся сам IO::Async::Routine. Для этого требуется указать их в списках каналов channels_in и channels_out, т.е. определить направление записи в канал: входной и выходной каналы для рабочего соответственно.

Канал позволяет передавать только ссылки. Все такие ссылки в момент передачи сериализируются с помощью Storable, поэтому после того как объект сериализован, любые изменения в нём уже не будут видны тому процессу, в который он был передан.

Канал имеет два метода: send() и recv() для передачи и получения данных в/из канала соответственно. Выполнение send() происходит без блокировки. recv() блокируется в том случае, если вызван без указания колбека on_recv и возвращает полученные данные.

Изменения в файловой системе

Модуль IO::Async::File позволяет создавать уведомители об изменениях в файлах и каталогах. Наблюдать можно за изменениями любой характеристики файла, доступной в вызове stat(): время создания/доступа/модификации, размер, права доступа, владелец и другие.

use IO::Async::File;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# Наблюдаем за изменением времени модификации
# файла исходного кода программы
my $file = IO::Async::File->new(
   filename => __FILE__,
   on_mtime_changed => sub {
      my ( $self, $new_mtime, $old_mtime ) = @_;
      print "Время модификации файла изменилось",
            "с mtime $old_mtime на $new_mtime\n";
   }
);

$loop->add( $file );

$loop->run;

Так же есть возможность наблюдать за всеми характеристиками одновременно, указав функцию-колбек on_stat_changed, которая вызывается при изменении любой из характеристик. В качестве параметров передаются объекты класса File::stat с текущим и предыдущим состояниями файлового дескриптора.

Внутри модуль реализован на простом периодическом вызове функции stat() на файле каждые 2 секунды. Регулировать интервал опроса можно с помощью параметра interval при создании объекта.

ООП подход при использовании IO::Async

Пример сетевого сервера, рассмотренный в начале статьи, можно переделать с применением ООП подхода. Метод listen IO::Async::Loop возвращает объект IO::Async::Listener. Мы можем создать свой собственный класс MyListener, который будет наследовать методы IO::Async::Listener и будет иметь свой обработчик события on_stream:

use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# Создание объекта уведомителя MyListener,
# производного от IO::Async::Listener
my $myl = MyListener->new;

$loop->add($myl);

$myl->listen(
    family   => "inet",
    socktype => "stream",
    service  => 1234,
    on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; },
    on_listen_error  => sub { print STDERR "Cannot listen\n"; },
);

$loop->run;

# Собственный модуль обработки потока
package MyListener;

# Наследуем от класса IO::Async::Listener
use base qw( IO::Async::Listener );

# Реализуем метод on_stream, вызываемый при подключении клиента
sub on_stream {
    my ($self, $stream) = @_;

    $stream->configure(
        on_read => sub { 0 },
        close_on_read_eof => 0
    );

    # Добавляем поток в цикл
    $self->loop->add( $stream );

    # Возвращается Future-объект для операции чтения потока до EOF
    my $f = $stream->read_until_eof;

    # Функция-колбек при завершении операции
    $f->on_done(sub {
        my ($buf, $eof) = @_;

        while( $buf =~ s/^(.*)\n// ) {
            $stream->write(scalar reverse "\n".$1);
        }

        $stream->close_when_empty() if $eof;
        return 0;
    });
}

1;

Такой подход позволяет получить логичный набор классов для каждой решаемой задачи и избежать появления огромных простыней кода с вложенными функциями-колбеками.

Заключение

Хочется отметить, что несмотря на то, что это ещё один фреймворк для СОП, он, тем не менее, заслуживает серьёзного внимания уже благодаря полноте реализуемых задач, а также уникальной интеграции событийно-ориентированного программирования с обещаниями и объектно-ориентированным подходом к компоновке программы.

Как уже было отмечено, цикл IO::Async может использовать множество реализаций мультиплексоров под капотом, что делает его похожим на AnyEvent. В экосистеме модулей IO::Async есть также модули для асинхронного разрешения имён с использованием системного резолвера, модуль http-клиента, прозрачная поддержка SSL и т.д. IO::Async можно использовать в веб-приложениях Mojolicious. Всё это позволяет строить широкий класс приложений на его основе.

Владимир Леттиев


Про опыт разработки на Perl под Raspberry PI | Содержание | Обход дерева директорий на Perl и Haskell (часть 1)
Нас уже 1393. Больше подписчиков — лучше выпуски!

Комментарии к статье