• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "wait_queue.h"
17 #include "eu/co_routine.h"
18 #include "dfx/log/ffrt_log_api.h"
19 #include "ffrt_trace.h"
20 #include "internal_inc/types.h"
21 #include "sync/mutex_private.h"
22 #include "tm/cpu_task.h"
23 
24 namespace ffrt {
TaskWithNode()25 TaskWithNode::TaskWithNode()
26 {
27     auto ctx = ExecuteCtx::Cur();
28     task = ctx->task;
29 }
30 
ThreadWait(WaitUntilEntry * wn,mutexPrivate * lk,bool legacyMode,CPUEUTask * task)31 void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task)
32 {
33     wqlock.lock();
34     if (legacyMode) {
35         task->coRoutine->blockType = BlockType::BLOCK_THREAD;
36         wn->task = task;
37     }
38     push_back(wn);
39     wqlock.unlock();
40     {
41         std::unique_lock<std::mutex> nl(wn->wl);
42         lk->unlock();
43         wn->cv.wait(nl);
44     }
45     wqlock.lock();
46     remove(wn);
47     wqlock.unlock();
48     lk->lock();
49 }
ThreadWaitUntil(WaitUntilEntry * wn,mutexPrivate * lk,const TimePoint & tp,bool legacyMode,CPUEUTask * task)50 bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk,
51     const TimePoint& tp, bool legacyMode, CPUEUTask* task)
52 {
53     bool ret = false;
54     wqlock.lock();
55     if (legacyMode) {
56         task->coRoutine->blockType = BlockType::BLOCK_THREAD;
57         wn->task = task;
58     }
59     push_back(wn);
60     wqlock.unlock();
61     {
62         std::unique_lock<std::mutex> nl(wn->wl);
63         lk->unlock();
64         if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
65             ret = true;
66         }
67     }
68     wqlock.lock();
69     remove(wn);
70     wqlock.unlock();
71     lk->lock();
72     return ret;
73 }
74 
SuspendAndWait(mutexPrivate * lk)75 void WaitQueue::SuspendAndWait(mutexPrivate* lk)
76 {
77     ExecuteCtx* ctx = ExecuteCtx::Cur();
78     CPUEUTask* task = ctx->task;
79     bool legacyMode = task != nullptr ? (task->coRoutine != nullptr ? task->coRoutine->legacyMode : false) : false;
80     if (!USE_COROUTINE || task == nullptr || legacyMode) {
81         ThreadWait(&ctx->wn, lk, legacyMode, task);
82         return;
83     }
84     task->wue = new WaitUntilEntry(task);
85     FFRT_BLOCK_TRACER(task->gid, cnd);
86     CoWait([&](CPUEUTask* inTask) -> bool {
87         wqlock.lock();
88         push_back(inTask->wue);
89         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
90         wqlock.unlock();
91         return true;
92     });
93     delete task->wue;
94     task->wue = nullptr;
95     lk->lock();
96 }
97 
WeTimeoutProc(WaitQueue * wq,WaitUntilEntry * wue)98 bool WeTimeoutProc(WaitQueue* wq, WaitUntilEntry* wue)
99 {
100     int expected = we_status::INIT;
101     if (!atomic_compare_exchange_strong_explicit(
102         &wue->status, &expected, we_status::TIMEOUT, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
103         // The critical point wue->status has been written, notify will no longer access wue, it can be deleted
104         delete wue;
105         return false;
106     }
107 
108     wq->wqlock.lock();
109     if (wue->status.load(std::memory_order_acquire) == we_status::TIMEOUT) {
110         wq->remove(wue);
111         delete wue;
112         wue = nullptr;
113     } else {
114         wue->status.store(we_status::HANDOVER, std::memory_order_release);
115     }
116     wq->wqlock.unlock();
117     return true;
118 }
119 
SuspendAndWaitUntil(mutexPrivate * lk,const TimePoint & tp)120 bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
121 {
122     bool ret = false;
123     ExecuteCtx* ctx = ExecuteCtx::Cur();
124     CPUEUTask* task = ctx->task;
125     bool legacyMode = task != nullptr ? (task->coRoutine != nullptr ? task->coRoutine->legacyMode : false) : false;
126     if (!USE_COROUTINE || task == nullptr || legacyMode) {
127         return ThreadWaitUntil(&ctx->wn, lk, tp, legacyMode, task);
128     }
129 
130     task->wue = new WaitUntilEntry(task);
131     task->wue->hasWaitTime = true;
132     task->wue->tp = tp;
133     task->wue->cb = ([&](WaitEntry* we) {
134         WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
135         ffrt::CPUEUTask* task = wue->task;
136         if (!WeTimeoutProc(this, wue)) {
137             return;
138         }
139         FFRT_LOGD("task(%s) timeout out", task->label.c_str());
140         CoWake(task, true);
141     });
142     FFRT_BLOCK_TRACER(task->gid, cnt);
143     CoWait([&](CPUEUTask* inTask) -> bool {
144         WaitUntilEntry* we = inTask->wue;
145         wqlock.lock();
146         push_back(we);
147         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
148         wqlock.unlock();
149         if (DelayedWakeup(we->tp, we, we->cb)) {
150             return true;
151         } else {
152             if (!WeTimeoutProc(this, we)) {
153                 return true;
154             }
155             inTask->wakeupTimeOut = true;
156             return false;
157         }
158     });
159     ret = task->wakeupTimeOut;
160     task->wue = nullptr;
161     task->wakeupTimeOut = false;
162     lk->lock();
163     return ret;
164 }
165 
WeNotifyProc(WaitUntilEntry * we)166 bool WaitQueue::WeNotifyProc(WaitUntilEntry* we)
167 {
168     if (!we->hasWaitTime) {
169         return true;
170     }
171 
172     auto expected = we_status::INIT;
173     if (!atomic_compare_exchange_strong_explicit(
174         &we->status, &expected, we_status::NOTIFIED, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
175         // The critical point we->status has been written, notify will no longer access we, it can be deleted
176         we->status.store(we_status::NOTIFIED, std::memory_order_release);
177         wqlock.unlock();
178         while (we->status.load(std::memory_order_acquire) != we_status::HANDOVER) {
179         }
180         delete we;
181         wqlock.lock();
182         return false;
183     }
184 
185     return true;
186 }
187 
NotifyOne()188 void WaitQueue::NotifyOne() noexcept
189 {
190     wqlock.lock();
191     while (!empty()) {
192         WaitUntilEntry* we = pop_front();
193         CPUEUTask* task = we->task;
194         bool blockThread = task != nullptr ?
195             (task->coRoutine != nullptr ? task->coRoutine->blockType == BlockType::BLOCK_THREAD : false) : false;
196         if (!USE_COROUTINE || we->weType == 2 || blockThread) {
197             std::unique_lock<std::mutex> lk(we->wl);
198             if (blockThread) {
199                 task->coRoutine->blockType = BlockType::BLOCK_COROUTINE;
200                 we->task = nullptr;
201             }
202             wqlock.unlock();
203             we->cv.notify_one();
204         } else {
205             if (!WeNotifyProc(we)) {
206                 continue;
207             }
208             wqlock.unlock();
209             CoWake(task, false);
210         }
211         return;
212     }
213     wqlock.unlock();
214 }
215 
NotifyAll()216 void WaitQueue::NotifyAll() noexcept
217 {
218     wqlock.lock();
219     while (!empty()) {
220         WaitUntilEntry* we = pop_front();
221         CPUEUTask* task = we->task;
222         bool blockThread = task != nullptr ?
223             (task->coRoutine != nullptr ? task->coRoutine->blockType == BlockType::BLOCK_THREAD : false) : false;
224         if (!USE_COROUTINE || we->weType == 2 || blockThread) {
225             std::unique_lock<std::mutex> lk(we->wl);
226             if (blockThread) {
227                 task->coRoutine->blockType = BlockType::BLOCK_COROUTINE;
228                 we->task = nullptr;
229             }
230             wqlock.unlock();
231             we->cv.notify_one();
232         } else {
233             if (!WeNotifyProc(we)) {
234                 continue;
235             }
236             wqlock.unlock();
237             CoWake(task, false);
238         }
239         wqlock.lock();
240     }
241     wqlock.unlock();
242 }
243 } // namespace ffrt
244