1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::cell::RefCell;
15 use std::future::Future;
16 use std::sync::atomic::AtomicBool;
17 use std::sync::atomic::Ordering::{Acquire, SeqCst};
18 use std::sync::{Arc, Condvar, Mutex, RwLock};
19 use std::time::Duration;
20 use std::{cmp, io, thread};
21
22 use super::driver::{Driver, Handle};
23 use super::parker::Parker;
24 use super::queue::{GlobalQueue, LocalQueue, LOCAL_QUEUE_CAP};
25 use super::sleeper::Sleeper;
26 use super::worker::{get_current_ctx, run_worker, Worker};
27 use super::{worker, Schedule};
28 use crate::builder::multi_thread_builder::MultiThreadBuilder;
29 use crate::builder::CallbackHook;
30 use crate::task::{Task, TaskBuilder, VirtualTableType};
31 use crate::util::core_affinity::set_current_affinity;
32 use crate::util::fastrand::fast_random;
33 use crate::util::num_cpus::get_cpu_num;
34 use crate::JoinHandle;
35
/// Maximum time `AsyncPoolSpawner::release_wait` waits for the worker threads
/// to report that they have finished.
const ASYNC_THREAD_QUIT_WAIT_TIME: Duration = Duration::from_secs(3);
/// A worker tries the global queue before its local queue whenever its
/// execution count is a multiple of this interval (see
/// `MultiThreadScheduler::dequeue`).
pub(crate) const GLOBAL_POLL_INTERVAL: u8 = 61;
38
/// Scheduler shared by every worker of the multi-thread executor.
///
/// It holds the global queue, one local queue per worker, the sleeper used
/// for idle/wake-up bookkeeping and the parkers used to unpark workers.
pub(crate) struct MultiThreadScheduler {
    /// Async pool shutdown state
    is_cancel: AtomicBool,
    /// Number of total workers
    pub(crate) num_workers: usize,
    /// Parkers of the worker threads, pushed in worker-index order by
    /// `AsyncPoolSpawner::create_async_thread_pool`
    handles: RwLock<Vec<Parker>>,
    /// Used for idle and wakeup logic.
    sleeper: Sleeper,
    /// The global queue of the executor
    global: GlobalQueue,
    /// A set of all the local queues in the executor
    locals: Vec<LocalQueue>,
    /// Driver handle that is passed to `Parker::unpark` when waking a worker
    pub(crate) handle: Arc<Handle>,
    /// Number of successful steals from other workers' local queues
    #[cfg(feature = "metrics")]
    steal_count: std::sync::atomic::AtomicUsize,
}
56
57 impl Schedule for MultiThreadScheduler {
58 #[inline]
schedule(&self, task: Task, lifo: bool)59 fn schedule(&self, task: Task, lifo: bool) {
60 if self.enqueue(task, lifo) {
61 self.wake_up_rand_one();
62 }
63 }
64 }
65
66 impl MultiThreadScheduler {
new(thread_num: usize, handle: Arc<Handle>) -> Self67 pub(crate) fn new(thread_num: usize, handle: Arc<Handle>) -> Self {
68 let mut locals = Vec::new();
69 for _ in 0..thread_num {
70 locals.push(LocalQueue::new());
71 }
72
73 Self {
74 is_cancel: AtomicBool::new(false),
75 num_workers: thread_num,
76 handles: RwLock::new(Vec::new()),
77 sleeper: Sleeper::new(thread_num),
78 global: GlobalQueue::new(),
79 locals,
80 handle,
81 #[cfg(feature = "metrics")]
82 steal_count: std::sync::atomic::AtomicUsize::new(0),
83 }
84 }
85
/// Returns `true` once the pool has been marked as cancelled.
pub(crate) fn is_cancel(&self) -> bool {
    self.is_cancel.load(Acquire)
}
89
/// Marks the pool as cancelled without waking any worker.
pub(crate) fn set_cancel(&self) {
    self.is_cancel.store(true, SeqCst);
}
93
/// Marks the pool as cancelled and unparks every registered worker.
pub(crate) fn cancel(&self) {
    self.set_cancel();
    self.wake_up_all();
}
98
wake_up_all(&self)99 fn wake_up_all(&self) {
100 let join_handle = self.handles.read().unwrap();
101 for item in join_handle.iter() {
102 item.unpark(self.handle.clone());
103 }
104 }
105
/// Returns whether the sleeper reports the worker `worker_index` as parked.
#[inline]
pub(crate) fn is_parked(&self, worker_index: &usize) -> bool {
    self.sleeper.is_parked(worker_index)
}
110
wake_up_rand_one(&self)111 pub(crate) fn wake_up_rand_one(&self) {
112 if let Some(index) = self.sleeper.pop_worker() {
113 self.handles
114 .read()
115 .unwrap()
116 .get(index)
117 .unwrap()
118 .unpark(self.handle.clone());
119 }
120 }
121
turn_to_sleep(&self, worker_index: usize)122 pub(crate) fn turn_to_sleep(&self, worker_index: usize) {
123 // If it's the last thread going to sleep, check if there are any tasks
124 // left. If yes, wakes up a thread.
125 if self.sleeper.push_worker(worker_index) && !self.has_no_work() {
126 self.wake_up_rand_one();
127 }
128 }
129
create_local_queue(&self, index: usize) -> LocalQueue130 pub(crate) fn create_local_queue(&self, index: usize) -> LocalQueue {
131 let local_run_queue = self.locals.get(index).unwrap();
132 LocalQueue {
133 inner: local_run_queue.inner.clone(),
134 }
135 }
136
has_no_work(&self) -> bool137 pub(crate) fn has_no_work(&self) -> bool {
138 // check if local queues are empty
139 for index in 0..self.num_workers {
140 let item = self.locals.get(index).unwrap();
141 if !item.is_empty() {
142 return false;
143 }
144 }
145 // then check is global queue empty
146 self.global.is_empty()
147 }
148
149 // The returned value indicates whether or not to wake up another worker
150 // We need to wake another worker under these circumstances:
151 // 1. The task has been inserted into the global queue
152 // 2. The lifo slot is taken, we push the old task into the local queue
enqueue(&self, mut task: Task, lifo: bool) -> bool153 pub(crate) fn enqueue(&self, mut task: Task, lifo: bool) -> bool {
154 let cur_worker = get_current_ctx();
155
156 // WorkerContext::Curr will never enter here.
157 if let Some(worker_ctx) = cur_worker {
158 if !std::ptr::eq(&self.global, &worker_ctx.worker.scheduler.global) {
159 self.global.push_back(task);
160 return true;
161 }
162
163 if lifo {
164 let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
165 let prev_task = lifo_slot.take();
166 if let Some(prev) = prev_task {
167 // there is some task in lifo slot, therefore we put the prev task
168 // into run queue, and put the current task into the lifo slot
169 *lifo_slot = Some(task);
170 task = prev;
171 } else {
172 // there is no task in lifo slot, return immediately
173 *lifo_slot = Some(task);
174 return false;
175 }
176 }
177
178 let local_run_queue = self.locals.get(worker_ctx.worker.index).unwrap();
179 local_run_queue.push_back(task, &self.global);
180 return true;
181 }
182
183 // If the local queue of the current worker is full, push the task into the
184 // global queue
185 self.global.push_back(task);
186 true
187 }
188
/// Fetches the next task for the worker `index`.
///
/// Lookup order: the worker's local queue and the global queue (the global
/// queue is tried first whenever `count` is a multiple of
/// `GLOBAL_POLL_INTERVAL`), then the local queues of the other workers, and
/// finally the global queue one more time. Returns `None` when no task was
/// found or when the sleeper does not allow this worker to start searching.
pub(crate) fn dequeue(&self, index: usize, worker_inner: &mut worker::Inner) -> Option<Task> {
    let local_run_queue = &worker_inner.run_queue;
    let count = worker_inner.count;

    let task = {
        // For every 61 times of execution, dequeue a task from the global queue first.
        // Otherwise, dequeue a task from the local queue. However, if the local queue
        // has no task, dequeue a task from the global queue instead.
        if count % GLOBAL_POLL_INTERVAL as u32 == 0 {
            let limit = local_run_queue.remaining() as usize;
            // If the local queue is empty, multiple tasks are stolen from the global queue
            // to the local queue. If the local queue has tasks, only dequeue one task from
            // the global queue and run it.
            let task = if limit == LOCAL_QUEUE_CAP {
                self.global
                    .pop_batch(self.num_workers, local_run_queue, limit)
            } else {
                self.global.pop_front()
            };
            match task {
                Some(task) => Some(task),
                None => local_run_queue.pop_front(),
            }
        } else {
            let local_task = local_run_queue.pop_front();
            match local_task {
                Some(task) => Some(task),
                None => {
                    let limit = local_run_queue.remaining() as usize;
                    if limit > 1 {
                        self.global
                            .pop_batch(self.num_workers, local_run_queue, limit)
                    } else {
                        self.global.pop_front()
                    }
                }
            }
        }
    };

    if task.is_some() {
        return task;
    }

    // There is no task in the local queue or the global queue, so we try to steal
    // tasks from another worker's local queue.
    // The number of stealing worker should be less than half of the total worker
    // number.

    if !self.sleeper.try_inc_searching_num() {
        return None;
    }

    // Start searching from a random worker so that victims are spread out.
    let num = self.locals.len();
    let start = (fast_random() >> 56) as usize;

    for i in 0..num {
        let i = (start + i) % num;
        // skip the current worker's local queue
        if i == index {
            continue;
        }
        let target = self.locals.get(i).unwrap();
        if let Some(task) = target.steal_into(local_run_queue) {
            #[cfg(feature = "metrics")]
            self.steal_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

            // Searching ends here; wake another worker when the sleeper asks for it.
            if self.sleeper.dec_searching_num() {
                self.wake_up_rand_one()
            };
            return Some(task);
        }
    }
    // if there is no task to steal, we check global queue for one last time
    let task_from_global = self.global.pop_front();

    // end searching
    // regardless of whether a task can be stolen from the global queue,
    // wake_up_rand_one is not called.
    self.sleeper.dec_searching_num();

    task_from_global
}
274
/// Returns a reference to the global queue of the executor.
pub(crate) fn get_global(&self) -> &GlobalQueue {
    &self.global
}
278
// Accessors that are only wrapped in for metrics collection.
cfg_metrics!(
    /// Returns the lock-protected list of worker parkers.
    pub(crate) fn get_handles(&self) -> &RwLock<Vec<Parker>> {
        &self.handles
    }

    /// Returns the number of successful steals counted in `dequeue`.
    pub(crate) fn get_steal_count(&self) -> usize {
        self.steal_count.load(Acquire)
    }

    /// Returns the state pair reported by the sleeper.
    // NOTE(review): the meaning of the two values is defined by
    // `Sleeper::load_state`, which is not visible here.
    pub(crate) fn load_state(&self) -> (usize, usize) {
        self.sleeper.load_state()
    }
);
292 }
293
/// Handle used to create the worker threads of the async pool and to spawn
/// tasks onto them.
#[derive(Clone)]
pub(crate) struct AsyncPoolSpawner {
    /// Pool configuration and shutdown bookkeeping shared with the worker
    /// threads
    pub(crate) inner: Arc<Inner>,

    /// Scheduler shared by all workers of this pool
    pub(crate) exe_mng_info: Arc<MultiThreadScheduler>,
}
300
impl Drop for AsyncPoolSpawner {
    /// Shuts the pool down by calling `release`.
    // NOTE(review): `AsyncPoolSpawner` derives `Clone`, so dropping any clone
    // cancels the pool shared by all clones — confirm this is intended.
    fn drop(&mut self) {
        self.release()
    }
}
306
/// Configuration and shutdown state shared between the spawner and its worker
/// threads.
pub(crate) struct Inner {
    /// Number of total threads
    pub(crate) total: usize,
    /// Core-affinity setting of the threads
    is_affinity: bool,
    /// Handle for shutting down the pool: the counter is the number of worker
    /// threads that have finished, the condvar is notified once it reaches
    /// `total`
    shutdown_handle: Arc<(Mutex<usize>, Condvar)>,
    /// A callback func to be called after thread starts
    after_start: Option<CallbackHook>,
    /// A callback func to be called before thread stops
    before_stop: Option<CallbackHook>,
    /// Name of the worker threads
    worker_name: Option<String>,
    /// Stack size of each thread
    stack_size: Option<usize>,
    /// Workers
    #[cfg(feature = "metrics")]
    workers: Mutex<Vec<Arc<Worker>>>,
}
326
get_cpu_core() -> usize327 fn get_cpu_core() -> usize {
328 cmp::max(1, get_cpu_num() as usize)
329 }
330
async_thread_proc(inner: Arc<Inner>, worker: Arc<Worker>, handle: Arc<Handle>)331 fn async_thread_proc(inner: Arc<Inner>, worker: Arc<Worker>, handle: Arc<Handle>) {
332 if let Some(f) = inner.after_start.clone() {
333 f();
334 }
335
336 run_worker(worker, handle);
337 let (lock, cvar) = &*(inner.shutdown_handle.clone());
338 let mut finished = lock.lock().unwrap();
339 *finished += 1;
340
341 // the last thread wakes up the main thread
342 if *finished >= inner.total {
343 cvar.notify_one();
344 }
345
346 if let Some(f) = inner.before_stop.clone() {
347 f();
348 }
349 }
350
351 impl AsyncPoolSpawner {
new(builder: &MultiThreadBuilder) -> io::Result<Self>352 pub(crate) fn new(builder: &MultiThreadBuilder) -> io::Result<Self> {
353 let (handle, driver) = Driver::initialize();
354
355 let thread_num = builder.core_thread_size.unwrap_or_else(get_cpu_core);
356 let spawner = AsyncPoolSpawner {
357 inner: Arc::new(Inner {
358 total: thread_num,
359 is_affinity: builder.common.is_affinity,
360 shutdown_handle: Arc::new((Mutex::new(0), Condvar::new())),
361 after_start: builder.common.after_start.clone(),
362 before_stop: builder.common.before_stop.clone(),
363 worker_name: builder.common.worker_name.clone(),
364 stack_size: builder.common.stack_size,
365 #[cfg(feature = "metrics")]
366 workers: Mutex::new(Vec::with_capacity(thread_num)),
367 }),
368 exe_mng_info: Arc::new(MultiThreadScheduler::new(thread_num, handle)),
369 };
370 spawner.create_async_thread_pool(driver)?;
371 Ok(spawner)
372 }
373
create_async_thread_pool(&self, driver: Arc<Mutex<Driver>>) -> io::Result<()>374 fn create_async_thread_pool(&self, driver: Arc<Mutex<Driver>>) -> io::Result<()> {
375 let mut workers = vec![];
376 for index in 0..self.inner.total {
377 let local_queue = self.exe_mng_info.create_local_queue(index);
378 let local_run_queue =
379 Box::new(worker::Inner::new(local_queue, Parker::new(driver.clone())));
380 workers.push(Arc::new(Worker {
381 index,
382 scheduler: self.exe_mng_info.clone(),
383 inner: RefCell::new(local_run_queue),
384 lifo: RefCell::new(None),
385 yielded: RefCell::new(Vec::new()),
386 }))
387 }
388
389 for (worker_id, worker) in workers.drain(..).enumerate() {
390 let work_arc_handle = self.exe_mng_info.handle.clone();
391 #[cfg(feature = "metrics")]
392 self.inner.workers.lock().unwrap().push(worker.clone());
393 // set up thread attributes
394 let mut builder = thread::Builder::new();
395
396 if let Some(worker_name) = self.inner.worker_name.clone() {
397 builder = builder.name(format!("async-{worker_id}-{worker_name}"));
398 } else {
399 builder = builder.name(format!("async-{worker_id}"));
400 }
401
402 if let Some(stack_size) = self.inner.stack_size {
403 builder = builder.stack_size(stack_size);
404 }
405
406 let parker = worker.inner.borrow().parker.clone();
407 self.exe_mng_info.handles.write().unwrap().push(parker);
408
409 let inner = self.inner.clone();
410
411 if self.inner.is_affinity {
412 builder.spawn(move || {
413 let cpu_core_num = get_cpu_core();
414 let cpu_id = worker_id % cpu_core_num;
415 set_current_affinity(cpu_id).expect("set_current_affinity() fail!");
416 async_thread_proc(inner, worker, work_arc_handle);
417 })?;
418 } else {
419 builder.spawn(move || {
420 async_thread_proc(inner, worker, work_arc_handle);
421 })?;
422 }
423 }
424 Ok(())
425 }
426
spawn<T>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static,427 pub(crate) fn spawn<T>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<T::Output>
428 where
429 T: Future + Send + 'static,
430 T::Output: Send + 'static,
431 {
432 let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
433 let (task, join_handle) =
434 Task::create_task(builder, exe_scheduler, task, VirtualTableType::Ylong);
435
436 self.exe_mng_info.schedule(task, false);
437 join_handle
438 }
439
440 /// # Safety
441 /// Users need to guarantee that the future will remember lifetime and thus
442 /// compiler will capture lifetime issues, or the future will complete
443 /// when its context remains valid. If not, currently
444 /// runtime initialization will cause memory error.
445 ///
446 /// ## Memory issue example
447 /// No matter using which type (current / multi thread) of runtime, the
448 /// following code can compile. When the variable `slice` gets released
449 /// when the function ends, any handles returned from this function rely
450 /// on a dangled pointer.
451 ///
452 /// ```no run
453 /// fn err_example(runtime: &Runtime) -> JoinHandle<()> {
454 /// let builder = TaskBuilder::default();
455 /// let mut slice = [1, 2, 3, 4, 5];
456 /// let borrow = &mut slice;
457 /// match &runtime.async_spawner {
458 /// AsyncHandle::CurrentThread(pool) => {
459 /// pool.spawn_with_ref(
460 /// &builder,
461 /// async { borrow.iter_mut().for_each(|x| *x *= 2) }
462 /// )
463 /// }
464 /// AsyncHandle::MultiThread(pool) => {
465 /// pool.spawn_with_ref(
466 /// &builder,
467 /// async { borrow.iter_mut().for_each(|x| *x *= 2) }
468 /// )
469 /// }
470 /// }
471 /// }
472 ///
473 /// let runtime = Runtime::new().unwrap();
474 /// let handle = spawn_blocking(
475 /// move || block_on(err_example(&runtime)).unwrap()
476 /// );
477 /// ```
spawn_with_ref<T>( &self, builder: &TaskBuilder, task: T, ) -> JoinHandle<T::Output> where T: Future + Send, T::Output: Send,478 pub(crate) unsafe fn spawn_with_ref<T>(
479 &self,
480 builder: &TaskBuilder,
481 task: T,
482 ) -> JoinHandle<T::Output>
483 where
484 T: Future + Send,
485 T::Output: Send,
486 {
487 let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
488 let raw_task = Task::create_raw_task(builder, exe_scheduler, task, VirtualTableType::Ylong);
489 let handle = JoinHandle::new(raw_task);
490 let task = Task(raw_task);
491 self.exe_mng_info.schedule(task, false);
492 handle
493 }
494
495 /// Waits 3 seconds for threads to finish before releasing the async pool.
496 /// If threads could not finish before releasing, there could be possible
497 /// memory leak.
release_wait(&self) -> Result<(), ()>498 fn release_wait(&self) -> Result<(), ()> {
499 self.exe_mng_info.cancel();
500 let pair = self.inner.shutdown_handle.clone();
501 let total = self.inner.total;
502 let (lock, cvar) = &*pair;
503 let finished = lock.lock().unwrap();
504 let res = cvar
505 .wait_timeout_while(finished, ASYNC_THREAD_QUIT_WAIT_TIME, |&mut finished| {
506 finished < total
507 })
508 .unwrap();
509 // if time limit has been reached, the unfinished threads would not get released
510 if res.1.timed_out() {
511 Err(())
512 } else {
513 Ok(())
514 }
515 }
516
release(&self)517 pub(crate) fn release(&self) {
518 if let Ok(()) = self.release_wait() {
519 let mut join_handle = self.exe_mng_info.handles.write().unwrap();
520 #[allow(clippy::mem_replace_with_default)]
521 let mut worker_handles = std::mem::replace(join_handle.as_mut(), vec![]);
522 drop(join_handle);
523 for parker in worker_handles.drain(..) {
524 parker.release();
525 }
526 }
527 }
528
/// Looks up the worker whose `index` field equals `index`.
///
/// Returns `Err(())` when no such worker is registered.
#[cfg(feature = "metrics")]
pub(crate) fn get_worker(&self, index: u8) -> Result<Arc<Worker>, ()> {
    let wanted = usize::from(index);
    let workers = self.inner.workers.lock().unwrap();
    let found = workers.iter().find(|worker| worker.index == wanted);
    match found {
        Some(worker) => Ok(worker.clone()),
        None => Err(()),
    }
}
540 }
541
542 #[cfg(test)]
543 mod test {
544 use std::future::Future;
545 use std::pin::Pin;
546 use std::sync::atomic::Ordering::{Acquire, Release};
547 use std::sync::mpsc::channel;
548 use std::sync::{Arc, Mutex};
549 use std::task::{Context, Poll};
550 use std::thread::spawn;
551
552 use crate::builder::RuntimeBuilder;
553 use crate::executor::async_pool::{get_cpu_core, AsyncPoolSpawner, MultiThreadScheduler};
554 use crate::executor::driver::Driver;
555 use crate::executor::parker::Parker;
556 use crate::task::{Task, TaskBuilder, VirtualTableType};
557
/// Test future that returns `Pending` (re-waking itself) until it has been
/// polled `total` times.
pub struct TestFuture {
    /// Number of polls that returned `Pending` so far
    value: usize,
    /// Number of `Pending` polls before the future becomes ready
    total: usize,
}
562
/// Creates a `TestFuture` that needs 1000 pending polls before it is ready.
pub fn create_new() -> TestFuture {
    TestFuture {
        value: 0,
        total: 1000,
    }
}
569
570 impl Future for TestFuture {
571 type Output = usize;
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>572 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
573 if self.total > self.value {
574 self.get_mut().value += 1;
575 cx.waker().wake_by_ref();
576 Poll::Pending
577 } else {
578 Poll::Ready(self.total)
579 }
580 }
581 }
582
/// Async wrapper that awaits a fresh `TestFuture`.
async fn test_future() -> usize {
    create_new().await
}
586
/// UT test cases for MultiThreadScheduler::new()
///
/// # Brief
/// 1. Creates a MultiThreadScheduler with thread number 1
/// 2. Creates a MultiThreadScheduler with thread number 64
/// 3. Checks for both that the scheduler is not cancelled and that no parker
///    has been registered
#[test]
fn ut_executor_mng_info_new_001() {
    let (arc_handle, _) = Driver::initialize();

    for thread_num in [1, 64] {
        let scheduler = MultiThreadScheduler::new(thread_num, arc_handle.clone());
        assert!(!scheduler.is_cancel.load(Acquire));
        assert_eq!(scheduler.handles.read().unwrap().capacity(), 0);
    }
}
603
/// UT test cases for MultiThreadScheduler::create_local_queue()
///
/// # Brief
/// 1. index set to 0 with one worker, check the returned queue is empty
/// 2. index set to the last worker (63 of 64), check the returned queue is
///    empty
#[test]
fn ut_executor_mng_info_create_local_queues() {
    let (arc_handle, _) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
    let local_run_queue_info = executor_mng_info.create_local_queue(0);
    assert!(local_run_queue_info.is_empty());

    let executor_mng_info = MultiThreadScheduler::new(64, arc_handle);
    let local_run_queue_info = executor_mng_info.create_local_queue(63);
    assert!(local_run_queue_info.is_empty());
}
620
/// UT test cases for MultiThreadScheduler::enqueue()
///
/// # Brief
/// 1. Creates a task and enqueues it from the test thread
/// 2. Checks that the scheduler reports pending work afterwards
#[test]
fn ut_executor_mng_info_enqueue() {
    let (arc_handle, _) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

    let builder = TaskBuilder::new();
    // The scheduler behind this weak reference is a temporary that is dropped
    // right away; the task is only enqueued here, never run.
    let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
    let (task, _) = Task::create_task(
        &builder,
        exe_scheduler,
        test_future(),
        VirtualTableType::Ylong,
    );

    executor_mng_info.enqueue(task, true);
    assert!(!executor_mng_info.has_no_work());
}
643
/// UT test cases for MultiThreadScheduler::is_cancel()
///
/// # Brief
/// 1. The is_cancel value is set to false to check the return value
/// 2. The is_cancel value is set to true to check the return value
#[test]
fn ut_executor_mng_info_is_cancel() {
    let (arc_handle, _) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
    executor_mng_info.is_cancel.store(false, Release);
    assert!(!executor_mng_info.is_cancel());
    executor_mng_info.is_cancel.store(true, Release);
    assert!(executor_mng_info.is_cancel());
}
658
/// UT test cases for MultiThreadScheduler::set_cancel()
///
/// # Brief
/// 1. Check if the is_cancel parameter becomes true after set_cancel
#[test]
fn ut_executor_mng_info_set_cancel() {
    let (arc_handle, _) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
    assert!(!executor_mng_info.is_cancel.load(Acquire));
    executor_mng_info.set_cancel();
    assert!(executor_mng_info.is_cancel.load(Acquire));
}
671
/// UT test cases for MultiThreadScheduler::cancel()
///
/// # Brief
/// 1. Parks a thread on a parker that is registered in the scheduler
/// 2. Calls cancel() and checks that the parked thread is woken up and runs
///    to completion
#[test]
fn ut_executor_mng_info_cancel() {
    let (arc_handle, arc_driver) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);

    let flag = Arc::new(Mutex::new(0));
    let (tx, rx) = channel();

    let (flag_clone, tx) = (flag.clone(), tx);

    let mut parker = Parker::new(arc_driver);
    let parker_cpy = parker.clone();
    // The helper thread sets the flag and signals the channel only after it
    // has been unparked.
    let _ = spawn(move || {
        parker.park();
        *flag_clone.lock().unwrap() = 1;
        tx.send(()).unwrap()
    });
    executor_mng_info.handles.write().unwrap().push(parker_cpy);

    executor_mng_info.cancel();
    // Blocks until the helper thread has been woken up.
    rx.recv().unwrap();
    assert_eq!(*flag.lock().unwrap(), 1);
}
699
/// UT test cases for MultiThreadScheduler::wake_up_all()
///
/// # Brief
/// 1. Parks a thread on a parker that is registered in the scheduler
/// 2. Calls wake_up_all() and checks that the parked thread is woken up and
///    runs to completion
#[test]
fn ut_executor_mng_info_wake_up_all() {
    let (arc_handle, arc_driver) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);

    let flag = Arc::new(Mutex::new(0));
    let (tx, rx) = channel();

    let (flag_clone, tx) = (flag.clone(), tx);

    let mut parker = Parker::new(arc_driver);
    let parker_cpy = parker.clone();

    // The helper thread sets the flag and signals the channel only after it
    // has been unparked.
    let _ = spawn(move || {
        parker.park();
        *flag_clone.lock().unwrap() = 1;
        tx.send(()).unwrap()
    });

    executor_mng_info.handles.write().unwrap().push(parker_cpy);

    executor_mng_info.wake_up_all();
    // Blocks until the helper thread has been woken up.
    rx.recv().unwrap();
    assert_eq!(*flag.lock().unwrap(), 1);
}
730
/// UT test cases for MultiThreadScheduler::wake_up_rand_one()
///
/// # Brief
/// 1. Registers worker 0 as sleeping and parks a thread on its parker
/// 2. Calls wake_up_rand_one() and checks that the parked thread is woken up
///    and runs to completion
#[test]
fn ut_executor_mng_info_wake_up_rand_one() {
    let (arc_handle, arc_driver) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
    // Worker 0 must be known to the sleeper, otherwise there is nobody to
    // wake up.
    executor_mng_info.turn_to_sleep(0);

    let flag = Arc::new(Mutex::new(0));
    let (tx, rx) = channel();

    let (flag_clone, tx) = (flag.clone(), tx);

    let mut parker = Parker::new(arc_driver);
    let parker_cpy = parker.clone();

    // The helper thread sets the flag and signals the channel only after it
    // has been unparked.
    let _ = spawn(move || {
        parker.park();
        *flag_clone.lock().unwrap() = 1;
        tx.send(()).unwrap()
    });

    executor_mng_info.handles.write().unwrap().push(parker_cpy);

    executor_mng_info.wake_up_rand_one();
    // Blocks until the helper thread has been woken up.
    rx.recv().unwrap();
    assert_eq!(*flag.lock().unwrap(), 1);
}
762
/// UT test cases for waking up a worker while a task is still queued
///
/// # Brief
/// 1. Registers worker 0 as sleeping and parks a thread on its parker
/// 2. Enqueues a task, checks that the scheduler still has work and, if so,
///    wakes up a thread to continue working
#[test]
fn ut_executor_mng_info_wake_up_if_one_task_left() {
    let (arc_handle, arc_driver) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

    executor_mng_info.turn_to_sleep(0);

    let flag = Arc::new(Mutex::new(0));
    let (tx, rx) = channel();

    let (flag_clone, tx) = (flag.clone(), tx);

    let mut parker = Parker::new(arc_driver);
    let parker_cpy = parker.clone();

    // The helper thread sets the flag and signals the channel only after it
    // has been unparked.
    let _ = spawn(move || {
        parker.park();
        *flag_clone.lock().unwrap() = 1;
        tx.send(()).unwrap()
    });

    executor_mng_info.handles.write().unwrap().push(parker_cpy);

    let builder = TaskBuilder::new();
    let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
    let (task, _) = Task::create_task(
        &builder,
        exe_scheduler,
        test_future(),
        VirtualTableType::Ylong,
    );

    executor_mng_info.enqueue(task, true);

    if !executor_mng_info.has_no_work() {
        executor_mng_info.wake_up_rand_one();
    }

    // Blocks until the helper thread has been woken up.
    rx.recv().unwrap();
    assert_eq!(*flag.lock().unwrap(), 1);
}
809
/// UT test cases for MultiThreadScheduler::turn_to_sleep()
///
/// # Brief
/// 1. Construct the environment and set the state of the specified thread
///    to park state. If the last thread is in park state, check whether
///    there is a task, and if so, wake up this thread.
#[test]
fn ut_from_woken_to_sleep() {
    let (arc_handle, arc_driver) = Driver::initialize();
    let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

    let flag = Arc::new(Mutex::new(0));
    let (tx, rx) = channel();

    let (flag_clone, tx) = (flag.clone(), tx);

    let mut parker = Parker::new(arc_driver);
    let parker_cpy = parker.clone();

    // The helper thread sets the flag and signals the channel only after it
    // has been unparked.
    let _ = spawn(move || {
        parker.park();
        *flag_clone.lock().unwrap() = 1;
        tx.send(()).unwrap()
    });

    executor_mng_info.handles.write().unwrap().push(parker_cpy);

    let builder = TaskBuilder::new();
    let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
    let (task, _) = Task::create_task(
        &builder,
        exe_scheduler,
        test_future(),
        VirtualTableType::Ylong,
    );

    // A task is queued before the only worker goes to sleep, so
    // turn_to_sleep() is expected to wake the worker up again.
    executor_mng_info.enqueue(task, true);
    executor_mng_info.turn_to_sleep(0);
    rx.recv().unwrap();
    assert_eq!(*flag.lock().unwrap(), 1);
}
851
/// UT test cases for AsyncPoolSpawner::new()
///
/// # Brief
/// 1. Creates a spawner from a default multi-thread builder
/// 2. Verifies that thread count, worker name and stack size match the
///    builder and that the pool is not cancelled
#[test]
fn ut_async_pool_spawner_new() {
    let thread_pool_builder = RuntimeBuilder::new_multi_thread();
    let async_pool_spawner = AsyncPoolSpawner::new(&thread_pool_builder).unwrap();
    assert_eq!(
        async_pool_spawner.inner.total,
        thread_pool_builder
            .core_thread_size
            .unwrap_or_else(get_cpu_core)
    );
    assert_eq!(
        async_pool_spawner.inner.worker_name,
        thread_pool_builder.common.worker_name
    );
    assert_eq!(
        async_pool_spawner.inner.stack_size,
        thread_pool_builder.common.stack_size
    );
    assert!(!async_pool_spawner.exe_mng_info.is_cancel.load(Acquire));
}
876
/// UT test cases for `create_async_thread_pool`.
///
/// # Brief
/// 1. Create an async_pool_spawner with `is_affinity` setting to false
/// 2. Call create_async_thread_pool() through AsyncPoolSpawner::new()
/// 3. This UT should not panic
#[test]
fn ut_async_pool_spawner_create_async_thread_pool_001() {
    let builder = RuntimeBuilder::new_multi_thread().is_affinity(false);
    let _ = AsyncPoolSpawner::new(&builder).unwrap();
}
888
/// UT test cases for `create_async_thread_pool`.
///
/// # Brief
/// 1. Create an async_pool_spawner with `is_affinity` setting to true
/// 2. Call create_async_thread_pool() through AsyncPoolSpawner::new()
/// 3. This UT should not panic
#[test]
fn ut_async_pool_spawner_create_async_thread_pool_002() {
    let builder = RuntimeBuilder::new_multi_thread().is_affinity(true);
    let _ = AsyncPoolSpawner::new(&builder).unwrap();
}
900 }
901