1 /*
2 * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "message_queue.h"
17 #include <cinttypes>
18 #include <sys/time.h>
19 #include <thread>
20 #include "wifi_log.h"
21
22 #undef LOG_TAG
23 #define LOG_TAG "OHWIFI_MESSAGE_QUEUE"
24
25 namespace OHOS {
26 namespace Wifi {
MessageQueue()27 MessageQueue::MessageQueue() : pMessageQueue(nullptr), mIsBlocked(false), mNeedQuit(false)
28 {
29 LOGI("MessageQueue::MessageQueue");
30 }
31
~MessageQueue()32 MessageQueue::~MessageQueue()
33 {
34 LOGI("MessageQueue::~MessageQueue");
35 /* Releasing Messages in a Queue */
36 std::unique_lock<std::mutex> lock(mMtxQueue);
37 InternalMessage *current = pMessageQueue;
38 InternalMessage *next = nullptr;
39 while (current != nullptr) {
40 next = current->GetNextMsg();
41 delete current;
42 current = next;
43 }
44 return;
45 }
46
AddMessageToQueue(InternalMessage * message,int64_t handleTime)47 bool MessageQueue::AddMessageToQueue(InternalMessage *message, int64_t handleTime)
48 {
49 if (message == nullptr) {
50 LOGE("message is null.");
51 return false;
52 }
53
54 LOGI("MessageQueue::AddMessageToQueue, msg: %{public}d, timestamp:%" PRIu64 "\n",
55 message->GetMessageName(), handleTime);
56
57 if (mNeedQuit) {
58 MessageManage::GetInstance().ReclaimMsg(message);
59 LOGE("Already quit the message queue.");
60 return false;
61 }
62
63 message->SetHandleTime(handleTime);
64 bool mNeedWakeup = false;
65 /*
66 * If the queue is empty, the current message needs to be executed
67 * immediately, or the execution time is earlier than the queue header, the
68 * message is placed in the queue header and is woken up when the queue is
69 * blocked.
70 */
71 {
72 std::unique_lock<std::mutex> lck(mMtxQueue);
73 InternalMessage *pTop = pMessageQueue;
74 if (pTop == nullptr || handleTime == 0 || handleTime <= pTop->GetHandleTime()) {
75 LOGI("Add the message in the head of queue.");
76 message->SetNextMsg(pTop);
77 pMessageQueue = message;
78 mNeedWakeup = mIsBlocked;
79 } else {
80 LOGI("Insert the message in the middle of the queue.");
81 InternalMessage *pPrev = nullptr;
82 InternalMessage *pCurrent = pTop;
83 /* Inserts messages in the middle of the queue based on the execution time. */
84 while (pCurrent != nullptr) {
85 pPrev = pCurrent;
86 pCurrent = pCurrent->GetNextMsg();
87 if (pCurrent == nullptr || handleTime < pCurrent->GetHandleTime()) {
88 message->SetNextMsg(pCurrent);
89 pPrev->SetNextMsg(message);
90 break;
91 }
92 }
93 }
94 }
95
96 LOGI("Add message needWakeup: %{public}d", static_cast<int>(mNeedWakeup));
97 if (mNeedWakeup) {
98 std::unique_lock<std::mutex> lck(mMtxBlock);
99 mIsBlocked = false;
100 }
101 /* Wake up the process. */
102 mCvQueue.notify_one();
103 return true;
104 }
105
DeleteMessageFromQueue(int messageName)106 bool MessageQueue::DeleteMessageFromQueue(int messageName)
107 {
108 LOGI("MessageQueue::DeleteMessageFromQueue");
109 std::unique_lock<std::mutex> lck(mMtxQueue);
110 InternalMessage *pTop = pMessageQueue;
111 if (pTop == nullptr) {
112 return true;
113 }
114
115 InternalMessage *pCurrent = pTop;
116 while (pCurrent != nullptr) {
117 InternalMessage *pPrev = pCurrent;
118 pCurrent = pCurrent->GetNextMsg();
119 if ((pCurrent != nullptr) && (pCurrent->GetMessageName() == messageName)) {
120 InternalMessage *pNextMsg = pCurrent->GetNextMsg();
121 pPrev->SetNextMsg(pNextMsg);
122 MessageManage::GetInstance().ReclaimMsg(pCurrent);
123 pCurrent = pNextMsg;
124 }
125 }
126
127 if (pTop->GetMessageName() == messageName) {
128 pMessageQueue = pTop->GetNextMsg();
129 MessageManage::GetInstance().ReclaimMsg(pTop);
130 }
131 return true;
132 }
133
GetNextMessage()134 InternalMessage *MessageQueue::GetNextMessage()
135 {
136 LOGI("MessageQueue::GetNextMessage");
137 int nextBlockTime = 0;
138
139 while (!mNeedQuit) {
140 /* Obtains the current time, accurate to milliseconds. */
141 struct timeval curTime = {0, 0};
142 if (gettimeofday(&curTime, nullptr) != 0) {
143 LOGE("gettimeofday failed.");
144 return nullptr;
145 }
146
147 int64_t nowTime = static_cast<int64_t>(curTime.tv_sec) * TIME_USEC_1000 + curTime.tv_usec / TIME_USEC_1000;
148 {
149 std::unique_lock<std::mutex> lck(mMtxQueue); // Data queue lock
150 InternalMessage *curMsg = pMessageQueue;
151 mIsBlocked = true;
152 if (curMsg != nullptr) {
153 LOGI("Message queue is not empty.");
154 if (nowTime < curMsg->GetHandleTime()) {
155 /* The execution time of the first message is not reached.
156 The remaining time is blocked here. */
157 nextBlockTime = curMsg->GetHandleTime() - nowTime;
158 } else {
159 /* Get the message of queue header. */
160 mIsBlocked = false;
161 pMessageQueue = curMsg->GetNextMsg();
162 curMsg->SetNextMsg(nullptr);
163 LOGI("Get queue message: %{public}d", curMsg->GetMessageName());
164 return curMsg;
165 }
166 } else {
167 /* If there's no message, check it every 30 seconds. */
168 nextBlockTime = TIME_INTERVAL;
169 }
170 }
171
172 std::unique_lock<std::mutex> lck(mMtxBlock); // mCvQueue lock
173 if (mIsBlocked && (!mNeedQuit)) {
174 LOGI("mCvQueue wait_for: %{public}d", nextBlockTime);
175 if (mCvQueue.wait_for(lck, std::chrono::milliseconds(nextBlockTime)) == std::cv_status::timeout) {
176 LOGI("mCvQueue wake up, reason: cv_status::timeout: %{public}d", nextBlockTime);
177 } else {
178 LOGI("mCvQueue is wake up.");
179 }
180 }
181 mIsBlocked = false;
182 }
183 LOGE("Already quit the message queue.");
184 return nullptr;
185 }
186
StopQueueLoop()187 void MessageQueue::StopQueueLoop()
188 {
189 LOGI("Start stop queue loop.");
190 mNeedQuit = true;
191 if (mIsBlocked) {
192 std::unique_lock<std::mutex> lck(mMtxBlock);
193 mIsBlocked = false;
194 }
195 mCvQueue.notify_one();
196 LOGI("Queue loop has stopped.");
197 }
198 } // namespace Wifi
199 } // namespace OHOS
200