• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Executor contains two parts:
15 //! - thread pool: how threads are started and how they run the tasks.
16 //! - schedule policy: how tasks are scheduled in the task queues.
17 pub(crate) mod block_on;
18 pub(crate) mod blocking_pool;
19 #[cfg(feature = "current_thread_runtime")]
20 pub(crate) mod current_thread;
21 
22 #[cfg(all(any(feature = "time", feature = "net"), feature = "ffrt"))]
23 pub(crate) mod netpoller;
24 use std::future::Future;
25 use std::mem::MaybeUninit;
26 use std::sync::Once;
27 
28 use crate::builder::multi_thread_builder::GLOBAL_BUILDER;
29 use crate::builder::{initialize_blocking_spawner, RuntimeBuilder};
30 use crate::executor::blocking_pool::BlockPoolSpawner;
31 #[cfg(feature = "current_thread_runtime")]
32 use crate::executor::current_thread::CurrentThreadSpawner;
33 use crate::task::TaskBuilder;
34 use crate::{JoinHandle, Task};
35 cfg_ffrt! {
36     use crate::builder::initialize_ffrt_spawner;
37     use crate::ffrt::spawner::spawn;
38 }
39 cfg_not_ffrt! {
40     mod parker;
41     pub(crate) mod async_pool;
42     pub(crate) mod queue;
43     mod sleeper;
44     pub(crate) mod worker;
45     pub(crate) mod driver;
46     use crate::executor::driver::Handle;
47     use crate::builder::initialize_async_spawner;
48     use crate::executor::async_pool::AsyncPoolSpawner;
49 }
50 
51 pub(crate) trait Schedule {
schedule(&self, task: Task, lifo: bool)52     fn schedule(&self, task: Task, lifo: bool);
53 }
54 
55 pub(crate) struct PlaceholderScheduler;
56 
57 impl Schedule for PlaceholderScheduler {
schedule(&self, _task: Task, _lifo: bool)58     fn schedule(&self, _task: Task, _lifo: bool) {
59         panic!("no scheduler can be called");
60     }
61 }
62 
63 pub(crate) enum AsyncHandle {
64     #[cfg(feature = "current_thread_runtime")]
65     CurrentThread(CurrentThreadSpawner),
66     #[cfg(not(feature = "ffrt"))]
67     MultiThread(AsyncPoolSpawner),
68     #[cfg(feature = "ffrt")]
69     FfrtMultiThread,
70 }
71 
72 /// Runtime struct.
73 ///
74 /// # If `multi_instance_runtime` feature is turned on
75 /// There will be multiple runtime executors, initializing from user settings in
76 /// `RuntimeBuilder`.
77 ///
78 /// # If `multi_instance_runtime` feature is turned off
79 /// There will be only *ONE* runtime executor singleton inside one process.
80 /// The async and blocking pools working when calling methods of this struct are
81 /// stored in the global static executor instance. Here, keep the empty struct
82 /// for compatibility and possibility for function extension in the future.
83 pub struct Runtime {
84     pub(crate) async_spawner: AsyncHandle,
85 }
86 
87 #[cfg(not(feature = "ffrt"))]
88 impl Runtime {
get_handle(&self) -> std::sync::Arc<Handle>89     pub(crate) fn get_handle(&self) -> std::sync::Arc<Handle> {
90         match &self.async_spawner {
91             #[cfg(feature = "current_thread_runtime")]
92             AsyncHandle::CurrentThread(s) => s.handle.clone(),
93             AsyncHandle::MultiThread(s) => s.exe_mng_info.handle.clone(),
94         }
95     }
96 }
97 
global_default_async() -> &'static Runtime98 pub(crate) fn global_default_async() -> &'static Runtime {
99     static mut GLOBAL_DEFAULT_ASYNC: MaybeUninit<Runtime> = MaybeUninit::uninit();
100     static ONCE: Once = Once::new();
101 
102     unsafe {
103         ONCE.call_once(|| {
104             let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
105 
106             if global_builder.is_none() {
107                 *global_builder = Some(RuntimeBuilder::new_multi_thread());
108             }
109 
110             #[cfg(not(feature = "ffrt"))]
111             let runtime = match initialize_async_spawner(global_builder.as_ref().unwrap()) {
112                 Ok(s) => Runtime {
113                     async_spawner: AsyncHandle::MultiThread(s),
114                 },
115                 Err(e) => panic!("initialize runtime failed: {:?}", e),
116             };
117             #[cfg(feature = "ffrt")]
118             let runtime = match initialize_ffrt_spawner(global_builder.as_ref().unwrap()) {
119                 Ok(()) => Runtime {
120                     async_spawner: AsyncHandle::FfrtMultiThread,
121                 },
122                 Err(e) => panic!("initialize runtime failed: {:?}", e),
123             };
124             GLOBAL_DEFAULT_ASYNC = MaybeUninit::new(runtime);
125         });
126         &*GLOBAL_DEFAULT_ASYNC.as_ptr()
127     }
128 }
129 
global_default_blocking() -> &'static BlockPoolSpawner130 pub(crate) fn global_default_blocking() -> &'static BlockPoolSpawner {
131     static mut GLOBAL_DEFAULT_BLOCKING: MaybeUninit<BlockPoolSpawner> = MaybeUninit::uninit();
132     static ONCE: Once = Once::new();
133 
134     unsafe {
135         ONCE.call_once(|| {
136             let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
137 
138             if global_builder.is_none() {
139                 *global_builder = Some(RuntimeBuilder::new_multi_thread());
140             }
141             match initialize_blocking_spawner(&global_builder.as_ref().unwrap().common) {
142                 Ok(bps) => GLOBAL_DEFAULT_BLOCKING = MaybeUninit::new(bps),
143                 Err(e) => panic!("initialize blocking pool failed: {:?}", e),
144             }
145         });
146         &*GLOBAL_DEFAULT_BLOCKING.as_ptr()
147     }
148 }
149 
#[cfg(all(feature = "multi_instance_runtime", not(feature = "ffrt")))]
impl Runtime {
    /// Builds a multi-thread runtime from an untouched
    /// `RuntimeBuilder::new_multi_thread()` builder, i.e. with default
    /// settings.
    ///
    /// Returns whatever `build()` returns, including its I/O error on failure.
    pub fn new() -> std::io::Result<Runtime> {
        let default_builder = RuntimeBuilder::new_multi_thread();
        default_builder.build()
    }
}
157 
158 impl Runtime {
159     /// Spawns a future onto the runtime, returning a [`JoinHandle`] for it.
160     ///
161     /// The future will be later polled by the executor, which is usually
162     /// implemented as a thread pool. The executor will run the future util
163     /// finished.
164     ///
165     /// Awaits on the JoinHandle will return the result of the future, but users
166     /// don't have to `.await` the `JoinHandle` in order to run the future,
167     /// since the future will be executed in the background once it's
168     /// spawned. Dropping the JoinHandle will throw away the returned value.
169     ///
170     /// # Examples
171     ///
172     /// ```no run
173     /// use ylong_runtime::task::*;
174     /// use ylong_runtime::builder::RuntimeBuilder;
175     /// use ylong_runtime::executor::Runtime;
176     ///
177     /// async fn test_future(num: usize) -> usize {
178     ///     num
179     /// }
180     ///
181     /// let core_pool_size = 4;
182     ///
183     /// let runtime = RuntimeBuilder::new_multi_thread()
184     ///     .worker_num(core_pool_size)
185     ///     .build()
186     ///     .unwrap();
187     ///
188     /// runtime.spawn(test_future(1));
189     /// ```
spawn<T, R>(&self, task: T) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,190     pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
191     where
192         T: Future<Output = R> + Send + 'static,
193         R: Send + 'static,
194     {
195         self.spawn_with_attr(task, &TaskBuilder::default())
196     }
197 
198     #[inline]
spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,199     pub(crate) fn spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R>
200     where
201         T: Future<Output = R> + Send + 'static,
202         R: Send + 'static,
203     {
204         match &self.async_spawner {
205             #[cfg(feature = "current_thread_runtime")]
206             AsyncHandle::CurrentThread(current_thread) => current_thread.spawn(builder, task),
207             #[cfg(not(feature = "ffrt"))]
208             AsyncHandle::MultiThread(async_spawner) => async_spawner.spawn(builder, task),
209             #[cfg(feature = "ffrt")]
210             AsyncHandle::FfrtMultiThread => spawn(task, builder),
211         }
212     }
213 
214     /// Spawns the provided function or closure onto the runtime.
215     ///
216     /// It's usually used for cpu-bounded computation that does not return
217     /// pending and takes a relatively long time.
218     ///
219     /// # Examples
220     ///
221     /// ```no run
222     /// use ylong_runtime::builder::RuntimeBuilder;
223     ///
224     /// use std::time;
225     /// use std::thread::sleep;
226     ///
227     /// let runtime = RuntimeBuilder::new_multi_thread()
228     ///     .build()
229     ///     .unwrap();
230     ///
231     /// runtime.spawn_blocking(move || {
232     ///     sleep(time::Duration::from_millis(1));
233     ///     10
234     /// });
235     /// ```
spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R> where T: FnOnce() -> R + Send + 'static, R: Send + 'static,236     pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
237     where
238         T: FnOnce() -> R + Send + 'static,
239         R: Send + 'static,
240     {
241         crate::spawn::spawn_blocking(&TaskBuilder::new(), task)
242     }
243 
244     /// Blocks the current thread and runs the given future (usually a
245     /// JoinHandle) to completion, and gets its return value.
246     ///
247     /// Any code after the `block_on` will be executed once the future is done.
248     ///
249     /// Don't use this method on an asynchronous environment, since it will
250     /// block the worker thread and may cause deadlock.
251     ///
252     /// # Examples
253     ///
254     /// ```no run
255     /// use ylong_runtime::builder::RuntimeBuilder;
256     ///
257     /// let core_pool_size = 4;
258     /// async fn test_future(num: usize) -> usize {
259     ///     num
260     /// }
261     ///
262     /// let runtime = RuntimeBuilder::new_multi_thread()
263     ///     .worker_num(core_pool_size)
264     ///     .build()
265     ///     .unwrap();
266     ///
267     /// let handle = runtime.spawn(test_future(4));
268     ///
269     /// let result = runtime.block_on(handle);
270     ///
271     /// assert_eq!(result.unwrap(), 4);
272     /// ```
block_on<T, R>(&self, task: T) -> R where T: Future<Output = R>,273     pub fn block_on<T, R>(&self, task: T) -> R
274     where
275         T: Future<Output = R>,
276     {
277         // Registers handle to the current thread when block_on().
278         // so that async_source can get the handle and register it.
279         #[cfg(not(feature = "ffrt"))]
280         let cur_context = worker::WorkerHandle {
281             _handle: self.get_handle(),
282         };
283         #[cfg(not(feature = "ffrt"))]
284         worker::CURRENT_HANDLE.with(|ctx| {
285             ctx.set(&cur_context as *const _ as *const ());
286         });
287 
288         let ret = match &self.async_spawner {
289             #[cfg(feature = "current_thread_runtime")]
290             AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
291             #[cfg(not(feature = "ffrt"))]
292             AsyncHandle::MultiThread(_) => block_on::block_on(task),
293             #[cfg(feature = "ffrt")]
294             AsyncHandle::FfrtMultiThread => block_on::block_on(task),
295         };
296 
297         // Sets the current thread variable to null,
298         // otherwise the worker's CURRENT_WORKER can not be set under MultiThread.
299         #[cfg(not(feature = "ffrt"))]
300         worker::CURRENT_HANDLE.with(|ctx| {
301             ctx.set(std::ptr::null());
302         });
303 
304         ret
305     }
306 }
307 
308 cfg_metrics!(
309     use crate::metrics::Metrics;
310     impl Runtime {
311         /// User can get some message from Runtime during running.
312         ///
313         /// # Example
314         /// ```
315         /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread().build().unwrap();
316         /// let _metrics = runtime.metrics();
317         /// ```
318         pub fn metrics(&self) -> Metrics {
319             Metrics::new(self)
320         }
321     }
322 
323     /// Gets metrics of the global Runtime.
324     /// # Example
325     /// ```
326     /// use ylong_runtime::executor::get_global_runtime_metrics;
327     ///
328     /// let metrics = get_global_runtime_metrics();
329     /// ```
330     pub fn get_global_runtime_metrics() -> Metrics<'static> {
331         Metrics::new(global_default_async())
332     }
333 );
334