Выпуск 17. Июль 2014
← Яндекс.Директ: как мы деплоим наши Perl-web-приложения | Содержание | Использование портов GPIO в Raspberry Pi. Часть 1 →Асинхронный ввод/вывод с IO::AIO
В UNIX-системах операции чтения и записи файлов, как правило, являются синхронными, т.е. вызовы read()
и write()
не возвращаются, пока актуальные данные не поступят в/из буфера ядра. Все дисковые операции на несколько порядков медленнее операций с памятью, поэтому однопоточное приложение, проводящее операции с файлами, может значительное время проводить в ожидании, что может негативно сказаться на обработке асинхронных событий (сигналы, новые данные в сокетах/каналах). Модуль IO::AIO
предоставляет набор функций, которые позволяют асинхронно выполнять операции ввода/вывода, не блокируя основную нить выполнения процесса.
Реализации асинхронного ввода/вывода
Существует несколько различных реализаций асинхронного I/O. Стандарт POSIX.1b определяет набор асинхронных функций и их интерфейс, но не регламентирует способ реализации. В библиотеке GNU libc эти функции реализованы на пользовательском уровне, когда запрос на ввод/вывод обрабатывается в выделенном треде, а информация о результатах может доставляться отправкой сигнала или инициацией нового треда.
В ядре Linux, начиная с версии 2.6, появился набор системных вызовов, которые можно использовать в программах непосредственно или через обёртку в виде библиотеки libaio. Реализация aio-функций в ядре Linux имеет ограничения: требуется открытие файлов с флагом O_DIRECT
, что должно поддерживаться файловой системой, а также полностью отключается дисковый кэш.
Модуль IO::AIO
был создан Марком Леманном на основе C-библиотеки libeio, которая также, как и glibc, использует пул тредов для выполнения обычных синхронных операций ввода/вывода, не блокируя основной поток исполнения программы. Такой подход обеспечивает высокую переносимость библиотеки, вплоть до возможности использования на платформе Windows.
API IO::AIO
API модуля IO::AIO
предоставляет множество функций для работы с файлами и директориями, функции для интеграции с модулем обработки событий, а также контролем исполнения. Некоторые функции могут успешно работать и с сокетами, и с каналами, но по эффективности они будут уступать аналогичным функциям из модуля обработки событий (например, EV).
Прежде чем начать описание функций и разбор примеров, необходимо оговориться о способе интеграции IO::AIO
с модулем обработки событий. Для служебных целей IO::AIO
создаёт канал, который используется для получения данных из тредов. Как только данный канал становится доступным для чтения, т.е. пришли данные какого-либо запроса, необходимо выполнить функцию обработки этих данных. Дескриптор канала возвращает функция IO::AIO::poll_fileno()
, а функция обработки это IO::AIO::poll_cb()
. Таким образом, например, при работе совместно EV
требуется установить IO-страж:
my $aio = EV::io IO::AIO::poll_fileno, EV::READ, \&IO::AIO::poll_cb;
В общем случае рекомендуется использовать модуль AnyEvent
, в таком случае интеграция выполняется тривиально, просто загрузив модуль AnyEvent::AIO
:
use AnyEvent;
use IO::AIO;
# Интеграция IO::AIO с AnyEvent
use AnyEvent::AIO;
Все дальнейшие примеры будут использовать такую шапку и для простоты она будет опускаться.
Открытие, чтение и запись файла
Для открытия файла предназначена функция aio_open
, которая принимает в качестве аргументов путь к файлу, битовую маску из флагов (список возможных флагов можно подсмотреть в Fcntl
), права для создаваемого файла (меняется в соответствии с umask текущего процесса).
Чтение файла производится функцией aio_read
, которая читает по заданному файловому дескриптору и смещению указанное количество данных в буфер по заданному смещению.
my $cv = AE::cv;
# открытие только на чтение
aio_open "/etc/passwd", IO::AIO::O_RDONLY, 0, sub {
# полученный файловый дескриптор
my $fh = shift or die $!;
# размер файла
my $size = -s $fh;
my $contents;
# чтение
aio_read $fh, 0, $size, $contents, 0, sub {
# прочитанный размер данных
my $readed = shift;
# -1 - признак ошибки
die "ошибка чтения: $!" if $readed == -1;
die "Не удалось прочитать весь файл"
if $readed != $size;
close $fh;
# печать содержимого
print $contents;
$cv->send;
}
};
$cv->recv;
Запись файла выполняется функцией aio_write
, с аналогичным набором аргументов, как и в aio_read
.
my $cv = AE::cv;
# открытие на запись и при необходимости
# создание нового файла с правами 0666
aio_open "hello", IO::AIO::O_CREAT | IO::AIO::O_WRONLY, 0666, sub {
# полученный файловый дескриптор
my $fh = shift or die $!;
my $contents = "hello, world!" ;
# запись из буфера $contents
aio_write $fh, 0, length($contents), $contents, 0, sub {
# записанный размер данных
my $len = shift;
# -1 - признак ошибки
die "ошибка записи: $!" if $len == -1;
die "Не удалось записать весь файл"
if $len != length($contents);
close $fh;
$cv->send;
}
};
$cv->recv;
Чтение каталогов
Довольно часто возникает задача получения списка файлов в каталоге, для этих целей существует несколько асинхронных функций.
aio_readdir
выполняет сразу три операции: открывает каталог, читает содержимое и закрывает каталог. Ссылка на список полученных элементов передаётся в функцию обратного вызова. Этот список не содержит специальных каталогов .
и ..
.
my $cv = AE::cv;
# чтение каталога
aio_readdir "/etc/init.d", sub {
# полученный список файлов
my $list = shift or die $!;
print join "\n", @$list;
$cv->send;
};
$cv->recv;
Функция aio_scandir
по аналогии с aio_readdir
выполняет чтение содержимого каталога, но при этом также пытается определить тип файла. Вызываемая функция обратного вызова получает в случае успеха два списка: список директорий и список остальных файлов.
my $cv = AE::cv;
# чтение каталога
aio_scandir "/etc", 0, sub {
my ($dirs, $nondirs) = @_ or die $!;
print "каталоги: @$dirs\n";
print "всё остальное: @$nondirs\n";
$cv->send;
};
$cv->recv;
В данном примере aio_scandir
также имеет второй параметр, который определяет, как много выделенных тредов будет занято сканированием (выполнять stat
на файлы). Значение 0
или меньше означает, что будет использоваться количество тредов по умолчанию: 4.
aio_scandir
позволяет, например, реализовать поиск файла.
use File::Spec::Functions;
sub scan {
my $cb = pop;
my $cv = pop;
my @dirs = @_;
my $scan;
# Функция-замыкание, которая вызывает саму себя до тех пор,
# пока не останется непросканированных каталогов
( $scan = sub {
my $dir = shift @dirs;
aio_scandir $dir, 0, sub {
my ($dirs, $nondirs) = @_;
# добавим новые каталоги для сканирования
push @dirs, map { catfile $dir, $_ } @$dirs if $dirs;
# вызываем пользовательскую функцию обратного вызова
$cb->($dir, $nondirs);
# продолжаем сканирование, если есть что
if (@dirs) {
$scan->()
} else {
$cv->()
}
};
} )->();
};
my $cv = AE::cv;
# Регулярное выражение для поиска файла
my $search = qr/foo|bar/;
# Поиск в каталоге /etc
scan "/etc", $cv, sub {
my ($dir, $files) = @_;
for my $file (@$files) {
printf "Found match: %s\n", catfile $dir, $file
if $file =~ $search
}
};
$cv->recv;
В данном примере функция scan
выполняет рекурсивный поиск по списку каталогов. Предпоследний параметр — функция, которая вызывается при завершении сканирования (в данном случае — это условная переменная). Последний параметр — это функция обратного вызова, которая вызывается после сканирования каждого каталога. Ей передаются два параметра: $dir
— текущий каталог, $files
— ссылка на массив с файлами в текущем каталоге. В данном примере функция ищет имена файлов по регулярному выражению /foo|bar/
.
Операции с двумя дескрипторами
Иногда возникает потребность в пересылке данных из одного дескриптора в другой. Например, веб-сервер должен отправить локальный файл через сетевой сокет клиенту. В этом случае удобно использовать функцию aio_sendfile
, которая при возможности использует системный вызов sendfile
, позволяя избежать копирования данных из ядра в пользовательские буфер и обратно.
use AnyEvent::Socket;
my $cv = AE::cv;
# Создаём tcp-сервер, который слушает порт 8080
# и отправляет всем подключившимся клиентам
# файл /etc/passwd
tcp_server undef, 8080, sub {
my $socket = shift or die $!;
# откроем /etc/passwd на чтение
aio_open "/etc/passwd", IO::AIO::O_RDONLY, 0, sub {
my $fh = shift or die $!;
my $size = -s $fh;
my $sendfile;
my $offset = 0;
# Функция-замыкание для отправки данных
( $sendfile = sub {
# Копирование из файла в сокет
aio_sendfile $socket, $fh, $offset, $size, sub {
# Произошла ошибка
if ( $_[0] == -1 ) {
# Не смертельно, можно повторить
if ( $!{EAGAIN} || $!{EINTR} ) {
$sendfile->();
}
# всё-таки что-то не так
else {
die $!;
}
}
# Отправлено меньше чем надо, повторяем
elsif ( $_[0] < $size ) {
$offset += $_[0];
$size -= $_[0];
$sendfile->();
}
# Файл успешно отправлен, конец сеанса
else {
close $socket;
close $fh;
}
};
} )->()
};
};
$cv->recv;
В данном примере дополнительно рассмотрено, как обрабатывать ошибки: если вызов aio_sendfile
вернул -1
, то необходимо проверить код ошибки, если, например, это EAGAIN
, то значит сокет ещё не готов принять данные и надо повторить вызов позже.
Операции копирования, перемещения и удаления
Представлен набор асинхронных функций, который позволяет производить типичные операции по манипуляциям с файлами и каталогами:
aio_copy
— производит копирование файла (директории не поддерживаются) из пути источника в путь назначения. Функция обратного вызова получает 0 в случае успеха и -1 в случае ошибки.aio_move
— перемещение файла. Это композитная функция, которая пытается выполнить вызовrename
и в случае, если это невозможно (путь назначения на другом разделе) выполняет копированиеaio_copy
.aio_unlink
— удаление файла по заданному пути.aio_rmtree
— удаление дерева каталогов по заданному пути.aio_copy "/mnt/backup/etc/passwd", "/etc/passwd", sub { $_[0] and die $!; aio_copy "/mnt/backup/etc/shadow", "/etc/shadow", sub { $_[0] and die $!; aio_rmtree "/mnt/backup/etc"; }; };
Файловые пути и текущий каталог
Многие aio-функции оперируют с файловыми путями. Модуль IO::AIO
требует, чтобы все используемые пути передавались как байтовые строки, т.е. если путь передан в программу как строка символов, необходимо принять меры для её приведения к байтовой строке, иначе вызов aio-функции вызовет исключение.
# строки кода - набор символов в кодировке UTF-8
use utf8;
use Encode;
my $path = "/путь/юникод/в/кодировке/UTF-8";
# файловый путь кодируется в байтовую строку
aio_stat encode_utf8($path), sub {
...
};
IO::AIO
рекомендует использовать абсолютные пути во всех функциях. Это связано с тем, что при смене текущего каталога вполне возможна ситуация, что какой-то запущенный тред работает со старым рабочим каталогом, что может привести впоследствии к ошибке. Разумеется, если вы уверены, что в течении работы программы текущий каталог не меняется, то использование относительных путей вполне легально.
Если использование относительных путей востребованно и требуется контроль текущего каталога при выполнении aio-функций, то в IO::AIO
для подобных целей существует функция aio_wd
, формирующая объект класса IO::AIO::WD
, который можно использовать в aio-функциях для обозначения текущего каталога.
aio_wd "/etc", sub {
my $etcdir = shift;
aio_stat [$etcdir, "passwd"], sub {
...
}
};
Переменная $etcdir
сохраняет информацию о каталоге “/etc”, на поддерживающих системах происходит открытие файлового дескриптора каталога, что ускоряет разрешение имён относительного данного каталога. aio_stat
в качестве параметра-пути получает ссылку на массив, где первый элемент это объект текущего каталога, а второй элемент — относительный путь. Все функции IO::AIO
, которые работают с путями, поддерживают подобный формат вызова. Таким образом, можно всегда быть уверенным, что вызов функции с относительным путём будет проходить в контексте выбранного текущего каталога, даже если текущий каталог самого приложения изменился.
Запросы
Каждая aio-функция возвращает объект запроса IO::AIO::REQ
, если вызывается не в пустом контексте. Данный объект ассоциируется с выполняемой операцией. Каждая операция проходит пять состояний:
Готовность — сразу после создания запроса, ожидание выполнения тредом.
Выполнение — запрос принят тредом, и в данный момент тред выполняет его.
Ожидание — запрос выполнен, и тред ожидает, когда будет обработан результат.
Результат — результат запроса обрабатывается в
poll_cb
, который вызывает пользовательскую функцию обратного вызова.Выполнено — запрос выполнен, все связанные структуры данных освобождены.
До того как запрос перейдёт в стадию выполнению, существует возможность его отменить с помощью вызова метода cancel
объекта запроса:
# создать запрос на удаление
# какого-нибудь ненужного каталога
my $req = aio_rmtree "/", sub {
print "all your base are belong to us\n";
};
# отменить запрос, если не поздно
$req->cancel;
Также можно задать или переопределить пользовательскую функцию обратного вызова запроса, до того как начнётся обработка результата запроса:
# вызов stat на файле
my $req = aio_stat "/etc/passwd";
# установка колбека
$req->cb( sub {
$_[0] and die "ошибка stat: $!";
printf "размер %i\n", -s _;
});
Группы запросов
Несколько запросов можно объединить в группы, чтобы иметь возможность отследить завершение всех запросов. Для этих целей создана функция aio_group
, которая принимает один параметр — функцию обратного вызова, которая вызывается, когда завершены все запросы группы. Возвращается объект класса IO::AIO::GRP
, в который можно добавлять запросы с помощью метода add
:
my $cv = AE::cv;
# Создаём группу
my $grp = aio_group sub {
print "выполнены все запросы\n";
$cv->send
};
# Добавляем первый запрос
add $grp aio_stat "/etc/passwd", sub {
printf "размер passwd %i\n", -s _ unless $_[0];
};
# Добавляем второй запрос
add $grp aio_stat "/etc/group", sub {
printf "размер group %i\n", -s _ unless $_[0];
};
$cv->recv;
Группы, как и сами запросы, можно отменять с помощью вызова cancel
. В этом случае отменяются все запросы, входящие в группу. Группы также можно добавлять в другие группы.
В IO::AIO
реализована возможность динамического добавления запросов в группу. Каждой группе можно назначить так называемый генератор или фидер (feeder) — функцию, которая вызывается, как только количество запросов в группе снизится ниже определённого уровня. Функция может добавить новый запрос в группу, таким образом “питая” группу.
Например, описанную выше задачу поиска файла по заданному регулярному выражению можно реализовать с помощью группы и генератора:
use File::Spec::Functions;
sub scan {
my $cb = pop;
my $cv = pop;
my @dirs = @_;
# Группа запросов, в качестве функции обратного вызова
# передаётся условная переменная $cv
my $grp = aio_group $cv;
# Установим лимит на минимальное число запросов в группе,
# после которого вызывается фидер
limit $grp 1;
# Генератор/фидер запросов
feed $grp sub {
my $dir = shift @dirs // return;
# Добавляем запрос в группу
add $grp aio_scandir $dir, 0, sub {
my ($dirs, $nondirs) = @_;
push @dirs, map { catfile $dir, $_ } @$dirs if $dirs;
# Вызываем функцию пользователя
$cb->($dir, $nondirs);
};
};
};
my $cv = AE::cv;
# Регулярное выражение для поиска файла
my $search = qr/foo|bar/;
# Поиск в каталоге /etc
scan "/etc", $cv, sub {
my ($dir, $files) = @_;
for my $file (@$files) {
printf "Found match: %s\n", catfile $dir, $file
if $file =~ $search
}
};
$cv->recv;
В данном примере мы модифицировали функцию scan
, создав группу, которую снабжает новыми запросами функция-генератор. Также обратите внимание, что используется специальная функция limit
, которая устанавливает минимальный порог запросов, после которого должен вызываться генератор/фидер. В данном примере, это 1, таким образом, одномоментно работает только один aio-запрос.
Служебные функции и функции тонкой подстройки IO::AIO
IO::AIO
имеет несколько функций, которые относятся непосредственно к работе самого модуля IO::AIO
, позволяя подстроить различные аспекты его функционирования.
IO::AIO::min_parallel
— минимальное число одновременно запущенных тредов для обработки запросов, по умолчанию 8.IO::AIO::max_parallel
— максимальное число одновременно запущенных тредов для обработки запросов, по умолчанию 32.IO::AIO::max_idle
— максимальное число простаивающих тредовIO::AIO::nreqs
— возвращает количество готовых, выполняющихся и ожидающих обработку тредов.
Помимо служебных функций IO::AIO::poll_fileno
и IO::AIO::poll_cb
, описанных в начале статьи, есть ещё несколько функций, связанных с обработкой запросов.
IO::AIO::poll_wait
— блокирующее ожидание, пока один из запросов не перейдёт в состояние готовности для чтения.IO::AIO::flush
— блокирующее ожидание, пока все запросы не будут обработаны, аналогичен выполнению кода:IO::AIO::poll_wait, IO::AIO::poll_cb while IO::AIO::nreqs;
Данные функции могут быть полезны при использовании IO::AIO
без модуля обработки событий.
Заключение
IO::AIO
однозначно является самой мощной системой для работы с дисковой подсистемой в Perl. Интеграция с различными модулями обработки событий, огромное число функций для всевозможных нужд, высокая переносимость модуля, тонкая настройка, адаптация к высокой нагрузке являются наиболее заметными преимуществами IO::AIO
. Если вы разрабатываете систему, которая генерирует высокую нагрузку на дисковую подсистему и при этом должна оставаться крайне отзывчивой к внешним воздействиям, то лучше IO::AIO
вы вряд ли сможете что-то найти.
В статье описаны лишь немногие востребованные функции, исчерпывающий их список с подробным описанием доступен в POD-документации модуля.
← Яндекс.Директ: как мы деплоим наши Perl-web-приложения | Содержание | Использование портов GPIO в Raspberry Pi. Часть 1 →