• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Executor contains two parts:
15 //! - thread pool: how threads are started and how they run the tasks.
16 //! - schedule policy: how tasks are scheduled in the task queues.
17 pub(crate) mod block_on;
18 pub(crate) mod blocking_pool;
19 #[cfg(feature = "current_thread_runtime")]
20 pub(crate) mod current_thread;
21 
22 use std::future::Future;
23 use std::mem::MaybeUninit;
24 use std::sync::Once;
25 
26 use crate::builder::multi_thread_builder::GLOBAL_BUILDER;
27 use crate::builder::{initialize_blocking_spawner, RuntimeBuilder};
28 use crate::executor::blocking_pool::BlockPoolSpawner;
29 #[cfg(feature = "current_thread_runtime")]
30 use crate::executor::current_thread::CurrentThreadSpawner;
31 use crate::task::{JoinHandle, Task, TaskBuilder};
32 cfg_ffrt! {
33     use crate::ffrt::spawner::spawn;
34 }
35 cfg_not_ffrt! {
36     mod parker;
37     pub(crate) mod async_pool;
38     pub(crate) mod queue;
39     mod sleeper;
40     pub(crate) mod worker;
41     pub(crate) mod driver;
42     use crate::executor::driver::Handle;
43     use crate::builder::initialize_async_spawner;
44     use crate::executor::async_pool::AsyncPoolSpawner;
45 }
46 
47 pub(crate) trait Schedule {
schedule(&self, task: Task, lifo: bool)48     fn schedule(&self, task: Task, lifo: bool);
49 }
50 
51 pub(crate) struct PlaceholderScheduler;
52 
53 impl Schedule for PlaceholderScheduler {
schedule(&self, _task: Task, _lifo: bool)54     fn schedule(&self, _task: Task, _lifo: bool) {
55         panic!("no scheduler can be called");
56     }
57 }
58 
59 pub(crate) enum AsyncHandle {
60     #[cfg(feature = "current_thread_runtime")]
61     CurrentThread(CurrentThreadSpawner),
62     #[cfg(not(feature = "ffrt"))]
63     MultiThread(AsyncPoolSpawner),
64     #[cfg(feature = "ffrt")]
65     FfrtMultiThread,
66 }
67 
68 /// Runtime struct.
69 ///
70 /// # If `multi_instance_runtime` feature is turned on
71 /// There will be multiple runtime executors, initializing from user settings in
72 /// `RuntimeBuilder`.
73 ///
74 /// # If `multi_instance_runtime` feature is turned off
75 /// There will be only *ONE* runtime executor singleton inside one process.
76 /// The async and blocking pools working when calling methods of this struct are
77 /// stored in the global static executor instance. Here, keep the empty struct
78 /// for compatibility and possibility for function extension in the future.
79 pub struct Runtime {
80     pub(crate) async_spawner: AsyncHandle,
81 }
82 
83 #[cfg(not(feature = "ffrt"))]
84 impl Runtime {
get_handle(&self) -> std::sync::Arc<Handle>85     pub(crate) fn get_handle(&self) -> std::sync::Arc<Handle> {
86         match &self.async_spawner {
87             #[cfg(feature = "current_thread_runtime")]
88             AsyncHandle::CurrentThread(s) => s.handle.clone(),
89             AsyncHandle::MultiThread(s) => s.exe_mng_info.handle.clone(),
90         }
91     }
92 }
93 
global_default_async() -> &'static Runtime94 pub(crate) fn global_default_async() -> &'static Runtime {
95     static mut GLOBAL_DEFAULT_ASYNC: MaybeUninit<Runtime> = MaybeUninit::uninit();
96     static ONCE: Once = Once::new();
97 
98     unsafe {
99         ONCE.call_once(|| {
100             let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
101 
102             if global_builder.is_none() {
103                 *global_builder = Some(RuntimeBuilder::new_multi_thread());
104             }
105 
106             #[cfg(not(feature = "ffrt"))]
107             let runtime = match initialize_async_spawner(global_builder.as_ref().unwrap()) {
108                 Ok(s) => Runtime {
109                     async_spawner: AsyncHandle::MultiThread(s),
110                 },
111                 Err(e) => panic!("initialize runtime failed: {:?}", e),
112             };
113             #[cfg(feature = "ffrt")]
114             let runtime = Runtime {
115                 async_spawner: AsyncHandle::FfrtMultiThread,
116             };
117             GLOBAL_DEFAULT_ASYNC = MaybeUninit::new(runtime);
118         });
119         &*GLOBAL_DEFAULT_ASYNC.as_ptr()
120     }
121 }
122 
global_default_blocking() -> &'static BlockPoolSpawner123 pub(crate) fn global_default_blocking() -> &'static BlockPoolSpawner {
124     static mut GLOBAL_DEFAULT_BLOCKING: MaybeUninit<BlockPoolSpawner> = MaybeUninit::uninit();
125     static ONCE: Once = Once::new();
126 
127     unsafe {
128         ONCE.call_once(|| {
129             let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
130 
131             if global_builder.is_none() {
132                 *global_builder = Some(RuntimeBuilder::new_multi_thread());
133             }
134             match initialize_blocking_spawner(&global_builder.as_ref().unwrap().common) {
135                 Ok(bps) => GLOBAL_DEFAULT_BLOCKING = MaybeUninit::new(bps),
136                 Err(e) => panic!("initialize blocking pool failed: {:?}", e),
137             }
138         });
139         &*GLOBAL_DEFAULT_BLOCKING.as_ptr()
140     }
141 }
142 
143 #[cfg(all(feature = "multi_instance_runtime", not(feature = "ffrt")))]
144 impl Runtime {
145     /// Creates a new multi-thread runtime with default setting
new() -> std::io::Result<Runtime>146     pub fn new() -> std::io::Result<Runtime> {
147         RuntimeBuilder::new_multi_thread().build()
148     }
149 }
150 
151 impl Runtime {
152     /// Spawns a future onto the runtime, returning a [`JoinHandle`] for it.
153     ///
154     /// The future will be later polled by the executor, which is usually
155     /// implemented as a thread pool. The executor will run the future util
156     /// finished.
157     ///
158     /// Awaits on the JoinHandle will return the result of the future, but users
159     /// don't have to `.await` the `JoinHandle` in order to run the future,
160     /// since the future will be executed in the background once it's
161     /// spawned. Dropping the JoinHandle will throw away the returned value.
162     ///
163     /// # Examples
164     ///
165     /// ```no run
166     /// use ylong_runtime::task::*;
167     /// use ylong_runtime::builder::RuntimeBuilder;
168     /// use ylong_runtime::executor::Runtime;
169     ///
170     /// async fn test_future(num: usize) -> usize {
171     ///     num
172     /// }
173     ///
174     /// let core_pool_size = 4;
175     ///
176     /// let runtime = RuntimeBuilder::new_multi_thread()
177     ///     .worker_num(core_pool_size)
178     ///     .build()
179     ///     .unwrap();
180     ///
181     /// runtime.spawn(test_future(1));
182     /// ```
spawn<T, R>(&self, task: T) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,183     pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
184     where
185         T: Future<Output = R> + Send + 'static,
186         R: Send + 'static,
187     {
188         self.spawn_with_attr(task, &TaskBuilder::default())
189     }
190 
191     #[inline]
spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,192     pub(crate) fn spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R>
193     where
194         T: Future<Output = R> + Send + 'static,
195         R: Send + 'static,
196     {
197         match &self.async_spawner {
198             #[cfg(feature = "current_thread_runtime")]
199             AsyncHandle::CurrentThread(current_thread) => current_thread.spawn(builder, task),
200             #[cfg(not(feature = "ffrt"))]
201             AsyncHandle::MultiThread(async_spawner) => async_spawner.spawn(builder, task),
202             #[cfg(feature = "ffrt")]
203             AsyncHandle::FfrtMultiThread => spawn(task, builder),
204         }
205     }
206 
207     /// Spawns the provided function or closure onto the runtime.
208     ///
209     /// It's usually used for cpu-bounded computation that does not return
210     /// pending and takes a relatively long time.
211     ///
212     /// # Examples
213     ///
214     /// ```no run
215     /// use ylong_runtime::builder::RuntimeBuilder;
216     ///
217     /// use std::time;
218     /// use std::thread::sleep;
219     ///
220     /// let runtime = RuntimeBuilder::new_multi_thread()
221     ///     .build()
222     ///     .unwrap();
223     ///
224     /// runtime.spawn_blocking(move || {
225     ///     sleep(time::Duration::from_millis(1));
226     ///     10
227     /// });
228     /// ```
spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R> where T: FnOnce() -> R + Send + 'static, R: Send + 'static,229     pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
230     where
231         T: FnOnce() -> R + Send + 'static,
232         R: Send + 'static,
233     {
234         crate::spawn::spawn_blocking(&TaskBuilder::new(), task)
235     }
236 
237     /// Blocks the current thread and runs the given future (usually a
238     /// JoinHandle) to completion, and gets its return value.
239     ///
240     /// Any code after the `block_on` will be executed once the future is done.
241     ///
242     /// Don't use this method on an asynchronous environment, since it will
243     /// block the worker thread and may cause deadlock.
244     ///
245     /// # Examples
246     ///
247     /// ```no run
248     /// use ylong_runtime::builder::RuntimeBuilder;
249     ///
250     /// let core_pool_size = 4;
251     /// async fn test_future(num: usize) -> usize {
252     ///     num
253     /// }
254     ///
255     /// let runtime = RuntimeBuilder::new_multi_thread()
256     ///     .worker_num(core_pool_size)
257     ///     .build()
258     ///     .unwrap();
259     ///
260     /// let handle = runtime.spawn(test_future(4));
261     ///
262     /// let result = runtime.block_on(handle);
263     ///
264     /// assert_eq!(result.unwrap(), 4);
265     /// ```
block_on<T, R>(&self, task: T) -> R where T: Future<Output = R>,266     pub fn block_on<T, R>(&self, task: T) -> R
267     where
268         T: Future<Output = R>,
269     {
270         // Registers handle to the current thread when block_on().
271         // so that async_source can get the handle and register it.
272         #[cfg(not(feature = "ffrt"))]
273         let cur_context = worker::WorkerHandle {
274             _handle: self.get_handle(),
275         };
276         #[cfg(not(feature = "ffrt"))]
277         worker::CURRENT_HANDLE.with(|ctx| {
278             ctx.set(&cur_context as *const _ as *const ());
279         });
280 
281         let ret = match &self.async_spawner {
282             #[cfg(feature = "current_thread_runtime")]
283             AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
284             #[cfg(not(feature = "ffrt"))]
285             AsyncHandle::MultiThread(_) => block_on::block_on(task),
286             #[cfg(feature = "ffrt")]
287             AsyncHandle::FfrtMultiThread => block_on::block_on(task),
288         };
289 
290         // Sets the current thread variable to null,
291         // otherwise the worker's CURRENT_WORKER can not be set under MultiThread.
292         #[cfg(not(feature = "ffrt"))]
293         worker::CURRENT_HANDLE.with(|ctx| {
294             ctx.set(std::ptr::null());
295         });
296 
297         ret
298     }
299 }
300 
301 cfg_metrics!(
302     use crate::metrics::Metrics;
303     impl Runtime {
304         /// User can get some message from Runtime during running.
305         ///
306         /// # Example
307         /// ```
308         /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread().build().unwrap();
309         /// let _metrics = runtime.metrics();
310         /// ```
311         pub fn metrics(&self) -> Metrics {
312             Metrics::new(self)
313         }
314     }
315 
316     /// Gets metrics of the global Runtime.
317     /// # Example
318     /// ```
319     /// use ylong_runtime::executor::get_global_runtime_metrics;
320     ///
321     /// let metrics = get_global_runtime_metrics();
322     /// ```
323     pub fn get_global_runtime_metrics() -> Metrics<'static> {
324         Metrics::new(global_default_async())
325     }
326 );
327