1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io; 15 use std::sync::Mutex; 16 17 cfg_ffrt!( 18 use ylong_ffrt::{ffrt_set_worker_stack_size, Qos}; 19 use std::collections::HashMap; 20 use libc::{c_uint, c_ulong}; 21 use std::time::Duration; 22 use crate::builder::ScheduleAlgo; 23 ); 24 25 #[cfg(not(feature = "ffrt"))] 26 use crate::builder::common_builder::impl_common; 27 use crate::builder::CommonBuilder; 28 #[cfg(feature = "multi_instance_runtime")] 29 use crate::executor::{AsyncHandle, Runtime}; 30 31 pub(crate) static GLOBAL_BUILDER: Mutex<Option<MultiThreadBuilder>> = Mutex::new(None); 32 33 /// Runtime builder that configures a multi-threaded runtime, or the global 34 /// runtime. 35 pub struct MultiThreadBuilder { 36 pub(crate) common: CommonBuilder, 37 38 #[cfg(not(feature = "ffrt"))] 39 /// Maximum thread number for core thread pool 40 pub(crate) core_thread_size: Option<usize>, 41 42 #[cfg(feature = "ffrt")] 43 /// Thread number for each qos 44 pub(crate) thread_num_by_qos: HashMap<Qos, u32>, 45 } 46 47 impl MultiThreadBuilder { new() -> Self48 pub(crate) fn new() -> Self { 49 MultiThreadBuilder { 50 common: CommonBuilder::new(), 51 #[cfg(not(feature = "ffrt"))] 52 core_thread_size: None, 53 #[cfg(feature = "ffrt")] 54 thread_num_by_qos: HashMap::new(), 55 } 56 } 57 58 /// Configures the global runtime. 59 /// 60 /// # Error 61 /// If the global runtime is already running or this method has been called 62 /// before, then it will return an `AlreadyExists` error. build_global(self) -> io::Result<()>63 pub fn build_global(self) -> io::Result<()> { 64 let mut builder = GLOBAL_BUILDER.lock().unwrap(); 65 if builder.is_some() { 66 return Err(io::ErrorKind::AlreadyExists.into()); 67 } 68 69 #[cfg(feature = "ffrt")] 70 unsafe { 71 for (qos, stack_size) in self.common.stack_size_by_qos.iter() { 72 ffrt_set_worker_stack_size(*qos, *stack_size as c_ulong); 73 } 74 } 75 76 *builder = Some(self); 77 Ok(()) 78 } 79 } 80 81 #[cfg(feature = "ffrt")] 82 impl MultiThreadBuilder { 83 /// Sets the maximum worker number for a specific qos group. 84 /// 85 /// If a worker number has already been set for a qos, calling the method 86 /// with the same qos will overwrite the old value. 87 /// 88 /// # Error 89 /// The accepted worker number range for each qos is [1, 20]. If 0 is passed 90 /// in, then the maximum worker number will be set to 1. If a number 91 /// greater than 20 is passed in, then the maximum worker number will be 92 /// set to 20. max_worker_num_by_qos(mut self, qos: Qos, num: u32) -> Self93 pub fn max_worker_num_by_qos(mut self, qos: Qos, num: u32) -> Self { 94 let worker = match num { 95 0 => 1, 96 n if n > 20 => 20, 97 n => n, 98 }; 99 self.thread_num_by_qos.insert(qos, worker); 100 self 101 } 102 103 /// Sets the name prefix for all worker threads. worker_name(mut self, name: String) -> Self104 pub fn worker_name(mut self, name: String) -> Self { 105 self.common.worker_name = Some(name); 106 self 107 } 108 109 /// Sets the number of core worker threads. 110 /// 111 /// 112 /// The boundary of thread number is 1-64: 113 /// If sets a number smaller than 1, then thread number would be set to 1. 114 /// If sets a number larger than 64, then thread number would be set to 64. 115 /// The default value is the number of cores of the cpu. 116 /// 117 /// # Examples 118 /// ``` 119 /// use crate::ylong_runtime::builder::RuntimeBuilder; 120 /// 121 /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8); 122 /// ``` worker_num(self, core_pool_size: usize) -> Self123 pub fn worker_num(self, core_pool_size: usize) -> Self { 124 self.max_worker_num_by_qos(Qos::Default, core_pool_size as u32) 125 } 126 127 /// Sets the core affinity of the worker threads 128 /// 129 /// # Note 130 /// This method does nothing now under ffrt feature. is_affinity(self, _is_affinity: bool) -> Self131 pub fn is_affinity(self, _is_affinity: bool) -> Self { 132 self 133 } 134 135 /// Sets the schedule policy. 136 /// 137 /// # Note 138 /// This method does nothing now under ffrt feature. schedule_algo(self, _schedule_algo: ScheduleAlgo) -> Self139 pub fn schedule_algo(self, _schedule_algo: ScheduleAlgo) -> Self { 140 self 141 } 142 143 /// Sets the callback function to be called when a worker thread starts. 144 /// 145 /// # Note 146 /// This method does nothing now under ffrt feature. after_start<F>(self, _f: F) -> Self where F: Fn() + Send + Sync + 'static,147 pub fn after_start<F>(self, _f: F) -> Self 148 where 149 F: Fn() + Send + Sync + 'static, 150 { 151 self 152 } 153 154 /// Sets the callback function to be called when a worker thread stops. 155 /// 156 /// # Note 157 /// This method does nothing now under ffrt feature. before_stop<F>(self, _f: F) -> Self where F: Fn() + Send + Sync + 'static,158 pub fn before_stop<F>(self, _f: F) -> Self 159 where 160 F: Fn() + Send + Sync + 'static, 161 { 162 self 163 } 164 165 /// Sets the maximum number of permanent threads in blocking thread pool 166 /// 167 /// # Note 168 /// This method does nothing now under ffrt feature. blocking_permanent_thread_num(self, _blocking_permanent_thread_num: u8) -> Self169 pub fn blocking_permanent_thread_num(self, _blocking_permanent_thread_num: u8) -> Self { 170 self 171 } 172 173 /// Sets the number of threads that the runtime could spawn additionally 174 /// besides the core thread pool. 175 /// 176 /// The boundary is 1-64. 177 /// 178 /// # Note 179 /// This method does nothing now under ffrt feature. max_blocking_pool_size(self, _max_blocking_pool_size: u8) -> Self180 pub fn max_blocking_pool_size(self, _max_blocking_pool_size: u8) -> Self { 181 self 182 } 183 184 /// Sets how long will the thread be kept alive inside the blocking pool 185 /// after it becomes idle. 186 /// 187 /// # Note 188 /// This method does nothing now under ffrt feature. keep_alive_time(self, _keep_alive_time: Duration) -> Self189 pub fn keep_alive_time(self, _keep_alive_time: Duration) -> Self { 190 self 191 } 192 193 /// Sets the thread stack size for a specific qos group. 194 /// 195 /// If a stack size has already been set for a qos, calling the method 196 /// with the same qos will overwrite the old value 197 /// 198 /// # Error 199 /// The lowest accepted stack size is 16k. If a value under 16k is passed 200 /// in, then the stack size will be set to 16k instead. stack_size_by_qos(mut self, qos: Qos, stack_size: usize) -> Self201 pub fn stack_size_by_qos(mut self, qos: Qos, stack_size: usize) -> Self { 202 const PTHREAD_STACK_MIN: usize = 16 * 1000; 203 204 let stack_size = match stack_size { 205 n if n < PTHREAD_STACK_MIN => PTHREAD_STACK_MIN, 206 n => n, 207 }; 208 self.common.stack_size_by_qos.insert(qos, stack_size); 209 self 210 } 211 212 /// Sets the stack size for every worker thread that gets spawned by the 213 /// runtime. The minimum stack size is 1. worker_stack_size(self, stack_size: usize) -> Self214 pub fn worker_stack_size(self, stack_size: usize) -> Self { 215 self.stack_size_by_qos(Qos::Default, stack_size) 216 } 217 } 218 219 #[cfg(not(feature = "ffrt"))] 220 impl MultiThreadBuilder { 221 /// Initializes the runtime and returns its instance. 222 #[cfg(feature = "multi_instance_runtime")] build(&mut self) -> io::Result<Runtime>223 pub fn build(&mut self) -> io::Result<Runtime> { 224 use crate::builder::initialize_async_spawner; 225 let async_spawner = initialize_async_spawner(self)?; 226 227 Ok(Runtime { 228 async_spawner: AsyncHandle::MultiThread(async_spawner), 229 }) 230 } 231 232 /// Sets the number of core worker threads. 233 /// 234 /// 235 /// The boundary of thread number is 1-64: 236 /// If sets a number smaller than 1, then thread number would be set to 1. 237 /// If sets a number larger than 64, then thread number would be set to 64. 238 /// The default value is the number of cores of the cpu. 239 /// 240 /// # Examples 241 /// ``` 242 /// use crate::ylong_runtime::builder::RuntimeBuilder; 243 /// 244 /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8); 245 /// ``` worker_num(mut self, core_pool_size: usize) -> Self246 pub fn worker_num(mut self, core_pool_size: usize) -> Self { 247 if core_pool_size < 1 { 248 self.core_thread_size = Some(1); 249 } else if core_pool_size > 64 { 250 self.core_thread_size = Some(64); 251 } else { 252 self.core_thread_size = Some(core_pool_size); 253 } 254 self 255 } 256 } 257 258 #[cfg(not(feature = "ffrt"))] 259 impl_common!(MultiThreadBuilder); 260 261 #[cfg(feature = "full")] 262 #[cfg(test)] 263 mod test { 264 use crate::builder::RuntimeBuilder; 265 use crate::executor::{global_default_async, AsyncHandle}; 266 267 /// UT test cases for blocking on a time sleep without initializing the 268 /// runtime. 269 /// 270 /// # Brief 271 /// 1. Configure the global runtime to make it have six core threads 272 /// 2. Get the global runtime 273 /// 3. Check the core thread number of the runtime 274 /// 4. Call build_global once more 275 /// 5. Check the error 276 #[test] ut_build_global()277 fn ut_build_global() { 278 let ret = RuntimeBuilder::new_multi_thread() 279 .worker_num(6) 280 .max_blocking_pool_size(3) 281 .build_global(); 282 assert!(ret.is_ok()); 283 284 let async_pool = global_default_async(); 285 match &async_pool.async_spawner { 286 AsyncHandle::CurrentThread(_) => unreachable!(), 287 AsyncHandle::MultiThread(x) => { 288 assert_eq!(x.inner.total, 6); 289 } 290 } 291 292 let ret = RuntimeBuilder::new_multi_thread() 293 .worker_num(2) 294 .max_blocking_pool_size(3) 295 .build_global(); 296 assert!(ret.is_err()); 297 } 298 } 299 300 #[cfg(feature = "ffrt")] 301 #[cfg(test)] 302 mod ffrt_test { 303 use ylong_ffrt::Qos::{Default, UserInitiated, UserInteractive}; 304 305 use crate::builder::MultiThreadBuilder; 306 307 /// UT test cases for max_worker_num_by_qos 308 /// 309 /// # Brief 310 /// 1. Sets UserInteractive qos group to have 0 maximum worker number. 311 /// 2. Checks if the actual value is 1 312 /// 3. Sets UserInteractive qos group to have 21 maximum worker number. 313 /// 4. Checks if the actual value is 20 314 /// 5. Set Default qos group to have 8 maximum worker number. 315 /// 6. Checks if the actual value is 8. 316 /// 7. Calls build_global on the builder, check if the return value is Ok 317 #[test] ut_set_max_worker()318 fn ut_set_max_worker() { 319 let builder = MultiThreadBuilder::new(); 320 let builder = builder.max_worker_num_by_qos(UserInteractive, 0); 321 let num = builder.thread_num_by_qos.get(&UserInteractive).unwrap(); 322 assert_eq!(*num, 1); 323 324 let builder = builder.max_worker_num_by_qos(UserInteractive, 21); 325 let num = builder.thread_num_by_qos.get(&UserInteractive).unwrap(); 326 assert_eq!(*num, 20); 327 328 let builder = MultiThreadBuilder::new().max_worker_num_by_qos(Default, 8); 329 let num = builder.thread_num_by_qos.get(&Default).unwrap(); 330 assert_eq!(*num, 8); 331 } 332 333 /// UT cases for stack_size_by_qos 334 /// 335 /// # Brief 336 /// 1. Sets UserInitiated qos group's stack size to 16k - 1 337 /// 2. Checks if the actual stack size is 16k 338 /// 3. Sets UserInteractive qos group's stack size to 16k 339 /// 4. Checks if the actual stack size is 16k 340 /// 5. Sets Default qos group's stack size to 16M 341 /// 6. Checks if the actual stack size is 16M 342 /// 7. Sets UserInteractive qos group's stack size to 16k + 1 343 /// 8. Checks if the actual stack size is 16k + 1 344 #[test] ut_set_stack_size()345 fn ut_set_stack_size() { 346 let builder = MultiThreadBuilder::new(); 347 let builder = builder.stack_size_by_qos(UserInitiated, 16 * 1000 - 1); 348 let num = builder 349 .common 350 .stack_size_by_qos 351 .get(&UserInitiated) 352 .unwrap(); 353 assert_eq!(*num, 16 * 1000); 354 355 let builder = MultiThreadBuilder::new(); 356 let builder = builder.stack_size_by_qos(UserInteractive, 16 * 1000); 357 let num = builder 358 .common 359 .stack_size_by_qos 360 .get(&UserInteractive) 361 .unwrap(); 362 assert_eq!(*num, 16 * 1000); 363 364 let builder = MultiThreadBuilder::new(); 365 let builder = builder.stack_size_by_qos(Default, 16 * 1000 * 1000); 366 let num = builder.common.stack_size_by_qos.get(&Default).unwrap(); 367 assert_eq!(*num, 16 * 1000 * 1000); 368 369 let builder = MultiThreadBuilder::new(); 370 let builder = builder.stack_size_by_qos(UserInteractive, 16 * 1000 + 1); 371 let num = builder 372 .common 373 .stack_size_by_qos 374 .get(&UserInteractive) 375 .unwrap(); 376 assert_eq!(*num, 16 * 1000 + 1); 377 } 378 } 379