1use std::{
2 cell::RefCell,
3 fmt,
4 future::Future,
5 pin::Pin,
6 sync::atomic::{AtomicUsize, Ordering},
7 task::{Context, Poll},
8 thread,
9};
10
11use futures_core::ready;
12use tokio::sync::mpsc;
13
14use crate::system::{System, SystemCommand};
15
16pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
17
18thread_local!(
19 static HANDLE: RefCell<Option<ArbiterHandle>> = const { RefCell::new(None) };
20);
21
/// Commands that can be sent over an arbiter's command channel.
pub(crate) enum ArbiterCommand {
    /// Ask the receiving event loop to finish.
    Stop,
    /// Hand over a boxed, pinned future for the receiving event loop to run.
    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
}

// `Debug` is written by hand: the boxed future carried by `Execute` is an opaque trait object,
// so only the variant name is printed.
impl fmt::Debug for ArbiterCommand {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Stop => "ArbiterCommand::Stop",
            Self::Execute(_) => "ArbiterCommand::Execute",
        };

        f.write_str(label)
    }
}
35
36#[derive(Debug, Clone)]
38pub struct ArbiterHandle {
39 tx: mpsc::UnboundedSender<ArbiterCommand>,
40}
41
42impl ArbiterHandle {
43 pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
44 Self { tx }
45 }
46
47 pub fn spawn<Fut>(&self, future: Fut) -> bool
53 where
54 Fut: Future<Output = ()> + Send + 'static,
55 {
56 self.tx
57 .send(ArbiterCommand::Execute(Box::pin(future)))
58 .is_ok()
59 }
60
61 pub fn spawn_fn<F>(&self, f: F) -> bool
68 where
69 F: FnOnce() + Send + 'static,
70 {
71 self.spawn(async { f() })
72 }
73
74 pub fn stop(&self) -> bool {
79 self.tx.send(ArbiterCommand::Stop).is_ok()
80 }
81}
82
83#[derive(Debug)]
88pub struct Arbiter {
89 tx: mpsc::UnboundedSender<ArbiterCommand>,
90 thread_handle: thread::JoinHandle<()>,
91}
92
93impl Arbiter {
94 #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
99 #[allow(clippy::new_without_default)]
100 pub fn new() -> Arbiter {
101 Self::with_tokio_rt(|| {
102 crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
103 })
104 }
105
106 #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
110 pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
111 where
112 F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
113 {
114 let sys = System::current();
115 let system_id = sys.id();
116 let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
117
118 let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
119 let (tx, rx) = mpsc::unbounded_channel();
120
121 let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
122
123 let thread_handle = thread::Builder::new()
124 .name(name.clone())
125 .spawn({
126 let tx = tx.clone();
127 move || {
128 let rt = crate::runtime::Runtime::from(runtime_factory());
129 let hnd = ArbiterHandle::new(tx);
130
131 System::set_current(sys);
132
133 HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
134
135 let _ = System::current()
137 .tx()
138 .send(SystemCommand::RegisterArbiter(arb_id, hnd));
139
140 ready_tx.send(()).unwrap();
141
142 rt.block_on(ArbiterRunner { rx });
144
145 let _ = System::current()
147 .tx()
148 .send(SystemCommand::DeregisterArbiter(arb_id));
149 }
150 })
151 .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));
152
153 ready_rx.recv().unwrap();
154
155 Arbiter { tx, thread_handle }
156 }
157
158 #[cfg(all(target_os = "linux", feature = "io-uring"))]
163 #[allow(clippy::new_without_default)]
164 pub fn new() -> Arbiter {
165 let sys = System::current();
166 let system_id = sys.id();
167 let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
168
169 let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
170 let (tx, rx) = mpsc::unbounded_channel();
171
172 let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
173
174 let thread_handle = thread::Builder::new()
175 .name(name.clone())
176 .spawn({
177 let tx = tx.clone();
178 move || {
179 let hnd = ArbiterHandle::new(tx);
180
181 System::set_current(sys);
182
183 HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
184
185 let _ = System::current()
187 .tx()
188 .send(SystemCommand::RegisterArbiter(arb_id, hnd));
189
190 ready_tx.send(()).unwrap();
191
192 tokio_uring::start(ArbiterRunner { rx });
194
195 let _ = System::current()
197 .tx()
198 .send(SystemCommand::DeregisterArbiter(arb_id));
199 }
200 })
201 .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));
202
203 ready_rx.recv().unwrap();
204
205 Arbiter { tx, thread_handle }
206 }
207
208 pub(crate) fn in_new_system() -> ArbiterHandle {
210 let (tx, rx) = mpsc::unbounded_channel();
211
212 let hnd = ArbiterHandle::new(tx);
213
214 HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
215
216 crate::spawn(ArbiterRunner { rx });
217
218 hnd
219 }
220
221 pub fn handle(&self) -> ArbiterHandle {
223 ArbiterHandle::new(self.tx.clone())
224 }
225
226 pub fn current() -> ArbiterHandle {
231 HANDLE.with(|cell| match *cell.borrow() {
232 Some(ref hnd) => hnd.clone(),
233 None => panic!("Arbiter is not running."),
234 })
235 }
236
237 pub fn try_current() -> Option<ArbiterHandle> {
243 HANDLE.with(|cell| cell.borrow().clone())
244 }
245
246 pub fn stop(&self) -> bool {
250 self.tx.send(ArbiterCommand::Stop).is_ok()
251 }
252
253 #[track_caller]
259 pub fn spawn<Fut>(&self, future: Fut) -> bool
260 where
261 Fut: Future<Output = ()> + Send + 'static,
262 {
263 self.tx
264 .send(ArbiterCommand::Execute(Box::pin(future)))
265 .is_ok()
266 }
267
268 #[track_caller]
275 pub fn spawn_fn<F>(&self, f: F) -> bool
276 where
277 F: FnOnce() + Send + 'static,
278 {
279 self.spawn(async { f() })
280 }
281
282 pub fn join(self) -> thread::Result<()> {
286 self.thread_handle.join()
287 }
288}
289
290struct ArbiterRunner {
292 rx: mpsc::UnboundedReceiver<ArbiterCommand>,
293}
294
295impl Future for ArbiterRunner {
296 type Output = ();
297
298 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
299 loop {
301 match ready!(self.rx.poll_recv(cx)) {
302 None => return Poll::Ready(()),
304
305 Some(item) => match item {
307 ArbiterCommand::Stop => {
308 return Poll::Ready(());
309 }
310 ArbiterCommand::Execute(task_fut) => {
311 tokio::task::spawn_local(task_fut);
312 }
313 },
314 }
315 }
316 }
317}