• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "avmeta_buffer_blocker.h"
17 #include "media_errors.h"
18 #include "media_log.h"
19 #include "gst_utils.h"
20 
namespace {
    // File-local HiLog label; the "BufferBlocker" tag identifies log lines emitted from this file.
    constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "BufferBlocker"};
}
24 
25 namespace OHOS {
26 namespace Media {
// Wrapper handed to GStreamer/GLib as callback user data. It is constructed from shared_from_this(),
// queried through TakeStrongThiz() inside the callbacks and released through OnDestory().
using AVMetaBufferBlockerWrapper = ThizWrapper<AVMetaBufferBlocker>;
28 
BlockCallback(GstPad * pad,GstPadProbeInfo * info,gpointer usrdata)29 GstPadProbeReturn AVMetaBufferBlocker::BlockCallback(GstPad *pad, GstPadProbeInfo *info, gpointer usrdata)
30 {
31     if (pad == nullptr || info == nullptr || usrdata == nullptr) {
32         return GST_PAD_PROBE_PASS;
33     }
34 
35     auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(usrdata);
36     if (thizStrong != nullptr) {
37         return thizStrong->OnBlockCallback(*pad, *info);
38     }
39     return GST_PAD_PROBE_PASS;
40 }
41 
PadAdded(GstElement * elem,GstPad * pad,gpointer userData)42 void AVMetaBufferBlocker::PadAdded(GstElement *elem, GstPad *pad, gpointer userData)
43 {
44     if (elem == nullptr || pad == nullptr || userData == nullptr) {
45         return;
46     }
47 
48     auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(userData);
49     if (thizStrong != nullptr) {
50         return thizStrong->OnPadAdded(*elem, *pad);
51     }
52 }
53 
AVMetaBufferBlocker(GstElement & elem,bool direction,BufferRecievedNotifier notifier)54 AVMetaBufferBlocker::AVMetaBufferBlocker(GstElement &elem, bool direction, BufferRecievedNotifier notifier)
55     : elem_(elem), direction_(direction), notifier_(notifier)
56 {
57     MEDIA_LOGD("ctor, elem: %{public}s, direction: %{public}d, 0x%{public}06" PRIXPTR,
58         ELEM_NAME(&elem_), direction_, FAKE_POINTER(this));
59 }
60 
~AVMetaBufferBlocker()61 AVMetaBufferBlocker::~AVMetaBufferBlocker()
62 {
63     MEDIA_LOGD("dtor, 0x%{public}06" PRIXPTR, FAKE_POINTER(this));
64 }
65 
Init()66 void AVMetaBufferBlocker::Init()
67 {
68     MEDIA_LOGD("elem: %{public}s", ELEM_NAME(&elem_));
69     auto padList = direction_ ? elem_.srcpads : elem_.sinkpads;
70 
71     /**
72      * Defaultly, the probe type is blocked. If the pads count is greater than 1,
73      * we can only setup detecting probe, because the element maybe not start individual
74      * thread for each pad, all pad maybe drived by just one thread. If one pad is
75      * blocked, then all other pads can not detect any buffer.
76      */
77     GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
78     if (g_list_length(padList) > 1) {
79         MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
80                    ELEM_NAME(&elem_), direction_);
81         type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
82         probeRet_ = GST_PAD_PROBE_OK;
83     }
84 
85     // Add pad probe for all pads. If the pads count is greater than 1, we just detect buffer.
86     for (GList *node = g_list_first(padList); node != nullptr; node = node->next) {
87         if (node->data == nullptr) {
88             continue;
89         }
90         GstPad *pad = reinterpret_cast<GstPad *>(node->data);
91 
92         // the subtitle stream must be ignored, currently we dont support it.
93         std::string_view name = PAD_NAME(pad);
94         if (name.find("subtitle") != std::string_view::npos) {
95             continue;
96         }
97 
98         AddPadProbe(*pad, type);
99     }
100 
101     // listen to the "pad-added" signal to figure out whether the pads count is greater than 1.
102     AVMetaBufferBlockerWrapper *wrapper = new (std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
103     CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
104 
105     signalId_ = g_signal_connect_data(&elem_, "pad-added", GCallback(&AVMetaBufferBlocker::PadAdded), wrapper,
106         (GClosureNotify)&AVMetaBufferBlockerWrapper::OnDestory, static_cast<GConnectFlags>(0));
107     if (signalId_ == 0) {
108         delete wrapper;
109         MEDIA_LOGW("add signal failed for %{public}s", ELEM_NAME(&elem_));
110     }
111 }
112 
CheckUpStreamBlocked(GstPad & pad)113 bool AVMetaBufferBlocker::CheckUpStreamBlocked(GstPad &pad)
114 {
115     GstPad *upstreamPad = nullptr;
116 
117     if (GST_PAD_DIRECTION(&pad) == GST_PAD_SINK) {
118         upstreamPad = gst_pad_get_peer(&pad);
119     } else {
120         GstElement *elem = gst_pad_get_parent_element(&pad);
121         if (elem == nullptr) {
122             if (GST_IS_PROXY_PAD(&pad)) { // find the proxy sinkpad for decodebin
123                 return false;
124             }
125             MEDIA_LOGE("unexpected, avoid lock, return true");
126             return true;
127         }
128 
129         /**
130          * This is a multi-srcpads element, means that we meet the demuxer or multiqueue.
131          * There is no need to figure out whether the demuxer or multiqueue's sinkpads are
132          * blocked, because we guarante it will never happen.
133          */
134         if (g_list_length(elem->srcpads) > 1 || g_list_length(elem->sinkpads) == 0) {
135             gst_object_unref(elem);
136             return false;
137         }
138 
139         GList *node = g_list_first(elem->sinkpads);
140         if (node != nullptr && node->data != nullptr) {
141             upstreamPad = GST_PAD_CAST(gst_object_ref((GST_PAD_CAST(node->data))));
142         }
143 
144         gst_object_unref(elem);
145     }
146 
147     if (upstreamPad == nullptr) {
148         return false;
149     }
150 
151     if (gst_pad_is_blocked(upstreamPad)) {
152         gst_object_unref(upstreamPad);
153         return true;
154     }
155 
156     bool ret = CheckUpStreamBlocked(*upstreamPad);
157     gst_object_unref(upstreamPad);
158 
159     return ret;
160 }
161 
162 /**
163  * Call it after IsRemoved, this function maybe return false if there
164  * are no any probe setuped.
165  */
IsBufferDetected()166 bool AVMetaBufferBlocker::IsBufferDetected()
167 {
168     std::unique_lock<std::mutex> lock(mutex_);
169 
170     for (auto &item : padInfos_) {
171         if (item.detected) {
172             continue;
173         }
174 
175         MEDIA_LOGD("buffer not arrived for %{public}s's %{public}s",
176                    ELEM_NAME(&elem_), PAD_NAME(item.pad));
177 
178         /**
179          * if the upstream is blocked, we can not wait buffer arriving at this pad.
180          * Thus, we just remove the probe at this pad and set the detected to true.
181          * This operation maybe remove this block probe too early, but the upstream's
182          * blocker will guarantee at least receiving one frame of buffer to finish the meta
183          * extracting process.
184          */
185         if (item.probeId != 0 && CheckUpStreamBlocked(*item.pad)) {
186             MEDIA_LOGD("%{public}s's %{public}s upstream is blocked, dont need this blocker",
187                        ELEM_NAME(&elem_), PAD_NAME(item.pad));
188             gst_pad_remove_probe(item.pad, item.probeId);
189             item.probeId = 0;
190             item.detected = true;
191             continue;
192         }
193 
194         // not detect buffer and the upstream of this pad is not blocked.
195         return false;
196     }
197 
198     return true;
199 }
200 
IsRemoved()201 bool AVMetaBufferBlocker::IsRemoved()
202 {
203     std::unique_lock<std::mutex> lock(mutex_);
204     return isRemoved_;
205 }
206 
Remove()207 void AVMetaBufferBlocker::Remove()
208 {
209     std::unique_lock<std::mutex> lock(mutex_);
210     if (isHidden_) {
211         return;
212     }
213 
214     for (auto &padInfo : padInfos_) {
215         if (padInfo.probeId != 0) {
216             MEDIA_LOGD("cancel block at %{public}s's %{public}s", ELEM_NAME(&elem_), PAD_NAME(padInfo.pad));
217             gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
218             padInfo.probeId = 0;
219         }
220     }
221 
222     if (signalId_ != 0) {
223         g_signal_handler_disconnect(&elem_, signalId_);
224         signalId_ = 0;
225     }
226 
227     isRemoved_ = true;
228 }
229 
Hide()230 void AVMetaBufferBlocker::Hide()
231 {
232     std::unique_lock<std::mutex> lock(mutex_);
233     isHidden_ = true;
234 }
235 
GetElemName()236 std::string AVMetaBufferBlocker::GetElemName()
237 {
238     return ELEM_NAME(&elem_);
239 }
240 
CheckBufferDetected(GstPadProbeInfo & info)241 bool AVMetaBufferBlocker::CheckBufferDetected(GstPadProbeInfo &info)
242 {
243     auto type = static_cast<unsigned int>(info.type);
244     if ((type & (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST)) != 0) {
245         MEDIA_LOGD("buffer detected");
246         return true;
247     }
248 
249     if ((type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) != 0) {
250         GstEvent *event = gst_pad_probe_info_get_event(&info);
251         if (event == nullptr) {
252             return false;
253         }
254 
255         if (GST_EVENT_TYPE(event) != GST_EVENT_GAP && GST_EVENT_TYPE(event) != GST_EVENT_EOS) {
256             return false;
257         }
258 
259         MEDIA_LOGD("gap or eos detected");
260         const GstStructure *struc = gst_event_get_structure(event);
261         gchar *prettyString = gst_structure_to_string(struc);
262         if (prettyString != nullptr) {
263             MEDIA_LOGD("event: %{public}s", prettyString);
264             g_free(prettyString);
265         }
266         return true;
267     }
268 
269     return false;
270 }
271 
OnBlockCallback(GstPad & pad,GstPadProbeInfo & info)272 GstPadProbeReturn AVMetaBufferBlocker::OnBlockCallback(GstPad &pad, GstPadProbeInfo &info)
273 {
274     if (!CheckBufferDetected(info)) {
275         return probeRet_;
276     }
277 
278     std::unique_lock<std::mutex> lock(mutex_);
279     for (auto &padInfo : padInfos_) {
280         if (&pad != padInfo.pad) {
281             continue;
282         }
283 
284         if (padInfo.probeId == 0) {
285             MEDIA_LOGI("block already be canceled, exit");
286             return GST_PAD_PROBE_REMOVE;
287         }
288 
289         padInfo.detected = true;
290         MEDIA_LOGD("something arrived at %{public}s's pad %{public}s", PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
291         lock.unlock(); // ???
292 
293         if (notifier_ != nullptr) {
294             notifier_();
295         }
296         return GST_PAD_PROBE_OK;
297     }
298 
299     return probeRet_;
300 }
301 
OnPadAdded(GstElement & elem,GstPad & pad)302 void AVMetaBufferBlocker::OnPadAdded(GstElement &elem, GstPad &pad)
303 {
304     MEDIA_LOGD("demux %{public}s sinkpad %{public}s added", ELEM_NAME(&elem), PAD_NAME(&pad));
305 
306     std::unique_lock<std::mutex> lock(mutex_);
307 
308     /**
309      * if it've already been required to remove all probes, we need to
310      * reject to add probe to new pad.
311      */
312     if (isRemoved_) {
313         MEDIA_LOGI("block already be removed, exit");
314         return;
315     }
316 
317     GstPadDirection currDirection = direction_ ? GST_PAD_SRC : GST_PAD_SINK;
318     if (GST_PAD_DIRECTION(&pad) != currDirection) {
319         return;
320     }
321 
322     GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
323 
324     /**
325      * If it has one pad before, we must change the blocked probe to
326      * detecting probe, or no buffer can arrive at this new pad if the
327      * previous pad has been blocked, because the element maybe not
328      * start individual thread for each pad, all pad maybe drived by
329      * just one thread.
330      */
331     if (padInfos_.size() != 0) {
332         MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
333                    ELEM_NAME(&elem_), direction_);
334         type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
335         probeRet_ = GST_PAD_PROBE_OK;
336 
337         std::vector<PadInfo> temp;
338         temp.swap(padInfos_);
339 
340         for (auto &padInfo : temp) {
341             /**
342              * if the blocked probe setuped at this pad, we add detecting probe
343              * at it firstly, then remove the blocing probe, for avoiding to miss
344              * the first buffer passing through this pad.
345              */
346             if (padInfo.blocked && padInfo.probeId != 0) {
347                 AddPadProbe(*padInfo.pad, type);
348                 gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
349             } else {
350                 // not blocked probe, just move to new container.
351                 padInfos_.push_back(padInfo);
352             }
353         }
354     }
355 
356     // the subtitle stream must be ignored, currently we dont support it.
357     std::string_view name = PAD_NAME(&pad);
358     if (name.find("subtitle") != std::string_view::npos) {
359         return;
360     }
361 
362     // now, we can add the probe for new pad.
363     AddPadProbe(pad, type);
364 }
365 
AddPadProbe(GstPad & pad,GstPadProbeType type)366 void AVMetaBufferBlocker::AddPadProbe(GstPad &pad, GstPadProbeType type)
367 {
368     AVMetaBufferBlockerWrapper *wrapper = new(std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
369     CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
370 
371     bool blocked = type & GST_PAD_PROBE_TYPE_BLOCK;
372     gulong probeId = gst_pad_add_probe(&pad, type, BlockCallback,
373         wrapper, &AVMetaBufferBlockerWrapper::OnDestory);
374     if (probeId == 0) {
375         MEDIA_LOGW("add probe for %{public}s's pad %{public}s failed",
376                     PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
377         delete wrapper;
378     } else {
379         (void)padInfos_.emplace_back(PadInfo { &pad, probeId, false, blocked });
380     }
381 }
382 } // namespace Media
383 } // namespace OHOS
384