• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::VecDeque;
15 use std::future::Future;
16 use std::option::Option::Some;
17 use std::pin::Pin;
18 use std::sync::{Arc, Condvar, Mutex, Weak};
19 use std::task::{Context, Poll};
20 use std::thread;
21 use std::time::Duration;
22 
23 use crate::builder::{CallbackHook, CommonBuilder};
24 use crate::error::{ErrorKind, ScheduleError};
25 use crate::executor::PlaceholderScheduler;
26 use crate::task;
27 use crate::task::{JoinHandle, TaskBuilder, VirtualTableType};
28 
29 pub(crate) const BLOCKING_THREAD_QUIT_WAIT_TIME: Duration = Duration::from_secs(3);
30 
31 #[derive(Clone)]
32 pub(crate) struct BlockPoolSpawner {
33     inner: Arc<Inner>,
34 }
35 
36 impl Drop for BlockPoolSpawner {
drop(&mut self)37     fn drop(&mut self) {
38         self.shutdown(BLOCKING_THREAD_QUIT_WAIT_TIME);
39     }
40 }
41 
42 impl BlockPoolSpawner {
new(builder: &CommonBuilder) -> BlockPoolSpawner43     pub fn new(builder: &CommonBuilder) -> BlockPoolSpawner {
44         let keep_alive_time = builder
45             .keep_alive_time
46             .unwrap_or(BLOCKING_THREAD_KEEP_ALIVE_TIME);
47         let max_thread_num = builder
48             .max_blocking_pool_size
49             .unwrap_or(BLOCKING_MAX_THEAD_NUM);
50         BlockPoolSpawner {
51             inner: Arc::new(Inner {
52                 shared: Mutex::new(Shared {
53                     queue: VecDeque::new(),
54                     total_thread_num: 0,
55                     idle_thread_num: 0,
56                     notify_num: 0,
57                     current_permanent_thread_num: 0,
58                     shutdown: false,
59                     worker_id: 0,
60                     worker_threads: VecDeque::new(),
61                 }),
62                 condvar: Condvar::new(),
63                 shutdown_shared: Mutex::new(false),
64                 shutdown_condvar: Condvar::new(),
65                 stack_size: builder.stack_size,
66                 after_start: builder.after_start.clone(),
67                 before_stop: builder.before_stop.clone(),
68                 max_thread_num,
69                 keep_alive_time,
70                 max_permanent_thread_num: builder.blocking_permanent_thread_num,
71             }),
72         }
73     }
74 
shutdown(&mut self, timeout: Duration) -> bool75     pub fn shutdown(&mut self, timeout: Duration) -> bool {
76         let mut shared = self.inner.shared.lock().unwrap();
77 
78         if shared.shutdown {
79             return false;
80         }
81         self.inner.condvar.notify_all();
82         let workers = std::mem::take(&mut shared.worker_threads);
83         drop(shared);
84 
85         let shutdown_shared = self.inner.shutdown_shared.lock().unwrap();
86 
87         if *self
88             .inner
89             .shutdown_condvar
90             .wait_timeout(shutdown_shared, timeout)
91             .unwrap()
92             .0
93         {
94             for handle in workers {
95                 let _ = handle.1.join();
96             }
97             return true;
98         }
99         false
100     }
101 }
102 
103 const BLOCKING_THREAD_KEEP_ALIVE_TIME: Duration = Duration::from_secs(5);
104 pub const BLOCKING_MAX_THEAD_NUM: u8 = 50;
105 
106 /// Inner struct for [`BlockPoolSpawner`].
107 struct Inner {
108     /// Shared information of the threads in the blocking pool
109     shared: Mutex<Shared>,
110 
111     /// Used for thread synchronization
112     condvar: Condvar,
113 
114     /// Stores the notification for shutting down
115     shutdown_shared: Mutex<bool>,
116 
117     /// Used for thread shutdown synchronization
118     shutdown_condvar: Condvar,
119 
120     /// Stack size of each thread in the blocking pool
121     stack_size: Option<usize>,
122 
123     /// A callback func to be called after thread starts
124     after_start: Option<CallbackHook>,
125 
126     /// A callback func to be called before thread stops
127     before_stop: Option<CallbackHook>,
128 
129     /// Maximum thread number for the blocking pool
130     max_thread_num: u8,
131 
132     /// Maximum keep-alive time for idle threads
133     keep_alive_time: Duration,
134 
135     /// Max number of permanent threads
136     max_permanent_thread_num: u8,
137 }
138 
139 /// Shared info among the blocking pool
140 struct Shared {
141     /// Task queue
142     queue: VecDeque<Task>,
143 
144     /// Number of current created threads
145     total_thread_num: u8,
146 
147     /// Number of current idle threads
148     idle_thread_num: u8,
149 
150     /// Number of calls to `notify_one`, prevents spurious wakeup of condvar.
151     notify_num: u8,
152 
153     /// number of permanent threads in the pool
154     current_permanent_thread_num: u8,
155 
156     /// Shutdown flag of the pool
157     shutdown: bool,
158 
159     /// Corresponds with the JoinHandles of the worker threads
160     worker_id: usize,
161 
162     /// Stores the JoinHandles of the worker threads
163     worker_threads: VecDeque<(usize, thread::JoinHandle<()>)>,
164 }
165 
166 type Task = task::Task;
167 
168 // ===== impl BlockPoolSpawner =====
169 impl BlockPoolSpawner {
create_permanent_threads(&self) -> Result<(), ScheduleError>170     pub fn create_permanent_threads(&self) -> Result<(), ScheduleError> {
171         for _ in 0..self.inner.max_permanent_thread_num {
172             let mut shared = self.inner.shared.lock().unwrap();
173             shared.total_thread_num += 1;
174             let worker_id = shared.worker_id;
175             let mut builder = thread::Builder::new().name(format!("block-r-{worker_id}"));
176             if let Some(stack_size) = self.inner.stack_size {
177                 builder = builder.stack_size(stack_size);
178             }
179             let inner = self.inner.clone();
180             let join_handle = builder.spawn(move || inner.run(worker_id));
181             match join_handle {
182                 Ok(join_handle) => {
183                     shared.worker_threads.push_back((worker_id, join_handle));
184                     shared.worker_id += 1;
185                 }
186                 Err(err) => {
187                     return Err(ScheduleError::new(ErrorKind::BlockSpawnErr, err));
188                 }
189             }
190         }
191         Ok(())
192     }
193 
spawn_blocking<T, R>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<R> where T: FnOnce() -> R, T: Send + 'static, R: Send + 'static,194     pub(crate) fn spawn_blocking<T, R>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<R>
195     where
196         T: FnOnce() -> R,
197         T: Send + 'static,
198         R: Send + 'static,
199     {
200         let task = BlockingTask(Some(task));
201         let scheduler: Weak<PlaceholderScheduler> = Weak::new();
202         let (task, handle) = Task::create_task(builder, scheduler, task, VirtualTableType::Ylong);
203         let _ = self.spawn(task);
204         handle
205     }
206 
spawn(&self, task: Task) -> Result<(), ScheduleError>207     fn spawn(&self, task: Task) -> Result<(), ScheduleError> {
208         let mut shared = self.inner.shared.lock().unwrap();
209 
210         // if the shutdown flag is on, cancel the task
211         if shared.shutdown {
212             return Err(ErrorKind::TaskShutdown.into());
213         }
214 
215         shared.queue.push_back(task);
216 
217         if shared.idle_thread_num == 0 {
218             if shared.total_thread_num == self.inner.max_thread_num {
219                 // thread number has reached maximum, do nothing
220             } else {
221                 // there is no idle thread and the maximum thread number has not been reached,
222                 // therefore create a new thread
223                 shared.total_thread_num += 1;
224                 // sets all required attributes for the thread
225                 let worker_id = shared.worker_id;
226                 let mut builder = thread::Builder::new().name(format!("block-{worker_id}"));
227                 if let Some(stack_size) = self.inner.stack_size {
228                     builder = builder.stack_size(stack_size);
229                 }
230 
231                 let inner = self.inner.clone();
232                 let join_handle = builder.spawn(move || inner.run(worker_id));
233                 match join_handle {
234                     Ok(join_handle) => {
235                         shared.worker_threads.push_back((worker_id, join_handle));
236                         shared.worker_id += 1;
237                     }
238                     Err(e) => {
239                         panic!("os can't spawn worker thread: {}", e);
240                     }
241                 }
242             }
243         } else {
244             shared.idle_thread_num -= 1;
245             shared.notify_num += 1;
246             self.inner.condvar.notify_one();
247         }
248         Ok(())
249     }
250 }
251 
252 impl Inner {
run(&self, worker_id: usize)253     fn run(&self, worker_id: usize) {
254         if let Some(f) = &self.after_start {
255             f()
256         }
257 
258         let mut shared = self.shared.lock().unwrap();
259 
260         'main: loop {
261             // get a task from the global queue
262             while let Some(task) = shared.queue.pop_front() {
263                 drop(shared);
264                 task.run();
265                 shared = self.shared.lock().unwrap();
266             }
267 
268             shared.idle_thread_num += 1;
269             while !shared.shutdown {
270                 // permanent waits, the thread keep alive until shutdown.
271                 if shared.current_permanent_thread_num < self.max_permanent_thread_num {
272                     shared.current_permanent_thread_num += 1;
273                     shared = self.condvar.wait(shared).unwrap();
274                     shared.current_permanent_thread_num -= 1;
275                     // Combining a loop to prevent spurious wakeup of condvar, if there is a
276                     // spurious wakeup, the `notify_num` will be 0 and the loop will continue.
277                     if shared.notify_num != 0 {
278                         shared.notify_num -= 1;
279                         break;
280                     }
281                 } else {
282                     // if the thread is not permanent, set the keep-alive time for releasing
283                     // the thread
284                     let time_out_lock_res = self
285                         .condvar
286                         .wait_timeout(shared, self.keep_alive_time)
287                         .unwrap();
288                     shared = time_out_lock_res.0;
289                     let timeout_result = time_out_lock_res.1;
290 
291                     // Combining a loop to prevent spurious wakeup of condvar, if there is a
292                     // spurious wakeup, the `notify_num` will be 0 and the loop will continue.
293                     if shared.notify_num != 0 {
294                         shared.notify_num -= 1;
295                         break;
296                     }
297                     // expires, release the thread
298                     if !shared.shutdown && timeout_result.timed_out() {
299                         for (thread_id, thread) in shared.worker_threads.iter().enumerate() {
300                             if thread.0 == worker_id {
301                                 shared.worker_threads.remove(thread_id);
302                                 break;
303                             }
304                         }
305                         break 'main;
306                     }
307                 }
308             }
309 
310             if shared.shutdown {
311                 // empty the tasks in the global queue
312                 while let Some(_task) = shared.queue.pop_front() {
313                     drop(shared);
314                     shared = self.shared.lock().unwrap();
315                 }
316                 break;
317             }
318         }
319 
320         // thread exit
321         shared.total_thread_num = shared
322             .total_thread_num
323             .checked_sub(1)
324             .expect("total thread num underflowed");
325         shared.idle_thread_num = shared
326             .idle_thread_num
327             .checked_sub(1)
328             .expect("idle thread num underflowed");
329 
330         let shutdown = shared.shutdown;
331         drop(shared);
332 
333         if shutdown {
334             *self.shutdown_shared.lock().unwrap() = true;
335             self.shutdown_condvar.notify_one();
336         }
337 
338         if let Some(f) = &self.before_stop {
339             f()
340         }
341     }
342 }
343 
344 struct BlockingTask<T>(Option<T>);
345 
346 impl<T> Unpin for BlockingTask<T> {}
347 
348 impl<T, R> Future for BlockingTask<T>
349 where
350     T: FnOnce() -> R,
351 {
352     type Output = R;
353 
poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output>354     fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
355         let func = self.0.take().expect("no run two times");
356         Poll::Ready(func())
357     }
358 }
359 
360 #[cfg(test)]
361 mod test {
362     use std::sync::Weak;
363     use std::time::Duration;
364 
365     use crate::builder::RuntimeBuilder;
366     use crate::executor::blocking_pool::BlockPoolSpawner;
367     use crate::executor::PlaceholderScheduler;
368     use crate::task::{Task, VirtualTableType};
369 
370     /// UT test cases for BlockPoolSpawner::new()
371     ///
372     /// # Brief
373     /// 1. Checking the parameters after initialization is completed.
374     #[test]
ut_blocking_pool_new()375     fn ut_blocking_pool_new() {
376         let thread_pool_builder =
377             RuntimeBuilder::new_multi_thread().keep_alive_time(Duration::from_secs(1));
378         let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
379         assert_eq!(
380             blocking_pool.inner.stack_size,
381             thread_pool_builder.common.stack_size
382         );
383         assert_eq!(
384             blocking_pool.inner.max_thread_num,
385             thread_pool_builder.common.max_blocking_pool_size.unwrap()
386         );
387         assert_eq!(
388             blocking_pool.inner.keep_alive_time,
389             thread_pool_builder.common.keep_alive_time.unwrap()
390         );
391         assert_eq!(
392             blocking_pool.inner.max_permanent_thread_num,
393             thread_pool_builder.common.blocking_permanent_thread_num
394         );
395     }
396 
397     /// UT test cases for BlockPoolSpawner::shutdown()
398     ///
399     /// # Brief
400     /// 1. When shared.shutdown is false, the thread is safely exited without a
401     ///    timeout
402     /// 2. When shared.shutdown is false, the thread is not safely exited in
403     ///    case of timeout
404     /// 3. When shared.shutdown is true, BlockPoolSpawner::shutdown returns
405     ///    directly, representing that the blocking thread pool has safely
406     ///    exited
407 
408     #[test]
ut_blocking_pool_shutdown()409     fn ut_blocking_pool_shutdown() {
410         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
411         let mut blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
412         blocking_pool.inner.shared.lock().unwrap().shutdown = true;
413         assert!(!blocking_pool.shutdown(Duration::from_secs(3)));
414 
415         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
416         let mut blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
417         let spawner_inner_clone = blocking_pool.inner.clone();
418         let _thread = std::thread::spawn(move || {
419             *spawner_inner_clone.shutdown_shared.lock().unwrap() = true;
420             spawner_inner_clone.shutdown_condvar.notify_one();
421         });
422         assert!(blocking_pool.shutdown(Duration::from_secs(3)));
423 
424         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
425         let mut blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
426         let spawner_inner_clone = blocking_pool.inner.clone();
427         let _thread = std::thread::spawn(move || {
428             spawner_inner_clone.shutdown_condvar.notify_one();
429         });
430 
431         blocking_pool.inner.shared.lock().unwrap().shutdown = true;
432         assert!(!blocking_pool.shutdown(Duration::from_secs(0)));
433     }
434 
435     /// UT test cases for BlockPoolSpawner::create_permanent_threads()
436     ///
437     /// # Brief
438     /// 1. self.inner.is_permanent == true, self.inner.worker_name.clone() !=
439     ///    None, self.inner.stack_size != None
440     /// 2. self.inner.is_permanent == true, self.inner.worker_name.clone() ==
441     ///    None, self.inner.stack_size == None
442     /// 3. self.inner.is_permanent == false
443     #[test]
ut_blocking_pool_spawner_create_permanent_threads()444     fn ut_blocking_pool_spawner_create_permanent_threads() {
445         let thread_pool_builder =
446             RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(4);
447         let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
448         assert!(blocking_pool.create_permanent_threads().is_ok());
449         assert_eq!(blocking_pool.inner.shared.lock().unwrap().worker_id, 4);
450 
451         let thread_pool_builder =
452             RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(4);
453         let common = RuntimeBuilder::new_multi_thread().blocking_permanent_thread_num(4);
454         let blocking_pool = BlockPoolSpawner::new(&common.common);
455         assert!(blocking_pool.create_permanent_threads().is_ok());
456         assert_eq!(
457             blocking_pool.inner.shared.lock().unwrap().worker_id,
458             thread_pool_builder.common.blocking_permanent_thread_num as usize
459         );
460         assert_eq!(
461             blocking_pool
462                 .inner
463                 .shared
464                 .lock()
465                 .unwrap()
466                 .worker_threads
467                 .pop_front()
468                 .unwrap()
469                 .1
470                 .thread()
471                 .name()
472                 .unwrap(),
473             "block-r-0"
474         );
475 
476         let thread_pool_builder = RuntimeBuilder::new_multi_thread()
477             .blocking_permanent_thread_num(4)
478             .worker_name(String::from("test"));
479         let common = RuntimeBuilder::new_multi_thread()
480             .blocking_permanent_thread_num(4)
481             .worker_name(String::from("test"));
482         let blocking_pool = BlockPoolSpawner::new(&common.common);
483         assert!(blocking_pool.create_permanent_threads().is_ok());
484         assert_eq!(
485             blocking_pool.inner.shared.lock().unwrap().worker_id,
486             thread_pool_builder.common.blocking_permanent_thread_num as usize
487         );
488         assert_eq!(
489             blocking_pool
490                 .inner
491                 .shared
492                 .lock()
493                 .unwrap()
494                 .worker_threads
495                 .pop_front()
496                 .unwrap()
497                 .1
498                 .thread()
499                 .name()
500                 .unwrap(),
501             "block-r-0"
502         );
503     }
504 
505     /// UT test cases for BlockPoolSpawner::spawn()
506     ///
507     /// # Brief
508     /// 1. shared.shutdown == true, return directly.
509     /// 2. shared.shutdown == false, shared.idle_thread_num != 0
510     /// 3. shared.shutdown == false, shared.idle_thread_num == 0,
511     ///    shared.total_thread_num == self.inner.max_pool_size
512     /// 4. shared.shutdown == false, shared.idle_thread_num == 0,
513     ///    shared.total_thread_num != self.inner.max_pool_size,
514     ///    self.inner.worker_name.clone() != None
515     /// 5. shared.shutdown == false, shared.idle_thread_num == 0,
516     ///    shared.total_thread_num != self.inner.max_pool_size,
517     ///    self.inner.worker_name.clone() == None
518 
519     #[test]
ut_blocking_pool_spawner_spawn()520     fn ut_blocking_pool_spawner_spawn() {
521         use std::thread::sleep;
522 
523         use crate::executor::blocking_pool::BlockingTask;
524         use crate::task::TaskBuilder;
525 
526         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
527         let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
528         blocking_pool.inner.shared.lock().unwrap().shutdown = true;
529         let task = BlockingTask(Some(move || {
530             sleep(Duration::from_millis(10));
531             String::from("task")
532         }));
533         let builder = TaskBuilder::new();
534         let scheduler: Weak<PlaceholderScheduler> = Weak::new();
535         let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong);
536         assert!(blocking_pool.spawn(task).is_err());
537 
538         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
539         let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
540         blocking_pool.inner.shared.lock().unwrap().shutdown = false;
541         blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 1;
542         let task = BlockingTask(Some(move || {
543             sleep(Duration::from_millis(10));
544             String::from("task")
545         }));
546         let scheduler: Weak<PlaceholderScheduler> = Weak::new();
547         let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong);
548         blocking_pool.spawn(task).expect("failed");
549         assert_eq!(blocking_pool.inner.shared.lock().unwrap().notify_num, 1);
550 
551         let thread_pool_builder = RuntimeBuilder::new_multi_thread().max_blocking_pool_size(4);
552         let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
553         blocking_pool.inner.shared.lock().unwrap().shutdown = false;
554         blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 0;
555         blocking_pool.inner.shared.lock().unwrap().total_thread_num = 4;
556         let task = BlockingTask(Some(move || {
557             sleep(Duration::from_millis(10));
558             String::from("task")
559         }));
560         let scheduler: Weak<PlaceholderScheduler> = Weak::new();
561         let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong);
562         blocking_pool.spawn(task).expect("failed");
563         assert_eq!(blocking_pool.inner.shared.lock().unwrap().worker_id, 0);
564 
565         let thread_pool_builder = RuntimeBuilder::new_multi_thread().max_blocking_pool_size(4);
566         let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
567         blocking_pool.inner.shared.lock().unwrap().shutdown = false;
568         blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 0;
569         blocking_pool.inner.shared.lock().unwrap().total_thread_num = 3;
570 
571         let task = BlockingTask(Some(move || {
572             sleep(Duration::from_millis(10));
573             String::from("task")
574         }));
575         let scheduler: Weak<PlaceholderScheduler> = Weak::new();
576         let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong);
577         blocking_pool.spawn(task).expect("failed");
578         assert_eq!(
579             blocking_pool
580                 .inner
581                 .shared
582                 .lock()
583                 .unwrap()
584                 .worker_threads
585                 .pop_front()
586                 .unwrap()
587                 .1
588                 .thread()
589                 .name()
590                 .unwrap(),
591             "block-0"
592         );
593 
594         let thread_pool_builder = RuntimeBuilder::new_multi_thread()
595             .max_blocking_pool_size(4)
596             .worker_name(String::from("test"));
597         let blocking_pool = BlockPoolSpawner::new(&thread_pool_builder.common);
598         blocking_pool.inner.shared.lock().unwrap().shutdown = false;
599         blocking_pool.inner.shared.lock().unwrap().idle_thread_num = 0;
600         blocking_pool.inner.shared.lock().unwrap().total_thread_num = 3;
601         let task = BlockingTask(Some(move || {
602             sleep(Duration::from_millis(10));
603             String::from("task")
604         }));
605         let scheduler: Weak<PlaceholderScheduler> = Weak::new();
606         let (task, _) = Task::create_task(&builder, scheduler, task, VirtualTableType::Ylong);
607         blocking_pool.spawn(task).expect("failed");
608         assert_eq!(
609             blocking_pool
610                 .inner
611                 .shared
612                 .lock()
613                 .unwrap()
614                 .worker_threads
615                 .pop_front()
616                 .unwrap()
617                 .1
618                 .thread()
619                 .name()
620                 .unwrap(),
621             "block-0"
622         );
623     }
624 }
625