• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
16 
17 use crate::error::ErrorKind;
18 
// Bit layout of the task state word, as defined by the constants below:
//   bits 0..=6 : status flags (bit 1 is not assigned here)
//   bits 7..=9 : excluded from the reference count by `RC_MASK`, not assigned here
//   bits 10..  : reference count

/// Task is currently running
const RUNNING: usize = 0b0001;
/// Task is in the schedule list
const SCHEDULING: usize = 0b0100;
/// Task has finished
const FINISHED: usize = 0b1000;
/// Task has been canceled
const CANCELED: usize = 0b1_0000;
/// Task needs to send the finished result back to the join handle
const CARE_JOIN_HANDLE: usize = 0b10_0000;
/// Task currently holds a waker to the join handle
const JOIN_WAKER: usize = 0b100_0000;

/// Mask selecting the reference-count bits, i.e. everything above the low
/// ten bits of the state word.
const RC_MASK: usize = !0b11_1111_1111;

/// Position of the lowest reference-count bit, derived from the number of
/// zero bits in `RC_MASK`.
const RC_SHIFT: usize = RC_MASK.count_zeros() as usize;

/// A single reference, expressed in state-word units.
const REF_ONE: usize = 1 << RC_SHIFT;

/// Initial state contains two ref counts: one is held by the join handle, the
/// other one is held by the task itself.
const INIT: usize = CARE_JOIN_HANDLE | SCHEDULING | (REF_ONE * 2);
42 
43 #[inline]
ref_count(state: usize) -> usize44 pub(crate) fn ref_count(state: usize) -> usize {
45     (state & RC_MASK) >> RC_SHIFT
46 }
47 
48 #[inline]
is_last_ref_count(prev: usize) -> bool49 pub(crate) fn is_last_ref_count(prev: usize) -> bool {
50     ref_count(prev) == 1
51 }
52 
53 #[inline]
is_canceled(cur: usize) -> bool54 pub(crate) fn is_canceled(cur: usize) -> bool {
55     cur & CANCELED == CANCELED
56 }
57 
58 #[inline]
is_care_join_handle(cur: usize) -> bool59 pub(crate) fn is_care_join_handle(cur: usize) -> bool {
60     cur & CARE_JOIN_HANDLE == CARE_JOIN_HANDLE
61 }
62 
63 #[inline]
is_finished(cur: usize) -> bool64 pub(crate) fn is_finished(cur: usize) -> bool {
65     cur & FINISHED == FINISHED
66 }
67 
68 #[inline]
is_set_waker(cur: usize) -> bool69 pub(crate) fn is_set_waker(cur: usize) -> bool {
70     cur & JOIN_WAKER == JOIN_WAKER
71 }
72 
73 #[inline]
is_scheduling(cur: usize) -> bool74 pub(crate) fn is_scheduling(cur: usize) -> bool {
75     cur & SCHEDULING == SCHEDULING
76 }
77 
78 #[inline]
is_running(cur: usize) -> bool79 pub(crate) fn is_running(cur: usize) -> bool {
80     cur & RUNNING == RUNNING
81 }
82 
83 // A task need to satisfy these state requirements in order to get pushed back
84 // to the schedule list.
85 #[inline]
need_enqueue(cur: usize) -> bool86 pub(crate) fn need_enqueue(cur: usize) -> bool {
87     (cur & SCHEDULING != SCHEDULING) && (cur & RUNNING != RUNNING) && (cur & FINISHED != FINISHED)
88 }
89 
/// Outcome of a state transition attempted on a `TaskState`.
pub(crate) enum StateAction {
    /// The transition was applied and nothing else has to be done.
    Success,
    /// The `CANCELED` bit was observed; carries the state value the
    /// transition reported.
    Canceled(usize),
    /// The transition was rejected; carries the state value that was
    /// observed at that moment.
    Failed(usize),
    /// The transition was applied and the `SCHEDULING` bit was still set.
    Enqueue,
}
96 
97 pub(crate) struct TaskState(AtomicUsize);
98 impl TaskState {
99     #[inline]
new() -> Self100     pub(crate) fn new() -> Self {
101         TaskState(AtomicUsize::new(INIT))
102     }
103 
104     #[inline]
dec_ref(&self) -> usize105     pub(crate) fn dec_ref(&self) -> usize {
106         self.0.fetch_sub(REF_ONE, AcqRel)
107     }
108 
109     #[inline]
inc_ref(&self)110     pub(crate) fn inc_ref(&self) {
111         self.0.fetch_add(REF_ONE, AcqRel);
112     }
113 
114     #[inline]
get_current_state(&self) -> usize115     pub(crate) fn get_current_state(&self) -> usize {
116         self.0.load(Acquire)
117     }
118 
119     /// Turns the task state into running. Contains CAS operations.
120     ///
121     /// Fails when the task is already running, scheduling or is already
122     /// finished.
turning_to_running(&self) -> StateAction123     pub(crate) fn turning_to_running(&self) -> StateAction {
124         let mut cur = self.get_current_state();
125         loop {
126             let mut action = StateAction::Success;
127 
128             if is_running(cur) || is_finished(cur) || !is_scheduling(cur) {
129                 return StateAction::Failed(cur);
130             }
131 
132             let mut next = cur;
133             next &= !SCHEDULING;
134             next |= RUNNING;
135             if is_canceled(next) {
136                 action = StateAction::Canceled(next);
137             }
138 
139             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
140             match res {
141                 Ok(_) => return action,
142                 Err(actual) => cur = actual,
143             }
144         }
145     }
146 
147     /// Turns the task state into finished. Contains CAS operations.
148     ///
149     /// Fails when the task is already finished or is not running.
turning_to_finish(&self) -> Result<usize, ErrorKind>150     pub(crate) fn turning_to_finish(&self) -> Result<usize, ErrorKind> {
151         let mut cur = self.get_current_state();
152 
153         loop {
154             if is_finished(cur) {
155                 return Err(ErrorKind::TaskShutdown);
156             }
157 
158             if !is_running(cur) {
159                 return Err(ErrorKind::TaskStateInvalid);
160             }
161             let mut next = cur;
162             next &= !RUNNING;
163             next |= FINISHED;
164 
165             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
166             match res {
167                 Ok(_) => return Ok(next),
168                 Err(actual) => cur = actual,
169             }
170         }
171     }
172 
173     /// Turns the task state into idle. Contains CAS operations.
174     ///
175     /// Fails when the task is canceled or running.
turning_to_idle(&self) -> StateAction176     pub(crate) fn turning_to_idle(&self) -> StateAction {
177         let mut cur = self.get_current_state();
178 
179         loop {
180             let mut action = StateAction::Success;
181 
182             if !is_running(cur) {
183                 return StateAction::Failed(cur);
184             }
185 
186             if is_canceled(cur) {
187                 return StateAction::Canceled(cur);
188             }
189 
190             let mut next = cur;
191             next &= !RUNNING;
192 
193             if is_scheduling(next) {
194                 action = StateAction::Enqueue;
195             }
196 
197             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
198             match res {
199                 Ok(_) => return action,
200                 Err(actual) => cur = actual,
201             }
202         }
203     }
204 
205     /// Turns the task state into scheduling. Returns the old state value.
206     #[inline]
turn_to_scheduling(&self) -> usize207     pub(crate) fn turn_to_scheduling(&self) -> usize {
208         self.0.fetch_or(SCHEDULING, AcqRel)
209     }
210 
211     /// Turns the task state into unset_waker. Contains CAS operations.
212     ///
213     /// Fails when the task is already finished.
turn_to_un_set_waker(&self) -> Result<usize, usize>214     pub(crate) fn turn_to_un_set_waker(&self) -> Result<usize, usize> {
215         let mut cur = self.get_current_state();
216 
217         loop {
218             if !is_care_join_handle(cur) || !is_set_waker(cur) {
219                 return Err(cur);
220             }
221 
222             if is_finished(cur) {
223                 return Err(cur);
224             }
225 
226             let mut next = cur;
227             next &= !JOIN_WAKER;
228 
229             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
230             match res {
231                 Ok(_) => return Ok(next),
232                 Err(actual) => cur = actual,
233             }
234         }
235     }
236 
237     /// Turns off the Join_Waker bit of the task state. Contains CAS operations.
238     ///
239     /// Fails when the task is already finished.
turn_to_set_waker(&self) -> Result<usize, usize>240     pub(crate) fn turn_to_set_waker(&self) -> Result<usize, usize> {
241         let mut cur = self.get_current_state();
242 
243         loop {
244             if !is_care_join_handle(cur) || is_set_waker(cur) {
245                 return Err(cur);
246             }
247             if is_finished(cur) {
248                 return Err(cur);
249             }
250 
251             let mut next = cur;
252             next |= JOIN_WAKER;
253 
254             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
255             match res {
256                 Ok(_) => return Ok(next),
257                 Err(actual) => cur = actual,
258             }
259         }
260     }
261 
turn_to_canceled_and_scheduled(&self) -> bool262     pub(crate) fn turn_to_canceled_and_scheduled(&self) -> bool {
263         let mut cur = self.get_current_state();
264 
265         loop {
266             if is_canceled(cur) || is_finished(cur) {
267                 return false;
268             }
269 
270             let mut next = cur;
271             let need_schedule = if is_running(cur) {
272                 next |= SCHEDULING;
273                 next |= CANCELED;
274                 false
275             } else {
276                 next |= CANCELED;
277                 if !is_scheduling(next) {
278                     next |= SCHEDULING;
279                     true
280                 } else {
281                     false
282                 }
283             };
284 
285             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
286             match res {
287                 Ok(_) => return need_schedule,
288                 Err(actual) => cur = actual,
289             }
290         }
291     }
292 
293     /// Turns off the CARE_JOIN_HANDLE bit of the task state. Contains CAS
294     /// operations.
295     ///
296     /// Fails when the task is already finished.
turn_to_un_join_handle(&self) -> Result<usize, ()>297     pub(crate) fn turn_to_un_join_handle(&self) -> Result<usize, ()> {
298         let mut cur = self.get_current_state();
299 
300         loop {
301             if is_finished(cur) {
302                 return Err(());
303             }
304 
305             let mut next = cur;
306             next &= !CARE_JOIN_HANDLE;
307 
308             let res = self.0.compare_exchange(cur, next, AcqRel, Acquire);
309             match res {
310                 Ok(_) => return Ok(next),
311                 Err(actual) => cur = actual,
312             }
313         }
314     }
315 
316     /// Attempts to turn off the CARE_JOIN_HANDLE bit of the task state.
317     ///
318     /// Returns true if successfully changed. Otherwise, returns false.
try_turning_to_un_join_handle(&self) -> bool319     pub(crate) fn try_turning_to_un_join_handle(&self) -> bool {
320         let old = INIT;
321         let new = (INIT - REF_ONE) & !CARE_JOIN_HANDLE;
322         self.0.compare_exchange(old, new, Relaxed, Relaxed) == Ok(old)
323     }
324 
325     cfg_not_ffrt! {
326         /// Turns the task state into canceled. Returns the old state value.
327         #[inline]
328         pub(crate) fn set_cancel(&self) -> usize {
329             self.0.fetch_or(CANCELED, AcqRel)
330         }
331 
332         /// Turns the task state into running. Returns the old state value.
333         #[inline]
334         pub(crate) fn set_running(&self) {
335             self.0.fetch_or(RUNNING, AcqRel);
336         }
337     }
338 }
339 
#[cfg(test)]
mod test {
    use std::sync::atomic::Ordering::{Acquire, Release};

    use crate::task::state::{
        StateAction, TaskState, CANCELED, CARE_JOIN_HANDLE, FINISHED, INIT, JOIN_WAKER, REF_ONE,
        RUNNING, SCHEDULING,
    };

    /// UT test cases for TaskState::new()
    ///
    /// # Brief
    /// 1. Verify that the state of a freshly created task is INIT
    #[test]
    fn ut_task_state_new() {
        let task_state = TaskState::new();
        assert_eq!(task_state.0.load(Acquire), INIT);
    }

    /// UT test cases for TaskState::dec_ref()
    ///
    /// # Brief
    /// 1. Verify that dec_ref returns the previous state (INIT)
    /// 2. Verify that the stored state becomes INIT.wrapping_sub(REF_ONE)
    #[test]
    fn ut_task_state_dec_ref() {
        let task_state = TaskState::new();
        assert_eq!(task_state.dec_ref(), INIT);
        assert_eq!(task_state.0.load(Acquire), INIT.wrapping_sub(REF_ONE));
    }

    /// UT test cases for TaskState::inc_ref()
    ///
    /// # Brief
    /// 1. Verify that the stored state becomes INIT.wrapping_add(REF_ONE)
    #[test]
    fn ut_task_state_inc_ref() {
        let task_state = TaskState::new();
        task_state.inc_ref();
        assert_eq!(task_state.0.load(Acquire), INIT.wrapping_add(REF_ONE));
    }

    /// UT test cases for TaskState::get_current_state()
    ///
    /// # Brief
    /// 1. Verify that the state of a freshly created task is INIT
    #[test]
    fn ut_task_state_get_current_state() {
        let task_state = TaskState::new();
        assert_eq!(task_state.get_current_state(), INIT);
    }

    /// UT test cases for TaskState::turning_to_running()
    ///
    /// # Brief
    /// 1. From INIT (scheduling, not running, not finished) the transition
    ///    succeeds
    /// 2. A second call sees a running, non-scheduling state and returns
    ///    Failed carrying that state
    /// 3. From INIT | CANCELED the transition is applied and Canceled carrying
    ///    the new state is returned
    #[test]
    fn ut_task_state_turning_to_running() {
        let task_state = TaskState::new();
        let mut test_task_state = INIT;
        test_task_state &= !SCHEDULING;
        test_task_state |= RUNNING;

        match task_state.turning_to_running() {
            StateAction::Success => {}
            _ => panic!(),
        }

        match task_state.turning_to_running() {
            StateAction::Failed(x) => assert_eq!(x, test_task_state),
            _ => panic!(),
        }

        let task_state = TaskState::new();
        task_state.0.store(INIT | CANCELED, Release);
        let expected = ((INIT | CANCELED) & !SCHEDULING) | RUNNING;
        match task_state.turning_to_running() {
            StateAction::Canceled(x) => assert_eq!(x, expected),
            _ => panic!(),
        }
        assert_eq!(task_state.0.load(Acquire), expected);
    }

    /// UT test cases for TaskState::turning_to_finish()
    ///
    /// # Brief
    /// 1. A task that is not running cannot be finished
    /// 2. A running task turns into finished, and the new state is returned
    /// 3. A task that is already finished cannot be finished again
    #[test]
    fn ut_task_state_turning_to_finish() {
        let task_state = TaskState::new();
        assert!(task_state.turning_to_finish().is_err());

        match task_state.turning_to_running() {
            StateAction::Success => {}
            _ => panic!(),
        }
        let mut test_task_state = INIT;
        test_task_state &= !RUNNING;
        test_task_state |= FINISHED;
        test_task_state &= !SCHEDULING;
        let ret = task_state.turning_to_finish().unwrap();
        assert_eq!(ret, test_task_state);
        assert!(task_state.turning_to_finish().is_err());
    }

    /// UT test cases for turning_to_idle
    ///
    /// # Brief
    /// 1. Create a TaskState, set it to Canceled & Running
    /// 2. Call turning_to_idle, check if return value equals to
    ///    StateAction::Canceled
    /// 3. Create a TaskState, set it to init
    /// 4. Call turning_to_idle, check if return value equals to
    ///    StateAction::Failed
    /// 5. Create a TaskState, set it to Running and not scheduling
    /// 6. Call turning_to_idle, check if return value equals to
    ///    StateAction::Success
    /// 7. Create a TaskState, set it to Running and scheduling
    /// 8. Call turning_to_idle, check if return value equals to
    ///    StateAction::Enqueue
    #[test]
    fn ut_task_state_turning_to_idle() {
        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= CANCELED;
        next_state |= RUNNING;
        task_state.0.store(next_state, Release);
        match task_state.turning_to_idle() {
            StateAction::Canceled(cur) => assert_eq!(cur, next_state),
            _ => panic!(),
        }

        let task_state = TaskState::new();
        match task_state.turning_to_idle() {
            StateAction::Failed(cur) => assert_eq!(cur, INIT),
            _ => panic!(),
        }

        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= RUNNING;
        next_state &= !SCHEDULING;
        task_state.0.store(next_state, Release);
        let mut test_state = next_state;
        test_state &= !RUNNING;
        match task_state.turning_to_idle() {
            StateAction::Success => assert_eq!(task_state.0.load(Acquire), test_state),
            _ => panic!(),
        }

        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= RUNNING;
        next_state |= SCHEDULING;
        task_state.0.store(next_state, Release);
        match task_state.turning_to_idle() {
            StateAction::Enqueue => assert_eq!(task_state.0.load(Acquire), next_state & !RUNNING),
            _ => panic!(),
        }
    }

    /// UT test cases for TaskState::turn_to_scheduling()
    ///
    /// # Brief
    /// 1. On a state that already has SCHEDULING set, the old state is
    ///    returned and the stored state is unchanged
    /// 2. On a state without SCHEDULING, the old state is returned and the
    ///    stored state gains the SCHEDULING bit
    #[test]
    fn ut_task_state_turning_to_scheduling() {
        let task_state = TaskState::new();
        let mut test_state = task_state.0.load(Acquire);
        test_state |= SCHEDULING;
        assert_eq!(task_state.turn_to_scheduling(), test_state);
        assert_eq!(task_state.0.load(Acquire), test_state);

        let task_state = TaskState::new();
        let idle_state = INIT & !SCHEDULING;
        task_state.0.store(idle_state, Release);
        assert_eq!(task_state.turn_to_scheduling(), idle_state);
        assert_eq!(task_state.0.load(Acquire), idle_state | SCHEDULING);
    }

    /// UT test cases for TaskState::turn_to_un_set_waker()
    ///
    /// # Brief
    /// 1. Neither CARE_JOIN_HANDLE nor JOIN_WAKER is set: returns failure
    /// 2. Both bits are set but the task is FINISHED: returns failure
    /// 3. Both bits are set and the task is not FINISHED: JOIN_WAKER is
    ///    cleared
    #[test]
    fn ut_task_state_turn_to_un_set_waker() {
        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state &= !CARE_JOIN_HANDLE;
        next_state &= !JOIN_WAKER;
        task_state.0.store(next_state, Release);
        assert!(task_state.turn_to_un_set_waker().is_err());

        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= CARE_JOIN_HANDLE;
        next_state |= JOIN_WAKER;
        next_state |= FINISHED;
        task_state.0.store(next_state, Release);
        assert!(task_state.turn_to_un_set_waker().is_err());

        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= CARE_JOIN_HANDLE;
        next_state |= JOIN_WAKER;
        next_state &= !FINISHED;
        task_state.0.store(next_state, Release);
        assert_eq!(task_state.turn_to_un_set_waker(), Ok(next_state & !JOIN_WAKER));
        assert_eq!(task_state.0.load(Acquire), next_state & !JOIN_WAKER);
    }

    /// UT test cases for TaskState::turn_to_set_waker()
    ///
    /// # Brief
    /// 1. CARE_JOIN_HANDLE is not set and JOIN_WAKER is already set: returns
    ///    failure
    /// 2. CARE_JOIN_HANDLE is set, JOIN_WAKER is not, but the task is
    ///    FINISHED: returns failure
    /// 3. CARE_JOIN_HANDLE is set, JOIN_WAKER is not, and the task is not
    ///    FINISHED: JOIN_WAKER is set
    #[test]
    fn ut_task_state_turn_to_set_waker() {
        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state &= !CARE_JOIN_HANDLE;
        next_state |= JOIN_WAKER;
        task_state.0.store(next_state, Release);
        assert!(task_state.turn_to_set_waker().is_err());

        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= CARE_JOIN_HANDLE;
        next_state &= !JOIN_WAKER;
        next_state |= FINISHED;
        task_state.0.store(next_state, Release);
        assert!(task_state.turn_to_set_waker().is_err());

        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= CARE_JOIN_HANDLE;
        next_state &= !JOIN_WAKER;
        next_state &= !FINISHED;
        task_state.0.store(next_state, Release);
        assert_eq!(task_state.turn_to_set_waker(), Ok(next_state | JOIN_WAKER));
        assert_eq!(task_state.0.load(Acquire), next_state | JOIN_WAKER);
    }

    /// UT test cases for TaskState::turn_to_canceled_and_scheduled()
    ///
    /// # Brief
    /// 1. From INIT (already scheduling): bits are set, returns false
    /// 2. A second call on the canceled state returns false
    /// 3. From an idle state (not scheduling, not running): returns true
    /// 4. From a running state: bits are set, returns false
    /// 5. From a finished state: state is untouched, returns false
    #[test]
    fn ut_task_state_turn_to_canceled_and_scheduled() {
        let task_state = TaskState::new();
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), INIT | CANCELED);
        assert!(!task_state.turn_to_canceled_and_scheduled());

        let task_state = TaskState::new();
        task_state.0.store(INIT & !SCHEDULING, Release);
        assert!(task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), INIT | CANCELED);

        let task_state = TaskState::new();
        let running_state = (INIT & !SCHEDULING) | RUNNING;
        task_state.0.store(running_state, Release);
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(
            task_state.0.load(Acquire),
            running_state | CANCELED | SCHEDULING
        );

        let task_state = TaskState::new();
        let finished_state = (INIT & !SCHEDULING) | FINISHED;
        task_state.0.store(finished_state, Release);
        assert!(!task_state.turn_to_canceled_and_scheduled());
        assert_eq!(task_state.0.load(Acquire), finished_state);
    }

    /// UT test cases for TaskState::turn_to_un_join_handle()
    ///
    /// # Brief
    /// 1. The task is FINISHED: returns failure
    /// 2. The task is not FINISHED: CARE_JOIN_HANDLE is cleared
    #[test]
    fn ut_task_state_turn_to_un_join_handle() {
        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state |= FINISHED;
        task_state.0.store(next_state, Release);
        assert!(task_state.turn_to_un_join_handle().is_err());

        let task_state = TaskState::new();
        let mut next_state = task_state.0.load(Acquire);
        next_state &= !FINISHED;
        task_state.0.store(next_state, Release);
        assert_eq!(
            task_state.turn_to_un_join_handle(),
            Ok(next_state & !CARE_JOIN_HANDLE)
        );
    }

    /// UT test cases for TaskState::try_turning_to_un_join_handle()
    ///
    /// # Brief
    /// 1. From INIT the call succeeds, CARE_JOIN_HANDLE is cleared and one
    ///    reference is dropped
    /// 2. A second call fails because the state is no longer INIT
    #[test]
    fn ut_task_state_turning_to_un_join_handle() {
        let task_state = TaskState::new();
        assert!(task_state.try_turning_to_un_join_handle());
        assert_eq!(
            task_state.0.load(Acquire),
            (INIT - REF_ONE) & !CARE_JOIN_HANDLE
        );
        assert!(!task_state.try_turning_to_un_join_handle());
    }
}
614