Reactor的设计
参考
我们借助polling
库,来实现一个reactor,提供统一管理IO的注册、IO事件监听以及唤醒的功能。
基础的设计是:
- 有一个event loop不断监听注册在Reactor中的IO事件,当IO事件有响应时,调用对应的Waker
- 被
block_on
求值的IO future,向Reactor注册IO事件(包括waker)
这里Reactor
最简单提供两个接口,event_loop
和register_readable
:
#![allow(unused)]
fn main() {
// Reactor实例
pub struct Reactor {
// Poller实例
poller: Poller,
// 存储
repo: Mutex<Slab<Arc<IOEvent>>>,
}
// 代表一个IO
struct IOEvent {
fd: RawFd,
key: usize,
is_ready: AtomicBool,
waker: AtomicWaker,
}
impl Reactor {
// IO事件循环
// 当存在fd就绪时,调用注册的waker
pub fn event_loop(&self) -> io::Result<()>;
// 注册一个可读事件
// 当fd可读时返回
pub async fn register_readable(&self, fd: BorrowedFd<'_>) -> io::Result<()>;
}
}
先来看看event_loop
的实现,其做的事情就是:
- 等待注册的IO就绪,
- 调用对应的waker
#![allow(unused)]
fn main() {
pub fn event_loop(&self) -> io::Result<()> {
let mut events = Events::new();
loop {
events.clear();
// 等待注册到poller的IO就绪
match self.poller.wait(&mut events, None) {
Ok(0) => {},
Ok(_) => {
let repo = self.repo.lock();
for ev in events.iter() {
// 调用waker
if let Some(event) = repo.get(ev.key) {
event.waker.take().map(Waker::wake);
event.is_ready.swap(true, Ordering::Release);
}
}
Ok(())
}
Err(err) if err.kind() == ErrorKind::Interrupted => {},
Err(err) => return Err(err),
}
}
Ok(())
}
}
然后这里的注册的代码,写为一个异步函数,也方便通过RAII的方式去反注册:
#![allow(unused)]
fn main() {
// 注册可读fd,直到fd就绪
pub async fn register_readable(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
// IO RAII
struct IOGuard<'r> {
reactor: &'r Reactor,
event: Arc<IOEvent>,
}
impl<'r> IOGuard<'r> {
// 构造FdGuard,并将fd注册到reactor中
fn new(reactor: &'r Reactor, fd: BorrowedFd<'_>) -> io::Result<Self> {
let event = {
let mut repo = reactor.repo.lock();
let entry = repo.vacant_entry();
let event = Arc::new(IOEvent {
fd: fd.as_raw_fd(),
key: entry.key(),
is_ready: AtomicBool::new(false),
waker: AtomicWaker::new(),
});
entry.insert(event.clone());
event
};
// fd注册到poller里
if let Err(err) =
unsafe { reactor.poller.add(event.fd, Event::readable(event.key)) }
{
let mut repo = reactor.repo.lock();
repo.remove(event.key);
return Err(err);
}
Ok(Self { reactor, event })
}
}
// 当完成或者取消时自动反注册
impl Drop for IOGuard<'_> {
fn drop(&mut self) {
let mut repo = self.reactor.repo.lock();
repo.remove(self.event.key);
self.reactor
.poller
.delete(unsafe { BorrowedFd::borrow_raw(self.event.fd) })
.ok();
}
}
let io_guard = IOGuard::new(self, fd)?;
poll_fn(|cx| {
let event = &*io_guard.event;
// 等待reactor唤醒并改变状态
if event.is_ready.load(Ordering::Acquire) {
return Poll::Ready(Ok(()));
}
// 每次poll别忘记更新waker
event.waker.register(cx.waker());
Poll::Pending
})
.await
}
}
这个register_readable
是用于IO future的实现的,这里仍然以stdin为例子:
#![allow(unused)]
fn main() {
// 异步的stdin
pub struct Stdin<'r> {
reactor: &'r Reactor,
stdin: io::Stdin,
}
impl<'r> Stdin<'r> {
pub fn new(reactor: &'r Reactor) -> io::Result<Self> {
let this = Self {
reactor,
stdin: io::stdin(),
};
// 设置为异步的IO,
// 之后阻塞时通过Read::read返回的错误码为WouldBlock
rustix::io::ioctl_fionbio(&this.stdin, true)?;
Ok(this)
}
pub async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
loop {
// 尝试读stdin
match self.stdin.lock().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
// 如果被阻塞则等待stdin就绪
self.reactor.register_readable(self.stdin.as_fd()).await?;
}
}
}
}
有了Reactor
,我们就可以单独跑一个线程来管理多个IO的注册与唤醒了,到这里才能体现出异步在IO密集的应用上的优势。虽然和前面的stdin实现都创建了一个额外的线程处理IO事件,但这里可以同时处理多个不同类型的IO,实现了所谓的IO的“复用”。
#![allow(unused)]
fn main() {
let reactor = Reactor::new();
thread::scoped(|s| {
// reactor io线程,用于处理IO事件
s.spawn(|| reactor.event_loop().unwrap());
// 其它线程拿到Reactor可以用于创建IO对象
s.spawn(|| {
block_on(async {
let mut buf = [0; 1000];
let mut buf = &mut buf[..];
let stdin = Stdin::new(reactor)?;
while buf.len() > 0 {
let x = stdin.read(buf).await?;
println!("from stdin: {:?}", String::from_utf8_lossy(&buf[..x]));
buf = &mut buf[x..];
yield_now().await;
println!("yielding");
}
println!("end");
Ok(())
})
});
});
}
注:其实reactor的事件循环可以和block_on的轮询集成到一个循环里,这样甚至不需要多开一个线程。通过向reactor里注册一个特定的fd,在waker里进行IO操作,可以唤醒reactor。