// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::UnsafeCell;
use std::collections::linked_list::LinkedList;
use std::mem::MaybeUninit;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::{cmp, ptr};

// Schedule strategy implementation; includes FIFO, LIFO, priority, and
// work-stealing. The work-stealing strategy steals half of the tasks from a
// worker's local queue, or steals from the worker with the largest backlog.
use crate::task::Task;

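/// Reads an `AtomicU16` without an atomic instruction.
///
/// # Safety
///
/// The caller must guarantee that no thread writes `data` concurrently. In
/// this module only the owning worker writes `rear`, so the owner may read
/// its own tail this way (see the "spmc" comments below).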
unsafe fn non_atomic_load(data: &AtomicU16) -> u16 {
    ptr::read(data as *const AtomicU16 as *const u16)
}

/// Capacity of the local queue
pub(crate) const LOCAL_QUEUE_CAP: usize = 256;
const MASK: u16 = LOCAL_QUEUE_CAP as u16 - 1;

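// Note: `LOCAL_QUEUE_CAP` is a power of two, so `pos & MASK` maps a wrapping
// `u16` position onto a buffer index; e.g. position 257 maps to slot 1.
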
/// Local queue of the worker
pub(crate) struct LocalQueue {
    pub(crate) inner: Arc<InnerBuffer>,
}

unsafe impl Send for LocalQueue {}
unsafe impl Sync for LocalQueue {}

unsafe impl Send for InnerBuffer {}
unsafe impl Sync for InnerBuffer {}

impl LocalQueue {
    pub(crate) fn new() -> Self {
        LocalQueue {
            inner: Arc::new(InnerBuffer::new(LOCAL_QUEUE_CAP as u16)),
        }
    }
}

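// A minimal usage sketch (illustrative only; `task` stands for a `Task`
// constructed elsewhere in the crate, and `dst` for another worker's queue):
//
//     let global = GlobalQueue::new();
//     let local = LocalQueue::new();
//     local.push_back(task, &global);       // overflows into `global` when full
//     let next = local.pop_front();         // the owning worker consumes FIFO
//     let stolen = local.steal_into(&dst);  // an idle worker steals half
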
#[inline]
fn unwrap(num: u32) -> (u16, u16) {
    let head_pos = num & u16::MAX as u32;
    let steal_pos = num >> 16;
    (steal_pos as u16, head_pos as u16)
}

#[inline]
fn wrap(steal_pos: u16, head_pos: u16) -> u32 {
    (head_pos as u32) | ((steal_pos as u32) << 16)
}

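// Packing both 16-bit positions into one `u32` lets head and steal be updated
// by a single compare-exchange on `front`:
//
//      31           16 15            0
//     +---------------+---------------+
//     |   steal_pos   |   head_pos    |
//     +---------------+---------------+
//
// Round trip (illustrative): `unwrap(wrap(3, 5)) == (3, 5)`.
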
impl LocalQueue {
    #[inline]
    pub(crate) fn pop_front(&self) -> Option<Task> {
        self.inner.pop_front()
    }

    #[inline]
    pub(crate) fn push_back(&self, task: Task, global: &GlobalQueue) {
        self.inner.push_back(task, global);
    }

    #[inline]
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        self.inner.steal_into(dst)
    }

    #[inline]
    pub(crate) fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    #[inline]
    pub(crate) fn remaining(&self) -> u16 {
        self.inner.remaining()
    }
}

#[cfg(feature = "metrics")]
impl LocalQueue {
    #[inline]
    pub(crate) fn len(&self) -> u16 {
        self.inner.len()
    }

    #[inline]
    pub(crate) fn count(&self) -> usize {
        self.inner.count()
    }

    #[inline]
    pub(crate) fn task_from_global_count(&self) -> usize {
        self.inner.task_from_global_count()
    }

    #[inline]
    pub(crate) fn task_to_global_count(&self) -> usize {
        self.inner.task_to_global_count()
    }
}

pub(crate) struct InnerBuffer {
    /// `front` packs the positions of both head and steal (see `wrap`)
    front: AtomicU32,
    rear: AtomicU16,
    cap: u16,
    buffer: Box<[UnsafeCell<MaybeUninit<Task>>]>,
    #[cfg(feature = "metrics")]
    metrics: InnerBufferMetrics,
}

/// Metrics of InnerBuffer
#[cfg(feature = "metrics")]
struct InnerBufferMetrics {
    /// Total number of tasks that have entered this LocalQueue
    count: AtomicUsize,
    /// Total number of tasks that have entered this LocalQueue from the
    /// GlobalQueue
    task_from_global_count: AtomicUsize,
    /// Total number of tasks that have entered the GlobalQueue from this
    /// LocalQueue
    task_to_global_count: AtomicUsize,
}

#[cfg(feature = "metrics")]
impl InnerBuffer {
    /// Returns the queue's length.
    fn len(&self) -> u16 {
        let rear = self.rear.load(Acquire);
        let (_, head) = unwrap(self.front.load(Acquire));
        rear.wrapping_sub(head)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    fn count(&self) -> usize {
        self.metrics.count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    /// from the GlobalQueue
    fn task_from_global_count(&self) -> usize {
        self.metrics.task_from_global_count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered the GlobalQueue
    /// from this LocalQueue
    fn task_to_global_count(&self) -> usize {
        self.metrics.task_to_global_count.load(Acquire)
    }
}

impl InnerBuffer {
    fn new(cap: u16) -> Self {
        let mut buffer = Vec::with_capacity(cap as usize);

        for _ in 0..cap {
            buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
        }
        InnerBuffer {
            front: AtomicU32::new(0),
            rear: AtomicU16::new(0),
            cap,
            buffer: buffer.into(),
            #[cfg(feature = "metrics")]
            metrics: InnerBufferMetrics {
                count: AtomicUsize::new(0),
                task_from_global_count: AtomicUsize::new(0),
                task_to_global_count: AtomicUsize::new(0),
            },
        }
    }

    /// Checks whether the queue is empty
    fn is_empty(&self) -> bool {
        let (_, head) = unwrap(self.front.load(Acquire));
        let rear = self.rear.load(Acquire);
        head == rear
    }

    pub(crate) fn pop_front(&self) -> Option<Task> {
        let mut head = self.front.load(Acquire);

        let pos = loop {
            let (steal_pos, real_pos) = unwrap(head);

            // it's an spmc queue, so the queue can read its own tail non-atomically
            let tail_pos = unsafe { non_atomic_load(&self.rear) };

            // return none if the queue is empty
            if real_pos == tail_pos {
                return None;
            }

            let next_real = real_pos.wrapping_add(1);
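            // if no steal is in progress, the steal position moves together
            // with the head; otherwise only the head advances and the stealer
            // synchronizes the steal position later (see `sync_steal_pos`)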
            let next = if steal_pos == real_pos {
                wrap(next_real, next_real)
            } else {
                wrap(steal_pos, next_real)
            };

            let res = self
                .front
                .compare_exchange_weak(head, next, AcqRel, Acquire);
            match res {
                Ok(_) => break real_pos,
                Err(actual) => head = actual,
            }
        };

        let task = self.buffer[(pos & MASK) as usize].get();

        Some(unsafe { ptr::read(task).assume_init() })
    }

    pub(crate) fn remaining(&self) -> u16 {
        let front = self.front.load(Acquire);

        let (steal_pos, _real_pos) = unwrap(front);
        // it's an spmc queue, so the queue can read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&self.rear) };

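        // slots between `steal_pos` and the head may still be read by a
        // stealer, so free space is measured from `steal_pos`, not the head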
        self.cap - (rear.wrapping_sub(steal_pos))
    }

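    /// After the copied-out tasks have been handed over, advances `steal_pos`
    /// so that it catches up with the head and the slots can be reused.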
    fn sync_steal_pos(&self, mut prev: u32) {
        loop {
            let (_front_steal, front_real) = unwrap(prev);
            let next = wrap(front_real, front_real);
            let res = self.front.compare_exchange(prev, next, AcqRel, Acquire);

            match res {
                Ok(_) => {
                    return;
                }
                Err(actual) => {
                    let (actual_steal_pos, actual_real_pos) = unwrap(actual);
                    if actual_steal_pos == actual_real_pos {
                        panic!("steal_pos and real_pos should not be the same");
                    }
                    prev = actual;
                }
            }
        }
    }

    pub(crate) fn push_back(&self, mut task: Task, global: &GlobalQueue) {
        loop {
            let front = self.front.load(Acquire);

            let (steal_pos, _) = unwrap(front);
            // it's an spmc queue, so the queue can read its own tail non-atomically
            let rear = unsafe { non_atomic_load(&self.rear) };

            // if the local queue still has room, write the task in place;
            // otherwise move half of the queue into the global queue
            if rear.wrapping_sub(steal_pos) < self.cap {
                let idx = (rear & MASK) as usize;
                let ptr = self.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
                self.rear.store(rear.wrapping_add(1), SeqCst);
                #[cfg(feature = "metrics")]
                self.metrics.count.fetch_add(1, AcqRel);
                return;
            } else {
                match self.push_overflowed(task, global, steal_pos) {
                    Ok(_) => return,
                    Err(ret) => task = ret,
                }
            }
        }
    }

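    /// Moves half of the local buffer, plus the task that no longer fits,
    /// into the global queue. Returns `Err(task)` when another thread raced
    /// on `front`, in which case the caller retries `push_back`.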
    #[allow(unused_assignments)]
    pub(crate) fn push_overflowed(
        &self,
        task: Task,
        global: &GlobalQueue,
        front: u16,
    ) -> Result<(), Task> {
        // the number of tasks to move into the global queue: half the buffer
        let count = LOCAL_QUEUE_CAP / 2;
        let prev = wrap(front, front);
        let next = wrap(front, front.wrapping_add(count as u16));

        match self.front.compare_exchange(prev, next, Release, Acquire) {
            Ok(_) => {}
            Err(_) => return Err(task),
        }

        let (mut src_front_steal, _src_front_real) = unwrap(prev);

        let mut tmp_buf = Vec::with_capacity(count);
        for _ in 0..count {
            tmp_buf.push(UnsafeCell::new(MaybeUninit::uninit()));
        }

        for dst_ptr in tmp_buf.iter().take(count) {
            let src_idx = (src_front_steal & MASK) as usize;
            let task_ptr = self.buffer[src_idx].get();
            let task = unsafe { ptr::read(task_ptr).assume_init() };
            unsafe {
                ptr::write((*dst_ptr.get()).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
        }

        self.sync_steal_pos(next);

        #[cfg(feature = "metrics")]
        self.metrics
            .task_to_global_count
            .fetch_add(tmp_buf.len() + 1, AcqRel);

        global.push_batch(tmp_buf, task);

        Ok(())
    }

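    /// Steals half of this queue's tasks into `dst` and returns one of them
    /// for the caller to run immediately. Returns `None` when stealing is not
    /// worthwhile: the destination is more than half full, another steal is
    /// already in progress, or this queue is empty.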
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        // it's an spmc queue, so the queue can read its own tail non-atomically
        let mut dst_rear = unsafe { non_atomic_load(&dst.inner.rear) };
        let (des_steal_pos, _des_front_pos) = unwrap(dst.inner.front.load(Acquire));
        if dst_rear.wrapping_sub(des_steal_pos) > LOCAL_QUEUE_CAP as u16 / 2 {
            return None;
        }

        let mut src_next_front;
        let mut src_prev_front = self.front.load(Acquire);

        // compute the number of tasks to steal
        let mut count = loop {
            let (src_front_steal, src_front_real) = unwrap(src_prev_front);

            // if these two values are not equal, another worker is stealing
            // from this queue, so abort this steal
            if src_front_steal != src_front_real {
                return None;
            };

            let src_rear = self.rear.load(Acquire);

            // steal half of the tasks from the queue
            let mut n = src_rear.wrapping_sub(src_front_real);
            n = n - n / 2;
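            // (`n - n / 2` is the rounded-up half, so a queue holding a
            // single task still yields one steal)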
            if n == 0 {
                return None;
            }

            let src_steal_to = src_front_real.wrapping_add(n);
            src_next_front = wrap(src_front_steal, src_steal_to);

            let res =
                self.front
                    .compare_exchange_weak(src_prev_front, src_next_front, AcqRel, Acquire);
            match res {
                Ok(_) => break n,
                Err(actual) => src_prev_front = actual,
            }
        };

        // transfer the stolen tasks, keeping the last one to return to the
        // caller
        let (mut src_front_steal, _src_front_real) = unwrap(src_next_front);
        count -= 1;
        for _ in 0..count {
            let src_idx = (src_front_steal & MASK) as usize;
            let des_idx = (dst_rear & MASK) as usize;

            let task_ptr = self.buffer[src_idx].get();

            let task = unsafe { ptr::read(task_ptr).assume_init() };
            let ptr = dst.inner.buffer[des_idx].get();
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
            dst_rear = dst_rear.wrapping_add(1);
        }

        let src_idx = (src_front_steal & MASK) as usize;

        let task_ptr = self.buffer[src_idx].get();
        let task = unsafe { ptr::read(task_ptr).assume_init() };
        if count != 0 {
            dst.inner.rear.store(dst_rear, SeqCst);
        }

        self.sync_steal_pos(src_next_front);

        Some(task)
    }
}

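// Dropping the buffer pops every remaining task so that task destructors run;
// slots outside the head..rear range hold uninitialized memory and are never
// touched.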
impl Drop for InnerBuffer {
    fn drop(&mut self) {
        while self.pop_front().is_some() {}
    }
}

pub(crate) struct GlobalQueue {
    /// Current number of tasks
    len: AtomicUsize,
    /// Total number of tasks that have entered the global queue
    #[cfg(feature = "metrics")]
    count: AtomicUsize,
    globals: Mutex<LinkedList<Task>>,
}

impl GlobalQueue {
    pub(crate) fn new() -> Self {
        GlobalQueue {
            len: AtomicUsize::new(0_usize),
            #[cfg(feature = "metrics")]
            count: AtomicUsize::new(0_usize),
            globals: Mutex::new(LinkedList::new()),
        }
    }

    pub(super) fn is_empty(&self) -> bool {
        self.len.load(Acquire) == 0
    }

    pub(super) fn push_batch(&self, tasks: Vec<UnsafeCell<MaybeUninit<Task>>>, task: Task) {
        let mut list = self.globals.lock().unwrap();
        let len = tasks.len() + 1;
        for task_ptr in tasks {
            let task = unsafe { ptr::read(task_ptr.get()).assume_init() };
            list.push_back(task);
        }
        list.push_back(task);
        self.len.fetch_add(len, AcqRel);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(len, AcqRel);
    }

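    /// Pops up to `limit` tasks (but no more than an even share of the global
    /// queue per worker) into `queue`, returning the first popped task
    /// directly to the caller.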
    pub(super) fn pop_batch(
        &self,
        worker_num: usize,
        queue: &LocalQueue,
        limit: usize,
    ) -> Option<Task> {
        let len = self.len.load(Acquire);
        let num = cmp::min(len / worker_num, limit);

        let inner_buf = &queue.inner;
        // it's an spmc queue, so the queue can read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&inner_buf.rear) };
        let mut curr = rear;

        let mut list = self.globals.lock().unwrap();
        let first_task = list.pop_front()?;

        let mut count = 1;

        for _ in 1..num {
            if let Some(task) = list.pop_front() {
                let idx = (curr & MASK) as usize;
                let ptr = inner_buf.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
                curr = curr.wrapping_add(1);
                count += 1;
            } else {
                break;
            }
        }
        drop(list);
        self.len.fetch_sub(count, AcqRel);
        inner_buf.rear.store(curr, Release);

        #[cfg(feature = "metrics")]
        inner_buf
            .metrics
            .task_from_global_count
            .fetch_add(1, AcqRel);

        Some(first_task)
    }

    pub(super) fn pop_front(&self) -> Option<Task> {
        if self.is_empty() {
            return None;
        }
        let mut list = self.globals.lock().unwrap();
        let task = list.pop_front();
        drop(list);
        if task.is_some() {
            self.len.fetch_sub(1, AcqRel);
        }
        task
    }

    pub(super) fn push_back(&self, task: Task) {
        let mut list = self.globals.lock().unwrap();
        list.push_back(task);
        drop(list);
        self.len.fetch_add(1, AcqRel);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(1, AcqRel);
    }

    pub(super) fn get_global(&self) -> &Mutex<LinkedList<Task>> {
        &self.globals
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_len(&self) -> usize {
        self.len.load(Acquire)
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_count(&self) -> usize {
        self.count.load(Acquire)
    }
}

#[cfg(feature = "multi_instance_runtime")]
#[cfg(test)]
mod test {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::Ordering::Acquire;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::thread::park;

    use crate::executor::async_pool::MultiThreadScheduler;
    use crate::executor::driver::Driver;
    use crate::executor::queue::{GlobalQueue, InnerBuffer, LocalQueue, LOCAL_QUEUE_CAP};
    use crate::task::{Task, TaskBuilder, VirtualTableType};

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl InnerBuffer {
        fn len(&self) -> u16 {
            let front = self.front.load(Acquire);
            let (_, real_pos) = crate::executor::queue::unwrap(front);

            let rear = self.rear.load(Acquire);
            rear.wrapping_sub(real_pos)
        }
    }

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl LocalQueue {
        pub fn len(&self) -> u16 {
            self.inner.len()
        }
    }

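    /// Added sanity check (a sketch not in the original suite): the steal and
    /// head positions must survive a `wrap`/`unwrap` round trip.
    #[test]
    fn ut_wrap_unwrap_round_trip() {
        use crate::executor::queue::{unwrap, wrap};
        assert_eq!(unwrap(wrap(3, 5)), (3, 5));
        assert_eq!(unwrap(wrap(u16::MAX, 0)), (u16::MAX, 0));
    }
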
    pub struct TestFuture {
        value: usize,
        total: usize,
    }

    pub fn create_new() -> TestFuture {
        TestFuture {
            value: 0,
            total: 1000,
        }
    }

    impl Future for TestFuture {
        type Output = usize;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.total > self.value {
                self.get_mut().value += 1;
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(self.total)
            }
        }
    }

    async fn test_future() -> usize {
        create_new().await
    }

    #[test]
    fn ut_inner_buffer() {
        ut_inner_buffer_new();
        ut_inner_buffer_len();
        ut_inner_buffer_is_empty();
        ut_inner_buffer_push_back();
        ut_inner_buffer_pop_front();
        ut_inner_buffer_steal_into();
    }

    /// UT test cases for InnerBuffer::new()
    ///
    /// # Brief
    /// 1. Checking the parameters after initialization is completed
    fn ut_inner_buffer_new() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.cap, LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.buffer.len(), LOCAL_QUEUE_CAP);
    }

    /// UT test cases for InnerBuffer::is_empty()
    ///
    /// # Brief
    /// 1. Check that the queue is empty right after initialization
    /// 2. Push one task into the queue and check again; the queue should now
    ///    be non-empty
    fn ut_inner_buffer_is_empty() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert!(inner_buffer.is_empty());

        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = Task::create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        let global_queue = GlobalQueue::new();
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        inner_buffer.push_back(task, &global_queue);
        assert!(!inner_buffer.is_empty());
    }

    /// UT test cases for InnerBuffer::len()
    ///
    /// # Brief
    /// 1. Check the length right after initialization is completed
    /// 2. Insert tasks up to the capacity of the local queue and check its
    ///    length
    /// 3. Insert more tasks than the capacity and check both the local queue
    ///    length and the global queue length
    fn ut_inner_buffer_len() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.len(), 0);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler =
            Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
        let (task, _) = Task::create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        inner_buffer.push_back(task, &global_queue);
        assert_eq!(inner_buffer.len(), 1);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        for _ in 0..LOCAL_QUEUE_CAP + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            inner_buffer.push_back(task, &global_queue);
        }
        assert_eq!(
            inner_buffer.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// UT test cases for InnerBuffer::push_back()
    ///
    /// # Brief
    /// 1. Insert tasks up to the capacity of the local queue and verify that
    ///    they are handled correctly
    /// 2. Insert more tasks than the capacity and verify that the overflow is
    ///    moved into the global queue
    fn ut_inner_buffer_push_back() {
        // 1. Insert tasks up to capacity into the local queue, verifying that
        // they are handled correctly
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        let builder = TaskBuilder::new();
        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        // 2. Insert tasks that exceed the capacity into the local queue and
        // verify that the overflow is moved into the global queue
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP / 2 + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(
            local_queue.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// UT test cases for InnerBuffer::pop_front()
    ///
    /// # Brief
    /// 1. Pop from an empty local queue on multiple threads and check that
    ///    nothing is returned
    /// 2. With a non-empty local queue, pop exactly the number of existing
    ///    tasks on multiple threads and check the result
    /// 3. With a non-empty local queue, pop more than the number of existing
    ///    tasks on multiple threads and check the result
    fn ut_inner_buffer_pop_front() {
        // 1. Pop from an empty local queue; nothing should be returned
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();
        assert!(local_queue.pop_front().is_none());

        // 2. With a non-empty local queue, pop exactly the number of existing
        // tasks on multiple threads and check the result
        let local_queue = Arc::new(LocalQueue::new());
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_one.pop_front();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_two.pop_front();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());

        // 3. With a non-empty local queue, pop more than the number of
        // existing tasks on multiple threads and check the result
        let local_queue = Arc::new(LocalQueue::new());

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_one.pop_front();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_two.pop_front();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());
    }

    /// UT test cases for InnerBuffer::steal_into()
    ///
    /// # Brief
    /// 1. Single-threaded: the destination queue is more than half full, so
    ///    stealing from another local queue should yield nothing
    /// 2. Single-threaded: the destination queue is less than half full but
    ///    the source queue is empty, so stealing should yield nothing
    /// 3. Single-threaded: the destination queue is less than half full and
    ///    the source queue is non-empty, so stealing should succeed
    /// 4. Multi-threaded: another thread pops from the source queue while it
    ///    is being stolen from; check the combined result
    /// 5. Multi-threaded: the source queue is stolen from by two queues
    ///    concurrently; check the combined result
    fn ut_inner_buffer_steal_into() {
        // 1. Single-threaded: the destination queue is more than half full,
        // so stealing from another local queue should yield nothing
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        assert!(other_local_queue.steal_into(&local_queue).is_none());

        // 2. Single-threaded: the destination queue is less than half full
        // but the source queue is empty, so stealing should yield nothing
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();

        assert!(other_local_queue.steal_into(&local_queue).is_none());

        // 3. Single-threaded: the destination queue is less than half full
        // and the source queue is non-empty, so stealing should succeed
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        assert!(other_local_queue.steal_into(&local_queue).is_some());
        assert_eq!(other_local_queue.len(), (LOCAL_QUEUE_CAP / 2) as u16);
        assert_eq!(local_queue.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);

        // 4. Multi-threaded: another thread pops from the source queue while
        // it is being stolen from; check the combined result
        let local_queue = Arc::new(LocalQueue::new());
        let local_queue_clone = local_queue.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                other_local_queue_clone_one.pop_front();
            }
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into(&local_queue_clone);
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");

        assert_eq!(
            other_local_queue.len() + local_queue.len() + 1,
            (LOCAL_QUEUE_CAP / 2) as u16
        );

        // 5. Multi-threaded: the source queue is stolen from by two queues
        // concurrently; check the combined result
        let local_queue_one = Arc::new(LocalQueue::new());
        let local_queue_one_clone = local_queue_one.clone();

        let local_queue_two = Arc::new(LocalQueue::new());
        let local_queue_two_clone = local_queue_two.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            park();
            other_local_queue_clone_one.steal_into(&local_queue_one_clone);
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into(&local_queue_two_clone);
        });

        thread_two.join().expect("failed");
        thread_one.thread().unpark();
        thread_one.join().expect("failed");

        assert_eq!(local_queue_two.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);
        assert_eq!(local_queue_one.len(), (LOCAL_QUEUE_CAP / 4 - 1) as u16);
    }
}