// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::UnsafeCell;
use std::collections::linked_list::LinkedList;
use std::mem::MaybeUninit;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::{cmp, ptr};

/// Schedule strategy implementation: tasks are queued FIFO in per-worker local
/// queues backed by a global queue, and a work-stealing strategy lets an idle
/// worker steal roughly half of another worker's tasks.
use crate::task::Task;

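/// Reads the value with a plain, non-atomic load.
///
/// Only sound while no other thread can concurrently write `data`; in this
/// module it is used by the queue owner to read its own `rear`, which only
/// the owner thread writes.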
unsafe fn non_atomic_load(data: &AtomicU16) -> u16 {
    ptr::read(data as *const AtomicU16 as *const u16)
}

/// Capacity of the local queue
pub(crate) const LOCAL_QUEUE_CAP: usize = 256;
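// `LOCAL_QUEUE_CAP` is assumed to be a power of two so that `MASK` (capacity
// minus one) can wrap buffer indices with a bitwise AND, e.g. 256 gives
// MASK = 0xFF and index = pos & 0xFF.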
const MASK: u16 = LOCAL_QUEUE_CAP as u16 - 1;

/// Local queue of the worker
pub(crate) struct LocalQueue {
    pub(crate) inner: Arc<InnerBuffer>,
}

unsafe impl Send for LocalQueue {}
unsafe impl Sync for LocalQueue {}

unsafe impl Send for InnerBuffer {}
unsafe impl Sync for InnerBuffer {}

impl LocalQueue {
    pub(crate) fn new() -> Self {
        LocalQueue {
            inner: Arc::new(InnerBuffer::new(LOCAL_QUEUE_CAP as u16)),
        }
    }
}

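// `front` packs two 16-bit positions into a single `AtomicU32`: the steal
// position in the high 16 bits and the head (real) position in the low 16
// bits. Illustrative round trip: `wrap(1, 2)` yields `0x0001_0002`, and
// `unwrap(0x0001_0002)` yields `(1, 2)`.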
#[inline]
fn unwrap(num: u32) -> (u16, u16) {
    let head_pos = num & u16::MAX as u32;
    let steal_pos = num >> 16;
    (steal_pos as u16, head_pos as u16)
}

#[inline]
fn wrap(steal_pos: u16, head_pos: u16) -> u32 {
    (head_pos as u32) | ((steal_pos as u32) << 16)
}

impl LocalQueue {
    #[inline]
    pub(crate) fn pop_front(&self) -> Option<Task> {
        self.inner.pop_front()
    }

    #[inline]
    pub(crate) fn push_back(&self, task: Task, global: &GlobalQueue) {
        self.inner.push_back(task, global);
    }

    #[inline]
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        self.inner.steal_into(dst)
    }

    #[inline]
    pub(crate) fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    #[inline]
    pub(crate) fn remaining(&self) -> u16 {
        self.inner.remaining()
    }
}

#[cfg(feature = "metrics")]
impl LocalQueue {
    #[inline]
    pub(crate) fn len(&self) -> u16 {
        self.inner.len()
    }

    #[inline]
    pub(crate) fn count(&self) -> usize {
        self.inner.count()
    }

    #[inline]
    pub(crate) fn task_from_global_count(&self) -> usize {
        self.inner.task_from_global_count()
    }

    #[inline]
    pub(crate) fn task_to_global_count(&self) -> usize {
        self.inner.task_to_global_count()
    }
}

pub(crate) struct InnerBuffer {
    /// Front stores both the head position and the steal position
    front: AtomicU32,
    rear: AtomicU16,
    cap: u16,
    buffer: Box<[UnsafeCell<MaybeUninit<Task>>]>,
    #[cfg(feature = "metrics")]
    metrics: InnerBufferMetrics,
}

/// Metrics of InnerBuffer
#[cfg(feature = "metrics")]
struct InnerBufferMetrics {
    /// The total number of tasks that have entered this LocalQueue
    count: AtomicUsize,
    /// The total number of tasks that have entered this LocalQueue from the
    /// GlobalQueue
    task_from_global_count: AtomicUsize,
    /// The total number of tasks that have entered the GlobalQueue from this
    /// LocalQueue
    task_to_global_count: AtomicUsize,
}

#[cfg(feature = "metrics")]
impl InnerBuffer {
    /// Returns the queue's length.
    fn len(&self) -> u16 {
        let rear = self.rear.load(Acquire);
        let (_, head) = unwrap(self.front.load(Acquire));
        rear.wrapping_sub(head)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    fn count(&self) -> usize {
        self.metrics.count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    /// from the GlobalQueue
    fn task_from_global_count(&self) -> usize {
        self.metrics.task_from_global_count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered the GlobalQueue
    /// from this LocalQueue
    fn task_to_global_count(&self) -> usize {
        self.metrics.task_to_global_count.load(Acquire)
    }
}

impl InnerBuffer {
    fn new(cap: u16) -> Self {
        let mut buffer = Vec::with_capacity(cap as usize);

        for _ in 0..cap {
            buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
        }
        InnerBuffer {
            front: AtomicU32::new(0),
            rear: AtomicU16::new(0),
            cap,
            buffer: buffer.into(),
            #[cfg(feature = "metrics")]
            metrics: InnerBufferMetrics {
                count: AtomicUsize::new(0),
                task_from_global_count: AtomicUsize::new(0),
                task_to_global_count: AtomicUsize::new(0),
            },
        }
    }

    /// Checks whether the queue is empty
    fn is_empty(&self) -> bool {
        let (_, head) = unwrap(self.front.load(Acquire));
        let rear = self.rear.load(Acquire);
        head == rear
    }

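    /// Pops a task from the front of the queue.
    ///
    /// The owner competes with stealers for `front`, so the head is advanced
    /// with a CAS loop; returns `None` if the queue is empty.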
    pub(crate) fn pop_front(&self) -> Option<Task> {
        let mut head = self.front.load(Acquire);

        let pos = loop {
            let (steal_pos, real_pos) = unwrap(head);

            // it's a spmc queue, so the queue could read its own tail non-atomically
            let tail_pos = unsafe { non_atomic_load(&self.rear) };

            // return `None` if the queue is empty
            if real_pos == tail_pos {
                return None;
            }

            let next_real = real_pos.wrapping_add(1);
            let next = if steal_pos == real_pos {
                wrap(next_real, next_real)
            } else {
                wrap(steal_pos, next_real)
            };

            let res = self
                .front
                .compare_exchange_weak(head, next, AcqRel, Acquire);
            match res {
                Ok(_) => break real_pos,
                Err(actual) => head = actual,
            }
        };

        let task = self.buffer[(pos & MASK) as usize].get();

        Some(unsafe { ptr::read(task).assume_init() })
    }

    pub(crate) fn remaining(&self) -> u16 {
        let front = self.front.load(Acquire);

        let (steal_pos, _real_pos) = unwrap(front);
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&self.rear) };

        self.cap - (rear.wrapping_sub(steal_pos))
    }

    fn sync_steal_pos(&self, mut prev: u32) {
        loop {
            let (_front_steal, front_real) = unwrap(prev);
            let next = wrap(front_real, front_real);
            let res = self.front.compare_exchange(prev, next, AcqRel, Acquire);

            match res {
                Ok(_) => {
                    return;
                }
                Err(actual) => {
                    let (actual_steal_pos, actual_real_pos) = unwrap(actual);
                    if actual_steal_pos == actual_real_pos {
                        panic!("steal_pos and real_pos should not be the same");
                    }
                    prev = actual;
                }
            }
        }
    }

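    /// Pushes a task to the back of the local queue.
    ///
    /// If the local queue is full, half of its tasks together with the new
    /// task are moved into the global queue via `push_overflowed`.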
    pub(crate) fn push_back(&self, mut task: Task, global: &GlobalQueue) {
        loop {
            let front = self.front.load(Acquire);

            let (steal_pos, _) = unwrap(front);
            // it's a spmc queue, so the queue could read its own tail non-atomically
            let rear = unsafe { non_atomic_load(&self.rear) };

            // if the local queue is not full, push the task into it; otherwise
            // move half of the queue into the global queue
            if rear.wrapping_sub(steal_pos) < self.cap {
                let idx = (rear & MASK) as usize;
                let ptr = self.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
                self.rear.store(rear.wrapping_add(1), SeqCst);
                #[cfg(feature = "metrics")]
                self.metrics.count.fetch_add(1, AcqRel);
                return;
            } else {
                match self.push_overflowed(task, global, steal_pos) {
                    Ok(_) => return,
                    Err(ret) => task = ret,
                }
            }
        }
    }

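    /// Moves half of the local queue, plus `task`, into the global queue.
    ///
    /// Returns the task back to the caller if `front` was changed concurrently
    /// (for example by a stealer), so that `push_back` can retry.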
    #[allow(unused_assignments)]
    pub(crate) fn push_overflowed(
        &self,
        task: Task,
        global: &GlobalQueue,
        front: u16,
    ) -> Result<(), Task> {
        // the number of tasks to move into the global queue: half of the capacity
        let count = LOCAL_QUEUE_CAP / 2;
        let prev = wrap(front, front);
        let next = wrap(front, front.wrapping_add(count as u16));

        match self.front.compare_exchange(prev, next, Release, Acquire) {
            Ok(_) => {}
            Err(_) => return Err(task),
        }

        let (mut src_front_steal, _src_front_real) = unwrap(prev);

        let mut tmp_buf = Vec::with_capacity(count);
        for _ in 0..count {
            tmp_buf.push(UnsafeCell::new(MaybeUninit::uninit()));
        }

        for dst_ptr in tmp_buf.iter().take(count) {
            let src_idx = (src_front_steal & MASK) as usize;
            let task_ptr = self.buffer[src_idx].get();
            let task = unsafe { ptr::read(task_ptr).assume_init() };
            unsafe {
                ptr::write((*dst_ptr.get()).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
        }

        self.sync_steal_pos(next);

        #[cfg(feature = "metrics")]
        self.metrics
            .task_to_global_count
            .fetch_add(tmp_buf.len() + 1, AcqRel);

        global.push_batch(tmp_buf, task);

        Ok(())
    }

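    /// Steals roughly half of this queue's tasks into `dst`.
    ///
    /// One of the stolen tasks is returned directly to the caller. Returns
    /// `None` if the destination is more than half full, the source is empty,
    /// or another worker is already stealing from the source.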
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let mut dst_rear = unsafe { non_atomic_load(&dst.inner.rear) };
        let (des_steal_pos, _des_front_pos) = unwrap(dst.inner.front.load(Acquire));
        if dst_rear.wrapping_sub(des_steal_pos) > LOCAL_QUEUE_CAP as u16 / 2 {
            return None;
        }

        let mut src_next_front;
        let mut src_prev_front = self.front.load(Acquire);

        // get the number of tasks to steal
        let mut count = loop {
            let (src_front_steal, src_front_real) = unwrap(src_prev_front);

            // if these two values are not equal, it means another worker has stolen from
            // this queue, therefore abort this steal.
            if src_front_steal != src_front_real {
                return None;
            };

            let src_rear = self.rear.load(Acquire);

            // steal half of the tasks from the queue (rounded up)
            let mut n = src_rear.wrapping_sub(src_front_real);
            n = n - n / 2;
            if n == 0 {
                return None;
            }

            let src_steal_to = src_front_real.wrapping_add(n);
            src_next_front = wrap(src_front_steal, src_steal_to);

            let res =
                self.front
                    .compare_exchange_weak(src_prev_front, src_next_front, AcqRel, Acquire);
            match res {
                Ok(_) => break n,
                Err(actual) => src_prev_front = actual,
            }
        };

        // transfer the tasks
        let (mut src_front_steal, _src_front_real) = unwrap(src_next_front);
        count -= 1;
        for _ in 0..count {
            let src_idx = (src_front_steal & MASK) as usize;
            let des_idx = (dst_rear & MASK) as usize;

            let task_ptr = self.buffer[src_idx].get();

            let task = unsafe { ptr::read(task_ptr).assume_init() };
            let ptr = dst.inner.buffer[des_idx].get();
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
            dst_rear = dst_rear.wrapping_add(1);
        }

        let src_idx = (src_front_steal & MASK) as usize;

        let task_ptr = self.buffer[src_idx].get();
        let task = unsafe { ptr::read(task_ptr).assume_init() };
        if count != 0 {
            dst.inner.rear.store(dst_rear, SeqCst);
        }

        self.sync_steal_pos(src_next_front);

        Some(task)
    }
}

impl Drop for InnerBuffer {
    fn drop(&mut self) {
        while self.pop_front().is_some() {}
    }
}

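/// Global queue shared by all workers, backed by a mutex-protected linked
/// list.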
pub(crate) struct GlobalQueue {
    /// Current number of tasks
    len: AtomicUsize,
    /// The total number of tasks that have entered the global queue.
    #[cfg(feature = "metrics")]
    count: AtomicUsize,
    globals: Mutex<LinkedList<Task>>,
}

impl GlobalQueue {
    pub(crate) fn new() -> Self {
        GlobalQueue {
            len: AtomicUsize::new(0_usize),
            #[cfg(feature = "metrics")]
            count: AtomicUsize::new(0_usize),
            globals: Mutex::new(LinkedList::new()),
        }
    }

    pub(super) fn is_empty(&self) -> bool {
        self.len.load(Acquire) == 0
    }

    pub(super) fn push_batch(&self, tasks: Vec<UnsafeCell<MaybeUninit<Task>>>, task: Task) {
        let mut list = self.globals.lock().unwrap();
        let len = tasks.len() + 1;
        for task_ptr in tasks {
            let task = unsafe { ptr::read(task_ptr.get()).assume_init() };
            list.push_back(task);
        }
        list.push_back(task);
        self.len.fetch_add(len, AcqRel);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(len, AcqRel);
    }

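    /// Pops a batch of tasks from the global queue into the worker's local
    /// queue.
    ///
    /// At most `min(len / worker_num, limit)` tasks are taken so the global
    /// queue is shared among workers; the first task popped is returned to the
    /// caller and the rest are written into the local buffer.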
    pub(super) fn pop_batch(
        &self,
        worker_num: usize,
        queue: &LocalQueue,
        limit: usize,
    ) -> Option<Task> {
        let len = self.len.load(Acquire);
        let num = cmp::min(len / worker_num, limit);

        let inner_buf = &queue.inner;
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&inner_buf.rear) };
        let mut curr = rear;

        let mut list = self.globals.lock().unwrap();
        let first_task = list.pop_front()?;

        let mut count = 1;

        for _ in 1..num {
            if let Some(task) = list.pop_front() {
                let idx = (curr & MASK) as usize;
                let ptr = inner_buf.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
                curr = curr.wrapping_add(1);
                count += 1;
            } else {
                break;
            }
        }
        drop(list);
        self.len.fetch_sub(count, AcqRel);
        inner_buf.rear.store(curr, Release);

        #[cfg(feature = "metrics")]
        inner_buf
            .metrics
            .task_from_global_count
            .fetch_add(1, AcqRel);

        Some(first_task)
    }

    pub(super) fn pop_front(&self) -> Option<Task> {
        if self.is_empty() {
            return None;
        }
        let mut list = self.globals.lock().unwrap();
        let task = list.pop_front();
        drop(list);
        if task.is_some() {
            self.len.fetch_sub(1, AcqRel);
        }
        task
    }

    pub(super) fn push_back(&self, task: Task) {
        let mut list = self.globals.lock().unwrap();
        list.push_back(task);
        drop(list);
        self.len.fetch_add(1, AcqRel);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(1, AcqRel);
    }

    pub(super) fn get_global(&self) -> &Mutex<LinkedList<Task>> {
        &self.globals
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_len(&self) -> usize {
        self.len.load(Acquire)
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_count(&self) -> usize {
        self.count.load(Acquire)
    }
}

#[cfg(feature = "multi_instance_runtime")]
#[cfg(test)]
mod test {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::Ordering::Acquire;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::thread::park;

    use crate::executor::async_pool::MultiThreadScheduler;
    use crate::executor::driver::Driver;
    use crate::executor::queue::{GlobalQueue, InnerBuffer, LocalQueue, LOCAL_QUEUE_CAP};
    use crate::task::{Task, TaskBuilder, VirtualTableType};

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl InnerBuffer {
        fn len(&self) -> u16 {
            let front = self.front.load(Acquire);
            let (_, real_pos) = crate::executor::queue::unwrap(front);

            let rear = self.rear.load(Acquire);
            rear.wrapping_sub(real_pos)
        }
    }

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl LocalQueue {
        pub fn len(&self) -> u16 {
            self.inner.len()
        }
    }

    pub struct TestFuture {
        value: usize,
        total: usize,
    }

    pub fn create_new() -> TestFuture {
        TestFuture {
            value: 0,
            total: 1000,
        }
    }

    impl Future for TestFuture {
        type Output = usize;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.total > self.value {
                // unsafe {
                //     Pin::get_unchecked_mut(self).value += 1;
                // }
                self.get_mut().value += 1;
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(self.total)
            }
        }
    }

    async fn test_future() -> usize {
        create_new().await
    }

    #[test]
    fn ut_inner_buffer() {
        ut_inner_buffer_new();
        ut_inner_buffer_len();
        ut_inner_buffer_is_empty();
        ut_inner_buffer_push_back();
        ut_inner_buffer_pop_front();
        ut_inner_buffer_steal_into();
    }

    /// UT test cases for InnerBuffer::new()
    ///
    /// # Brief
    /// 1. Checking the parameters after initialization is completed
    fn ut_inner_buffer_new() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.cap, LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.buffer.len(), LOCAL_QUEUE_CAP);
    }

    /// UT test cases for InnerBuffer::is_empty()
    ///
    /// # Brief
    /// 1. Check that the queue is empty right after initialization is
    ///    completed
    /// 2. Push a task into the queue, then check again whether it is empty; it
    ///    should be non-empty
    fn ut_inner_buffer_is_empty() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert!(inner_buffer.is_empty());

        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = Task::create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        let global_queue = GlobalQueue::new();
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        inner_buffer.push_back(task, &global_queue);
        assert!(!inner_buffer.is_empty());
    }

    /// UT test cases for InnerBuffer::len()
    ///
    /// # Brief
    /// 1. Check the length right after initialization is completed
    /// 2. Push a single task into the local queue and check the local queue
    ///    length
    /// 3. Push tasks exceeding the capacity into the local queue, checking
    ///    both the local queue length and the global queue length
    fn ut_inner_buffer_len() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.len(), 0);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler =
            Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
        let (task, _) = Task::create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        inner_buffer.push_back(task, &global_queue);
        assert_eq!(inner_buffer.len(), 1);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        for _ in 0..LOCAL_QUEUE_CAP + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            inner_buffer.push_back(task, &global_queue);
        }
        assert_eq!(
            inner_buffer.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// UT test cases for InnerBuffer::push_back()
    ///
    /// # Brief
    /// 1. Push tasks up to the capacity into the local queue, verifying that
    ///    they are handled correctly
    /// 2. Push tasks exceeding the capacity into the local queue and verify
    ///    that the overflow is moved to the global queue
    fn ut_inner_buffer_push_back() {
        // 1. Insert tasks up to capacity into the local queue, verifying that they are
        // functionally correct
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        let builder = TaskBuilder::new();
        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(local_queue.len(), 256);

        // 2. Insert tasks that exceed the capacity into the local queue and verify that
        // they are functionally correct
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP / 2 + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(
            local_queue.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// UT test cases for InnerBuffer::pop_front()
    ///
    /// # Brief
    /// 1. Pop from an empty local queue and check that nothing is returned
    /// 2. With a full local queue, pop exactly the number of existing tasks
    ///    from multiple threads and check that the queue ends up empty
    /// 3. With a full local queue, pop more than the number of existing tasks
    ///    from multiple threads and check that the queue ends up empty
    fn ut_inner_buffer_pop_front() {
        // 1. Multi-threaded take out task operation with empty local queue, check if
        // the function is correct
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();
        assert!(local_queue.pop_front().is_none());

        // 2. If the local queue is not empty, multi-threaded take out operations up to
        // the number of existing tasks and check if the function is correct
        let local_queue = Arc::new(LocalQueue::new());
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_one.pop_front();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_two.pop_front();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());

        // 3. If the local queue is not empty, the multi-threaded operation to take out
        // more than the number of existing tasks, check whether the function is correct
        let local_queue = Arc::new(LocalQueue::new());

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_one.pop_front();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_two.pop_front();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());
    }

    /// UT test cases for InnerBuffer::steal_into()
    ///
    /// # Brief
    /// 1. Single-threaded: the destination queue already holds more than half
    ///    of its capacity, so stealing from another queue returns nothing
    /// 2. Single-threaded: the destination queue is below half capacity but
    ///    the source queue is empty, so stealing returns nothing
    /// 3. Single-threaded: the destination queue is below half capacity and
    ///    the source queue is not empty, so stealing succeeds
    /// 4. Multi-threaded: steal from a queue while another thread is popping
    ///    from it, and check that no task is lost
    /// 5. Multi-threaded: steal from a queue that has already been stolen from
    ///    by another queue, and check the resulting lengths
    fn ut_inner_buffer_steal_into() {
        // 1. In the single-threaded case, the local queue has more than half the number
        // of tasks, steal from other local queues, the number of steals is 0, check
        // whether the function is completed
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        assert!(other_local_queue.steal_into(&local_queue).is_none());

        // 2. In the single-threaded case, the number of tasks already in the local
        // queue is not more than half, steal from other local queues, the number of
        // steals is 0, check whether the function is completed
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();

        assert!(other_local_queue.steal_into(&local_queue).is_none());

        // 3. In the single-threaded case, the number of tasks already in the local
        // queue is not more than half, steal from other local queues, the number of
        // steals is not 0, check whether the function is completed
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        assert!(other_local_queue.steal_into(&local_queue).is_some());
        assert_eq!(other_local_queue.len(), (LOCAL_QUEUE_CAP / 2) as u16);
        assert_eq!(local_queue.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);

        // 4. Multi-threaded case, other queues are doing take out operations, but steal
        // from this queue to see if the function is completed
        let local_queue = Arc::new(LocalQueue::new());
        let local_queue_clone = local_queue.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                other_local_queue_clone_one.pop_front();
            }
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into(&local_queue_clone);
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");

        assert_eq!(
            other_local_queue.len() + local_queue.len() + 1,
            (LOCAL_QUEUE_CAP / 2) as u16
        );

        // 5. In the multi-threaded case, other queues are being stolen by non-local
        // queues, steal from that stolen queue and see if the function is completed
        let local_queue_one = Arc::new(LocalQueue::new());
        let local_queue_one_clone = local_queue_one.clone();

        let local_queue_two = Arc::new(LocalQueue::new());
        let local_queue_two_clone = local_queue_two.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        let (arc_handle, _) = Driver::initialize();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = Task::create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            park();
            other_local_queue_clone_one.steal_into(&local_queue_one_clone);
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into(&local_queue_two_clone);
        });

        thread_two.join().expect("failed");
        thread_one.thread().unpark();
        thread_one.join().expect("failed");

        assert_eq!(local_queue_two.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);
        assert_eq!(local_queue_one.len(), (LOCAL_QUEUE_CAP / 4 - 1) as u16);
    }
}
1054