异步IO与Poller

既然标准库没有异步的IO接口,我们把目标转向OS自身提供的异步IO接口,比如linux下的epollepoll主要提供了三个接口:

/// 创建新的epoll实例 int epoll_create(int size); /// 向 epoll 实例中添加、修改或删除文件描述符 int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event); /// 等待 epoll 实例中的文件描述符准备就绪。 /// 这个函数会阻塞调用线程,到存在文件描述符就绪。 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

核心思路是:

  1. 把“关心”的IO的fd添加到epoll实例中(比如关心fd是否能写)
  2. 然后调用epoll_wait,阻塞调用线程到存在关心的fd已就绪时,线程继续运行。
  3. 这时候就能直接给已就绪的fd进行IO操作。

相对于标准库同步阻塞的IO操作来说,epoll等待fd就绪 这一步单独抽离了出来(也就是上面的第二步),允许同时监听多个fd,且允许超时。这就允许用额外一个线程就可以处理多个IO事件,或者是通过时间片的方式来处理IO事件。

不过要注意的是,epoll并不支持普通的文件,如果把文件fd添加到epoll里会返回EPERM错误。普通的文件读写需要用到aio或者使用一个线程池把同步转成异步。 epoll目前支持的fd是:

  • 网络socket,比如说TCP, UDP等
  • timerfd
  • signalfd
  • inotify
  • pipe
  • 子进程
  • 终端相关,比如stdin, stdout, stderr
  • epoll本身(可以加到其它epoll实例中)
  • 等等

不同的操作系统都有类似的接口,rust里已经有crate进行统一封装,比如说mio, polling。比如说polling提供提了以下接口,大体结构与epoll一致:

#![allow(unused)] fn main() { /// Poller实例, /// * linux下是epoll实例 /// * mac, iOS下是kqueue实例 /// * windows下是iocp实例 struct Poller { /*...*/ } impl Poller { /// 创建一个poller实例 pub fn new() -> Result<Poller>; /// 往poller实例中添加fd,及关心的事件 pub unsafe fn add( &self, /// unix下是fd, windows下是socket source: impl AsRawSource, interest: Event ) -> Result<()>; /// 更新fd关心的事件, /// 比如关心fd是否可读,改成是否可写 pub fn modify(&self, source: impl AsSource, interest: Event) -> Result<()>; /// 删除关心的fd pub fn delete(&self, source: impl AsSource) -> Result<()>; /// 阻塞调用线程直到 /// * 存在关心的fd就绪,或 /// * 超时 /// * 唤醒poller实例 pub fn wait( &self, events: &mut Events, /// * `None`为不设置超时 /// * `Some(0)`为不阻塞 timeout: Option<Duration> ) -> Result<usize>; /// 唤醒poller实例 pub fn notify(&self) -> Result<()>; } }

这是polling库的Example(截自README)

#![allow(unused)] fn main() { use polling::{Event, Poller}; use std::net::TcpListener; // Create a TCP listener. let socket = TcpListener::bind("127.0.0.1:8000")?; socket.set_nonblocking(true)?; let key = 7; // Arbitrary key identifying the socket. // Create a poller and register interest in readability on the socket. let poller = Poller::new()?; poller.add(&socket, Event::readable(key))?; // The event loop. let mut events = Vec::new(); loop { // Wait for at least one I/O event. events.clear(); poller.wait(&mut events, None)?; for ev in &events { if ev.key == key { // Perform a non-blocking accept operation. socket.accept()?; // Set interest in the next readability event. poller.modify(&socket, Event::readable(key))?; } } } }