1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "avmeta_buffer_blocker.h"
17 #include "media_errors.h"
18 #include "media_log.h"
19 #include "gst_utils.h"
20
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "BufferBlocker"};
23 }
24
25 namespace OHOS {
26 namespace Media {
/**
 * Wrapper handed to GStreamer/GObject as callback user data. In this file it is
 * built from shared_from_this(), resolved back to the blocker with
 * TakeStrongThiz(), and released through OnDestory as the destroy notify.
 */
27 using AVMetaBufferBlockerWrapper = ThizWrapper<AVMetaBufferBlocker>;
28
BlockCallback(GstPad * pad,GstPadProbeInfo * info,gpointer usrdata)29 GstPadProbeReturn AVMetaBufferBlocker::BlockCallback(GstPad *pad, GstPadProbeInfo *info, gpointer usrdata)
30 {
31 if (pad == nullptr || info == nullptr || usrdata == nullptr) {
32 return GST_PAD_PROBE_PASS;
33 }
34
35 auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(usrdata);
36 if (thizStrong != nullptr) {
37 return thizStrong->OnBlockCallback(*pad, *info);
38 }
39 return GST_PAD_PROBE_PASS;
40 }
41
PadAdded(GstElement * elem,GstPad * pad,gpointer userData)42 void AVMetaBufferBlocker::PadAdded(GstElement *elem, GstPad *pad, gpointer userData)
43 {
44 if (elem == nullptr || pad == nullptr || userData == nullptr) {
45 return;
46 }
47
48 auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(userData);
49 if (thizStrong != nullptr) {
50 return thizStrong->OnPadAdded(*elem, *pad);
51 }
52 }
53
AVMetaBufferBlocker(GstElement & elem,bool direction,BufferRecievedNotifier notifier)54 AVMetaBufferBlocker::AVMetaBufferBlocker(GstElement &elem, bool direction, BufferRecievedNotifier notifier)
55 : elem_(elem), direction_(direction), notifier_(notifier)
56 {
57 MEDIA_LOGD("ctor, elem: %{public}s, direction: %{public}d, 0x%{public}06" PRIXPTR,
58 ELEM_NAME(&elem_), direction_, FAKE_POINTER(this));
59 }
60
~AVMetaBufferBlocker()61 AVMetaBufferBlocker::~AVMetaBufferBlocker()
62 {
63 MEDIA_LOGD("dtor, 0x%{public}06" PRIXPTR, FAKE_POINTER(this));
64 }
65
Init()66 void AVMetaBufferBlocker::Init()
67 {
68 MEDIA_LOGD("elem: %{public}s", ELEM_NAME(&elem_));
69 auto padList = direction_ ? elem_.srcpads : elem_.sinkpads;
70
71 /**
72 * Defaultly, the probe type is blocked. If the pads count is greater than 1,
73 * we can only setup detecting probe, because the element maybe not start individual
74 * thread for each pad, all pad maybe drived by just one thread. If one pad is
75 * blocked, then all other pads can not detect any buffer.
76 */
77 GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
78 if (g_list_length(padList) > 1) {
79 MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
80 ELEM_NAME(&elem_), direction_);
81 type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
82 probeRet_ = GST_PAD_PROBE_OK;
83 }
84
85 // Add pad probe for all pads. If the pads count is greater than 1, we just detect buffer.
86 for (GList *node = g_list_first(padList); node != nullptr; node = node->next) {
87 if (node->data == nullptr) {
88 continue;
89 }
90 GstPad *pad = reinterpret_cast<GstPad *>(node->data);
91
92 // the subtitle stream must be ignored, currently we dont support it.
93 std::string_view name = PAD_NAME(pad);
94 if (name.find("subtitle") != std::string_view::npos) {
95 continue;
96 }
97
98 AddPadProbe(*pad, type);
99 }
100
101 // listen to the "pad-added" signal to figure out whether the pads count is greater than 1.
102 AVMetaBufferBlockerWrapper *wrapper = new (std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
103 CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
104
105 signalId_ = g_signal_connect_data(&elem_, "pad-added", GCallback(&AVMetaBufferBlocker::PadAdded), wrapper,
106 (GClosureNotify)&AVMetaBufferBlockerWrapper::OnDestory, static_cast<GConnectFlags>(0));
107 if (signalId_ == 0) {
108 delete wrapper;
109 MEDIA_LOGW("add signal failed for %{public}s", ELEM_NAME(&elem_));
110 }
111 }
112
CheckUpStreamBlocked(GstPad & pad)113 bool AVMetaBufferBlocker::CheckUpStreamBlocked(GstPad &pad)
114 {
115 GstPad *upstreamPad = nullptr;
116
117 if (GST_PAD_DIRECTION(&pad) == GST_PAD_SINK) {
118 upstreamPad = gst_pad_get_peer(&pad);
119 } else {
120 GstElement *elem = gst_pad_get_parent_element(&pad);
121 if (elem == nullptr) {
122 if (GST_IS_PROXY_PAD(&pad)) { // find the proxy sinkpad for decodebin
123 return false;
124 }
125 MEDIA_LOGE("unexpected, avoid lock, return true");
126 return true;
127 }
128
129 /**
130 * This is a multi-srcpads element, means that we meet the demuxer or multiqueue.
131 * There is no need to figure out whether the demuxer or multiqueue's sinkpads are
132 * blocked, because we guarante it will never happen.
133 */
134 if (g_list_length(elem->srcpads) > 1 || g_list_length(elem->sinkpads) == 0) {
135 gst_object_unref(elem);
136 return false;
137 }
138
139 GList *node = g_list_first(elem->sinkpads);
140 if (node != nullptr && node->data != nullptr) {
141 upstreamPad = GST_PAD_CAST(gst_object_ref((GST_PAD_CAST(node->data))));
142 }
143
144 gst_object_unref(elem);
145 }
146
147 if (upstreamPad == nullptr) {
148 return false;
149 }
150
151 if (gst_pad_is_blocked(upstreamPad)) {
152 gst_object_unref(upstreamPad);
153 return true;
154 }
155
156 bool ret = CheckUpStreamBlocked(*upstreamPad);
157 gst_object_unref(upstreamPad);
158
159 return ret;
160 }
161
162 /**
163 * Call it after IsRemoved, this function maybe return false if there
164 * are no any probe setuped.
165 */
IsBufferDetected()166 bool AVMetaBufferBlocker::IsBufferDetected()
167 {
168 std::unique_lock<std::mutex> lock(mutex_);
169
170 for (auto &item : padInfos_) {
171 if (item.detected) {
172 continue;
173 }
174
175 MEDIA_LOGD("buffer not arrived for %{public}s's %{public}s",
176 ELEM_NAME(&elem_), PAD_NAME(item.pad));
177
178 /**
179 * if the upstream is blocked, we can not wait buffer arriving at this pad.
180 * Thus, we just remove the probe at this pad and set the detected to true.
181 * This operation maybe remove this block probe too early, but the upstream's
182 * blocker will guarantee at least receiving one frame of buffer to finish the meta
183 * extracting process.
184 */
185 if (item.probeId != 0 && CheckUpStreamBlocked(*item.pad)) {
186 MEDIA_LOGD("%{public}s's %{public}s upstream is blocked, dont need this blocker",
187 ELEM_NAME(&elem_), PAD_NAME(item.pad));
188 gst_pad_remove_probe(item.pad, item.probeId);
189 item.probeId = 0;
190 item.detected = true;
191 continue;
192 }
193
194 // not detect buffer and the upstream of this pad is not blocked.
195 return false;
196 }
197
198 return true;
199 }
200
IsRemoved()201 bool AVMetaBufferBlocker::IsRemoved()
202 {
203 std::unique_lock<std::mutex> lock(mutex_);
204 return isRemoved_;
205 }
206
Remove()207 void AVMetaBufferBlocker::Remove()
208 {
209 std::unique_lock<std::mutex> lock(mutex_);
210 if (isHidden_) {
211 return;
212 }
213
214 for (auto &padInfo : padInfos_) {
215 if (padInfo.probeId != 0) {
216 MEDIA_LOGD("cancel block at %{public}s's %{public}s", ELEM_NAME(&elem_), PAD_NAME(padInfo.pad));
217 gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
218 padInfo.probeId = 0;
219 }
220 }
221
222 if (signalId_ != 0) {
223 g_signal_handler_disconnect(&elem_, signalId_);
224 signalId_ = 0;
225 }
226
227 isRemoved_ = true;
228 }
229
Hide()230 void AVMetaBufferBlocker::Hide()
231 {
232 std::unique_lock<std::mutex> lock(mutex_);
233 isHidden_ = true;
234 }
235
GetElemName()236 std::string AVMetaBufferBlocker::GetElemName()
237 {
238 return ELEM_NAME(&elem_);
239 }
240
CheckBufferDetected(GstPadProbeInfo & info)241 bool AVMetaBufferBlocker::CheckBufferDetected(GstPadProbeInfo &info)
242 {
243 auto type = static_cast<unsigned int>(info.type);
244 if ((type & (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST)) != 0) {
245 MEDIA_LOGD("buffer detected");
246 return true;
247 }
248
249 if ((type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) != 0) {
250 GstEvent *event = gst_pad_probe_info_get_event(&info);
251 if (event == nullptr) {
252 return false;
253 }
254
255 if (GST_EVENT_TYPE(event) != GST_EVENT_GAP && GST_EVENT_TYPE(event) != GST_EVENT_EOS) {
256 return false;
257 }
258
259 MEDIA_LOGD("gap or eos detected");
260 const GstStructure *struc = gst_event_get_structure(event);
261 gchar *prettyString = gst_structure_to_string(struc);
262 if (prettyString != nullptr) {
263 MEDIA_LOGD("event: %{public}s", prettyString);
264 g_free(prettyString);
265 }
266 return true;
267 }
268
269 return false;
270 }
271
OnBlockCallback(GstPad & pad,GstPadProbeInfo & info)272 GstPadProbeReturn AVMetaBufferBlocker::OnBlockCallback(GstPad &pad, GstPadProbeInfo &info)
273 {
274 if (!CheckBufferDetected(info)) {
275 return probeRet_;
276 }
277
278 std::unique_lock<std::mutex> lock(mutex_);
279 for (auto &padInfo : padInfos_) {
280 if (&pad != padInfo.pad) {
281 continue;
282 }
283
284 if (padInfo.probeId == 0) {
285 MEDIA_LOGI("block already be canceled, exit");
286 return GST_PAD_PROBE_REMOVE;
287 }
288
289 padInfo.detected = true;
290 MEDIA_LOGD("something arrived at %{public}s's pad %{public}s", PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
291 lock.unlock(); // ???
292
293 if (notifier_ != nullptr) {
294 notifier_();
295 }
296 return GST_PAD_PROBE_OK;
297 }
298
299 return probeRet_;
300 }
301
OnPadAdded(GstElement & elem,GstPad & pad)302 void AVMetaBufferBlocker::OnPadAdded(GstElement &elem, GstPad &pad)
303 {
304 MEDIA_LOGD("demux %{public}s sinkpad %{public}s added", ELEM_NAME(&elem), PAD_NAME(&pad));
305
306 std::unique_lock<std::mutex> lock(mutex_);
307
308 /**
309 * if it've already been required to remove all probes, we need to
310 * reject to add probe to new pad.
311 */
312 if (isRemoved_) {
313 MEDIA_LOGI("block already be removed, exit");
314 return;
315 }
316
317 GstPadDirection currDirection = direction_ ? GST_PAD_SRC : GST_PAD_SINK;
318 if (GST_PAD_DIRECTION(&pad) != currDirection) {
319 return;
320 }
321
322 GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
323
324 /**
325 * If it has one pad before, we must change the blocked probe to
326 * detecting probe, or no buffer can arrive at this new pad if the
327 * previous pad has been blocked, because the element maybe not
328 * start individual thread for each pad, all pad maybe drived by
329 * just one thread.
330 */
331 if (padInfos_.size() != 0) {
332 MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
333 ELEM_NAME(&elem_), direction_);
334 type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
335 probeRet_ = GST_PAD_PROBE_OK;
336
337 std::vector<PadInfo> temp;
338 temp.swap(padInfos_);
339
340 for (auto &padInfo : temp) {
341 /**
342 * if the blocked probe setuped at this pad, we add detecting probe
343 * at it firstly, then remove the blocing probe, for avoiding to miss
344 * the first buffer passing through this pad.
345 */
346 if (padInfo.blocked && padInfo.probeId != 0) {
347 AddPadProbe(*padInfo.pad, type);
348 gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
349 } else {
350 // not blocked probe, just move to new container.
351 padInfos_.push_back(padInfo);
352 }
353 }
354 }
355
356 // the subtitle stream must be ignored, currently we dont support it.
357 std::string_view name = PAD_NAME(&pad);
358 if (name.find("subtitle") != std::string_view::npos) {
359 return;
360 }
361
362 // now, we can add the probe for new pad.
363 AddPadProbe(pad, type);
364 }
365
AddPadProbe(GstPad & pad,GstPadProbeType type)366 void AVMetaBufferBlocker::AddPadProbe(GstPad &pad, GstPadProbeType type)
367 {
368 AVMetaBufferBlockerWrapper *wrapper = new(std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
369 CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
370
371 bool blocked = type & GST_PAD_PROBE_TYPE_BLOCK;
372 gulong probeId = gst_pad_add_probe(&pad, type, BlockCallback,
373 wrapper, &AVMetaBufferBlockerWrapper::OnDestory);
374 if (probeId == 0) {
375 MEDIA_LOGW("add probe for %{public}s's pad %{public}s failed",
376 PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
377 delete wrapper;
378 } else {
379 (void)padInfos_.emplace_back(PadInfo { &pad, probeId, false, blocked });
380 }
381 }
382 } // namespace Media
383 } // namespace OHOS
384