• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "recorder_message_processor.h"
17 #include "recorder_inner_defines.h"
18 #include "media_errors.h"
19 #include "media_log.h"
20 #include "scope_guard.h"
21 #include "i_recorder_engine.h"
22 
23 namespace {
24 using namespace OHOS::Media;
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecMsgProc"};
26 static const std::unordered_map<GstMessageType, RecorderMessageFeature> FEATURE_MSG_TYPE_CVT_TABLE = {
27     { GST_MESSAGE_EOS, REC_MSG_FEATURE_EOS_DONE },
28     { GST_MESSAGE_ASYNC_DONE, REC_MSG_FEATURE_ASYNC_DONE },
29     { GST_MESSAGE_STATE_CHANGED, REC_MSG_FEATURE_STATE_CHANGE_DONE },
30 };
31 }
32 
33 namespace OHOS {
34 namespace Media {
ProcessInfoMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)35 RecorderMsgProcResult RecorderMsgHandler::ProcessInfoMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
36 {
37     (void)prettyMsg;
38 
39     GstInfoMsgParser parser(msg);
40     CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
41     MEDIA_LOGI("[INFO] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
42 
43     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
44 }
45 
ProcessWarningMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)46 RecorderMsgProcResult RecorderMsgHandler::ProcessWarningMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
47 {
48     (void)prettyMsg;
49 
50     GstWarningMsgParser parser(msg);
51     CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
52     MEDIA_LOGW("[WARNING] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
53 
54     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
55 }
56 
ProcessErrorMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)57 RecorderMsgProcResult RecorderMsgHandler::ProcessErrorMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
58 {
59     GstErrorMsgParser parser(msg);
60     CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
61     MEDIA_LOGE("[ERROR] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
62 
63     prettyMsg.type = REC_MSG_ERROR;
64     prettyMsg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
65     prettyMsg.detail = MSERR_UNKNOWN;
66 
67     return RecorderMsgProcResult::REC_MSG_PROC_OK;
68 }
69 
ProcessFeatureMessage(GstMessage & msg,RecorderMessage & prettyMsg)70 static RecorderMsgProcResult ProcessFeatureMessage(GstMessage &msg, RecorderMessage &prettyMsg)
71 {
72     auto featureMsgTypeIter = FEATURE_MSG_TYPE_CVT_TABLE.find(GST_MESSAGE_TYPE(&msg));
73     if (featureMsgTypeIter != FEATURE_MSG_TYPE_CVT_TABLE.end()) {
74         prettyMsg.type = REC_MSG_FEATURE;
75         prettyMsg.code = featureMsgTypeIter->second;
76         return RecorderMsgProcResult::REC_MSG_PROC_OK;
77     }
78 
79     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
80 }
81 
ProcessStateChangedMessage(GstMessage & msg,RecorderMessage & prettyMsg)82 static RecorderMsgProcResult ProcessStateChangedMessage(GstMessage &msg, RecorderMessage &prettyMsg)
83 {
84     GstState oldState = GST_STATE_NULL;
85     GstState newState = GST_STATE_NULL;
86     GstState pendingState = GST_STATE_NULL;
87 
88     gst_message_parse_state_changed(&msg, &oldState, &newState, &pendingState);
89     MEDIA_LOGI("%{public}s finished state change, oldState: %{public}s, newState: %{public}s",
90                GST_ELEMENT_NAME(msg.src), gst_element_state_get_name(oldState),
91                gst_element_state_get_name(newState));
92 
93     if (GST_IS_PIPELINE(msg.src)) {
94         prettyMsg.type = REC_MSG_FEATURE;
95         prettyMsg.code = REC_MSG_FEATURE_STATE_CHANGE_DONE;
96         prettyMsg.detail = static_cast<int32_t>(newState);
97         return RecorderMsgProcResult::REC_MSG_PROC_OK;
98     }
99 
100     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
101 }
102 
103 using MessageProcFunc = RecorderMsgProcResult (*)(GstMessage &msg, RecorderMessage &prettyMsg);
104 static const std::unordered_map<GstMessageType, MessageProcFunc> MSG_PROC_FUNC_TABLE = {
105     { GST_MESSAGE_INFO, &RecorderMsgHandler::ProcessInfoMsgDefault },
106     { GST_MESSAGE_WARNING, &RecorderMsgHandler::ProcessWarningMsgDefault },
107     { GST_MESSAGE_ERROR, &RecorderMsgHandler::ProcessErrorMsgDefault },
108     { GST_MESSAGE_EOS, &ProcessFeatureMessage },
109     { GST_MESSAGE_ASYNC_DONE, &ProcessFeatureMessage },
110     { GST_MESSAGE_STATE_CHANGED, &ProcessStateChangedMessage },
111 };
112 
BusCallback(GstBus * bus,GstMessage * msg,gpointer data)113 gboolean RecorderMsgProcessor::BusCallback(GstBus *bus, GstMessage *msg, gpointer data)
114 {
115     (void)bus;
116 
117     RecorderMsgProcessor *processor = reinterpret_cast<RecorderMsgProcessor *>(data);
118     if (processor == nullptr) {
119         MEDIA_LOGE("processor is nullptr !");
120         return FALSE;
121     }
122 
123     CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
124 
125     processor->ProcessMessage(*msg);
126     return TRUE;
127 }
128 
RecorderMsgProcessor(GstBus & gstBus,const MessageResCb & resCb)129 RecorderMsgProcessor::RecorderMsgProcessor(GstBus &gstBus, const MessageResCb &resCb)
130     : mainLoopGuard_("rec-pipe-guard"), msgResultCb_(resCb)
131 {
132     gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
133 }
134 
~RecorderMsgProcessor()135 RecorderMsgProcessor::~RecorderMsgProcessor()
136 {
137     (void)Reset();
138 
139     if (gstBus_ != nullptr) {
140         gst_object_unref(gstBus_);
141         gstBus_ = nullptr;
142     }
143 }
144 
Init()145 int32_t RecorderMsgProcessor::Init()
146 {
147     if (msgResultCb_ == nullptr) {
148         MEDIA_LOGE("message result callback is nullptr");
149         return MSERR_INVALID_VAL;
150     }
151 
152     ON_SCOPE_EXIT(0) { (void)Reset(); };
153 
154     mainLoop_ = g_main_loop_new(nullptr, FALSE);
155     CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);
156 
157     busWatchId_ = gst_bus_add_watch(gstBus_, (GstBusFunc)&RecorderMsgProcessor::BusCallback, this);
158     CHECK_AND_RETURN_RET(busWatchId_ != 0, MSERR_INVALID_OPERATION);
159 
160     int32_t ret = mainLoopGuard_.Start();
161     CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);
162 
163     auto mainLoopGuardTask = std::make_shared<TaskHandler<void>>([this] { g_main_loop_run(mainLoop_); });
164 
165     ret = mainLoopGuard_.EnqueueTask(mainLoopGuardTask);
166     CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);
167 
168     CANCEL_SCOPE_EXIT_GUARD(0);
169     return MSERR_OK;
170 }
171 
AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)172 void RecorderMsgProcessor::AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)
173 {
174     if (handler == nullptr) {
175         MEDIA_LOGE("handler is nullptr");
176         return;
177     }
178 
179     std::unique_lock<std::mutex> lock(mutex_);
180     for (auto &item : msgHandlers_) {
181         if (item == handler) {
182             return;
183         }
184     }
185 
186     msgHandlers_.push_back(handler);
187 }
188 
Reset()189 int32_t RecorderMsgProcessor::Reset()
190 {
191     if (errorProcQ_ != nullptr) {
192         (void)errorProcQ_->Stop();
193         errorProcQ_ = nullptr;
194     }
195 
196     if (busWatchId_ != 0) {
197         (void)g_source_remove(busWatchId_);
198         busWatchId_ = 0;
199     }
200 
201     if (mainLoop_ != nullptr) {
202         if (g_main_loop_is_running(mainLoop_)) {
203             g_main_loop_quit(mainLoop_);
204         }
205         g_main_loop_unref(mainLoop_);
206         mainLoop_ = nullptr;
207     }
208 
209     (void)mainLoopGuard_.Stop();
210     return MSERR_OK;
211 }
212 
ProcessMessage(GstMessage & msg)213 void RecorderMsgProcessor::ProcessMessage(GstMessage &msg)
214 {
215     RecorderMessage prettyMsg {};
216     RecorderMsgProcResult rst = RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
217 
218     {
219         std::unique_lock<std::mutex> lock(mutex_);
220         for (auto &msgHandler : msgHandlers_) {
221             rst = msgHandler->OnMessageReceived(msg, prettyMsg);
222             if (rst != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
223                 break;
224             }
225         }
226     }
227 
228     if  (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
229         rst = ProcessMessageFinal(msg, prettyMsg);
230     }
231 
232     if (rst == RecorderMsgProcResult::REC_MSG_PROC_FAILED) {
233         NotifyInternalError(prettyMsg);
234         return;
235     }
236 
237     if (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
238         return;
239     }
240 
241     ReportMsgProcResult(prettyMsg);
242 }
243 
ProcessExtendMessage(GstMessage & msg,RecorderMessage & prettyMsg) const244 RecorderMsgProcResult RecorderMsgProcessor::ProcessExtendMessage(GstMessage &msg, RecorderMessage &prettyMsg) const
245 {
246     (void)msg;
247     (void)prettyMsg;
248     // Check whether the message format is extented format. If yes, translate it here and return OK
249     // ohos extented format: ohos.ext type=*, code=*   the type value must exceed than 0x10000
250     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
251 }
252 
ProcessMessageFinal(GstMessage & msg,RecorderMessage & prettyMsg)253 RecorderMsgProcResult RecorderMsgProcessor::ProcessMessageFinal(GstMessage &msg, RecorderMessage &prettyMsg)
254 {
255     if (!(GST_IS_PIPELINE(msg.src) || (GST_MESSAGE_TYPE(&msg) == GST_MESSAGE_ERROR))) {
256         auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
257         if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
258             return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
259         }
260         tblIter->second(msg, prettyMsg); // only print to log
261         return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
262     }
263 
264     prettyMsg.sourceId = INVALID_SOURCE_ID;
265 
266     RecorderMsgProcResult ret = ProcessExtendMessage(msg, prettyMsg);
267     if (ret != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
268         return ret;
269     }
270 
271     auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
272     if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
273         return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
274     }
275 
276     return tblIter->second(msg, prettyMsg);
277 }
278 
NotifyInternalError(RecorderMessage & msg)279 void RecorderMsgProcessor::NotifyInternalError(RecorderMessage &msg)
280 {
281     // sourceid keep unchanged.
282     msg.type = RecorderMessageType::REC_MSG_ERROR;
283     msg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
284     msg.detail = MSERR_UNKNOWN;
285 
286     ReportMsgProcResult(msg);
287 }
288 
ReportMsgProcResult(const RecorderMessage & msg)289 void RecorderMsgProcessor::ReportMsgProcResult(const RecorderMessage &msg)
290 {
291     if (msg.type != REC_MSG_ERROR) {
292         return msgResultCb_(msg);
293     }
294 
295     if (errorProcQ_ == nullptr) {
296         errorProcQ_ = std::make_unique<TaskQueue>("rec-err-proc");
297         int32_t ret = errorProcQ_->Start();
298         CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
299     }
300 
301     auto errorProc = std::make_shared<TaskHandler<void>>([this, msg] { msgResultCb_(msg); });
302 
303     int32_t ret = errorProcQ_->EnqueueTask(errorProc);
304     CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
305 }
306 } // namespace Media
307 } // namespace OHOS