• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "recorder_pipeline.h"
17 #include <gst/gst.h>
18 #include "string_ex.h"
19 #include "media_errors.h"
20 #include "media_log.h"
21 #include "i_recorder_engine.h"
22 #include "recorder_private_param.h"
23 
24 namespace {
25     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecorderPipeline"};
26 }
27 
28 namespace OHOS {
29 namespace Media {
RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)30 RecorderPipeline::RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)
31     : desc_(desc)
32 {
33     MEDIA_LOGD("enter, ctor");
34 }
35 
~RecorderPipeline()36 RecorderPipeline::~RecorderPipeline()
37 {
38     MEDIA_LOGD("enter, dtor");
39     (void)Reset();
40 }
41 
SetNotifier(RecorderMsgNotifier notifier)42 void RecorderPipeline::SetNotifier(RecorderMsgNotifier notifier)
43 {
44     std::unique_lock<std::mutex> lock(gstPipeMutex_);
45     notifier_ = notifier;
46 }
47 
Init()48 int32_t RecorderPipeline::Init()
49 {
50     if (desc_ == nullptr) {
51         MEDIA_LOGE("pipeline desc is nullptr");
52         return MSERR_INVALID_OPERATION;
53     }
54 
55     gstPipeline_ = reinterpret_cast<GstPipeline *>(gst_pipeline_new("recorder-pipeline"));
56     if (gstPipeline_ == nullptr) {
57         MEDIA_LOGE("Create gst pipeline failed !");
58         return MSERR_NO_MEMORY;
59     }
60 
61     GstBus *bus = gst_pipeline_get_bus(gstPipeline_);
62     CHECK_AND_RETURN_RET(bus != nullptr, MSERR_INVALID_OPERATION);
63 
64     auto msgResCb = std::bind(&RecorderPipeline::OnNotifyMsgProcResult, this, std::placeholders::_1);
65     msgProcessor_ = std::make_unique<RecorderMsgProcessor>(*bus, msgResCb);
66     gst_object_unref(bus);
67 
68     int32_t ret = msgProcessor_->Init();
69     if (ret != MSERR_OK) {
70         MEDIA_LOGE("Init RecorderMsgProcessor Failed !,  ret = %{publiuc}d", ret);
71         ClearResource();
72         return ret;
73     }
74 
75     for (auto &elem : desc_->allElems) {
76         msgProcessor_->AddMsgHandler(elem);
77     }
78 
79     return MSERR_OK;
80 }
81 
Prepare()82 int32_t RecorderPipeline::Prepare()
83 {
84     MEDIA_LOGD("enter Prepare");
85 
86     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
87 
88     int32_t ret = DoElemAction(&RecorderElement::Prepare);
89     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
90 
91     ret = SyncWaitChangeState(GST_STATE_PAUSED);
92     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
93 
94     return MSERR_OK;
95 }
96 
Start()97 int32_t RecorderPipeline::Start()
98 {
99     MEDIA_LOGD("enter Start");
100 
101     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
102 
103     int32_t ret = DoElemAction(&RecorderElement::Start);
104     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
105 
106     ret = SyncWaitChangeState(GST_STATE_PLAYING);
107     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
108 
109     isStarted_ = true;
110     return MSERR_OK;
111 }
112 
Pause()113 int32_t RecorderPipeline::Pause()
114 {
115     MEDIA_LOGD("enter Pause");
116 
117     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
118 
119     int32_t ret = DoElemAction(&RecorderElement::Pause);
120     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
121 
122     return SyncWaitChangeState(GST_STATE_PAUSED);
123 }
124 
Resume()125 int32_t RecorderPipeline::Resume()
126 {
127     MEDIA_LOGD("enter Resume");
128 
129     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
130 
131     int32_t ret = DoElemAction(&RecorderElement::Resume);
132     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
133 
134     return SyncWaitChangeState(GST_STATE_PLAYING);
135 }
136 
Stop(bool isDrainAll)137 int32_t RecorderPipeline::Stop(bool isDrainAll)
138 {
139     if (errorState_.load()) {
140         return MSERR_INVALID_STATE;
141     }
142 
143     if (currState_ == GST_STATE_NULL) {
144         return MSERR_OK;
145     }
146 
147     if (currState_ != GST_STATE_READY) {
148         MEDIA_LOGI("enter Stop, isDrainAll = %{public}d", isDrainAll);
149         DrainBuffer(isDrainAll);
150     }
151 
152     (void)DoElemAction(&RecorderElement::Stop, false);
153 
154     int32_t ret = SyncWaitChangeState(GST_STATE_NULL);
155     CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "Stop failed !");
156 
157     isStarted_ = false;
158     return MSERR_OK;
159 }
160 
SetParameter(int32_t sourceId,const RecorderParam & recParam)161 int32_t RecorderPipeline::SetParameter(int32_t sourceId, const RecorderParam &recParam)
162 {
163     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
164 
165     int32_t ret = MSERR_OK;
166     for (auto &elem : desc_->allElems) {
167         if (elem->GetSourceId() == sourceId)  {
168             ret = elem->SetParameter(recParam);
169             CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
170         }
171     }
172 
173     return ret;
174 }
175 
GetParameter(int32_t sourceId,RecorderParam & recParam)176 int32_t RecorderPipeline::GetParameter(int32_t sourceId, RecorderParam &recParam)
177 {
178     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
179 
180     if (desc_->srcElems.find(sourceId) == desc_->srcElems.end()) {
181         MEDIA_LOGE("invalid sourceId %{public}d", sourceId);
182         return MSERR_INVALID_VAL;
183     }
184     return desc_->srcElems[sourceId]->GetParameter(recParam);
185 }
186 
SyncWaitChangeState(GstState targetState)187 int32_t RecorderPipeline::SyncWaitChangeState(GstState targetState)
188 {
189     MEDIA_LOGI("change state to %{public}d", targetState);
190 
191     GstStateChangeReturn stateRet = gst_element_set_state((GstElement *)gstPipeline_, targetState);
192     CHECK_AND_RETURN_RET(stateRet != GST_STATE_CHANGE_FAILURE,  MSERR_INVALID_STATE);
193 
194     if (stateRet != GST_STATE_CHANGE_ASYNC) {
195         MEDIA_LOGI("finish change gstpipeline state to %{public}d.", targetState);
196         currState_ = targetState;
197         return MSERR_OK;
198     }
199 
200     MEDIA_LOGI("begin sync wait gstpipeline state change to %{public}d..........", targetState);
201     std::unique_lock<std::mutex> lock(gstPipeMutex_);
202     gstPipeCond_.wait(lock, [this, targetState] { return currState_ == targetState || errorState_.load(); });
203     if (errorState_.load()) {
204         MEDIA_LOGE("error happened, change state to %{public}d failed !", targetState);
205         return MSERR_INVALID_STATE;
206     }
207     MEDIA_LOGI("finish change gstpipeline state to %{public}d..........", targetState);
208     return MSERR_OK;
209 }
210 
DrainBuffer(bool isDrainAll)211 void RecorderPipeline::DrainBuffer(bool isDrainAll)
212 {
213     if (currState_ == GST_STATE_PAUSED) {
214         if (isStarted_) {
215             (void)SyncWaitChangeState(GST_STATE_PLAYING);
216         } else {
217             return;
218         }
219     }
220 
221     int32_t ret = MSERR_OK;
222     auto iter = desc_->allElems.begin();
223     for (size_t index = 0; index < desc_->srcElems.size(); index++, iter = std::next(iter)) {
224         ret = (*iter)->DrainAll(isDrainAll);
225         if (ret != MSERR_OK) {
226             MEDIA_LOGE("drain [%{public}d] failed for %{public}s", isDrainAll, (*iter)->GetName().c_str());
227             break;
228         }
229     }
230 
231     if (ret == MSERR_OK) {
232         (void)SyncWaitEOS();
233     }
234 }
235 
PostAndSyncWaitEOS()236 int32_t RecorderPipeline::PostAndSyncWaitEOS()
237 {
238     if (currState_ != GST_STATE_PLAYING) {
239         MEDIA_LOGE("curr state is not GST_STATE_PLAYING, ignore !");
240         return MSERR_OK;
241     }
242 
243     GstEvent *eos = gst_event_new_eos();
244     if (eos == nullptr) {
245         MEDIA_LOGE("Create EOS event failed");
246         return MSERR_INVALID_OPERATION;
247     }
248 
249     gboolean success = gst_element_send_event((GstElement *)gstPipeline_, eos);
250     if (!success) {
251         MEDIA_LOGE("Send EOS event failed");
252         return MSERR_INVALID_OPERATION;
253     }
254 
255     (void)SyncWaitEOS();
256     return MSERR_OK;
257 }
258 
SyncWaitEOS()259 bool RecorderPipeline::SyncWaitEOS()
260 {
261     MEDIA_LOGI("Wait EOS finished........................");
262     std::unique_lock<std::mutex> lock(gstPipeMutex_);
263     gstPipeCond_.wait(lock, [this] { return eosDone_ || errorState_.load(); });
264     if (!eosDone_) {
265         MEDIA_LOGE("error happened, wait eos done failed !");
266         return false;
267     }
268     eosDone_ = false;
269     MEDIA_LOGI("EOS finished........................");
270     return true;
271 }
272 
Reset()273 int32_t RecorderPipeline::Reset()
274 {
275     (void)Stop(false);
276     (void)DoElemAction(&RecorderElement::Reset, false);
277     ClearResource();
278     return MSERR_OK;
279 }
280 
DoElemAction(const ElemAction & action,bool needAllSucc)281 int32_t RecorderPipeline::DoElemAction(const ElemAction &action, bool needAllSucc)
282 {
283     if (desc_ == nullptr)  {
284         return MSERR_INVALID_OPERATION;
285     }
286 
287     bool allSucc = true;
288     for (auto &elem : desc_->allElems) {
289         int32_t ret = action(*elem);
290         if (ret == MSERR_OK) {
291             continue;
292         }
293         allSucc = false;
294         // if one element execute action fail, exit immediately.
295         if (needAllSucc) {
296             MEDIA_LOGE("element %{public}s execute action failed", elem->GetName().c_str());
297             return ret;
298         }
299     }
300 
301     return allSucc ? MSERR_OK : MSERR_INVALID_OPERATION;
302 }
303 
ClearResource()304 void RecorderPipeline::ClearResource()
305 {
306     if (msgProcessor_ != nullptr) {
307         (void)msgProcessor_->Reset();
308         msgProcessor_ = nullptr;
309     }
310 
311     if (gstPipeline_ != nullptr) {
312         gst_object_unref(gstPipeline_);
313         gstPipeline_ = nullptr;
314     }
315 
316     desc_ = nullptr;
317 }
318 
Dump()319 void RecorderPipeline::Dump()
320 {
321     MEDIA_LOGI("==========================Dump Recorder Parameters Begin=========================");
322     for (auto &elem : desc_->allElems) {
323         elem->Dump();
324     }
325     MEDIA_LOGI("==========================Dump Recorder Parameters End===========================");
326 }
327 
OnNotifyMsgProcResult(const RecorderMessage & msg)328 void RecorderPipeline::OnNotifyMsgProcResult(const RecorderMessage &msg)
329 {
330     if (msg.type == RecorderMessageType::REC_MSG_INFO) {
331         return ProcessInfoMessage(msg);
332     }
333 
334     if (msg.type == RecorderMessageType::REC_MSG_ERROR) {
335         return ProcessErrorMessage(msg);
336     }
337 
338     if (msg.type == RecorderMessageType::REC_MSG_FEATURE) {
339         return ProcessFeatureMessage(msg);
340     }
341 }
342 
ProcessInfoMessage(const RecorderMessage & msg)343 void RecorderPipeline::ProcessInfoMessage(const RecorderMessage &msg)
344 {
345     NotifyMessage(msg);
346 }
347 
ProcessErrorMessage(const RecorderMessage & msg)348 void RecorderPipeline::ProcessErrorMessage(const RecorderMessage &msg)
349 {
350     // ignore the error msg
351     if (errorState_.load() || (errorSources_.count(msg.sourceId) != 0)) {
352         return;
353     }
354 
355     if (CheckStopForError(msg)) {
356         StopForError(msg);
357         return;
358     }
359 
360     int ret = BypassOneSource(msg.sourceId);
361     if (ret != MSERR_OK) {
362         MEDIA_LOGE("bypass source[0x%{public}x] failed, stop recording.", msg.sourceId);
363         StopForError(msg);
364         return;
365     }
366     NotifyMessage(msg);
367 }
368 
ProcessFeatureMessage(const RecorderMessage & msg)369 void RecorderPipeline::ProcessFeatureMessage(const RecorderMessage &msg)
370 {
371     switch (msg.code) {
372         case REC_MSG_FEATURE_ASYNC_DONE: {
373             {
374                 std::unique_lock<std::mutex> lock(gstPipeMutex_);
375                 asyncDone_ = true;
376                 MEDIA_LOGI("Accept message GST_MESSAGE_ASYNC_DONE");
377             }
378             gstPipeCond_.notify_one();
379             break;
380         }
381         case REC_MSG_FEATURE_EOS_DONE: {
382             {
383                 std::unique_lock<std::mutex> lock(gstPipeMutex_);
384                 eosDone_ = true;
385                 MEDIA_LOGI("Accept message GST_MESSAGE_EOS");
386             }
387             gstPipeCond_.notify_one();
388             break;
389         }
390         case REC_MSG_FEATURE_STATE_CHANGE_DONE: {
391             {
392                 std::unique_lock<std::mutex> lock(gstPipeMutex_);
393                 currState_ = static_cast<GstState>(msg.detail);
394                 MEDIA_LOGI("Accept message REC_MSG_FEATURE_STATE_CHANGE_DONE, currState = %{public}d", currState_);
395             }
396             gstPipeCond_.notify_one();
397             break;
398         }
399         default:
400             MEDIA_LOGW("unknown feature message: %{public}d", msg.code);
401             break;
402     }
403 }
404 
NotifyMessage(const RecorderMessage & msg)405 void RecorderPipeline::NotifyMessage(const RecorderMessage &msg)
406 {
407     std::unique_lock<std::mutex> lock(gstPipeMutex_);
408     if (notifier_ != nullptr) {
409         notifier_(msg);
410     }
411 }
412 
CheckStopForError(const RecorderMessage & msg)413 bool RecorderPipeline::CheckStopForError(const RecorderMessage &msg)
414 {
415     // Not meaningful sourceId, means the error is related to all sources, and the recording must be stopped.
416     if (msg.sourceId == INVALID_SOURCE_ID || msg.sourceId == DUMMY_SOURCE_ID) {
417         return true;
418     }
419 
420     (void)errorSources_.emplace(msg.sourceId);
421     if (errorSources_.size() == desc_->srcElems.size()) {
422         return true;
423     }
424 
425     return false;
426 }
427 
StopForError(const RecorderMessage & msg)428 void RecorderPipeline::StopForError(const RecorderMessage &msg)
429 {
430     MEDIA_LOGE("Fatal error happened, stop recording. Error code: %{public}d, detail: %{public}d",
431                msg.code, msg.detail);
432 
433     errorState_.store(true);
434     DrainBuffer(false);
435     (void)DoElemAction(&RecorderElement::Stop, false);
436     (void)SyncWaitChangeState(GST_STATE_NULL);
437 
438     isStarted_ = false;
439     gstPipeCond_.notify_all();
440 
441     NotifyMessage(msg);
442 }
443 
BypassOneSource(int32_t sourceId)444 int32_t RecorderPipeline::BypassOneSource(int32_t sourceId)
445 {
446     MEDIA_LOGE("recorder source[0x%{public}x] has error happened, bypass it", sourceId);
447 
448     auto srcElemIter = desc_->srcElems.find(sourceId);
449     if (srcElemIter == desc_->srcElems.end() || srcElemIter->second == nullptr) {
450         MEDIA_LOGE("The sourceId 0x%{public}x is unrecognizable, ignored !", sourceId);
451         std::string srcIdStr;
452         for (auto &srcElemItem : desc_->srcElems) {
453             srcIdStr += DexToHexString(srcElemItem.first) + " ";
454         }
455         MEDIA_LOGE("Valid source id: %{public}s", srcIdStr.c_str());
456         return MSERR_INVALID_VAL;
457     }
458 
459     bool ret = srcElemIter->second->DrainAll(true);
460     CHECK_AND_RETURN_RET(ret, MSERR_INVALID_OPERATION);
461 
462     return MSERR_OK;
463 }
464 } // namespace Media
465 } // namespace OHOS
466