• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "gst_msg_processor.h"
17 #include "media_errors.h"
18 #include "media_log.h"
19 #include "scope_guard.h"
20 
21 namespace {
22     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstMsgProc"};
23 }
24 
25 namespace OHOS {
26 namespace Media {
GstMsgProcessor(GstBus & gstBus,const InnerMsgNotifier & notifier,const std::shared_ptr<IGstMsgConverter> & converter)27 GstMsgProcessor::GstMsgProcessor(
28     GstBus &gstBus,
29     const InnerMsgNotifier &notifier,
30     const std::shared_ptr<IGstMsgConverter> &converter)
31     : notifier_(notifier), guardTask_("msg_loop_guard"), msgConverter_(converter)
32 {
33     gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
34     MEDIA_LOGD("enter ctor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
35 }
36 
~GstMsgProcessor()37 GstMsgProcessor::~GstMsgProcessor()
38 {
39     MEDIA_LOGD("enter dtor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
40 
41     Reset();
42 
43     if (gstBus_ != nullptr) {
44         gst_object_unref(gstBus_);
45         gstBus_ = nullptr;
46     }
47 }
48 
Init()49 int32_t GstMsgProcessor::Init()
50 {
51     MEDIA_LOGD("Init enter");
52 
53     if (notifier_ == nullptr) {
54         MEDIA_LOGE("message notifier is nullptr");
55         return MSERR_INVALID_VAL;
56     }
57 
58     ON_SCOPE_EXIT(0) { Reset(); };
59 
60     if (msgConverter_ == nullptr) {
61         msgConverter_ = std::make_shared<GstMsgConverterDefault>();
62     }
63 
64     int32_t ret = guardTask_.Start();
65     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
66 
67     auto mainLoopEnter = std::make_shared<TaskHandler<int32_t>>([this]() { return DoInit(); });
68     ret = guardTask_.EnqueueTask(mainLoopEnter);
69     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
70 
71     auto result = mainLoopEnter->GetResult();
72     CHECK_AND_RETURN_RET_LOG(result.HasResult(), MSERR_UNKNOWN, "msg processor init failed");
73     CHECK_AND_RETURN_RET_LOG(result.Value() == MSERR_OK, result.Value(), "msg processor init failed");
74 
75     std::unique_lock<std::mutex> lock(mutex_);
76     cond_.wait(lock, [this] { return !needWaiting_; }); // wait main loop run done
77 
78     CANCEL_SCOPE_EXIT_GUARD(0);
79     MEDIA_LOGD("Init exit");
80     return MSERR_OK;
81 }
82 
DoInit()83 int32_t GstMsgProcessor::DoInit()
84 {
85     ON_SCOPE_EXIT(0) { DoReset(); };
86 
87     context_ = g_main_context_new();
88     CHECK_AND_RETURN_RET(context_ != nullptr, MSERR_NO_MEMORY);
89 
90     mainLoop_ = g_main_loop_new(context_, false);
91     CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);
92     g_main_context_push_thread_default(context_);
93 
94     GSource *source = g_idle_source_new();
95     g_source_set_callback(source, (GSourceFunc)MainLoopRunDone, this, nullptr);
96     guint ret = g_source_attach(source, context_);
97     CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add idle source failed");
98     g_source_unref(source);
99 
100     busSource_ = gst_bus_create_watch(gstBus_);
101     CHECK_AND_RETURN_RET_LOG(busSource_ != nullptr, MSERR_NO_MEMORY, "add bus source failed");
102     g_source_set_callback(busSource_, (GSourceFunc)&GstMsgProcessor::BusCallback, this, nullptr);
103     ret = g_source_attach(busSource_, context_);
104     CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add bus source failed");
105 
106     auto mainLoopRun = std::make_shared<TaskHandler<void>>([this] {
107         MEDIA_LOGI("start msg main loop...");
108         g_main_loop_run(mainLoop_);
109         MEDIA_LOGI("stop msg main loop...");
110         DoReset();
111     });
112 
113     int32_t ret1 = guardTask_.EnqueueTask(mainLoopRun);
114     CHECK_AND_RETURN_RET(ret1 == MSERR_OK, ret1);
115 
116     CANCEL_SCOPE_EXIT_GUARD(0);
117     return MSERR_OK;
118 }
119 
MainLoopRunDone(GstMsgProcessor * thiz)120 gboolean GstMsgProcessor::MainLoopRunDone(GstMsgProcessor *thiz)
121 {
122     if (thiz == nullptr) {
123         return G_SOURCE_REMOVE;
124     }
125 
126     std::unique_lock<std::mutex> lock(thiz->mutex_);
127     thiz->needWaiting_ = false;
128     thiz->cond_.notify_one();
129 
130     return G_SOURCE_REMOVE;
131 }
132 
AddTickSource(int32_t type,uint32_t interval)133 void GstMsgProcessor::AddTickSource(int32_t type, uint32_t interval)
134 {
135     std::unique_lock<std::mutex> lock(mutex_);
136     auto iter = tickSource_.find(type);
137     if (iter != tickSource_.end()) {
138         return;
139     }
140 
141     GSource *source = g_timeout_source_new(interval);
142     CHECK_AND_RETURN_LOG(source != nullptr, "add tick source failed");
143     TickCallbackInfo *tickCbInfo = static_cast<TickCallbackInfo *>(g_malloc0((gsize)sizeof(TickCallbackInfo)));
144     tickCbInfo->type = type;
145     tickCbInfo->msgProcessor = this;
146     g_source_set_callback(source, (GSourceFunc)&GstMsgProcessor::TickCallback, tickCbInfo,
147         (GDestroyNotify)&GstMsgProcessor::FreeTickType);
148     guint ret = g_source_attach(source, context_);
149     CHECK_AND_RETURN_LOG(ret > 0, "add tick source failed");
150     (void)tickSource_.emplace(type, source);
151 }
152 
RemoveTickSourceByType(int32_t type)153 void GstMsgProcessor::RemoveTickSourceByType(int32_t type)
154 {
155     std::unique_lock<std::mutex> lock(mutex_);
156     auto iter = tickSource_.find(type);
157     if (iter == tickSource_.end()) {
158         return;
159     }
160 
161     GSource *source = iter->second;
162     g_source_destroy(source);
163     g_source_unref(source);
164     source = nullptr;
165     (void)tickSource_.erase(iter);
166 }
167 
RemoveTickSourceAll()168 void GstMsgProcessor::RemoveTickSourceAll()
169 {
170     std::unique_lock<std::mutex> lock(mutex_);
171     for (auto &[tickType, source] : tickSource_) {
172         g_source_destroy(source);
173         g_source_unref(source);
174         source = nullptr;
175     }
176 
177     (void)tickSource_.clear();
178 }
179 
FreeTickType(TickCallbackInfo * tickCbInfo)180 void GstMsgProcessor::FreeTickType(TickCallbackInfo *tickCbInfo)
181 {
182     g_free(tickCbInfo);
183 }
184 
AddMsgFilter(const std::string & filter)185 void GstMsgProcessor::AddMsgFilter(const std::string &filter)
186 {
187     std::unique_lock<std::mutex> lock(mutex_);
188     for (const auto &elem : filters_)  {
189         if (elem == filter) {
190             return;
191         }
192     }
193 
194     MEDIA_LOGI("add msg filter: %{public}s", filter.c_str());
195     filters_.push_back(filter);
196 }
197 
FlushBegin()198 void GstMsgProcessor::FlushBegin()
199 {
200     gst_bus_set_flushing(gstBus_, TRUE);
201 }
202 
FlushEnd()203 void GstMsgProcessor::FlushEnd()
204 {
205     gst_bus_set_flushing(gstBus_, FALSE);
206 }
207 
DoReset()208 void GstMsgProcessor::DoReset()
209 {
210     if (busSource_ != nullptr) {
211         g_source_destroy(busSource_);
212         g_source_unref(busSource_);
213         busSource_ = nullptr;
214     }
215 
216     RemoveTickSourceAll();
217 
218     if (mainLoop_ != nullptr) {
219         g_main_loop_unref(mainLoop_);
220         mainLoop_ = nullptr;
221     }
222 
223     if (context_ != nullptr) {
224         g_main_context_pop_thread_default(context_);
225         g_main_context_unref(context_);
226         context_ = nullptr;
227     }
228 }
229 
Reset()230 void GstMsgProcessor::Reset() noexcept
231 {
232     if (mainLoop_ != nullptr) {
233         if (g_main_loop_is_running(mainLoop_)) {
234             g_main_loop_quit(mainLoop_);
235         }
236     }
237 
238     {
239         std::unique_lock<std::mutex> lock(mutex_);
240         needWaiting_ = false;
241     }
242 
243     cond_.notify_all();
244     (void)guardTask_.Stop();
245     msgConverter_ = nullptr;
246 }
247 
TickCallback(TickCallbackInfo * tickCbInfo)248 gboolean GstMsgProcessor::TickCallback(TickCallbackInfo *tickCbInfo)
249 {
250     if (tickCbInfo == nullptr || tickCbInfo->msgProcessor == nullptr) {
251         MEDIA_LOGE("tickCbInfo or msgProcessor is nullptr");
252         return FALSE;
253     }
254 
255     InnerMessage innerMsg {};
256     innerMsg.type = tickCbInfo->type;
257     tickCbInfo->msgProcessor->notifier_(innerMsg);
258     return TRUE;
259 }
260 
BusCallback(const GstBus * bus,GstMessage * msg,GstMsgProcessor * thiz)261 gboolean GstMsgProcessor::BusCallback(const GstBus *bus, GstMessage *msg, GstMsgProcessor *thiz)
262 {
263     (void)bus;
264     if (thiz == nullptr) {
265         MEDIA_LOGE("processor is nullptr");
266         return FALSE;
267     }
268     CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
269     thiz->ProcessGstMessage(*msg);
270     return TRUE;
271 }
272 
ProcessGstMessage(GstMessage & msg)273 void GstMsgProcessor::ProcessGstMessage(GstMessage &msg)
274 {
275     InnerMessage innerMsg {};
276     innerMsg.type = InnerMsgType::INNER_MSG_UNKNOWN;
277 
278     int32_t ret = msgConverter_->ConvertToInnerMsg(msg, innerMsg);
279     if (ret != MSERR_OK) {
280         MEDIA_LOGE("converter gst msg %{public}s failed, this msg is from %{public}s",
281                    GST_MESSAGE_TYPE_NAME(&msg), GST_MESSAGE_SRC_NAME(&msg));
282         return;
283     }
284 
285     if (innerMsg.type == InnerMsgType::INNER_MSG_UNKNOWN) {
286         return; // ignore.
287     }
288 
289     // Customized messages(INNER_MSG_BUFFERING_USED_MQ_NUM), no interception required
290     if (innerMsg.type == InnerMsgType::INNER_MSG_BUFFERING_USED_MQ_NUM ||
291         innerMsg.type == InnerMsgType::INNER_MSG_ERROR) {
292         notifier_(innerMsg);
293     } else {
294         gchar *srcName = GST_OBJECT_NAME(GST_MESSAGE_SRC(&msg));
295         if (srcName == nullptr) {
296             return;
297         }
298         std::unique_lock<std::mutex> lock(mutex_);
299         std::vector<std::string> filtersTmp = filters_;
300         mutex_.unlock();
301         for (auto &filter : filtersTmp) {
302             if (filter.compare(srcName) == 0) {
303                 notifier_(innerMsg);
304                 return;
305             }
306         }
307     }
308 }
309 } // namespace Media
310 } // namespace OHOS
311