/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "wait_queue.h"
#include "sched/execute_ctx.h"
#include "eu/co_routine.h"
#include "dfx/log/ffrt_log_api.h"
#include "ffrt_trace.h"
#include "sync/mutex_private.h"
#include "tm/cpu_task.h"

namespace ffrt {
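// Associate the wait node with the task running on the current execution context.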
TaskWithNode::TaskWithNode()
{
    auto ctx = ExecuteCtx::Cur();
    task = ctx->task;
}

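// Block the calling thread on wn->cv until Notify wakes it. Used when the waiter
// parks its OS thread (legacy mode or no coroutine context) instead of suspending a coroutine.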
void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task)
{
    wqlock.lock();
    if (legacyMode) {
        task->blockType = BlockType::BLOCK_THREAD;
        wn->task = task;
    }
    push_back(wn);
    wqlock.unlock();
    {
        std::unique_lock<std::mutex> nl(wn->wl);
        lk->unlock();
        wn->cv.wait(nl);
    }
    wn->task = nullptr;
    lk->lock();
}

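// Same as ThreadWait, but with a deadline. Returns true if the wait timed out.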
bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk,
    const TimePoint& tp, bool legacyMode, CPUEUTask* task)
{
    bool ret = false;
    wqlock.lock();
    wn->status.store(we_status::INIT, std::memory_order_release);
    if (legacyMode) {
        task->blockType = BlockType::BLOCK_THREAD;
        wn->task = task;
    }
    push_back(wn);
    wqlock.unlock();
    {
        std::unique_lock<std::mutex> nl(wn->wl);
        lk->unlock();
        if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
            ret = true;
        }
    }

    // In the notify scenario wn has already been popped from the queue.
    // In addition, the condition variable may be woken up spuriously;
    // in those cases wn still needs to be removed from the linked list here.
    if (ret || wn->status.load(std::memory_order_acquire) != we_status::NOTIFING) {
        wqlock.lock();
        remove(wn);
        wqlock.unlock();
    }
    wn->task = nullptr;
    lk->lock();
    return ret;
}

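// Suspend the current coroutine task on this wait queue until Notify wakes it.
// Falls back to ThreadWait when the caller has to block its thread instead.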
void WaitQueue::SuspendAndWait(mutexPrivate* lk)
{
    ExecuteCtx* ctx = ExecuteCtx::Cur();
    CPUEUTask* task = ctx->task;
    if (ThreadWaitMode(task)) {
        ThreadWait(&ctx->wn, lk, LegacyMode(task), task);
        return;
    }
    task->wue = new (std::nothrow) WaitUntilEntry(task);
    FFRT_COND_RETURN_VOID(task->wue == nullptr, "new WaitUntilEntry failed");
    FFRT_BLOCK_TRACER(task->gid, cnd);
    CoWait([&](CPUEUTask* task) -> bool {
        wqlock.lock();
        push_back(task->wue);
        lk->unlock(); // Unlock must happen under wqlock protection, so it is guaranteed to run before lk->lock() after CoWake.
        wqlock.unlock();
        // Ownership of the task now belongs to the WaitQueue list; the task must not be accessed anymore.
        return true;
    });
    delete task->wue;
    task->wue = nullptr;
    lk->lock();
}

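// Resolve the race between the timeout path and Notify for a timed wait entry.
// Returns true when the timeout path owns the entry and should wake the task.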
bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
{
    bool toWake = true;

    // Two cases: 1) notify was not called and the timeout grabbed the lock first;
    if (wue->status.load(std::memory_order_acquire) == we_status::INIT) {
        // the timeout path processes wue first, so the notify path will not process it again;
        // the timeout path is responsible for destroying wue.
        wq->remove(wue);
        delete wue;
        wue = nullptr;
    } else {
        // 2) notify entered the critical section first, wrote the notify status, and then released the lock;
        // notify is responsible for destroying wue.
        wue->status.store(we_status::TIMEOUT_DONE, std::memory_order_release);
        toWake = false;
    }
    return toWake;
}

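// Suspend the current coroutine task with a deadline. A delayed-wakeup timer wakes the task
// with TIMEOUT_WAKE if Notify does not arrive first. Returns ffrt_error_timedout on timeout.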
int WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
{
    ExecuteCtx* ctx = ExecuteCtx::Cur();
    CPUEUTask* task = ctx->task;
    int ret = ffrt_success;
    if (ThreadWaitMode(task)) {
        ret = ThreadWaitUntil(&ctx->wn, lk, tp, LegacyMode(task), task) ? ffrt_error_timedout : ffrt_success;
        return ret;
    }
    task->wue = new WaitUntilEntry(task);
    task->wue->hasWaitTime = true;
    task->wue->tp = tp;
    task->wue->cb = ([&](WaitEntry* we) {
        WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
        ffrt::CPUEUTask* task = wue->task;
        wqlock.lock();
        if (!WeTimeoutProc(this, wue)) {
            wqlock.unlock();
            return;
        }
        wqlock.unlock();
        FFRT_LOGD("task(%d) time is up", task->gid);
        CoRoutineFactory::CoWakeFunc(task, CoWakeType::TIMEOUT_WAKE);
    });
    FFRT_BLOCK_TRACER(task->gid, cnt);
    CoWait([&](CPUEUTask* task) -> bool {
        WaitUntilEntry* we = task->wue;
        wqlock.lock();
        push_back(we);
        lk->unlock(); // Unlock must happen under wqlock protection, so it is guaranteed to run before lk->lock() after CoWake.
        if (DelayedWakeup(we->tp, we, we->cb)) {
            wqlock.unlock();
            // Ownership of the task now belongs to the WaitQueue list; the task must not be accessed anymore.
            return true;
        } else {
            if (!WeTimeoutProc(this, we)) {
                wqlock.unlock();
                // Ownership of the task now belongs to the WaitQueue list; the task must not be accessed anymore.
                return true;
            }
            task->coWakeType = CoWakeType::TIMEOUT_WAKE;
            wqlock.unlock();
            // Ownership of the task now belongs to the WaitQueue list; the task must not be accessed anymore.
            return false;
        }
    });
    ret = task->coWakeType == CoWakeType::NO_TIMEOUT_WAKE ? ffrt_success : ffrt_error_timedout;
    task->wue = nullptr;
    task->coWakeType = CoWakeType::NO_TIMEOUT_WAKE;
    lk->lock();
    return ret;
}

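// Called from Notify with wqlock held. For timed entries, cancel the delayed-wakeup
// timer; if its callback is already running, wait for it to finish before deleting we.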
void WaitQueue::WeNotifyProc(WaitUntilEntry* we)
{
    if (!we->hasWaitTime) {
        // For a wait task without a timeout, we is deleted after the waiting task wakes up.
        return;
    }

    WaitEntry* dwe = static_cast<WaitEntry*>(we);
    if (!DelayedRemove(we->tp, dwe)) {
        // Removing the timer failed during notify, which means its callback is already running.
        // Mark notify as in progress and wait for the callback to finish before deleting we.
        we->status.store(we_status::NOTIFING, std::memory_order_release);
        wqlock.unlock();
        while (we->status.load(std::memory_order_acquire) != we_status::TIMEOUT_DONE) {
        }
        wqlock.lock();
    }
    delete we;
}

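// Wake a single waiter when one is true; otherwise drain the queue and wake every waiter.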
void WaitQueue::Notify(bool one) noexcept
{
    // The caller must guarantee the WaitQueue lifetime.
    // This function must not access the WaitQueue once the queue is empty,
    // because the last woken thread/coroutine may destroy the WaitQueue.
    // Every exit from the loop must leave wqlock unlocked;
    // every new iteration must hold wqlock.
    wqlock.lock();
    for (;;) {
        if (empty()) {
            wqlock.unlock();
            break;
        }
        WaitUntilEntry* we = pop_front();
        if (we == nullptr) {
            wqlock.unlock();
            break;
        }
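        // Snapshot emptiness before releasing wqlock: once the last waiter is woken,
        // the WaitQueue may be destroyed and must not be touched again.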
        bool isEmpty = empty();
        CPUEUTask* task = we->task;
        if (ThreadNotifyMode(task) || we->weType == 2) {
            std::unique_lock<std::mutex> lk(we->wl);
            we->status.store(we_status::NOTIFING, std::memory_order_release);
            if (BlockThread(task)) {
                task->blockType = BlockType::BLOCK_COROUTINE;
                we->task = nullptr;
            }
            wqlock.unlock();
            we->cv.notify_one();
        } else {
            WeNotifyProc(we);
            wqlock.unlock();
            CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
        }
        if (isEmpty || one) {
            break;
        }
        wqlock.lock();
    }
}

} // namespace ffrt