Rust 异步
Rust 的 Future 异步和 JS 的 Promise 还是有一点点像的, 比如 Future 完成后会返回 Poll::Ready(),未完成时返回 Poll::Pending
Promise 没完成的时候会返回 pending, 完成了返回 resolve
两者都需要不断去看任务队列里是否有未完成的任务,有就去执行,没有就循环等待
实现一个异步执行器
Rust 本身是没有异步运行时的,但是 Rust 把异步的标准给定义在了语言中,运行时交给社区来实现,而 JS 是在 V8 引擎中就内置了异步运行时。
现在我们来自己实现一个简易版的异步运行时(后面我们会实现一个简易版的 tokio 异步运行时),首先需要引入 futures 这个包
toml
[package]
edition = "2021"
name = "async_excutor"
version = "0.1.0"
[dependencies]
futures = "0.3"
原理
所有的异步任务都需要一个执行器去推动执行
- 本质上就是通过 sync_channel 来进行任务发送和接收
- Spawner 把 异步任务塞到任务通道里
- Excutor 去不断去从看任务通道中是否有可以执行的异步任务,如果有就取出来执行,没有就把它塞回去等待下一轮唤醒
一个异步定时器
rust
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// 在Future和等待的线程间共享状态
struct SharedState {
/// 定时(睡眠)是否结束
completed: bool,
/// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
// 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
// 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
// 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 创建新线程
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
有了这个异步的定时器之后,我们就需要一个执行器去执行它
执行器代码
rust
use {
futures::{
future::BoxFuture,
task::{waker_ref, ArcWake},
Future, FutureExt,
},
std::{
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
},
timer_future::TimerFuture,
};
/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
/// 进行中的Future,在未来的某个时间点会被完成
/// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
/// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
/// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
/// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// 可以将该任务自身放回到任务通道中,等待执行器的poll
task_sender: SyncSender<Arc<Task>>,
}
/// 任务执行器,负责从通道中接收任务然后执行
struct Excutor {
ready_quene: Receiver<Arc<Task>>,
}
/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("任务队列已满");
}
}
// 当任务实现了 ArcWake 特征后,它就变成了 Waker ,在调用 wake() 对其唤醒后会将任务复制一份所有权( Arc ),
// 然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 poll 执行
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("任务队列已满");
}
}
impl Excutor {
fn run(&self) {
while let Ok(task) = self.ready_quene.recv() {
// 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 基于任务自身创建一个 `LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
// 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
if future.as_mut().poll(context).is_pending() {
// Future还没执行完,因此将它放回任务中,等待下次被poll
*future_slot = Some(future);
}
}
}
}
}
fn new_executor_and_spawner() -> (Excutor, Spawner) {
// 任务通道允许的最大缓冲数(任务队列的最大长度)
// 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
const MAX_QUENED_TASKS: usize = 10_000;
let (task_sender, ready_quene) = sync_channel(MAX_QUENED_TASKS);
(Excutor { ready_quene }, Spawner { task_sender })
}
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// 生成一个任务
spawner.spawn(async {
println!("start!");
// 创建定时器Future,并等待它完成
TimerFuture::new(Duration::new(2, 0)).await;
println!("done");
});
spawner.spawn(async {
println!("test!");
});
spawner.spawn(async {
println!("test2!");
});
drop(spawner);
executor.run();
}