virtual_desktop_manager/block_on.rs
1//! A very simple async executor based on the [`futures-executor`] crate.
2//!
3//! The implementation mostly comes from
4//! <https://docs.rs/futures-executor/0.3.30/src/futures_executor/local_pool.rs.html#45-103>.
5//!
6//! See also this reddit thread for more info about simple executors:
7//! <https://www.reddit.com/r/rust/comments/eilw8j/what_is_the_minimum_that_must_be_implemented_to/>
8//!
9//! [`futures-executor`]: https://crates.io/crates/futures-executor/0.3.30
10
11use std::{
12 cell::Cell,
13 future::Future,
14 pin::{pin, Pin},
15 sync::{
16 atomic::{AtomicBool, Ordering},
17 Arc,
18 },
19 task::{Context, Poll, Wake, Waker},
20 thread::{self, Thread},
21};
22
23thread_local!(static ENTERED: Cell<bool> = const { Cell::new(false) });
24
25struct Enter {}
26impl Enter {
27 fn new() -> Self {
28 ENTERED.with(|c| {
29 if c.get() {
30 panic!(
31 "an execution scope has already been entered: \
32 cannot execute `block_on` from within another `block_on`"
33 )
34 } else {
35 c.set(true);
36
37 Enter {}
38 }
39 })
40 }
41}
42impl Drop for Enter {
43 fn drop(&mut self) {
44 ENTERED.with(|c| {
45 assert!(c.get());
46 c.set(false);
47 });
48 }
49}
50
51struct ThreadNotify {
52 /// The (single) executor thread.
53 thread: Thread,
54 /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
55 /// before the next `park()`, which may otherwise happen if the code
56 /// being executed as part of the future(s) being polled makes use of
57 /// park / unpark calls of its own, i.e. we cannot assume that no other
58 /// code uses park / unpark on the executing `thread`.
59 unparked: AtomicBool,
60}
61
62thread_local! {
63 static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
64 thread: thread::current(),
65 unparked: AtomicBool::new(false),
66 });
67}
68
69impl Wake for ThreadNotify {
70 fn wake_by_ref(self: &Arc<Self>) {
71 // Make sure the wakeup is remembered until the next `park()`.
72 let unparked = self.unparked.swap(true, Ordering::Release);
73 if !unparked {
74 // If the thread has not been unparked yet, it must be done
75 // now. If it was actually parked, it will run again,
76 // otherwise the token made available by `unpark`
77 // may be consumed before reaching `park()`, but `unparked`
78 // ensures it is not forgotten.
79 self.thread.unpark();
80 }
81 }
82
83 fn wake(self: Arc<Self>) {
84 <ThreadNotify as Wake>::wake_by_ref(&self)
85 }
86}
87
88// Set up and run a basic single-threaded spawner loop, invoking `f` on each
89// turn.
90fn run_executor<T, F>(mut f: F) -> T
91where
92 F: FnMut(&mut Context<'_>) -> Poll<T>,
93{
94 let _enter = Enter::new();
95
96 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
97 let waker = Waker::from(Arc::clone(thread_notify));
98 let mut cx = Context::from_waker(&waker);
99 loop {
100 if let Poll::Ready(t) = f(&mut cx) {
101 return t;
102 }
103
104 // Wait for a wakeup.
105 while !thread_notify.unparked.swap(false, Ordering::Acquire) {
106 // No wakeup occurred. It may occur now, right before parking,
107 // but in that case the token made available by `unpark()`
108 // is guaranteed to still be available and `park()` is a no-op.
109 thread::park();
110 }
111 }
112 })
113}
114
115/// Run a future to completion on the current thread.
116///
117/// This function will block the caller until the given future has completed.
118pub fn block_on<F: Future>(f: F) -> F::Output {
119 let mut f = pin!(f);
120 run_executor(|cx| f.as_mut().poll(cx))
121}
122
123/// Create a new future that finishes when the list of futures complete.
124///
125/// Note: this code was not taken from any other crate.
126///
127/// # Panics
128///
129/// The returned future will delay any panic in a queued future until all
130/// futures have completed in order to prevent accidental cancellation.
131pub fn simple_join<'a, Fut>(futures: impl IntoIterator<Item = Fut>) -> impl Future<Output = ()> + 'a
132where
133 Fut: Future<Output = ()> + 'a,
134{
135 struct Join<'a> {
136 list: Vec<Pin<Box<dyn Future<Output = ()> + 'a>>>,
137 panic: Option<Box<dyn std::any::Any + Send>>,
138 }
139 impl Future for Join<'_> {
140 type Output = ();
141
142 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
143 let this = &mut self.get_mut();
144 let list = &mut this.list;
145 list.retain_mut(|item| {
146 let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
147 Future::poll(item.as_mut(), cx).is_pending()
148 }));
149 match res {
150 Err(payload) => {
151 this.panic.get_or_insert(payload);
152 false
153 }
154 Ok(is_pending) => is_pending,
155 }
156 });
157 if list.is_empty() {
158 if let Some(payload) = this.panic.take() {
159 std::panic::resume_unwind(payload)
160 } else {
161 Poll::Ready(())
162 }
163 } else {
164 Poll::Pending
165 }
166 }
167 }
168 Join {
169 list: futures
170 .into_iter()
171 .map(|fut| Box::pin(fut) as Pin<Box<dyn Future<Output = ()> + '_>>)
172 .collect(),
173 panic: None,
174 }
175}