• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "recorder_pipeline.h"
17 #include <gst/gst.h>
18 #include "string_ex.h"
19 #include "media_errors.h"
20 #include "media_log.h"
21 #include "i_recorder_engine.h"
22 #include "recorder_private_param.h"
23 #include "scope_guard.h"
24 
25 namespace {
26     constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecorderPipeline"};
27 }
28 
29 namespace OHOS {
30 namespace Media {
RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)31 RecorderPipeline::RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)
32     : desc_(desc)
33 {
34     MEDIA_LOGD("enter, ctor");
35 }
36 
~RecorderPipeline()37 RecorderPipeline::~RecorderPipeline()
38 {
39     MEDIA_LOGD("enter, dtor");
40     (void)Reset();
41 }
42 
SetNotifier(RecorderMsgNotifier notifier)43 void RecorderPipeline::SetNotifier(RecorderMsgNotifier notifier)
44 {
45     std::unique_lock<std::mutex> lock(gstPipeMutex_);
46     notifier_ = notifier;
47 }
48 
SetExecuteInCmdQ(RecorderExecuteInCmdQ executeInCmdQ)49 void RecorderPipeline::SetExecuteInCmdQ(RecorderExecuteInCmdQ executeInCmdQ)
50 {
51     std::unique_lock<std::mutex> lock(cmdQMutex_);
52     executeInCmdQ_ = executeInCmdQ;
53 }
54 
Init()55 int32_t RecorderPipeline::Init()
56 {
57     if (desc_ == nullptr) {
58         MEDIA_LOGE("pipeline desc is nullptr");
59         return MSERR_INVALID_OPERATION;
60     }
61 
62     gstPipeline_ = reinterpret_cast<GstPipeline *>(gst_pipeline_new("recorder-pipeline"));
63     if (gstPipeline_ == nullptr) {
64         MEDIA_LOGE("Create gst pipeline failed !");
65         return MSERR_NO_MEMORY;
66     }
67 
68     GstBus *bus = gst_pipeline_get_bus(gstPipeline_);
69     CHECK_AND_RETURN_RET(bus != nullptr, MSERR_INVALID_OPERATION);
70 
71     auto msgResCb = std::bind(&RecorderPipeline::OnNotifyMsgProcResult, this, std::placeholders::_1);
72     msgProcessor_ = std::make_unique<RecorderMsgProcessor>(*bus, msgResCb);
73     gst_object_unref(bus);
74 
75     int32_t ret = msgProcessor_->Init();
76     if (ret != MSERR_OK) {
77         MEDIA_LOGE("Init RecorderMsgProcessor Failed !,  ret = %{publiuc}d", ret);
78         ClearResource();
79         return ret;
80     }
81 
82     for (auto &elem : desc_->allElems) {
83         msgProcessor_->AddMsgHandler(elem);
84     }
85 
86     return MSERR_OK;
87 }
88 
Prepare()89 int32_t RecorderPipeline::Prepare()
90 {
91     MEDIA_LOGD("enter Prepare");
92 
93     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
94 
95     int32_t ret = DoElemAction(&RecorderElement::Prepare);
96     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
97 
98     ret = SyncWaitChangeState(GST_STATE_PAUSED);
99     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
100 
101     return MSERR_OK;
102 }
103 
Start()104 int32_t RecorderPipeline::Start()
105 {
106     MEDIA_LOGD("enter Start");
107 
108     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
109 
110     int32_t ret = DoElemAction(&RecorderElement::Start);
111     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
112 
113     ret = SyncWaitChangeState(GST_STATE_PLAYING);
114     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
115 
116     isStarted_ = true;
117     return MSERR_OK;
118 }
119 
Pause()120 int32_t RecorderPipeline::Pause()
121 {
122     MEDIA_LOGD("enter Pause");
123 
124     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
125 
126     int32_t ret = DoElemAction(&RecorderElement::Pause);
127     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
128 
129     return SyncWaitChangeState(GST_STATE_PAUSED);
130 }
131 
Resume()132 int32_t RecorderPipeline::Resume()
133 {
134     MEDIA_LOGD("enter Resume");
135 
136     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
137 
138     int32_t ret = DoElemAction(&RecorderElement::Resume);
139     CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
140 
141     return SyncWaitChangeState(GST_STATE_PLAYING);
142 }
143 
Stop(bool isDrainAll)144 int32_t RecorderPipeline::Stop(bool isDrainAll)
145 {
146     if (errorState_.load()) {
147         return MSERR_INVALID_STATE;
148     }
149 
150     if (currState_ == GST_STATE_NULL) {
151         return MSERR_OK;
152     }
153 
154     (void)DoElemAction(&RecorderElement::Stop, false);
155 
156     if (currState_ != GST_STATE_READY) {
157         MEDIA_LOGI("enter Stop, isDrainAll = %{public}d", isDrainAll);
158         DrainBuffer(isDrainAll);
159     }
160 
161     int32_t ret = SyncWaitChangeState(GST_STATE_NULL);
162     CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "Stop failed !");
163 
164     isStarted_ = false;
165     return MSERR_OK;
166 }
167 
SetParameter(int32_t sourceId,const RecorderParam & recParam)168 int32_t RecorderPipeline::SetParameter(int32_t sourceId, const RecorderParam &recParam)
169 {
170     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
171 
172     int32_t ret = MSERR_OK;
173     for (auto &elem : desc_->allElems) {
174         if (elem->GetSourceId() == sourceId)  {
175             ret = elem->SetParameter(recParam);
176             CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
177         }
178     }
179 
180     return ret;
181 }
182 
GetParameter(int32_t sourceId,RecorderParam & recParam)183 int32_t RecorderPipeline::GetParameter(int32_t sourceId, RecorderParam &recParam)
184 {
185     CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
186     CHECK_AND_RETURN_RET(desc_ != nullptr, MSERR_INVALID_STATE);
187 
188     if (desc_->srcElems.find(sourceId) == desc_->srcElems.end()) {
189         MEDIA_LOGE("invalid sourceId %{public}d", sourceId);
190         return MSERR_INVALID_VAL;
191     }
192     return desc_->srcElems[sourceId]->GetParameter(recParam);
193 }
194 
SyncWaitChangeState(GstState targetState)195 int32_t RecorderPipeline::SyncWaitChangeState(GstState targetState)
196 {
197     MEDIA_LOGI("change state to %{public}d", targetState);
198 
199     GstStateChangeReturn stateRet = gst_element_set_state((GstElement *)gstPipeline_, targetState);
200     if (stateRet == GST_STATE_CHANGE_FAILURE) {
201         errorState_.store(true);
202         return MSERR_INVALID_STATE;
203     }
204 
205     if (stateRet != GST_STATE_CHANGE_ASYNC) {
206         MEDIA_LOGI("finish change gstpipeline state to %{public}d.", targetState);
207         currState_ = targetState;
208         return MSERR_OK;
209     }
210 
211     MEDIA_LOGI("begin sync wait gstpipeline state change to %{public}d..........", targetState);
212     std::unique_lock<std::mutex> lock(gstPipeMutex_);
213     gstPipeCond_.wait(lock, [this, targetState] { return currState_ == targetState || errorState_.load(); });
214     if (errorState_.load()) {
215         MEDIA_LOGE("error happened, change state to %{public}d failed !", targetState);
216         return MSERR_INVALID_STATE;
217     }
218     MEDIA_LOGI("finish change gstpipeline state to %{public}d..........", targetState);
219     return MSERR_OK;
220 }
221 
DrainBuffer(bool isDrainAll)222 void RecorderPipeline::DrainBuffer(bool isDrainAll)
223 {
224     if (currState_ == GST_STATE_PAUSED) {
225         if (isStarted_) {
226             (void)SyncWaitChangeState(GST_STATE_PLAYING);
227         } else {
228             return;
229         }
230     }
231 
232     int32_t ret = MSERR_OK;
233     auto iter = desc_->allElems.begin();
234     for (size_t index = 0; index < desc_->srcElems.size(); index++, iter = std::next(iter)) {
235         ret = (*iter)->DrainAll(isDrainAll);
236         if (ret != MSERR_OK) {
237             MEDIA_LOGE("drain [%{public}d] failed for %{public}s", isDrainAll, (*iter)->GetName().c_str());
238             break;
239         }
240     }
241 
242     if (ret == MSERR_OK) {
243         (void)SyncWaitEOS();
244     }
245 }
246 
SyncWaitEOS()247 bool RecorderPipeline::SyncWaitEOS()
248 {
249     MEDIA_LOGI("Wait EOS finished........................");
250     std::unique_lock<std::mutex> lock(gstPipeMutex_);
251     if (errorState_.load()) {
252         static constexpr int32_t timeout = 1; // wait 1s for eos finished
253         gstPipeCond_.wait_for(lock, std::chrono::seconds(timeout), [this] { return eosDone_; });
254     } else {
255         gstPipeCond_.wait(lock, [this] { return eosDone_ || errorState_.load(); });
256     }
257 
258     if (!eosDone_) {
259         MEDIA_LOGE("error happened, wait eos done failed !");
260         return false;
261     }
262     eosDone_ = false;
263     MEDIA_LOGI("EOS finished........................");
264     return true;
265 }
266 
Reset()267 int32_t RecorderPipeline::Reset()
268 {
269     (void)Stop(false);
270     (void)DoElemAction(&RecorderElement::Reset, false);
271     ClearResource();
272     return MSERR_OK;
273 }
274 
DoElemAction(const ElemAction & action,bool needAllSucc)275 int32_t RecorderPipeline::DoElemAction(const ElemAction &action, bool needAllSucc)
276 {
277     if (desc_ == nullptr)  {
278         return MSERR_INVALID_OPERATION;
279     }
280 
281     bool allSucc = true;
282     for (auto &elem : desc_->allElems) {
283         int32_t ret = action(*elem);
284         if (ret == MSERR_OK) {
285             continue;
286         }
287         allSucc = false;
288         // if one element execute action fail, exit immediately.
289         if (needAllSucc) {
290             MEDIA_LOGE("element %{public}s execute action failed", elem->GetName().c_str());
291             return ret;
292         }
293     }
294 
295     return allSucc ? MSERR_OK : MSERR_INVALID_OPERATION;
296 }
297 
ClearResource()298 void RecorderPipeline::ClearResource()
299 {
300     if (msgProcessor_ != nullptr) {
301         (void)msgProcessor_->Reset();
302         msgProcessor_ = nullptr;
303     }
304 
305     if (gstPipeline_ != nullptr) {
306         gst_object_unref(gstPipeline_);
307         gstPipeline_ = nullptr;
308     }
309 
310     desc_ = nullptr;
311 }
312 
Dump()313 void RecorderPipeline::Dump()
314 {
315     MEDIA_LOGI("==========================Dump Recorder Parameters Begin=========================");
316     for (auto &elem : desc_->allElems) {
317         elem->Dump();
318     }
319     MEDIA_LOGI("==========================Dump Recorder Parameters End===========================");
320 }
321 
OnNotifyMsgProcResult(const RecorderMessage & msg)322 void RecorderPipeline::OnNotifyMsgProcResult(const RecorderMessage &msg)
323 {
324     if (msg.type == RecorderMessageType::REC_MSG_INFO) {
325         return ProcessInfoMessage(msg);
326     }
327 
328     if (msg.type == RecorderMessageType::REC_MSG_ERROR) {
329         return ProcessErrorMessage(msg);
330     }
331 
332     if (msg.type == RecorderMessageType::REC_MSG_FEATURE) {
333         return ProcessFeatureMessage(msg);
334     }
335 }
336 
ProcessInfoMessage(const RecorderMessage & msg)337 void RecorderPipeline::ProcessInfoMessage(const RecorderMessage &msg)
338 {
339     NotifyMessage(msg);
340 }
341 
ProcessErrorMessage(const RecorderMessage & msg)342 void RecorderPipeline::ProcessErrorMessage(const RecorderMessage &msg)
343 {
344     // ignore the error msg
345     if (errorState_.load() || (errorSources_.count(msg.sourceId) != 0)) {
346         return;
347     }
348 
349     if (CheckStopForError(msg)) {
350         StopForError(msg);
351         return;
352     }
353 
354     int ret = BypassOneSource(msg.sourceId);
355     if (ret != MSERR_OK) {
356         MEDIA_LOGE("bypass source[0x%{public}x] failed, stop recording.", msg.sourceId);
357         StopForError(msg);
358         return;
359     }
360     NotifyMessage(msg);
361 }
362 
ProcessFeatureMessage(const RecorderMessage & msg)363 void RecorderPipeline::ProcessFeatureMessage(const RecorderMessage &msg)
364 {
365     switch (msg.code) {
366         case REC_MSG_FEATURE_ASYNC_DONE: {
367             {
368                 std::unique_lock<std::mutex> lock(gstPipeMutex_);
369                 asyncDone_ = true;
370                 MEDIA_LOGI("Accept message GST_MESSAGE_ASYNC_DONE");
371             }
372             gstPipeCond_.notify_one();
373             break;
374         }
375         case REC_MSG_FEATURE_EOS_DONE: {
376             {
377                 std::unique_lock<std::mutex> lock(gstPipeMutex_);
378                 eosDone_ = true;
379                 MEDIA_LOGI("Accept message GST_MESSAGE_EOS");
380             }
381             gstPipeCond_.notify_one();
382             break;
383         }
384         case REC_MSG_FEATURE_STATE_CHANGE_DONE: {
385             {
386                 std::unique_lock<std::mutex> lock(gstPipeMutex_);
387                 currState_ = static_cast<GstState>(msg.detail);
388                 MEDIA_LOGI("Accept message REC_MSG_FEATURE_STATE_CHANGE_DONE, currState = %{public}d", currState_);
389             }
390             gstPipeCond_.notify_one();
391             break;
392         }
393         default:
394             MEDIA_LOGW("unknown feature message: %{public}d", msg.code);
395             break;
396     }
397 }
398 
NotifyMessage(const RecorderMessage & msg)399 void RecorderPipeline::NotifyMessage(const RecorderMessage &msg)
400 {
401     std::unique_lock<std::mutex> lock(gstPipeMutex_);
402     if (notifier_ != nullptr) {
403         notifier_(msg);
404     }
405 }
406 
CheckStopForError(const RecorderMessage & msg)407 bool RecorderPipeline::CheckStopForError(const RecorderMessage &msg)
408 {
409     // Not meaningful sourceId, means the error is related to all sources, and the recording must be stopped.
410     if (msg.sourceId == INVALID_SOURCE_ID || msg.sourceId == DUMMY_SOURCE_ID) {
411         return true;
412     }
413 
414     (void)errorSources_.emplace(msg.sourceId);
415     return errorSources_.size() == desc_->srcElems.size();
416 }
417 
StopForError(const RecorderMessage & msg)418 void RecorderPipeline::StopForError(const RecorderMessage &msg)
419 {
420     MEDIA_LOGE("Fatal error happened, stop recording. Error code: %{public}d, detail: %{public}d",
421                msg.code, msg.detail);
422     NotifyMessage(msg);
423 
424     errorState_.store(true);
425     gstPipeCond_.notify_all();
426 
427     auto stopforErrorTask = std::make_shared<TaskHandler<int32_t>>([this] {
428         (void)DoElemAction(&RecorderElement::Stop, false);
429         DrainBuffer(false);
430         (void)SyncWaitChangeState(GST_STATE_NULL);
431         return MSERR_OK;
432     });
433 
434     std::unique_lock<std::mutex> lock(cmdQMutex_);
435     CHECK_AND_RETURN(executeInCmdQ_ != nullptr);
436     (void)executeInCmdQ_(stopforErrorTask, true);
437 
438     isStarted_ = false;
439 }
440 
BypassOneSource(int32_t sourceId)441 int32_t RecorderPipeline::BypassOneSource(int32_t sourceId)
442 {
443     MEDIA_LOGE("recorder source[0x%{public}x] has error happened, bypass it", sourceId);
444 
445     auto srcElemIter = desc_->srcElems.find(sourceId);
446     if (srcElemIter == desc_->srcElems.end() || srcElemIter->second == nullptr) {
447         MEDIA_LOGE("The sourceId 0x%{public}x is unrecognizable, ignored !", sourceId);
448         std::string srcIdStr;
449         for (auto &srcElemItem : desc_->srcElems) {
450             srcIdStr += DexToHexString(srcElemItem.first) + " ";
451         }
452         MEDIA_LOGE("Valid source id: %{public}s", srcIdStr.c_str());
453         return MSERR_INVALID_VAL;
454     }
455 
456     bool ret = srcElemIter->second->DrainAll(true);
457     CHECK_AND_RETURN_RET(ret, MSERR_INVALID_OPERATION);
458 
459     return MSERR_OK;
460 }
461 } // namespace Media
462 } // namespace OHOS
463