• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15  * Description: implementation of message queue thread
16  */
17 
18 #include "msg_handle_loop.h"
19 #include <chrono>
20 #include <cinttypes>
21 #include "hcodec_log.h"
22 
23 using namespace std;
24 
MsgHandleLoop()25 MsgHandleLoop::MsgHandleLoop()
26 {
27     m_thread = thread(&MsgHandleLoop::MainLoop, this);
28 }
29 
~MsgHandleLoop()30 MsgHandleLoop::~MsgHandleLoop()
31 {
32     Stop();
33 }
34 
Stop()35 void MsgHandleLoop::Stop()
36 {
37     {
38         lock_guard<mutex> lock(m_mtx);
39         m_threadNeedStop = true;
40         m_threadCond.notify_all();
41     }
42 
43     if (m_thread.joinable()) {
44         m_thread.join();
45     }
46 }
47 
SendAsyncMsg(MsgType type,const ParamSP & msg,uint32_t delayUs)48 void MsgHandleLoop::SendAsyncMsg(MsgType type, const ParamSP &msg, uint32_t delayUs)
49 {
50     lock_guard<mutex> lock(m_mtx);
51     TimeUs nowUs = GetNowUs();
52     TimeUs msgProcessTime = (delayUs > INT64_MAX - nowUs) ? INT64_MAX : (nowUs + delayUs);
53     if (m_msgQueue.find(msgProcessTime) != m_msgQueue.end()) {
54         LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
55         msgProcessTime++;
56     }
57     m_msgQueue[msgProcessTime] = MsgInfo {type, ASYNC_MSG_ID, msg};
58     m_threadCond.notify_all();
59 }
60 
SendSyncMsg(MsgType type,const ParamSP & msg,ParamSP & reply,uint32_t waitMs)61 bool MsgHandleLoop::SendSyncMsg(MsgType type, const ParamSP &msg, ParamSP &reply, uint32_t waitMs)
62 {
63     MsgId id = GenerateMsgId();
64     {
65         lock_guard<mutex> lock(m_mtx);
66         TimeUs time = GetNowUs();
67         if (m_msgQueue.find(time) != m_msgQueue.end()) {
68             LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
69             time++;
70         }
71         m_msgQueue[time] = MsgInfo {type, id, msg};
72         m_threadCond.notify_all();
73     }
74 
75     unique_lock<mutex> lock(m_replyMtx);
76     const auto pred = [this, id]() {
77         return m_replies.find(id) != m_replies.end();
78     };
79     if (waitMs == 0) {
80         m_replyCond.wait(lock, pred);
81     } else {
82         if (!m_replyCond.wait_for(lock, chrono::milliseconds(waitMs), pred)) {
83             LOGE("type=%{public}u wait reply timeout", type);
84             return false;
85         }
86     }
87     reply = m_replies[id];
88     m_replies.erase(id);
89     return true;
90 }
91 
PostReply(MsgId id,const ParamSP & reply)92 void MsgHandleLoop::PostReply(MsgId id, const ParamSP &reply)
93 {
94     if (id == ASYNC_MSG_ID) {
95         return;
96     }
97     lock_guard<mutex> lock(m_replyMtx);
98     m_replies[id] = reply;
99     m_replyCond.notify_all();
100 }
101 
GenerateMsgId()102 MsgId MsgHandleLoop::GenerateMsgId()
103 {
104     lock_guard<mutex> lock(m_mtx);
105     m_lastMsgId++;
106     if (m_lastMsgId == ASYNC_MSG_ID) {
107         m_lastMsgId++;
108     }
109     return m_lastMsgId;
110 }
111 
MainLoop()112 void MsgHandleLoop::MainLoop()
113 {
114     while (true) {
115         MsgInfo info;
116         {
117             unique_lock<mutex> lock(m_mtx);
118             m_threadCond.wait(lock, [this] {
119                 return m_threadNeedStop || !m_msgQueue.empty();
120             });
121             if (m_threadNeedStop) {
122                 LOGI("stopped, remain %{public}zu msg unprocessed", m_msgQueue.size());
123                 break;
124             }
125             TimeUs processUs = m_msgQueue.begin()->first;
126             TimeUs nowUs = GetNowUs();
127             if (processUs > nowUs) {
128                 m_threadCond.wait_for(lock, chrono::microseconds(processUs - nowUs));
129                 continue;
130             }
131             info = m_msgQueue.begin()->second;
132             m_msgQueue.erase(m_msgQueue.begin());
133         }
134         OnMsgReceived(info);
135     }
136 }
137 
GetNowUs()138 MsgHandleLoop::TimeUs MsgHandleLoop::GetNowUs()
139 {
140     auto now = chrono::steady_clock::now();
141     return chrono::duration_cast<chrono::microseconds>(now.time_since_epoch()).count();
142 }
143 
144