actix_rt/
arbiter.rs

1use std::{
2    cell::RefCell,
3    fmt,
4    future::Future,
5    pin::Pin,
6    sync::atomic::{AtomicUsize, Ordering},
7    task::{Context, Poll},
8    thread,
9};
10
11use futures_core::ready;
12use tokio::sync::mpsc;
13
14use crate::system::{System, SystemCommand};
15
16pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
17
18thread_local!(
19    static HANDLE: RefCell<Option<ArbiterHandle>> = const { RefCell::new(None) };
20);
21
/// Messages understood by an arbiter's command loop.
pub(crate) enum ArbiterCommand {
    /// Ask the command loop to finish.
    Stop,
    /// Hand over a boxed future to be spawned on the arbiter's thread.
    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
}

// `Debug` is written by hand because the boxed future inside `Execute` has no `Debug` impl;
// only the variant name is printed.
impl fmt::Debug for ArbiterCommand {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Stop => "ArbiterCommand::Stop",
            Self::Execute(_) => "ArbiterCommand::Execute",
        };

        f.write_str(label)
    }
}
35
36/// A handle for sending spawn and stop messages to an [Arbiter].
37#[derive(Debug, Clone)]
38pub struct ArbiterHandle {
39    tx: mpsc::UnboundedSender<ArbiterCommand>,
40}
41
42impl ArbiterHandle {
43    pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
44        Self { tx }
45    }
46
47    /// Send a future to the [Arbiter]'s thread and spawn it.
48    ///
49    /// If you require a result, include a response channel in the future.
50    ///
51    /// Returns true if future was sent successfully and false if the [Arbiter] has died.
52    pub fn spawn<Fut>(&self, future: Fut) -> bool
53    where
54        Fut: Future<Output = ()> + Send + 'static,
55    {
56        self.tx
57            .send(ArbiterCommand::Execute(Box::pin(future)))
58            .is_ok()
59    }
60
61    /// Send a function to the [Arbiter]'s thread and execute it.
62    ///
63    /// Any result from the function is discarded. If you require a result, include a response
64    /// channel in the function.
65    ///
66    /// Returns true if function was sent successfully and false if the [Arbiter] has died.
67    pub fn spawn_fn<F>(&self, f: F) -> bool
68    where
69        F: FnOnce() + Send + 'static,
70    {
71        self.spawn(async { f() })
72    }
73
74    /// Instruct [Arbiter] to stop processing it's event loop.
75    ///
76    /// Returns true if stop message was sent successfully and false if the [Arbiter] has
77    /// been dropped.
78    pub fn stop(&self) -> bool {
79        self.tx.send(ArbiterCommand::Stop).is_ok()
80    }
81}
82
83/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
84/// and functions.
85///
86/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
87#[derive(Debug)]
88pub struct Arbiter {
89    tx: mpsc::UnboundedSender<ArbiterCommand>,
90    thread_handle: thread::JoinHandle<()>,
91}
92
93impl Arbiter {
94    /// Spawn a new Arbiter thread and start its event loop.
95    ///
96    /// # Panics
97    /// Panics if a [System] is not registered on the current thread.
98    #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
99    #[allow(clippy::new_without_default)]
100    pub fn new() -> Arbiter {
101        Self::with_tokio_rt(|| {
102            crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
103        })
104    }
105
106    /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
107    ///
108    /// [tokio-runtime]: tokio::runtime::Runtime
109    #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
110    pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
111    where
112        F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
113    {
114        let sys = System::current();
115        let system_id = sys.id();
116        let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
117
118        let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
119        let (tx, rx) = mpsc::unbounded_channel();
120
121        let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
122
123        let thread_handle = thread::Builder::new()
124            .name(name.clone())
125            .spawn({
126                let tx = tx.clone();
127                move || {
128                    let rt = crate::runtime::Runtime::from(runtime_factory());
129                    let hnd = ArbiterHandle::new(tx);
130
131                    System::set_current(sys);
132
133                    HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
134
135                    // register arbiter
136                    let _ = System::current()
137                        .tx()
138                        .send(SystemCommand::RegisterArbiter(arb_id, hnd));
139
140                    ready_tx.send(()).unwrap();
141
142                    // run arbiter event processing loop
143                    rt.block_on(ArbiterRunner { rx });
144
145                    // deregister arbiter
146                    let _ = System::current()
147                        .tx()
148                        .send(SystemCommand::DeregisterArbiter(arb_id));
149                }
150            })
151            .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));
152
153        ready_rx.recv().unwrap();
154
155        Arbiter { tx, thread_handle }
156    }
157
158    /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
159    ///
160    /// # Panics
161    /// Panics if a [System] is not registered on the current thread.
162    #[cfg(all(target_os = "linux", feature = "io-uring"))]
163    #[allow(clippy::new_without_default)]
164    pub fn new() -> Arbiter {
165        let sys = System::current();
166        let system_id = sys.id();
167        let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
168
169        let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
170        let (tx, rx) = mpsc::unbounded_channel();
171
172        let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
173
174        let thread_handle = thread::Builder::new()
175            .name(name.clone())
176            .spawn({
177                let tx = tx.clone();
178                move || {
179                    let hnd = ArbiterHandle::new(tx);
180
181                    System::set_current(sys);
182
183                    HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
184
185                    // register arbiter
186                    let _ = System::current()
187                        .tx()
188                        .send(SystemCommand::RegisterArbiter(arb_id, hnd));
189
190                    ready_tx.send(()).unwrap();
191
192                    // run arbiter event processing loop
193                    tokio_uring::start(ArbiterRunner { rx });
194
195                    // deregister arbiter
196                    let _ = System::current()
197                        .tx()
198                        .send(SystemCommand::DeregisterArbiter(arb_id));
199                }
200            })
201            .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));
202
203        ready_rx.recv().unwrap();
204
205        Arbiter { tx, thread_handle }
206    }
207
208    /// Sets up an Arbiter runner in a new System using the environment's local set.
209    pub(crate) fn in_new_system() -> ArbiterHandle {
210        let (tx, rx) = mpsc::unbounded_channel();
211
212        let hnd = ArbiterHandle::new(tx);
213
214        HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
215
216        crate::spawn(ArbiterRunner { rx });
217
218        hnd
219    }
220
221    /// Return a handle to the this Arbiter's message sender.
222    pub fn handle(&self) -> ArbiterHandle {
223        ArbiterHandle::new(self.tx.clone())
224    }
225
226    /// Return a handle to the current thread's Arbiter's message sender.
227    ///
228    /// # Panics
229    /// Panics if no Arbiter is running on the current thread.
230    pub fn current() -> ArbiterHandle {
231        HANDLE.with(|cell| match *cell.borrow() {
232            Some(ref hnd) => hnd.clone(),
233            None => panic!("Arbiter is not running."),
234        })
235    }
236
237    /// Try to get current running arbiter handle.
238    ///
239    /// Returns `None` if no Arbiter has been started.
240    ///
241    /// Unlike [`current`](Self::current), this never panics.
242    pub fn try_current() -> Option<ArbiterHandle> {
243        HANDLE.with(|cell| cell.borrow().clone())
244    }
245
246    /// Stop Arbiter from continuing it's event loop.
247    ///
248    /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
249    pub fn stop(&self) -> bool {
250        self.tx.send(ArbiterCommand::Stop).is_ok()
251    }
252
253    /// Send a future to the Arbiter's thread and spawn it.
254    ///
255    /// If you require a result, include a response channel in the future.
256    ///
257    /// Returns true if future was sent successfully and false if the Arbiter has died.
258    #[track_caller]
259    pub fn spawn<Fut>(&self, future: Fut) -> bool
260    where
261        Fut: Future<Output = ()> + Send + 'static,
262    {
263        self.tx
264            .send(ArbiterCommand::Execute(Box::pin(future)))
265            .is_ok()
266    }
267
268    /// Send a function to the Arbiter's thread and execute it.
269    ///
270    /// Any result from the function is discarded. If you require a result, include a response
271    /// channel in the function.
272    ///
273    /// Returns true if function was sent successfully and false if the Arbiter has died.
274    #[track_caller]
275    pub fn spawn_fn<F>(&self, f: F) -> bool
276    where
277        F: FnOnce() + Send + 'static,
278    {
279        self.spawn(async { f() })
280    }
281
282    /// Wait for Arbiter's event loop to complete.
283    ///
284    /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
285    pub fn join(self) -> thread::Result<()> {
286        self.thread_handle.join()
287    }
288}
289
290/// A persistent future that processes [Arbiter] commands.
291struct ArbiterRunner {
292    rx: mpsc::UnboundedReceiver<ArbiterCommand>,
293}
294
295impl Future for ArbiterRunner {
296    type Output = ();
297
298    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
299        // process all items currently buffered in channel
300        loop {
301            match ready!(self.rx.poll_recv(cx)) {
302                // channel closed; no more messages can be received
303                None => return Poll::Ready(()),
304
305                // process arbiter command
306                Some(item) => match item {
307                    ArbiterCommand::Stop => {
308                        return Poll::Ready(());
309                    }
310                    ArbiterCommand::Execute(task_fut) => {
311                        tokio::task::spawn_local(task_fut);
312                    }
313                },
314            }
315        }
316    }
317}