1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "wait_queue.h"
17 #include "eu/co_routine.h"
18 #include "dfx/log/ffrt_log_api.h"
19 #include "ffrt_trace.h"
20 #include "internal_inc/types.h"
21 #include "sync/mutex_private.h"
22 #include "tm/cpu_task.h"
23
24 namespace ffrt {
TaskWithNode()25 TaskWithNode::TaskWithNode()
26 {
27 auto ctx = ExecuteCtx::Cur();
28 task = ctx->task;
29 }
30
ThreadWait(WaitUntilEntry * wn,mutexPrivate * lk,bool legacyMode,CPUEUTask * task)31 void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task)
32 {
33 wqlock.lock();
34 if (legacyMode) {
35 task->coRoutine->blockType = BlockType::BLOCK_THREAD;
36 wn->task = task;
37 }
38 push_back(wn);
39 wqlock.unlock();
40 {
41 std::unique_lock<std::mutex> nl(wn->wl);
42 lk->unlock();
43 wn->cv.wait(nl);
44 }
45 wqlock.lock();
46 remove(wn);
47 wqlock.unlock();
48 lk->lock();
49 }
ThreadWaitUntil(WaitUntilEntry * wn,mutexPrivate * lk,const TimePoint & tp,bool legacyMode,CPUEUTask * task)50 bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk,
51 const TimePoint& tp, bool legacyMode, CPUEUTask* task)
52 {
53 bool ret = false;
54 wqlock.lock();
55 if (legacyMode) {
56 task->coRoutine->blockType = BlockType::BLOCK_THREAD;
57 wn->task = task;
58 }
59 push_back(wn);
60 wqlock.unlock();
61 {
62 std::unique_lock<std::mutex> nl(wn->wl);
63 lk->unlock();
64 if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
65 ret = true;
66 }
67 }
68 wqlock.lock();
69 remove(wn);
70 wqlock.unlock();
71 lk->lock();
72 return ret;
73 }
74
SuspendAndWait(mutexPrivate * lk)75 void WaitQueue::SuspendAndWait(mutexPrivate* lk)
76 {
77 ExecuteCtx* ctx = ExecuteCtx::Cur();
78 CPUEUTask* task = ctx->task;
79 bool legacyMode = task != nullptr ? (task->coRoutine != nullptr ? task->coRoutine->legacyMode : false) : false;
80 if (!USE_COROUTINE || task == nullptr || legacyMode) {
81 ThreadWait(&ctx->wn, lk, legacyMode, task);
82 return;
83 }
84 task->wue = new WaitUntilEntry(task);
85 FFRT_BLOCK_TRACER(task->gid, cnd);
86 CoWait([&](CPUEUTask* inTask) -> bool {
87 wqlock.lock();
88 push_back(inTask->wue);
89 lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
90 wqlock.unlock();
91 return true;
92 });
93 delete task->wue;
94 task->wue = nullptr;
95 lk->lock();
96 }
97
WeTimeoutProc(WaitQueue * wq,WaitUntilEntry * wue)98 bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
99 {
100 int expected = we_status::INIT;
101 if (!atomic_compare_exchange_strong_explicit(
102 &wue->status, &expected, we_status::TIMEOUT, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
103 // The critical point wue->status has been written, notify will no longer access wue, it can be deleted
104 delete wue;
105 return false;
106 }
107
108 wq->wqlock.lock();
109 if (wue->status.load(std::memory_order_acquire) == we_status::TIMEOUT) {
110 wq->remove(wue);
111 delete wue;
112 wue = nullptr;
113 } else {
114 wue->status.store(we_status::HANDOVER, std::memory_order_release);
115 }
116 wq->wqlock.unlock();
117 return true;
118 }
119
SuspendAndWaitUntil(mutexPrivate * lk,const TimePoint & tp)120 bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
121 {
122 bool ret = false;
123 ExecuteCtx* ctx = ExecuteCtx::Cur();
124 CPUEUTask* task = ctx->task;
125 bool legacyMode = task != nullptr ? (task->coRoutine != nullptr ? task->coRoutine->legacyMode : false) : false;
126 if (!USE_COROUTINE || task == nullptr || legacyMode) {
127 return ThreadWaitUntil(&ctx->wn, lk, tp, legacyMode, task);
128 }
129
130 task->wue = new WaitUntilEntry(task);
131 task->wue->hasWaitTime = true;
132 task->wue->tp = tp;
133 task->wue->cb = ([&](WaitEntry* we) {
134 WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
135 ffrt::CPUEUTask* task = wue->task;
136 if (!WeTimeoutProc(this, wue)) {
137 return;
138 }
139 FFRT_LOGD("task(%s) timeout out", task->label.c_str());
140 CoWake(task, true);
141 });
142 FFRT_BLOCK_TRACER(task->gid, cnt);
143 CoWait([&](CPUEUTask* inTask) -> bool {
144 WaitUntilEntry* we = inTask->wue;
145 wqlock.lock();
146 push_back(we);
147 lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
148 wqlock.unlock();
149 if (DelayedWakeup(we->tp, we, we->cb)) {
150 return true;
151 } else {
152 if (!WeTimeoutProc(this, we)) {
153 return true;
154 }
155 inTask->wakeupTimeOut = true;
156 return false;
157 }
158 });
159 ret = task->wakeupTimeOut;
160 task->wue = nullptr;
161 task->wakeupTimeOut = false;
162 lk->lock();
163 return ret;
164 }
165
WeNotifyProc(WaitUntilEntry * we)166 bool WaitQueue::WeNotifyProc(WaitUntilEntry* we)
167 {
168 if (!we->hasWaitTime) {
169 return true;
170 }
171
172 auto expected = we_status::INIT;
173 if (!atomic_compare_exchange_strong_explicit(
174 &we->status, &expected, we_status::NOTIFIED, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
175 // The critical point we->status has been written, notify will no longer access we, it can be deleted
176 we->status.store(we_status::NOTIFIED, std::memory_order_release);
177 wqlock.unlock();
178 while (we->status.load(std::memory_order_acquire) != we_status::HANDOVER) {
179 }
180 delete we;
181 wqlock.lock();
182 return false;
183 }
184
185 return true;
186 }
187
NotifyOne()188 void WaitQueue::NotifyOne() noexcept
189 {
190 wqlock.lock();
191 while (!empty()) {
192 WaitUntilEntry* we = pop_front();
193 CPUEUTask* task = we->task;
194 bool blockThread = task != nullptr ?
195 (task->coRoutine != nullptr ? task->coRoutine->blockType == BlockType::BLOCK_THREAD : false) : false;
196 if (!USE_COROUTINE || we->weType == 2 || blockThread) {
197 std::unique_lock<std::mutex> lk(we->wl);
198 if (blockThread) {
199 task->coRoutine->blockType = BlockType::BLOCK_COROUTINE;
200 we->task = nullptr;
201 }
202 wqlock.unlock();
203 we->cv.notify_one();
204 } else {
205 if (!WeNotifyProc(we)) {
206 continue;
207 }
208 wqlock.unlock();
209 CoWake(task, false);
210 }
211 return;
212 }
213 wqlock.unlock();
214 }
215
NotifyAll()216 void WaitQueue::NotifyAll() noexcept
217 {
218 wqlock.lock();
219 while (!empty()) {
220 WaitUntilEntry* we = pop_front();
221 CPUEUTask* task = we->task;
222 bool blockThread = task != nullptr ?
223 (task->coRoutine != nullptr ? task->coRoutine->blockType == BlockType::BLOCK_THREAD : false) : false;
224 if (!USE_COROUTINE || we->weType == 2 || blockThread) {
225 std::unique_lock<std::mutex> lk(we->wl);
226 if (blockThread) {
227 task->coRoutine->blockType = BlockType::BLOCK_COROUTINE;
228 we->task = nullptr;
229 }
230 wqlock.unlock();
231 we->cv.notify_one();
232 } else {
233 if (!WeNotifyProc(we)) {
234 continue;
235 }
236 wqlock.unlock();
237 CoWake(task, false);
238 }
239 wqlock.lock();
240 }
241 wqlock.unlock();
242 }
243 } // namespace ffrt
244