• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "message_queue.h"
16 #include <sys/time.h>
17 #include "wifi_log.h"
18 #include "wifi_errcode.h"
19 
20 #undef LOG_TAG
21 #define LOG_TAG "OHWIFI_MESSAGE_QUEUE"
22 
23 namespace OHOS {
24 namespace Wifi {
MessageQueue()25 MessageQueue::MessageQueue() : pMessageQueue(nullptr), mIsBlocked(false), mNeedQuit(false)
26 {}
27 
~MessageQueue()28 MessageQueue::~MessageQueue()
29 {
30     LOGI("MessageQueue::~MessageQueue");
31     /* Releasing Messages in a Queue */
32     std::unique_lock<std::mutex> lock(mMtxQueue);
33     InternalMessage *current = pMessageQueue;
34     InternalMessage *next = nullptr;
35     while (current != nullptr) {
36         next = current->GetNextMsg();
37         delete current;
38         current = next;
39     }
40 
41     return;
42 }
43 
AddMessageToQueue(InternalMessage * message,int64_t handleTime)44 bool MessageQueue::AddMessageToQueue(InternalMessage *message, int64_t handleTime)
45 {
46     if (message == nullptr) {
47         LOGE("message is null.\n");
48         return false;
49     }
50 
51     if (mNeedQuit) {
52         MessageManage::GetInstance().ReclaimMsg(message);
53         LOGE("Already quit the message queue.\n");
54         return false;
55     }
56 
57     message->SetHandleTime(handleTime);
58     bool needWake = false;
59     /*
60      * If the queue is empty, the current message needs to be executed
61      * immediately, or the execution time is earlier than the queue header, the
62      * message is placed in the queue header and is woken up when the queue is
63      * blocked.
64      */
65     {
66         std::unique_lock<std::mutex> lck(mMtxQueue);
67         InternalMessage *pTop = pMessageQueue;
68         if (pTop == nullptr || handleTime == 0 || handleTime < pTop->GetHandleTime()) {
69             message->SetNextMsg(pTop);
70             pMessageQueue = message;
71             needWake = mIsBlocked;
72             /* Inserts messages in the middle of the queue based on the execution time. */
73         } else {
74             InternalMessage *pPrev = nullptr;
75             InternalMessage *pCurrent = pTop;
76             while (pCurrent != nullptr) {
77                 pPrev = pCurrent;
78                 pCurrent = pCurrent->GetNextMsg();
79                 if (pCurrent == nullptr || handleTime < pCurrent->GetHandleTime()) {
80                     message->SetNextMsg(pCurrent);
81                     pPrev->SetNextMsg(message);
82                     break;
83                 }
84             }
85         }
86     }
87 
88     /* Wake up the process. */
89     if (needWake) {
90         std::unique_lock<std::mutex> lck(mMtxBlock);
91         mCvQueue.notify_all();
92         mIsBlocked = false;
93     }
94 
95     return true;
96 }
97 
DeleteMessageFromQueue(int messageName)98 bool MessageQueue::DeleteMessageFromQueue(int messageName)
99 {
100     std::unique_lock<std::mutex> lck(mMtxQueue);
101 
102     InternalMessage *pTop = pMessageQueue;
103     if (pTop == nullptr) {
104         return true;
105     }
106 
107     InternalMessage *pCurrent = pTop;
108     while (pCurrent != nullptr) {
109         InternalMessage *pPrev = pCurrent;
110         pCurrent = pCurrent->GetNextMsg();
111         if ((pCurrent != nullptr) && (pCurrent->GetMessageName() == messageName)) {
112             InternalMessage *pNextMsg = pCurrent->GetNextMsg();
113             pPrev->SetNextMsg(pNextMsg);
114             MessageManage::GetInstance().ReclaimMsg(pCurrent);
115             pCurrent = pNextMsg;
116         }
117     }
118 
119     if (pTop->GetMessageName() == messageName) {
120         pMessageQueue = pTop->GetNextMsg();
121         MessageManage::GetInstance().ReclaimMsg(pTop);
122     }
123 
124     return true;
125 }
126 
GetNextMessage()127 InternalMessage *MessageQueue::GetNextMessage()
128 {
129     LOGI("MessageQueue::GetNextMessage");
130     int nextBlockTime = 0;
131 
132     while (!mNeedQuit) {
133         /* Obtains the current time, accurate to milliseconds. */
134         struct timeval curTime = {0, 0};
135         if (gettimeofday(&curTime, nullptr) != 0) {
136             LOGE("gettimeofday failed.\n");
137             return nullptr;
138         }
139         int64_t nowTime = static_cast<int64_t>(curTime.tv_sec) * TIME_USEC_1000 + curTime.tv_usec / TIME_USEC_1000;
140 
141         {
142             std::unique_lock<std::mutex> lck(mMtxQueue);
143             InternalMessage *curMsg = pMessageQueue;
144             mIsBlocked = true;
145             if (curMsg != nullptr) {
146                 if (nowTime < curMsg->GetHandleTime()) {
147                     /* The execution time of the first message is not reached.
148                        The remaining time is blocked here. */
149                     nextBlockTime = curMsg->GetHandleTime() - nowTime;
150                 } else {
151                     /* Return the first message. */
152                     mIsBlocked = false;
153                     pMessageQueue = curMsg->GetNextMsg();
154                     curMsg->SetNextMsg(nullptr);
155                     return curMsg;
156                 }
157             } else {
158                 /* If there's no message, check it every 30 seconds. */
159                 nextBlockTime = TIME_INTERVAL;
160             }
161         }
162 
163         std::unique_lock<std::mutex> lck(mMtxBlock);
164         if (mIsBlocked && (!mNeedQuit)) {
165             if (mCvQueue.wait_for(lck, std::chrono::milliseconds(nextBlockTime)) == std::cv_status::timeout) {
166                 LOGD("mCvQueue timeout.\n");
167             } else {
168                 LOGD("Wake up.\n");
169             }
170         }
171         mIsBlocked = false;
172     }
173 
174     LOGE("Already quit the message queue.\n");
175     return nullptr;
176 }
177 
StopQueueLoop()178 void MessageQueue::StopQueueLoop()
179 {
180     mNeedQuit = true;
181     if (mIsBlocked) {
182         std::unique_lock<std::mutex> lck(mMtxBlock);
183         mCvQueue.notify_all();
184         mIsBlocked = false;
185     }
186 
187     return;
188 }
189 }  // namespace Wifi
190 }  // namespace OHOS