• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15  * Description: implementation of message queue thread
16  */
17 
18 #include "msg_handle_loop.h"
19 #include <chrono>
20 #include <cinttypes>
21 #include "qos.h"
22 #include "hcodec_log.h"
23 #include "hcodec_utils.h"
24 
25 using namespace std;
26 
MsgHandleLoop()27 MsgHandleLoop::MsgHandleLoop()
28 {
29     m_token = make_shared<MsgToken>();
30     m_thread = thread(&MsgHandleLoop::MainLoop, this);
31 }
32 
~MsgHandleLoop()33 MsgHandleLoop::~MsgHandleLoop()
34 {
35     Stop();
36 }
37 
Stop()38 void MsgHandleLoop::Stop()
39 {
40     {
41         lock_guard<mutex> lock(m_token->m_mtx);
42         m_threadNeedStop = true;
43         m_token->m_threadCond.notify_one();
44     }
45 
46     if (m_thread.joinable()) {
47         m_thread.join();
48     }
49 }
50 
SendAsyncMsg(MsgType type,const ParamSP & msg,uint32_t delayUs)51 void MsgHandleLoop::SendAsyncMsg(MsgType type, const ParamSP &msg, uint32_t delayUs)
52 {
53     m_token->SendAsyncMsg(type, msg, delayUs);
54 }
55 
SendAsyncMsg(MsgType type,const ParamSP & msg,uint32_t delayUs)56 void MsgHandleLoop::MsgToken::SendAsyncMsg(MsgType type, const ParamSP &msg, uint32_t delayUs)
57 {
58     lock_guard<mutex> lock(m_mtx);
59     TimeUs nowUs = GetNowUs();
60     TimeUs msgProcessTime = (delayUs > INT64_MAX - nowUs) ? INT64_MAX : (nowUs + static_cast<int64_t>(delayUs));
61     if (m_msgQueue.find(msgProcessTime) != m_msgQueue.end()) {
62         LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
63         msgProcessTime++;
64     }
65     m_msgQueue[msgProcessTime] = MsgInfo {type, ASYNC_MSG_ID, msg};
66     m_threadCond.notify_one();
67 }
68 
SendSyncMsg(MsgType type,const ParamSP & msg,ParamSP & reply,uint32_t waitMs)69 bool MsgHandleLoop::SendSyncMsg(MsgType type, const ParamSP &msg, ParamSP &reply, uint32_t waitMs)
70 {
71     MsgId id = GenerateMsgId();
72     {
73         lock_guard<mutex> lock(m_token->m_mtx);
74         TimeUs time = GetNowUs();
75         if (m_token->m_msgQueue.find(time) != m_token->m_msgQueue.end()) {
76             LOGW("DUPLICATIVE MSG TIMESTAMP!!!");
77             time++;
78         }
79         m_token->m_msgQueue[time] = MsgInfo {type, id, msg};
80         m_token->m_threadCond.notify_one();
81     }
82 
83     unique_lock<mutex> lock(m_replyMtx);
84     const auto pred = [this, id]() {
85         return m_replies.find(id) != m_replies.end();
86     };
87     if (waitMs == 0) {
88         m_replyCond.wait(lock, pred);
89     } else {
90         if (!m_replyCond.wait_for(lock, chrono::milliseconds(waitMs), pred)) {
91             LOGE("type=%u wait reply timeout", type);
92             return false;
93         }
94     }
95     reply = m_replies[id];
96     m_replies.erase(id);
97     return true;
98 }
99 
PostReply(MsgId id,const ParamSP & reply)100 void MsgHandleLoop::PostReply(MsgId id, const ParamSP &reply)
101 {
102     if (id == ASYNC_MSG_ID) {
103         return;
104     }
105     lock_guard<mutex> lock(m_replyMtx);
106     m_replies[id] = reply;
107     m_replyCond.notify_all();
108 }
109 
GenerateMsgId()110 MsgId MsgHandleLoop::GenerateMsgId()
111 {
112     lock_guard<mutex> lock(m_token->m_mtx);
113     m_lastMsgId++;
114     if (m_lastMsgId == ASYNC_MSG_ID) {
115         m_lastMsgId++;
116     }
117     return m_lastMsgId;
118 }
119 
MainLoop()120 void MsgHandleLoop::MainLoop()
121 {
122     pthread_setname_np(pthread_self(), "OS_HCodecLoop");
123     OHOS::MediaAVCodec::SetThreadInteractiveQos(true);
124     while (true) {
125         MsgInfo info;
126         {
127             unique_lock<mutex> lock(m_token->m_mtx);
128             m_token->m_threadCond.wait(lock, [this] {
129                 return m_threadNeedStop || !m_token->m_msgQueue.empty();
130             });
131             if (m_threadNeedStop) {
132                 LOGD("stopped, remain %zu msg unprocessed", m_token->m_msgQueue.size());
133                 break;
134             }
135             TimeUs processUs = m_token->m_msgQueue.begin()->first;
136             TimeUs nowUs = GetNowUs();
137             if (processUs > nowUs) {
138                 m_token->m_threadCond.wait_for(lock, chrono::microseconds(processUs - nowUs));
139                 continue;
140             }
141             info = m_token->m_msgQueue.begin()->second;
142             m_token->m_msgQueue.erase(m_token->m_msgQueue.begin());
143         }
144         OnMsgReceived(info);
145     }
146 }
147 
GetNowUs()148 MsgHandleLoop::TimeUs MsgHandleLoop::GetNowUs()
149 {
150     auto now = chrono::steady_clock::now();
151     return chrono::duration_cast<chrono::microseconds>(now.time_since_epoch()).count();
152 }
153 
154