1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::cell::{Cell, RefCell};
15 use std::ptr;
16 use std::sync::Arc;
17 use std::task::Waker;
18
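// Worker structures and methods: thread-local worker/handle slots, the
// per-thread context, and the scheduling loop run by each worker thread.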
20 use crate::executor::async_pool::MultiThreadScheduler;
21 use crate::executor::driver::Handle;
22 use crate::executor::parker::Parker;
23 use crate::executor::queue::LocalQueue;
24 use crate::task::yield_now::wake_yielded_tasks;
25 use crate::task::Task;
26
thread_local! {
    /// Type-erased pointer to the `WorkerContext` published by `run_worker`
    /// on this thread. Starts out null.
    pub(crate) static CURRENT_WORKER: Cell<*const ()> = Cell::new(ptr::null::<()>());
    /// Type-erased pointer to the `WorkerHandle` published by `run_worker`
    /// on this thread. Starts out null.
    pub(crate) static CURRENT_HANDLE: Cell<*const ()> = Cell::new(ptr::null::<()>());
}
31
32 pub(crate) struct WorkerContext {
33 pub(crate) worker: Arc<Worker>,
34 }
35
36 impl WorkerContext {
run(&mut self)37 fn run(&mut self) {
38 let worker_ref = &self.worker;
39 worker_ref.run(self);
40 }
41
release(&mut self)42 fn release(&mut self) {
43 self.worker.release();
44 }
45 }
46
47 pub(crate) struct WorkerHandle {
48 pub(crate) _handle: Arc<Handle>,
49 }
50
/// Gets the handle of the current thread.
///
/// Returns `None` when the `CURRENT_HANDLE` slot of this thread holds a null
/// pointer, otherwise a reference to the `WorkerHandle` it points at.
#[cfg(all(not(feature = "ffrt"), any(feature = "net", feature = "time")))]
#[inline]
pub(crate) fn get_current_handle() -> Option<&'static WorkerHandle> {
    CURRENT_HANDLE.with(|slot| {
        let handle_ptr = slot.get().cast::<WorkerHandle>();
        // SAFETY: within this file the only non-null value written to the
        // slot is the address of the `WorkerHandle` local of `run_worker`.
        // The `'static` lifetime is not checked by the compiler, so callers
        // must not keep the reference past that worker's run.
        // NOTE(review): confirm no other module writes to `CURRENT_HANDLE`.
        unsafe { handle_ptr.as_ref() }
    })
}
64
65 /// Gets the worker context of the current thread
66 #[inline]
get_current_ctx() -> Option<&'static WorkerContext>67 pub(crate) fn get_current_ctx() -> Option<&'static WorkerContext> {
68 CURRENT_WORKER.with(|ctx| {
69 let val = ctx.get();
70 if val.is_null() {
71 None
72 } else {
73 Some(unsafe { &*(val as *const _ as *const WorkerContext) })
74 }
75 })
76 }
77
78 /// Runs the worker thread
run_worker(worker: Arc<Worker>, handle: Arc<Handle>)79 pub(crate) fn run_worker(worker: Arc<Worker>, handle: Arc<Handle>) {
80 let mut cur_context = WorkerContext { worker };
81
82 let cur_handle = WorkerHandle { _handle: handle };
83
84 struct Reset(*const (), *const ());
85
86 impl Drop for Reset {
87 fn drop(&mut self) {
88 CURRENT_WORKER.with(|ctx| ctx.set(self.0));
89 CURRENT_HANDLE.with(|handle| handle.set(self.1));
90 }
91 }
92 // store the worker to tls
93 let _guard = CURRENT_WORKER.with(|cur| {
94 let prev_ctx = cur.get();
95 cur.set(&cur_context as *const _ as *const ());
96
97 let handle = CURRENT_HANDLE.with(|handle| {
98 let prev_handle = handle.get();
99 handle.set(&cur_handle as *const _ as *const ());
100 prev_handle
101 });
102
103 Reset(prev_ctx, handle)
104 });
105
106 cur_context.run();
107 cur_context.release();
108 drop(cur_handle);
109 }
110
111 pub(crate) struct Worker {
112 pub(crate) index: usize,
113 pub(crate) scheduler: Arc<MultiThreadScheduler>,
114 pub(crate) inner: RefCell<Box<Inner>>,
115 pub(crate) lifo: RefCell<Option<Task>>,
116 pub(crate) yielded: RefCell<Vec<Waker>>,
117 }
118
119 unsafe impl Send for Worker {}
120 unsafe impl Sync for Worker {}
121
122 impl Worker {
run(&self, worker_ctx: &WorkerContext)123 fn run(&self, worker_ctx: &WorkerContext) {
124 let mut inner = self.inner.borrow_mut();
125 let inner = inner.as_mut();
126
127 while !inner.is_cancel() {
128 inner.increment_count();
129 inner.periodic_check(self);
130
131 // get a task from the queues and execute it
132 if let Some(task) = self.get_task(inner, worker_ctx) {
133 task.run();
134 continue;
135 }
136 wake_yielded_tasks(worker_ctx);
137 // if there is no task, park the worker
138 self.park_timeout(inner, worker_ctx);
139
140 self.check_cancel(inner);
141 }
142 self.pre_shutdown(inner, worker_ctx);
143 }
144
pre_shutdown(&self, inner: &mut Inner, worker_ctx: &WorkerContext)145 fn pre_shutdown(&self, inner: &mut Inner, worker_ctx: &WorkerContext) {
146 // drop all tasks in the local queue
147 loop {
148 if let Some(task) = self.get_task(inner, worker_ctx) {
149 task.shutdown();
150 continue;
151 }
152 if self.scheduler.has_no_work() {
153 break;
154 }
155 }
156 // thread 0 is responsible for dropping the tasks inside the global queue
157 if self.index == 0 {
158 let mut global = self.scheduler.get_global().get_global().lock().unwrap();
159 loop {
160 if let Some(task) = global.pop_front() {
161 task.shutdown();
162 continue;
163 }
164 if global.is_empty() {
165 break;
166 }
167 }
168 }
169 }
170
get_task(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> Option<Task>171 fn get_task(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> Option<Task> {
172 // schedule lifo task first
173 let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
174 if let Some(task) = lifo_slot.take() {
175 return Some(task);
176 }
177
178 self.scheduler.dequeue(self.index, inner)
179 }
180
181 #[inline]
check_cancel(&self, inner: &mut Inner)182 fn check_cancel(&self, inner: &mut Inner) {
183 inner.check_cancel(self)
184 }
185
park_timeout(&self, inner: &mut Inner, worker_ctx: &WorkerContext)186 fn park_timeout(&self, inner: &mut Inner, worker_ctx: &WorkerContext) {
187 // still has works to do, go back to work
188 if worker_ctx.worker.lifo.borrow().is_some() || !inner.run_queue.is_empty() {
189 return;
190 }
191
192 self.scheduler.turn_to_sleep(self.index);
193
194 while !inner.is_cancel {
195 inner.parker.park();
196 if self.scheduler.is_parked(&self.index) {
197 self.check_cancel(inner);
198 continue;
199 }
200 break;
201 }
202 }
203
204 /// Gets Worker's Inner with ptr.
205 ///
206 /// # Safety
207 /// We can't get Inner with `RefCell::borrow()`, because the worker will
208 /// always hold the borrow_mut until drop. So we can only get Inner by ptr.
209 /// This method can only be used to obtain values
210 #[cfg(feature = "metrics")]
get_inner_ptr(&self) -> &Inner211 pub(crate) unsafe fn get_inner_ptr(&self) -> &Inner {
212 let ptr = self.inner.as_ptr();
213 &*ptr
214 }
215
216 #[inline]
release(&self)217 fn release(&self) {
218 // wait for tasks in queue to finish
219 while !self.scheduler.has_no_work() {}
220 }
221 }
222
223 pub(crate) struct Inner {
224 /// A counter to define whether schedule global queue or local queue
225 pub(crate) count: u32,
226 /// Whether the workers are canceled
227 is_cancel: bool,
228 /// local queue
229 pub(crate) run_queue: LocalQueue,
230 pub(crate) parker: Parker,
231 }
232
233 impl Inner {
new(run_queues: LocalQueue, parker: Parker) -> Self234 pub(crate) fn new(run_queues: LocalQueue, parker: Parker) -> Self {
235 Inner {
236 count: 0,
237 is_cancel: false,
238 run_queue: run_queues,
239 parker,
240 }
241 }
242 }
243
/// Constant combined with the worker's loop counter in
/// `Inner::periodic_check` to decide when the periodic work (cancellation
/// check and one driver turn) is performed.
const GLOBAL_PERIODIC_INTERVAL: u8 = 61;
245
246 impl Inner {
247 #[inline]
increment_count(&mut self)248 fn increment_count(&mut self) {
249 self.count = self.count.wrapping_add(1);
250 }
251
252 // checks if the worker is canceled
253 #[inline]
check_cancel(&mut self, worker: &Worker)254 fn check_cancel(&mut self, worker: &Worker) {
255 if !self.is_cancel {
256 self.is_cancel = worker.scheduler.is_cancel();
257 }
258 }
259
260 #[inline]
periodic_check(&mut self, worker: &Worker)261 fn periodic_check(&mut self, worker: &Worker) {
262 if self.count & GLOBAL_PERIODIC_INTERVAL as u32 == 0 {
263 self.check_cancel(worker);
264 if let Ok(mut driver) = self.parker.get_driver().try_lock() {
265 driver.run_once();
266 }
267 }
268 }
269
270 #[inline]
is_cancel(&self) -> bool271 fn is_cancel(&self) -> bool {
272 self.is_cancel
273 }
274 }
275