1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "recorder_message_processor.h"
17 #include "recorder_inner_defines.h"
18 #include "media_errors.h"
19 #include "media_log.h"
20 #include "scope_guard.h"
21 #include "i_recorder_engine.h"
22
23 namespace {
24 using namespace OHOS::Media;
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecMsgProc"};
26 static const std::unordered_map<GstMessageType, RecorderMessageFeature> FEATURE_MSG_TYPE_CVT_TABLE = {
27 { GST_MESSAGE_EOS, REC_MSG_FEATURE_EOS_DONE },
28 { GST_MESSAGE_ASYNC_DONE, REC_MSG_FEATURE_ASYNC_DONE },
29 { GST_MESSAGE_STATE_CHANGED, REC_MSG_FEATURE_STATE_CHANGE_DONE },
30 };
31 }
32
33 namespace OHOS {
34 namespace Media {
ProcessInfoMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)35 RecorderMsgProcResult RecorderMsgHandler::ProcessInfoMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
36 {
37 (void)prettyMsg;
38
39 GstInfoMsgParser parser(msg);
40 CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
41 MEDIA_LOGI("[INFO] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
42
43 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
44 }
45
ProcessWarningMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)46 RecorderMsgProcResult RecorderMsgHandler::ProcessWarningMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
47 {
48 (void)prettyMsg;
49
50 GstWarningMsgParser parser(msg);
51 CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
52 MEDIA_LOGW("[WARNING] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
53
54 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
55 }
56
ProcessErrorMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)57 RecorderMsgProcResult RecorderMsgHandler::ProcessErrorMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
58 {
59 GstErrorMsgParser parser(msg);
60 CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
61 MEDIA_LOGE("[ERROR] domain:0x%{public}x, code:0x%{public}x, msg:%{public}s, %{public}s.",
62 parser.GetErr()->domain, parser.GetErr()->code, parser.GetErr()->message, parser.GetDbg());
63
64 if (parser.GetErr()->domain == GST_RESOURCE_ERROR) {
65 if (parser.GetErr()->code == GST_RESOURCE_ERROR_NOT_FOUND) {
66 // Video input timeout.
67 prettyMsg.detail = MSERR_DATA_SOURCE_IO_ERROR;
68 } else if (parser.GetErr()->code == GST_RESOURCE_ERROR_READ) {
69 // Audio input timeout.
70 prettyMsg.detail = MSERR_DATA_SOURCE_OBTAIN_MEM_ERROR;
71 } else if (parser.GetErr()->code == GST_RESOURCE_ERROR_SETTINGS) {
72 // Video input data error.
73 prettyMsg.detail = MSERR_DATA_SOURCE_ERROR_UNKNOWN;
74 } else {
75 prettyMsg.detail = MSERR_UNKNOWN;
76 }
77 } else {
78 prettyMsg.detail = MSERR_UNKNOWN;
79 }
80
81 prettyMsg.type = REC_MSG_ERROR;
82 prettyMsg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
83
84 return RecorderMsgProcResult::REC_MSG_PROC_OK;
85 }
86
ProcessFeatureMessage(GstMessage & msg,RecorderMessage & prettyMsg)87 static RecorderMsgProcResult ProcessFeatureMessage(GstMessage &msg, RecorderMessage &prettyMsg)
88 {
89 auto featureMsgTypeIter = FEATURE_MSG_TYPE_CVT_TABLE.find(GST_MESSAGE_TYPE(&msg));
90 if (featureMsgTypeIter != FEATURE_MSG_TYPE_CVT_TABLE.end()) {
91 prettyMsg.type = REC_MSG_FEATURE;
92 prettyMsg.code = featureMsgTypeIter->second;
93 return RecorderMsgProcResult::REC_MSG_PROC_OK;
94 }
95
96 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
97 }
98
ProcessStateChangedMessage(GstMessage & msg,RecorderMessage & prettyMsg)99 static RecorderMsgProcResult ProcessStateChangedMessage(GstMessage &msg, RecorderMessage &prettyMsg)
100 {
101 GstState oldState = GST_STATE_NULL;
102 GstState newState = GST_STATE_NULL;
103 GstState pendingState = GST_STATE_NULL;
104
105 gst_message_parse_state_changed(&msg, &oldState, &newState, &pendingState);
106 MEDIA_LOGI("%{public}s finished state change, oldState: %{public}s, newState: %{public}s",
107 GST_ELEMENT_NAME(msg.src), gst_element_state_get_name(oldState),
108 gst_element_state_get_name(newState));
109
110 if (GST_IS_PIPELINE(msg.src)) {
111 prettyMsg.type = REC_MSG_FEATURE;
112 prettyMsg.code = REC_MSG_FEATURE_STATE_CHANGE_DONE;
113 prettyMsg.detail = static_cast<int32_t>(newState);
114 return RecorderMsgProcResult::REC_MSG_PROC_OK;
115 }
116
117 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
118 }
119
120 using MessageProcFunc = RecorderMsgProcResult (*)(GstMessage &msg, RecorderMessage &prettyMsg);
121 static const std::unordered_map<GstMessageType, MessageProcFunc> MSG_PROC_FUNC_TABLE = {
122 { GST_MESSAGE_INFO, &RecorderMsgHandler::ProcessInfoMsgDefault },
123 { GST_MESSAGE_WARNING, &RecorderMsgHandler::ProcessWarningMsgDefault },
124 { GST_MESSAGE_ERROR, &RecorderMsgHandler::ProcessErrorMsgDefault },
125 { GST_MESSAGE_EOS, &ProcessFeatureMessage },
126 { GST_MESSAGE_ASYNC_DONE, &ProcessFeatureMessage },
127 { GST_MESSAGE_STATE_CHANGED, &ProcessStateChangedMessage },
128 };
129
BusCallback(GstBus * bus,GstMessage * msg,gpointer data)130 gboolean RecorderMsgProcessor::BusCallback(GstBus *bus, GstMessage *msg, gpointer data)
131 {
132 (void)bus;
133
134 RecorderMsgProcessor *processor = reinterpret_cast<RecorderMsgProcessor *>(data);
135 if (processor == nullptr) {
136 MEDIA_LOGE("processor is nullptr !");
137 return FALSE;
138 }
139
140 CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
141
142 processor->ProcessMessage(*msg);
143 return TRUE;
144 }
145
RecorderMsgProcessor(GstBus & gstBus,const MessageResCb & resCb)146 RecorderMsgProcessor::RecorderMsgProcessor(GstBus &gstBus, const MessageResCb &resCb)
147 : mainLoopGuard_("rec-pipe-guard"), msgResultCb_(resCb)
148 {
149 gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
150 }
151
~RecorderMsgProcessor()152 RecorderMsgProcessor::~RecorderMsgProcessor()
153 {
154 (void)Reset();
155
156 if (gstBus_ != nullptr) {
157 gst_object_unref(gstBus_);
158 gstBus_ = nullptr;
159 }
160 }
161
/**
 * Starts message processing: creates the GLib main loop, installs the bus watch and runs the
 * main loop as a task on mainLoopGuard_.
 *
 * @return MSERR_OK on success;
 *         MSERR_INVALID_VAL when no result callback was supplied;
 *         MSERR_NO_MEMORY when the main loop cannot be created;
 *         MSERR_INVALID_OPERATION when the bus watch cannot be added or the guard task
 *         cannot be started/enqueued.
 */
int32_t RecorderMsgProcessor::Init()
{
    if (msgResultCb_ == nullptr) {
        MEDIA_LOGE("message result callback is nullptr");
        return MSERR_INVALID_VAL;
    }

    // Undo any partial setup via Reset() on every early return below; cancelled on success.
    ON_SCOPE_EXIT(0) { (void)Reset(); };

    mainLoop_ = g_main_loop_new(nullptr, FALSE);
    CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);

    // BusCallback receives `this` back as its user data.
    busWatchId_ = gst_bus_add_watch(gstBus_, (GstBusFunc)&RecorderMsgProcessor::BusCallback, this);
    CHECK_AND_RETURN_RET(busWatchId_ != 0, MSERR_INVALID_OPERATION);

    int32_t ret = mainLoopGuard_.Start();
    CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);

    // The task runs the main loop; g_main_loop_run() returns once Reset() quits the loop.
    auto mainLoopGuardTask = std::make_shared<TaskHandler<void>>([this] { g_main_loop_run(mainLoop_); });

    ret = mainLoopGuard_.EnqueueTask(mainLoopGuardTask);
    CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);

    CANCEL_SCOPE_EXIT_GUARD(0);
    return MSERR_OK;
}
188
AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)189 void RecorderMsgProcessor::AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)
190 {
191 if (handler == nullptr) {
192 MEDIA_LOGE("handler is nullptr");
193 return;
194 }
195
196 std::unique_lock<std::mutex> lock(mutex_);
197 for (auto &item : msgHandlers_) {
198 if (item == handler) {
199 return;
200 }
201 }
202
203 msgHandlers_.push_back(handler);
204 }
205
/**
 * Tears down everything Init() and ReportMsgProcResult() set up: the error-report queue,
 * the bus watch, the main loop and the main-loop guard task queue.
 *
 * Safe to call when some or all of these were never created; each step is skipped if its
 * member is unset.
 *
 * @return always MSERR_OK.
 */
int32_t RecorderMsgProcessor::Reset()
{
    // Stop asynchronous error reporting first.
    if (errorProcQ_ != nullptr) {
        (void)errorProcQ_->Stop();
        errorProcQ_ = nullptr;
    }

    // Remove the bus watch so BusCallback is no longer dispatched.
    if (busWatchId_ != 0) {
        (void)g_source_remove(busWatchId_);
        busWatchId_ = 0;
    }

    if (mainLoop_ != nullptr) {
        // NOTE(review): if the guard task has not yet entered g_main_loop_run(), the loop is
        // not "running" here and no quit is issued before the unref — confirm this window is
        // harmless with the TaskQueue's Stop() semantics.
        if (g_main_loop_is_running(mainLoop_)) {
            g_main_loop_quit(mainLoop_);
        }
        g_main_loop_unref(mainLoop_);
        mainLoop_ = nullptr;
    }

    // Stop the task queue that was running the main loop.
    (void)mainLoopGuard_.Stop();
    return MSERR_OK;
}
229
ProcessMessage(GstMessage & msg)230 void RecorderMsgProcessor::ProcessMessage(GstMessage &msg)
231 {
232 RecorderMessage prettyMsg {};
233 RecorderMsgProcResult rst = RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
234
235 {
236 std::unique_lock<std::mutex> lock(mutex_);
237 for (auto &msgHandler : msgHandlers_) {
238 rst = msgHandler->OnMessageReceived(msg, prettyMsg);
239 if (rst != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
240 break;
241 }
242 }
243 }
244
245 if (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
246 rst = ProcessMessageFinal(msg, prettyMsg);
247 }
248
249 if (rst == RecorderMsgProcResult::REC_MSG_PROC_FAILED) {
250 NotifyInternalError(prettyMsg);
251 return;
252 }
253
254 if (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
255 return;
256 }
257
258 ReportMsgProcResult(prettyMsg);
259 }
260
ProcessExtendMessage(const GstMessage & msg,const RecorderMessage & prettyMsg) const261 RecorderMsgProcResult RecorderMsgProcessor::ProcessExtendMessage(
262 const GstMessage &msg, const RecorderMessage &prettyMsg) const
263 {
264 (void)msg;
265 (void)prettyMsg;
266 // Check whether the message format is extended format. If yes, translate it here and return OK
267 // ohos extented format: ohos.ext type=*, code=* the type value must exceed than 0x10000
268 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
269 }
270
ProcessMessageFinal(GstMessage & msg,RecorderMessage & prettyMsg)271 RecorderMsgProcResult RecorderMsgProcessor::ProcessMessageFinal(GstMessage &msg, RecorderMessage &prettyMsg)
272 {
273 if (!(GST_IS_PIPELINE(msg.src) || (GST_MESSAGE_TYPE(&msg) == GST_MESSAGE_ERROR))) {
274 auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
275 if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
276 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
277 }
278 tblIter->second(msg, prettyMsg); // only print to log
279 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
280 }
281
282 prettyMsg.sourceId = INVALID_SOURCE_ID;
283
284 RecorderMsgProcResult ret = ProcessExtendMessage(msg, prettyMsg);
285 if (ret != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
286 return ret;
287 }
288
289 auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
290 if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
291 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
292 }
293
294 return tblIter->second(msg, prettyMsg);
295 }
296
NotifyInternalError(RecorderMessage & msg)297 void RecorderMsgProcessor::NotifyInternalError(RecorderMessage &msg)
298 {
299 // sourceid keep unchanged.
300 msg.type = RecorderMessageType::REC_MSG_ERROR;
301 msg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
302 msg.detail = MSERR_UNKNOWN;
303
304 ReportMsgProcResult(msg);
305 }
306
ReportMsgProcResult(const RecorderMessage & msg)307 void RecorderMsgProcessor::ReportMsgProcResult(const RecorderMessage &msg)
308 {
309 if (msg.type != REC_MSG_ERROR) {
310 return msgResultCb_(msg);
311 }
312
313 if (errorProcQ_ == nullptr) {
314 errorProcQ_ = std::make_unique<TaskQueue>("rec-err-proc");
315 int32_t ret = errorProcQ_->Start();
316 CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
317 }
318
319 auto errorProc = std::make_shared<TaskHandler<void>>([this, msg] { msgResultCb_(msg); });
320
321 int32_t ret = errorProcQ_->EnqueueTask(errorProc);
322 CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
323 }
324 } // namespace Media
325 } // namespace OHOS