1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "gst_msg_processor.h"
17 #include <unordered_map>
18 #include "media_errors.h"
19 #include "media_log.h"
20 #include "scope_guard.h"
21
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "GstMsgProc"};
24 }
25
26 namespace OHOS {
27 namespace Media {
GstMsgProcessor(GstBus & gstBus,const InnerMsgNotifier & notifier,const std::shared_ptr<IGstMsgConverter> & converter)28 GstMsgProcessor::GstMsgProcessor(
29 GstBus &gstBus,
30 const InnerMsgNotifier ¬ifier,
31 const std::shared_ptr<IGstMsgConverter> &converter)
32 : notifier_(notifier), guardTask_("msg_loop_guard"), msgConverter_(converter)
33 {
34 gstBus_ = GST_BUS_CAST(gst_object_ref(&gstBus));
35 MEDIA_LOGD("enter ctor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
36 }
37
~GstMsgProcessor()38 GstMsgProcessor::~GstMsgProcessor()
39 {
40 MEDIA_LOGD("enter dtor, instance: 0x%{public}06" PRIXPTR "", FAKE_POINTER(this));
41
42 Reset();
43
44 if (gstBus_ != nullptr) {
45 gst_object_unref(gstBus_);
46 gstBus_ = nullptr;
47 }
48 }
49
Init()50 int32_t GstMsgProcessor::Init()
51 {
52 MEDIA_LOGD("Init enter");
53
54 if (notifier_ == nullptr) {
55 MEDIA_LOGE("message notifier is nullptr");
56 return MSERR_INVALID_VAL;
57 }
58
59 ON_SCOPE_EXIT(0) { Reset(); };
60
61 if (msgConverter_ == nullptr) {
62 msgConverter_ = std::make_shared<GstMsgConverterDefault>();
63 }
64
65 int32_t ret = guardTask_.Start();
66 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
67
68 auto mainLoopEnter = std::make_shared<TaskHandler<int32_t>>([this]() { return DoInit(); });
69 ret = guardTask_.EnqueueTask(mainLoopEnter);
70 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
71
72 auto result = mainLoopEnter->GetResult();
73 CHECK_AND_RETURN_RET_LOG(result.HasResult(), MSERR_UNKNOWN, "msg processor init failed");
74 CHECK_AND_RETURN_RET_LOG(result.Value() == MSERR_OK, result.Value(), "msg processor init failed");
75
76 std::unique_lock<std::mutex> lock(mutex_);
77 cond_.wait(lock, [this] { return !needWaiting_; }); // wait main loop run done
78
79 CANCEL_SCOPE_EXIT_GUARD(0);
80 MEDIA_LOGD("Init exit");
81 return MSERR_OK;
82 }
83
DoInit()84 int32_t GstMsgProcessor::DoInit()
85 {
86 ON_SCOPE_EXIT(0) { DoReset(); };
87
88 context_ = g_main_context_new();
89 CHECK_AND_RETURN_RET(context_ != nullptr, MSERR_NO_MEMORY);
90
91 mainLoop_ = g_main_loop_new(context_, false);
92 CHECK_AND_RETURN_RET(mainLoop_ != nullptr, MSERR_NO_MEMORY);
93 g_main_context_push_thread_default(context_);
94
95 GSource *source = g_idle_source_new();
96 g_source_set_callback(source, (GSourceFunc)MainLoopRunDone, this, nullptr);
97 guint ret = g_source_attach(source, context_);
98 CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add idle source failed");
99 g_source_unref(source);
100
101 busSource_ = gst_bus_create_watch(gstBus_);
102 CHECK_AND_RETURN_RET_LOG(busSource_ != nullptr, MSERR_NO_MEMORY, "add bus source failed");
103 g_source_set_callback(busSource_, (GSourceFunc)&GstMsgProcessor::BusCallback, this, nullptr);
104 ret = g_source_attach(busSource_, context_);
105 CHECK_AND_RETURN_RET_LOG(ret > 0, MSERR_INVALID_OPERATION, "add bus source failed");
106
107 auto mainLoopRun = std::make_shared<TaskHandler<void>>([this] {
108 MEDIA_LOGI("start msg main loop...");
109 g_main_loop_run(mainLoop_);
110 MEDIA_LOGI("stop msg main loop...");
111 DoReset();
112 });
113
114 int32_t ret1 = guardTask_.EnqueueTask(mainLoopRun);
115 CHECK_AND_RETURN_RET(ret1 == MSERR_OK, ret1);
116
117 CANCEL_SCOPE_EXIT_GUARD(0);
118 return MSERR_OK;
119 }
120
MainLoopRunDone(GstMsgProcessor * thiz)121 gboolean GstMsgProcessor::MainLoopRunDone(GstMsgProcessor *thiz)
122 {
123 if (thiz == nullptr) {
124 return G_SOURCE_REMOVE;
125 }
126
127 std::unique_lock<std::mutex> lock(thiz->mutex_);
128 thiz->needWaiting_ = false;
129 thiz->cond_.notify_one();
130
131 return G_SOURCE_REMOVE;
132 }
133
AddMsgFilter(const std::string & filter)134 void GstMsgProcessor::AddMsgFilter(const std::string &filter)
135 {
136 std::unique_lock<std::mutex> lock(mutex_);
137 for (auto &elem : filters_) {
138 if (elem == filter) {
139 return;
140 }
141 }
142
143 MEDIA_LOGI("add msg filter: %{public}s", filter.c_str());
144 filters_.push_back(filter);
145 }
146
FlushBegin()147 void GstMsgProcessor::FlushBegin()
148 {
149 gst_bus_set_flushing(gstBus_, TRUE);
150 }
151
FlushEnd()152 void GstMsgProcessor::FlushEnd()
153 {
154 gst_bus_set_flushing(gstBus_, FALSE);
155 }
156
DoReset()157 void GstMsgProcessor::DoReset()
158 {
159 if (busSource_ != nullptr) {
160 g_source_destroy(busSource_);
161 g_source_unref(busSource_);
162 busSource_ = nullptr;
163 }
164
165 if (mainLoop_ != nullptr) {
166 g_main_loop_unref(mainLoop_);
167 mainLoop_ = nullptr;
168 }
169
170 if (context_ != nullptr) {
171 g_main_context_pop_thread_default(context_);
172 g_main_context_unref(context_);
173 context_ = nullptr;
174 }
175 }
176
Reset()177 void GstMsgProcessor::Reset() noexcept
178 {
179 if (mainLoop_ != nullptr) {
180 if (g_main_loop_is_running(mainLoop_)) {
181 g_main_loop_quit(mainLoop_);
182 }
183 }
184
185 {
186 std::unique_lock<std::mutex> lock(mutex_);
187 needWaiting_ = false;
188 }
189
190 cond_.notify_all();
191 (void)guardTask_.Stop();
192 msgConverter_ = nullptr;
193 }
194
BusCallback(const GstBus * bus,GstMessage * msg,GstMsgProcessor * thiz)195 gboolean GstMsgProcessor::BusCallback(const GstBus *bus, GstMessage *msg, GstMsgProcessor *thiz)
196 {
197 (void)bus;
198 if (thiz == nullptr) {
199 MEDIA_LOGE("processor is nullptr");
200 return FALSE;
201 }
202 CHECK_AND_RETURN_RET(msg != nullptr, FALSE);
203 thiz->ProcessGstMessage(*msg);
204 return TRUE;
205 }
206
ProcessGstMessage(GstMessage & msg)207 void GstMsgProcessor::ProcessGstMessage(GstMessage &msg)
208 {
209 InnerMessage innerMsg {};
210 innerMsg.type = InnerMsgType::INNER_MSG_UNKNOWN;
211
212 int32_t ret = msgConverter_->ConvertToInnerMsg(msg, innerMsg);
213 if (ret != MSERR_OK) {
214 MEDIA_LOGE("converter gst msg %{public}s failed, this msg is from %{public}s",
215 GST_MESSAGE_TYPE_NAME(&msg), GST_MESSAGE_SRC_NAME(&msg));
216 return;
217 }
218
219 if (innerMsg.type == InnerMsgType::INNER_MSG_UNKNOWN) {
220 return; // ignore.
221 }
222
223 if (innerMsg.type != InnerMsgType::INNER_MSG_ERROR) {
224 gchar *srcName = GST_OBJECT_NAME(GST_MESSAGE_SRC(&msg));
225 if (srcName == nullptr) {
226 return;
227 }
228 std::unique_lock<std::mutex> lock(mutex_);
229 for (auto &filter : filters_) {
230 if (filter.compare(srcName) == 0) {
231 notifier_(innerMsg);
232 return;
233 }
234 }
235 } else {
236 notifier_(innerMsg);
237 }
238 }
239 } // namespace Media
240 } // namespace OHOS
241