1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "recorder_message_processor.h"
17 #include "recorder_inner_defines.h"
18 #include "media_errors.h"
19 #include "media_log.h"
20 #include "scope_guard.h"
21 #include "i_recorder_engine.h"
22
23 namespace {
24 using namespace OHOS::Media;
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecMsgProc"};
26 static const std::unordered_map<GstMessageType, RecorderMessageFeature> FEATURE_MSG_TYPE_CVT_TABLE = {
27 { GST_MESSAGE_EOS, REC_MSG_FEATURE_EOS_DONE },
28 { GST_MESSAGE_ASYNC_DONE, REC_MSG_FEATURE_ASYNC_DONE },
29 { GST_MESSAGE_STATE_CHANGED, REC_MSG_FEATURE_STATE_CHANGE_DONE },
30 };
31 }
32
33 namespace OHOS {
34 namespace Media {
ProcessInfoMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)35 RecorderMsgProcResult RecorderMsgHandler::ProcessInfoMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
36 {
37 (void)prettyMsg;
38
39 GstInfoMsgParser parser(msg);
40 CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
41 MEDIA_LOGI("[INFO] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
42
43 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
44 }
45
ProcessWarningMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)46 RecorderMsgProcResult RecorderMsgHandler::ProcessWarningMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
47 {
48 (void)prettyMsg;
49
50 GstWarningMsgParser parser(msg);
51 CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
52 MEDIA_LOGW("[WARNING] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
53
54 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
55 }
56
ProcessErrorMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)57 RecorderMsgProcResult RecorderMsgHandler::ProcessErrorMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
58 {
59 GstErrorMsgParser parser(msg);
60 CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
61 MEDIA_LOGE("[ERROR] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
62
63 prettyMsg.type = REC_MSG_ERROR;
64 prettyMsg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
65 prettyMsg.detail = MSERR_UNKNOWN;
66
67 return RecorderMsgProcResult::REC_MSG_PROC_OK;
68 }
69
ProcessFeatureMessage(GstMessage & msg,RecorderMessage & prettyMsg)70 static RecorderMsgProcResult ProcessFeatureMessage(GstMessage &msg, RecorderMessage &prettyMsg)
71 {
72 auto featureMsgTypeIter = FEATURE_MSG_TYPE_CVT_TABLE.find(GST_MESSAGE_TYPE(&msg));
73 if (featureMsgTypeIter != FEATURE_MSG_TYPE_CVT_TABLE.end()) {
74 prettyMsg.type = REC_MSG_FEATURE;
75 prettyMsg.code = featureMsgTypeIter->second;
76 return RecorderMsgProcResult::REC_MSG_PROC_OK;
77 }
78
79 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
80 }
81
ProcessStateChangedMessage(GstMessage & msg,RecorderMessage & prettyMsg)82 static RecorderMsgProcResult ProcessStateChangedMessage(GstMessage &msg, RecorderMessage &prettyMsg)
83 {
84 GstState oldState = GST_STATE_NULL;
85 GstState newState = GST_STATE_NULL;
86 GstState pendingState = GST_STATE_NULL;
87
88 gst_message_parse_state_changed(&msg, &oldState, &newState, &pendingState);
89 MEDIA_LOGI("%{public}s finished state change, oldState: %{public}s, newState: %{public}s",
90 GST_ELEMENT_NAME(msg.src), gst_element_state_get_name(oldState),
91 gst_element_state_get_name(newState));
92
93 if (GST_IS_PIPELINE(msg.src)) {
94 prettyMsg.type = REC_MSG_FEATURE;
95 prettyMsg.code = REC_MSG_FEATURE_STATE_CHANGE_DONE;
96 prettyMsg.detail = static_cast<int32_t>(newState);
97 return RecorderMsgProcResult::REC_MSG_PROC_OK;
98 }
99
100 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
101 }
102
103 using MessageProcFunc = RecorderMsgProcResult (*)(GstMessage &msg, RecorderMessage &prettyMsg);
104 static const std::unordered_map<GstMessageType, MessageProcFunc> MSG_PROC_FUNC_TABLE = {
105 { GST_MESSAGE_INFO, &RecorderMsgHandler::ProcessInfoMsgDefault },
106 { GST_MESSAGE_WARNING, &RecorderMsgHandler::ProcessWarningMsgDefault },
107 { GST_MESSAGE_ERROR, &RecorderMsgHandler::ProcessErrorMsgDefault },
108 { GST_MESSAGE_EOS, &ProcessFeatureMessage },
109 { GST_MESSAGE_ASYNC_DONE, &ProcessFeatureMessage },
110 { GST_MESSAGE_STATE_CHANGED, &ProcessStateChangedMessage },
111 };
112
BusCallback(GstBus * bus,GstMessage * msg,gpointer data)113 gboolean RecorderMsgProcessor::BusCallback(GstBus *bus, GstMessage *msg, gpointer data)
114 {
115 (void)bus;
116
117 RecorderMsgProcessor *processor = reinterpret_cast<RecorderMsgProcessor *>(data);
118 if (processor == nullptr) {
119 MEDIA_LOGE("processor is nullptr !");
120 return FALSE;
121 }
122
123 CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
124
125 processor->ProcessMessage(*msg);
126 return TRUE;
127 }
128
RecorderMsgProcessor(GstBus & gstBus,const MessageResCb & resCb)129 RecorderMsgProcessor::RecorderMsgProcessor(GstBus &gstBus, const MessageResCb &resCb)
130 : mainLoopGuard_("rec-pipe-guard"), msgResultCb_(resCb)
131 {
132 gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
133 }
134
~RecorderMsgProcessor()135 RecorderMsgProcessor::~RecorderMsgProcessor()
136 {
137 (void)Reset();
138
139 if (gstBus_ != nullptr) {
140 gst_object_unref(gstBus_);
141 gstBus_ = nullptr;
142 }
143 }
144
Init()145 int32_t RecorderMsgProcessor::Init()
146 {
147 if (msgResultCb_ == nullptr) {
148 MEDIA_LOGE("message result callback is nullptr");
149 return MSERR_INVALID_VAL;
150 }
151
152 ON_SCOPE_EXIT(0) { (void)Reset(); };
153
154 mainLoop_ = g_main_loop_new(nullptr, FALSE);
155 CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);
156
157 busWatchId_ = gst_bus_add_watch(gstBus_, (GstBusFunc)&RecorderMsgProcessor::BusCallback, this);
158 CHECK_AND_RETURN_RET(busWatchId_ != 0, MSERR_INVALID_OPERATION);
159
160 int32_t ret = mainLoopGuard_.Start();
161 CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);
162
163 auto mainLoopGuardTask = std::make_shared<TaskHandler<void>>([this] { g_main_loop_run(mainLoop_); });
164
165 ret = mainLoopGuard_.EnqueueTask(mainLoopGuardTask);
166 CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);
167
168 CANCEL_SCOPE_EXIT_GUARD(0);
169 return MSERR_OK;
170 }
171
AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)172 void RecorderMsgProcessor::AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)
173 {
174 if (handler == nullptr) {
175 MEDIA_LOGE("handler is nullptr");
176 return;
177 }
178
179 std::unique_lock<std::mutex> lock(mutex_);
180 for (auto &item : msgHandlers_) {
181 if (item == handler) {
182 return;
183 }
184 }
185
186 msgHandlers_.push_back(handler);
187 }
188
Reset()189 int32_t RecorderMsgProcessor::Reset()
190 {
191 if (errorProcQ_ != nullptr) {
192 (void)errorProcQ_->Stop();
193 errorProcQ_ = nullptr;
194 }
195
196 if (busWatchId_ != 0) {
197 (void)g_source_remove(busWatchId_);
198 busWatchId_ = 0;
199 }
200
201 if (mainLoop_ != nullptr) {
202 if (g_main_loop_is_running(mainLoop_)) {
203 g_main_loop_quit(mainLoop_);
204 }
205 g_main_loop_unref(mainLoop_);
206 mainLoop_ = nullptr;
207 }
208
209 (void)mainLoopGuard_.Stop();
210 return MSERR_OK;
211 }
212
ProcessMessage(GstMessage & msg)213 void RecorderMsgProcessor::ProcessMessage(GstMessage &msg)
214 {
215 RecorderMessage prettyMsg {};
216 RecorderMsgProcResult rst = RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
217
218 {
219 std::unique_lock<std::mutex> lock(mutex_);
220 for (auto &msgHandler : msgHandlers_) {
221 rst = msgHandler->OnMessageReceived(msg, prettyMsg);
222 if (rst != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
223 break;
224 }
225 }
226 }
227
228 if (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
229 rst = ProcessMessageFinal(msg, prettyMsg);
230 }
231
232 if (rst == RecorderMsgProcResult::REC_MSG_PROC_FAILED) {
233 NotifyInternalError(prettyMsg);
234 return;
235 }
236
237 if (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
238 return;
239 }
240
241 ReportMsgProcResult(prettyMsg);
242 }
243
ProcessExtendMessage(GstMessage & msg,RecorderMessage & prettyMsg) const244 RecorderMsgProcResult RecorderMsgProcessor::ProcessExtendMessage(GstMessage &msg, RecorderMessage &prettyMsg) const
245 {
246 (void)msg;
247 (void)prettyMsg;
248 // Check whether the message format is extented format. If yes, translate it here and return OK
249 // ohos extented format: ohos.ext type=*, code=* the type value must exceed than 0x10000
250 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
251 }
252
ProcessMessageFinal(GstMessage & msg,RecorderMessage & prettyMsg)253 RecorderMsgProcResult RecorderMsgProcessor::ProcessMessageFinal(GstMessage &msg, RecorderMessage &prettyMsg)
254 {
255 if (!(GST_IS_PIPELINE(msg.src) || (GST_MESSAGE_TYPE(&msg) == GST_MESSAGE_ERROR))) {
256 auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
257 if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
258 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
259 }
260 tblIter->second(msg, prettyMsg); // only print to log
261 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
262 }
263
264 prettyMsg.sourceId = INVALID_SOURCE_ID;
265
266 RecorderMsgProcResult ret = ProcessExtendMessage(msg, prettyMsg);
267 if (ret != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
268 return ret;
269 }
270
271 auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
272 if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
273 return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
274 }
275
276 return tblIter->second(msg, prettyMsg);
277 }
278
NotifyInternalError(RecorderMessage & msg)279 void RecorderMsgProcessor::NotifyInternalError(RecorderMessage &msg)
280 {
281 // sourceid keep unchanged.
282 msg.type = RecorderMessageType::REC_MSG_ERROR;
283 msg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
284 msg.detail = MSERR_UNKNOWN;
285
286 ReportMsgProcResult(msg);
287 }
288
ReportMsgProcResult(const RecorderMessage & msg)289 void RecorderMsgProcessor::ReportMsgProcResult(const RecorderMessage &msg)
290 {
291 if (msg.type != REC_MSG_ERROR) {
292 return msgResultCb_(msg);
293 }
294
295 if (errorProcQ_ == nullptr) {
296 errorProcQ_ = std::make_unique<TaskQueue>("rec-err-proc");
297 int32_t ret = errorProcQ_->Start();
298 CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
299 }
300
301 auto errorProc = std::make_shared<TaskHandler<void>>([this, msg] { msgResultCb_(msg); });
302
303 int32_t ret = errorProcQ_->EnqueueTask(errorProc);
304 CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
305 }
306 } // namespace Media
307 } // namespace OHOS