1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! A builder to configure the runtime, and thread pool of the runtime.
15 //!
16 //! Ylong-runtime provides two kinds of runtime.
17 //! `CurrentThread`: Runtime which runs on the current thread.
18 //! `MultiThread`: Runtime which runs on multiple threads.
19 //!
20 //! After configuring the builder, a call to `build` will return the actual
21 //! runtime instance. [`MultiThreadBuilder`] could also be used for configuring
22 //! the global singleton runtime.
23 //!
24 //! For thread pool, the builder allows the user to set the thread number, stack
25 //! size and name prefix of each thread.
26
27 pub(crate) mod common_builder;
28 #[cfg(feature = "current_thread_runtime")]
29 pub(crate) mod current_thread_builder;
30 pub(crate) mod multi_thread_builder;
31
32 use std::fmt::Debug;
33 use std::io;
34 use std::sync::Arc;
35 #[cfg(all(any(feature = "time", feature = "net"), feature = "ffrt"))]
36 use std::sync::Once;
37
38 #[cfg(feature = "current_thread_runtime")]
39 pub use current_thread_builder::CurrentThreadBuilder;
40 pub use multi_thread_builder::MultiThreadBuilder;
41
42 pub(crate) use crate::builder::common_builder::CommonBuilder;
43 use crate::error::ScheduleError;
44 use crate::executor::blocking_pool::BlockPoolSpawner;
#[cfg(all(any(feature = "time", feature = "net"), feature = "ffrt"))]
use crate::executor::netpoller::NetLooper;
47
48 cfg_not_ffrt!(
49 use crate::executor::async_pool::AsyncPoolSpawner;
50 );
51
/// A callback function to be executed in different stages of a thread's
/// life-cycle.
///
/// The callback takes no arguments and returns nothing. It is stored behind
/// an [`Arc`] and must be `Send + Sync + 'static`, so one hook can be cloned
/// cheaply and shared between threads.
pub type CallbackHook = Arc<dyn Fn() + Send + Sync + 'static>;
55
/// Scheduling policy.
///
/// Only one policy is currently defined.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd)]
pub enum ScheduleAlgo {
    /// Bounded local queues that adopt FIFO order.
    FifoBound,
}
62
/// Builder to build the runtime. Provides methods to customize the runtime,
/// such as setting the thread pool size, the worker thread stack size, the
/// worker thread name prefix, etc.
///
/// If the `multi_instance_runtime` or `current_thread_runtime` feature is
/// turned on: after configuring the builder, a call to `build` initializes
/// the actual runtime and returns its instance. If there is an invalid
/// parameter during the build, an error is returned.
///
/// Otherwise:
/// RuntimeBuilder will not have the `build()` method; instead, the builder
/// should be passed to set the global executor.
///
/// # Examples
///
// NOTE(review): the fence tag below is spelled `no run`, while rustdoc's
// attribute is `no_run`. As written the tag does not appear to be one rustdoc
// recognises -- confirm whether this example is meant to be compiled before
// changing it, since it relies on the `multi_instance_runtime` feature.
/// ```no run
/// #![cfg(feature = "multi_instance_runtime")]
///
/// use ylong_runtime::builder::RuntimeBuilder;
/// use ylong_runtime::executor::Runtime;
///
/// let runtime = RuntimeBuilder::new_multi_thread()
///     .worker_num(4)
///     .worker_stack_size(1024 * 300)
///     .build()
///     .unwrap();
/// ```
pub struct RuntimeBuilder;
91
impl RuntimeBuilder {
    /// Initializes a new builder with current_thread settings.
    ///
    /// Returns the value produced by [`CurrentThreadBuilder::new`]. This
    /// constructor is only compiled when the `current_thread_runtime` feature
    /// is enabled.
    ///
    /// All tasks will run on the current thread, which means it does not create
    /// any other worker threads.
    ///
    /// # Examples
    ///
    /// ```
    /// use ylong_runtime::builder::RuntimeBuilder;
    ///
    /// let runtime = RuntimeBuilder::new_current_thread()
    ///     .worker_stack_size(1024 * 3)
    ///     .max_blocking_pool_size(4);
    /// ```
    #[cfg(feature = "current_thread_runtime")]
    pub fn new_current_thread() -> CurrentThreadBuilder {
        CurrentThreadBuilder::new()
    }

    /// Initializes a new builder with multi_thread settings.
    ///
    /// Returns the value produced by [`MultiThreadBuilder::new`].
    ///
    /// When running, worker threads will be created according to the builder
    /// configuration, and tasks will be allocated and run in the newly
    /// created thread pool.
    ///
    /// # Examples
    ///
    /// ```
    /// use ylong_runtime::builder::RuntimeBuilder;
    ///
    /// let runtime = RuntimeBuilder::new_multi_thread().max_blocking_pool_size(4);
    /// ```
    pub fn new_multi_thread() -> MultiThreadBuilder {
        MultiThreadBuilder::new()
    }
}
129
/// Creates the asynchronous pool spawner for a multi-thread runtime.
///
/// This is a thin wrapper that forwards `builder` to `AsyncPoolSpawner::new`
/// and returns its result unchanged, including any `io::Error`.
///
/// Only compiled when the `ffrt` feature is disabled.
#[cfg(not(feature = "ffrt"))]
pub(crate) fn initialize_async_spawner(
    builder: &MultiThreadBuilder,
) -> io::Result<AsyncPoolSpawner> {
    AsyncPoolSpawner::new(builder)
}
136
/// Performs the start-up work for the `ffrt` configuration of the runtime.
///
/// When the `time` or `net` feature is enabled, this calls
/// `initialize_reactor` and propagates its error. With neither feature the
/// function does nothing and returns `Ok(())`. The builder argument is
/// currently unused.
///
/// Only compiled when the `ffrt` feature is enabled.
#[cfg(feature = "ffrt")]
pub(crate) fn initialize_ffrt_spawner(_builder: &MultiThreadBuilder) -> io::Result<()> {
    // initialize reactor
    #[cfg(any(feature = "time", feature = "net"))]
    initialize_reactor()?;
    Ok(())
}
144
/// Creates the blocking pool spawner and starts its permanent threads.
///
/// Builds a `BlockPoolSpawner` from `builder`, then calls
/// `create_permanent_threads` on it. If that call fails, the error is
/// propagated to the caller and the spawner is dropped; otherwise the ready
/// spawner is returned.
pub(crate) fn initialize_blocking_spawner(
    builder: &CommonBuilder,
) -> Result<BlockPoolSpawner, ScheduleError> {
    let blocking_spawner = BlockPoolSpawner::new(builder);
    // Start the permanent threads before handing the spawner out.
    blocking_spawner.create_permanent_threads()?;
    Ok(blocking_spawner)
}
152
/// Creates the `NetLooper` and starts its poller thread, at most once.
///
/// The body runs inside a function-local static [`Once`], so repeated calls
/// only construct a `NetLooper` and invoke `create_net_poller_thread` the
/// first time. Every call returns `Ok(())`.
///
/// The `cfg` gate deliberately mirrors the call site in
/// `initialize_ffrt_spawner`: the reactor is requested when `time` *or* `net`
/// is enabled together with `ffrt`, so this function must exist in all of
/// those configurations, not only when `time` is enabled.
#[cfg(all(any(feature = "time", feature = "net"), feature = "ffrt"))]
pub(crate) fn initialize_reactor() -> io::Result<()> {
    static ONCE: Once = Once::new();
    ONCE.call_once(|| {
        let net_poller = NetLooper::new();
        net_poller.create_net_poller_thread();
    });
    Ok(())
}
162
#[cfg(test)]
mod test {
    use crate::builder::RuntimeBuilder;
    #[cfg(not(feature = "ffrt"))]
    use crate::builder::ScheduleAlgo;

    /// UT test cases for RuntimeBuilder::new_multi_thread()
    ///
    /// # Brief
    /// 1. Checks that worker_name is None
    /// 2. Checks that blocking_permanent_thread_num is 0
    /// 3. Checks that max_blocking_pool_size is Some(50)
    /// 4. Checks that keep_alive_time is None
    /// 5. Without `ffrt`: checks that core_thread_size is None
    /// 6. Without `ffrt`: checks that schedule_algo is ScheduleAlgo::FifoBound
    /// 7. Checks that stack_size is None
    #[test]
    fn ut_thread_pool_builder_new() {
        let builder = RuntimeBuilder::new_multi_thread();
        let common = &builder.common;

        assert_eq!(common.worker_name, None);
        assert_eq!(common.blocking_permanent_thread_num, 0);
        assert_eq!(common.max_blocking_pool_size, Some(50));
        assert_eq!(common.keep_alive_time, None);
        #[cfg(not(feature = "ffrt"))]
        {
            assert_eq!(builder.core_thread_size, None);
            assert_eq!(common.schedule_algo, ScheduleAlgo::FifoBound);
        }
        assert_eq!(common.stack_size, None);
    }

    /// UT test cases for RuntimeBuilder::name()
    ///
    /// # Brief
    /// 1. Sets a worker name and checks that the builder stores it
    #[test]
    fn ut_thread_pool_builder_name() {
        let expected = String::from("worker_name");
        let builder = RuntimeBuilder::new_multi_thread().worker_name(expected.clone());
        assert_eq!(builder.common.worker_name, Some(expected));
    }

    /// UT test cases for RuntimeBuilder::core_pool_size()
    ///
    /// # Brief
    /// 1. worker_num set to 1, checks that core_thread_size is Some(1)
    /// 2. worker_num set to 64, checks that core_thread_size is Some(64)
    /// 3. worker_num set to 0, checks that core_thread_size is Some(1)
    /// 4. worker_num set to 65, checks that core_thread_size is Some(64)
    #[test]
    #[cfg(not(feature = "ffrt"))]
    fn ut_thread_pool_builder_core_pool_size() {
        // (requested worker count, value the builder is expected to store)
        let cases = [(1, Some(1)), (64, Some(64)), (0, Some(1)), (65, Some(64))];
        for &(requested, expected) in cases.iter() {
            let builder = RuntimeBuilder::new_multi_thread().worker_num(requested);
            assert_eq!(builder.core_thread_size, expected);
        }
    }

    /// UT test cases for RuntimeBuilder::stack_size()
    ///
    /// # Brief
    /// 1. worker_stack_size set to 0, checks that stack_size is Some(1)
    /// 2. worker_stack_size set to 1, checks that stack_size is Some(1)
    #[test]
    fn ut_thread_pool_builder_stack_size() {
        // Both requests are expected to end up as a stack size of 1.
        for &requested in [0, 1].iter() {
            let builder = RuntimeBuilder::new_multi_thread().worker_stack_size(requested);
            assert_eq!(builder.common.stack_size.unwrap(), 1);
        }
    }
}
250
#[cfg(test)]
#[cfg(feature = "current_thread_runtime")]
mod current_thread_test {
    use crate::builder::RuntimeBuilder;

    /// UT test cases for new_current_thread.
    ///
    /// # Brief
    /// 1. Spawn many tasks on the current-thread runtime up front, then block
    ///    on each handle in order and check its result.
    /// 2. Spawn one task at a time, block on it, check the result, and repeat.
    /// 3. Build and drive a second current-thread runtime from inside a task
    ///    of the first one.
    #[test]
    fn ut_thread_pool_builder_current_thread() {
        // Case 1: queue every task first, then drain the handles in spawn order.
        let rt = RuntimeBuilder::new_current_thread().build().unwrap();
        let pending: Vec<_> = (0..1000)
            .map(|value| rt.spawn(async move { value }))
            .collect();
        for (expected, task) in pending.into_iter().enumerate() {
            let got = rt.block_on(task).unwrap();
            assert_eq!(got, expected);
        }

        // Case 2: spawn and immediately wait, one task at a time.
        let rt = RuntimeBuilder::new_current_thread().build().unwrap();
        for value in 0..1000 {
            let task = rt.spawn(async move { value });
            let got = rt.block_on(task).unwrap();
            assert_eq!(got, value);
        }

        // Case 3: a task that builds and drives its own nested runtime.
        let outer = RuntimeBuilder::new_current_thread().build().unwrap();
        let outer_task = outer.spawn(async move {
            let inner = RuntimeBuilder::new_current_thread().build().unwrap();
            let inner_task = inner.spawn(async move { 1_usize });
            let inner_result = inner.block_on(inner_task).unwrap();
            assert_eq!(inner_result, 1);
            inner_result
        });
        let outer_result = outer.block_on(outer_task).unwrap();
        assert_eq!(outer_result, 1);
    }
}
296
#[cfg(not(feature = "ffrt"))]
#[cfg(test)]
mod ylong_executor_test {
    use crate::builder::{RuntimeBuilder, ScheduleAlgo};

    /// UT test cases for RuntimeBuilder::is_affinity()
    ///
    /// # Brief
    /// 1. is_affinity set to true, check that the builder stores true
    /// 2. is_affinity set to false, check that the builder stores false
    #[test]
    fn ut_thread_pool_builder_is_affinity() {
        let thread_pool_builder = RuntimeBuilder::new_multi_thread().is_affinity(true);
        assert!(thread_pool_builder.common.is_affinity);

        let thread_pool_builder = RuntimeBuilder::new_multi_thread().is_affinity(false);
        assert!(!thread_pool_builder.common.is_affinity);
    }

    /// UT test cases for RuntimeBuilder::blocking_permanent_thread_num()
    ///
    /// # Brief
    /// 1. blocking_permanent_thread_num set to 1, check that the stored value
    ///    is 1.
    /// 2. blocking_permanent_thread_num set to max_blocking_pool_size, check
    ///    that the stored value is max_blocking_pool_size.
    /// 3. blocking_permanent_thread_num set to 0, check that the stored value
    ///    is 0.
    /// 4. blocking_permanent_thread_num set to max_blocking_pool_size + 1,
    ///    check that the stored value is max_blocking_pool_size.
    #[test]
    fn ut_thread_pool_builder_permanent_blocking_thread_num() {
        let thread_pool_builder =
            RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(1);
        assert_eq!(thread_pool_builder.common.blocking_permanent_thread_num, 1);

        // Exactly at the upper bound: the value is kept as-is.
        let blocking_permanent_thread_num =
            thread_pool_builder.common.max_blocking_pool_size.unwrap();
        let thread_pool_builder = RuntimeBuilder::new_multi_thread()
            .blocking_permanent_thread_num(blocking_permanent_thread_num);
        assert_eq!(
            thread_pool_builder.common.blocking_permanent_thread_num,
            blocking_permanent_thread_num
        );

        let thread_pool_builder =
            RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(0);
        assert_eq!(thread_pool_builder.common.blocking_permanent_thread_num, 0);

        // One past the upper bound: the stored value must equal the bound.
        let permanent_blocking_thread_num =
            thread_pool_builder.common.max_blocking_pool_size.unwrap() + 1;
        let thread_pool_builder = RuntimeBuilder::new_multi_thread()
            .blocking_permanent_thread_num(permanent_blocking_thread_num);
        assert_eq!(
            thread_pool_builder.common.blocking_permanent_thread_num,
            thread_pool_builder.common.max_blocking_pool_size.unwrap()
        );
    }

    /// UT test cases for RuntimeBuilder::max_blocking_pool_size()
    ///
    /// # Brief
    /// 1. max_blocking_pool_size set to 1, check that the stored value is 1
    /// 2. max_blocking_pool_size set to 64, check that the stored value is 64
    /// 3. max_blocking_pool_size set to 0, check that the stored value is 1
    /// 4. max_blocking_pool_size set to 65, check that the stored value is 64
    #[test]
    fn ut_thread_pool_builder_max_pool_size() {
        // (requested size, value the builder is expected to store)
        let cases = [(1, 1), (64, 64), (0, 1), (65, 64)];
        for &(requested, expected) in cases.iter() {
            let thread_pool_builder =
                RuntimeBuilder::new_multi_thread().max_blocking_pool_size(requested);
            assert_eq!(
                thread_pool_builder.common.max_blocking_pool_size.unwrap(),
                expected
            );
        }
    }

    /// UT test cases for RuntimeBuilder::keep_alive_time()
    ///
    /// # Brief
    /// 1. keep_alive_time set to 0 seconds, check that the stored value is
    ///    Duration::from_secs(0)
    /// 2. keep_alive_time set to 1 second, check that the stored value is
    ///    Duration::from_secs(1)
    #[test]
    fn ut_thread_pool_builder_keep_alive_time() {
        use std::time::Duration;

        for secs in 0..=1 {
            let keep_alive_time = Duration::from_secs(secs);
            let thread_pool_builder =
                RuntimeBuilder::new_multi_thread().keep_alive_time(keep_alive_time);
            assert_eq!(
                thread_pool_builder.common.keep_alive_time.unwrap(),
                keep_alive_time
            );
        }
    }

    /// UT test cases for RuntimeBuilder::schedule_algo()
    ///
    /// # Brief
    /// 1. schedule_algo set to FifoBound, check that the builder stores it
    // No per-test `cfg` is needed: the enclosing module is already compiled
    // only when the `ffrt` feature is disabled.
    #[test]
    fn ut_thread_pool_builder_schedule_algo_test() {
        let schedule_algo = ScheduleAlgo::FifoBound;
        let thread_pool_builder = RuntimeBuilder::new_multi_thread().schedule_algo(schedule_algo);
        assert_eq!(thread_pool_builder.common.schedule_algo, schedule_algo);
    }
}
430