• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "gst_msg_processor.h"
17 #include <unordered_map>
18 #include "media_errors.h"
19 #include "media_log.h"
20 #include "scope_guard.h"
21 
22 namespace {
23     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstMsgProc"};
24 }
25 
26 namespace OHOS {
27 namespace Media {
GstMsgProcessor(GstBus & gstBus,const InnerMsgNotifier & notifier,const std::shared_ptr<IGstMsgConverter> & converter)28 GstMsgProcessor::GstMsgProcessor(
29     GstBus &gstBus,
30     const InnerMsgNotifier &notifier,
31     const std::shared_ptr<IGstMsgConverter> &converter)
32     : notifier_(notifier), guardTask_("msg_loop_guard"), msgConverter_(converter)
33 {
34     gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
35     MEDIA_LOGD("enter ctor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
36 }
37 
~GstMsgProcessor()38 GstMsgProcessor::~GstMsgProcessor()
39 {
40     MEDIA_LOGD("enter dtor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
41 
42     Reset();
43 
44     if (gstBus_ != nullptr) {
45         gst_object_unref(gstBus_);
46         gstBus_ = nullptr;
47     }
48 }
49 
Init()50 int32_t GstMsgProcessor::Init()
51 {
52     MEDIA_LOGD("Init enter");
53 
54     if (notifier_ == nullptr) {
55         MEDIA_LOGE("message notifier is nullptr");
56         return MSERR_INVALID_VAL;
57     }
58 
59     ON_SCOPE_EXIT(0) { Reset(); };
60 
61     if (msgConverter_ == nullptr) {
62         msgConverter_ = std::make_shared<GstMsgConverterDefault>();
63     }
64 
65     int32_t ret = guardTask_.Start();
66     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
67 
68     auto mainLoopEnter = std::make_shared<TaskHandler<int32_t>>([this]() { return DoInit(); });
69     ret = guardTask_.EnqueueTask(mainLoopEnter);
70     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
71 
72     auto result = mainLoopEnter->GetResult();
73     CHECK_AND_RETURN_RET_LOG(result.HasResult(), MSERR_UNKNOWN, "msg processor init failed");
74     CHECK_AND_RETURN_RET_LOG(result.Value() == MSERR_OK, result.Value(), "msg processor init failed");
75 
76     std::unique_lock<std::mutex> lock(mutex_);
77     cond_.wait(lock, [this] { return !needWaiting_; }); // wait main loop run done
78 
79     CANCEL_SCOPE_EXIT_GUARD(0);
80     MEDIA_LOGD("Init exit");
81     return MSERR_OK;
82 }
83 
DoInit()84 int32_t GstMsgProcessor::DoInit()
85 {
86     ON_SCOPE_EXIT(0) { DoReset(); };
87 
88     context_ = g_main_context_new();
89     CHECK_AND_RETURN_RET(context_ != nullptr, MSERR_NO_MEMORY);
90 
91     mainLoop_ = g_main_loop_new(context_, false);
92     CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);
93     g_main_context_push_thread_default(context_);
94 
95     GSource *source = g_idle_source_new();
96     g_source_set_callback(source, (GSourceFunc)MainLoopRunDone, this, nullptr);
97     guint ret = g_source_attach(source, context_);
98     CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add idle source failed");
99     g_source_unref(source);
100 
101     busSource_ = gst_bus_create_watch(gstBus_);
102     CHECK_AND_RETURN_RET_LOG(busSource_ != nullptr, MSERR_NO_MEMORY, "add bus source failed");
103     g_source_set_callback(busSource_, (GSourceFunc)&GstMsgProcessor::BusCallback, this, nullptr);
104     ret = g_source_attach(busSource_, context_);
105     CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add bus source failed");
106 
107     auto mainLoopRun = std::make_shared<TaskHandler<void>>([this] {
108         MEDIA_LOGI("start msg main loop...");
109         g_main_loop_run(mainLoop_);
110         MEDIA_LOGI("stop msg main loop...");
111         DoReset();
112     });
113 
114     int32_t ret1 = guardTask_.EnqueueTask(mainLoopRun);
115     CHECK_AND_RETURN_RET(ret1 == MSERR_OK, ret1);
116 
117     CANCEL_SCOPE_EXIT_GUARD(0);
118     return MSERR_OK;
119 }
120 
MainLoopRunDone(GstMsgProcessor * thiz)121 gboolean GstMsgProcessor::MainLoopRunDone(GstMsgProcessor *thiz)
122 {
123     if (thiz == nullptr) {
124         return G_SOURCE_REMOVE;
125     }
126 
127     std::unique_lock<std::mutex> lock(thiz->mutex_);
128     thiz->needWaiting_ = false;
129     thiz->cond_.notify_one();
130 
131     return G_SOURCE_REMOVE;
132 }
133 
AddMsgFilter(const std::string & filter)134 void GstMsgProcessor::AddMsgFilter(const std::string &filter)
135 {
136     std::unique_lock<std::mutex> lock(mutex_);
137     for (auto &elem :  filters_)  {
138         if (elem == filter) {
139             return;
140         }
141     }
142 
143     MEDIA_LOGI("add msg filter: %{public}s", filter.c_str());
144     filters_.push_back(filter);
145 }
146 
FlushBegin()147 void GstMsgProcessor::FlushBegin()
148 {
149     gst_bus_set_flushing(gstBus_, TRUE);
150 }
151 
FlushEnd()152 void GstMsgProcessor::FlushEnd()
153 {
154     gst_bus_set_flushing(gstBus_, FALSE);
155 }
156 
DoReset()157 void GstMsgProcessor::DoReset()
158 {
159     if (busSource_ != nullptr) {
160         g_source_destroy(busSource_);
161         g_source_unref(busSource_);
162         busSource_ = nullptr;
163     }
164 
165     if (mainLoop_ != nullptr) {
166         g_main_loop_unref(mainLoop_);
167         mainLoop_ = nullptr;
168     }
169 
170     if (context_ != nullptr) {
171         g_main_context_pop_thread_default(context_);
172         g_main_context_unref(context_);
173         context_ = nullptr;
174     }
175 }
176 
Reset()177 void GstMsgProcessor::Reset() noexcept
178 {
179     if (mainLoop_ != nullptr) {
180         if (g_main_loop_is_running(mainLoop_)) {
181             g_main_loop_quit(mainLoop_);
182         }
183     }
184 
185     {
186         std::unique_lock<std::mutex> lock(mutex_);
187         needWaiting_ = false;
188     }
189 
190     cond_.notify_all();
191     (void)guardTask_.Stop();
192     msgConverter_ = nullptr;
193 }
194 
BusCallback(const GstBus * bus,GstMessage * msg,GstMsgProcessor * thiz)195 gboolean GstMsgProcessor::BusCallback(const GstBus *bus, GstMessage *msg, GstMsgProcessor *thiz)
196 {
197     (void)bus;
198     if (thiz == nullptr) {
199         MEDIA_LOGE("processor is nullptr");
200         return FALSE;
201     }
202     CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
203     thiz->ProcessGstMessage(*msg);
204     return TRUE;
205 }
206 
ProcessGstMessage(GstMessage & msg)207 void GstMsgProcessor::ProcessGstMessage(GstMessage &msg)
208 {
209     InnerMessage innerMsg {};
210     innerMsg.type = InnerMsgType::INNER_MSG_UNKNOWN;
211 
212     int32_t ret = msgConverter_->ConvertToInnerMsg(msg, innerMsg);
213     if (ret != MSERR_OK) {
214         MEDIA_LOGE("converter gst msg %{public}s failed, this msg is from %{public}s",
215                    GST_MESSAGE_TYPE_NAME(&msg), GST_MESSAGE_SRC_NAME(&msg));
216         return;
217     }
218 
219     if (innerMsg.type == InnerMsgType::INNER_MSG_UNKNOWN) {
220         return; // ignore.
221     }
222 
223     if (innerMsg.type != InnerMsgType::INNER_MSG_ERROR) {
224         gchar *srcName = GST_OBJECT_NAME(GST_MESSAGE_SRC(&msg));
225         if (srcName == nullptr) {
226             return;
227         }
228         std::unique_lock<std::mutex> lock(mutex_);
229         for (auto &filter : filters_) {
230             if (filter.compare(srcName) == 0) {
231                 notifier_(innerMsg);
232                 return;
233             }
234         }
235     } else {
236         notifier_(innerMsg);
237     }
238 }
239 } // namespace Media
240 } // namespace OHOS
241