Skip to content

Rust 异步

Rust 的 Future 异步和 JS 的 Promise 还是有一点点像的, 比如 Future 完成后会返回 Poll::Ready(),未完成时返回 Poll::Pending

Promise 没完成的时候会返回 pending, 完成了返回 resolve

两者都需要不断去看任务队列里是否有未完成的任务,有就去执行,没有就循环等待

实现一个异步执行器

Rust 本身是没有异步运行时的,但是 Rust 把异步的标准给定义在了语言中,运行时交给社区来实现,而 JS 是在 V8 引擎中就内置了异步运行时。

现在我们来自己实现一个简易版的异步运行时(后面我们会实现一个简易版的 tokio 异步运行时),首先需要引入 futures 这个包

toml
[package]
edition = "2021"
name = "async_excutor"
version = "0.1.0"

[dependencies]
futures = "0.3"

原理

所有的异步任务都需要一个执行器去推动执行

  • 本质上就是通过 sync_channel 来进行任务发送和接收
  • Spawner 把 异步任务塞到任务通道里
  • Excutor 去不断去从看任务通道中是否有可以执行的异步任务,如果有就取出来执行,没有就把它塞回去等待下一轮唤醒

一个异步定时器

rust
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// 在Future和等待的线程间共享状态
struct SharedState {
    /// 定时(睡眠)是否结束
    completed: bool,
    /// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
            // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
            // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
            // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // 创建新线程
        let thread_shared_state = shared_state.clone();

        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

有了这个异步的定时器之后,我们就需要一个执行器去执行它

执行器代码

rust
use {
    futures::{
        future::BoxFuture,
        task::{waker_ref, ArcWake},
        Future, FutureExt,
    },
    std::{
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        sync::{Arc, Mutex},
        task::Context,
        time::Duration,
    },
    timer_future::TimerFuture,
};

/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
    /// 进行中的Future,在未来的某个时间点会被完成
    /// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
    /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
    /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
    /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    /// 可以将该任务自身放回到任务通道中,等待执行器的poll
    task_sender: SyncSender<Arc<Task>>,
}

/// 任务执行器,负责从通道中接收任务然后执行
struct Excutor {
    ready_quene: Receiver<Arc<Task>>,
}

/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });

        self.task_sender.send(task).expect("任务队列已满");
    }
}

// 当任务实现了 ArcWake 特征后,它就变成了 Waker ,在调用 wake() 对其唤醒后会将任务复制一份所有权( Arc ),
// 然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 poll 执行
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("任务队列已满");
    }
}

impl Excutor {
    fn run(&self) {
        while let Ok(task) = self.ready_quene.recv() {
            // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 基于任务自身创建一个 `LocalWaker`
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
                // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
                if future.as_mut().poll(context).is_pending() {
                    // Future还没执行完,因此将它放回任务中,等待下次被poll
                    *future_slot = Some(future);
                }
            }
        }
    }
}

fn new_executor_and_spawner() -> (Excutor, Spawner) {
    // 任务通道允许的最大缓冲数(任务队列的最大长度)
    // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
    const MAX_QUENED_TASKS: usize = 10_000;

    let (task_sender, ready_quene) = sync_channel(MAX_QUENED_TASKS);
    (Excutor { ready_quene }, Spawner { task_sender })
}

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // 生成一个任务
    spawner.spawn(async {
        println!("start!");
        // 创建定时器Future,并等待它完成
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done");
    });

    spawner.spawn(async {
        println!("test!");
    });

    spawner.spawn(async {
        println!("test2!");
    });

    drop(spawner);

    executor.run();
}

每天进步一丢丢