1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "recorder_pipeline.h"
17 #include <gst/gst.h>
18 #include "string_ex.h"
19 #include "media_errors.h"
20 #include "media_log.h"
21 #include "i_recorder_engine.h"
22 #include "recorder_private_param.h"
23 #include "scope_guard.h"
24
25 namespace {
26 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecorderPipeline"};
27 }
28
29 namespace OHOS {
30 namespace Media {
RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)31 RecorderPipeline::RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)
32 : desc_(desc)
33 {
34 MEDIA_LOGD("enter, ctor");
35 }
36
~RecorderPipeline()37 RecorderPipeline::~RecorderPipeline()
38 {
39 MEDIA_LOGD("enter, dtor");
40 (void)Reset();
41 }
42
SetNotifier(RecorderMsgNotifier notifier)43 void RecorderPipeline::SetNotifier(RecorderMsgNotifier notifier)
44 {
45 std::unique_lock<std::mutex> lock(gstPipeMutex_);
46 notifier_ = notifier;
47 }
48
SetExecuteInCmdQ(RecorderExecuteInCmdQ executeInCmdQ)49 void RecorderPipeline::SetExecuteInCmdQ(RecorderExecuteInCmdQ executeInCmdQ)
50 {
51 std::unique_lock<std::mutex> lock(cmdQMutex_);
52 executeInCmdQ_ = executeInCmdQ;
53 }
54
Init()55 int32_t RecorderPipeline::Init()
56 {
57 if (desc_ == nullptr) {
58 MEDIA_LOGE("pipeline desc is nullptr");
59 return MSERR_INVALID_OPERATION;
60 }
61
62 gstPipeline_ = reinterpret_cast<GstPipeline *>(gst_pipeline_new("recorder-pipeline"));
63 if (gstPipeline_ == nullptr) {
64 MEDIA_LOGE("Create gst pipeline failed !");
65 return MSERR_NO_MEMORY;
66 }
67
68 GstBus *bus = gst_pipeline_get_bus(gstPipeline_);
69 CHECK_AND_RETURN_RET(bus != nullptr, MSERR_INVALID_OPERATION);
70
71 auto msgResCb = std::bind(&RecorderPipeline::OnNotifyMsgProcResult, this, std::placeholders::_1);
72 msgProcessor_ = std::make_unique<RecorderMsgProcessor>(*bus, msgResCb);
73 gst_object_unref(bus);
74
75 int32_t ret = msgProcessor_->Init();
76 if (ret != MSERR_OK) {
77 MEDIA_LOGE("Init RecorderMsgProcessor Failed !, ret = %{publiuc}d", ret);
78 ClearResource();
79 return ret;
80 }
81
82 for (auto &elem : desc_->allElems) {
83 msgProcessor_->AddMsgHandler(elem);
84 }
85
86 return MSERR_OK;
87 }
88
Prepare()89 int32_t RecorderPipeline::Prepare()
90 {
91 MEDIA_LOGD("enter Prepare");
92
93 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
94
95 int32_t ret = DoElemAction(&RecorderElement::Prepare);
96 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
97
98 ret = SyncWaitChangeState(GST_STATE_PAUSED);
99 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
100
101 return MSERR_OK;
102 }
103
Start()104 int32_t RecorderPipeline::Start()
105 {
106 MEDIA_LOGD("enter Start");
107
108 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
109
110 int32_t ret = DoElemAction(&RecorderElement::Start);
111 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
112
113 ret = SyncWaitChangeState(GST_STATE_PLAYING);
114 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
115
116 isStarted_ = true;
117 return MSERR_OK;
118 }
119
Pause()120 int32_t RecorderPipeline::Pause()
121 {
122 MEDIA_LOGD("enter Pause");
123
124 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
125
126 int32_t ret = DoElemAction(&RecorderElement::Pause);
127 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
128
129 return SyncWaitChangeState(GST_STATE_PAUSED);
130 }
131
Resume()132 int32_t RecorderPipeline::Resume()
133 {
134 MEDIA_LOGD("enter Resume");
135
136 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
137
138 int32_t ret = DoElemAction(&RecorderElement::Resume);
139 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
140
141 return SyncWaitChangeState(GST_STATE_PLAYING);
142 }
143
Stop(bool isDrainAll)144 int32_t RecorderPipeline::Stop(bool isDrainAll)
145 {
146 if (errorState_.load()) {
147 return MSERR_INVALID_STATE;
148 }
149
150 if (currState_ == GST_STATE_NULL) {
151 return MSERR_OK;
152 }
153
154 (void)DoElemAction(&RecorderElement::Stop, false);
155
156 if (currState_ != GST_STATE_READY) {
157 MEDIA_LOGI("enter Stop, isDrainAll = %{public}d", isDrainAll);
158 DrainBuffer(isDrainAll);
159 }
160
161 int32_t ret = SyncWaitChangeState(GST_STATE_NULL);
162 CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "Stop failed !");
163
164 isStarted_ = false;
165 return MSERR_OK;
166 }
167
SetParameter(int32_t sourceId,const RecorderParam & recParam)168 int32_t RecorderPipeline::SetParameter(int32_t sourceId, const RecorderParam &recParam)
169 {
170 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
171
172 int32_t ret = MSERR_OK;
173 for (auto &elem : desc_->allElems) {
174 if (elem->GetSourceId() == sourceId) {
175 ret = elem->SetParameter(recParam);
176 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
177 }
178 }
179
180 return ret;
181 }
182
GetParameter(int32_t sourceId,RecorderParam & recParam)183 int32_t RecorderPipeline::GetParameter(int32_t sourceId, RecorderParam &recParam)
184 {
185 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
186 CHECK_AND_RETURN_RET(desc_ != nullptr, MSERR_INVALID_STATE);
187
188 if (desc_->srcElems.find(sourceId) == desc_->srcElems.end()) {
189 MEDIA_LOGE("invalid sourceId %{public}d", sourceId);
190 return MSERR_INVALID_VAL;
191 }
192 return desc_->srcElems[sourceId]->GetParameter(recParam);
193 }
194
SyncWaitChangeState(GstState targetState)195 int32_t RecorderPipeline::SyncWaitChangeState(GstState targetState)
196 {
197 MEDIA_LOGI("change state to %{public}d", targetState);
198
199 GstStateChangeReturn stateRet = gst_element_set_state((GstElement *)gstPipeline_, targetState);
200 if (stateRet == GST_STATE_CHANGE_FAILURE) {
201 errorState_.store(true);
202 return MSERR_INVALID_STATE;
203 }
204
205 if (stateRet != GST_STATE_CHANGE_ASYNC) {
206 MEDIA_LOGI("finish change gstpipeline state to %{public}d.", targetState);
207 currState_ = targetState;
208 return MSERR_OK;
209 }
210
211 MEDIA_LOGI("begin sync wait gstpipeline state change to %{public}d..........", targetState);
212 std::unique_lock<std::mutex> lock(gstPipeMutex_);
213 gstPipeCond_.wait(lock, [this, targetState] { return currState_ == targetState || errorState_.load(); });
214 if (errorState_.load()) {
215 MEDIA_LOGE("error happened, change state to %{public}d failed !", targetState);
216 return MSERR_INVALID_STATE;
217 }
218 MEDIA_LOGI("finish change gstpipeline state to %{public}d..........", targetState);
219 return MSERR_OK;
220 }
221
DrainBuffer(bool isDrainAll)222 void RecorderPipeline::DrainBuffer(bool isDrainAll)
223 {
224 if (currState_ == GST_STATE_PAUSED) {
225 if (isStarted_) {
226 (void)SyncWaitChangeState(GST_STATE_PLAYING);
227 } else {
228 return;
229 }
230 }
231
232 int32_t ret = MSERR_OK;
233 auto iter = desc_->allElems.begin();
234 for (size_t index = 0; index < desc_->srcElems.size(); index++, iter = std::next(iter)) {
235 ret = (*iter)->DrainAll(isDrainAll);
236 if (ret != MSERR_OK) {
237 MEDIA_LOGE("drain [%{public}d] failed for %{public}s", isDrainAll, (*iter)->GetName().c_str());
238 break;
239 }
240 }
241
242 if (ret == MSERR_OK) {
243 (void)SyncWaitEOS();
244 }
245 }
246
SyncWaitEOS()247 bool RecorderPipeline::SyncWaitEOS()
248 {
249 MEDIA_LOGI("Wait EOS finished........................");
250 std::unique_lock<std::mutex> lock(gstPipeMutex_);
251 if (errorState_.load()) {
252 static constexpr int32_t timeout = 1; // wait 1s for eos finished
253 gstPipeCond_.wait_for(lock, std::chrono::seconds(timeout), [this] { return eosDone_; });
254 } else {
255 gstPipeCond_.wait(lock, [this] { return eosDone_ || errorState_.load(); });
256 }
257
258 if (!eosDone_) {
259 MEDIA_LOGE("error happened, wait eos done failed !");
260 return false;
261 }
262 eosDone_ = false;
263 MEDIA_LOGI("EOS finished........................");
264 return true;
265 }
266
Reset()267 int32_t RecorderPipeline::Reset()
268 {
269 (void)Stop(false);
270 (void)DoElemAction(&RecorderElement::Reset, false);
271 ClearResource();
272 return MSERR_OK;
273 }
274
DoElemAction(const ElemAction & action,bool needAllSucc)275 int32_t RecorderPipeline::DoElemAction(const ElemAction &action, bool needAllSucc)
276 {
277 if (desc_ == nullptr) {
278 return MSERR_INVALID_OPERATION;
279 }
280
281 bool allSucc = true;
282 for (auto &elem : desc_->allElems) {
283 int32_t ret = action(*elem);
284 if (ret == MSERR_OK) {
285 continue;
286 }
287 allSucc = false;
288 // if one element execute action fail, exit immediately.
289 if (needAllSucc) {
290 MEDIA_LOGE("element %{public}s execute action failed", elem->GetName().c_str());
291 return ret;
292 }
293 }
294
295 return allSucc ? MSERR_OK : MSERR_INVALID_OPERATION;
296 }
297
ClearResource()298 void RecorderPipeline::ClearResource()
299 {
300 if (msgProcessor_ != nullptr) {
301 (void)msgProcessor_->Reset();
302 msgProcessor_ = nullptr;
303 }
304
305 if (gstPipeline_ != nullptr) {
306 gst_object_unref(gstPipeline_);
307 gstPipeline_ = nullptr;
308 }
309
310 desc_ = nullptr;
311 }
312
Dump()313 void RecorderPipeline::Dump()
314 {
315 MEDIA_LOGI("==========================Dump Recorder Parameters Begin=========================");
316 for (auto &elem : desc_->allElems) {
317 elem->Dump();
318 }
319 MEDIA_LOGI("==========================Dump Recorder Parameters End===========================");
320 }
321
OnNotifyMsgProcResult(const RecorderMessage & msg)322 void RecorderPipeline::OnNotifyMsgProcResult(const RecorderMessage &msg)
323 {
324 if (msg.type == RecorderMessageType::REC_MSG_INFO) {
325 return ProcessInfoMessage(msg);
326 }
327
328 if (msg.type == RecorderMessageType::REC_MSG_ERROR) {
329 return ProcessErrorMessage(msg);
330 }
331
332 if (msg.type == RecorderMessageType::REC_MSG_FEATURE) {
333 return ProcessFeatureMessage(msg);
334 }
335 }
336
ProcessInfoMessage(const RecorderMessage & msg)337 void RecorderPipeline::ProcessInfoMessage(const RecorderMessage &msg)
338 {
339 NotifyMessage(msg);
340 }
341
ProcessErrorMessage(const RecorderMessage & msg)342 void RecorderPipeline::ProcessErrorMessage(const RecorderMessage &msg)
343 {
344 // ignore the error msg
345 if (errorState_.load() || (errorSources_.count(msg.sourceId) != 0)) {
346 return;
347 }
348
349 if (CheckStopForError(msg)) {
350 StopForError(msg);
351 return;
352 }
353
354 int ret = BypassOneSource(msg.sourceId);
355 if (ret != MSERR_OK) {
356 MEDIA_LOGE("bypass source[0x%{public}x] failed, stop recording.", msg.sourceId);
357 StopForError(msg);
358 return;
359 }
360 NotifyMessage(msg);
361 }
362
ProcessFeatureMessage(const RecorderMessage & msg)363 void RecorderPipeline::ProcessFeatureMessage(const RecorderMessage &msg)
364 {
365 switch (msg.code) {
366 case REC_MSG_FEATURE_ASYNC_DONE: {
367 {
368 std::unique_lock<std::mutex> lock(gstPipeMutex_);
369 asyncDone_ = true;
370 MEDIA_LOGI("Accept message GST_MESSAGE_ASYNC_DONE");
371 }
372 gstPipeCond_.notify_one();
373 break;
374 }
375 case REC_MSG_FEATURE_EOS_DONE: {
376 {
377 std::unique_lock<std::mutex> lock(gstPipeMutex_);
378 eosDone_ = true;
379 MEDIA_LOGI("Accept message GST_MESSAGE_EOS");
380 }
381 gstPipeCond_.notify_one();
382 break;
383 }
384 case REC_MSG_FEATURE_STATE_CHANGE_DONE: {
385 {
386 std::unique_lock<std::mutex> lock(gstPipeMutex_);
387 currState_ = static_cast<GstState>(msg.detail);
388 MEDIA_LOGI("Accept message REC_MSG_FEATURE_STATE_CHANGE_DONE, currState = %{public}d", currState_);
389 }
390 gstPipeCond_.notify_one();
391 break;
392 }
393 default:
394 MEDIA_LOGW("unknown feature message: %{public}d", msg.code);
395 break;
396 }
397 }
398
NotifyMessage(const RecorderMessage & msg)399 void RecorderPipeline::NotifyMessage(const RecorderMessage &msg)
400 {
401 std::unique_lock<std::mutex> lock(gstPipeMutex_);
402 if (notifier_ != nullptr) {
403 notifier_(msg);
404 }
405 }
406
CheckStopForError(const RecorderMessage & msg)407 bool RecorderPipeline::CheckStopForError(const RecorderMessage &msg)
408 {
409 // Not meaningful sourceId, means the error is related to all sources, and the recording must be stopped.
410 if (msg.sourceId == INVALID_SOURCE_ID || msg.sourceId == DUMMY_SOURCE_ID) {
411 return true;
412 }
413
414 (void)errorSources_.emplace(msg.sourceId);
415 return errorSources_.size() == desc_->srcElems.size();
416 }
417
StopForError(const RecorderMessage & msg)418 void RecorderPipeline::StopForError(const RecorderMessage &msg)
419 {
420 MEDIA_LOGE("Fatal error happened, stop recording. Error code: %{public}d, detail: %{public}d",
421 msg.code, msg.detail);
422 NotifyMessage(msg);
423
424 errorState_.store(true);
425 gstPipeCond_.notify_all();
426
427 auto stopforErrorTask = std::make_shared<TaskHandler<int32_t>>([this] {
428 (void)DoElemAction(&RecorderElement::Stop, false);
429 DrainBuffer(false);
430 (void)SyncWaitChangeState(GST_STATE_NULL);
431 return MSERR_OK;
432 });
433
434 std::unique_lock<std::mutex> lock(cmdQMutex_);
435 CHECK_AND_RETURN(executeInCmdQ_ != nullptr);
436 (void)executeInCmdQ_(stopforErrorTask, true);
437
438 isStarted_ = false;
439 }
440
BypassOneSource(int32_t sourceId)441 int32_t RecorderPipeline::BypassOneSource(int32_t sourceId)
442 {
443 MEDIA_LOGE("recorder source[0x%{public}x] has error happened, bypass it", sourceId);
444
445 auto srcElemIter = desc_->srcElems.find(sourceId);
446 if (srcElemIter == desc_->srcElems.end() || srcElemIter->second == nullptr) {
447 MEDIA_LOGE("The sourceId 0x%{public}x is unrecognizable, ignored !", sourceId);
448 std::string srcIdStr;
449 for (auto &srcElemItem : desc_->srcElems) {
450 srcIdStr += DexToHexString(srcElemItem.first) + " ";
451 }
452 MEDIA_LOGE("Valid source id: %{public}s", srcIdStr.c_str());
453 return MSERR_INVALID_VAL;
454 }
455
456 bool ret = srcElemIter->second->DrainAll(true);
457 CHECK_AND_RETURN_RET(ret, MSERR_INVALID_OPERATION);
458
459 return MSERR_OK;
460 }
461 } // namespace Media
462 } // namespace OHOS
463