Выпуск 20. Октябрь 2014

От редактора | Содержание | Локальная установка и использование Perl-модулей

Событийно-ориентированное программирование. Введение

Первая статья из цикла по событийно-ориентированному программированию

Данная статья будет первой в цикле статей по СОП (событийно-ориентированному программированию). Конечной целью данного цикла является понимание того, как работают EV и AnyEvent изнутри и как их применять более эффективно в реальных приложениях.

Первая часть цикла будет использовать C, чтобы показать, как работают:

  • неблокирующий ввод-вывод (open/fcntl + O_NONBLOCK и ошибка EWOULDBLOCK (для сокетов));
  • мультиплексирование ввода-вывода (select, poll, epoll (Linux 2.6+), kqueue (BSD), eventfd (Linux 2.6.27+), Input/output completion port (IOCP) (Windows NT 3.5+));
  • как устроены библиотеки libev, libuv и libevent и как с ними работать;
  • как устроен биндинг EV к libev;

Вторая часть цикла будет использовать Perl для того, чтобы описать работу модулей EV и AnyEvent, и, возможно, новый модуль UV.

Согласно Википедии, серверное приложение при использовании СОП (в нашем случае это AnyEvent+EV) реализуется на системном вызове, получающем события одновременно от многих дескрипторов (мультиплексирование). При обработке событий используются исключительно неблокирующие операции ввода-вывода, чтобы ни один дескриптор не препятствовал обработке событий от других дескрипторов. Далее рассмотрим мультиплексирование ввода-вывода.

Мультиплексирование ввода-вывода

Как сказано в книге Роберта Лава «Linux: системное программирование» (2-е издание, стр. 82), мультиплексированный ввод-вывод позволяет приложениям параллельно блокировать несколько файловых дескрипторов и получать уведомления, как только любой из них будет готов к чтению или записи без блокировки. По умолчанию файловые дескприторы (например, сокеты, каналы) блокируют выполнение процесса. Это означает, что когда мы вызываем на файловом дескрипторе функцию, которая не может выполниться немедленно, наш процесс переходит в “спящее” состояние и ждет, когда будет выполнено определенное условие. Для того, чтобы использовать неблокируемый ввод-вывод, необходимо использовать флаг O_NONBLOCK для функций open или fcntl. Пример неблокирующего ввода-вывода мы рассмотрим в следующей статье, когда будем рассматривать сокеты и клиент-серверные приложения.

Для того, чтобы понять, как работают функции для мультиплексирования ввода-вывода, рассмотрим простой пример — программа блокируется, дожидаясь поступления ввода на стандартный ввод (stdin), блокировка может продолжаться вплоть до 5 секунд. Эта программа отслеживает всего один файловый дескриптор, поэтому здесь отсутствует мультиплексный ввод-вывод как таковой. Однако данный пример должен прояснить использование этих системных вызовов:

Функция select()

#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#define TIMEOUT 5     // Установка таймаута в секундах
#define BUF_LEN 1024  // Длина буфера считывания в байтах

int main() {
  // Объявляем структуру для определения продолжительности времени ожидания
  struct timeval tv;
  // Ожидаем не дольше 5 секунд
  tv.tv_sec   = TIMEOUT;
  tv.tv_usec  = 0;

  // Объявляем набор дескрипторов для чтения
  fd_set readfds;
  // Дожидаемся ввода на stdin.
  FD_ZERO(&readfds); // Сбрасываем все биты в наборе readfds
  FD_SET(STDIN_FILENO, &readfds); // Устанавливаем бит для стандартного ввода (STDIN)

  /**
   * Блокируем процесс, пока не поступят данные на STDIN, либо пока не
   * истечет время TIMEOUT
   * В качестве первого параметра передаем максимальный номер дескриптора
   * + 1 */
  int ret = select(STDIN_FILENO + 1, &readfds, NULL, NULL, &tv);
  if (ret == -1) {
    perror("select");
    return 1;
  } else if (!ret) {
    printf("%d seconds elapsed.\n", TIMEOUT);
    return 0;
  }

  /**
   * После возврата из функции select с помощью функции FD_ISSET проверяем,
   * какие биты в наборе остались установленными
   */
  if (FD_ISSET(STDIN_FILENO, &readfds)) {
    char buf[BUF_LEN+1];

    // Блокировка гарантированно отсутствует
    int len = read(STDIN_FILENO, buf, BUF_LEN);
    if (len == -1) {
      perror("read");
      return 1;
    } else if (len) {
      buf[len] = '\0';
      printf("read: %s\n", buf);
    }

    return 0;
  }

  fprintf(stderr, "Этого быть не должно!\n");
  return 1;
}

Функция poll()

#include <stdio.h>
#include <unistd.h>
#include <poll.h>

#define TIMEOUT 5     // Установка таймаута в секундах
#define BUF_LEN 1024  // Длина буфера считывания в байтах

int main() {
  struct pollfd fds;

  // Отслеживаем ввод на stdin
  fds.fd = STDIN_FILENO;
  fds.events = POLLIN;

  // Выполняем блокирование
  int ret = poll(&fds, 1, TIMEOUT * 1000);
  if (ret == -1) {
    perror("poll");
    return 1;
  }

  if (!ret) {
    printf("%d seconds elapsed.\n", TIMEOUT);
    return 0;
  }

  if (fds.revents & POLLIN) {
    char buf[BUF_LEN+1];

    // Блокировка гарантированно отсутствует
    int len = read(STDIN_FILENO, buf, BUF_LEN);
    if (len == -1) {
      perror("read");
      return 1;
    } else if (len) {
      buf[len] = '\0';
      printf("read: %s\n", buf);
    }
  }

  return 0;
}

Функция epoll()

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>

#define TIMEOUT 5 // Установка таймаута в секундах
#define BUF_LEN 1024  // Длина буфера считывания в байтах
#define MAX_EVENTS 64

int main() {
  /**
   * Задаем контекст опроса событий
   * На выходе получаем файловый дескриптор, ассоциированный с новым
   * экземпляром epoll, который создается в функции epoll_create1().
   * Данный файловый дескриптор не имеет отношения к реальному файлу; это
   * просто указатель, который должен применяться с последующими вызовами,
   * задействующими возможность опроса событий.
   */
  int epfd = epoll_create1(0);
  if (epfd < 0) {
    perror("epoll_create1");
    return 1;
  }

  /**
   * Добавляем файловый дескриптор STDIN_FILENO в контекст опроса epfd
   */
  struct epoll_event event;
  event.data.fd = STDIN_FILENO;
  event.events  = EPOLLIN;

  int epctl = epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &event);
  if (epctl) {
    perror("epoll_ctl");
    return 1;
  }

  /**
   * Ожидаем событие чтения из STDIN_FILENO, ассоциипрованному с экземпляром
   * опроса событий epfd
   */
  struct epoll_event *events = malloc(sizeof(struct epoll_event) * MAX_EVENTS);
  if (!events) {
    perror("malloc");
    return 1;
  }

  int nr_events = epoll_wait(epfd, events, MAX_EVENTS, TIMEOUT * 1000);
  if (nr_events < 0) {
    perror("epoll_wait");
    free(events);
    return 1;
  }
  if (!nr_events) {
    printf("%d seconds elapsed.\n", TIMEOUT);
    return 0;
  }

  for (int i = 0; i < nr_events; ++i) {
    if (events[i].events & EPOLLIN) {
      char buf[BUF_LEN+1];

      // Блокировка гарантированно отсутствует
      int len = read(STDIN_FILENO, buf, BUF_LEN);
      if (len == -1) {
        perror("read");
        return 1;
      } else if (len) {
        buf[len] = '\0';
        printf("read: %s\n", buf);
      }
    }
  }

  free(events);

  return 0;
}

Функция kqueue()

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>

#define TIMEOUT 5
#define BUF_LEN 1024

int main() {
  // registration event
  struct kevent change; // events we want to monitor
  struct kevent event;  // event that were triggered

  // create event queue
  int kq = kqueue();
  if (kq == -1) {
    perror("kqueue");
    return 1;
  }

  EV_SET(&change, STDIN_FILENO, EVFILT_READ, EV_ADD, 0, 0, NULL);

  struct timespec ts;
  ts.tv_sec = TIMEOUT;
  ts.tv_nsec = 0;

  while(1) {
    int nev = kevent(kq, &change, 1, &event, 1, &ts);
    if (new < 0) {
      perror("kevent");
      return 1;
    }
    if (nev == 0) {
      printf("%d seconds elapsed.\n", TIMEOUT);
      close(kq);
      return 0;
    }
    if (event.ident == STDIN_FILENO) {
      char buf[BUF_LEN + 1];

      int len = read(STDIN_FILENO, buf, BUF_LEN);
      if (len == -1) {
        perror("read");
        close(kq);
        return 1;
      } else if (len) {
        buf[len] = '\0';
        printf("read: %s\n", buf);
        close(kq);
        return 0;
      }
    }
  }
  close(kq);
  return 0;
}

Я не буду расписывать, как работают функции для каждого примера, думаю по комментариям в коде должно быть все ясно. Лишь вкратце опишу, как работает epoll и kqueue, т.к. они описаны в книгах и статьях меньше всего.

Итак, в примере с epoll мы видим аналогию с AnyEvent — регистрация наблюдателя откреплятеся от самого акта наблюдения. Один системный вызов инициализирует контекст опроса событий (для epoll — вызов функции epoll_create1(), для AnyEvent - создание переменной состояния (condvar)), другой добавляет в контекст наблюдаемые файловые дескрипторы или удаляет их оттуда (для epoll — системный вызов epoll_ctl, для AnyEvent — создание наблюдателей для определенных событий (io, timer и т.д.)). Вызов метода send() в нашем случае означает выполнение события, что влечет за собой выход из колбэка и вызов метода recv()), третий выполняет само ожидание события (для epoll — системный вызов epoll_wait(), для AnyEvent — вызов метода recv() для переменной состояния).

Аналогичный пример на AnyEvent можете посмотреть в статье «Всё, что вы хотели знать об AnyEvent, но боялись спросить» из выпуска 1 данного журнала.

Системный вызов kqueue() аналогичен вызову epoll — сначала регистрируем фильтр событий с помощью функции kqueue(), затем с помощью макроса EV_SET настраиваем структуру change для отслеживания ввода на stdin, затем с помощью kevent отслеживаем события.

Помимо приведенных примеров создания таймера для блокирующего ожидания с помощью мультиплексирования ввода-вывода имеется еще два способа, как это сделать:

  • использовать вызов функции alarm, которая генерирует сигнал SIGALRM, когда истекает заданное время;
  • использование новых параметров сокета — SO_RCVTIMEO и SO_SNDTIMEO (специфична для сокетов).

Функции eventfd и IOCP, а также приведенные выше способы создания таймера будут рассмотрены в следующих статьях. В следующей статье будет создано приложение с использованием мультиплексирования ввода-вывода и неблокирующего ввода-вывода на нескольких дескрипторах с использованием сокетов.

Ссылки, где можно почитать про мультиплексирование ввода-вывода:

select/poll:

  • книга “Роберт Лав — Linux. Системное программирование, 2-е издание, Питер, 2014” — глава 2. Файловый ввод-вывод, Раздел “Мультиплексный ввод-вывод”, стр. 81;
  • книга “Стивенс У. Р. — UNIX: разработка сетевых приложений, 3-е изд., Питер, 2007” — глава 6. “Мультиплексирование ввода-вывода: функции select и poll”, стр. 185;
  • книга “Стивенс Р. — UNIX. Профессиональное программирование, 2-е изд., Символ-Плюс, 2007” — глава 14. Расширенные операции ввода-вывода, раздел 14.5 Мультиплексирование ввода-вывода, стр. 558
  • книга “Michael Kerrisk — The Linux Programming Interface, 2010” — глава 63. Alternative I/O Models, раздел 63.2 I/O Multiplexing, стр. 1374

epoll:

  • книга “Роберт Лав — Linux. Системное программирование, 2-е издание, Питер, 2014” — глава 4. Расширенный файловый ввод-вывод, Раздел “Опрос событий”, стр. 131
  • книга “Michael Kerrisk — The Linux Programming Interface, 2010” — глава 63. Alternative I/O Models, раздел 63.4 The epoll API, стр. 1399
  • Epoll

kqueue:

Вячеслав Коваль


От редактора | Содержание | Локальная установка и использование Perl-модулей
Нас уже 1393. Больше подписчиков — лучше выпуски!

Комментарии к статье