• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io;
15 use std::sync::Mutex;
16 
17 cfg_ffrt!(
18     use ylong_ffrt::{ffrt_set_worker_stack_size, Qos};
19     use std::collections::HashMap;
20     use libc::{c_uint, c_ulong};
21     use std::time::Duration;
22     use crate::builder::ScheduleAlgo;
23 );
24 
25 #[cfg(not(feature = "ffrt"))]
26 use crate::builder::common_builder::impl_common;
27 use crate::builder::CommonBuilder;
28 #[cfg(feature = "multi_instance_runtime")]
29 use crate::executor::{AsyncHandle, Runtime};
30 
/// Configuration for the global runtime, stored by
/// [`MultiThreadBuilder::build_global`].
///
/// Holds `None` until `build_global` succeeds once; that method refuses to
/// overwrite a value that is already present.
pub(crate) static GLOBAL_BUILDER: Mutex<Option<MultiThreadBuilder>> = Mutex::new(None);
32 
/// Runtime builder that configures a multi-threaded runtime, or the global
/// runtime.
///
/// The setters in this file take the builder by value and hand it back, so
/// calls can be chained. [`MultiThreadBuilder::build_global`] then stores the
/// finished configuration for the global runtime.
pub struct MultiThreadBuilder {
    /// Settings kept in the `CommonBuilder` part of this builder.
    pub(crate) common: CommonBuilder,

    /// Number of core worker threads requested through `worker_num`; `None`
    /// until that method has been called.
    #[cfg(not(feature = "ffrt"))]
    pub(crate) core_thread_size: Option<usize>,

    /// Maximum worker number for each qos, filled by `max_worker_num_by_qos`.
    #[cfg(feature = "ffrt")]
    pub(crate) thread_num_by_qos: HashMap<Qos, u32>,
}
46 
47 impl MultiThreadBuilder {
new() -> Self48     pub(crate) fn new() -> Self {
49         MultiThreadBuilder {
50             common: CommonBuilder::new(),
51             #[cfg(not(feature = "ffrt"))]
52             core_thread_size: None,
53             #[cfg(feature = "ffrt")]
54             thread_num_by_qos: HashMap::new(),
55         }
56     }
57 
58     /// Configures the global runtime.
59     ///
60     /// # Error
61     /// If the global runtime is already running or this method has been called
62     /// before, then it will return an `AlreadyExists` error.
build_global(self) -> io::Result<()>63     pub fn build_global(self) -> io::Result<()> {
64         let mut builder = GLOBAL_BUILDER.lock().unwrap();
65         if builder.is_some() {
66             return Err(io::ErrorKind::AlreadyExists.into());
67         }
68 
69         #[cfg(feature = "ffrt")]
70         unsafe {
71             for (qos, stack_size) in self.common.stack_size_by_qos.iter() {
72                 ffrt_set_worker_stack_size(*qos, *stack_size as c_ulong);
73             }
74         }
75 
76         *builder = Some(self);
77         Ok(())
78     }
79 }
80 
#[cfg(feature = "ffrt")]
impl MultiThreadBuilder {
    /// Sets the maximum worker number for a specific qos group.
    ///
    /// If a worker number has already been set for a qos, calling the method
    /// with the same qos will overwrite the old value.
    ///
    /// # Note
    /// The accepted worker number range for each qos is [1, 20]. Values
    /// outside that range are not rejected; they are clamped. Passing 0
    /// stores 1, and passing a number greater than 20 stores 20.
    pub fn max_worker_num_by_qos(mut self, qos: Qos, num: u32) -> Self {
        self.thread_num_by_qos.insert(qos, num.clamp(1, 20));
        self
    }

    /// Sets the name prefix for all worker threads.
    pub fn worker_name(mut self, name: String) -> Self {
        self.common.worker_name = Some(name);
        self
    }

    /// Sets the number of worker threads of the `Qos::Default` group.
    ///
    /// Under the ffrt feature this is a shorthand for
    /// `max_worker_num_by_qos(Qos::Default, ..)`, so the same [1, 20] range
    /// applies:
    /// If sets a number smaller than 1, then thread number would be set to 1.
    /// If sets a number larger than 20, then thread number would be set to 20.
    ///
    /// # Examples
    /// ```
    /// use crate::ylong_runtime::builder::RuntimeBuilder;
    ///
    /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8);
    /// ```
    pub fn worker_num(self, core_pool_size: usize) -> Self {
        // Saturate before narrowing: a plain `as u32` would wrap values above
        // `u32::MAX` (e.g. `1 << 32` becomes 0) and so turn a huge request
        // into the minimum instead of the maximum.
        let requested = core_pool_size.min(u32::MAX as usize) as u32;
        self.max_worker_num_by_qos(Qos::Default, requested)
    }

    /// Sets the core affinity of the worker threads
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn is_affinity(self, _is_affinity: bool) -> Self {
        self
    }

    /// Sets the schedule policy.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn schedule_algo(self, _schedule_algo: ScheduleAlgo) -> Self {
        self
    }

    /// Sets the callback function to be called when a worker thread starts.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature; the callback is
    /// dropped without being stored or called.
    pub fn after_start<F>(self, _f: F) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self
    }

    /// Sets the callback function to be called when a worker thread stops.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature; the callback is
    /// dropped without being stored or called.
    pub fn before_stop<F>(self, _f: F) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self
    }

    /// Sets the maximum number of permanent threads in blocking thread pool
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn blocking_permanent_thread_num(self, _blocking_permanent_thread_num: u8) -> Self {
        self
    }

    /// Sets the number of threads that the runtime could spawn additionally
    /// besides the core thread pool.
    ///
    /// The boundary is 1-64.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn max_blocking_pool_size(self, _max_blocking_pool_size: u8) -> Self {
        self
    }

    /// Sets how long will the thread be kept alive inside the blocking pool
    /// after it becomes idle.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn keep_alive_time(self, _keep_alive_time: Duration) -> Self {
        self
    }

    /// Sets the thread stack size for a specific qos group.
    ///
    /// If a stack size has already been set for a qos, calling the method
    /// with the same qos will overwrite the old value.
    ///
    /// # Note
    /// The lowest accepted stack size is 16k (`16 * 1000`). If a value under
    /// 16k is passed in, then the stack size will be set to 16k instead.
    pub fn stack_size_by_qos(mut self, qos: Qos, stack_size: usize) -> Self {
        // NOTE(review): this floor is 16 * 1000, not 16 * 1024 — confirm the
        // intended minimum before changing it; the unit tests pin 16 * 1000.
        const PTHREAD_STACK_MIN: usize = 16 * 1000;

        self.common
            .stack_size_by_qos
            .insert(qos, stack_size.max(PTHREAD_STACK_MIN));
        self
    }

    /// Sets the stack size for the worker threads of the `Qos::Default`
    /// group.
    ///
    /// This is a shorthand for `stack_size_by_qos(Qos::Default, ..)`, so the
    /// same 16k (`16 * 1000`) minimum applies: smaller values are raised to
    /// it.
    pub fn worker_stack_size(self, stack_size: usize) -> Self {
        self.stack_size_by_qos(Qos::Default, stack_size)
    }
}
218 
219 #[cfg(not(feature = "ffrt"))]
220 impl MultiThreadBuilder {
221     /// Initializes the runtime and returns its instance.
222     #[cfg(feature = "multi_instance_runtime")]
build(&mut self) -> io::Result<Runtime>223     pub fn build(&mut self) -> io::Result<Runtime> {
224         use crate::builder::initialize_async_spawner;
225         let async_spawner = initialize_async_spawner(self)?;
226 
227         Ok(Runtime {
228             async_spawner: AsyncHandle::MultiThread(async_spawner),
229         })
230     }
231 
232     /// Sets the number of core worker threads.
233     ///
234     ///
235     /// The boundary of thread number is 1-64:
236     /// If sets a number smaller than 1, then thread number would be set to 1.
237     /// If sets a number larger than 64, then thread number would be set to 64.
238     /// The default value is the number of cores of the cpu.
239     ///
240     /// # Examples
241     /// ```
242     /// use crate::ylong_runtime::builder::RuntimeBuilder;
243     ///
244     /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8);
245     /// ```
worker_num(mut self, core_pool_size: usize) -> Self246     pub fn worker_num(mut self, core_pool_size: usize) -> Self {
247         if core_pool_size < 1 {
248             self.core_thread_size = Some(1);
249         } else if core_pool_size > 64 {
250             self.core_thread_size = Some(64);
251         } else {
252             self.core_thread_size = Some(core_pool_size);
253         }
254         self
255     }
256 }
257 
// Expands `impl_common!` for `MultiThreadBuilder` in non-ffrt builds.
// NOTE(review): the macro is imported from `common_builder` and its body is
// not visible here; presumably it supplies the shared setters (such as
// `max_blocking_pool_size`, which the unit test in this file calls) — confirm
// against the macro definition.
#[cfg(not(feature = "ffrt"))]
impl_common!(MultiThreadBuilder);
260 
#[cfg(all(test, feature = "full"))]
mod test {
    use crate::builder::RuntimeBuilder;
    use crate::executor::{global_default_async, AsyncHandle};

    /// UT test cases for `build_global`.
    ///
    /// # Brief
    /// 1. Configure the global runtime to make it have six core threads
    /// 2. Get the global runtime
    /// 3. Check the core thread number of the runtime
    /// 4. Call build_global once more
    /// 5. Check the error
    #[test]
    fn ut_build_global() {
        let first = RuntimeBuilder::new_multi_thread()
            .worker_num(6)
            .max_blocking_pool_size(3)
            .build_global();
        assert!(first.is_ok());

        let runtime = global_default_async();
        match &runtime.async_spawner {
            AsyncHandle::MultiThread(spawner) => {
                assert_eq!(spawner.inner.total, 6);
            }
            AsyncHandle::CurrentThread(_) => unreachable!(),
        }

        // A second configuration attempt must be rejected.
        let second = RuntimeBuilder::new_multi_thread()
            .worker_num(2)
            .max_blocking_pool_size(3)
            .build_global();
        assert!(second.is_err());
    }
}
299 
#[cfg(all(test, feature = "ffrt"))]
mod ffrt_test {
    use ylong_ffrt::Qos::{Default, UserInitiated, UserInteractive};

    use crate::builder::MultiThreadBuilder;

    /// UT test cases for max_worker_num_by_qos
    ///
    /// # Brief
    /// 1. Sets UserInteractive qos group to have 0 maximum worker number.
    /// 2. Checks if the actual value is 1
    /// 3. Sets UserInteractive qos group to have 21 maximum worker number.
    /// 4. Checks if the actual value is 20
    /// 5. Set Default qos group to have 8 maximum worker number.
    /// 6. Checks if the actual value is 8.
    #[test]
    fn ut_set_max_worker() {
        let builder = MultiThreadBuilder::new().max_worker_num_by_qos(UserInteractive, 0);
        assert_eq!(builder.thread_num_by_qos.get(&UserInteractive), Some(&1));

        // Same builder, same qos: the earlier value is overwritten.
        let builder = builder.max_worker_num_by_qos(UserInteractive, 21);
        assert_eq!(builder.thread_num_by_qos.get(&UserInteractive), Some(&20));

        let builder = MultiThreadBuilder::new().max_worker_num_by_qos(Default, 8);
        assert_eq!(builder.thread_num_by_qos.get(&Default), Some(&8));
    }

    /// UT cases for stack_size_by_qos
    ///
    /// # Brief
    /// 1. Sets UserInitiated qos group's stack size to 16k - 1
    /// 2. Checks if the actual stack size is 16k
    /// 3. Sets UserInteractive qos group's stack size to 16k
    /// 4. Checks if the actual stack size is 16k
    /// 5. Sets Default qos group's stack size to 16M
    /// 6. Checks if the actual stack size is 16M
    /// 7. Sets UserInteractive qos group's stack size to 16k + 1
    /// 8. Checks if the actual stack size is 16k + 1
    #[test]
    fn ut_set_stack_size() {
        // (qos, requested size, size expected to be stored)
        let cases = [
            (UserInitiated, 16 * 1000 - 1, 16 * 1000),
            (UserInteractive, 16 * 1000, 16 * 1000),
            (Default, 16 * 1000 * 1000, 16 * 1000 * 1000),
            (UserInteractive, 16 * 1000 + 1, 16 * 1000 + 1),
        ];

        // Each case starts from a fresh builder, as in the step list above.
        for (qos, requested, expected) in cases {
            let builder = MultiThreadBuilder::new().stack_size_by_qos(qos, requested);
            assert_eq!(
                builder.common.stack_size_by_qos.get(&qos),
                Some(&expected)
            );
        }
    }
}
379