1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "tel_event_queue.h"
17
18 #include "tel_event_handler.h"
19 #include "telephony_log_wrapper.h"
20
21 namespace OHOS {
22 namespace Telephony {
namespace {
// Index of the internal event list used for each supported priority level.
enum class TelPriority : uint32_t {
    IMMEDIATE = 0,
    HIGH = 1,
    LOW = 2,
};
} // namespace

// Minimum number of minutes between two consecutive statistics log lines.
static constexpr int PRINT_INTELVAL_MINUTES = 5;
28
TelEventQueue(const std::string & name)29 TelEventQueue::TelEventQueue(const std::string &name) : name_(name)
30 {
31 TELEPHONY_LOGI("%{public}s create", name_.c_str());
32 name_ = std::string(name_.c_str());
33 queue_ = std::make_shared<ffrt::queue>(name_.c_str());
34 }
35
~TelEventQueue()36 TelEventQueue::~TelEventQueue()
37 {
38 TELEPHONY_LOGI("%{public}s destroy", name_.c_str());
39 RemoveAllEvents();
40 ClearCurrentTask(true);
41 if (!queue_) {
42 return;
43 }
44 if (!curTask_) {
45 return;
46 }
47 TELEPHONY_LOGD("%{public}s need to wait", name_.c_str());
48 queue_->wait(curTask_);
49 curTask_ = ffrt::task_handle();
50 std::lock_guard<std::mutex> lock(taskCtx_);
51 queue_ = nullptr;
52 }
53
GetCurHandleTime()54 AppExecFwk::InnerEvent::TimePoint TelEventQueue::GetCurHandleTime()
55 {
56 std::lock_guard<std::mutex> lock(memberCtx_);
57 return curHandleTime_;
58 }
59
SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint handleTime)60 void TelEventQueue::SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint handleTime)
61 {
62 std::lock_guard<std::mutex> lock(memberCtx_);
63 curHandleTime_ = handleTime;
64 }
65
Submit(AppExecFwk::InnerEvent::Pointer & event,AppExecFwk::EventQueue::Priority priority)66 void TelEventQueue::Submit(AppExecFwk::InnerEvent::Pointer &event, AppExecFwk::EventQueue::Priority priority)
67 {
68 InsertEventsInner(event, priority);
69 if (GetHandleTime() < GetCurHandleTime()) {
70 GetNextQueueId();
71 ClearCurrentTask(false);
72 SubmitInner(queueId_.load());
73 }
74 }
75
ToTelPriority(AppExecFwk::EventQueue::Priority priority)76 uint32_t TelEventQueue::ToTelPriority(AppExecFwk::EventQueue::Priority priority)
77 {
78 if (priority == AppExecFwk::EventQueue::Priority::IMMEDIATE) {
79 return static_cast<uint32_t>(TelPriority::IMMEDIATE);
80 }
81 if (priority == AppExecFwk::EventQueue::Priority::HIGH) {
82 return static_cast<uint32_t>(TelPriority::HIGH);
83 }
84 return static_cast<uint32_t>(TelPriority::LOW);
85 }
86
InsertEventsInner(AppExecFwk::InnerEvent::Pointer & event,AppExecFwk::EventQueue::Priority priority)87 void TelEventQueue::InsertEventsInner(AppExecFwk::InnerEvent::Pointer &event, AppExecFwk::EventQueue::Priority priority)
88 {
89 if (event == nullptr) {
90 return;
91 }
92 std::unique_lock<std::mutex> lock(eventCtx_);
93 auto &events = eventLists_[ToTelPriority(priority)].events;
94 auto f = [](const AppExecFwk::InnerEvent::Pointer &first, const AppExecFwk::InnerEvent::Pointer &second) {
95 if (!first || !second) {
96 return false;
97 }
98 return first->GetHandleTime() < second->GetHandleTime();
99 };
100 auto it = std::upper_bound(events.begin(), events.end(), event, f);
101 auto innerEventId = event->GetInnerEventId();
102 events.insert(it, std::move(event));
103 eventStats_.CalculationInsertQueueEvents();
104 eventStats_.PrintEventStats(name_);
105 TELEPHONY_LOGD(
106 "%{public}s InsertEventsInner eventId %{public}d finish", name_.c_str(), static_cast<int32_t>(innerEventId));
107 }
108
ClearCurrentTask(bool isNeedEnd)109 void TelEventQueue::ClearCurrentTask(bool isNeedEnd)
110 {
111 std::lock_guard<std::mutex> lock(taskCtx_);
112 if (!curTask_ || !queue_) {
113 return;
114 }
115 queue_->cancel(curTask_);
116 if (isNeedEnd) {
117 GetNextQueueId();
118 return;
119 }
120 curTask_ = ffrt::task_handle();
121 TELEPHONY_LOGD("%{public}s cancel current task", name_.c_str());
122 }
123
SubmitInner(int32_t queueId)124 void TelEventQueue::SubmitInner(int32_t queueId)
125 {
126 if (!queue_) {
127 TELEPHONY_LOGE("%{public}s queue is nullptr", name_.c_str());
128 return;
129 }
130 auto handleTime = GetHandleTime();
131 if (handleTime == AppExecFwk::InnerEvent::TimePoint::max()) {
132 TELEPHONY_LOGD("%{public}s SubmitInner has no task", name_.c_str());
133 return;
134 }
135 int64_t delayTime = 0;
136 AppExecFwk::InnerEvent::TimePoint now = AppExecFwk::InnerEvent::Clock::now();
137 if (handleTime > now) {
138 delayTime = std::chrono::duration_cast<std::chrono::microseconds>(handleTime - now).count();
139 }
140 SubmitToFFRT(queueId, handleTime, delayTime);
141 }
142
GetNextQueueId()143 int32_t TelEventQueue::GetNextQueueId()
144 {
145 if (queueId_ >= INT32_MAX) {
146 queueId_ = 1;
147 }
148 return queueId_++;
149 }
150
SubmitToFFRT(int32_t queueId,AppExecFwk::InnerEvent::TimePoint handleTime,int64_t delayTime)151 void TelEventQueue::SubmitToFFRT(int32_t queueId, AppExecFwk::InnerEvent::TimePoint handleTime, int64_t delayTime)
152 {
153 std::lock_guard<std::mutex> lock(taskCtx_);
154 if (queueId != queueId_.load()) {
155 TELEPHONY_LOGD("%{public}s task no need to submit", name_.c_str());
156 SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
157 return;
158 }
159 SetCurHandleTime(handleTime);
160 curTask_ = queue_->submit_h(
161 [this, queueId = queueId]() {
162 bool isNeedSubmit = true;
163 auto event = PopEvent(queueId, isNeedSubmit);
164 std::shared_ptr<TelEventHandler> handler = nullptr;
165 if (event) {
166 handler = std::static_pointer_cast<TelEventHandler>(event->GetOwner());
167 }
168 if (event && handler) {
169 TELEPHONY_LOGD("%{public}s ProcessEvent eventId %{public}d", name_.c_str(),
170 static_cast<uint32_t>(event->GetInnerEventId()));
171 handler->ProcessEvent(event);
172 eventStats_.CalculationExecutedEvents();
173 }
174 if (!isNeedSubmit) {
175 TELEPHONY_LOGD("%{public}s task no need to submit", name_.c_str());
176 return;
177 }
178 SubmitInner(queueId);
179 },
180 ffrt::task_attr().delay(delayTime));
181 eventStats_.CalculationSubmitToFFRTEvents();
182 }
183
RemoveEvent(uint32_t innerEventId)184 void TelEventQueue::RemoveEvent(uint32_t innerEventId)
185 {
186 std::lock_guard<std::mutex> lock(eventCtx_);
187 auto filter = [innerEventId](const AppExecFwk::InnerEvent::Pointer &p) {
188 if (p == nullptr) {
189 return false;
190 }
191 return p->GetInnerEventId() == innerEventId;
192 };
193 for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
194 eventLists_[i].events.remove_if(filter);
195 }
196 if (IsEmpty()) {
197 SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
198 }
199 eventStats_.CalculationRemovedEvents(1);
200 TELEPHONY_LOGD("%{public}s remove eventId %{public}d finish", name_.c_str(), innerEventId);
201 }
202
HasInnerEvent(uint32_t innerEventId)203 bool TelEventQueue::HasInnerEvent(uint32_t innerEventId)
204 {
205 std::lock_guard<std::mutex> lock(eventCtx_);
206 auto filter = [innerEventId](
207 const AppExecFwk::InnerEvent::Pointer &p) { return p->GetInnerEventId() == innerEventId; };
208 for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
209 std::list<AppExecFwk::InnerEvent::Pointer>::iterator iter =
210 std::find_if(eventLists_[i].events.begin(), eventLists_[i].events.end(), filter);
211 if (iter != eventLists_[i].events.end()) {
212 return true;
213 }
214 }
215 return false;
216 }
217
RemoveAllEvents()218 void TelEventQueue::RemoveAllEvents()
219 {
220 std::lock_guard<std::mutex> lock(eventCtx_);
221 uint32_t removeCount = 0;
222 for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
223 removeCount += eventLists_[i].events.size();
224 eventLists_[i].events.clear();
225 }
226 SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
227 eventStats_.CalculationRemovedEvents(removeCount);
228 TELEPHONY_LOGD("%{public}s RemoveAllEvents finish", name_.c_str());
229 }
230
IsEmpty()231 bool TelEventQueue::IsEmpty()
232 {
233 for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
234 if (!eventLists_[i].events.empty()) {
235 return false;
236 }
237 }
238 return true;
239 }
240
PopEvent(int32_t queueId,bool & isNeedSubmit)241 AppExecFwk::InnerEvent::Pointer TelEventQueue::PopEvent(int32_t queueId, bool &isNeedSubmit)
242 {
243 std::lock_guard<std::mutex> lock(eventCtx_);
244 if (IsEmpty() || queueId != queueId_.load()) {
245 isNeedSubmit = false;
246 SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
247 return AppExecFwk::InnerEvent::Pointer(nullptr, nullptr);
248 }
249 uint32_t priorityIndex = GetPriorityIndex();
250 AppExecFwk::InnerEvent::Pointer event = std::move(eventLists_[priorityIndex].events.front());
251 eventLists_[priorityIndex].events.pop_front();
252 if (IsEmpty()) {
253 isNeedSubmit = false;
254 SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
255 }
256 return event;
257 }
258
GetHandleTime()259 AppExecFwk::InnerEvent::TimePoint TelEventQueue::GetHandleTime()
260 {
261 std::lock_guard<std::mutex> lock(eventCtx_);
262 if (IsEmpty()) {
263 return AppExecFwk::InnerEvent::TimePoint::max();
264 }
265 return eventLists_[GetPriorityIndex()].events.front()->GetHandleTime();
266 }
267
GetPriorityIndex()268 uint32_t TelEventQueue::GetPriorityIndex()
269 {
270 AppExecFwk::InnerEvent::TimePoint now = AppExecFwk::InnerEvent::Clock::now();
271 AppExecFwk::InnerEvent::TimePoint needWakeUpTime = AppExecFwk::InnerEvent::TimePoint::max();
272 uint32_t priorityIndex = 0;
273 for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
274 if (eventLists_[i].events.empty()) {
275 continue;
276 }
277 auto handleTime = eventLists_[i].events.front()->GetHandleTime();
278 if (handleTime <= now) {
279 priorityIndex = i;
280 break;
281 }
282 if (handleTime < needWakeUpTime) {
283 needWakeUpTime = handleTime;
284 priorityIndex = i;
285 }
286 }
287 return priorityIndex;
288 }
289
CalculationInsertQueueEvents()290 void TelEventQueue::EventStats::CalculationInsertQueueEvents()
291 {
292 totalHandledEvents++;
293 currentQueueEvents++;
294 }
295
CalculationSubmitToFFRTEvents()296 void TelEventQueue::EventStats::CalculationSubmitToFFRTEvents()
297 {
298 if (currentQueueEvents <= 0) {
299 currentQueueEvents = 1;
300 }
301 submitedToFFRTEvents++;
302 currentQueueEvents--;
303 }
304
CalculationExecutedEvents()305 void TelEventQueue::EventStats::CalculationExecutedEvents()
306 {
307 executedEvents++;
308 }
309
CalculationRemovedEvents(int count)310 void TelEventQueue::EventStats::CalculationRemovedEvents(int count)
311 {
312 removedEvents += count;
313 }
314
PrintEventStats(std::string & name)315 void TelEventQueue::EventStats::PrintEventStats(std::string &name)
316 {
317 AppExecFwk::InnerEvent::TimePoint now = AppExecFwk::InnerEvent::Clock::now();
318 auto duration = std::chrono::duration_cast<std::chrono::minutes>(now - lastPrintTime_);
319 if (duration.count() < PRINT_INTELVAL_MINUTES) {
320 return;
321 }
322 lastPrintTime_ = now;
323 TELEPHONY_LOGI(
324 "%{public}s, totalHandledEvents %{public}llu, currentQueueEvents %{public}llu,"
325 " submitedToFFRTEvents %{public}llu, executedEvents %{public}llu, removedEvents %{public}llu",
326 name.c_str(),
327 static_cast<unsigned long long>(totalHandledEvents.load()),
328 static_cast<unsigned long long>(currentQueueEvents.load()),
329 static_cast<unsigned long long>(submitedToFFRTEvents.load()),
330 static_cast<unsigned long long>(executedEvents.load()),
331 static_cast<unsigned long long>(removedEvents.load())
332 );
333 }
334
335 } // namespace Telephony
336 } // namespace OHOS