• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "wait_queue.h"
17 #include "sched/execute_ctx.h"
18 #include "eu/co_routine.h"
19 #include "dfx/log/ffrt_log_api.h"
20 #include "ffrt_trace.h"
21 #include "sync/mutex_private.h"
22 #include "tm/cpu_task.h"
23 
24 namespace ffrt {
TaskWithNode()25 TaskWithNode::TaskWithNode()
26 {
27     auto ctx = ExecuteCtx::Cur();
28     task = ctx->task;
29 }
30 
ThreadWait(WaitUntilEntry * wn,mutexPrivate * lk,bool legacyMode,CPUEUTask * task)31 void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task)
32 {
33     wqlock.lock();
34     if (legacyMode) {
35         task->blockType = BlockType::BLOCK_THREAD;
36         wn->task = task;
37     }
38     push_back(wn);
39     wqlock.unlock();
40     {
41         std::unique_lock<std::mutex> nl(wn->wl);
42         lk->unlock();
43         wn->cv.wait(nl);
44     }
45     wn->task = nullptr;
46     lk->lock();
47 }
48 
ThreadWaitUntil(WaitUntilEntry * wn,mutexPrivate * lk,const TimePoint & tp,bool legacyMode,CPUEUTask * task)49 bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk,
50     const TimePoint& tp, bool legacyMode, CPUEUTask* task)
51 {
52     bool ret = false;
53     wqlock.lock();
54     wn->status.store(we_status::INIT, std::memory_order_release);
55     if (legacyMode) {
56         task->blockType = BlockType::BLOCK_THREAD;
57         wn->task = task;
58     }
59     push_back(wn);
60     wqlock.unlock();
61     {
62         std::unique_lock<std::mutex> nl(wn->wl);
63         lk->unlock();
64         if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
65             ret = true;
66         }
67     }
68 
69     // notify scenarios wn is already pooped
70     // in addition, condition variables may be spurious woken up
71     // in this case, wn needs to be removed from the linked list
72     if (ret || wn->status.load(std::memory_order_acquire) != we_status::NOTIFING) {
73         wqlock.lock();
74         remove(wn);
75         wqlock.unlock();
76     }
77     wn->task = nullptr;
78     lk->lock();
79     return ret;
80 }
81 
SuspendAndWait(mutexPrivate * lk)82 void WaitQueue::SuspendAndWait(mutexPrivate* lk)
83 {
84     ExecuteCtx* ctx = ExecuteCtx::Cur();
85     CPUEUTask* task = ctx->task;
86     if (ThreadWaitMode(task)) {
87         ThreadWait(&ctx->wn, lk, LegacyMode(task), task);
88         return;
89     }
90     task->wue = new (std::nothrow) WaitUntilEntry(task);
91     FFRT_COND_RETURN_VOID(task->wue == nullptr, "new WaitUntilEntry failed");
92     FFRT_BLOCK_TRACER(task->gid, cnd);
93     CoWait([&](CPUEUTask* task) -> bool {
94         wqlock.lock();
95         push_back(task->wue);
96         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
97         wqlock.unlock();
98         // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
99         return true;
100     });
101     delete task->wue;
102     task->wue = nullptr;
103     lk->lock();
104 }
105 
WeTimeoutProc(WaitQueue * wq,WaitUntilEntry * wue)106 bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
107 {
108     bool toWake = true;
109 
110     // two kinds: 1) notify was not called, timeout grabbed the lock first;
111     if (wue->status.load(std::memory_order_acquire) == we_status::INIT) {
112         // timeout processes wue first, cv will not be processed again. timeout is responsible for destroying wue.
113         wq->remove(wue);
114         delete wue;
115         wue = nullptr;
116     } else {
117         // 2) notify enters the critical section, first writes the notify status, and then releases the lock
118         // notify is responsible for destroying wue.
119         wue->status.store(we_status::TIMEOUT_DONE, std::memory_order_release);
120         toWake = false;
121     }
122     return toWake;
123 }
124 
SuspendAndWaitUntil(mutexPrivate * lk,const TimePoint & tp)125 int WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
126 {
127     ExecuteCtx* ctx = ExecuteCtx::Cur();
128     CPUEUTask* task = ctx->task;
129     int ret = ffrt_success;
130     if (ThreadWaitMode(task)) {
131         ret = ThreadWaitUntil(&ctx->wn, lk, tp, LegacyMode(task), task) ? ffrt_error_timedout : ffrt_success;
132         return ret;
133     }
134     task->wue = new WaitUntilEntry(task);
135     task->wue->hasWaitTime = true;
136     task->wue->tp = tp;
137     task->wue->cb = ([&](WaitEntry* we) {
138         WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
139         ffrt::CPUEUTask* task = wue->task;
140         wqlock.lock();
141         if (!WeTimeoutProc(this, wue)) {
142             wqlock.unlock();
143             return;
144         }
145         wqlock.unlock();
146         FFRT_LOGD("task(%d) time is up", task->gid);
147         CoRoutineFactory::CoWakeFunc(task, CoWakeType::TIMEOUT_WAKE);
148     });
149     FFRT_BLOCK_TRACER(task->gid, cnt);
150     CoWait([&](CPUEUTask* task) -> bool {
151         WaitUntilEntry* we = task->wue;
152         wqlock.lock();
153         push_back(we);
154         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
155         if (DelayedWakeup(we->tp, we, we->cb)) {
156             wqlock.unlock();
157             // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
158             return true;
159         } else {
160             if (!WeTimeoutProc(this, we)) {
161                 wqlock.unlock();
162                 // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
163                 return true;
164             }
165             task->coWakeType = CoWakeType::TIMEOUT_WAKE;
166             wqlock.unlock();
167             // The ownership of the task belongs to WaitQueue list, and the task cannot be accessed anymore.
168             return false;
169         }
170     });
171     ret = task->coWakeType == CoWakeType::NO_TIMEOUT_WAKE ? ffrt_success : ffrt_error_timedout;
172     task->wue = nullptr;
173     task->coWakeType = CoWakeType::NO_TIMEOUT_WAKE;
174     lk->lock();
175     return ret;
176 }
177 
WeNotifyProc(WaitUntilEntry * we)178 void WaitQueue::WeNotifyProc(WaitUntilEntry* we)
179 {
180     if (!we->hasWaitTime) {
181         // For wait task without timeout, we will be deleted after the wait task wakes up.
182         return;
183     }
184 
185     WaitEntry* dwe = static_cast<WaitEntry*>(we);
186     if (!DelayedRemove(we->tp, dwe)) {
187         // Deletion of timer failed during the notify process, indicating that timer cb has been executed at this time
188         // waiting for cb execution to complete, and marking notify as being processed.
189         we->status.store(we_status::NOTIFING, std::memory_order_release);
190         wqlock.unlock();
191         while (we->status.load(std::memory_order_acquire) != we_status::TIMEOUT_DONE) {
192         }
193         wqlock.lock();
194     }
195     delete we;
196 }
197 
Notify(bool one)198 void WaitQueue::Notify(bool one) noexcept
199 {
200     // the caller should assure the WaitQueue lifetime.
201     // this function should assure the WaitQueue do not be access after the wqlock is empty(),
202     // that mean the last wait thread/co may destroy the WaitQueue.
203     // all the break-out should assure the wqlock is in unlock state.
204     // the continue should assure the wqlock is in lock state.
205     wqlock.lock();
206     for (; ;) {
207         if (empty()) {
208             wqlock.unlock();
209             break;
210         }
211         WaitUntilEntry* we = pop_front();
212         if (we == nullptr) {
213             wqlock.unlock();
214             break;
215         }
216         bool isEmpty = empty();
217         CPUEUTask* task = we->task;
218         if (ThreadNotifyMode(task) || we->weType == 2) {
219             std::unique_lock<std::mutex> lk(we->wl);
220             we->status.store(we_status::NOTIFING, std::memory_order_release);
221             if (BlockThread(task)) {
222                 task->blockType = BlockType::BLOCK_COROUTINE;
223                 we->task = nullptr;
224             }
225             wqlock.unlock();
226             we->cv.notify_one();
227         } else {
228             WeNotifyProc(we);
229             wqlock.unlock();
230             CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
231         }
232         if (isEmpty || one) {
233             break;
234         }
235         wqlock.lock();
236     }
237 }
238 
239 } // namespace ffrt
240