Выпуск 17. Июль 2014

Яндекс.Директ: как мы деплоим наши Perl-web-приложения | Содержание | Использование портов GPIO в Raspberry Pi. Часть 1

Асинхронный ввод/вывод с IO::AIO

В UNIX-системах операции чтения и записи файлов, как правило, являются синхронными, т.е. вызовы read() и write() не возвращаются, пока актуальные данные не поступят в/из буфера ядра. Все дисковые операции на несколько порядков медленнее операций с памятью, поэтому однопоточное приложение, проводящее операции с файлами, может значительное время проводить в ожидании, что может негативно сказаться на обработке асинхронных событий (сигналы, новые данные в сокетах/каналах). Модуль IO::AIO предоставляет набор функций, которые позволяют асинхронно выполнять операции ввода/вывода, не блокируя основную нить выполнения процесса.

Реализации асинхронного ввода/вывода

Существует несколько различных реализаций асинхронного I/O. Стандарт POSIX.1b определяет набор асинхронных функций и их интерфейс, но не регламентирует способ реализации. В библиотеке GNU libc эти функции реализованы на пользовательском уровне, когда запрос на ввод/вывод обрабатывается в выделенном треде, а информация о результатах может доставляться отправкой сигнала или инициацией нового треда.

В ядре Linux, начиная с версии 2.6, появился набор системных вызовов, которые можно использовать в программах непосредственно или через обёртку в виде библиотеки libaio. Реализация aio-функций в ядре Linux имеет ограничения: требуется открытие файлов с флагом O_DIRECT, что должно поддерживаться файловой системой, а также полностью отключается дисковый кэш.

Модуль IO::AIO был создан Марком Леманном на основе C-библиотеки libeio, которая также, как и glibc, использует пул тредов для выполнения обычных синхронных операций ввода/вывода, не блокируя основной поток исполнения программы. Такой подход обеспечивает высокую переносимость библиотеки, вплоть до возможности использования на платформе Windows.

API IO::AIO

API модуля IO::AIO предоставляет множество функций для работы с файлами и директориями, функции для интеграции с модулем обработки событий, а также контролем исполнения. Некоторые функции могут успешно работать и с сокетами, и с каналами, но по эффективности они будут уступать аналогичным функциям из модуля обработки событий (например, EV).

Прежде чем начать описание функций и разбор примеров, необходимо оговориться о способе интеграции IO::AIO с модулем обработки событий. Для служебных целей IO::AIO создаёт канал, который используется для получения данных из тредов. Как только данный канал становится доступным для чтения, т.е. пришли данные какого-либо запроса, необходимо выполнить функцию обработки этих данных. Дескриптор канала возвращает функция IO::AIO::poll_fileno(), а функция обработки это IO::AIO::poll_cb(). Таким образом, например, при работе совместно EV требуется установить IO-страж:

my $aio = EV::io IO::AIO::poll_fileno, EV::READ, \&IO::AIO::poll_cb;

В общем случае рекомендуется использовать модуль AnyEvent, в таком случае интеграция выполняется тривиально, просто загрузив модуль AnyEvent::AIO:

use AnyEvent;
use IO::AIO;

# Интеграция IO::AIO с AnyEvent
use AnyEvent::AIO;

Все дальнейшие примеры будут использовать такую шапку и для простоты она будет опускаться.

Открытие, чтение и запись файла

Для открытия файла предназначена функция aio_open, которая принимает в качестве аргументов путь к файлу, битовую маску из флагов (список возможных флагов можно подсмотреть в Fcntl), права для создаваемого файла (меняется в соответствии с umask текущего процесса).

Чтение файла производится функцией aio_read, которая читает по заданному файловому дескриптору и смещению указанное количество данных в буфер по заданному смещению.

my $cv = AE::cv;

# открытие только на чтение
aio_open "/etc/passwd", IO::AIO::O_RDONLY, 0, sub {

    # полученный файловый дескриптор
    my $fh = shift or die $!;

    # размер файла
    my $size = -s $fh;

    my $contents;

    # чтение
    aio_read $fh, 0, $size, $contents, 0, sub {

        # прочитанный размер данных
        my $readed = shift;

        # -1 - признак ошибки
        die "ошибка чтения: $!" if $readed == -1;

        die "Не удалось прочитать весь файл"
          if $readed != $size;
        close $fh;
       
        # печать содержимого
        print $contents;
        $cv->send;
    }
};

$cv->recv;

Запись файла выполняется функцией aio_write, с аналогичным набором аргументов, как и в aio_read.

my $cv = AE::cv;

# открытие на запись и при необходимости
# создание нового файла с правами 0666
aio_open "hello", IO::AIO::O_CREAT | IO::AIO::O_WRONLY, 0666, sub {

    # полученный файловый дескриптор
    my $fh = shift or die $!;

    my $contents = "hello, world!" ;

    # запись из буфера $contents
    aio_write $fh, 0, length($contents), $contents, 0, sub {

        # записанный размер данных
        my $len = shift;

        # -1 - признак ошибки
        die "ошибка записи: $!" if $len == -1;

        die "Не удалось записать весь файл"
          if $len != length($contents);
        close $fh;

        $cv->send;
    }
};

$cv->recv;

Чтение каталогов

Довольно часто возникает задача получения списка файлов в каталоге, для этих целей существует несколько асинхронных функций.

aio_readdir выполняет сразу три операции: открывает каталог, читает содержимое и закрывает каталог. Ссылка на список полученных элементов передаётся в функцию обратного вызова. Этот список не содержит специальных каталогов . и ...

my $cv = AE::cv;

# чтение каталога
aio_readdir "/etc/init.d",  sub {

    # полученный список файлов
    my $list = shift or die $!;

    print join "\n", @$list;
    $cv->send;
};

$cv->recv;

Функция aio_scandir по аналогии с aio_readdir выполняет чтение содержимого каталога, но при этом также пытается определить тип файла. Вызываемая функция обратного вызова получает в случае успеха два списка: список директорий и список остальных файлов.

my $cv = AE::cv;

# чтение каталога
aio_scandir "/etc", 0,  sub {
    my ($dirs, $nondirs) = @_ or die $!;
    print "каталоги: @$dirs\n";
    print "всё остальное: @$nondirs\n";
    $cv->send;
};

$cv->recv;

В данном примере aio_scandir также имеет второй параметр, который определяет, как много выделенных тредов будет занято сканированием (выполнять stat на файлы). Значение 0 или меньше означает, что будет использоваться количество тредов по умолчанию: 4.

aio_scandir позволяет, например, реализовать поиск файла.

use File::Spec::Functions;

sub scan {
    my $cb = pop;
    my $cv = pop;
    my @dirs = @_;

    my $scan;

    # Функция-замыкание, которая вызывает саму себя до тех пор,
    # пока не останется непросканированных каталогов
    ( $scan = sub {
        my $dir = shift @dirs;
        aio_scandir $dir, 0,  sub {
            my ($dirs, $nondirs) = @_;

            # добавим новые каталоги для сканирования
            push @dirs, map { catfile $dir, $_ } @$dirs if $dirs;

            # вызываем пользовательскую функцию обратного вызова
            $cb->($dir, $nondirs);

            # продолжаем сканирование, если есть что
            if (@dirs) {
                $scan->()
            } else {
                $cv->()
            }
        };
    } )->();
};

my $cv = AE::cv;

# Регулярное выражение для поиска файла
my $search = qr/foo|bar/;

# Поиск в каталоге /etc
scan "/etc", $cv, sub {
    my ($dir, $files) = @_;
    for my $file (@$files) {
        printf "Found match: %s\n", catfile $dir, $file
            if $file =~ $search
    }
};

$cv->recv;

В данном примере функция scan выполняет рекурсивный поиск по списку каталогов. Предпоследний параметр — функция, которая вызывается при завершении сканирования (в данном случае — это условная переменная). Последний параметр — это функция обратного вызова, которая вызывается после сканирования каждого каталога. Ей передаются два параметра: $dir — текущий каталог, $files — ссылка на массив с файлами в текущем каталоге. В данном примере функция ищет имена файлов по регулярному выражению /foo|bar/.

Операции с двумя дескрипторами

Иногда возникает потребность в пересылке данных из одного дескриптора в другой. Например, веб-сервер должен отправить локальный файл через сетевой сокет клиенту. В этом случае удобно использовать функцию aio_sendfile, которая при возможности использует системный вызов sendfile, позволяя избежать копирования данных из ядра в пользовательские буфер и обратно.

use AnyEvent::Socket;

my $cv = AE::cv;

# Создаём tcp-сервер, который слушает порт 8080
# и отправляет всем подключившимся клиентам
# файл /etc/passwd
tcp_server undef, 8080, sub {
    my $socket = shift or die $!;

    # откроем /etc/passwd на чтение
    aio_open "/etc/passwd", IO::AIO::O_RDONLY, 0, sub {
        my $fh = shift or die $!;
        my $size = -s $fh;

        my $sendfile;
        my $offset = 0;

        # Функция-замыкание для отправки данных
        ( $sendfile = sub {

            # Копирование из файла в сокет
            aio_sendfile $socket, $fh, $offset, $size, sub {

                # Произошла ошибка
                if ( $_[0] == -1 ) {

                    # Не смертельно, можно повторить
                    if ( $!{EAGAIN} || $!{EINTR} ) {
                        $sendfile->();
                    }

                    # всё-таки что-то не так
                    else {
                        die $!;
                    }
                }

                # Отправлено меньше чем надо, повторяем
                elsif ( $_[0] < $size ) {
                    $offset += $_[0];
                    $size   -= $_[0];
                    $sendfile->();
                }

                # Файл успешно отправлен, конец сеанса
                else {
                    close $socket;
                    close $fh;
                }   
            }; 
        } )->()
    }; 
}; 

$cv->recv;

В данном примере дополнительно рассмотрено, как обрабатывать ошибки: если вызов aio_sendfile вернул -1, то необходимо проверить код ошибки, если, например, это EAGAIN, то значит сокет ещё не готов принять данные и надо повторить вызов позже.

Операции копирования, перемещения и удаления

Представлен набор асинхронных функций, который позволяет производить типичные операции по манипуляциям с файлами и каталогами:

  • aio_copy — производит копирование файла (директории не поддерживаются) из пути источника в путь назначения. Функция обратного вызова получает 0 в случае успеха и -1 в случае ошибки.

  • aio_move — перемещение файла. Это композитная функция, которая пытается выполнить вызов rename и в случае, если это невозможно (путь назначения на другом разделе) выполняет копирование aio_copy.

  • aio_unlink — удаление файла по заданному пути.

  • aio_rmtree — удаление дерева каталогов по заданному пути.

    aio_copy "/mnt/backup/etc/passwd", "/etc/passwd", sub {
        $_[0] and die $!;
        aio_copy "/mnt/backup/etc/shadow", "/etc/shadow", sub {
            $_[0] and die $!;
            aio_rmtree "/mnt/backup/etc";
        };
    };

Файловые пути и текущий каталог

Многие aio-функции оперируют с файловыми путями. Модуль IO::AIO требует, чтобы все используемые пути передавались как байтовые строки, т.е. если путь передан в программу как строка символов, необходимо принять меры для её приведения к байтовой строке, иначе вызов aio-функции вызовет исключение.

# строки кода - набор символов в кодировке UTF-8
use utf8;
use Encode;

my $path = "/путь/юникод/в/кодировке/UTF-8";

# файловый путь кодируется в байтовую строку
aio_stat encode_utf8($path), sub {
   ...
};

IO::AIO рекомендует использовать абсолютные пути во всех функциях. Это связано с тем, что при смене текущего каталога вполне возможна ситуация, что какой-то запущенный тред работает со старым рабочим каталогом, что может привести впоследствии к ошибке. Разумеется, если вы уверены, что в течении работы программы текущий каталог не меняется, то использование относительных путей вполне легально.

Если использование относительных путей востребованно и требуется контроль текущего каталога при выполнении aio-функций, то в IO::AIO для подобных целей существует функция aio_wd, формирующая объект класса IO::AIO::WD, который можно использовать в aio-функциях для обозначения текущего каталога.

aio_wd "/etc", sub {
    my $etcdir = shift;

    aio_stat [$etcdir, "passwd"], sub {
        ...
    }
};

Переменная $etcdir сохраняет информацию о каталоге “/etc”, на поддерживающих системах происходит открытие файлового дескриптора каталога, что ускоряет разрешение имён относительного данного каталога. aio_stat в качестве параметра-пути получает ссылку на массив, где первый элемент это объект текущего каталога, а второй элемент — относительный путь. Все функции IO::AIO, которые работают с путями, поддерживают подобный формат вызова. Таким образом, можно всегда быть уверенным, что вызов функции с относительным путём будет проходить в контексте выбранного текущего каталога, даже если текущий каталог самого приложения изменился.

Запросы

Каждая aio-функция возвращает объект запроса IO::AIO::REQ, если вызывается не в пустом контексте. Данный объект ассоциируется с выполняемой операцией. Каждая операция проходит пять состояний:

  1. Готовность — сразу после создания запроса, ожидание выполнения тредом.

  2. Выполнение — запрос принят тредом, и в данный момент тред выполняет его.

  3. Ожидание — запрос выполнен, и тред ожидает, когда будет обработан результат.

  4. Результат — результат запроса обрабатывается в poll_cb, который вызывает пользовательскую функцию обратного вызова.

  5. Выполнено — запрос выполнен, все связанные структуры данных освобождены.

До того как запрос перейдёт в стадию выполнению, существует возможность его отменить с помощью вызова метода cancel объекта запроса:

# создать запрос на удаление
# какого-нибудь ненужного каталога
my $req = aio_rmtree "/", sub {
    print "all your base are belong to us\n";
};

# отменить запрос, если не поздно
$req->cancel;

Также можно задать или переопределить пользовательскую функцию обратного вызова запроса, до того как начнётся обработка результата запроса:

# вызов stat на файле
my $req = aio_stat "/etc/passwd";

# установка колбека
$req->cb( sub {
    $_[0] and die "ошибка stat: $!";
    printf "размер %i\n", -s _;
});

Группы запросов

Несколько запросов можно объединить в группы, чтобы иметь возможность отследить завершение всех запросов. Для этих целей создана функция aio_group, которая принимает один параметр — функцию обратного вызова, которая вызывается, когда завершены все запросы группы. Возвращается объект класса IO::AIO::GRP, в который можно добавлять запросы с помощью метода add:

my $cv = AE::cv;

# Создаём группу
my $grp = aio_group sub {
    print "выполнены все запросы\n";
    $cv->send
};

# Добавляем первый запрос
add $grp aio_stat "/etc/passwd", sub {
    printf "размер passwd %i\n", -s _ unless $_[0];
};

# Добавляем второй запрос
add $grp aio_stat "/etc/group", sub {
    printf "размер group %i\n", -s _ unless $_[0];
};

$cv->recv;

Группы, как и сами запросы, можно отменять с помощью вызова cancel. В этом случае отменяются все запросы, входящие в группу. Группы также можно добавлять в другие группы.

В IO::AIO реализована возможность динамического добавления запросов в группу. Каждой группе можно назначить так называемый генератор или фидер (feeder) — функцию, которая вызывается, как только количество запросов в группе снизится ниже определённого уровня. Функция может добавить новый запрос в группу, таким образом “питая” группу.

Например, описанную выше задачу поиска файла по заданному регулярному выражению можно реализовать с помощью группы и генератора:

use File::Spec::Functions;

sub scan {
    my $cb = pop;
    my $cv = pop;
    my @dirs = @_;

    # Группа запросов, в качестве функции обратного вызова
    # передаётся условная переменная $cv
    my $grp = aio_group $cv;

    # Установим лимит на минимальное число запросов в группе,
    # после которого вызывается фидер
    limit $grp 1;

    # Генератор/фидер запросов
    feed $grp sub {
        my $dir = shift @dirs // return;

        # Добавляем запрос в группу
        add $grp aio_scandir $dir, 0,  sub {
            my ($dirs, $nondirs) = @_;
            push @dirs, map { catfile $dir, $_ } @$dirs if $dirs;

            # Вызываем функцию пользователя
            $cb->($dir, $nondirs);
        }; 
    }; 
}; 

my $cv = AE::cv;

# Регулярное выражение для поиска файла
my $search = qr/foo|bar/;

# Поиск в каталоге /etc
scan "/etc", $cv, sub {
    my ($dir, $files) = @_;
    for my $file (@$files) {
        printf "Found match: %s\n", catfile $dir, $file
            if $file =~ $search
    }   
}; 

$cv->recv;

В данном примере мы модифицировали функцию scan, создав группу, которую снабжает новыми запросами функция-генератор. Также обратите внимание, что используется специальная функция limit, которая устанавливает минимальный порог запросов, после которого должен вызываться генератор/фидер. В данном примере, это 1, таким образом, одномоментно работает только один aio-запрос.

Служебные функции и функции тонкой подстройки IO::AIO

IO::AIO имеет несколько функций, которые относятся непосредственно к работе самого модуля IO::AIO, позволяя подстроить различные аспекты его функционирования.

  • IO::AIO::min_parallel — минимальное число одновременно запущенных тредов для обработки запросов, по умолчанию 8.

  • IO::AIO::max_parallel — максимальное число одновременно запущенных тредов для обработки запросов, по умолчанию 32.

  • IO::AIO::max_idle — максимальное число простаивающих тредов

  • IO::AIO::nreqs — возвращает количество готовых, выполняющихся и ожидающих обработку тредов.

Помимо служебных функций IO::AIO::poll_fileno и IO::AIO::poll_cb, описанных в начале статьи, есть ещё несколько функций, связанных с обработкой запросов.

  • IO::AIO::poll_wait — блокирующее ожидание, пока один из запросов не перейдёт в состояние готовности для чтения.

  • IO::AIO::flush — блокирующее ожидание, пока все запросы не будут обработаны, аналогичен выполнению кода:

    IO::AIO::poll_wait, IO::AIO::poll_cb
      while IO::AIO::nreqs;

Данные функции могут быть полезны при использовании IO::AIO без модуля обработки событий.

Заключение

IO::AIO однозначно является самой мощной системой для работы с дисковой подсистемой в Perl. Интеграция с различными модулями обработки событий, огромное число функций для всевозможных нужд, высокая переносимость модуля, тонкая настройка, адаптация к высокой нагрузке являются наиболее заметными преимуществами IO::AIO. Если вы разрабатываете систему, которая генерирует высокую нагрузку на дисковую подсистему и при этом должна оставаться крайне отзывчивой к внешним воздействиям, то лучше IO::AIO вы вряд ли сможете что-то найти.

В статье описаны лишь немногие востребованные функции, исчерпывающий их список с подробным описанием доступен в POD-документации модуля.

Владимир Леттиев


Яндекс.Директ: как мы деплоим наши Perl-web-приложения | Содержание | Использование портов GPIO в Raspberry Pi. Часть 1
Нас уже 1393. Больше подписчиков — лучше выпуски!

Комментарии к статье