• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "recorder_message_processor.h"
17 #include "recorder_inner_defines.h"
18 #include "media_errors.h"
19 #include "media_log.h"
20 #include "scope_guard.h"
21 #include "i_recorder_engine.h"
22 
23 namespace {
24 using namespace OHOS::Media;
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecMsgProc"};
26 static const std::unordered_map<GstMessageType, RecorderMessageFeature> FEATURE_MSG_TYPE_CVT_TABLE = {
27     { GST_MESSAGE_EOS, REC_MSG_FEATURE_EOS_DONE },
28     { GST_MESSAGE_ASYNC_DONE, REC_MSG_FEATURE_ASYNC_DONE },
29     { GST_MESSAGE_STATE_CHANGED, REC_MSG_FEATURE_STATE_CHANGE_DONE },
30 };
31 }
32 
33 namespace OHOS {
34 namespace Media {
ProcessInfoMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)35 RecorderMsgProcResult RecorderMsgHandler::ProcessInfoMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
36 {
37     (void)prettyMsg;
38 
39     GstInfoMsgParser parser(msg);
40     CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
41     MEDIA_LOGI("[INFO] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
42 
43     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
44 }
45 
ProcessWarningMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)46 RecorderMsgProcResult RecorderMsgHandler::ProcessWarningMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
47 {
48     (void)prettyMsg;
49 
50     GstWarningMsgParser parser(msg);
51     CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
52     MEDIA_LOGW("[WARNING] %{public}s, %{public}s", parser.GetErr()->message, parser.GetDbg());
53 
54     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
55 }
56 
ProcessErrorMsgDefault(GstMessage & msg,RecorderMessage & prettyMsg)57 RecorderMsgProcResult RecorderMsgHandler::ProcessErrorMsgDefault(GstMessage &msg, RecorderMessage &prettyMsg)
58 {
59     GstErrorMsgParser parser(msg);
60     CHECK_AND_RETURN_RET(parser.InitCheck(), RecorderMsgProcResult::REC_MSG_PROC_FAILED);
61     MEDIA_LOGE("[ERROR] domain:0x%{public}x, code:0x%{public}x, msg:%{public}s, %{public}s.",
62         parser.GetErr()->domain, parser.GetErr()->code, parser.GetErr()->message, parser.GetDbg());
63 
64     if (parser.GetErr()->domain == GST_RESOURCE_ERROR) {
65         if (parser.GetErr()->code == GST_RESOURCE_ERROR_NOT_FOUND) {
66             // Video input timeout.
67             prettyMsg.detail = MSERR_DATA_SOURCE_IO_ERROR;
68         } else if (parser.GetErr()->code == GST_RESOURCE_ERROR_READ) {
69             // Audio input timeout.
70             prettyMsg.detail = MSERR_DATA_SOURCE_OBTAIN_MEM_ERROR;
71         } else if (parser.GetErr()->code == GST_RESOURCE_ERROR_SETTINGS) {
72             // Video input data error.
73             prettyMsg.detail = MSERR_DATA_SOURCE_ERROR_UNKNOWN;
74         } else {
75             prettyMsg.detail = MSERR_UNKNOWN;
76         }
77     } else {
78         prettyMsg.detail = MSERR_UNKNOWN;
79     }
80 
81     prettyMsg.type = REC_MSG_ERROR;
82     prettyMsg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
83 
84     return RecorderMsgProcResult::REC_MSG_PROC_OK;
85 }
86 
ProcessFeatureMessage(GstMessage & msg,RecorderMessage & prettyMsg)87 static RecorderMsgProcResult ProcessFeatureMessage(GstMessage &msg, RecorderMessage &prettyMsg)
88 {
89     auto featureMsgTypeIter = FEATURE_MSG_TYPE_CVT_TABLE.find(GST_MESSAGE_TYPE(&msg));
90     if (featureMsgTypeIter != FEATURE_MSG_TYPE_CVT_TABLE.end()) {
91         prettyMsg.type = REC_MSG_FEATURE;
92         prettyMsg.code = featureMsgTypeIter->second;
93         return RecorderMsgProcResult::REC_MSG_PROC_OK;
94     }
95 
96     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
97 }
98 
ProcessStateChangedMessage(GstMessage & msg,RecorderMessage & prettyMsg)99 static RecorderMsgProcResult ProcessStateChangedMessage(GstMessage &msg, RecorderMessage &prettyMsg)
100 {
101     GstState oldState = GST_STATE_NULL;
102     GstState newState = GST_STATE_NULL;
103     GstState pendingState = GST_STATE_NULL;
104 
105     gst_message_parse_state_changed(&msg, &oldState, &newState, &pendingState);
106     MEDIA_LOGI("%{public}s finished state change, oldState: %{public}s, newState: %{public}s",
107                GST_ELEMENT_NAME(msg.src), gst_element_state_get_name(oldState),
108                gst_element_state_get_name(newState));
109 
110     if (GST_IS_PIPELINE(msg.src)) {
111         prettyMsg.type = REC_MSG_FEATURE;
112         prettyMsg.code = REC_MSG_FEATURE_STATE_CHANGE_DONE;
113         prettyMsg.detail = static_cast<int32_t>(newState);
114         return RecorderMsgProcResult::REC_MSG_PROC_OK;
115     }
116 
117     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
118 }
119 
120 using MessageProcFunc = RecorderMsgProcResult (*)(GstMessage &msg, RecorderMessage &prettyMsg);
121 static const std::unordered_map<GstMessageType, MessageProcFunc> MSG_PROC_FUNC_TABLE = {
122     { GST_MESSAGE_INFO, &RecorderMsgHandler::ProcessInfoMsgDefault },
123     { GST_MESSAGE_WARNING, &RecorderMsgHandler::ProcessWarningMsgDefault },
124     { GST_MESSAGE_ERROR, &RecorderMsgHandler::ProcessErrorMsgDefault },
125     { GST_MESSAGE_EOS, &ProcessFeatureMessage },
126     { GST_MESSAGE_ASYNC_DONE, &ProcessFeatureMessage },
127     { GST_MESSAGE_STATE_CHANGED, &ProcessStateChangedMessage },
128 };
129 
BusCallback(GstBus * bus,GstMessage * msg,gpointer data)130 gboolean RecorderMsgProcessor::BusCallback(GstBus *bus, GstMessage *msg, gpointer data)
131 {
132     (void)bus;
133 
134     RecorderMsgProcessor *processor = reinterpret_cast<RecorderMsgProcessor *>(data);
135     if (processor == nullptr) {
136         MEDIA_LOGE("processor is nullptr !");
137         return FALSE;
138     }
139 
140     CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
141 
142     processor->ProcessMessage(*msg);
143     return TRUE;
144 }
145 
RecorderMsgProcessor(GstBus & gstBus,const MessageResCb & resCb)146 RecorderMsgProcessor::RecorderMsgProcessor(GstBus &gstBus, const MessageResCb &resCb)
147     : mainLoopGuard_("rec-pipe-guard"), msgResultCb_(resCb)
148 {
149     gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
150 }
151 
~RecorderMsgProcessor()152 RecorderMsgProcessor::~RecorderMsgProcessor()
153 {
154     (void)Reset();
155 
156     if (gstBus_ != nullptr) {
157         gst_object_unref(gstBus_);
158         gstBus_ = nullptr;
159     }
160 }
161 
Init()162 int32_t RecorderMsgProcessor::Init()
163 {
164     if (msgResultCb_ == nullptr) {
165         MEDIA_LOGE("message result callback is nullptr");
166         return MSERR_INVALID_VAL;
167     }
168 
169     ON_SCOPE_EXIT(0) { (void)Reset(); };
170 
171     mainLoop_ = g_main_loop_new(nullptr, FALSE);
172     CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);
173 
174     busWatchId_ = gst_bus_add_watch(gstBus_, (GstBusFunc)&RecorderMsgProcessor::BusCallback, this);
175     CHECK_AND_RETURN_RET(busWatchId_ != 0, MSERR_INVALID_OPERATION);
176 
177     int32_t ret = mainLoopGuard_.Start();
178     CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);
179 
180     auto mainLoopGuardTask = std::make_shared<TaskHandler<void>>([this] { g_main_loop_run(mainLoop_); });
181 
182     ret = mainLoopGuard_.EnqueueTask(mainLoopGuardTask);
183     CHECK_AND_RETURN_RET(ret == MSERR_OK, MSERR_INVALID_OPERATION);
184 
185     CANCEL_SCOPE_EXIT_GUARD(0);
186     return MSERR_OK;
187 }
188 
AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)189 void RecorderMsgProcessor::AddMsgHandler(std::shared_ptr<RecorderMsgHandler> handler)
190 {
191     if (handler == nullptr) {
192         MEDIA_LOGE("handler is nullptr");
193         return;
194     }
195 
196     std::unique_lock<std::mutex> lock(mutex_);
197     for (auto &item : msgHandlers_) {
198         if (item == handler) {
199             return;
200         }
201     }
202 
203     msgHandlers_.push_back(handler);
204 }
205 
Reset()206 int32_t RecorderMsgProcessor::Reset()
207 {
208     if (errorProcQ_ != nullptr) {
209         (void)errorProcQ_->Stop();
210         errorProcQ_ = nullptr;
211     }
212 
213     if (busWatchId_ != 0) {
214         (void)g_source_remove(busWatchId_);
215         busWatchId_ = 0;
216     }
217 
218     if (mainLoop_ != nullptr) {
219         if (g_main_loop_is_running(mainLoop_)) {
220             g_main_loop_quit(mainLoop_);
221         }
222         g_main_loop_unref(mainLoop_);
223         mainLoop_ = nullptr;
224     }
225 
226     (void)mainLoopGuard_.Stop();
227     return MSERR_OK;
228 }
229 
ProcessMessage(GstMessage & msg)230 void RecorderMsgProcessor::ProcessMessage(GstMessage &msg)
231 {
232     RecorderMessage prettyMsg {};
233     RecorderMsgProcResult rst = RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
234 
235     {
236         std::unique_lock<std::mutex> lock(mutex_);
237         for (auto &msgHandler : msgHandlers_) {
238             rst = msgHandler->OnMessageReceived(msg, prettyMsg);
239             if (rst != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
240                 break;
241             }
242         }
243     }
244 
245     if  (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
246         rst = ProcessMessageFinal(msg, prettyMsg);
247     }
248 
249     if (rst == RecorderMsgProcResult::REC_MSG_PROC_FAILED) {
250         NotifyInternalError(prettyMsg);
251         return;
252     }
253 
254     if (rst == RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
255         return;
256     }
257 
258     ReportMsgProcResult(prettyMsg);
259 }
260 
ProcessExtendMessage(const GstMessage & msg,const RecorderMessage & prettyMsg) const261 RecorderMsgProcResult RecorderMsgProcessor::ProcessExtendMessage(
262     const GstMessage &msg, const RecorderMessage &prettyMsg) const
263 {
264     (void)msg;
265     (void)prettyMsg;
266     // Check whether the message format is extended format. If yes, translate it here and return OK
267     // ohos extented format: ohos.ext type=*, code=*   the type value must exceed than 0x10000
268     return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
269 }
270 
ProcessMessageFinal(GstMessage & msg,RecorderMessage & prettyMsg)271 RecorderMsgProcResult RecorderMsgProcessor::ProcessMessageFinal(GstMessage &msg, RecorderMessage &prettyMsg)
272 {
273     if (!(GST_IS_PIPELINE(msg.src) || (GST_MESSAGE_TYPE(&msg) == GST_MESSAGE_ERROR))) {
274         auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
275         if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
276             return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
277         }
278         tblIter->second(msg, prettyMsg); // only print to log
279         return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
280     }
281 
282     prettyMsg.sourceId = INVALID_SOURCE_ID;
283 
284     RecorderMsgProcResult ret = ProcessExtendMessage(msg, prettyMsg);
285     if (ret != RecorderMsgProcResult::REC_MSG_PROC_IGNORE) {
286         return ret;
287     }
288 
289     auto tblIter = MSG_PROC_FUNC_TABLE.find(GST_MESSAGE_TYPE(&msg));
290     if (tblIter == MSG_PROC_FUNC_TABLE.end()) {
291         return RecorderMsgProcResult::REC_MSG_PROC_IGNORE;
292     }
293 
294     return tblIter->second(msg, prettyMsg);
295 }
296 
NotifyInternalError(RecorderMessage & msg)297 void RecorderMsgProcessor::NotifyInternalError(RecorderMessage &msg)
298 {
299     // sourceid keep unchanged.
300     msg.type = RecorderMessageType::REC_MSG_ERROR;
301     msg.code = IRecorderEngineObs::ErrorType::ERROR_INTERNAL;
302     msg.detail = MSERR_UNKNOWN;
303 
304     ReportMsgProcResult(msg);
305 }
306 
ReportMsgProcResult(const RecorderMessage & msg)307 void RecorderMsgProcessor::ReportMsgProcResult(const RecorderMessage &msg)
308 {
309     if (msg.type != REC_MSG_ERROR) {
310         return msgResultCb_(msg);
311     }
312 
313     if (errorProcQ_ == nullptr) {
314         errorProcQ_ = std::make_unique<TaskQueue>("rec-err-proc");
315         int32_t ret = errorProcQ_->Start();
316         CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
317     }
318 
319     auto errorProc = std::make_shared<TaskHandler<void>>([this, msg] { msgResultCb_(msg); });
320 
321     int32_t ret = errorProcQ_->EnqueueTask(errorProc);
322     CHECK_AND_RETURN_LOG(ret == MSERR_OK, "unable to async process error msg !");
323 }
324 } // namespace Media
325 } // namespace OHOS