// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::RefCell;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::Duration;
use std::{cmp, io, thread};

use super::driver::{Driver, Handle};
use super::parker::Parker;
use super::queue::{GlobalQueue, LocalQueue, LOCAL_QUEUE_CAP};
use super::sleeper::Sleeper;
use super::worker::{get_current_ctx, run_worker, Worker};
use super::{worker, Schedule};
use crate::builder::multi_thread_builder::MultiThreadBuilder;
use crate::builder::CallbackHook;
use crate::task::{Task, TaskBuilder, VirtualTableType};
use crate::util::core_affinity::set_current_affinity;
use crate::util::fastrand::fast_random;
use crate::util::num_cpus::get_cpu_num;
use crate::JoinHandle;

const ASYNC_THREAD_QUIT_WAIT_TIME: Duration = Duration::from_secs(3);
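/// Interval at which workers poll the global queue first (see `dequeue`).
/// Like other work-stealing runtimes that use the same value, an odd prime is
/// presumably chosen so that workers do not fall into lockstep on the global
/// queue; the exact constant is a tuning choice rather than a hard
/// requirement.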
pub(crate) const GLOBAL_POLL_INTERVAL: u8 = 61;

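/// Scheduler shared by all workers of the multi-thread runtime. Tasks flow
/// through one `GlobalQueue` plus one `LocalQueue` per worker, and the
/// `Sleeper` tracks which workers are parked so that scheduling can wake one
/// of them up.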
pub(crate) struct MultiThreadScheduler {
    /// Async pool shutdown state
    is_cancel: AtomicBool,
    /// Number of total workers
    pub(crate) num_workers: usize,
    /// Join handles for all threads in the executor
    handles: RwLock<Vec<Parker>>,
    /// Used for idle and wakeup logic.
    sleeper: Sleeper,
    /// The global queue of the executor
    global: GlobalQueue,
    /// A set of all the local queues in the executor
    locals: Vec<LocalQueue>,
    pub(crate) handle: Arc<Handle>,
    #[cfg(feature = "metrics")]
    steal_count: std::sync::atomic::AtomicUsize,
}

impl Schedule for MultiThreadScheduler {
    #[inline]
    fn schedule(&self, task: Task, lifo: bool) {
        if self.enqueue(task, lifo) {
            self.wake_up_rand_one();
        }
    }
}

impl MultiThreadScheduler {
    pub(crate) fn new(thread_num: usize, handle: Arc<Handle>) -> Self {
        let mut locals = Vec::new();
        for _ in 0..thread_num {
            locals.push(LocalQueue::new());
        }

        Self {
            is_cancel: AtomicBool::new(false),
            num_workers: thread_num,
            handles: RwLock::new(Vec::new()),
            sleeper: Sleeper::new(thread_num),
            global: GlobalQueue::new(),
            locals,
            handle,
            #[cfg(feature = "metrics")]
            steal_count: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    pub(crate) fn is_cancel(&self) -> bool {
        self.is_cancel.load(Acquire)
    }

    pub(crate) fn set_cancel(&self) {
        self.is_cancel.store(true, SeqCst);
    }

    pub(crate) fn cancel(&self) {
        self.set_cancel();
        self.wake_up_all();
    }

    fn wake_up_all(&self) {
        let join_handle = self.handles.read().unwrap();
        for item in join_handle.iter() {
            item.unpark(self.handle.clone());
        }
    }

    #[inline]
    pub(crate) fn is_parked(&self, worker_index: &usize) -> bool {
        self.sleeper.is_parked(worker_index)
    }

    pub(crate) fn wake_up_rand_one(&self) {
        if let Some(index) = self.sleeper.pop_worker() {
            self.handles
                .read()
                .unwrap()
                .get(index)
                .unwrap()
                .unpark(self.handle.clone());
        }
    }

    pub(crate) fn turn_to_sleep(&self, worker_index: usize) {
        // If this is the last thread going to sleep, check whether there are
        // any tasks left; if so, wake up a thread.
        if self.sleeper.push_worker(worker_index) && !self.has_no_work() {
            self.wake_up_rand_one();
        }
    }

    pub(crate) fn create_local_queue(&self, index: usize) -> LocalQueue {
        let local_run_queue = self.locals.get(index).unwrap();
        LocalQueue {
            inner: local_run_queue.inner.clone(),
        }
    }

    pub(crate) fn has_no_work(&self) -> bool {
        // check if the local queues are empty
        for index in 0..self.num_workers {
            let item = self.locals.get(index).unwrap();
            if !item.is_empty() {
                return false;
            }
        }
        // then check whether the global queue is empty
        self.global.is_empty()
    }

    // The returned value indicates whether another worker needs to be woken.
    // We need to wake another worker in these circumstances:
    // 1. The task has been inserted into the global queue
    // 2. The LIFO slot was already taken, so the previous task was pushed into
    //    the local queue
    pub(crate) fn enqueue(&self, mut task: Task, lifo: bool) -> bool {
        let cur_worker = get_current_ctx();

        // WorkerContext::Curr will never enter here.
        if let Some(worker_ctx) = cur_worker {
            if !std::ptr::eq(&self.global, &worker_ctx.worker.scheduler.global) {
                self.global.push_back(task);
                return true;
            }

            if lifo {
                let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
                let prev_task = lifo_slot.take();
                if let Some(prev) = prev_task {
                    // There is already a task in the LIFO slot, so we put the
                    // current task into the slot and push the previous task
                    // into the run queue below.
                    *lifo_slot = Some(task);
                    task = prev;
                } else {
                    // The LIFO slot is empty: put the task there and return
                    // immediately.
                    *lifo_slot = Some(task);
                    return false;
                }
            }

            let local_run_queue = self.locals.get(worker_ctx.worker.index).unwrap();
            local_run_queue.push_back(task, &self.global);
            return true;
        }

        // Not called from a worker of this runtime, so push the task into the
        // global queue.
        self.global.push_back(task);
        true
    }

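    /// Picks the next task for the worker at `index`. Every
    /// `GLOBAL_POLL_INTERVAL`-th poll, the global queue is checked first
    /// (batch-refilling the local queue when it is empty); otherwise the local
    /// queue is tried before the global one. If both are empty, the worker
    /// attempts to steal from a sibling worker's local queue.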
    pub(crate) fn dequeue(&self, index: usize, worker_inner: &mut worker::Inner) -> Option<Task> {
        let local_run_queue = &worker_inner.run_queue;
        let count = worker_inner.count;

        let task = {
            // Every `GLOBAL_POLL_INTERVAL` (61) executions, dequeue a task
            // from the global queue first. Otherwise, dequeue a task from the
            // local queue. However, if the local queue has no task, dequeue a
            // task from the global queue instead.
            if count % GLOBAL_POLL_INTERVAL as u32 == 0 {
                let limit = local_run_queue.remaining() as usize;
                // If the local queue is empty, a batch of tasks is moved from
                // the global queue into the local queue. If the local queue
                // still has tasks, only dequeue one task from the global
                // queue and run it.
                let task = if limit == LOCAL_QUEUE_CAP {
                    self.global
                        .pop_batch(self.num_workers, local_run_queue, limit)
                } else {
                    self.global.pop_front()
                };
                match task {
                    Some(task) => Some(task),
                    None => local_run_queue.pop_front(),
                }
            } else {
                let local_task = local_run_queue.pop_front();
                match local_task {
                    Some(task) => Some(task),
                    None => {
                        let limit = local_run_queue.remaining() as usize;
                        if limit > 1 {
                            self.global
                                .pop_batch(self.num_workers, local_run_queue, limit)
                        } else {
                            self.global.pop_front()
                        }
                    }
                }
            }
        };

        if task.is_some() {
            return task;
        }

        // There is no task in the local queue or the global queue, so we try
        // to steal tasks from another worker's local queue. The number of
        // workers stealing at the same time should be less than half of the
        // total number of workers.

        if !self.sleeper.try_inc_searching_num() {
            return None;
        }

        // Start searching.
        let num = self.locals.len();
        let start = (fast_random() >> 56) as usize;

        for i in 0..num {
            let i = (start + i) % num;
            // Skip the current worker's local queue.
            if i == index {
                continue;
            }
            let target = self.locals.get(i).unwrap();
            if let Some(task) = target.steal_into(local_run_queue) {
                #[cfg(feature = "metrics")]
                self.steal_count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                if self.sleeper.dec_searching_num() {
                    self.wake_up_rand_one()
                };
                return Some(task);
            }
        }
        // If there is no task to steal, check the global queue one last time.
        let task_from_global = self.global.pop_front();

        // End searching. Regardless of whether a task was taken from the
        // global queue, wake_up_rand_one is not called here.
        self.sleeper.dec_searching_num();

        task_from_global
    }

    pub(crate) fn get_global(&self) -> &GlobalQueue {
        &self.global
    }

    cfg_metrics!(
        pub(crate) fn get_handles(&self) -> &RwLock<Vec<Parker>> {
            &self.handles
        }

        pub(crate) fn get_steal_count(&self) -> usize {
            self.steal_count.load(Acquire)
        }

        pub(crate) fn load_state(&self) -> (usize, usize) {
            self.sleeper.load_state()
        }
    );
}

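/// Spawner for the multi-thread runtime. It owns the shared scheduler plus
/// the per-pool bookkeeping in `Inner`, and releases the worker threads when
/// it is dropped.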
#[derive(Clone)]
pub(crate) struct AsyncPoolSpawner {
    pub(crate) inner: Arc<Inner>,

    pub(crate) exe_mng_info: Arc<MultiThreadScheduler>,
}

impl Drop for AsyncPoolSpawner {
    fn drop(&mut self) {
        self.release()
    }
}

pub(crate) struct Inner {
    /// Number of total threads
    pub(crate) total: usize,
    /// Core-affinity setting of the threads
    is_affinity: bool,
    /// Handle for shutting down the pool
    shutdown_handle: Arc<(Mutex<usize>, Condvar)>,
    /// A callback function to be called after a thread starts
    after_start: Option<CallbackHook>,
    /// A callback function to be called before a thread stops
    before_stop: Option<CallbackHook>,
    /// Name of the worker threads
    worker_name: Option<String>,
    /// Stack size of each thread
    stack_size: Option<usize>,
    /// Workers
    #[cfg(feature = "metrics")]
    workers: Mutex<Vec<Arc<Worker>>>,
}

fn get_cpu_core() -> usize {
    cmp::max(1, get_cpu_num() as usize)
}

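// Body of each worker thread: runs the optional `after_start` hook, drives
// the worker loop until cancellation, then bumps the shutdown counter so that
// the thread blocked in `release_wait` can observe that all workers exited.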
fn async_thread_proc(inner: Arc<Inner>, worker: Arc<Worker>, handle: Arc<Handle>) {
    if let Some(f) = inner.after_start.clone() {
        f();
    }

    run_worker(worker, handle);
    let (lock, cvar) = &*(inner.shutdown_handle.clone());
    let mut finished = lock.lock().unwrap();
    *finished += 1;

    // the last thread wakes up the thread waiting for the shutdown
    if *finished >= inner.total {
        cvar.notify_one();
    }

    if let Some(f) = inner.before_stop.clone() {
        f();
    }
}

impl AsyncPoolSpawner {
    pub(crate) fn new(builder: &MultiThreadBuilder) -> io::Result<Self> {
        let (handle, driver) = Driver::initialize();

        let thread_num = builder.core_thread_size.unwrap_or_else(get_cpu_core);
        let spawner = AsyncPoolSpawner {
            inner: Arc::new(Inner {
                total: thread_num,
                is_affinity: builder.common.is_affinity,
                shutdown_handle: Arc::new((Mutex::new(0), Condvar::new())),
                after_start: builder.common.after_start.clone(),
                before_stop: builder.common.before_stop.clone(),
                worker_name: builder.common.worker_name.clone(),
                stack_size: builder.common.stack_size,
                #[cfg(feature = "metrics")]
                workers: Mutex::new(Vec::with_capacity(thread_num)),
            }),
            exe_mng_info: Arc::new(MultiThreadScheduler::new(thread_num, handle)),
        };
        spawner.create_async_thread_pool(driver)?;
        Ok(spawner)
    }

    fn create_async_thread_pool(&self, driver: Arc<Mutex<Driver>>) -> io::Result<()> {
        let mut workers = vec![];
        for index in 0..self.inner.total {
            let local_queue = self.exe_mng_info.create_local_queue(index);
            let local_run_queue =
                Box::new(worker::Inner::new(local_queue, Parker::new(driver.clone())));
            workers.push(Arc::new(Worker {
                index,
                scheduler: self.exe_mng_info.clone(),
                inner: RefCell::new(local_run_queue),
                lifo: RefCell::new(None),
                yielded: RefCell::new(Vec::new()),
            }))
        }

        for (worker_id, worker) in workers.drain(..).enumerate() {
            let work_arc_handle = self.exe_mng_info.handle.clone();
            #[cfg(feature = "metrics")]
            self.inner.workers.lock().unwrap().push(worker.clone());
            // set up thread attributes
            let mut builder = thread::Builder::new();

            if let Some(worker_name) = self.inner.worker_name.clone() {
                builder = builder.name(format!("async-{worker_id}-{worker_name}"));
            } else {
                builder = builder.name(format!("async-{worker_id}"));
            }

            if let Some(stack_size) = self.inner.stack_size {
                builder = builder.stack_size(stack_size);
            }

            let parker = worker.inner.borrow().parker.clone();
            self.exe_mng_info.handles.write().unwrap().push(parker);

            let inner = self.inner.clone();

            if self.inner.is_affinity {
                builder.spawn(move || {
                    let cpu_core_num = get_cpu_core();
                    let cpu_id = worker_id % cpu_core_num;
                    set_current_affinity(cpu_id).expect("set_current_affinity() fail!");
                    async_thread_proc(inner, worker, work_arc_handle);
                })?;
            } else {
                builder.spawn(move || {
                    async_thread_proc(inner, worker, work_arc_handle);
                })?;
            }
        }
        Ok(())
    }

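    /// Creates a task from the given future and schedules it onto this pool
    /// (never into the LIFO slot), returning a `JoinHandle` to await its
    /// output. This is the multi-thread backend behind the crate's public
    /// spawn API; a minimal usage sketch through that public API, assuming a
    /// default multi-thread builder:
    ///
    /// ```no_run
    /// use ylong_runtime::builder::RuntimeBuilder;
    ///
    /// let runtime = RuntimeBuilder::new_multi_thread().build().unwrap();
    /// let handle = runtime.spawn(async { 1 + 1 });
    /// assert_eq!(runtime.block_on(handle).unwrap(), 2);
    /// ```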
    pub(crate) fn spawn<T>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
        let (task, join_handle) =
            Task::create_task(builder, exe_scheduler, task, VirtualTableType::Ylong);

        self.exe_mng_info.schedule(task, false);
        join_handle
    }

    /// # Safety
    /// Users must guarantee either that the future carries its real lifetime
    /// (so that the compiler can catch lifetime issues), or that the future
    /// completes while the context it borrows is still valid. If neither
    /// holds, the spawned task may end up accessing freed memory.
    ///
    /// ## Memory issue example
    /// No matter which type of runtime (current thread / multi thread) is
    /// used, the following code compiles. Since the variable `slice` is
    /// released when the function ends, any handle returned from this
    /// function relies on a dangling pointer.
    ///
    /// ```no_run
    /// fn err_example(runtime: &Runtime) -> JoinHandle<()> {
    ///     let builder = TaskBuilder::default();
    ///     let mut slice = [1, 2, 3, 4, 5];
    ///     let borrow = &mut slice;
    ///     match &runtime.async_spawner {
    ///         AsyncHandle::CurrentThread(pool) => {
    ///             pool.spawn_with_ref(
    ///                 &builder,
    ///                 async { borrow.iter_mut().for_each(|x| *x *= 2) }
    ///             )
    ///         }
    ///         AsyncHandle::MultiThread(pool) => {
    ///             pool.spawn_with_ref(
    ///                 &builder,
    ///                 async { borrow.iter_mut().for_each(|x| *x *= 2) }
    ///             )
    ///         }
    ///     }
    /// }
    ///
    /// let runtime = Runtime::new().unwrap();
    /// let handle = spawn_blocking(
    ///     move || block_on(err_example(&runtime)).unwrap()
    /// );
    /// ```
    pub(crate) unsafe fn spawn_with_ref<T>(
        &self,
        builder: &TaskBuilder,
        task: T,
    ) -> JoinHandle<T::Output>
    where
        T: Future + Send,
        T::Output: Send,
    {
        let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
        let raw_task = Task::create_raw_task(builder, exe_scheduler, task, VirtualTableType::Ylong);
        let handle = JoinHandle::new(raw_task);
        let task = Task(raw_task);
        self.exe_mng_info.schedule(task, false);
        handle
    }

    /// Waits 3 seconds for the worker threads to finish before releasing the
    /// async pool. If the threads cannot finish in time, memory may leak.
    fn release_wait(&self) -> Result<(), ()> {
        self.exe_mng_info.cancel();
        let pair = self.inner.shutdown_handle.clone();
        let total = self.inner.total;
        let (lock, cvar) = &*pair;
        let finished = lock.lock().unwrap();
        let res = cvar
            .wait_timeout_while(finished, ASYNC_THREAD_QUIT_WAIT_TIME, |&mut finished| {
                finished < total
            })
            .unwrap();
        // If the time limit has been reached, the unfinished threads will not
        // get released.
        if res.1.timed_out() {
            Err(())
        } else {
            Ok(())
        }
    }

    pub(crate) fn release(&self) {
        if let Ok(()) = self.release_wait() {
            let mut join_handle = self.exe_mng_info.handles.write().unwrap();
            #[allow(clippy::mem_replace_with_default)]
            let mut worker_handles = std::mem::replace(join_handle.as_mut(), vec![]);
            drop(join_handle);
            for parker in worker_handles.drain(..) {
                parker.release();
            }
        }
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_worker(&self, index: u8) -> Result<Arc<Worker>, ()> {
        let vec = self.inner.workers.lock().unwrap();
        for i in 0..vec.len() {
            let worker = vec.get(i).expect("worker index out of range");
            if worker.index == index.into() {
                return Ok(worker.clone());
            }
        }
        Err(())
    }
}

#[cfg(test)]
mod test {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::Ordering::{Acquire, Release};
    use std::sync::mpsc::channel;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll};
    use std::thread::spawn;

    use crate::builder::RuntimeBuilder;
    use crate::executor::async_pool::{get_cpu_core, AsyncPoolSpawner, MultiThreadScheduler};
    use crate::executor::driver::Driver;
    use crate::executor::parker::Parker;
    use crate::task::{Task, TaskBuilder, VirtualTableType};

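    /// Future that wakes itself and returns `Pending` 1000 times before
    /// completing, used as the spawned workload in the tests below.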
    pub struct TestFuture {
        value: usize,
        total: usize,
    }

    pub fn create_new() -> TestFuture {
        TestFuture {
            value: 0,
            total: 1000,
        }
    }

    impl Future for TestFuture {
        type Output = usize;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.total > self.value {
                self.get_mut().value += 1;
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(self.total)
            }
        }
    }

    async fn test_future() -> usize {
        create_new().await
    }

    /// UT test cases for ExecutorMngInfo::new()
    ///
    /// # Brief
    /// 1. Creates an ExecutorMngInfo with thread number 1
    /// 2. Creates an ExecutorMngInfo with thread number 64
    #[test]
    fn ut_executor_mng_info_new_001() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
        assert!(!executor_mng_info.is_cancel.load(Acquire));
        assert_eq!(executor_mng_info.handles.read().unwrap().capacity(), 0);

        let executor_mng_info = MultiThreadScheduler::new(64, arc_handle);
        assert!(!executor_mng_info.is_cancel.load(Acquire));
        assert_eq!(executor_mng_info.handles.read().unwrap().capacity(), 0);
    }

    /// UT test cases for ExecutorMngInfo::create_local_queue()
    ///
    /// # Brief
    /// 1. index set to 0, check the return value
    /// 2. index set to ExecutorMngInfo.inner.total - 1, check the return value
    #[test]
    fn ut_executor_mng_info_create_local_queues() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
        let local_run_queue_info = executor_mng_info.create_local_queue(0);
        assert!(local_run_queue_info.is_empty());

        let executor_mng_info = MultiThreadScheduler::new(64, arc_handle);
        let local_run_queue_info = executor_mng_info.create_local_queue(63);
        assert!(local_run_queue_info.is_empty());
    }

    /// UT test cases for ExecutorMngInfo::enqueue()
    ///
    /// # Brief
    /// 1. Creates a task and enqueues it with lifo set to true
    /// 2. Checks that the scheduler now has work
    #[test]
    fn ut_executor_mng_info_enqueue() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

        let builder = TaskBuilder::new();
        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = Task::create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );

        executor_mng_info.enqueue(task, true);
        assert!(!executor_mng_info.has_no_work());
    }

    /// UT test cases for ExecutorMngInfo::is_cancel()
    ///
    /// # Brief
    /// 1. The is_cancel value is set to false to check the return value
    /// 2. The is_cancel value is set to true to check the return value
    #[test]
    fn ut_executor_mng_info_is_cancel() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
        executor_mng_info.is_cancel.store(false, Release);
        assert!(!executor_mng_info.is_cancel());
        executor_mng_info.is_cancel.store(true, Release);
        assert!(executor_mng_info.is_cancel());
    }

    /// UT test cases for ExecutorMngInfo::set_cancel()
    ///
    /// # Brief
    /// 1. Check if the is_cancel parameter becomes true after set_cancel
    #[test]
    fn ut_executor_mng_info_set_cancel() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
        assert!(!executor_mng_info.is_cancel.load(Acquire));
        executor_mng_info.set_cancel();
        assert!(executor_mng_info.is_cancel.load(Acquire));
    }

    /// UT test cases for ExecutorMngInfo::cancel()
    ///
    /// # Brief
    /// 1. Constructs an environment with a parked thread and checks that
    ///    cancel() wakes it up
    #[test]
    fn ut_executor_mng_info_cancel() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);
        let parker_cpy = parker.clone();
        let _ = spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });
        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        executor_mng_info.cancel();
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
    }

    /// UT test cases for ExecutorMngInfo::wake_up_all()
    ///
    /// # Brief
    /// 1. Constructs an environment to check if all threads are woken up and
    ///    executed via thread hooks.
    #[test]
    fn ut_executor_mng_info_wake_up_all() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);
        let parker_cpy = parker.clone();

        let _ = spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        executor_mng_info.wake_up_all();
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
    }

    /// UT test cases for ExecutorMngInfo::wake_up_rand_one()
    ///
    /// # Brief
    /// 1. Constructs an environment to check if a thread is woken up and
    ///    executed by a thread hook.
    #[test]
    fn ut_executor_mng_info_wake_up_rand_one() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
        executor_mng_info.turn_to_sleep(0);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);
        let parker_cpy = parker.clone();

        let _ = spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        executor_mng_info.wake_up_rand_one();
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
    }

    /// UT test cases for ExecutorMngInfo::wake_up_if_one_task_left()
    ///
    /// # Brief
    /// 1. Constructs the environment, checks if there are still tasks, and if
    ///    so, wakes up a thread to continue working.
    #[test]
    fn ut_executor_mng_info_wake_up_if_one_task_left() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

        executor_mng_info.turn_to_sleep(0);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);
        let parker_cpy = parker.clone();

        let _ = spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        let builder = TaskBuilder::new();
        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = Task::create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );

        executor_mng_info.enqueue(task, true);

        if !executor_mng_info.has_no_work() {
            executor_mng_info.wake_up_rand_one();
        }

        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
    }

    /// UT test cases for ExecutorMngInfo::from_woken_to_sleep()
    ///
    /// # Brief
    /// 1. Construct the environment and set the state of the specified thread
    ///    to park state. If the last thread is in park state, check whether
    ///    there is a task, and if so, wake up this thread.
    #[test]
    fn ut_from_woken_to_sleep() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);
        let parker_cpy = parker.clone();

        let _ = spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        let builder = TaskBuilder::new();
        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = Task::create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );

        executor_mng_info.enqueue(task, true);
        executor_mng_info.turn_to_sleep(0);
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
    }

    /// UT test cases for AsyncPoolSpawner::new()
    ///
    /// # Brief
    /// 1. Verify the parameters after the initialization completes
    #[test]
    fn ut_async_pool_spawner_new() {
        let thread_pool_builder = RuntimeBuilder::new_multi_thread();
        let async_pool_spawner = AsyncPoolSpawner::new(&thread_pool_builder).unwrap();
        assert_eq!(
            async_pool_spawner.inner.total,
            thread_pool_builder
                .core_thread_size
                .unwrap_or_else(get_cpu_core)
        );
        assert_eq!(
            async_pool_spawner.inner.worker_name,
            thread_pool_builder.common.worker_name
        );
        assert_eq!(
            async_pool_spawner.inner.stack_size,
            thread_pool_builder.common.stack_size
        );
        assert!(!async_pool_spawner.exe_mng_info.is_cancel.load(Acquire));
    }

    /// UT test cases for `create_async_thread_pool`.
    ///
    /// # Brief
    /// 1. Create an async_pool_spawner with `is_affinity` setting to false
    /// 2. Call create_async_thread_pool()
    /// 3. This UT should not panic
    #[test]
    fn ut_async_pool_spawner_create_async_thread_pool_001() {
        let thread_pool_builder = RuntimeBuilder::new_multi_thread();
        let _ = AsyncPoolSpawner::new(&thread_pool_builder.is_affinity(false)).unwrap();
    }

    /// UT test cases for `create_async_thread_pool`.
    ///
    /// # Brief
    /// 1. Create an async_pool_spawner with `is_affinity` setting to true
    /// 2. Call create_async_thread_pool()
    /// 3. This UT should not panic
    #[test]
    fn ut_async_pool_spawner_create_async_thread_pool_002() {
        let thread_pool_builder = RuntimeBuilder::new_multi_thread();
        let _ = AsyncPoolSpawner::new(&thread_pool_builder.is_affinity(true)).unwrap();
    }
}