1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Executor contains two parts:
15 //! - thread pool: how threads are started and how they run the tasks.
16 //! - schedule policy: how tasks are scheduled in the task queues.
17 pub(crate) mod block_on;
18 pub(crate) mod blocking_pool;
19 #[cfg(feature = "current_thread_runtime")]
20 pub(crate) mod current_thread;
21
22 use std::future::Future;
23 use std::mem::MaybeUninit;
24 use std::sync::Once;
25
26 use crate::builder::multi_thread_builder::GLOBAL_BUILDER;
27 use crate::builder::{initialize_blocking_spawner, RuntimeBuilder};
28 use crate::executor::blocking_pool::BlockPoolSpawner;
29 #[cfg(feature = "current_thread_runtime")]
30 use crate::executor::current_thread::CurrentThreadSpawner;
31 use crate::task::{JoinHandle, Task, TaskBuilder};
32 cfg_ffrt! {
33 use crate::ffrt::spawner::spawn;
34 }
35 cfg_not_ffrt! {
36 mod parker;
37 pub(crate) mod async_pool;
38 pub(crate) mod queue;
39 mod sleeper;
40 pub(crate) mod worker;
41 pub(crate) mod driver;
42 use crate::executor::driver::Handle;
43 use crate::builder::initialize_async_spawner;
44 use crate::executor::async_pool::AsyncPoolSpawner;
45 }
46
/// Abstraction over anything that can accept a [`Task`] for scheduling.
pub(crate) trait Schedule {
    /// Submits `task` to the scheduler.
    ///
    /// NOTE(review): `lifo` presumably selects last-in-first-out placement of
    /// the task; confirm against the implementors of this trait.
    fn schedule(&self, task: Task, lifo: bool);
}
50
/// Stand-in scheduler type that carries no state.
///
/// Its [`Schedule`] implementation in this module panics when invoked, so it
/// must only be used where no task is ever scheduled through it.
pub(crate) struct PlaceholderScheduler;
52
impl Schedule for PlaceholderScheduler {
    /// Always panics: the placeholder is not a real scheduler, so both
    /// arguments are ignored.
    ///
    /// # Panics
    /// Unconditionally, with the message `"no scheduler can be called"`.
    fn schedule(&self, _task: Task, _lifo: bool) {
        panic!("no scheduler can be called");
    }
}
58
/// The asynchronous spawner backing a [`Runtime`].
///
/// Which variants exist depends on the enabled cargo features.
pub(crate) enum AsyncHandle {
    /// Spawner of the current-thread runtime; only available with the
    /// `current_thread_runtime` feature.
    #[cfg(feature = "current_thread_runtime")]
    CurrentThread(CurrentThreadSpawner),
    /// Spawner of the multi-thread async pool; only available when the `ffrt`
    /// feature is disabled.
    #[cfg(not(feature = "ffrt"))]
    MultiThread(AsyncPoolSpawner),
    /// Data-less marker used when the `ffrt` feature is enabled; spawning is
    /// then delegated to `crate::ffrt::spawner::spawn`.
    #[cfg(feature = "ffrt")]
    FfrtMultiThread,
}
67
/// Runtime struct.
///
/// # If `multi_instance_runtime` feature is turned on
/// There can be multiple runtime executors, each initialized from the user
/// settings in `RuntimeBuilder`.
///
/// # If `multi_instance_runtime` feature is turned off
/// There will be only *ONE* runtime executor singleton inside one process.
/// The async and blocking pools used by the methods of this struct are
/// stored in the global static executor instance. The struct itself is kept
/// for compatibility and for possible function extension in the future.
pub struct Runtime {
    /// Spawner that `spawn_with_attr` and `block_on` dispatch on.
    pub(crate) async_spawner: AsyncHandle,
}
82
83 #[cfg(not(feature = "ffrt"))]
84 impl Runtime {
get_handle(&self) -> std::sync::Arc<Handle>85 pub(crate) fn get_handle(&self) -> std::sync::Arc<Handle> {
86 match &self.async_spawner {
87 #[cfg(feature = "current_thread_runtime")]
88 AsyncHandle::CurrentThread(s) => s.handle.clone(),
89 AsyncHandle::MultiThread(s) => s.exe_mng_info.handle.clone(),
90 }
91 }
92 }
93
global_default_async() -> &'static Runtime94 pub(crate) fn global_default_async() -> &'static Runtime {
95 static mut GLOBAL_DEFAULT_ASYNC: MaybeUninit<Runtime> = MaybeUninit::uninit();
96 static ONCE: Once = Once::new();
97
98 unsafe {
99 ONCE.call_once(|| {
100 let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
101
102 if global_builder.is_none() {
103 *global_builder = Some(RuntimeBuilder::new_multi_thread());
104 }
105
106 #[cfg(not(feature = "ffrt"))]
107 let runtime = match initialize_async_spawner(global_builder.as_ref().unwrap()) {
108 Ok(s) => Runtime {
109 async_spawner: AsyncHandle::MultiThread(s),
110 },
111 Err(e) => panic!("initialize runtime failed: {:?}", e),
112 };
113 #[cfg(feature = "ffrt")]
114 let runtime = Runtime {
115 async_spawner: AsyncHandle::FfrtMultiThread,
116 };
117 GLOBAL_DEFAULT_ASYNC = MaybeUninit::new(runtime);
118 });
119 &*GLOBAL_DEFAULT_ASYNC.as_ptr()
120 }
121 }
122
global_default_blocking() -> &'static BlockPoolSpawner123 pub(crate) fn global_default_blocking() -> &'static BlockPoolSpawner {
124 static mut GLOBAL_DEFAULT_BLOCKING: MaybeUninit<BlockPoolSpawner> = MaybeUninit::uninit();
125 static ONCE: Once = Once::new();
126
127 unsafe {
128 ONCE.call_once(|| {
129 let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
130
131 if global_builder.is_none() {
132 *global_builder = Some(RuntimeBuilder::new_multi_thread());
133 }
134 match initialize_blocking_spawner(&global_builder.as_ref().unwrap().common) {
135 Ok(bps) => GLOBAL_DEFAULT_BLOCKING = MaybeUninit::new(bps),
136 Err(e) => panic!("initialize blocking pool failed: {:?}", e),
137 }
138 });
139 &*GLOBAL_DEFAULT_BLOCKING.as_ptr()
140 }
141 }
142
#[cfg(all(feature = "multi_instance_runtime", not(feature = "ffrt")))]
impl Runtime {
    /// Creates a new multi-thread runtime with the default settings.
    ///
    /// This is shorthand for `RuntimeBuilder::new_multi_thread().build()`.
    ///
    /// # Errors
    /// Returns the `std::io::Error` reported by the builder's `build()` call.
    pub fn new() -> std::io::Result<Runtime> {
        RuntimeBuilder::new_multi_thread().build()
    }
}
150
151 impl Runtime {
152 /// Spawns a future onto the runtime, returning a [`JoinHandle`] for it.
153 ///
154 /// The future will be later polled by the executor, which is usually
155 /// implemented as a thread pool. The executor will run the future util
156 /// finished.
157 ///
158 /// Awaits on the JoinHandle will return the result of the future, but users
159 /// don't have to `.await` the `JoinHandle` in order to run the future,
160 /// since the future will be executed in the background once it's
161 /// spawned. Dropping the JoinHandle will throw away the returned value.
162 ///
163 /// # Examples
164 ///
165 /// ```no run
166 /// use ylong_runtime::task::*;
167 /// use ylong_runtime::builder::RuntimeBuilder;
168 /// use ylong_runtime::executor::Runtime;
169 ///
170 /// async fn test_future(num: usize) -> usize {
171 /// num
172 /// }
173 ///
174 /// let core_pool_size = 4;
175 ///
176 /// let runtime = RuntimeBuilder::new_multi_thread()
177 /// .worker_num(core_pool_size)
178 /// .build()
179 /// .unwrap();
180 ///
181 /// runtime.spawn(test_future(1));
182 /// ```
spawn<T, R>(&self, task: T) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,183 pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
184 where
185 T: Future<Output = R> + Send + 'static,
186 R: Send + 'static,
187 {
188 self.spawn_with_attr(task, &TaskBuilder::default())
189 }
190
191 #[inline]
spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,192 pub(crate) fn spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R>
193 where
194 T: Future<Output = R> + Send + 'static,
195 R: Send + 'static,
196 {
197 match &self.async_spawner {
198 #[cfg(feature = "current_thread_runtime")]
199 AsyncHandle::CurrentThread(current_thread) => current_thread.spawn(builder, task),
200 #[cfg(not(feature = "ffrt"))]
201 AsyncHandle::MultiThread(async_spawner) => async_spawner.spawn(builder, task),
202 #[cfg(feature = "ffrt")]
203 AsyncHandle::FfrtMultiThread => spawn(task, builder),
204 }
205 }
206
207 /// Spawns the provided function or closure onto the runtime.
208 ///
209 /// It's usually used for cpu-bounded computation that does not return
210 /// pending and takes a relatively long time.
211 ///
212 /// # Examples
213 ///
214 /// ```no run
215 /// use ylong_runtime::builder::RuntimeBuilder;
216 ///
217 /// use std::time;
218 /// use std::thread::sleep;
219 ///
220 /// let runtime = RuntimeBuilder::new_multi_thread()
221 /// .build()
222 /// .unwrap();
223 ///
224 /// runtime.spawn_blocking(move || {
225 /// sleep(time::Duration::from_millis(1));
226 /// 10
227 /// });
228 /// ```
spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R> where T: FnOnce() -> R + Send + 'static, R: Send + 'static,229 pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
230 where
231 T: FnOnce() -> R + Send + 'static,
232 R: Send + 'static,
233 {
234 crate::spawn::spawn_blocking(&TaskBuilder::new(), task)
235 }
236
237 /// Blocks the current thread and runs the given future (usually a
238 /// JoinHandle) to completion, and gets its return value.
239 ///
240 /// Any code after the `block_on` will be executed once the future is done.
241 ///
242 /// Don't use this method on an asynchronous environment, since it will
243 /// block the worker thread and may cause deadlock.
244 ///
245 /// # Examples
246 ///
247 /// ```no run
248 /// use ylong_runtime::builder::RuntimeBuilder;
249 ///
250 /// let core_pool_size = 4;
251 /// async fn test_future(num: usize) -> usize {
252 /// num
253 /// }
254 ///
255 /// let runtime = RuntimeBuilder::new_multi_thread()
256 /// .worker_num(core_pool_size)
257 /// .build()
258 /// .unwrap();
259 ///
260 /// let handle = runtime.spawn(test_future(4));
261 ///
262 /// let result = runtime.block_on(handle);
263 ///
264 /// assert_eq!(result.unwrap(), 4);
265 /// ```
block_on<T, R>(&self, task: T) -> R where T: Future<Output = R>,266 pub fn block_on<T, R>(&self, task: T) -> R
267 where
268 T: Future<Output = R>,
269 {
270 // Registers handle to the current thread when block_on().
271 // so that async_source can get the handle and register it.
272 #[cfg(not(feature = "ffrt"))]
273 let cur_context = worker::WorkerHandle {
274 _handle: self.get_handle(),
275 };
276 #[cfg(not(feature = "ffrt"))]
277 worker::CURRENT_HANDLE.with(|ctx| {
278 ctx.set(&cur_context as *const _ as *const ());
279 });
280
281 let ret = match &self.async_spawner {
282 #[cfg(feature = "current_thread_runtime")]
283 AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
284 #[cfg(not(feature = "ffrt"))]
285 AsyncHandle::MultiThread(_) => block_on::block_on(task),
286 #[cfg(feature = "ffrt")]
287 AsyncHandle::FfrtMultiThread => block_on::block_on(task),
288 };
289
290 // Sets the current thread variable to null,
291 // otherwise the worker's CURRENT_WORKER can not be set under MultiThread.
292 #[cfg(not(feature = "ffrt"))]
293 worker::CURRENT_HANDLE.with(|ctx| {
294 ctx.set(std::ptr::null());
295 });
296
297 ret
298 }
299 }
300
cfg_metrics!(
    use crate::metrics::Metrics;
    impl Runtime {
        /// Returns a [`Metrics`] object bound to this runtime, through which
        /// information about the runtime can be queried while it is running.
        ///
        /// # Example
        /// ```
        /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread().build().unwrap();
        /// let _metrics = runtime.metrics();
        /// ```
        pub fn metrics(&self) -> Metrics {
            Metrics::new(self)
        }
    }

    /// Gets metrics of the global Runtime.
    ///
    /// The global runtime is obtained through `global_default_async()`, which
    /// creates it on first use.
    ///
    /// # Example
    /// ```
    /// use ylong_runtime::executor::get_global_runtime_metrics;
    ///
    /// let metrics = get_global_runtime_metrics();
    /// ```
    pub fn get_global_runtime_metrics() -> Metrics<'static> {
        Metrics::new(global_default_async())
    }
);
327