1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::VecDeque; 15 use std::future::Future; 16 use std::option::Option::Some; 17 use std::pin::Pin; 18 use std::sync::{Arc, Condvar, Mutex, Weak}; 19 use std::task::{Context, Poll}; 20 use std::thread; 21 use std::time::Duration; 22 23 use crate::builder::{CallbackHook, CommonBuilder}; 24 use crate::error::{ErrorKind, ScheduleError}; 25 use crate::executor::PlaceholderScheduler; 26 use crate::task; 27 use crate::task::{JoinHandle, TaskBuilder, VirtualTableType}; 28 29 pub(crate) const BLOCKING_THREAD_QUIT_WAIT_TIME: Duration = Duration::from_secs(3); 30 31 #[derive(Clone)] 32 pub(crate) struct BlockPoolSpawner { 33 inner: Arc<Inner>, 34 } 35 36 impl Drop for BlockPoolSpawner { drop(&mut self)37 fn drop(&mut self) { 38 self.shutdown(BLOCKING_THREAD_QUIT_WAIT_TIME); 39 } 40 } 41 42 impl BlockPoolSpawner { new(builder: &CommonBuilder) -> BlockPoolSpawner43 pub fn new(builder: &CommonBuilder) -> BlockPoolSpawner { 44 let keep_alive_time = builder 45 .keep_alive_time 46 .unwrap_or(BLOCKING_THREAD_KEEP_ALIVE_TIME); 47 let max_thread_num = builder 48 .max_blocking_pool_size 49 .unwrap_or(BLOCKING_MAX_THEAD_NUM); 50 BlockPoolSpawner { 51 inner: Arc::new(Inner { 52 shared: Mutex::new(Shared { 53 queue: VecDeque::new(), 54 total_thread_num: 0, 55 idle_thread_num: 0, 56 notify_num: 0, 57 current_permanent_thread_num: 0, 58 shutdown: false, 59 worker_id: 0, 60 worker_threads: VecDeque::new(), 61 }), 62 condvar: Condvar::new(), 63 shutdown_shared: Mutex::new(false), 64 shutdown_condvar: Condvar::new(), 65 stack_size: builder.stack_size, 66 after_start: builder.after_start.clone(), 67 before_stop: builder.before_stop.clone(), 68 max_thread_num, 69 keep_alive_time, 70 max_permanent_thread_num: builder.blocking_permanent_thread_num, 71 }), 72 } 73 } 74 shutdown(&mut self, timeout: Duration) -> bool75 pub fn shutdown(&mut self, timeout: Duration) -> bool { 76 let mut shared = self.inner.shared.lock().unwrap(); 77 78 if shared.shutdown { 79 return false; 80 } 81 self.inner.condvar.notify_all(); 82 let workers = std::mem::take(&mut shared.worker_threads); 83 drop(shared); 84 85 let shutdown_shared = self.inner.shutdown_shared.lock().unwrap(); 86 87 if *self 88 .inner 89 .shutdown_condvar 90 .wait_timeout(shutdown_shared, timeout) 91 .unwrap() 92 .0 93 { 94 for handle in workers { 95 let _ = handle.1.join(); 96 } 97 return true; 98 } 99 false 100 } 101 } 102 103 const BLOCKING_THREAD_KEEP_ALIVE_TIME: Duration = Duration::from_secs(5); 104 pub const BLOCKING_MAX_THEAD_NUM: u8 = 50; 105 106 /// Inner struct for [`BlockPoolSpawner`]. 107 struct Inner { 108 /// Shared information of the threads in the blocking pool 109 shared: Mutex<Shared>, 110 111 /// Used for thread synchronization 112 condvar: Condvar, 113 114 /// Stores the notification for shutting down 115 shutdown_shared: Mutex<bool>, 116 117 /// Used for thread shutdown synchronization 118 shutdown_condvar: Condvar, 119 120 /// Stack size of each thread in the blocking pool 121 stack_size: Option<usize>, 122 123 /// A callback func to be called after thread starts 124 after_start: Option<CallbackHook>, 125 126 /// A callback func to be called before thread stops 127 before_stop: Option<CallbackHook>, 128 129 /// Maximum thread number for the blocking pool 130 max_thread_num: u8, 131 132 /// Maximum keep-alive time for idle threads 133 keep_alive_time: Duration, 134 135 /// Max number of permanent threads 136 max_permanent_thread_num: u8, 137 } 138 139 /// Shared info among the blocking pool 140 struct Shared { 141 /// Task queue 142 queue: VecDeque<Task>, 143 144 /// Number of current created threads 145 total_thread_num: u8, 146 147 /// Number of current idle threads 148 idle_thread_num: u8, 149 150 /// Number of calls to `notify_one`, prevents spurious wakeup of condvar. 151 notify_num: u8, 152 153 /// number of permanent threads in the pool 154 current_permanent_thread_num: u8, 155 156 /// Shutdown flag of the pool 157 shutdown: bool, 158 159 /// Corresponds with the JoinHandles of the worker threads 160 worker_id: usize, 161 162 /// Stores the JoinHandles of the worker threads 163 worker_threads: VecDeque<(usize, thread::JoinHandle<()>)>, 164 } 165 166 type Task = task::Task; 167 168 // ===== impl BlockPoolSpawner ===== 169 impl BlockPoolSpawner { create_permanent_threads(&self) -> Result<(), ScheduleError>170 pub fn create_permanent_threads(&self) -> Result<(), ScheduleError> { 171 for _ in 0..self.inner.max_permanent_thread_num { 172 let mut shared = self.inner.shared.lock().unwrap(); 173 shared.total_thread_num += 1; 174 let worker_id = shared.worker_id; 175 let mut builder = thread::Builder::new().name(format!("block-r-{worker_id}")); 176 if let Some(stack_size) = self.inner.stack_size { 177 builder = builder.stack_size(stack_size); 178 } 179 let inner = self.inner.clone(); 180 let join_handle = builder.spawn(move || inner.run(worker_id)); 181 match join_handle { 182 Ok(join_handle) => { 183 shared.worker_threads.push_back((worker_id, join_handle)); 184 shared.worker_id += 1; 185 } 186 Err(err) => { 187 return Err(ScheduleError::new(ErrorKind::BlockSpawnErr, err)); 188 } 189 } 190 } 191 Ok(()) 192 } 193 spawn_blocking<T, R>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<R> where T: FnOnce() -> R, T: Send + 'static, R: Send + 'static,194 pub(crate) fn spawn_blocking<T, R>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<R> 195 where 196 T: FnOnce() -> R, 197 T: Send + 'static, 198 R: Send + 'static, 199 { 200 let task = BlockingTask(Some(task)); 201 let scheduler: Weak<PlaceholderScheduler> = Weak::new(); 202 let (task, handle) = Task::create_task(builder, scheduler, task, VirtualTableType::Ylong); 203 let _ = self.spawn(task); 204 handle 205 } 206 spawn(&self, task: Task) -> Result<(), ScheduleError>207 fn spawn(&self, task: Task) -> Result<(), ScheduleError> { 208 let mut shared = self.inner.shared.lock().unwrap(); 209 210 // if the shutdown flag is on, cancel the task 211 if shared.shutdown { 212 return Err(ErrorKind::TaskShutdown.into()); 213 } 214 215 shared.queue.push_back(task); 216 217 if shared.idle_thread_num == 0 { 218 if shared.total_thread_num == self.inner.max_thread_num { 219 // thread number has reached maximum, do nothing 220 } else { 221 // there is no idle thread and the maximum thread number has not been reached, 222 // therefore create a new thread 223 shared.total_thread_num += 1; 224 // sets all required attributes for the thread 225 let worker_id = shared.worker_id; 226 let mut builder = thread::Builder::new().name(format!("block-{worker_id}")); 227 if let Some(stack_size) = self.inner.stack_size { 228 builder = builder.stack_size(stack_size); 229 } 230 231 let inner = self.inner.clone(); 232 let join_handle = builder.spawn(move || inner.run(worker_id)); 233 match join_handle { 234 Ok(join_handle) => { 235 shared.worker_threads.push_back((worker_id, join_handle)); 236 shared.worker_id += 1; 237 } 238 Err(e) => { 239 panic!("os can't spawn worker thread: {}", e); 240 } 241 } 242 } 243 } else { 244 shared.idle_thread_num -= 1; 245 shared.notify_num += 1; 246 self.inner.condvar.notify_one(); 247 } 248 Ok(()) 249 } 250 } 251 252 impl Inner { run(&self, worker_id: usize)253 fn run(&self, worker_id: usize) { 254 if let Some(f) = &self.after_start { 255 f() 256 } 257 258 let mut shared = self.shared.lock().unwrap(); 259 260 'main: loop { 261 // get a task from the global queue 262 while let Some(task) = shared.queue.pop_front() { 263 drop(shared); 264 task.run(); 265 shared = self.shared.lock().unwrap(); 266 } 267 268 shared.idle_thread_num += 1; 269 while !shared.shutdown { 270 // permanent waits, the thread keep alive until shutdown. 271 if shared.current_permanent_thread_num < self.max_permanent_thread_num { 272 shared.current_permanent_thread_num += 1; 273 shared = self.condvar.wait(shared).unwrap(); 274 shared.current_permanent_thread_num -= 1; 275 // Combining a loop to prevent spurious wakeup of condvar, if there is a 276 // spurious wakeup, the `notify_num` will be 0 and the loop will continue. 277 if shared.notify_num != 0 { 278 shared.notify_num -= 1; 279 break; 280 } 281 } else { 282 // if the thread is not permanent, set the keep-alive time for releasing 283 // the thread 284 let time_out_lock_res = self 285 .condvar 286 .wait_timeout(shared, self.keep_alive_time) 287 .unwrap(); 288 shared = time_out_lock_res.0; 289 let timeout_result = time_out_lock_res.1; 290 291 // Combining a loop to prevent spurious wakeup of condvar, if there is a 292 // spurious wakeup, the `notify_num` will be 0 and the loop will continue. 293 if shared.notify_num != 0 { 294 shared.notify_num -= 1; 295 break; 296 } 297 // expires, release the thread 298 if !shared.shutdown && timeout_result.timed_out() { 299 for (thread_id, thread) in shared.worker_threads.iter().enumerate() { 300 if thread.0 == worker_id { 301 shared.worker_threads.remove(thread_id); 302 break; 303 } 304 } 305 break 'main; 306 } 307 } 308 } 309 310 if shared.shutdown { 311 // empty the tasks in the global queue 312 while let Some(_task) = shared.queue.pop_front() { 313 drop(shared); 314 shared = self.shared.lock().unwrap(); 315 } 316 break; 317 } 318 } 319 320 // thread exit 321 shared.total_thread_num = shared 322 .total_thread_num 323 .checked_sub(1) 324 .expect("total thread num underflowed"); 325 shared.idle_thread_num = shared 326 .idle_thread_num 327 .checked_sub(1) 328 .expect("idle thread num underflowed"); 329 330 let shutdown = shared.shutdown; 331 drop(shared); 332 333 if shutdown { 334 *self.shutdown_shared.lock().unwrap() = true; 335 self.shutdown_condvar.notify_one(); 336 } 337 338 if let Some(f) = &self.before_stop { 339 f() 340 } 341 } 342 } 343 344 struct BlockingTask<T>(Option<T>); 345 346 impl<T> Unpin for BlockingTask<T> {} 347 348 impl<T, R> Future for BlockingTask<T> 349 where 350 T: FnOnce() -> R, 351 { 352 type Output = R; 353 poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output>354 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { 355 let func = self.0.take().expect("no run two times"); 356 Poll::Ready(func()) 357 } 358 } 359 360 #[cfg(test)] 361 mod test { 362 use std::sync::Weak; 363 use std::time::Duration; 364 365 use crate::builder::RuntimeBuilder; 366 use crate::executor::blocking_pool::BlockPoolSpawner; 367 use crate::executor::PlaceholderScheduler; 368 use crate::task::{Task, VirtualTableType}; 369 370 /// UT test cases for BlockPoolSpawner::new() 371 /// 372 /// # Brief 373 /// 1. Checking the parameters after initialization is completed. 374 #[test] ut_blocking_pool_new()375 fn ut_blocking_pool_new() { 376 let thread_pool_builder = 377 RuntimeBuilder::new_multi_thread().keep_alive_time(Duration::from_secs(1)); 378 let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 379 assert_eq!( 380 blocking_pool.inner.stack_size, 381 thread_pool_builder.common.stack_size 382 ); 383 assert_eq!( 384 blocking_pool.inner.max_thread_num, 385 thread_pool_builder.common.max_blocking_pool_size.unwrap() 386 ); 387 assert_eq!( 388 blocking_pool.inner.keep_alive_time, 389 thread_pool_builder.common.keep_alive_time.unwrap() 390 ); 391 assert_eq!( 392 blocking_pool.inner.max_permanent_thread_num, 393 thread_pool_builder.common.blocking_permanent_thread_num 394 ); 395 } 396 397 /// UT test cases for BlockPoolSpawner::shutdown() 398 /// 399 /// # Brief 400 /// 1. When shared.shutdown is false, the thread is safely exited without a 401 /// timeout 402 /// 2. When shared.shutdown is false, the thread is not safely exited in 403 /// case of timeout 404 /// 3. When shared.shutdown is true, BlockPoolSpawner::shutdown returns 405 /// directly, representing that the blocking thread pool has safely 406 /// exited 407 408 #[test] ut_blocking_pool_shutdown()409 fn ut_blocking_pool_shutdown() { 410 let thread_pool_builder = RuntimeBuilder::new_multi_thread(); 411 let mut blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 412 blocking_pool.inner.shared.lock().unwrap().shutdown = true; 413 assert!(!blocking_pool.shutdown(Duration::from_secs(3))); 414 415 let thread_pool_builder = RuntimeBuilder::new_multi_thread(); 416 let mut blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 417 let spawner_inner_clone = blocking_pool.inner.clone(); 418 let _thread = std::thread::spawn(move || { 419 *spawner_inner_clone.shutdown_shared.lock().unwrap() = true; 420 spawner_inner_clone.shutdown_condvar.notify_one(); 421 }); 422 assert!(blocking_pool.shutdown(Duration::from_secs(3))); 423 424 let thread_pool_builder = RuntimeBuilder::new_multi_thread(); 425 let mut blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 426 let spawner_inner_clone = blocking_pool.inner.clone(); 427 let _thread = std::thread::spawn(move || { 428 spawner_inner_clone.shutdown_condvar.notify_one(); 429 }); 430 431 blocking_pool.inner.shared.lock().unwrap().shutdown = true; 432 assert!(!blocking_pool.shutdown(Duration::from_secs(0))); 433 } 434 435 /// UT test cases for BlockPoolSpawner::create_permanent_threads() 436 /// 437 /// # Brief 438 /// 1. self.inner.is_permanent == true, self.inner.worker_name.clone() != 439 /// None, self.inner.stack_size != None 440 /// 2. self.inner.is_permanent == true, self.inner.worker_name.clone() == 441 /// None, self.inner.stack_size == None 442 /// 3. self.inner.is_permanent == false 443 #[test] ut_blocking_pool_spawner_create_permanent_threads()444 fn ut_blocking_pool_spawner_create_permanent_threads() { 445 let thread_pool_builder = 446 RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(4); 447 let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 448 assert!(blocking_pool.create_permanent_threads().is_ok()); 449 assert_eq!(blocking_pool.inner.shared.lock().unwrap().worker_id, 4); 450 451 let thread_pool_builder = 452 RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(4); 453 let common = RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(4); 454 let blocking_pool = BlockPoolSpawner::new(&common.common); 455 assert!(blocking_pool.create_permanent_threads().is_ok()); 456 assert_eq!( 457 blocking_pool.inner.shared.lock().unwrap().worker_id, 458 thread_pool_builder.common.blocking_permanent_thread_num as usize 459 ); 460 assert_eq!( 461 blocking_pool 462 .inner 463 .shared 464 .lock() 465 .unwrap() 466 .worker_threads 467 .pop_front() 468 .unwrap() 469 .1 470 .thread() 471 .name() 472 .unwrap(), 473 "block-r-0" 474 ); 475 476 let thread_pool_builder = RuntimeBuilder::new_multi_thread() 477 .blocking_permanent_thread_num(4) 478 .worker_name(String::from("test")); 479 let common = RuntimeBuilder::new_multi_thread() 480 .blocking_permanent_thread_num(4) 481 .worker_name(String::from("test")); 482 let blocking_pool = BlockPoolSpawner::new(&common.common); 483 assert!(blocking_pool.create_permanent_threads().is_ok()); 484 assert_eq!( 485 blocking_pool.inner.shared.lock().unwrap().worker_id, 486 thread_pool_builder.common.blocking_permanent_thread_num as usize 487 ); 488 assert_eq!( 489 blocking_pool 490 .inner 491 .shared 492 .lock() 493 .unwrap() 494 .worker_threads 495 .pop_front() 496 .unwrap() 497 .1 498 .thread() 499 .name() 500 .unwrap(), 501 "block-r-0" 502 ); 503 } 504 505 /// UT test cases for BlockPoolSpawner::spawn() 506 /// 507 /// # Brief 508 /// 1. shared.shutdown == true, return directly. 509 /// 2. shared.shutdown == false, shared.idle_thread_num != 0 510 /// 3. shared.shutdown == false, shared.idle_thread_num == 0, 511 /// shared.total_thread_num == self.inner.max_pool_size 512 /// 4. shared.shutdown == false, shared.idle_thread_num == 0, 513 /// shared.total_thread_num != self.inner.max_pool_size, 514 /// self.inner.worker_name.clone() != None 515 /// 5. shared.shutdown == false, shared.idle_thread_num == 0, 516 /// shared.total_thread_num != self.inner.max_pool_size, 517 /// self.inner.worker_name.clone() == None 518 519 #[test] ut_blocking_pool_spawner_spawn()520 fn ut_blocking_pool_spawner_spawn() { 521 use std::thread::sleep; 522 523 use crate::executor::blocking_pool::BlockingTask; 524 use crate::task::TaskBuilder; 525 526 let thread_pool_builder = RuntimeBuilder::new_multi_thread(); 527 let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 528 blocking_pool.inner.shared.lock().unwrap().shutdown = true; 529 let task = BlockingTask(Some(move || { 530 sleep(Duration::from_millis(10)); 531 String::from("task") 532 })); 533 let builder = TaskBuilder::new(); 534 let scheduler: Weak<PlaceholderScheduler> = Weak::new(); 535 let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong); 536 assert!(blocking_pool.spawn(task).is_err()); 537 538 let thread_pool_builder = RuntimeBuilder::new_multi_thread(); 539 let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 540 blocking_pool.inner.shared.lock().unwrap().shutdown = false; 541 blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 1; 542 let task = BlockingTask(Some(move || { 543 sleep(Duration::from_millis(10)); 544 String::from("task") 545 })); 546 let scheduler: Weak<PlaceholderScheduler> = Weak::new(); 547 let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong); 548 blocking_pool.spawn(task).expect("failed"); 549 assert_eq!(blocking_pool.inner.shared.lock().unwrap().notify_num, 1); 550 551 let thread_pool_builder = RuntimeBuilder::new_multi_thread().max_blocking_pool_size(4); 552 let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 553 blocking_pool.inner.shared.lock().unwrap().shutdown = false; 554 blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 0; 555 blocking_pool.inner.shared.lock().unwrap().total_thread_num = 4; 556 let task = BlockingTask(Some(move || { 557 sleep(Duration::from_millis(10)); 558 String::from("task") 559 })); 560 let scheduler: Weak<PlaceholderScheduler> = Weak::new(); 561 let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong); 562 blocking_pool.spawn(task).expect("failed"); 563 assert_eq!(blocking_pool.inner.shared.lock().unwrap().worker_id, 0); 564 565 let thread_pool_builder = RuntimeBuilder::new_multi_thread().max_blocking_pool_size(4); 566 let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 567 blocking_pool.inner.shared.lock().unwrap().shutdown = false; 568 blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 0; 569 blocking_pool.inner.shared.lock().unwrap().total_thread_num = 3; 570 571 let task = BlockingTask(Some(move || { 572 sleep(Duration::from_millis(10)); 573 String::from("task") 574 })); 575 let scheduler: Weak<PlaceholderScheduler> = Weak::new(); 576 let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong); 577 blocking_pool.spawn(task).expect("failed"); 578 assert_eq!( 579 blocking_pool 580 .inner 581 .shared 582 .lock() 583 .unwrap() 584 .worker_threads 585 .pop_front() 586 .unwrap() 587 .1 588 .thread() 589 .name() 590 .unwrap(), 591 "block-0" 592 ); 593 594 let thread_pool_builder = RuntimeBuilder::new_multi_thread() 595 .max_blocking_pool_size(4) 596 .worker_name(String::from("test")); 597 let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common); 598 blocking_pool.inner.shared.lock().unwrap().shutdown = false; 599 blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 0; 600 blocking_pool.inner.shared.lock().unwrap().total_thread_num = 3; 601 let task = BlockingTask(Some(move || { 602 sleep(Duration::from_millis(10)); 603 String::from("task") 604 })); 605 let scheduler: Weak<PlaceholderScheduler> = Weak::new(); 606 let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong); 607 blocking_pool.spawn(task).expect("failed"); 608 assert_eq!( 609 blocking_pool 610 .inner 611 .shared 612 .lock() 613 .unwrap() 614 .worker_threads 615 .pop_front() 616 .unwrap() 617 .1 618 .thread() 619 .name() 620 .unwrap(), 621 "block-0" 622 ); 623 } 624 } 625