virtual_desktop_manager/
block_on.rs

1//! A very simple async executor based on the [`futures-executor`] crate.
2//!
3//! The implementation mostly comes from
4//! <https://docs.rs/futures-executor/0.3.30/src/futures_executor/local_pool.rs.html#45-103>.
5//!
6//! See also this reddit thread for more info about simple executors:
7//! <https://www.reddit.com/r/rust/comments/eilw8j/what_is_the_minimum_that_must_be_implemented_to/>
8//!
9//! [`futures-executor`]: https://crates.io/crates/futures-executor/0.3.30
10
11use std::{
12    cell::Cell,
13    future::Future,
14    pin::{pin, Pin},
15    sync::{
16        atomic::{AtomicBool, Ordering},
17        Arc,
18    },
19    task::{Context, Poll, Wake, Waker},
20    thread::{self, Thread},
21};
22
thread_local! {
    /// Per-thread marker: `true` while an [`Enter`] guard is alive on this
    /// thread, `false` otherwise.
    static ENTERED: Cell<bool> = const { Cell::new(false) };
}

/// Scope guard that marks the current thread as being inside an executor.
///
/// Creating the guard sets the thread's `ENTERED` marker and dropping it
/// clears the marker again, so at most one guard can exist per thread at
/// any time.
struct Enter {}

impl Enter {
    /// Claims the execution scope of the current thread.
    ///
    /// # Panics
    ///
    /// Panics if another `Enter` guard is still alive on this thread.
    fn new() -> Self {
        // Set the marker unconditionally and look at what was there before;
        // when it was already set the store changes nothing.
        let already_entered = ENTERED.with(|flag| flag.replace(true));
        if already_entered {
            panic!(
                "an execution scope has already been entered: \
                cannot execute `block_on` from within another `block_on`"
            );
        }

        Enter {}
    }
}

impl Drop for Enter {
    /// Releases the execution scope so that the thread can be entered again.
    fn drop(&mut self) {
        ENTERED.with(|c| {
            // A live guard implies the marker is set; anything else means
            // the bookkeeping has been corrupted.
            assert!(c.get());
            c.set(false);
        });
    }
}
50
/// Wakeup state shared between the executor thread and the wakers handed to
/// the futures it polls.
struct ThreadNotify {
    /// Handle of the (single) executor thread that gets unparked on wake.
    thread: Thread,
    /// Remembers that a wakeup happened until the executor consumes it.
    ///
    /// `unpark()` alone is not enough: code running inside the polled
    /// future(s) may use park / unpark on the same thread for its own
    /// purposes and swallow the token, so the wakeup is recorded here as
    /// well and cannot be "forgotten" before the next `park()`.
    unparked: AtomicBool,
}

thread_local! {
    /// The notifier of the current thread, holding a handle to that thread
    /// and an initially cleared wakeup flag.
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        unparked: AtomicBool::new(false),
        thread: thread::current(),
    });
}

impl Wake for ThreadNotify {
    /// Consuming variant; simply forwards to [`Wake::wake_by_ref`].
    fn wake(self: Arc<Self>) {
        Self::wake_by_ref(&self);
    }

    /// Records the wakeup and unparks the executor thread if necessary.
    fn wake_by_ref(self: &Arc<Self>) {
        // Record the wakeup first so that it survives until the next
        // `park()` of the executor thread.
        let already_flagged = self.unparked.swap(true, Ordering::Release);
        if already_flagged {
            // An earlier wakeup is still pending and has already issued the
            // `unpark()`; nothing more to do.
            return;
        }

        // First wakeup since the flag was last cleared: unpark the thread.
        // If it is parked it resumes; if it is not, the token handed out
        // here may get consumed before the executor reaches `park()`, but
        // the flag set above still tells it about the wakeup.
        self.thread.unpark();
    }
}
87
88// Set up and run a basic single-threaded spawner loop, invoking `f` on each
89// turn.
90fn run_executor<T, F>(mut f: F) -> T
91where
92    F: FnMut(&mut Context<'_>) -> Poll<T>,
93{
94    let _enter = Enter::new();
95
96    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
97        let waker = Waker::from(Arc::clone(thread_notify));
98        let mut cx = Context::from_waker(&waker);
99        loop {
100            if let Poll::Ready(t) = f(&mut cx) {
101                return t;
102            }
103
104            // Wait for a wakeup.
105            while !thread_notify.unparked.swap(false, Ordering::Acquire) {
106                // No wakeup occurred. It may occur now, right before parking,
107                // but in that case the token made available by `unpark()`
108                // is guaranteed to still be available and `park()` is a no-op.
109                thread::park();
110            }
111        }
112    })
113}
114
115/// Run a future to completion on the current thread.
116///
117/// This function will block the caller until the given future has completed.
118pub fn block_on<F: Future>(f: F) -> F::Output {
119    let mut f = pin!(f);
120    run_executor(|cx| f.as_mut().poll(cx))
121}
122
/// Create a new future that finishes once every future in the given list
/// has completed.
///
/// Each time the returned future is polled it polls all futures that have
/// not finished yet, in order, and drops the ones that are done.
///
/// Note: this code was not taken from any other crate.
///
/// # Panics
///
/// The returned future will delay any panic in a queued future until all
/// futures have completed in order to prevent accidental cancellation. Only
/// the first panic is re-raised; payloads of later panics are discarded.
pub fn simple_join<'a, Fut>(futures: impl IntoIterator<Item = Fut>) -> impl Future<Output = ()> + 'a
where
    Fut: Future<Output = ()> + 'a,
{
    /// A type-erased, heap-pinned future without a result.
    type BoxedUnitFuture<'f> = Pin<Box<dyn Future<Output = ()> + 'f>>;
    /// The payload carried by a caught panic.
    type PanicPayload = Box<dyn std::any::Any + Send>;

    /// State of the combined future.
    struct Join<'a> {
        /// Futures that have neither completed nor panicked yet.
        pending: Vec<BoxedUnitFuture<'a>>,
        /// Payload of the first panic seen, held back until `pending` is
        /// empty.
        first_panic: Option<PanicPayload>,
    }

    impl Future for Join<'_> {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Split the state into its two fields so the closure below can
            // update the panic slot while the list is being filtered.
            let Join {
                pending,
                first_panic,
            } = self.get_mut();

            pending.retain_mut(|fut| {
                let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    fut.as_mut().poll(cx)
                }));
                match outcome {
                    // Still running: keep it for the next round.
                    Ok(Poll::Pending) => true,
                    // Completed: drop it.
                    Ok(Poll::Ready(())) => false,
                    // Panicked: remember the first payload and drop the
                    // future so that it is never polled again.
                    Err(payload) => {
                        first_panic.get_or_insert(payload);
                        false
                    }
                }
            });

            if !pending.is_empty() {
                return Poll::Pending;
            }

            // Everything has finished; now a held-back panic may continue.
            match first_panic.take() {
                Some(payload) => std::panic::resume_unwind(payload),
                None => Poll::Ready(()),
            }
        }
    }

    let pending: Vec<BoxedUnitFuture<'a>> = futures
        .into_iter()
        .map(|fut| Box::pin(fut) as BoxedUnitFuture<'a>)
        .collect();

    Join {
        pending,
        first_panic: None,
    }
}