• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "message_queue.h"
17 #include <cinttypes>
18 #include <sys/time.h>
19 #include <thread>
20 #include "wifi_log.h"
21 
22 #undef LOG_TAG
23 #define LOG_TAG "OHWIFI_MESSAGE_QUEUE"
24 
25 namespace OHOS {
26 namespace Wifi {
MessageQueue()27 MessageQueue::MessageQueue() : pMessageQueue(nullptr), mIsBlocked(false), mNeedQuit(false)
28 {
29     LOGI("MessageQueue::MessageQueue");
30 }
31 
~MessageQueue()32 MessageQueue::~MessageQueue()
33 {
34     LOGI("MessageQueue::~MessageQueue");
35     /* Releasing Messages in a Queue */
36     std::unique_lock<std::mutex> lock(mMtxQueue);
37     InternalMessage *current = pMessageQueue;
38     InternalMessage *next = nullptr;
39     while (current != nullptr) {
40         next = current->GetNextMsg();
41         delete current;
42         current = next;
43     }
44     return;
45 }
46 
AddMessageToQueue(InternalMessage * message,int64_t handleTime)47 bool MessageQueue::AddMessageToQueue(InternalMessage *message, int64_t handleTime)
48 {
49     if (message == nullptr) {
50         LOGE("message is null.");
51         return false;
52     }
53 
54     LOGI("MessageQueue::AddMessageToQueue, msg: %{public}d, timestamp:%" PRIu64 "\n",
55         message->GetMessageName(), handleTime);
56 
57     if (mNeedQuit) {
58         MessageManage::GetInstance().ReclaimMsg(message);
59         LOGE("Already quit the message queue.");
60         return false;
61     }
62 
63     message->SetHandleTime(handleTime);
64     bool mNeedWakeup = false;
65     /*
66      * If the queue is empty, the current message needs to be executed
67      * immediately, or the execution time is earlier than the queue header, the
68      * message is placed in the queue header and is woken up when the queue is
69      * blocked.
70      */
71     {
72         std::unique_lock<std::mutex> lck(mMtxQueue);
73         InternalMessage *pTop = pMessageQueue;
74         if (pTop == nullptr || handleTime == 0 || handleTime <= pTop->GetHandleTime()) {
75             LOGI("Add the message in the head of queue.");
76             message->SetNextMsg(pTop);
77             pMessageQueue = message;
78             mNeedWakeup = mIsBlocked;
79         } else {
80             LOGI("Insert the message in the middle of the queue.");
81             InternalMessage *pPrev = nullptr;
82             InternalMessage *pCurrent = pTop;
83             /* Inserts messages in the middle of the queue based on the execution time. */
84             while (pCurrent != nullptr) {
85                 pPrev = pCurrent;
86                 pCurrent = pCurrent->GetNextMsg();
87                 if (pCurrent == nullptr || handleTime < pCurrent->GetHandleTime()) {
88                     message->SetNextMsg(pCurrent);
89                     pPrev->SetNextMsg(message);
90                     break;
91                 }
92             }
93         }
94     }
95 
96     LOGI("Add message needWakeup: %{public}d", static_cast<int>(mNeedWakeup));
97     if (mNeedWakeup) {
98         std::unique_lock<std::mutex> lck(mMtxBlock);
99         mIsBlocked = false;
100     }
101     /* Wake up the process. */
102     mCvQueue.notify_one();
103     return true;
104 }
105 
DeleteMessageFromQueue(int messageName)106 bool MessageQueue::DeleteMessageFromQueue(int messageName)
107 {
108     LOGI("MessageQueue::DeleteMessageFromQueue");
109     std::unique_lock<std::mutex> lck(mMtxQueue);
110     InternalMessage *pTop = pMessageQueue;
111     if (pTop == nullptr) {
112         return true;
113     }
114 
115     InternalMessage *pCurrent = pTop;
116     while (pCurrent != nullptr) {
117         InternalMessage *pPrev = pCurrent;
118         pCurrent = pCurrent->GetNextMsg();
119         if ((pCurrent != nullptr) && (pCurrent->GetMessageName() == messageName)) {
120             InternalMessage *pNextMsg = pCurrent->GetNextMsg();
121             pPrev->SetNextMsg(pNextMsg);
122             MessageManage::GetInstance().ReclaimMsg(pCurrent);
123             pCurrent = pNextMsg;
124         }
125     }
126 
127     if (pTop->GetMessageName() == messageName) {
128         pMessageQueue = pTop->GetNextMsg();
129         MessageManage::GetInstance().ReclaimMsg(pTop);
130     }
131     return true;
132 }
133 
GetNextMessage()134 InternalMessage *MessageQueue::GetNextMessage()
135 {
136     LOGI("MessageQueue::GetNextMessage");
137     int nextBlockTime = 0;
138 
139     while (!mNeedQuit) {
140         /* Obtains the current time, accurate to milliseconds. */
141         struct timeval curTime = {0, 0};
142         if (gettimeofday(&curTime, nullptr) != 0) {
143             LOGE("gettimeofday failed.");
144             return nullptr;
145         }
146 
147         int64_t nowTime = static_cast<int64_t>(curTime.tv_sec) * TIME_USEC_1000 + curTime.tv_usec / TIME_USEC_1000;
148         {
149             std::unique_lock<std::mutex> lck(mMtxQueue); // Data queue lock
150             InternalMessage *curMsg = pMessageQueue;
151             mIsBlocked = true;
152             if (curMsg != nullptr) {
153                 LOGI("Message queue is not empty.");
154                 if (nowTime < curMsg->GetHandleTime()) {
155                     /* The execution time of the first message is not reached.
156                         The remaining time is blocked here. */
157                     nextBlockTime = curMsg->GetHandleTime() - nowTime;
158                 } else {
159                     /* Get the message of queue header. */
160                     mIsBlocked = false;
161                     pMessageQueue = curMsg->GetNextMsg();
162                     curMsg->SetNextMsg(nullptr);
163                     LOGI("Get queue message: %{public}d", curMsg->GetMessageName());
164                     return curMsg;
165                 }
166             } else {
167                 /* If there's no message, check it every 30 seconds. */
168                 nextBlockTime = TIME_INTERVAL;
169             }
170         }
171 
172         std::unique_lock<std::mutex> lck(mMtxBlock); // mCvQueue lock
173         if (mIsBlocked && (!mNeedQuit)) {
174             LOGI("mCvQueue wait_for: %{public}d", nextBlockTime);
175             if (mCvQueue.wait_for(lck, std::chrono::milliseconds(nextBlockTime)) == std::cv_status::timeout) {
176                 LOGI("mCvQueue wake up, reason: cv_status::timeout: %{public}d", nextBlockTime);
177             } else {
178                 LOGI("mCvQueue is wake up.");
179             }
180         }
181         mIsBlocked = false;
182     }
183     LOGE("Already quit the message queue.");
184     return nullptr;
185 }
186 
StopQueueLoop()187 void MessageQueue::StopQueueLoop()
188 {
189     LOGI("Start stop queue loop.");
190     mNeedQuit = true;
191     if (mIsBlocked) {
192         std::unique_lock<std::mutex> lck(mMtxBlock);
193         mIsBlocked = false;
194     }
195     mCvQueue.notify_one();
196     LOGI("Queue loop has stopped.");
197 }
198 }  // namespace Wifi
199 }  // namespace OHOS
200