• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "tel_event_queue.h"
17 
18 #include "tel_event_handler.h"
19 #include "telephony_log_wrapper.h"
20 
21 namespace OHOS {
22 namespace Telephony {
namespace {
// Internal priority levels. ToTelPriority() converts these to uint32_t values
// that are used as indices into eventLists_.
enum class TelPriority : uint32_t {
    IMMEDIATE = 0,
    HIGH = 1,
    LOW = 2,
};
} // namespace
26 
// Minimum number of minutes between two consecutive event-statistics log lines.
static constexpr int PRINT_INTELVAL_MINUTES{5};
28 
TelEventQueue(const std::string & name)29 TelEventQueue::TelEventQueue(const std::string &name) : name_(name)
30 {
31     TELEPHONY_LOGI("%{public}s create", name_.c_str());
32     name_ = std::string(name_.c_str());
33     queue_ = std::make_shared<ffrt::queue>(name_.c_str());
34 }
35 
~TelEventQueue()36 TelEventQueue::~TelEventQueue()
37 {
38     TELEPHONY_LOGI("%{public}s destroy", name_.c_str());
39     RemoveAllEvents();
40     ClearCurrentTask(true);
41     if (!queue_) {
42         return;
43     }
44     if (!curTask_) {
45         return;
46     }
47     TELEPHONY_LOGD("%{public}s need to wait", name_.c_str());
48     queue_->wait(curTask_);
49     curTask_ = ffrt::task_handle();
50     std::lock_guard<std::mutex> lock(taskCtx_);
51     queue_ = nullptr;
52 }
53 
GetCurHandleTime()54 AppExecFwk::InnerEvent::TimePoint TelEventQueue::GetCurHandleTime()
55 {
56     std::lock_guard<std::mutex> lock(memberCtx_);
57     return curHandleTime_;
58 }
59 
SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint handleTime)60 void TelEventQueue::SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint handleTime)
61 {
62     std::lock_guard<std::mutex> lock(memberCtx_);
63     curHandleTime_ = handleTime;
64 }
65 
Submit(AppExecFwk::InnerEvent::Pointer & event,AppExecFwk::EventQueue::Priority priority)66 void TelEventQueue::Submit(AppExecFwk::InnerEvent::Pointer &event, AppExecFwk::EventQueue::Priority priority)
67 {
68     InsertEventsInner(event, priority);
69     if (GetHandleTime() < GetCurHandleTime()) {
70         GetNextQueueId();
71         ClearCurrentTask(false);
72         SubmitInner(queueId_.load());
73     }
74 }
75 
ToTelPriority(AppExecFwk::EventQueue::Priority priority)76 uint32_t TelEventQueue::ToTelPriority(AppExecFwk::EventQueue::Priority priority)
77 {
78     if (priority == AppExecFwk::EventQueue::Priority::IMMEDIATE) {
79         return static_cast<uint32_t>(TelPriority::IMMEDIATE);
80     }
81     if (priority == AppExecFwk::EventQueue::Priority::HIGH) {
82         return static_cast<uint32_t>(TelPriority::HIGH);
83     }
84     return static_cast<uint32_t>(TelPriority::LOW);
85 }
86 
InsertEventsInner(AppExecFwk::InnerEvent::Pointer & event,AppExecFwk::EventQueue::Priority priority)87 void TelEventQueue::InsertEventsInner(AppExecFwk::InnerEvent::Pointer &event, AppExecFwk::EventQueue::Priority priority)
88 {
89     if (event == nullptr) {
90         return;
91     }
92     std::unique_lock<std::mutex> lock(eventCtx_);
93     auto &events = eventLists_[ToTelPriority(priority)].events;
94     auto f = [](const AppExecFwk::InnerEvent::Pointer &first, const AppExecFwk::InnerEvent::Pointer &second) {
95         if (!first || !second) {
96             return false;
97         }
98         return first->GetHandleTime() < second->GetHandleTime();
99     };
100     auto it = std::upper_bound(events.begin(), events.end(), event, f);
101     auto innerEventId = event->GetInnerEventId();
102     events.insert(it, std::move(event));
103     eventStats_.CalculationInsertQueueEvents();
104     eventStats_.PrintEventStats(name_);
105     TELEPHONY_LOGD(
106         "%{public}s InsertEventsInner eventId %{public}d finish", name_.c_str(), static_cast<int32_t>(innerEventId));
107 }
108 
ClearCurrentTask(bool isNeedEnd)109 void TelEventQueue::ClearCurrentTask(bool isNeedEnd)
110 {
111     std::lock_guard<std::mutex> lock(taskCtx_);
112     if (!curTask_ || !queue_) {
113         return;
114     }
115     queue_->cancel(curTask_);
116     if (isNeedEnd) {
117         GetNextQueueId();
118         return;
119     }
120     curTask_ = ffrt::task_handle();
121     TELEPHONY_LOGD("%{public}s cancel current task", name_.c_str());
122 }
123 
SubmitInner(int32_t queueId)124 void TelEventQueue::SubmitInner(int32_t queueId)
125 {
126     if (!queue_) {
127         TELEPHONY_LOGE("%{public}s queue is nullptr", name_.c_str());
128         return;
129     }
130     auto handleTime = GetHandleTime();
131     if (handleTime == AppExecFwk::InnerEvent::TimePoint::max()) {
132         TELEPHONY_LOGD("%{public}s SubmitInner has no task", name_.c_str());
133         return;
134     }
135     int64_t delayTime = 0;
136     AppExecFwk::InnerEvent::TimePoint now = AppExecFwk::InnerEvent::Clock::now();
137     if (handleTime > now) {
138         delayTime = std::chrono::duration_cast<std::chrono::microseconds>(handleTime - now).count();
139     }
140     SubmitToFFRT(queueId, handleTime, delayTime);
141 }
142 
GetNextQueueId()143 int32_t TelEventQueue::GetNextQueueId()
144 {
145     if (queueId_ >= INT32_MAX) {
146         queueId_ = 1;
147     }
148     return queueId_++;
149 }
150 
SubmitToFFRT(int32_t queueId,AppExecFwk::InnerEvent::TimePoint handleTime,int64_t delayTime)151 void TelEventQueue::SubmitToFFRT(int32_t queueId, AppExecFwk::InnerEvent::TimePoint handleTime, int64_t delayTime)
152 {
153     std::lock_guard<std::mutex> lock(taskCtx_);
154     if (queueId != queueId_.load()) {
155         TELEPHONY_LOGD("%{public}s task no need to submit", name_.c_str());
156         SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
157         return;
158     }
159     SetCurHandleTime(handleTime);
160     curTask_ = queue_->submit_h(
161         [this, queueId = queueId]() {
162             bool isNeedSubmit = true;
163             auto event = PopEvent(queueId, isNeedSubmit);
164             std::shared_ptr<TelEventHandler> handler = nullptr;
165             if (event) {
166                 handler = std::static_pointer_cast<TelEventHandler>(event->GetOwner());
167             }
168             if (event && handler) {
169                 TELEPHONY_LOGD("%{public}s ProcessEvent eventId %{public}d", name_.c_str(),
170                     static_cast<uint32_t>(event->GetInnerEventId()));
171                 handler->ProcessEvent(event);
172                 eventStats_.CalculationExecutedEvents();
173             }
174             if (!isNeedSubmit) {
175                 TELEPHONY_LOGD("%{public}s task no need to submit", name_.c_str());
176                 return;
177             }
178             SubmitInner(queueId);
179         },
180         ffrt::task_attr().delay(delayTime));
181         eventStats_.CalculationSubmitToFFRTEvents();
182 }
183 
RemoveEvent(uint32_t innerEventId)184 void TelEventQueue::RemoveEvent(uint32_t innerEventId)
185 {
186     std::lock_guard<std::mutex> lock(eventCtx_);
187     auto filter = [innerEventId](const AppExecFwk::InnerEvent::Pointer &p) {
188         if (p == nullptr) {
189             return false;
190         }
191         return p->GetInnerEventId() == innerEventId;
192     };
193     for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
194         eventLists_[i].events.remove_if(filter);
195     }
196     if (IsEmpty()) {
197         SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
198     }
199     eventStats_.CalculationRemovedEvents(1);
200     TELEPHONY_LOGD("%{public}s remove eventId %{public}d finish", name_.c_str(), innerEventId);
201 }
202 
HasInnerEvent(uint32_t innerEventId)203 bool TelEventQueue::HasInnerEvent(uint32_t innerEventId)
204 {
205     std::lock_guard<std::mutex> lock(eventCtx_);
206     auto filter = [innerEventId](
207                       const AppExecFwk::InnerEvent::Pointer &p) { return p->GetInnerEventId() == innerEventId; };
208     for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
209         std::list<AppExecFwk::InnerEvent::Pointer>::iterator iter =
210             std::find_if(eventLists_[i].events.begin(), eventLists_[i].events.end(), filter);
211         if (iter != eventLists_[i].events.end()) {
212             return true;
213         }
214     }
215     return false;
216 }
217 
RemoveAllEvents()218 void TelEventQueue::RemoveAllEvents()
219 {
220     std::lock_guard<std::mutex> lock(eventCtx_);
221     uint32_t removeCount = 0;
222     for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
223         removeCount += eventLists_[i].events.size();
224         eventLists_[i].events.clear();
225     }
226     SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
227     eventStats_.CalculationRemovedEvents(removeCount);
228     TELEPHONY_LOGD("%{public}s RemoveAllEvents finish", name_.c_str());
229 }
230 
IsEmpty()231 bool TelEventQueue::IsEmpty()
232 {
233     for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
234         if (!eventLists_[i].events.empty()) {
235             return false;
236         }
237     }
238     return true;
239 }
240 
PopEvent(int32_t queueId,bool & isNeedSubmit)241 AppExecFwk::InnerEvent::Pointer TelEventQueue::PopEvent(int32_t queueId, bool &isNeedSubmit)
242 {
243     std::lock_guard<std::mutex> lock(eventCtx_);
244     if (IsEmpty() || queueId != queueId_.load()) {
245         isNeedSubmit = false;
246         SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
247         return AppExecFwk::InnerEvent::Pointer(nullptr, nullptr);
248     }
249     uint32_t priorityIndex = GetPriorityIndex();
250     AppExecFwk::InnerEvent::Pointer event = std::move(eventLists_[priorityIndex].events.front());
251     eventLists_[priorityIndex].events.pop_front();
252     if (IsEmpty()) {
253         isNeedSubmit = false;
254         SetCurHandleTime(AppExecFwk::InnerEvent::TimePoint::max());
255     }
256     return event;
257 }
258 
GetHandleTime()259 AppExecFwk::InnerEvent::TimePoint TelEventQueue::GetHandleTime()
260 {
261     std::lock_guard<std::mutex> lock(eventCtx_);
262     if (IsEmpty()) {
263         return AppExecFwk::InnerEvent::TimePoint::max();
264     }
265     return eventLists_[GetPriorityIndex()].events.front()->GetHandleTime();
266 }
267 
GetPriorityIndex()268 uint32_t TelEventQueue::GetPriorityIndex()
269 {
270     AppExecFwk::InnerEvent::TimePoint now = AppExecFwk::InnerEvent::Clock::now();
271     AppExecFwk::InnerEvent::TimePoint needWakeUpTime = AppExecFwk::InnerEvent::TimePoint::max();
272     uint32_t priorityIndex = 0;
273     for (uint32_t i = 0; i < EVENT_QUEUE_NUM; ++i) {
274         if (eventLists_[i].events.empty()) {
275             continue;
276         }
277         auto handleTime = eventLists_[i].events.front()->GetHandleTime();
278         if (handleTime <= now) {
279             priorityIndex = i;
280             break;
281         }
282         if (handleTime < needWakeUpTime) {
283             needWakeUpTime = handleTime;
284             priorityIndex = i;
285         }
286     }
287     return priorityIndex;
288 }
289 
CalculationInsertQueueEvents()290 void TelEventQueue::EventStats::CalculationInsertQueueEvents()
291 {
292     totalHandledEvents++;
293     currentQueueEvents++;
294 }
295 
CalculationSubmitToFFRTEvents()296 void TelEventQueue::EventStats::CalculationSubmitToFFRTEvents()
297 {
298     if (currentQueueEvents <= 0) {
299         currentQueueEvents = 1;
300     }
301     submitedToFFRTEvents++;
302     currentQueueEvents--;
303 }
304 
CalculationExecutedEvents()305 void TelEventQueue::EventStats::CalculationExecutedEvents()
306 {
307     executedEvents++;
308 }
309 
CalculationRemovedEvents(int count)310 void TelEventQueue::EventStats::CalculationRemovedEvents(int count)
311 {
312     removedEvents += count;
313 }
314 
PrintEventStats(std::string & name)315 void TelEventQueue::EventStats::PrintEventStats(std::string &name)
316 {
317     AppExecFwk::InnerEvent::TimePoint now = AppExecFwk::InnerEvent::Clock::now();
318     auto duration = std::chrono::duration_cast<std::chrono::minutes>(now - lastPrintTime_);
319     if (duration.count() < PRINT_INTELVAL_MINUTES) {
320         return;
321     }
322     lastPrintTime_ = now;
323     TELEPHONY_LOGI(
324         "%{public}s, totalHandledEvents %{public}llu, currentQueueEvents %{public}llu,"
325         " submitedToFFRTEvents %{public}llu, executedEvents %{public}llu, removedEvents %{public}llu",
326         name.c_str(),
327         static_cast<unsigned long long>(totalHandledEvents.load()),
328         static_cast<unsigned long long>(currentQueueEvents.load()),
329         static_cast<unsigned long long>(submitedToFFRTEvents.load()),
330         static_cast<unsigned long long>(executedEvents.load()),
331         static_cast<unsigned long long>(removedEvents.load())
332     );
333 }
334 
335 } // namespace Telephony
336 } // namespace OHOS