1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Executor contains two parts:
15 //! - thread pool: how threads are started and how they run the tasks.
16 //! - schedule policy: how tasks are scheduled in the task queues.
17 pub(crate) mod block_on;
18 pub(crate) mod blocking_pool;
19 #[cfg(feature = "current_thread_runtime")]
20 pub(crate) mod current_thread;
21
22 #[cfg(all(any(feature = "time", feature = "net"), feature = "ffrt"))]
23 pub(crate) mod netpoller;
24 use std::future::Future;
25 use std::mem::MaybeUninit;
26 use std::sync::Once;
27
28 use crate::builder::multi_thread_builder::GLOBAL_BUILDER;
29 use crate::builder::{initialize_blocking_spawner, RuntimeBuilder};
30 use crate::executor::blocking_pool::BlockPoolSpawner;
31 #[cfg(feature = "current_thread_runtime")]
32 use crate::executor::current_thread::CurrentThreadSpawner;
33 use crate::task::TaskBuilder;
34 use crate::{JoinHandle, Task};
35 cfg_ffrt! {
36 use crate::builder::initialize_ffrt_spawner;
37 use crate::ffrt::spawner::spawn;
38 }
39 cfg_not_ffrt! {
40 mod parker;
41 pub(crate) mod async_pool;
42 pub(crate) mod queue;
43 mod sleeper;
44 pub(crate) mod worker;
45 pub(crate) mod driver;
46 use crate::executor::driver::Handle;
47 use crate::builder::initialize_async_spawner;
48 use crate::executor::async_pool::AsyncPoolSpawner;
49 }
50
51 pub(crate) trait Schedule {
schedule(&self, task: Task, lifo: bool)52 fn schedule(&self, task: Task, lifo: bool);
53 }
54
55 pub(crate) struct PlaceholderScheduler;
56
57 impl Schedule for PlaceholderScheduler {
schedule(&self, _task: Task, _lifo: bool)58 fn schedule(&self, _task: Task, _lifo: bool) {
59 panic!("no scheduler can be called");
60 }
61 }
62
63 pub(crate) enum AsyncHandle {
64 #[cfg(feature = "current_thread_runtime")]
65 CurrentThread(CurrentThreadSpawner),
66 #[cfg(not(feature = "ffrt"))]
67 MultiThread(AsyncPoolSpawner),
68 #[cfg(feature = "ffrt")]
69 FfrtMultiThread,
70 }
71
72 /// Runtime struct.
73 ///
74 /// # If `multi_instance_runtime` feature is turned on
75 /// There will be multiple runtime executors, initializing from user settings in
76 /// `RuntimeBuilder`.
77 ///
78 /// # If `multi_instance_runtime` feature is turned off
79 /// There will be only *ONE* runtime executor singleton inside one process.
80 /// The async and blocking pools working when calling methods of this struct are
81 /// stored in the global static executor instance. Here, keep the empty struct
82 /// for compatibility and possibility for function extension in the future.
83 pub struct Runtime {
84 pub(crate) async_spawner: AsyncHandle,
85 }
86
87 #[cfg(not(feature = "ffrt"))]
88 impl Runtime {
get_handle(&self) -> std::sync::Arc<Handle>89 pub(crate) fn get_handle(&self) -> std::sync::Arc<Handle> {
90 match &self.async_spawner {
91 #[cfg(feature = "current_thread_runtime")]
92 AsyncHandle::CurrentThread(s) => s.handle.clone(),
93 AsyncHandle::MultiThread(s) => s.exe_mng_info.handle.clone(),
94 }
95 }
96 }
97
global_default_async() -> &'static Runtime98 pub(crate) fn global_default_async() -> &'static Runtime {
99 static mut GLOBAL_DEFAULT_ASYNC: MaybeUninit<Runtime> = MaybeUninit::uninit();
100 static ONCE: Once = Once::new();
101
102 unsafe {
103 ONCE.call_once(|| {
104 let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
105
106 if global_builder.is_none() {
107 *global_builder = Some(RuntimeBuilder::new_multi_thread());
108 }
109
110 #[cfg(not(feature = "ffrt"))]
111 let runtime = match initialize_async_spawner(global_builder.as_ref().unwrap()) {
112 Ok(s) => Runtime {
113 async_spawner: AsyncHandle::MultiThread(s),
114 },
115 Err(e) => panic!("initialize runtime failed: {:?}", e),
116 };
117 #[cfg(feature = "ffrt")]
118 let runtime = match initialize_ffrt_spawner(global_builder.as_ref().unwrap()) {
119 Ok(()) => Runtime {
120 async_spawner: AsyncHandle::FfrtMultiThread,
121 },
122 Err(e) => panic!("initialize runtime failed: {:?}", e),
123 };
124 GLOBAL_DEFAULT_ASYNC = MaybeUninit::new(runtime);
125 });
126 &*GLOBAL_DEFAULT_ASYNC.as_ptr()
127 }
128 }
129
global_default_blocking() -> &'static BlockPoolSpawner130 pub(crate) fn global_default_blocking() -> &'static BlockPoolSpawner {
131 static mut GLOBAL_DEFAULT_BLOCKING: MaybeUninit<BlockPoolSpawner> = MaybeUninit::uninit();
132 static ONCE: Once = Once::new();
133
134 unsafe {
135 ONCE.call_once(|| {
136 let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
137
138 if global_builder.is_none() {
139 *global_builder = Some(RuntimeBuilder::new_multi_thread());
140 }
141 match initialize_blocking_spawner(&global_builder.as_ref().unwrap().common) {
142 Ok(bps) => GLOBAL_DEFAULT_BLOCKING = MaybeUninit::new(bps),
143 Err(e) => panic!("initialize blocking pool failed: {:?}", e),
144 }
145 });
146 &*GLOBAL_DEFAULT_BLOCKING.as_ptr()
147 }
148 }
149
#[cfg(all(feature = "multi_instance_runtime", not(feature = "ffrt")))]
impl Runtime {
    /// Creates a new multi-thread runtime with default setting.
    ///
    /// # Errors
    /// Returns the error reported by the builder's `build()` call.
    pub fn new() -> std::io::Result<Runtime> {
        let default_builder = RuntimeBuilder::new_multi_thread();
        default_builder.build()
    }
}
157
158 impl Runtime {
159 /// Spawns a future onto the runtime, returning a [`JoinHandle`] for it.
160 ///
161 /// The future will be later polled by the executor, which is usually
162 /// implemented as a thread pool. The executor will run the future util
163 /// finished.
164 ///
165 /// Awaits on the JoinHandle will return the result of the future, but users
166 /// don't have to `.await` the `JoinHandle` in order to run the future,
167 /// since the future will be executed in the background once it's
168 /// spawned. Dropping the JoinHandle will throw away the returned value.
169 ///
170 /// # Examples
171 ///
172 /// ```no run
173 /// use ylong_runtime::task::*;
174 /// use ylong_runtime::builder::RuntimeBuilder;
175 /// use ylong_runtime::executor::Runtime;
176 ///
177 /// async fn test_future(num: usize) -> usize {
178 /// num
179 /// }
180 ///
181 /// let core_pool_size = 4;
182 ///
183 /// let runtime = RuntimeBuilder::new_multi_thread()
184 /// .worker_num(core_pool_size)
185 /// .build()
186 /// .unwrap();
187 ///
188 /// runtime.spawn(test_future(1));
189 /// ```
spawn<T, R>(&self, task: T) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,190 pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
191 where
192 T: Future<Output = R> + Send + 'static,
193 R: Send + 'static,
194 {
195 self.spawn_with_attr(task, &TaskBuilder::default())
196 }
197
198 #[inline]
spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,199 pub(crate) fn spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R>
200 where
201 T: Future<Output = R> + Send + 'static,
202 R: Send + 'static,
203 {
204 match &self.async_spawner {
205 #[cfg(feature = "current_thread_runtime")]
206 AsyncHandle::CurrentThread(current_thread) => current_thread.spawn(builder, task),
207 #[cfg(not(feature = "ffrt"))]
208 AsyncHandle::MultiThread(async_spawner) => async_spawner.spawn(builder, task),
209 #[cfg(feature = "ffrt")]
210 AsyncHandle::FfrtMultiThread => spawn(task, builder),
211 }
212 }
213
214 /// Spawns the provided function or closure onto the runtime.
215 ///
216 /// It's usually used for cpu-bounded computation that does not return
217 /// pending and takes a relatively long time.
218 ///
219 /// # Examples
220 ///
221 /// ```no run
222 /// use ylong_runtime::builder::RuntimeBuilder;
223 ///
224 /// use std::time;
225 /// use std::thread::sleep;
226 ///
227 /// let runtime = RuntimeBuilder::new_multi_thread()
228 /// .build()
229 /// .unwrap();
230 ///
231 /// runtime.spawn_blocking(move || {
232 /// sleep(time::Duration::from_millis(1));
233 /// 10
234 /// });
235 /// ```
spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R> where T: FnOnce() -> R + Send + 'static, R: Send + 'static,236 pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
237 where
238 T: FnOnce() -> R + Send + 'static,
239 R: Send + 'static,
240 {
241 crate::spawn::spawn_blocking(&TaskBuilder::new(), task)
242 }
243
244 /// Blocks the current thread and runs the given future (usually a
245 /// JoinHandle) to completion, and gets its return value.
246 ///
247 /// Any code after the `block_on` will be executed once the future is done.
248 ///
249 /// Don't use this method on an asynchronous environment, since it will
250 /// block the worker thread and may cause deadlock.
251 ///
252 /// # Examples
253 ///
254 /// ```no run
255 /// use ylong_runtime::builder::RuntimeBuilder;
256 ///
257 /// let core_pool_size = 4;
258 /// async fn test_future(num: usize) -> usize {
259 /// num
260 /// }
261 ///
262 /// let runtime = RuntimeBuilder::new_multi_thread()
263 /// .worker_num(core_pool_size)
264 /// .build()
265 /// .unwrap();
266 ///
267 /// let handle = runtime.spawn(test_future(4));
268 ///
269 /// let result = runtime.block_on(handle);
270 ///
271 /// assert_eq!(result.unwrap(), 4);
272 /// ```
block_on<T, R>(&self, task: T) -> R where T: Future<Output = R>,273 pub fn block_on<T, R>(&self, task: T) -> R
274 where
275 T: Future<Output = R>,
276 {
277 // Registers handle to the current thread when block_on().
278 // so that async_source can get the handle and register it.
279 #[cfg(not(feature = "ffrt"))]
280 let cur_context = worker::WorkerHandle {
281 _handle: self.get_handle(),
282 };
283 #[cfg(not(feature = "ffrt"))]
284 worker::CURRENT_HANDLE.with(|ctx| {
285 ctx.set(&cur_context as *const _ as *const ());
286 });
287
288 let ret = match &self.async_spawner {
289 #[cfg(feature = "current_thread_runtime")]
290 AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
291 #[cfg(not(feature = "ffrt"))]
292 AsyncHandle::MultiThread(_) => block_on::block_on(task),
293 #[cfg(feature = "ffrt")]
294 AsyncHandle::FfrtMultiThread => block_on::block_on(task),
295 };
296
297 // Sets the current thread variable to null,
298 // otherwise the worker's CURRENT_WORKER can not be set under MultiThread.
299 #[cfg(not(feature = "ffrt"))]
300 worker::CURRENT_HANDLE.with(|ctx| {
301 ctx.set(std::ptr::null());
302 });
303
304 ret
305 }
306 }
307
308 cfg_metrics!(
309 use crate::metrics::Metrics;
310 impl Runtime {
311 /// User can get some message from Runtime during running.
312 ///
313 /// # Example
314 /// ```
315 /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread().build().unwrap();
316 /// let _metrics = runtime.metrics();
317 /// ```
318 pub fn metrics(&self) -> Metrics {
319 Metrics::new(self)
320 }
321 }
322
323 /// Gets metrics of the global Runtime.
324 /// # Example
325 /// ```
326 /// use ylong_runtime::executor::get_global_runtime_metrics;
327 ///
328 /// let metrics = get_global_runtime_metrics();
329 /// ```
330 pub fn get_global_runtime_metrics() -> Metrics<'static> {
331 Metrics::new(global_default_async())
332 }
333 );
334