• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "wait_queue.h"
17 #include "sched/execute_ctx.h"
18 #include "core/task_ctx.h"
19 #include "eu/co_routine.h"
20 #include "dfx/log/ffrt_log_api.h"
21 #include "dfx/trace/ffrt_trace.h"
22 #include "sync/mutex_private.h"
23 
24 namespace ffrt {
TaskWithNode()25 TaskWithNode::TaskWithNode()
26 {
27     auto ctx = ExecuteCtx::Cur();
28     task = ctx->task;
29 }
30 
ThreadWait(WaitUntilEntry * wn,mutexPrivate * lk)31 void WaitQueue::ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk)
32 {
33     wqlock.lock();
34     push_back(wn);
35     wqlock.unlock();
36     {
37         std::unique_lock<std::mutex> nl(wn->wl);
38         lk->unlock();
39         wn->cv.wait(nl);
40     }
41     wqlock.lock();
42     remove(wn);
43     wqlock.unlock();
44     lk->lock();
45 }
ThreadWaitUntil(WaitUntilEntry * wn,mutexPrivate * lk,const TimePoint & tp)46 bool WaitQueue::ThreadWaitUntil(WaitUntilEntry* wn, mutexPrivate* lk, const TimePoint& tp)
47 {
48     bool ret = false;
49     wqlock.lock();
50     push_back(wn);
51     wqlock.unlock();
52     {
53         std::unique_lock<std::mutex> nl(wn->wl);
54         lk->unlock();
55         if (wn->cv.wait_until(nl, tp) == std::cv_status::timeout) {
56             ret = true;
57         }
58     }
59     wqlock.lock();
60     remove(wn);
61     wqlock.unlock();
62     lk->lock();
63     return ret;
64 }
65 
SuspendAndWait(mutexPrivate * lk)66 void WaitQueue::SuspendAndWait(mutexPrivate* lk)
67 {
68     ExecuteCtx* ctx = ExecuteCtx::Cur();
69     TaskCtx* task = ctx->task;
70     if (!USE_COROUTINE || ctx->task == nullptr) {
71         ThreadWait(&ctx->wn, lk);
72         return;
73     }
74     task->wue = new WaitUntilEntry(task);
75     FFRT_BLOCK_TRACER(task->gid, cnd);
76     CoWait([&](TaskCtx* inTask) -> bool {
77         wqlock.lock();
78         push_back(inTask->wue);
79         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
80         wqlock.unlock();
81         return true;
82     });
83     delete task->wue;
84     task->wue = nullptr;
85     lk->lock();
86 }
87 
WeTimeoutProc(WaitUntilEntry * wue)88 bool WeTimeoutProc(WaitUntilEntry* wue)
89 {
90     int expected = we_status::INIT;
91     if (!atomic_compare_exchange_strong_explicit(
92         &wue->status, &expected, we_status::TIMEOUT, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
93         // The critical point wue->status has been written, notify will no longer access wue, it can be deleted
94         delete wue;
95         return false;
96     }
97 
98     return true;
99 }
100 
SuspendAndWaitUntil(mutexPrivate * lk,const TimePoint & tp)101 bool WaitQueue::SuspendAndWaitUntil(mutexPrivate* lk, const TimePoint& tp) noexcept
102 {
103     bool ret = false;
104     ExecuteCtx* ctx = ExecuteCtx::Cur();
105     TaskCtx* task = ctx->task;
106     if (!USE_COROUTINE || task == nullptr) {
107         return ThreadWaitUntil(&ctx->wn, lk, tp);
108     }
109 
110     task->wue = new WaitUntilEntry(task);
111     task->wue->hasWaitTime = true;
112     task->wue->tp = tp;
113     task->wue->cb = ([](WaitEntry* we) {
114         WaitUntilEntry* wue = static_cast<WaitUntilEntry*>(we);
115         ffrt::TaskCtx* task = wue->task;
116         if (!WeTimeoutProc(wue)) {
117             return;
118         }
119         FFRT_LOGD("task(%s) timeout out", task->label.c_str());
120         CoWake(task, true);
121     });
122     FFRT_BLOCK_TRACER(task->gid, cnt);
123     CoWait([&](TaskCtx* inTask) -> bool {
124         WaitUntilEntry* we = inTask->wue;
125         wqlock.lock();
126         push_back(we);
127         lk->unlock(); // Unlock needs to be in wqlock protection, guaranteed to be executed before lk.lock after CoWake
128         wqlock.unlock();
129         if (DelayedWakeup(we->tp, we, we->cb)) {
130             return true;
131         } else {
132             if (!WeTimeoutProc(we)) {
133                 return true;
134             }
135             inTask->wakeupTimeOut = true;
136             return false;
137         }
138     });
139     ret = task->wakeupTimeOut;
140     task->wue = nullptr;
141     task->wakeupTimeOut = false;
142     lk->lock();
143     return ret;
144 }
145 
WeNotifyProc(WaitUntilEntry * we)146 bool WaitQueue::WeNotifyProc(WaitUntilEntry* we)
147 {
148     if (!we->hasWaitTime) {
149         return true;
150     }
151 
152     auto expected = we_status::INIT;
153     if (!atomic_compare_exchange_strong_explicit(
154         &we->status, &expected, we_status::NOTIFIED, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
155         // The critical point we->status has been written, notify will no longer access we, it can be deleted
156         delete we;
157         return false;
158     }
159 
160     return true;
161 }
162 
NotifyOne()163 void WaitQueue::NotifyOne() noexcept
164 {
165     wqlock.lock();
166     if (empty()) {
167         wqlock.unlock();
168         return;
169     }
170     WaitUntilEntry* we = pop_front();
171     TaskCtx* task = we->task;
172     if (!USE_COROUTINE || we->weType == 2) {
173         std::unique_lock<std::mutex> lk(we->wl);
174         wqlock.unlock();
175         we->cv.notify_one();
176     } else {
177         wqlock.unlock();
178         if (!WeNotifyProc(we)) {
179             return;
180         }
181         CoWake(task, false);
182     }
183 }
NotifyAll()184 void WaitQueue::NotifyAll() noexcept
185 {
186     wqlock.lock();
187     while (!empty()) {
188         WaitUntilEntry* we = pop_front();
189         TaskCtx* task = we->task;
190         if (!USE_COROUTINE || we->weType == 2) {
191             std::unique_lock<std::mutex> lk(we->wl);
192             wqlock.unlock();
193             we->cv.notify_one();
194         } else {
195             wqlock.unlock();
196             if (!WeNotifyProc(we)) {
197                 continue;
198             }
199             CoWake(task, false);
200         }
201         wqlock.lock();
202     }
203     wqlock.unlock();
204 }
205 } // namespace ffrt
206