Future的接口

Future的接口采用的是基于轮询的形式,而非更加常见的CPS形式:

为了方便叙述,这里先去掉一些噪音,化简了一下现有接口

#![allow(unused)]
fn main() {
/// 异步计算的抽象
trait Future {
    type Output;
    
    /// 提供一个轮询的接口
    fn poll(&mut self, waker: Waker) -> Poll<Self::Output>;
}

/// 轮询的结果
enum Poll<T> {
    Pending,
    Ready(T)
}

#[derive(Clone)]
struct Waker { /*...*/ }
impl Waker {
    /// 当异步计算有进展时调用,以通知轮询方进行下一轮的轮询
    fn wake(self);
}
}

我们需要拿到一个Future的值,我们需要不断地调用poll轮询它:

  • poll返回Pending的时候,表示Future还没完成,且暂时不需要占用当前控制流。从Future的角度来说,则是让出了当前的控制流,让我们可以做一些其它的事情。

    相比于同步阻塞的IO,异步IO当资源未就绪时返回Pending,可以避免陷入内核态,同时能减少上下文切换的开销

  • poll返回Ready的时候,则表示Future的计算已完成。

当然,除了poll以外,还可以取消一个Future,只需要不再轮询它,这时可以选择析构掉Future,释放掉里面的资源(这时对于Future来说,相当于在.await处panic了)。

其中poll还有一个参数Waker,当Future有进展时,就可以调用.wake(),来通知轮询方继续轮询Future。其中Waker,满足SendSync,意味着.wake()方法可以在任何地方调用,比如说把Waker注册给OS,由OS来调用.wake()

注意:这个Waker参数并不一定是自上而下传递下来的,也有可能是poll中间构造的,甚至可以来自于别的运行时的。

于是对一个Future求值最基础的程序就长这样:

#![allow(unused)]
fn main() {
// 轮询future
loop {
    match fut.poll(waker) {
        Pending => {
            // 当异步计算不需要占用当前线程控制流的时候,会让出控制流,于是可以做一些其它事情
        }
        Ready(r) => {
            // 计算完成
            break r
        }
    }
    
    // 当`fut`有进一步进展时,可以进一步轮询。
    if todo!("fut 有进展") {
        continue;
    }
}

}

不过这里补充一点,poll一个Future的策略完全由轮询方来决定,不同的业务场景可以以不同的方式去轮询。Waker不调用的时候也轮询方也可以去poll一个Future;反过来Waker被调用了,也可以不立刻去poll。比如我们可以“马不停蹄”地轮询Future

#![allow(unused)]
fn main() {
loop {
    // 返回`Pending`时,立刻继续`poll`,直到返回`Ready`,
    // 对于不希望线程休眠的程序的运行时,就可以这么设计
    if let Ready(r) = fut.poll(waker) {
        return r;
    }
}
}

作为对比,这里简单地把基于CPS变换的异步计算的抽象列在这里:

#![allow(unused)]
fn main() {
trait Future {
 type Output;

 /// 1.`schedule`和`callback`不应该阻塞,`callback`可能会被注册到一些地方
 /// 2. 当异步计算**完成**时,`callback`就会被调用。
 fn schedule<Callback>(self, callback: Callback)
 where
 	Callback: FnOnce(Self::Output) + Send + Sync
}
}

这时候对Future的求值就和我们在其他语言(比如js)中见到的类似了:

#![allow(unused)]
fn main() {
fut.schedule(|output| {
 // 异步计算完成
 do_something(output);
});
}

其它更复杂的异步接口,在rust里也都可以,(也倾向于)设计成poll_xxx的形式(注:后续所有的waker参数都替换成现在的Context):

  • 异步流:fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
  • 异步读:fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, Error>>
  • 异步写:fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>>

等等。

Future的接口其实是脱胎于Iterator的接口,都是通过**“外部”**轮询的方式来获取下一步结果,同样也可以造出很多不同功能的组合子(也可以叫Adapter)。相对于传统回调的方式,这种方式更符合Rust的哲学——零开销抽象,同时在borrow checker下这样的接口也更易使用一些。

这里更深入的讨论就不展开了,大家有兴趣可以看一下这些资料: