1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "wait_queue.h"
17 #include "sched/execute_ctx.h"
18 #include "eu/co_routine.h"
19 #include "dfx/log/ffrt_log_api.h"
20 #include "dfx/trace/ffrt_trace.h"
21 #include "sync/mutex_private.h"
22 #include "tm/cpu_task.h"
23
24 namespace ffrt {
TaskWithNode()25 TaskWithNode::TaskWithNode()
26 {
27 auto ctx = ExecuteCtx::Cur();
28 task = ctx->task;
29 }
30
ThreadWait(WaitUntilEntry * wn,mutexPrivate * lk,TaskBase * task)31 void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, TaskBase* task)
32 {
33 {
34 std::lock_guard lg(wqlock);
35 wn->task = task;
36 push_back(wn);
37 }
38 {
39 std::unique_lock<std::mutex> nl(wn->wl);
40 lk->unlock();
41 wn->cv.wait(nl);
42 }
43 wn->task = nullptr;
44 lk->lock();
45 if (task) {
46 task->Wake();
47 }
48 }
49
ThreadWaitUntil(WaitUntilEntry * wn,mutexPrivate * lk,const TimePoint & tp,TaskBase * task)50 bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk, const TimePoint& tp, TaskBase* task)
51 {
52 bool ret = false;
53 {
54 std::lock_guard lg(wqlock);
55 wn->status.store(WaitEntryStatus::INIT, std::memory_order_release);
56 wn->task = task;
57 push_back(wn);
58 }
59 {
60 std::unique_lock<std::mutex> nl(wn->wl);
61 lk->unlock();
62 if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
63 ret = true;
64 }
65 }
66 // notify scenarios WaitUntilEntry `wn` is already popped
67 // in addition, condition variables may be spurious woken up
68 // in this case, wn needs to be removed from the linked list
69 if (ret || wn->status.load(std::memory_order_acquire) != WaitEntryStatus::NOTIFYING) {
70 std::lock_guard lg(wqlock);
71 remove(wn);
72 }
73 // note that one wn->task can be set to nullptr only either after wn is removed from the queue,
74 // i.e. after the timeout occurred, or after the notify of the condition variable.
75 // In both cases this write will be ordered after the read of `we->task` in
76 // WaitQueue::Notify (if this entry is popped) and a data-race will not occur.
77 wn->task = nullptr;
78 lk->lock();
79 if (task) {
80 task->Wake();
81 }
82 return ret;
83 }
84
SuspendAndWait(mutexPrivate * lk)85 void WaitQueue::SuspendAndWait(mutexPrivate* lk)
86 {
87 ExecuteCtx* ctx = ExecuteCtx::Cur();
88 TaskBase* task = ctx->task;
89 if (task == nullptr || task->Block() == BlockType::BLOCK_THREAD) {
90 ThreadWait(&ctx->wn, lk, task);
91 return;
92 }
93 CoTask* coTask = static_cast<CoTask*>(task);
94 coTask->wue = new (std::nothrow) WaitUntilEntry(task);
95 FFRT_COND_RETURN_VOID(coTask->wue == nullptr, "new WaitUntilEntry failed");
96 FFRT_BLOCK_TRACER(coTask->gid, cnd);
97 CoWait([&](CoTask* task) -> bool {
98 std::lock_guard lg(wqlock);
99 push_back(task->wue);
100 lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
101 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
102 return true;
103 });
104 delete coTask->wue;
105 coTask->wue = nullptr;
106 lk->lock();
107 }
108
WeTimeoutProc(WaitQueue * wq,WaitUntilEntry * wue)109 bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
110 {
111 bool toWake = true;
112
113 // two kinds: 1) notify was not called, timeout grabbed the lock first;
114 if (wue->status.load(std::memory_order_acquire) == WaitEntryStatus::INIT) {
115 // timeout processes wue first, cv will not be processed again. timeout is responsible for destroying wue.
116 wq->remove(wue);
117 delete wue;
118 wue = nullptr;
119 } else {
120 // 2) notify enters the critical section, first writes the notify status, and then releases the lock
121 // notify is responsible for destroying wue.
122 wue->status.store(WaitEntryStatus::TIMEOUT_DONE, std::memory_order_release);
123 toWake = false;
124 }
125 return toWake;
126 }
127
SuspendAndWaitUntil(mutexPrivate * lk,const TimePoint & tp)128 int WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
129 {
130 ExecuteCtx* ctx = ExecuteCtx::Cur();
131 TaskBase* task = ctx->task;
132 int ret = ffrt_success;
133 if (task == nullptr || task->Block() == BlockType::BLOCK_THREAD) {
134 return ThreadWaitUntil(&ctx->wn, lk, tp, task) ? ffrt_error_timedout : ffrt_success;
135 }
136 CoTask* coTask = static_cast<CoTask*>(task);
137 coTask->wue = new WaitUntilEntry(task);
138 coTask->wue->hasWaitTime = true;
139 coTask->wue->tp = tp;
140 coTask->wue->cb = ([&](WaitEntry* we) {
141 WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
142 ffrt::TaskBase* task = wue->task;
143 {
144 std::lock_guard lock(wqlock);
145 if (!WeTimeoutProc(this, wue)) {
146 return;
147 }
148 }
149 FFRT_LOGD("task(%d) time is up", task->gid);
150 CoRoutineFactory::CoWakeFunc(static_cast<CoTask*>(task), CoWakeType::TIMEOUT_WAKE);
151 });
152 FFRT_BLOCK_TRACER(task->gid, cnt);
153 CoWait([&](CoTask* task) -> bool {
154 WaitUntilEntry* we = task->wue;
155 std::lock_guard lg(wqlock);
156 push_back(we);
157 lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
158 if (DelayedWakeup(we->tp, we, we->cb)) {
159 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
160 return true;
161 } else {
162 if (!WeTimeoutProc(this, we)) {
163 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
164 return true;
165 }
166 task->coWakeType = CoWakeType::TIMEOUT_WAKE;
167 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
168 return false;
169 }
170 });
171 ret = coTask->coWakeType == CoWakeType::NO_TIMEOUT_WAKE ? ffrt_success : ffrt_error_timedout;
172 coTask->wue = nullptr;
173 coTask->coWakeType = CoWakeType::NO_TIMEOUT_WAKE;
174 lk->lock();
175 return ret;
176 }
177
WeNotifyProc(WaitUntilEntry * we)178 void WaitQueue::WeNotifyProc(WaitUntilEntry* we)
179 {
180 if (!we->hasWaitTime) {
181 // For wait task without timeout, we will be deleted after the wait task wakes up.
182 return;
183 }
184
185 WaitEntry* dwe = static_cast<WaitEntry*>(we);
186 if (!DelayedRemove(we->tp, dwe)) {
187 // Deletion of timer failed during the notify process, indicating that timer cb has been executed at this time
188 // waiting for cb execution to complete, and marking notify as being processed.
189 we->status.store(WaitEntryStatus::NOTIFYING, std::memory_order_release);
190 wqlock.unlock();
191 while (we->status.load(std::memory_order_acquire) != WaitEntryStatus::TIMEOUT_DONE) {
192 }
193 wqlock.lock();
194 }
195 delete we;
196 }
197
Notify(bool one)198 void WaitQueue::Notify(bool one) noexcept
199 {
200 // the caller should assure the WaitQueue lifetime.
201 // this function should assure the WaitQueue do not be access after the wqlock is empty(),
202 // that mean the last wait thread/co may destroy the WaitQueue.
203 // all the break-out should assure the wqlock is in unlock state.
204 // the continue should assure the wqlock is in lock state.
205
206 std::unique_lock lock(wqlock);
207 for (; ;) {
208 if (empty()) {
209 break;
210 }
211 WaitUntilEntry* we = pop_front();
212 if (we == nullptr) {
213 break;
214 }
215 bool isEmpty = empty();
216 TaskBase* task = we->task;
217 if (task == nullptr || task->GetBlockType() == BlockType::BLOCK_THREAD) {
218 std::lock_guard<std::mutex> lg(we->wl);
219 we->status.store(WaitEntryStatus::NOTIFYING, std::memory_order_release);
220 lock.unlock();
221 we->cv.notify_one();
222 } else {
223 WeNotifyProc(we);
224 lock.unlock();
225 CoRoutineFactory::CoWakeFunc(static_cast<CoTask*>(task), CoWakeType::NO_TIMEOUT_WAKE);
226 }
227 if (isEmpty || one) {
228 break;
229 }
230 lock.lock();
231 }
232 }
233
234 } // namespace ffrt
235