一个完整的Executor的例子

这里的Executor提供两个核心的接口:

#![allow(unused)]
fn main() {
pub struct Executor<'a> {
    /// 这里用的是异步mpmc channel
    tx: Sender<Runnable>,
    rx: Receiver<Runnerble>,
    
    /// Makes the `'a` lifetime invariant.
    _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
}

impl<'a> Executor<'a> {
    // 创建一个Task,并丢到Executor中
    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T>;
    
    // 当executor里有新的task时,就会拿出来执行
    pub async fn execute(&self);
}
}

有了async_task我们就很容易实现这两个接口:

#![allow(unused)]
fn main() {
 pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
     let tx = self.tx.clone();
     let schedule = move |runnable| tx.send(runnable).unwrap();
     
     // 创建一个task,并直接丢到队列里
     let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
     runnable.schedule();
     
     task
}

pub async fn execute(&self) {
    // 不断从队列里取task,并轮询
        let mut rx = self.rx.stream();
        while let Some(runnable) = rx.next().await {
            runnable.run();
        }
}
}

注:同Reactor::event_loopExecutor::execute也可以集成到block_on中。

有了Executor我们就可以利用多个线程来并发执行异步代码了:

#![allow(unused)]
fn main() {
let reactor = Reactor::new();
let executor = Executor::new();

thread::scope(|s| {
    // reactor io线程,用于处理IO事件
    s.spawn(|| reactor.event_loop().unwrap());
    
    // 起8个线程作为线程池,来并发执行task
    for _ in 0..8 {
        s.spawn(|| {
            block_on(executor.execute());
        });
    }
    
    // 创建异步任务,丢到Executor中执行
    executor.spawn(async {
        let mut buf = [0; 1000];
        let mut buf = &mut buf[..];
        let stdin = Stdin::new(reactor).unwrap();

        while buf.len() > 0 {
            let x = stdin.read(buf).await.unwrap();
            println!("from stdin: {:?}", String::from_utf8_lossy(&buf[..x]));

            buf = &mut buf[x..];
        }
    }).detach();
    
    // 这个也会丢到Executor,然后被并发执行
    executor.spawn(async {
        yield_now().await;
        println!("yield 1");
        yield_now().await;
        println!("yield 2");
    }).detach();
    
});
}