• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cell::{Cell, RefCell};
15 use std::ptr;
16 use std::sync::Arc;
17 use std::task::Waker;
18 
19 /// worker struct info and method
20 use crate::executor::async_pool::MultiThreadScheduler;
21 use crate::executor::driver::Handle;
22 use crate::executor::parker::Parker;
23 use crate::executor::queue::LocalQueue;
24 use crate::task::yield_now::wake_yielded_tasks;
25 use crate::task::Task;
26 
thread_local! {
    // Type-erased pointer to the current thread's `WorkerContext`; null when
    // this thread is not a runtime worker. Installed/restored by `run_worker`
    // and read back through `get_current_ctx`.
    pub(crate) static CURRENT_WORKER: Cell<* const ()> = Cell::new(ptr::null());
    // Type-erased pointer to the current thread's `WorkerHandle`; null when
    // unset. Installed/restored by `run_worker`, read by `get_current_handle`.
    pub(crate) static CURRENT_HANDLE: Cell<* const ()> = Cell::new(ptr::null());
}
31 
/// Per-thread context owning the worker that runs on the current thread.
/// A pointer to this struct is published in `CURRENT_WORKER` while the
/// worker loop runs.
pub(crate) struct WorkerContext {
    /// The worker driven by this thread.
    pub(crate) worker: Arc<Worker>,
}
35 
36 impl WorkerContext {
run(&mut self)37     fn run(&mut self) {
38         let worker_ref = &self.worker;
39         worker_ref.run(self);
40     }
41 
release(&mut self)42     fn release(&mut self) {
43         self.worker.release();
44     }
45 }
46 
/// Per-thread wrapper around the driver [`Handle`], published in
/// `CURRENT_HANDLE` and retrieved via `get_current_handle`.
pub(crate) struct WorkerHandle {
    // Never read in this file (leading underscore); presumably held to keep
    // the driver handle alive and reachable from the worker thread — confirm
    // against `get_current_handle` callers.
    pub(crate) _handle: Arc<Handle>,
}
50 
/// Gets the handle of the current thread, or `None` when the current thread
/// is not a runtime worker (the TLS slot is null).
#[cfg(all(not(feature = "ffrt"), any(feature = "net", feature = "time")))]
#[inline]
pub(crate) fn get_current_handle() -> Option<&'static WorkerHandle> {
    CURRENT_HANDLE.with(|slot| {
        let ptr = slot.get() as *const WorkerHandle;
        // SAFETY: a non-null pointer in `CURRENT_HANDLE` was installed by
        // `run_worker` and refers to a `WorkerHandle` that stays alive for
        // the duration of the worker thread; `as_ref` performs the null
        // check and yields `None` otherwise.
        unsafe { ptr.as_ref() }
    })
}
64 
65 /// Gets the worker context of the current thread
66 #[inline]
get_current_ctx() -> Option<&'static WorkerContext>67 pub(crate) fn get_current_ctx() -> Option<&'static WorkerContext> {
68     CURRENT_WORKER.with(|ctx| {
69         let val = ctx.get();
70         if val.is_null() {
71             None
72         } else {
73             Some(unsafe { &*(val as *const _ as *const WorkerContext) })
74         }
75     })
76 }
77 
/// Runs the worker thread: publishes this worker's context and driver handle
/// into thread-local storage, executes the worker loop to completion, then
/// restores the previous TLS values on exit.
pub(crate) fn run_worker(worker: Arc<Worker>, handle: Arc<Handle>) {
    let mut cur_context = WorkerContext { worker };

    let cur_handle = WorkerHandle { _handle: handle };

    // RAII guard holding the previous (worker, handle) TLS pointers; its
    // `Drop` restores them, so the slots are reset even if `run` panics.
    struct Reset(*const (), *const ());

    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT_WORKER.with(|ctx| ctx.set(self.0));
            CURRENT_HANDLE.with(|handle| handle.set(self.1));
        }
    }
    // store the worker to tls
    let _guard = CURRENT_WORKER.with(|cur| {
        let prev_ctx = cur.get();
        // Publish pointers to the stack-local context/handle; they stay valid
        // because `_guard` is declared after them and therefore drops first,
        // resetting the TLS slots before `cur_context` is destroyed.
        cur.set(&cur_context as *const _ as *const ());

        let handle = CURRENT_HANDLE.with(|handle| {
            let prev_handle = handle.get();
            handle.set(&cur_handle as *const _ as *const ());
            prev_handle
        });

        Reset(prev_ctx, handle)
    });

    cur_context.run();
    cur_context.release();
    // NOTE(review): `cur_handle` is dropped before `_guard` resets
    // `CURRENT_HANDLE`, leaving a brief dangling TLS pointer; no code runs
    // in between, but confirm this ordering is intentional.
    drop(cur_handle);
}
110 
/// A single executor worker: owns a local run queue and cooperates with the
/// shared scheduler to pick up and run tasks.
pub(crate) struct Worker {
    /// Index of this worker within the scheduler's worker set.
    pub(crate) index: usize,
    /// Shared scheduler used to dequeue work and coordinate sleep/cancel.
    pub(crate) scheduler: Arc<MultiThreadScheduler>,
    /// Worker-local state; exclusively borrowed for the whole `run` loop
    /// (see `get_inner_ptr`'s safety note).
    pub(crate) inner: RefCell<Box<Inner>>,
    /// LIFO slot consulted before the queues when fetching the next task.
    pub(crate) lifo: RefCell<Option<Task>>,
    // Wakers of tasks that yielded; presumably drained by
    // `wake_yielded_tasks` — the field is not read in this file, confirm.
    pub(crate) yielded: RefCell<Vec<Waker>>,
}
118 
// SAFETY: `Worker` contains `RefCell`s, which are not `Sync`, so these impls
// assert that interior state is never accessed concurrently from multiple
// threads. NOTE(review): this relies on the runtime's design — `inner` and
// `lifo` are only touched from the worker's own thread while it runs —
// confirm against the scheduler implementation.
unsafe impl Send for Worker {}
unsafe impl Sync for Worker {}
121 
impl Worker {
    /// Worker main loop: each iteration bumps the tick counter, performs
    /// periodic maintenance, and runs one task; when no task is available it
    /// wakes yielded tasks and parks. Exits once cancellation is observed,
    /// then drains remaining tasks via `pre_shutdown`.
    fn run(&self, worker_ctx: &WorkerContext) {
        // Take the exclusive borrow once for the entire loop; per the safety
        // note on `get_inner_ptr`, this borrow is held until the worker stops.
        let mut inner = self.inner.borrow_mut();
        let inner = inner.as_mut();

        while !inner.is_cancel() {
            inner.increment_count();
            inner.periodic_check(self);

            // get a task from the queues and execute it
            if let Some(task) = self.get_task(inner, worker_ctx) {
                task.run();
                continue;
            }
            wake_yielded_tasks(worker_ctx);
            // if there is no task, park the worker
            self.park_timeout(inner, worker_ctx);

            self.check_cancel(inner);
        }
        self.pre_shutdown(inner, worker_ctx);
    }

    /// Shuts down every task still queued when the runtime is cancelled.
    fn pre_shutdown(&self, inner: &mut Inner, worker_ctx: &WorkerContext) {
        // drop all tasks in the local queue
        // NOTE(review): when `get_task` yields nothing but other workers
        // still hold work, this loop busy-spins until `has_no_work()` —
        // confirm this is the intended shutdown behavior.
        loop {
            if let Some(task) = self.get_task(inner, worker_ctx) {
                task.shutdown();
                continue;
            }
            if self.scheduler.has_no_work() {
                break;
            }
        }
        // thread 0 is responsible for dropping the tasks inside the global queue
        if self.index == 0 {
            let mut global = self.scheduler.get_global().get_global().lock().unwrap();
            loop {
                if let Some(task) = global.pop_front() {
                    task.shutdown();
                    continue;
                }
                if global.is_empty() {
                    break;
                }
            }
        }
    }

    /// Fetches the next task: the LIFO slot takes priority, then the
    /// scheduler's queues.
    fn get_task(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> Option<Task> {
        // schedule lifo task first
        let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
        if let Some(task) = lifo_slot.take() {
            return Some(task);
        }

        self.scheduler.dequeue(self.index, inner)
    }

    // Refreshes the cancellation flag from the scheduler.
    #[inline]
    fn check_cancel(&self, inner: &mut Inner) {
        inner.check_cancel(self)
    }

    /// Parks this worker until it is picked for new work or cancelled.
    fn park_timeout(&self, inner: &mut Inner, worker_ctx: &WorkerContext) {
        // still has works to do, go back to work
        if worker_ctx.worker.lifo.borrow().is_some() || !inner.run_queue.is_empty() {
            return;
        }

        self.scheduler.turn_to_sleep(self.index);

        // Re-park after wakeups while the scheduler still considers this
        // worker asleep; break out as soon as it is un-parked for work, or
        // stop looping once cancellation is observed.
        while !inner.is_cancel {
            inner.parker.park();
            if self.scheduler.is_parked(&self.index) {
                self.check_cancel(inner);
                continue;
            }
            break;
        }
    }

    /// Gets Worker's Inner with ptr.
    ///
    /// # Safety
    /// We can't get Inner with `RefCell::borrow()`, because the worker will
    /// always hold the borrow_mut until drop. So we can only get Inner by ptr.
    /// This method can only be used to obtain values
    #[cfg(feature = "metrics")]
    pub(crate) unsafe fn get_inner_ptr(&self) -> &Inner {
        let ptr = self.inner.as_ptr();
        // Caller contract (above): read-only access while the borrow_mut is
        // live elsewhere.
        &*ptr
    }

    /// Busy-waits until the scheduler reports all queues empty.
    // NOTE(review): spin loop with no backoff — presumably only reached at
    // shutdown when the wait is short; confirm.
    #[inline]
    fn release(&self) {
        // wait for tasks in queue to finish
        while !self.scheduler.has_no_work() {}
    }
}
222 
/// Worker-local mutable state, kept behind the worker's `RefCell`.
pub(crate) struct Inner {
    /// A counter to define whether schedule global queue or local queue;
    /// incremented once per loop iteration (wrapping).
    pub(crate) count: u32,
    /// Whether the workers are canceled
    is_cancel: bool,
    /// local queue
    pub(crate) run_queue: LocalQueue,
    /// Parker used to block this worker when it has no work.
    pub(crate) parker: Parker,
}
232 
233 impl Inner {
new(run_queues: LocalQueue, parker: Parker) -> Self234     pub(crate) fn new(run_queues: LocalQueue, parker: Parker) -> Self {
235         Inner {
236             count: 0,
237             is_cancel: false,
238             run_queue: run_queues,
239             parker,
240         }
241     }
242 }
243 
244 const GLOBAL_PERIODIC_INTERVAL: u8 = 61;
245 
246 impl Inner {
247     #[inline]
increment_count(&mut self)248     fn increment_count(&mut self) {
249         self.count = self.count.wrapping_add(1);
250     }
251 
252     // checks if the worker is canceled
253     #[inline]
check_cancel(&mut self, worker: &Worker)254     fn check_cancel(&mut self, worker: &Worker) {
255         if !self.is_cancel {
256             self.is_cancel = worker.scheduler.is_cancel();
257         }
258     }
259 
260     #[inline]
periodic_check(&mut self, worker: &Worker)261     fn periodic_check(&mut self, worker: &Worker) {
262         if self.count & GLOBAL_PERIODIC_INTERVAL as u32 == 0 {
263             self.check_cancel(worker);
264             if let Ok(mut driver) = self.parker.get_driver().try_lock() {
265                 driver.run_once();
266             }
267         }
268     }
269 
270     #[inline]
is_cancel(&self) -> bool271     fn is_cancel(&self) -> bool {
272         self.is_cancel
273     }
274 }
275