1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "gst_msg_processor.h"
17 #include "media_errors.h"
18 #include "media_log.h"
19 #include "scope_guard.h"
20
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstMsgProc"};
23 }
24
25 namespace OHOS {
26 namespace Media {
GstMsgProcessor(GstBus & gstBus,const InnerMsgNotifier & notifier,const std::shared_ptr<IGstMsgConverter> & converter)27 GstMsgProcessor::GstMsgProcessor(
28 GstBus &gstBus,
29 const InnerMsgNotifier ¬ifier,
30 const std::shared_ptr<IGstMsgConverter> &converter)
31 : notifier_(notifier), guardTask_("msg_loop_guard"), msgConverter_(converter)
32 {
33 gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
34 MEDIA_LOGD("enter ctor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
35 }
36
~GstMsgProcessor()37 GstMsgProcessor::~GstMsgProcessor()
38 {
39 MEDIA_LOGD("enter dtor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
40
41 Reset();
42
43 if (gstBus_ != nullptr) {
44 gst_object_unref(gstBus_);
45 gstBus_ = nullptr;
46 }
47 }
48
Init()49 int32_t GstMsgProcessor::Init()
50 {
51 MEDIA_LOGD("Init enter");
52
53 if (notifier_ == nullptr) {
54 MEDIA_LOGE("message notifier is nullptr");
55 return MSERR_INVALID_VAL;
56 }
57
58 ON_SCOPE_EXIT(0) { Reset(); };
59
60 if (msgConverter_ == nullptr) {
61 msgConverter_ = std::make_shared<GstMsgConverterDefault>();
62 }
63
64 int32_t ret = guardTask_.Start();
65 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
66
67 auto mainLoopEnter = std::make_shared<TaskHandler<int32_t>>([this]() { return DoInit(); });
68 ret = guardTask_.EnqueueTask(mainLoopEnter);
69 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
70
71 auto result = mainLoopEnter->GetResult();
72 CHECK_AND_RETURN_RET_LOG(result.HasResult(), MSERR_UNKNOWN, "msg processor init failed");
73 CHECK_AND_RETURN_RET_LOG(result.Value() == MSERR_OK, result.Value(), "msg processor init failed");
74
75 std::unique_lock<std::mutex> lock(mutex_);
76 cond_.wait(lock, [this] { return !needWaiting_; }); // wait main loop run done
77
78 CANCEL_SCOPE_EXIT_GUARD(0);
79 MEDIA_LOGD("Init exit");
80 return MSERR_OK;
81 }
82
DoInit()83 int32_t GstMsgProcessor::DoInit()
84 {
85 ON_SCOPE_EXIT(0) { DoReset(); };
86
87 context_ = g_main_context_new();
88 CHECK_AND_RETURN_RET(context_ != nullptr, MSERR_NO_MEMORY);
89
90 mainLoop_ = g_main_loop_new(context_, false);
91 CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);
92 g_main_context_push_thread_default(context_);
93
94 GSource *source = g_idle_source_new();
95 g_source_set_callback(source, (GSourceFunc)MainLoopRunDone, this, nullptr);
96 guint ret = g_source_attach(source, context_);
97 CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add idle source failed");
98 g_source_unref(source);
99
100 busSource_ = gst_bus_create_watch(gstBus_);
101 CHECK_AND_RETURN_RET_LOG(busSource_ != nullptr, MSERR_NO_MEMORY, "add bus source failed");
102 g_source_set_callback(busSource_, (GSourceFunc)&GstMsgProcessor::BusCallback, this, nullptr);
103 ret = g_source_attach(busSource_, context_);
104 CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add bus source failed");
105
106 auto mainLoopRun = std::make_shared<TaskHandler<void>>([this] {
107 MEDIA_LOGI("start msg main loop...");
108 g_main_loop_run(mainLoop_);
109 MEDIA_LOGI("stop msg main loop...");
110 DoReset();
111 });
112
113 int32_t ret1 = guardTask_.EnqueueTask(mainLoopRun);
114 CHECK_AND_RETURN_RET(ret1 == MSERR_OK, ret1);
115
116 CANCEL_SCOPE_EXIT_GUARD(0);
117 return MSERR_OK;
118 }
119
MainLoopRunDone(GstMsgProcessor * thiz)120 gboolean GstMsgProcessor::MainLoopRunDone(GstMsgProcessor *thiz)
121 {
122 if (thiz == nullptr) {
123 return G_SOURCE_REMOVE;
124 }
125
126 std::unique_lock<std::mutex> lock(thiz->mutex_);
127 thiz->needWaiting_ = false;
128 thiz->cond_.notify_one();
129
130 return G_SOURCE_REMOVE;
131 }
132
AddTickSource(int32_t type,uint32_t interval)133 void GstMsgProcessor::AddTickSource(int32_t type, uint32_t interval)
134 {
135 std::unique_lock<std::mutex> lock(mutex_);
136 auto iter = tickSource_.find(type);
137 if (iter != tickSource_.end()) {
138 return;
139 }
140
141 GSource *source = g_timeout_source_new(interval);
142 CHECK_AND_RETURN_LOG(source != nullptr, "add tick source failed");
143 TickCallbackInfo *tickCbInfo = static_cast<TickCallbackInfo *>(g_malloc0((gsize)sizeof(TickCallbackInfo)));
144 tickCbInfo->type = type;
145 tickCbInfo->msgProcessor = this;
146 g_source_set_callback(source, (GSourceFunc)&GstMsgProcessor::TickCallback, tickCbInfo,
147 (GDestroyNotify)&GstMsgProcessor::FreeTickType);
148 guint ret = g_source_attach(source, context_);
149 CHECK_AND_RETURN_LOG(ret > 0, "add tick source failed");
150 (void)tickSource_.emplace(type, source);
151 }
152
RemoveTickSourceByType(int32_t type)153 void GstMsgProcessor::RemoveTickSourceByType(int32_t type)
154 {
155 std::unique_lock<std::mutex> lock(mutex_);
156 auto iter = tickSource_.find(type);
157 if (iter == tickSource_.end()) {
158 return;
159 }
160
161 GSource *source = iter->second;
162 g_source_destroy(source);
163 g_source_unref(source);
164 source = nullptr;
165 (void)tickSource_.erase(iter);
166 }
167
RemoveTickSourceAll()168 void GstMsgProcessor::RemoveTickSourceAll()
169 {
170 std::unique_lock<std::mutex> lock(mutex_);
171 for (auto &[tickType, source] : tickSource_) {
172 g_source_destroy(source);
173 g_source_unref(source);
174 source = nullptr;
175 }
176
177 (void)tickSource_.clear();
178 }
179
FreeTickType(TickCallbackInfo * tickCbInfo)180 void GstMsgProcessor::FreeTickType(TickCallbackInfo *tickCbInfo)
181 {
182 g_free(tickCbInfo);
183 }
184
AddMsgFilter(const std::string & filter)185 void GstMsgProcessor::AddMsgFilter(const std::string &filter)
186 {
187 std::unique_lock<std::mutex> lock(mutex_);
188 for (const auto &elem : filters_) {
189 if (elem == filter) {
190 return;
191 }
192 }
193
194 MEDIA_LOGI("add msg filter: %{public}s", filter.c_str());
195 filters_.push_back(filter);
196 }
197
FlushBegin()198 void GstMsgProcessor::FlushBegin()
199 {
200 gst_bus_set_flushing(gstBus_, TRUE);
201 }
202
FlushEnd()203 void GstMsgProcessor::FlushEnd()
204 {
205 gst_bus_set_flushing(gstBus_, FALSE);
206 }
207
DoReset()208 void GstMsgProcessor::DoReset()
209 {
210 if (busSource_ != nullptr) {
211 g_source_destroy(busSource_);
212 g_source_unref(busSource_);
213 busSource_ = nullptr;
214 }
215
216 RemoveTickSourceAll();
217
218 if (mainLoop_ != nullptr) {
219 g_main_loop_unref(mainLoop_);
220 mainLoop_ = nullptr;
221 }
222
223 if (context_ != nullptr) {
224 g_main_context_pop_thread_default(context_);
225 g_main_context_unref(context_);
226 context_ = nullptr;
227 }
228 }
229
Reset()230 void GstMsgProcessor::Reset() noexcept
231 {
232 if (mainLoop_ != nullptr) {
233 if (g_main_loop_is_running(mainLoop_)) {
234 g_main_loop_quit(mainLoop_);
235 }
236 }
237
238 {
239 std::unique_lock<std::mutex> lock(mutex_);
240 needWaiting_ = false;
241 }
242
243 cond_.notify_all();
244 (void)guardTask_.Stop();
245 msgConverter_ = nullptr;
246 }
247
TickCallback(TickCallbackInfo * tickCbInfo)248 gboolean GstMsgProcessor::TickCallback(TickCallbackInfo *tickCbInfo)
249 {
250 if (tickCbInfo == nullptr || tickCbInfo->msgProcessor == nullptr) {
251 MEDIA_LOGE("tickCbInfo or msgProcessor is nullptr");
252 return FALSE;
253 }
254
255 InnerMessage innerMsg {};
256 innerMsg.type = tickCbInfo->type;
257 tickCbInfo->msgProcessor->notifier_(innerMsg);
258 return TRUE;
259 }
260
BusCallback(const GstBus * bus,GstMessage * msg,GstMsgProcessor * thiz)261 gboolean GstMsgProcessor::BusCallback(const GstBus *bus, GstMessage *msg, GstMsgProcessor *thiz)
262 {
263 (void)bus;
264 if (thiz == nullptr) {
265 MEDIA_LOGE("processor is nullptr");
266 return FALSE;
267 }
268 CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
269 thiz->ProcessGstMessage(*msg);
270 return TRUE;
271 }
272
ProcessGstMessage(GstMessage & msg)273 void GstMsgProcessor::ProcessGstMessage(GstMessage &msg)
274 {
275 InnerMessage innerMsg {};
276 innerMsg.type = InnerMsgType::INNER_MSG_UNKNOWN;
277
278 int32_t ret = msgConverter_->ConvertToInnerMsg(msg, innerMsg);
279 if (ret != MSERR_OK) {
280 MEDIA_LOGE("converter gst msg %{public}s failed, this msg is from %{public}s",
281 GST_MESSAGE_TYPE_NAME(&msg), GST_MESSAGE_SRC_NAME(&msg));
282 return;
283 }
284
285 if (innerMsg.type == InnerMsgType::INNER_MSG_UNKNOWN) {
286 return; // ignore.
287 }
288
289 // Customized messages(INNER_MSG_BUFFERING_USED_MQ_NUM), no interception required
290 if (innerMsg.type == InnerMsgType::INNER_MSG_BUFFERING_USED_MQ_NUM ||
291 innerMsg.type == InnerMsgType::INNER_MSG_ERROR) {
292 notifier_(innerMsg);
293 } else {
294 gchar *srcName = GST_OBJECT_NAME(GST_MESSAGE_SRC(&msg));
295 if (srcName == nullptr) {
296 return;
297 }
298 std::unique_lock<std::mutex> lock(mutex_);
299 std::vector<std::string> filtersTmp = filters_;
300 mutex_.unlock();
301 for (auto &filter : filtersTmp) {
302 if (filter.compare(srcName) == 0) {
303 notifier_(innerMsg);
304 return;
305 }
306 }
307 }
308 }
309 } // namespace Media
310 } // namespace OHOS
311