1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "message_queue.h"
16 #include <sys/time.h>
17 #include "wifi_log.h"
18 #include "wifi_errcode.h"
19
20 #undef LOG_TAG
21 #define LOG_TAG "OHWIFI_MESSAGE_QUEUE"
22
23 namespace OHOS {
24 namespace Wifi {
MessageQueue()25 MessageQueue::MessageQueue() : pMessageQueue(nullptr), mIsBlocked(false), mNeedQuit(false)
26 {}
27
~MessageQueue()28 MessageQueue::~MessageQueue()
29 {
30 LOGI("MessageQueue::~MessageQueue");
31 /* Releasing Messages in a Queue */
32 std::unique_lock<std::mutex> lock(mMtxQueue);
33 InternalMessage *current = pMessageQueue;
34 InternalMessage *next = nullptr;
35 while (current != nullptr) {
36 next = current->GetNextMsg();
37 delete current;
38 current = next;
39 }
40
41 return;
42 }
43
AddMessageToQueue(InternalMessage * message,int64_t handleTime)44 bool MessageQueue::AddMessageToQueue(InternalMessage *message, int64_t handleTime)
45 {
46 if (message == nullptr) {
47 LOGE("message is null.\n");
48 return false;
49 }
50
51 if (mNeedQuit) {
52 MessageManage::GetInstance().ReclaimMsg(message);
53 LOGE("Already quit the message queue.\n");
54 return false;
55 }
56
57 message->SetHandleTime(handleTime);
58 bool needWake = false;
59 /*
60 * If the queue is empty, the current message needs to be executed
61 * immediately, or the execution time is earlier than the queue header, the
62 * message is placed in the queue header and is woken up when the queue is
63 * blocked.
64 */
65 {
66 std::unique_lock<std::mutex> lck(mMtxQueue);
67 InternalMessage *pTop = pMessageQueue;
68 if (pTop == nullptr || handleTime == 0 || handleTime < pTop->GetHandleTime()) {
69 message->SetNextMsg(pTop);
70 pMessageQueue = message;
71 needWake = mIsBlocked;
72 /* Inserts messages in the middle of the queue based on the execution time. */
73 } else {
74 InternalMessage *pPrev = nullptr;
75 InternalMessage *pCurrent = pTop;
76 while (pCurrent != nullptr) {
77 pPrev = pCurrent;
78 pCurrent = pCurrent->GetNextMsg();
79 if (pCurrent == nullptr || handleTime < pCurrent->GetHandleTime()) {
80 message->SetNextMsg(pCurrent);
81 pPrev->SetNextMsg(message);
82 break;
83 }
84 }
85 }
86 }
87
88 /* Wake up the process. */
89 if (needWake) {
90 std::unique_lock<std::mutex> lck(mMtxBlock);
91 mCvQueue.notify_all();
92 mIsBlocked = false;
93 }
94
95 return true;
96 }
97
DeleteMessageFromQueue(int messageName)98 bool MessageQueue::DeleteMessageFromQueue(int messageName)
99 {
100 std::unique_lock<std::mutex> lck(mMtxQueue);
101
102 InternalMessage *pTop = pMessageQueue;
103 if (pTop == nullptr) {
104 return true;
105 }
106
107 InternalMessage *pCurrent = pTop;
108 while (pCurrent != nullptr) {
109 InternalMessage *pPrev = pCurrent;
110 pCurrent = pCurrent->GetNextMsg();
111 if ((pCurrent != nullptr) && (pCurrent->GetMessageName() == messageName)) {
112 InternalMessage *pNextMsg = pCurrent->GetNextMsg();
113 pPrev->SetNextMsg(pNextMsg);
114 MessageManage::GetInstance().ReclaimMsg(pCurrent);
115 pCurrent = pNextMsg;
116 }
117 }
118
119 if (pTop->GetMessageName() == messageName) {
120 pMessageQueue = pTop->GetNextMsg();
121 MessageManage::GetInstance().ReclaimMsg(pTop);
122 }
123
124 return true;
125 }
126
GetNextMessage()127 InternalMessage *MessageQueue::GetNextMessage()
128 {
129 LOGI("MessageQueue::GetNextMessage");
130 int nextBlockTime = 0;
131
132 while (!mNeedQuit) {
133 /* Obtains the current time, accurate to milliseconds. */
134 struct timeval curTime = {0, 0};
135 if (gettimeofday(&curTime, nullptr) != 0) {
136 LOGE("gettimeofday failed.\n");
137 return nullptr;
138 }
139 int64_t nowTime = static_cast<int64_t>(curTime.tv_sec) * TIME_USEC_1000 + curTime.tv_usec / TIME_USEC_1000;
140
141 {
142 std::unique_lock<std::mutex> lck(mMtxQueue);
143 InternalMessage *curMsg = pMessageQueue;
144 mIsBlocked = true;
145 if (curMsg != nullptr) {
146 if (nowTime < curMsg->GetHandleTime()) {
147 /* The execution time of the first message is not reached.
148 The remaining time is blocked here. */
149 nextBlockTime = curMsg->GetHandleTime() - nowTime;
150 } else {
151 /* Return the first message. */
152 mIsBlocked = false;
153 pMessageQueue = curMsg->GetNextMsg();
154 curMsg->SetNextMsg(nullptr);
155 return curMsg;
156 }
157 } else {
158 /* If there's no message, check it every 30 seconds. */
159 nextBlockTime = TIME_INTERVAL;
160 }
161 }
162
163 std::unique_lock<std::mutex> lck(mMtxBlock);
164 if (mIsBlocked && (!mNeedQuit)) {
165 if (mCvQueue.wait_for(lck, std::chrono::milliseconds(nextBlockTime)) == std::cv_status::timeout) {
166 LOGD("mCvQueue timeout.\n");
167 } else {
168 LOGD("Wake up.\n");
169 }
170 }
171 mIsBlocked = false;
172 }
173
174 LOGE("Already quit the message queue.\n");
175 return nullptr;
176 }
177
StopQueueLoop()178 void MessageQueue::StopQueueLoop()
179 {
180 mNeedQuit = true;
181 if (mIsBlocked) {
182 std::unique_lock<std::mutex> lck(mMtxBlock);
183 mCvQueue.notify_all();
184 mIsBlocked = false;
185 }
186
187 return;
188 }
189 } // namespace Wifi
190 } // namespace OHOS