/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "wait_queue.h"
#include "sched/execute_ctx.h"
#include "core/task_ctx.h"
#include "eu/co_routine.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace/ffrt_trace.h"
#include "sync/mutex_private.h"

namespace ffrt {
TaskWithNode::TaskWithNode()
{
    auto ctx = ExecuteCtx::Cur();
    task = ctx->task;
}

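// Thread-blocking fallback used when the caller is not running as an FFRT coroutine task:
// the wait entry is queued under wqlock, the user mutex is released, and the thread blocks
// on the entry's std::condition_variable until notified (or until tp expires in the timed
// variant, which returns true on timeout).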
void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk)
{
    wqlock.lock();
    push_back(wn);
    wqlock.unlock();
    {
        std::unique_lock<std::mutex> nl(wn->wl);
        lk->unlock();
        wn->cv.wait(nl);
    }
    wqlock.lock();
    remove(wn);
    wqlock.unlock();
    lk->lock();
}

bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk, const TimePoint& tp)
{
    bool ret = false;
    wqlock.lock();
    push_back(wn);
    wqlock.unlock();
    {
        std::unique_lock<std::mutex> nl(wn->wl);
        lk->unlock();
        if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
            ret = true;
        }
    }
    wqlock.lock();
    remove(wn);
    wqlock.unlock();
    lk->lock();
    return ret;
}

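// Coroutine path of the condition wait with no deadline. The current task's wait entry is
// queued and the user mutex is released inside the CoWait callback, then the coroutine is
// suspended until a notifier calls CoWake. Callers that are not FFRT tasks (or builds
// without USE_COROUTINE) fall back to ThreadWait above.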
void WaitQueue::SuspendAndWait(mutexPrivate* lk)
{
    ExecuteCtx* ctx = ExecuteCtx::Cur();
    TaskCtx* task = ctx->task;
    if (!USE_COROUTINE || task == nullptr) {
        ThreadWait(&ctx->wn, lk);
        return;
    }
    task->wue = new WaitUntilEntry(task);
    FFRT_BLOCK_TRACER(task->gid, cnd);
    CoWait([&](TaskCtx* inTask) -> bool {
        wqlock.lock();
        push_back(inTask->wue);
        lk->unlock(); // Must unlock lk while wqlock is held, guaranteeing it happens before lk->lock() after CoWake.
        wqlock.unlock();
        return true;
    });
    delete task->wue;
    task->wue = nullptr;
    lk->lock();
}

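// Timeout-side arbitration for a timed wait entry: atomically move wue->status from INIT to
// TIMEOUT. If the exchange fails, a notifier has already claimed the entry and will not touch
// it again, so it is deleted here and false is returned to tell the caller not to wake the task.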
bool WeTimeoutProc(WaitUntilEntry* wue)
{
    int expected = we_status::INIT;
    if (!atomic_compare_exchange_strong_explicit(
        &wue->status, &expected, we_status::TIMEOUT, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
        // The notifier has already written wue->status and will no longer access wue, so it can be deleted here.
        delete wue;
        return false;
    }

    return true;
}

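// Coroutine path of the timed condition wait. A timeout callback is registered via
// DelayedWakeup; either that callback or a notifier resumes the coroutine, and the CAS in
// WeTimeoutProc/WeNotifyProc decides which side wins and which side frees the entry.
// The return value reports whether the wakeup was caused by the timeout (task->wakeupTimeOut).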
bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
{
    bool ret = false;
    ExecuteCtx* ctx = ExecuteCtx::Cur();
    TaskCtx* task = ctx->task;
    if (!USE_COROUTINE || task == nullptr) {
        return ThreadWaitUntil(&ctx->wn, lk, tp);
    }

    task->wue = new WaitUntilEntry(task);
    task->wue->hasWaitTime = true;
    task->wue->tp = tp;
    task->wue->cb = ([](WaitEntry* we) {
        WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
        ffrt::TaskCtx* task = wue->task;
        if (!WeTimeoutProc(wue)) {
            return;
        }
        FFRT_LOGD("task(%s) timed out", task->label.c_str());
        CoWake(task, true);
    });
    FFRT_BLOCK_TRACER(task->gid, cnt);
    CoWait([&](TaskCtx* inTask) -> bool {
        WaitUntilEntry* we = inTask->wue;
        wqlock.lock();
        push_back(we);
        lk->unlock(); // Must unlock lk while wqlock is held, guaranteeing it happens before lk->lock() after CoWake.
        wqlock.unlock();
        if (DelayedWakeup(we->tp, we, we->cb)) {
            return true;
        } else {
            // Could not register the timer: if a notifier has already claimed the entry, suspend
            // and let its CoWake resume the task; otherwise report an immediate timeout.
            if (!WeTimeoutProc(we)) {
                return true;
            }
            inTask->wakeupTimeOut = true;
            return false;
        }
    });
    ret = task->wakeupTimeOut;
    task->wue = nullptr;
    task->wakeupTimeOut = false;
    lk->lock();
    return ret;
}

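// Notify-side arbitration. Untimed entries are always woken by the notifier. For timed
// entries, atomically move status from INIT to NOTIFIED; if the exchange fails, the timeout
// callback has already claimed the entry and will not touch it again, so it is deleted here
// and false is returned to tell the caller not to wake the task.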
bool WaitQueue::WeNotifyProc(WaitUntilEntry* we)
{
    if (!we->hasWaitTime) {
        return true;
    }

    auto expected = we_status::INIT;
    if (!atomic_compare_exchange_strong_explicit(
        &we->status, &expected, we_status::NOTIFIED, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
        // The timeout callback has already written we->status and will no longer access we, so it can be deleted here.
        delete we;
        return false;
    }

    return true;
}

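// Wake one (or every) queued waiter. Entries taking the thread-blocking path are signalled
// through their condition variable under the entry's own lock; coroutine waiters go through
// WeNotifyProc and CoWake. The weType == 2 check presumably marks the thread-style entries
// used by the ThreadWait/ThreadWaitUntil fallback.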
void WaitQueue::NotifyOne() noexcept
{
    wqlock.lock();
    if (empty()) {
        wqlock.unlock();
        return;
    }
    WaitUntilEntry* we = pop_front();
    TaskCtx* task = we->task;
    if (!USE_COROUTINE || we->weType == 2) {
        std::unique_lock<std::mutex> lk(we->wl);
        wqlock.unlock();
        we->cv.notify_one();
    } else {
        wqlock.unlock();
        if (!WeNotifyProc(we)) {
            return;
        }
        CoWake(task, false);
    }
}

void WaitQueue::NotifyAll() noexcept
{
    wqlock.lock();
    while (!empty()) {
        WaitUntilEntry* we = pop_front();
        TaskCtx* task = we->task;
        if (!USE_COROUTINE || we->weType == 2) {
            std::unique_lock<std::mutex> lk(we->wl);
            wqlock.unlock();
            we->cv.notify_one();
        } else {
            wqlock.unlock();
            if (!WeNotifyProc(we)) {
                // Re-acquire wqlock before re-checking the loop condition and popping the next entry.
                wqlock.lock();
                continue;
            }
            CoWake(task, false);
        }
        wqlock.lock();
    }
    wqlock.unlock();
}
} // namespace ffrt