• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "avmeta_buffer_blocker.h"
17 #include "media_errors.h"
18 #include "media_log.h"
19 #include "gst_utils.h"
20 
21 namespace {
22     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "BufferBlocker"};
23 }
24 
25 namespace OHOS {
26 namespace Media {
27 using AVMetaBufferBlockerWrapper = ThizWrapper<AVMetaBufferBlocker>;
28 
BlockCallback(GstPad * pad,GstPadProbeInfo * info,gpointer usrdata)29 GstPadProbeReturn AVMetaBufferBlocker::BlockCallback(GstPad *pad, GstPadProbeInfo *info, gpointer usrdata)
30 {
31     if (pad == nullptr || info == nullptr || usrdata == nullptr) {
32         return GST_PAD_PROBE_PASS;
33     }
34 
35     auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(usrdata);
36     if (thizStrong != nullptr) {
37         return thizStrong->OnBlockCallback(*pad, *info);
38     }
39     return GST_PAD_PROBE_PASS;
40 }
41 
PadAdded(GstElement * elem,GstPad * pad,gpointer userData)42 void AVMetaBufferBlocker::PadAdded(GstElement *elem, GstPad *pad, gpointer userData)
43 {
44     CHECK_AND_RETURN(elem != nullptr && pad != nullptr && userData != nullptr);
45 
46     auto thizStrong = AVMetaBufferBlockerWrapper::TakeStrongThiz(userData);
47     if (thizStrong != nullptr) {
48         return thizStrong->OnPadAdded(*elem, *pad);
49     }
50 }
51 
AVMetaBufferBlocker(GstElement & elem,bool direction,BufferRecievedNotifier notifier)52 AVMetaBufferBlocker::AVMetaBufferBlocker(GstElement &elem, bool direction, BufferRecievedNotifier notifier)
53     : elem_(elem), direction_(direction), notifier_(notifier)
54 {
55     MEDIA_LOGD("ctor, elem: %{public}s, direction: %{public}d, 0x%{public}06" PRIXPTR,
56         ELEM_NAME(&elem_), direction_, FAKE_POINTER(this));
57 }
58 
~AVMetaBufferBlocker()59 AVMetaBufferBlocker::~AVMetaBufferBlocker()
60 {
61     MEDIA_LOGD("dtor, 0x%{public}06" PRIXPTR, FAKE_POINTER(this));
62 }
63 
Init()64 void AVMetaBufferBlocker::Init()
65 {
66     MEDIA_LOGD("elem: %{public}s", ELEM_NAME(&elem_));
67     auto padList = direction_ ? elem_.srcpads : elem_.sinkpads;
68 
69     /**
70      * Defaultly, the probe type is blocked. If the pads count is greater than 1,
71      * we can only setup detecting probe, because the element maybe not start individual
72      * thread for each pad, all pad maybe drived by just one thread. If one pad is
73      * blocked, then all other pads can not detect any buffer.
74      */
75     GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
76     if (g_list_length(padList) > 1) {
77         MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
78                    ELEM_NAME(&elem_), direction_);
79         type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
80         probeRet_ = GST_PAD_PROBE_OK;
81     }
82 
83     // Add pad probe for all pads. If the pads count is greater than 1, we just detect buffer.
84     for (GList *node = g_list_first(padList); node != nullptr; node = node->next) {
85         CHECK_AND_CONTINUE(node->data != nullptr);
86         GstPad *pad = reinterpret_cast<GstPad *>(node->data);
87 
88         // the subtitle stream must be ignored, currently we dont support it.
89         std::string_view name = PAD_NAME(pad);
90         if (name.find("subtitle") != std::string_view::npos) {
91             continue;
92         }
93 
94         AddPadProbe(*pad, type);
95     }
96 
97     // listen to the "pad-added" signal to figure out whether the pads count is greater than 1.
98     AVMetaBufferBlockerWrapper *wrapper = new (std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
99     CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
100 
101     signalId_ = g_signal_connect_data(&elem_, "pad-added", GCallback(&AVMetaBufferBlocker::PadAdded), wrapper,
102         (GClosureNotify)&AVMetaBufferBlockerWrapper::OnDestory, static_cast<GConnectFlags>(0));
103     if (signalId_ == 0) {
104         delete wrapper;
105         MEDIA_LOGW("add signal failed for %{public}s", ELEM_NAME(&elem_));
106     }
107 }
108 
CheckUpStreamBlocked(GstPad & pad)109 bool AVMetaBufferBlocker::CheckUpStreamBlocked(GstPad &pad)
110 {
111     GstPad *upstreamPad = nullptr;
112 
113     if (GST_PAD_DIRECTION(&pad) == GST_PAD_SINK) {
114         upstreamPad = gst_pad_get_peer(&pad);
115     } else {
116         GstElement *elem = gst_pad_get_parent_element(&pad);
117         if (elem == nullptr) {
118             if (GST_IS_PROXY_PAD(&pad)) { // find the proxy sinkpad for decodebin
119                 return false;
120             }
121             MEDIA_LOGE("unexpected, avoid lock, return true");
122             return true;
123         }
124 
125         /**
126          * This is a multi-srcpads element, means that we meet the demuxer or multiqueue.
127          * There is no need to figure out whether the demuxer or multiqueue's sinkpads are
128          * blocked, because we guarante it will never happen.
129          */
130         if (g_list_length(elem->srcpads) > 1 || g_list_length(elem->sinkpads) == 0) {
131             gst_object_unref(elem);
132             return false;
133         }
134 
135         GList *node = g_list_first(elem->sinkpads);
136         if (node != nullptr && node->data != nullptr) {
137             upstreamPad = GST_PAD_CAST(gst_object_ref((GST_PAD_CAST(node->data))));
138         }
139 
140         gst_object_unref(elem);
141     }
142 
143     if (upstreamPad == nullptr) {
144         return false;
145     }
146 
147     if (gst_pad_is_blocked(upstreamPad)) {
148         gst_object_unref(upstreamPad);
149         return true;
150     }
151 
152     bool ret = CheckUpStreamBlocked(*upstreamPad);
153     gst_object_unref(upstreamPad);
154 
155     return ret;
156 }
157 
158 /**
159  * Call it after IsRemoved, this function maybe return false if there
160  * are no any probe setuped.
161  */
IsBufferDetected()162 bool AVMetaBufferBlocker::IsBufferDetected()
163 {
164     std::unique_lock<std::mutex> lock(mutex_);
165 
166     for (auto &item : padInfos_) {
167         if (item.detected) {
168             continue;
169         }
170 
171         MEDIA_LOGD("buffer not arrived for %{public}s's %{public}s",
172                    ELEM_NAME(&elem_), PAD_NAME(item.pad));
173 
174         /**
175          * if the upstream is blocked, we can not wait buffer arriving at this pad.
176          * Thus, we just remove the probe at this pad and set the detected to true.
177          * This operation maybe remove this block probe too early, but the upstream's
178          * blocker will guarantee at least receiving one frame of buffer to finish the meta
179          * extracting process.
180          */
181         if (item.probeId != 0 && CheckUpStreamBlocked(*item.pad)) {
182             MEDIA_LOGD("%{public}s's %{public}s upstream is blocked, dont need this blocker",
183                        ELEM_NAME(&elem_), PAD_NAME(item.pad));
184             gst_pad_remove_probe(item.pad, item.probeId);
185             item.probeId = 0;
186             item.detected = true;
187             continue;
188         }
189 
190         // not detect buffer and the upstream of this pad is not blocked.
191         return false;
192     }
193 
194     return true;
195 }
196 
IsRemoved()197 bool AVMetaBufferBlocker::IsRemoved()
198 {
199     std::unique_lock<std::mutex> lock(mutex_);
200     return isRemoved_;
201 }
202 
Remove()203 void AVMetaBufferBlocker::Remove()
204 {
205     std::unique_lock<std::mutex> lock(mutex_);
206     if (isHidden_) {
207         return;
208     }
209 
210     for (auto &padInfo : padInfos_) {
211         if (padInfo.probeId != 0) {
212             MEDIA_LOGD("cancel block at %{public}s's %{public}s", ELEM_NAME(&elem_), PAD_NAME(padInfo.pad));
213             gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
214             padInfo.probeId = 0;
215         }
216     }
217 
218     if (signalId_ != 0) {
219         g_signal_handler_disconnect(&elem_, signalId_);
220         signalId_ = 0;
221     }
222 
223     isRemoved_ = true;
224 }
225 
Hide()226 void AVMetaBufferBlocker::Hide()
227 {
228     std::unique_lock<std::mutex> lock(mutex_);
229     isHidden_ = true;
230 }
231 
GetElemName()232 std::string AVMetaBufferBlocker::GetElemName()
233 {
234     return ELEM_NAME(&elem_);
235 }
236 
CheckBufferDetected(GstPadProbeInfo & info)237 bool AVMetaBufferBlocker::CheckBufferDetected(GstPadProbeInfo &info)
238 {
239     auto type = static_cast<unsigned int>(info.type);
240     if ((type & (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST)) != 0) {
241         MEDIA_LOGD("buffer detected");
242         return true;
243     }
244 
245     if ((type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) != 0) {
246         GstEvent *event = gst_pad_probe_info_get_event(&info);
247         if (event == nullptr) {
248             return false;
249         }
250 
251         if (GST_EVENT_TYPE(event) != GST_EVENT_GAP && GST_EVENT_TYPE(event) != GST_EVENT_EOS) {
252             return false;
253         }
254 
255         MEDIA_LOGD("gap or eos detected");
256         const GstStructure *struc = gst_event_get_structure(event);
257         gchar *prettyString = gst_structure_to_string(struc);
258         if (prettyString != nullptr) {
259             MEDIA_LOGD("event: %{public}s", prettyString);
260             g_free(prettyString);
261         }
262         return true;
263     }
264 
265     return false;
266 }
267 
OnBlockCallback(GstPad & pad,GstPadProbeInfo & info)268 GstPadProbeReturn AVMetaBufferBlocker::OnBlockCallback(GstPad &pad, GstPadProbeInfo &info)
269 {
270     if (!CheckBufferDetected(info)) {
271         return probeRet_;
272     }
273 
274     std::unique_lock<std::mutex> lock(mutex_);
275     for (auto &padInfo : padInfos_) {
276         if (&pad != padInfo.pad) {
277             continue;
278         }
279 
280         if (padInfo.probeId == 0) {
281             MEDIA_LOGI("block already be canceled, exit");
282             return GST_PAD_PROBE_REMOVE;
283         }
284 
285         padInfo.detected = true;
286         MEDIA_LOGD("something arrived at %{public}s's pad %{public}s", PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
287         lock.unlock(); // ???
288 
289         if (notifier_ != nullptr) {
290             notifier_();
291         }
292         return GST_PAD_PROBE_OK;
293     }
294 
295     return probeRet_;
296 }
297 
OnPadAdded(GstElement & elem,GstPad & pad)298 void AVMetaBufferBlocker::OnPadAdded(GstElement &elem, GstPad &pad)
299 {
300     MEDIA_LOGD("demux %{public}s sinkpad %{public}s added", ELEM_NAME(&elem), PAD_NAME(&pad));
301 
302     std::unique_lock<std::mutex> lock(mutex_);
303 
304     /**
305      * if it've already been required to remove all probes, we need to
306      * reject to add probe to new pad.
307      */
308     if (isRemoved_) {
309         MEDIA_LOGI("block already be removed, exit");
310         return;
311     }
312 
313     GstPadDirection currDirection = direction_ ? GST_PAD_SRC : GST_PAD_SINK;
314     if (GST_PAD_DIRECTION(&pad) != currDirection) {
315         return;
316     }
317 
318     GstPadProbeType type = GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM;
319 
320     /**
321      * If it has one pad before, we must change the blocked probe to
322      * detecting probe, or no buffer can arrive at this new pad if the
323      * previous pad has been blocked, because the element maybe not
324      * start individual thread for each pad, all pad maybe drived by
325      * just one thread.
326      */
327     if (padInfos_.size() != 0) {
328         MEDIA_LOGI("%{public}s's pads count is not 1, direction: %{public}d, only detect buffer, no blocked",
329                    ELEM_NAME(&elem_), direction_);
330         type = static_cast<GstPadProbeType>(type & ~GST_PAD_PROBE_TYPE_BLOCK);
331         probeRet_ = GST_PAD_PROBE_OK;
332 
333         std::vector<PadInfo> temp;
334         temp.swap(padInfos_);
335 
336         for (auto &padInfo : temp) {
337             /**
338              * if the blocked probe setuped at this pad, we add detecting probe
339              * at it firstly, then remove the blocing probe, for avoiding to miss
340              * the first buffer passing through this pad.
341              */
342             if (padInfo.blocked && padInfo.probeId != 0) {
343                 AddPadProbe(*padInfo.pad, type);
344                 gst_pad_remove_probe(padInfo.pad, padInfo.probeId);
345             } else {
346                 // not blocked probe, just move to new container.
347                 padInfos_.push_back(padInfo);
348             }
349         }
350     }
351 
352     // the subtitle stream must be ignored, currently we dont support it.
353     std::string_view name = PAD_NAME(&pad);
354     if (name.find("subtitle") != std::string_view::npos) {
355         return;
356     }
357 
358     // now, we can add the probe for new pad.
359     AddPadProbe(pad, type);
360 }
361 
AddPadProbe(GstPad & pad,GstPadProbeType type)362 void AVMetaBufferBlocker::AddPadProbe(GstPad &pad, GstPadProbeType type)
363 {
364     AVMetaBufferBlockerWrapper *wrapper = new(std::nothrow) AVMetaBufferBlockerWrapper(shared_from_this());
365     CHECK_AND_RETURN_LOG(wrapper != nullptr, "can not create this wrapper");
366 
367     bool blocked = type & GST_PAD_PROBE_TYPE_BLOCK;
368     gulong probeId = gst_pad_add_probe(&pad, type, BlockCallback,
369         wrapper, &AVMetaBufferBlockerWrapper::OnDestory);
370     if (probeId == 0) {
371         MEDIA_LOGW("add probe for %{public}s's pad %{public}s failed",
372                     PAD_PARENT_NAME(&pad), PAD_NAME(&pad));
373         delete wrapper;
374     } else {
375         (void)padInfos_.emplace_back(PadInfo { &pad, probeId, false, blocked });
376     }
377 }
378 } // namespace Media
379 } // namespace OHOS
380