1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
16
17 use crate::error::ErrorKind;
18
/// Flag bit: the task is currently running.
const RUNNING: usize = 1 << 0;
// Bit 1 is not assigned to any flag in this file.
/// Flag bit: the task is in the schedule list.
const SCHEDULING: usize = 1 << 2;
/// Flag bit: the task has finished.
const FINISHED: usize = 1 << 3;
/// Flag bit: the task got canceled.
const CANCELED: usize = 1 << 4;
/// Flag bit: the task needs to send the finished result back to the join
/// handle.
const CARE_JOIN_HANDLE: usize = 1 << 5;
/// Flag bit: the task currently holds a waker to the join handle.
const JOIN_WAKER: usize = 1 << 6;

/// Selects the reference-count bits: everything above the ten lowest bits.
const RC_MASK: usize = !((1 << 10) - 1);

/// Position of the lowest reference-count bit inside the state word.
const RC_SHIFT: usize = RC_MASK.trailing_zeros() as usize;

/// The value that represents exactly one reference inside the state word.
const REF_ONE: usize = 1 << RC_SHIFT;

/// Initial state: the task is in the schedule list, the join handle cares
/// about the result, and two references exist (one held by the join handle,
/// the other held by the task itself).
const INIT: usize = (REF_ONE << 1) | SCHEDULING | CARE_JOIN_HANDLE;
42
43 #[inline]
ref_count(state: usize) -> usize44 pub(crate) fn ref_count(state: usize) -> usize {
45 (state & RC_MASK) >> RC_SHIFT
46 }
47
48 #[inline]
is_last_ref_count(prev: usize) -> bool49 pub(crate) fn is_last_ref_count(prev: usize) -> bool {
50 ref_count(prev) == 1
51 }
52
53 #[inline]
is_canceled(cur: usize) -> bool54 pub(crate) fn is_canceled(cur: usize) -> bool {
55 cur & CANCELED == CANCELED
56 }
57
58 #[inline]
is_care_join_handle(cur: usize) -> bool59 pub(crate) fn is_care_join_handle(cur: usize) -> bool {
60 cur & CARE_JOIN_HANDLE == CARE_JOIN_HANDLE
61 }
62
63 #[inline]
is_finished(cur: usize) -> bool64 pub(crate) fn is_finished(cur: usize) -> bool {
65 cur & FINISHED == FINISHED
66 }
67
68 #[inline]
is_set_waker(cur: usize) -> bool69 pub(crate) fn is_set_waker(cur: usize) -> bool {
70 cur & JOIN_WAKER == JOIN_WAKER
71 }
72
73 #[inline]
is_scheduling(cur: usize) -> bool74 pub(crate) fn is_scheduling(cur: usize) -> bool {
75 cur & SCHEDULING == SCHEDULING
76 }
77
78 #[inline]
is_running(cur: usize) -> bool79 pub(crate) fn is_running(cur: usize) -> bool {
80 cur & RUNNING == RUNNING
81 }
82
83 // A task need to satisfy these state requirements in order to get pushed back
84 // to the schedule list.
85 #[inline]
need_enqueue(cur: usize) -> bool86 pub(crate) fn need_enqueue(cur: usize) -> bool {
87 (cur & SCHEDULING != SCHEDULING) && (cur & RUNNING != RUNNING) && (cur & FINISHED != FINISHED)
88 }
89
/// Outcome of a state transition attempted on a `TaskState`.
///
/// `Debug`, `PartialEq` and `Eq` are derived so that outcomes can be printed
/// in diagnostics and compared directly in assertions.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum StateAction {
    /// The transition was applied.
    Success,
    /// The task is canceled; carries the state value the transition reports.
    Canceled(usize),
    /// The transition was rejected; carries the state that was observed.
    Failed(usize),
    /// The transition was applied and the task has to be enqueued again.
    Enqueue,
}
96
97 pub(crate) struct TaskState(AtomicUsize);
98 impl TaskState {
99 #[inline]
new() -> Self100 pub(crate) fn new() -> Self {
101 TaskState(AtomicUsize::new(INIT))
102 }
103
104 #[inline]
dec_ref(&self) -> usize105 pub(crate) fn dec_ref(&self) -> usize {
106 self.0.fetch_sub(REF_ONE, AcqRel)
107 }
108
109 #[inline]
inc_ref(&self)110 pub(crate) fn inc_ref(&self) {
111 self.0.fetch_add(REF_ONE, AcqRel);
112 }
113
114 #[inline]
get_current_state(&self) -> usize115 pub(crate) fn get_current_state(&self) -> usize {
116 self.0.load(Acquire)
117 }
118
119 /// Turns the task state into running. Contains CAS operations.
120 ///
121 /// Fails when the task is already running, scheduling or is already
122 /// finished.
turning_to_running(&self) -> StateAction123 pub(crate) fn turning_to_running(&self) -> StateAction {
124 let mut cur = self.get_current_state();
125 loop {
126 let mut action = StateAction::Success;
127
128 if is_running(cur) || is_finished(cur) || !is_scheduling(cur) {
129 return StateAction::Failed(cur);
130 }
131
132 let mut next = cur;
133 next &= !SCHEDULING;
134 next |= RUNNING;
135 if is_canceled(next) {
136 action = StateAction::Canceled(next);
137 }
138
139 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
140 match res {
141 Ok(_) => return action,
142 Err(actual) => cur = actual,
143 }
144 }
145 }
146
147 /// Turns the task state into finished. Contains CAS operations.
148 ///
149 /// Fails when the task is already finished or is not running.
turning_to_finish(&self) -> Result<usize, ErrorKind>150 pub(crate) fn turning_to_finish(&self) -> Result<usize, ErrorKind> {
151 let mut cur = self.get_current_state();
152
153 loop {
154 if is_finished(cur) {
155 return Err(ErrorKind::TaskShutdown);
156 }
157
158 if !is_running(cur) {
159 return Err(ErrorKind::TaskStateInvalid);
160 }
161 let mut next = cur;
162 next &= !RUNNING;
163 next |= FINISHED;
164
165 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
166 match res {
167 Ok(_) => return Ok(next),
168 Err(actual) => cur = actual,
169 }
170 }
171 }
172
173 /// Turns the task state into idle. Contains CAS operations.
174 ///
175 /// Fails when the task is canceled or running.
turning_to_idle(&self) -> StateAction176 pub(crate) fn turning_to_idle(&self) -> StateAction {
177 let mut cur = self.get_current_state();
178
179 loop {
180 let mut action = StateAction::Success;
181
182 if !is_running(cur) {
183 return StateAction::Failed(cur);
184 }
185
186 if is_canceled(cur) {
187 return StateAction::Canceled(cur);
188 }
189
190 let mut next = cur;
191 next &= !RUNNING;
192
193 if is_scheduling(next) {
194 action = StateAction::Enqueue;
195 }
196
197 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
198 match res {
199 Ok(_) => return action,
200 Err(actual) => cur = actual,
201 }
202 }
203 }
204
205 /// Turns the task state into scheduling. Returns the old state value.
206 #[inline]
turn_to_scheduling(&self) -> usize207 pub(crate) fn turn_to_scheduling(&self) -> usize {
208 self.0.fetch_or(SCHEDULING, AcqRel)
209 }
210
211 /// Turns the task state into unset_waker. Contains CAS operations.
212 ///
213 /// Fails when the task is already finished.
turn_to_un_set_waker(&self) -> Result<usize, usize>214 pub(crate) fn turn_to_un_set_waker(&self) -> Result<usize, usize> {
215 let mut cur = self.get_current_state();
216
217 loop {
218 if !is_care_join_handle(cur) || !is_set_waker(cur) {
219 return Err(cur);
220 }
221
222 if is_finished(cur) {
223 return Err(cur);
224 }
225
226 let mut next = cur;
227 next &= !JOIN_WAKER;
228
229 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
230 match res {
231 Ok(_) => return Ok(next),
232 Err(actual) => cur = actual,
233 }
234 }
235 }
236
237 /// Turns off the Join_Waker bit of the task state. Contains CAS operations.
238 ///
239 /// Fails when the task is already finished.
turn_to_set_waker(&self) -> Result<usize, usize>240 pub(crate) fn turn_to_set_waker(&self) -> Result<usize, usize> {
241 let mut cur = self.get_current_state();
242
243 loop {
244 if !is_care_join_handle(cur) || is_set_waker(cur) {
245 return Err(cur);
246 }
247 if is_finished(cur) {
248 return Err(cur);
249 }
250
251 let mut next = cur;
252 next |= JOIN_WAKER;
253
254 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
255 match res {
256 Ok(_) => return Ok(next),
257 Err(actual) => cur = actual,
258 }
259 }
260 }
261
turn_to_canceled_and_scheduled(&self) -> bool262 pub(crate) fn turn_to_canceled_and_scheduled(&self) -> bool {
263 let mut cur = self.get_current_state();
264
265 loop {
266 if is_canceled(cur) || is_finished(cur) {
267 return false;
268 }
269
270 let mut next = cur;
271 let need_schedule = if is_running(cur) {
272 next |= SCHEDULING;
273 next |= CANCELED;
274 false
275 } else {
276 next |= CANCELED;
277 if !is_scheduling(next) {
278 next |= SCHEDULING;
279 true
280 } else {
281 false
282 }
283 };
284
285 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
286 match res {
287 Ok(_) => return need_schedule,
288 Err(actual) => cur = actual,
289 }
290 }
291 }
292
293 /// Turns off the CARE_JOIN_HANDLE bit of the task state. Contains CAS
294 /// operations.
295 ///
296 /// Fails when the task is already finished.
turn_to_un_join_handle(&self) -> Result<usize, ()>297 pub(crate) fn turn_to_un_join_handle(&self) -> Result<usize, ()> {
298 let mut cur = self.get_current_state();
299
300 loop {
301 if is_finished(cur) {
302 return Err(());
303 }
304
305 let mut next = cur;
306 next &= !CARE_JOIN_HANDLE;
307
308 let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
309 match res {
310 Ok(_) => return Ok(next),
311 Err(actual) => cur = actual,
312 }
313 }
314 }
315
316 /// Attempts to turn off the CARE_JOIN_HANDLE bit of the task state.
317 ///
318 /// Returns true if successfully changed. Otherwise, returns false.
try_turning_to_un_join_handle(&self) -> bool319 pub(crate) fn try_turning_to_un_join_handle(&self) -> bool {
320 let old = INIT;
321 let new = (INIT - REF_ONE) & !CARE_JOIN_HANDLE;
322 self.0.compare_exchange(old, new, Relaxed, Relaxed) == Ok(old)
323 }
324
325 cfg_not_ffrt! {
326 /// Turns the task state into canceled. Returns the old state value.
327 #[inline]
328 pub(crate) fn set_cancel(&self) -> usize {
329 self.0.fetch_or(CANCELED, AcqRel)
330 }
331
332 /// Turns the task state into running. Returns the old state value.
333 #[inline]
334 pub(crate) fn set_running(&self) {
335 self.0.fetch_or(RUNNING, AcqRel);
336 }
337 }
338 }
339
#[cfg(test)]
mod test {
    use std::sync::atomic::Ordering::{Acquire, Release};

    use crate::task::state::{
        StateAction, TaskState, CANCELED, CARE_JOIN_HANDLE, FINISHED, INIT, JOIN_WAKER, REF_ONE,
        RUNNING, SCHEDULING,
    };

    /// Builds a `TaskState` whose state word is overwritten with `bits`.
    fn state_with(bits: usize) -> TaskState {
        let task_state = TaskState::new();
        task_state.0.store(bits, Release);
        task_state
    }

    /// Reads the raw state word of `task_state`.
    fn stored(task_state: &TaskState) -> usize {
        task_state.0.load(Acquire)
    }

    /// UT test cases for TaskState::new()
    ///
    /// # Brief
    /// 1. Verify that the state of a freshly created task is INIT
    #[test]
    fn ut_task_state_new() {
        let task_state = TaskState::new();
        assert_eq!(stored(&task_state), INIT);
    }

    /// UT test cases for TaskState::dec_ref()
    ///
    /// # Brief
    /// 1. Verify that dec_ref returns the previous state (INIT)
    /// 2. Verify that the stored state becomes INIT.wrapping_sub(REF_ONE)
    #[test]
    fn ut_task_state_dec_ref() {
        let task_state = TaskState::new();
        assert_eq!(task_state.dec_ref(), INIT);
        assert_eq!(stored(&task_state), INIT.wrapping_sub(REF_ONE));
    }

    /// UT test cases for TaskState::inc_ref()
    ///
    /// # Brief
    /// 1. Verify that the stored state becomes INIT.wrapping_add(REF_ONE)
    #[test]
    fn ut_task_state_inc_ref() {
        let task_state = TaskState::new();
        task_state.inc_ref();
        assert_eq!(stored(&task_state), INIT.wrapping_add(REF_ONE));
    }

    /// UT test cases for TaskState::get_current_state()
    ///
    /// # Brief
    /// 1. Verify that the state of a freshly created task is INIT
    #[test]
    fn ut_task_state_get_current_state() {
        let task_state = TaskState::new();
        assert_eq!(task_state.get_current_state(), INIT);
    }

    /// UT test cases for TaskState::turning_to_running()
    ///
    /// # Brief
    /// 1. From INIT the transition succeeds: SCHEDULING is cleared and RUNNING
    ///    is set
    /// 2. A task that is already running is rejected with the observed state
    /// 3. A task that is not in the scheduling state is rejected with the
    ///    observed state
    /// 4. A canceled task in the scheduling state is still switched to
    ///    running, and the new state is reported through Canceled
    #[test]
    fn ut_task_state_turning_to_running() {
        let running = (INIT & !SCHEDULING) | RUNNING;

        let task_state = TaskState::new();
        match task_state.turning_to_running() {
            StateAction::Success => assert_eq!(stored(&task_state), running),
            _ => panic!("a task in INIT state must be able to start running"),
        }

        match task_state.turning_to_running() {
            StateAction::Failed(cur) => assert_eq!(cur, running),
            _ => panic!("a running task must not start running again"),
        }

        let idle = INIT & !SCHEDULING;
        let task_state = state_with(idle);
        match task_state.turning_to_running() {
            StateAction::Failed(cur) => assert_eq!(cur, idle),
            _ => panic!("a task outside the schedule list must not start running"),
        }

        let task_state = state_with(INIT | CANCELED);
        match task_state.turning_to_running() {
            StateAction::Canceled(next) => {
                assert_eq!(next, running | CANCELED);
                assert_eq!(stored(&task_state), next);
            }
            _ => panic!("a canceled task must be reported as canceled"),
        }
    }

    /// UT test cases for TaskState::turning_to_finish()
    ///
    /// # Brief
    /// 1. A task that is not running cannot finish; the state is untouched
    /// 2. A running task finishes: RUNNING is cleared and FINISHED is set
    /// 3. A task that is already finished is rejected
    #[test]
    fn ut_task_state_turning_to_finish() {
        let task_state = TaskState::new();
        assert!(task_state.turning_to_finish().is_err());
        assert_eq!(stored(&task_state), INIT);

        let _ = task_state.turning_to_running();
        let finished = (INIT & !SCHEDULING) | FINISHED;
        let ret = task_state.turning_to_finish().unwrap();
        assert_eq!(ret, finished);
        assert_eq!(stored(&task_state), finished);

        assert!(task_state.turning_to_finish().is_err());
        assert_eq!(stored(&task_state), finished);
    }

    /// UT test cases for turning_to_idle
    ///
    /// # Brief
    /// 1. Create a TaskState, set it to Canceled & Running
    /// 2. Call turning_to_idle, check if return value equals to
    ///    StateAction::Canceled
    /// 3. Create a TaskState, keep it in INIT
    /// 4. Call turning_to_idle, check if return value equals to
    ///    StateAction::Failed
    /// 5. Create a TaskState, set it to Running and not scheduling
    /// 6. Call turning_to_idle, check if return value equals to
    ///    StateAction::Success
    /// 7. Create a TaskState, set it to Running and scheduling
    /// 8. Call turning_to_idle, check if return value equals to
    ///    StateAction::Enqueue
    #[test]
    fn ut_task_state_turning_to_idle() {
        let canceled_running = INIT | CANCELED | RUNNING;
        let task_state = state_with(canceled_running);
        match task_state.turning_to_idle() {
            StateAction::Canceled(cur) => {
                assert_eq!(cur, canceled_running);
                assert_eq!(stored(&task_state), canceled_running);
            }
            _ => panic!("a canceled running task must be reported as canceled"),
        }

        let task_state = TaskState::new();
        match task_state.turning_to_idle() {
            StateAction::Failed(cur) => assert_eq!(cur, INIT),
            _ => panic!("a task that is not running must not turn idle"),
        }

        let idle = INIT & !SCHEDULING;
        let task_state = state_with(idle | RUNNING);
        match task_state.turning_to_idle() {
            StateAction::Success => assert_eq!(stored(&task_state), idle),
            _ => panic!("a running task must be able to turn idle"),
        }

        let task_state = state_with(INIT | RUNNING);
        match task_state.turning_to_idle() {
            StateAction::Enqueue => assert_eq!(stored(&task_state), INIT),
            _ => panic!("a running task marked as scheduling must be enqueued"),
        }
    }

    /// UT test cases for TaskState::turn_to_scheduling()
    ///
    /// # Brief
    /// 1. On INIT (SCHEDULING already set) the old state is returned and the
    ///    state stays INIT
    /// 2. On a state without SCHEDULING the old state is returned and the
    ///    SCHEDULING bit is set afterwards
    #[test]
    fn ut_task_state_turning_to_scheduling() {
        let task_state = TaskState::new();
        assert_eq!(task_state.turn_to_scheduling(), INIT);
        assert_eq!(stored(&task_state), INIT);

        let idle = INIT & !SCHEDULING;
        let task_state = state_with(idle);
        assert_eq!(task_state.turn_to_scheduling(), idle);
        assert_eq!(stored(&task_state), INIT);
    }

    /// UT test cases for TaskState::turn_to_un_set_waker()
    ///
    /// # Brief
    /// 1. Neither CARE_JOIN_HANDLE nor JOIN_WAKER is set: the call fails with
    ///    the observed state
    /// 2. Both bits are set but the task is FINISHED: the call fails with the
    ///    observed state
    /// 3. Both bits are set and the task is not FINISHED: JOIN_WAKER is
    ///    cleared and the new state is returned
    #[test]
    fn ut_task_state_turn_to_un_set_waker() {
        let no_interest = INIT & !CARE_JOIN_HANDLE & !JOIN_WAKER;
        let task_state = state_with(no_interest);
        assert_eq!(task_state.turn_to_un_set_waker(), Err(no_interest));
        assert_eq!(stored(&task_state), no_interest);

        let finished = INIT | CARE_JOIN_HANDLE | JOIN_WAKER | FINISHED;
        let task_state = state_with(finished);
        assert_eq!(task_state.turn_to_un_set_waker(), Err(finished));
        assert_eq!(stored(&task_state), finished);

        let with_waker = (INIT | CARE_JOIN_HANDLE | JOIN_WAKER) & !FINISHED;
        let task_state = state_with(with_waker);
        assert_eq!(task_state.turn_to_un_set_waker(), Ok(with_waker & !JOIN_WAKER));
        assert_eq!(stored(&task_state), with_waker & !JOIN_WAKER);
    }

    /// UT test cases for TaskState::turn_to_set_waker()
    ///
    /// # Brief
    /// 1. CARE_JOIN_HANDLE is not set: the call fails with the observed state
    /// 2. CARE_JOIN_HANDLE is set and JOIN_WAKER is already set: the call
    ///    fails with the observed state
    /// 3. CARE_JOIN_HANDLE is set, JOIN_WAKER is not set, but the task is
    ///    FINISHED: the call fails with the observed state
    /// 4. CARE_JOIN_HANDLE is set, JOIN_WAKER is not set and the task is not
    ///    FINISHED: JOIN_WAKER is set and the new state is returned
    #[test]
    fn ut_task_state_turn_to_set_waker() {
        let no_interest = (INIT & !CARE_JOIN_HANDLE) | JOIN_WAKER;
        let task_state = state_with(no_interest);
        assert_eq!(task_state.turn_to_set_waker(), Err(no_interest));
        assert_eq!(stored(&task_state), no_interest);

        let already_set = INIT | CARE_JOIN_HANDLE | JOIN_WAKER;
        let task_state = state_with(already_set);
        assert_eq!(task_state.turn_to_set_waker(), Err(already_set));
        assert_eq!(stored(&task_state), already_set);

        let finished = ((INIT | CARE_JOIN_HANDLE) & !JOIN_WAKER) | FINISHED;
        let task_state = state_with(finished);
        assert_eq!(task_state.turn_to_set_waker(), Err(finished));
        assert_eq!(stored(&task_state), finished);

        let without_waker = (INIT | CARE_JOIN_HANDLE) & !JOIN_WAKER & !FINISHED;
        let task_state = state_with(without_waker);
        assert_eq!(task_state.turn_to_set_waker(), Ok(without_waker | JOIN_WAKER));
        assert_eq!(stored(&task_state), without_waker | JOIN_WAKER);
    }

    /// UT test cases for TaskState::turn_to_canceled_and_scheduled()
    ///
    /// # Brief
    /// 1. A task already in the schedule list gets CANCELED set and does not
    ///    need to be scheduled again
    /// 2. A task that is already canceled is left untouched
    /// 3. An idle task gets CANCELED and SCHEDULING set and needs to be
    ///    scheduled
    /// 4. A running task gets CANCELED and SCHEDULING set and does not need
    ///    to be scheduled
    /// 5. A finished task is left untouched
    #[test]
    fn ut_task_state_turn_to_canceled_and_scheduled() {
        let task_state = TaskState::new();
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(stored(&task_state), INIT | CANCELED);

        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(stored(&task_state), INIT | CANCELED);

        let idle = INIT & !SCHEDULING;
        let task_state = state_with(idle);
        assert!(task_state.turn_to_canceled_and_scheduled());
        assert_eq!(stored(&task_state), INIT | CANCELED);

        let task_state = state_with(idle | RUNNING);
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(stored(&task_state), INIT | RUNNING | CANCELED);

        let finished = idle | FINISHED;
        let task_state = state_with(finished);
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(stored(&task_state), finished);
    }

    /// UT test cases for TaskState::turn_to_un_join_handle()
    ///
    /// # Brief
    /// 1. The task is FINISHED: the call fails and the state is untouched
    /// 2. The task is not FINISHED: CARE_JOIN_HANDLE is cleared and the new
    ///    state is returned
    #[test]
    fn ut_task_state_turn_to_un_join_handle() {
        let finished = INIT | FINISHED;
        let task_state = state_with(finished);
        assert_eq!(task_state.turn_to_un_join_handle(), Err(()));
        assert_eq!(stored(&task_state), finished);

        let task_state = state_with(INIT & !FINISHED);
        assert_eq!(
            task_state.turn_to_un_join_handle(),
            Ok(INIT & !CARE_JOIN_HANDLE)
        );
        assert_eq!(stored(&task_state), INIT & !CARE_JOIN_HANDLE);
    }

    /// UT test cases for TaskState::try_turning_to_un_join_handle()
    ///
    /// # Brief
    /// 1. On INIT the call succeeds: CARE_JOIN_HANDLE is cleared and one
    ///    reference is dropped
    /// 2. On any state other than INIT the call fails and the state is
    ///    untouched
    #[test]
    fn ut_task_state_turning_to_un_join_handle() {
        let detached = (INIT - REF_ONE) & !CARE_JOIN_HANDLE;

        let task_state = TaskState::new();
        assert!(task_state.try_turning_to_un_join_handle());
        assert_eq!(stored(&task_state), detached);

        assert!(!task_state.try_turning_to_un_join_handle());
        assert_eq!(stored(&task_state), detached);
    }
}
614