1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "avmeta_buffer_blocker.h"
17 #include "media_errors.h"
18 #include "media_log.h"
19 #include "gst_utils.h"
20
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "BufferBlocker"};
23 }
24
25 namespace OHOS {
26 namespace Media {
27 using AVMetaBufferBlockerWrapper = ThizWrapper<AVMetaBufferBlocker>;
28
BlockCallback(GstPad * pad,GstPadProbeInfo * info,gpointer usrdata)29 GstPadProbeReturn AVMetaBufferBlocker::BlockCallback(GstPad *pad, GstPadProbeInfo *info, gpointer usrdata)
30 {
31 if (pad == nullptr || info == nullptr || usrdata == nullptr) {
32 return GST_PAD_PROBE_PASS;
33 }
34
35 auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(usrdata);
36 if (thizStrong != nullptr) {
37 return thizStrong->OnBlockCallback(*pad, *info);
38 }
39 return GST_PAD_PROBE_PASS;
40 }
41
PadAdded(GstElement * elem,GstPad * pad,gpointer userData)42 void AVMetaBufferBlocker::PadAdded(GstElement *elem, GstPad *pad, gpointer userData)
43 {
44 CHECK_AND_RETURN(elem != nullptr && pad != nullptr && userData != nullptr);
45
46 auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(userData);
47 if (thizStrong != nullptr) {
48 return thizStrong->OnPadAdded(*elem, *pad);
49 }
50 }
51
AVMetaBufferBlocker(GstElement & elem,bool direction,BufferRecievedNotifier notifier)52 AVMetaBufferBlocker::AVMetaBufferBlocker(GstElement &elem, bool direction, BufferRecievedNotifier notifier)
53 : elem_(elem), direction_(direction), notifier_(notifier)
54 {
55 MEDIA_LOGD("ctor, elem: %{public}s, direction: %{public}d, 0x%{public}06" PRIXPTR,
56 ELEM_NAME(&elem_), direction_, FAKE_POINTER(this));
57 }
58
~AVMetaBufferBlocker()59 AVMetaBufferBlocker::~AVMetaBufferBlocker()
60 {
61 MEDIA_LOGD("dtor, 0x%{public}06" PRIXPTR, FAKE_POINTER(this));
62 }
63
Init()64 void AVMetaBufferBlocker::Init()
65 {
66 MEDIA_LOGD("elem: %{public}s", ELEM_NAME(&elem_));
67 auto padList = direction_ ? elem_.srcpads : elem_.sinkpads;
68
69 /**
70 * Defaultly, the probe type is blocked. If the pads count is greater than 1,
71 * we can only setup detecting probe, because the element maybe not start individual
72 * thread for each pad, all pad maybe drived by just one thread. If one pad is
73 * blocked, then all other pads can not detect any buffer.
74 */
75 GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
76 if (g_list_length(padList) > 1) {
77 MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
78 ELEM_NAME(&elem_), direction_);
79 type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
80 probeRet_ = GST_PAD_PROBE_OK;
81 }
82
83 // Add pad probe for all pads. If the pads count is greater than 1, we just detect buffer.
84 for (GList *node = g_list_first(padList); node != nullptr; node = node->next) {
85 CHECK_AND_CONTINUE(node->data != nullptr);
86 GstPad *pad = reinterpret_cast<GstPad *>(node->data);
87
88 // the subtitle stream must be ignored, currently we dont support it.
89 std::string_view name = PAD_NAME(pad);
90 if (name.find("subtitle") != std::string_view::npos) {
91 continue;
92 }
93
94 AddPadProbe(*pad, type);
95 }
96
97 // listen to the "pad-added" signal to figure out whether the pads count is greater than 1.
98 AVMetaBufferBlockerWrapper *wrapper = new (std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
99 CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
100
101 signalId_ = g_signal_connect_data(&elem_, "pad-added", GCallback(&AVMetaBufferBlocker::PadAdded), wrapper,
102 (GClosureNotify)&AVMetaBufferBlockerWrapper::OnDestory, static_cast<GConnectFlags>(0));
103 if (signalId_ == 0) {
104 delete wrapper;
105 MEDIA_LOGW("add signal failed for %{public}s", ELEM_NAME(&elem_));
106 }
107 }
108
CheckUpStreamBlocked(GstPad & pad)109 bool AVMetaBufferBlocker::CheckUpStreamBlocked(GstPad &pad)
110 {
111 GstPad *upstreamPad = nullptr;
112
113 if (GST_PAD_DIRECTION(&pad) == GST_PAD_SINK) {
114 upstreamPad = gst_pad_get_peer(&pad);
115 } else {
116 GstElement *elem = gst_pad_get_parent_element(&pad);
117 if (elem == nullptr) {
118 if (GST_IS_PROXY_PAD(&pad)) { // find the proxy sinkpad for decodebin
119 return false;
120 }
121 MEDIA_LOGE("unexpected, avoid lock, return true");
122 return true;
123 }
124
125 /**
126 * This is a multi-srcpads element, means that we meet the demuxer or multiqueue.
127 * There is no need to figure out whether the demuxer or multiqueue's sinkpads are
128 * blocked, because we guarante it will never happen.
129 */
130 if (g_list_length(elem->srcpads) > 1 || g_list_length(elem->sinkpads) == 0) {
131 gst_object_unref(elem);
132 return false;
133 }
134
135 GList *node = g_list_first(elem->sinkpads);
136 if (node != nullptr && node->data != nullptr) {
137 upstreamPad = GST_PAD_CAST(gst_object_ref((GST_PAD_CAST(node->data))));
138 }
139
140 gst_object_unref(elem);
141 }
142
143 if (upstreamPad == nullptr) {
144 return false;
145 }
146
147 if (gst_pad_is_blocked(upstreamPad)) {
148 gst_object_unref(upstreamPad);
149 return true;
150 }
151
152 bool ret = CheckUpStreamBlocked(*upstreamPad);
153 gst_object_unref(upstreamPad);
154
155 return ret;
156 }
157
158 /**
159 * Call it after IsRemoved, this function maybe return false if there
160 * are no any probe setuped.
161 */
IsBufferDetected()162 bool AVMetaBufferBlocker::IsBufferDetected()
163 {
164 std::unique_lock<std::mutex> lock(mutex_);
165
166 for (auto &item : padInfos_) {
167 if (item.detected) {
168 continue;
169 }
170
171 MEDIA_LOGD("buffer not arrived for %{public}s's %{public}s",
172 ELEM_NAME(&elem_), PAD_NAME(item.pad));
173
174 /**
175 * if the upstream is blocked, we can not wait buffer arriving at this pad.
176 * Thus, we just remove the probe at this pad and set the detected to true.
177 * This operation maybe remove this block probe too early, but the upstream's
178 * blocker will guarantee at least receiving one frame of buffer to finish the meta
179 * extracting process.
180 */
181 if (item.probeId != 0 && CheckUpStreamBlocked(*item.pad)) {
182 MEDIA_LOGD("%{public}s's %{public}s upstream is blocked, dont need this blocker",
183 ELEM_NAME(&elem_), PAD_NAME(item.pad));
184 gst_pad_remove_probe(item.pad, item.probeId);
185 item.probeId = 0;
186 item.detected = true;
187 continue;
188 }
189
190 // not detect buffer and the upstream of this pad is not blocked.
191 return false;
192 }
193
194 return true;
195 }
196
IsRemoved()197 bool AVMetaBufferBlocker::IsRemoved()
198 {
199 std::unique_lock<std::mutex> lock(mutex_);
200 return isRemoved_;
201 }
202
Remove()203 void AVMetaBufferBlocker::Remove()
204 {
205 std::unique_lock<std::mutex> lock(mutex_);
206 if (isHidden_) {
207 return;
208 }
209
210 for (auto &padInfo : padInfos_) {
211 if (padInfo.probeId != 0) {
212 MEDIA_LOGD("cancel block at %{public}s's %{public}s", ELEM_NAME(&elem_), PAD_NAME(padInfo.pad));
213 gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
214 padInfo.probeId = 0;
215 }
216 }
217
218 if (signalId_ != 0) {
219 g_signal_handler_disconnect(&elem_, signalId_);
220 signalId_ = 0;
221 }
222
223 isRemoved_ = true;
224 }
225
Hide()226 void AVMetaBufferBlocker::Hide()
227 {
228 std::unique_lock<std::mutex> lock(mutex_);
229 isHidden_ = true;
230 }
231
GetElemName()232 std::string AVMetaBufferBlocker::GetElemName()
233 {
234 return ELEM_NAME(&elem_);
235 }
236
CheckBufferDetected(GstPadProbeInfo & info)237 bool AVMetaBufferBlocker::CheckBufferDetected(GstPadProbeInfo &info)
238 {
239 auto type = static_cast<unsigned int>(info.type);
240 if ((type & (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST)) != 0) {
241 MEDIA_LOGD("buffer detected");
242 return true;
243 }
244
245 if ((type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) != 0) {
246 GstEvent *event = gst_pad_probe_info_get_event(&info);
247 if (event == nullptr) {
248 return false;
249 }
250
251 if (GST_EVENT_TYPE(event) != GST_EVENT_GAP && GST_EVENT_TYPE(event) != GST_EVENT_EOS) {
252 return false;
253 }
254
255 MEDIA_LOGD("gap or eos detected");
256 const GstStructure *struc = gst_event_get_structure(event);
257 gchar *prettyString = gst_structure_to_string(struc);
258 if (prettyString != nullptr) {
259 MEDIA_LOGD("event: %{public}s", prettyString);
260 g_free(prettyString);
261 }
262 return true;
263 }
264
265 return false;
266 }
267
OnBlockCallback(GstPad & pad,GstPadProbeInfo & info)268 GstPadProbeReturn AVMetaBufferBlocker::OnBlockCallback(GstPad &pad, GstPadProbeInfo &info)
269 {
270 if (!CheckBufferDetected(info)) {
271 return probeRet_;
272 }
273
274 std::unique_lock<std::mutex> lock(mutex_);
275 for (auto &padInfo : padInfos_) {
276 if (&pad != padInfo.pad) {
277 continue;
278 }
279
280 if (padInfo.probeId == 0) {
281 MEDIA_LOGI("block already be canceled, exit");
282 return GST_PAD_PROBE_REMOVE;
283 }
284
285 padInfo.detected = true;
286 MEDIA_LOGD("something arrived at %{public}s's pad %{public}s", PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
287 lock.unlock(); // ???
288
289 if (notifier_ != nullptr) {
290 notifier_();
291 }
292 return GST_PAD_PROBE_OK;
293 }
294
295 return probeRet_;
296 }
297
OnPadAdded(GstElement & elem,GstPad & pad)298 void AVMetaBufferBlocker::OnPadAdded(GstElement &elem, GstPad &pad)
299 {
300 MEDIA_LOGD("demux %{public}s sinkpad %{public}s added", ELEM_NAME(&elem), PAD_NAME(&pad));
301
302 std::unique_lock<std::mutex> lock(mutex_);
303
304 /**
305 * if it've already been required to remove all probes, we need to
306 * reject to add probe to new pad.
307 */
308 if (isRemoved_) {
309 MEDIA_LOGI("block already be removed, exit");
310 return;
311 }
312
313 GstPadDirection currDirection = direction_ ? GST_PAD_SRC : GST_PAD_SINK;
314 if (GST_PAD_DIRECTION(&pad) != currDirection) {
315 return;
316 }
317
318 GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
319
320 /**
321 * If it has one pad before, we must change the blocked probe to
322 * detecting probe, or no buffer can arrive at this new pad if the
323 * previous pad has been blocked, because the element maybe not
324 * start individual thread for each pad, all pad maybe drived by
325 * just one thread.
326 */
327 if (padInfos_.size() != 0) {
328 MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
329 ELEM_NAME(&elem_), direction_);
330 type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
331 probeRet_ = GST_PAD_PROBE_OK;
332
333 std::vector<PadInfo> temp;
334 temp.swap(padInfos_);
335
336 for (auto &padInfo : temp) {
337 /**
338 * if the blocked probe setuped at this pad, we add detecting probe
339 * at it firstly, then remove the blocing probe, for avoiding to miss
340 * the first buffer passing through this pad.
341 */
342 if (padInfo.blocked && padInfo.probeId != 0) {
343 AddPadProbe(*padInfo.pad, type);
344 gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
345 } else {
346 // not blocked probe, just move to new container.
347 padInfos_.push_back(padInfo);
348 }
349 }
350 }
351
352 // the subtitle stream must be ignored, currently we dont support it.
353 std::string_view name = PAD_NAME(&pad);
354 if (name.find("subtitle") != std::string_view::npos) {
355 return;
356 }
357
358 // now, we can add the probe for new pad.
359 AddPadProbe(pad, type);
360 }
361
AddPadProbe(GstPad & pad,GstPadProbeType type)362 void AVMetaBufferBlocker::AddPadProbe(GstPad &pad, GstPadProbeType type)
363 {
364 AVMetaBufferBlockerWrapper *wrapper = new(std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
365 CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
366
367 bool blocked = type & GST_PAD_PROBE_TYPE_BLOCK;
368 gulong probeId = gst_pad_add_probe(&pad, type, BlockCallback,
369 wrapper, &AVMetaBufferBlockerWrapper::OnDestory);
370 if (probeId == 0) {
371 MEDIA_LOGW("add probe for %{public}s's pad %{public}s failed",
372 PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
373 delete wrapper;
374 } else {
375 (void)padInfos_.emplace_back(PadInfo { &pad, probeId, false, blocked });
376 }
377 }
378 } // namespace Media
379 } // namespace OHOS
380