1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "recorder_pipeline.h"
17 #include <gst/gst.h>
18 #include "string_ex.h"
19 #include "media_errors.h"
20 #include "media_log.h"
21 #include "i_recorder_engine.h"
22 #include "recorder_private_param.h"
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "RecorderPipeline"};
26 }
27
28 namespace OHOS {
29 namespace Media {
RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)30 RecorderPipeline::RecorderPipeline(std::shared_ptr<RecorderPipelineDesc> desc)
31 : desc_(desc)
32 {
33 MEDIA_LOGD("enter, ctor");
34 }
35
~RecorderPipeline()36 RecorderPipeline::~RecorderPipeline()
37 {
38 MEDIA_LOGD("enter, dtor");
39 (void)Reset();
40 }
41
SetNotifier(RecorderMsgNotifier notifier)42 void RecorderPipeline::SetNotifier(RecorderMsgNotifier notifier)
43 {
44 std::unique_lock<std::mutex> lock(gstPipeMutex_);
45 notifier_ = notifier;
46 }
47
Init()48 int32_t RecorderPipeline::Init()
49 {
50 if (desc_ == nullptr) {
51 MEDIA_LOGE("pipeline desc is nullptr");
52 return MSERR_INVALID_OPERATION;
53 }
54
55 gstPipeline_ = reinterpret_cast<GstPipeline *>(gst_pipeline_new("recorder-pipeline"));
56 if (gstPipeline_ == nullptr) {
57 MEDIA_LOGE("Create gst pipeline failed !");
58 return MSERR_NO_MEMORY;
59 }
60
61 GstBus *bus = gst_pipeline_get_bus(gstPipeline_);
62 CHECK_AND_RETURN_RET(bus != nullptr, MSERR_INVALID_OPERATION);
63
64 auto msgResCb = std::bind(&RecorderPipeline::OnNotifyMsgProcResult, this, std::placeholders::_1);
65 msgProcessor_ = std::make_unique<RecorderMsgProcessor>(*bus, msgResCb);
66 gst_object_unref(bus);
67
68 int32_t ret = msgProcessor_->Init();
69 if (ret != MSERR_OK) {
70 MEDIA_LOGE("Init RecorderMsgProcessor Failed !, ret = %{publiuc}d", ret);
71 ClearResource();
72 return ret;
73 }
74
75 for (auto &elem : desc_->allElems) {
76 msgProcessor_->AddMsgHandler(elem);
77 }
78
79 return MSERR_OK;
80 }
81
Prepare()82 int32_t RecorderPipeline::Prepare()
83 {
84 MEDIA_LOGD("enter Prepare");
85
86 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
87
88 int32_t ret = DoElemAction(&RecorderElement::Prepare);
89 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
90
91 ret = SyncWaitChangeState(GST_STATE_PAUSED);
92 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
93
94 return MSERR_OK;
95 }
96
Start()97 int32_t RecorderPipeline::Start()
98 {
99 MEDIA_LOGD("enter Start");
100
101 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
102
103 int32_t ret = DoElemAction(&RecorderElement::Start);
104 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
105
106 ret = SyncWaitChangeState(GST_STATE_PLAYING);
107 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
108
109 isStarted_ = true;
110 return MSERR_OK;
111 }
112
Pause()113 int32_t RecorderPipeline::Pause()
114 {
115 MEDIA_LOGD("enter Pause");
116
117 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
118
119 int32_t ret = DoElemAction(&RecorderElement::Pause);
120 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
121
122 return SyncWaitChangeState(GST_STATE_PAUSED);
123 }
124
Resume()125 int32_t RecorderPipeline::Resume()
126 {
127 MEDIA_LOGD("enter Resume");
128
129 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
130
131 int32_t ret = DoElemAction(&RecorderElement::Resume);
132 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
133
134 return SyncWaitChangeState(GST_STATE_PLAYING);
135 }
136
Stop(bool isDrainAll)137 int32_t RecorderPipeline::Stop(bool isDrainAll)
138 {
139 if (errorState_.load()) {
140 return MSERR_INVALID_STATE;
141 }
142
143 if (currState_ == GST_STATE_NULL) {
144 return MSERR_OK;
145 }
146
147 if (currState_ != GST_STATE_READY) {
148 MEDIA_LOGI("enter Stop, isDrainAll = %{public}d", isDrainAll);
149 DrainBuffer(isDrainAll);
150 }
151
152 (void)DoElemAction(&RecorderElement::Stop, false);
153
154 int32_t ret = SyncWaitChangeState(GST_STATE_NULL);
155 CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ret, "Stop failed !");
156
157 isStarted_ = false;
158 return MSERR_OK;
159 }
160
SetParameter(int32_t sourceId,const RecorderParam & recParam)161 int32_t RecorderPipeline::SetParameter(int32_t sourceId, const RecorderParam &recParam)
162 {
163 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
164
165 int32_t ret = MSERR_OK;
166 for (auto &elem : desc_->allElems) {
167 if (elem->GetSourceId() == sourceId) {
168 ret = elem->SetParameter(recParam);
169 CHECK_AND_RETURN_RET(ret == MSERR_OK, ret);
170 }
171 }
172
173 return ret;
174 }
175
GetParameter(int32_t sourceId,RecorderParam & recParam)176 int32_t RecorderPipeline::GetParameter(int32_t sourceId, RecorderParam &recParam)
177 {
178 CHECK_AND_RETURN_RET(!errorState_.load(), MSERR_INVALID_STATE);
179
180 if (desc_->srcElems.find(sourceId) == desc_->srcElems.end()) {
181 MEDIA_LOGE("invalid sourceId %{public}d", sourceId);
182 return MSERR_INVALID_VAL;
183 }
184 return desc_->srcElems[sourceId]->GetParameter(recParam);
185 }
186
SyncWaitChangeState(GstState targetState)187 int32_t RecorderPipeline::SyncWaitChangeState(GstState targetState)
188 {
189 MEDIA_LOGI("change state to %{public}d", targetState);
190
191 GstStateChangeReturn stateRet = gst_element_set_state((GstElement *)gstPipeline_, targetState);
192 CHECK_AND_RETURN_RET(stateRet != GST_STATE_CHANGE_FAILURE, MSERR_INVALID_STATE);
193
194 if (stateRet != GST_STATE_CHANGE_ASYNC) {
195 MEDIA_LOGI("finish change gstpipeline state to %{public}d.", targetState);
196 currState_ = targetState;
197 return MSERR_OK;
198 }
199
200 MEDIA_LOGI("begin sync wait gstpipeline state change to %{public}d..........", targetState);
201 std::unique_lock<std::mutex> lock(gstPipeMutex_);
202 gstPipeCond_.wait(lock, [this, targetState] { return currState_ == targetState || errorState_.load(); });
203 if (errorState_.load()) {
204 MEDIA_LOGE("error happened, change state to %{public}d failed !", targetState);
205 return MSERR_INVALID_STATE;
206 }
207 MEDIA_LOGI("finish change gstpipeline state to %{public}d..........", targetState);
208 return MSERR_OK;
209 }
210
DrainBuffer(bool isDrainAll)211 void RecorderPipeline::DrainBuffer(bool isDrainAll)
212 {
213 if (currState_ == GST_STATE_PAUSED) {
214 if (isStarted_) {
215 (void)SyncWaitChangeState(GST_STATE_PLAYING);
216 } else {
217 return;
218 }
219 }
220
221 int32_t ret = MSERR_OK;
222 auto iter = desc_->allElems.begin();
223 for (size_t index = 0; index < desc_->srcElems.size(); index++, iter = std::next(iter)) {
224 ret = (*iter)->DrainAll(isDrainAll);
225 if (ret != MSERR_OK) {
226 MEDIA_LOGE("drain [%{public}d] failed for %{public}s", isDrainAll, (*iter)->GetName().c_str());
227 break;
228 }
229 }
230
231 if (ret == MSERR_OK) {
232 (void)SyncWaitEOS();
233 }
234 }
235
PostAndSyncWaitEOS()236 int32_t RecorderPipeline::PostAndSyncWaitEOS()
237 {
238 if (currState_ != GST_STATE_PLAYING) {
239 MEDIA_LOGE("curr state is not GST_STATE_PLAYING, ignore !");
240 return MSERR_OK;
241 }
242
243 GstEvent *eos = gst_event_new_eos();
244 if (eos == nullptr) {
245 MEDIA_LOGE("Create EOS event failed");
246 return MSERR_INVALID_OPERATION;
247 }
248
249 gboolean success = gst_element_send_event((GstElement *)gstPipeline_, eos);
250 if (!success) {
251 MEDIA_LOGE("Send EOS event failed");
252 return MSERR_INVALID_OPERATION;
253 }
254
255 (void)SyncWaitEOS();
256 return MSERR_OK;
257 }
258
SyncWaitEOS()259 bool RecorderPipeline::SyncWaitEOS()
260 {
261 MEDIA_LOGI("Wait EOS finished........................");
262 std::unique_lock<std::mutex> lock(gstPipeMutex_);
263 gstPipeCond_.wait(lock, [this] { return eosDone_ || errorState_.load(); });
264 if (!eosDone_) {
265 MEDIA_LOGE("error happened, wait eos done failed !");
266 return false;
267 }
268 eosDone_ = false;
269 MEDIA_LOGI("EOS finished........................");
270 return true;
271 }
272
Reset()273 int32_t RecorderPipeline::Reset()
274 {
275 (void)Stop(false);
276 (void)DoElemAction(&RecorderElement::Reset, false);
277 ClearResource();
278 return MSERR_OK;
279 }
280
DoElemAction(const ElemAction & action,bool needAllSucc)281 int32_t RecorderPipeline::DoElemAction(const ElemAction &action, bool needAllSucc)
282 {
283 if (desc_ == nullptr) {
284 return MSERR_INVALID_OPERATION;
285 }
286
287 bool allSucc = true;
288 for (auto &elem : desc_->allElems) {
289 int32_t ret = action(*elem);
290 if (ret == MSERR_OK) {
291 continue;
292 }
293 allSucc = false;
294 // if one element execute action fail, exit immediately.
295 if (needAllSucc) {
296 MEDIA_LOGE("element %{public}s execute action failed", elem->GetName().c_str());
297 return ret;
298 }
299 }
300
301 return allSucc ? MSERR_OK : MSERR_INVALID_OPERATION;
302 }
303
ClearResource()304 void RecorderPipeline::ClearResource()
305 {
306 if (msgProcessor_ != nullptr) {
307 (void)msgProcessor_->Reset();
308 msgProcessor_ = nullptr;
309 }
310
311 if (gstPipeline_ != nullptr) {
312 gst_object_unref(gstPipeline_);
313 gstPipeline_ = nullptr;
314 }
315
316 desc_ = nullptr;
317 }
318
Dump()319 void RecorderPipeline::Dump()
320 {
321 MEDIA_LOGI("==========================Dump Recorder Parameters Begin=========================");
322 for (auto &elem : desc_->allElems) {
323 elem->Dump();
324 }
325 MEDIA_LOGI("==========================Dump Recorder Parameters End===========================");
326 }
327
OnNotifyMsgProcResult(const RecorderMessage & msg)328 void RecorderPipeline::OnNotifyMsgProcResult(const RecorderMessage &msg)
329 {
330 if (msg.type == RecorderMessageType::REC_MSG_INFO) {
331 return ProcessInfoMessage(msg);
332 }
333
334 if (msg.type == RecorderMessageType::REC_MSG_ERROR) {
335 return ProcessErrorMessage(msg);
336 }
337
338 if (msg.type == RecorderMessageType::REC_MSG_FEATURE) {
339 return ProcessFeatureMessage(msg);
340 }
341 }
342
ProcessInfoMessage(const RecorderMessage & msg)343 void RecorderPipeline::ProcessInfoMessage(const RecorderMessage &msg)
344 {
345 NotifyMessage(msg);
346 }
347
ProcessErrorMessage(const RecorderMessage & msg)348 void RecorderPipeline::ProcessErrorMessage(const RecorderMessage &msg)
349 {
350 // ignore the error msg
351 if (errorState_.load() || (errorSources_.count(msg.sourceId) != 0)) {
352 return;
353 }
354
355 if (CheckStopForError(msg)) {
356 StopForError(msg);
357 return;
358 }
359
360 int ret = BypassOneSource(msg.sourceId);
361 if (ret != MSERR_OK) {
362 MEDIA_LOGE("bypass source[0x%{public}x] failed, stop recording.", msg.sourceId);
363 StopForError(msg);
364 return;
365 }
366 NotifyMessage(msg);
367 }
368
ProcessFeatureMessage(const RecorderMessage & msg)369 void RecorderPipeline::ProcessFeatureMessage(const RecorderMessage &msg)
370 {
371 switch (msg.code) {
372 case REC_MSG_FEATURE_ASYNC_DONE: {
373 {
374 std::unique_lock<std::mutex> lock(gstPipeMutex_);
375 asyncDone_ = true;
376 MEDIA_LOGI("Accept message GST_MESSAGE_ASYNC_DONE");
377 }
378 gstPipeCond_.notify_one();
379 break;
380 }
381 case REC_MSG_FEATURE_EOS_DONE: {
382 {
383 std::unique_lock<std::mutex> lock(gstPipeMutex_);
384 eosDone_ = true;
385 MEDIA_LOGI("Accept message GST_MESSAGE_EOS");
386 }
387 gstPipeCond_.notify_one();
388 break;
389 }
390 case REC_MSG_FEATURE_STATE_CHANGE_DONE: {
391 {
392 std::unique_lock<std::mutex> lock(gstPipeMutex_);
393 currState_ = static_cast<GstState>(msg.detail);
394 MEDIA_LOGI("Accept message REC_MSG_FEATURE_STATE_CHANGE_DONE, currState = %{public}d", currState_);
395 }
396 gstPipeCond_.notify_one();
397 break;
398 }
399 default:
400 MEDIA_LOGW("unknown feature message: %{public}d", msg.code);
401 break;
402 }
403 }
404
NotifyMessage(const RecorderMessage & msg)405 void RecorderPipeline::NotifyMessage(const RecorderMessage &msg)
406 {
407 std::unique_lock<std::mutex> lock(gstPipeMutex_);
408 if (notifier_ != nullptr) {
409 notifier_(msg);
410 }
411 }
412
CheckStopForError(const RecorderMessage & msg)413 bool RecorderPipeline::CheckStopForError(const RecorderMessage &msg)
414 {
415 // Not meaningful sourceId, means the error is related to all sources, and the recording must be stopped.
416 if (msg.sourceId == INVALID_SOURCE_ID || msg.sourceId == DUMMY_SOURCE_ID) {
417 return true;
418 }
419
420 (void)errorSources_.emplace(msg.sourceId);
421 if (errorSources_.size() == desc_->srcElems.size()) {
422 return true;
423 }
424
425 return false;
426 }
427
StopForError(const RecorderMessage & msg)428 void RecorderPipeline::StopForError(const RecorderMessage &msg)
429 {
430 MEDIA_LOGE("Fatal error happened, stop recording. Error code: %{public}d, detail: %{public}d",
431 msg.code, msg.detail);
432
433 errorState_.store(true);
434 DrainBuffer(false);
435 (void)DoElemAction(&RecorderElement::Stop, false);
436 (void)SyncWaitChangeState(GST_STATE_NULL);
437
438 isStarted_ = false;
439 gstPipeCond_.notify_all();
440
441 NotifyMessage(msg);
442 }
443
BypassOneSource(int32_t sourceId)444 int32_t RecorderPipeline::BypassOneSource(int32_t sourceId)
445 {
446 MEDIA_LOGE("recorder source[0x%{public}x] has error happened, bypass it", sourceId);
447
448 auto srcElemIter = desc_->srcElems.find(sourceId);
449 if (srcElemIter == desc_->srcElems.end() || srcElemIter->second == nullptr) {
450 MEDIA_LOGE("The sourceId 0x%{public}x is unrecognizable, ignored !", sourceId);
451 std::string srcIdStr;
452 for (auto &srcElemItem : desc_->srcElems) {
453 srcIdStr += DexToHexString(srcElemItem.first) + " ";
454 }
455 MEDIA_LOGE("Valid source id: %{public}s", srcIdStr.c_str());
456 return MSERR_INVALID_VAL;
457 }
458
459 bool ret = srcElemIter->second->DrainAll(true);
460 CHECK_AND_RETURN_RET(ret, MSERR_INVALID_OPERATION);
461
462 return MSERR_OK;
463 }
464 } // namespace Media
465 } // namespace OHOS
466