1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "audio_endpoint.h"
17
18 #include <atomic>
19 #include <cinttypes>
20 #include <condition_variable>
21 #include <thread>
22 #include <vector>
23 #include <mutex>
24
25 #include "securec.h"
26
27 #include "audio_errors.h"
28 #include "audio_log.h"
29 #include "audio_schedule.h"
30 #include "audio_utils.h"
31 #include "fast_audio_renderer_sink.h"
32 #include "fast_audio_capturer_source.h"
33 #include "i_audio_capturer_source.h"
34 #include "linear_pos_time_model.h"
35 #include "policy_handler.h"
36 #include "remote_fast_audio_renderer_sink.h"
37 #include "remote_fast_audio_capturer_source.h"
38
39 // DUMP_PROCESS_FILE // define it for dump file
40 namespace OHOS {
41 namespace AudioStandard {
42 namespace {
43 static constexpr int32_t VOLUME_SHIFT_NUMBER = 16; // 1 >> 16 = 65536, max volume
44 static constexpr int64_t MAX_SPAN_DURATION_IN_NANO = 100000000; // 100ms
45 static constexpr int32_t SLEEP_TIME_IN_DEFAULT = 400; // 400ms
46 static constexpr int64_t DELTA_TO_REAL_READ_START_TIME = 0; // 0ms
47 }
48
49 class AudioEndpointInner : public AudioEndpoint {
50 public:
51 explicit AudioEndpointInner(EndpointType type);
52 ~AudioEndpointInner();
53
54 bool Config(const DeviceInfo &deviceInfo);
55 bool StartDevice();
56 bool StopDevice();
57
58 // when audio process start.
59 int32_t OnStart(IAudioProcessStream *processStream) override;
60 // when audio process pause.
61 int32_t OnPause(IAudioProcessStream *processStream) override;
62 // when audio process request update handle info.
63 int32_t OnUpdateHandleInfo(IAudioProcessStream *processStream) override;
64
65 /**
66 * Call LinkProcessStream when first create process or link other process with this endpoint.
67 * Here are cases:
68 * case1: endpointStatus_ = UNLINKED, link not running process; UNLINKED-->IDEL & godown
69 * case2: endpointStatus_ = UNLINKED, link running process; UNLINKED-->IDEL & godown
70 * case3: endpointStatus_ = IDEL, link not running process; IDEL-->IDEL
71 * case4: endpointStatus_ = IDEL, link running process; IDEL-->STARTING-->RUNNING
72 * case5: endpointStatus_ = RUNNING; RUNNING-->RUNNING
73 */
74 int32_t LinkProcessStream(IAudioProcessStream *processStream) override;
75 int32_t UnlinkProcessStream(IAudioProcessStream *processStream) override;
76
77 int32_t GetPreferBufferInfo(uint32_t &totalSizeInframe, uint32_t &spanSizeInframe) override;
78
79 void Dump(std::stringstream &dumpStringStream) override;
80
81 std::string GetEndpointName() override;
82
83 EndpointStatus GetStatus() override;
84
85 void Release() override;
86
87 private:
88 bool ConfigInputPoint(const DeviceInfo &deviceInfo);
89 int32_t PrepareDeviceBuffer(const DeviceInfo &deviceInfo);
90 int32_t GetAdapterBufferInfo(const DeviceInfo &deviceInfo);
91 void ReSyncPosition();
92 void RecordReSyncPosition();
93 void InitAudiobuffer(bool resetReadWritePos);
94 void ProcessData(const std::vector<AudioStreamData> &srcDataList, const AudioStreamData &dstData);
95 int64_t GetPredictNextReadTime(uint64_t posInFrame);
96 int64_t GetPredictNextWriteTime(uint64_t posInFrame);
97 bool PrepareNextLoop(uint64_t curWritePos, int64_t &wakeUpTime);
98 bool RecordPrepareNextLoop(uint64_t curReadPos, int64_t &wakeUpTime);
99
100 /**
101 * @brief Get the current read position in frame and the read-time with it.
102 *
103 * @param frames the read position in frame
104 * @param nanoTime the time in nanosecond when device-sink start read the buffer
105 */
106 bool GetDeviceHandleInfo(uint64_t &frames, int64_t &nanoTime);
107 int32_t GetProcLastWriteDoneInfo(const std::shared_ptr<OHAudioBuffer> processBuffer, uint64_t curWriteFrame,
108 uint64_t &proHandleFrame, int64_t &proHandleTime);
109
110 bool IsAnyProcessRunning();
111 bool CheckAllBufferReady(int64_t checkTime, uint64_t curWritePos);
112 bool ProcessToEndpointDataHandle(uint64_t curWritePos);
113 void GetAllReadyProcessData(std::vector<AudioStreamData> &audioDataList);
114
115 std::string GetStatusStr(EndpointStatus status);
116
117 int32_t WriteToSpecialProcBuf(const std::shared_ptr<OHAudioBuffer> &procBuf, const BufferDesc &readBuf);
118 void WriteToProcessBuffers(const BufferDesc &readBuf);
119 int32_t ReadFromEndpoint(uint64_t curReadPos);
120 bool KeepWorkloopRunning();
121
122 void EndpointWorkLoopFuc();
123 void RecordEndpointWorkLoopFuc();
124
125 // Call GetMmapHandlePosition in ipc may block more than a cycle, call it in another thread.
126 void AsyncGetPosTime();
127
128 private:
129 static constexpr int64_t ONE_MILLISECOND_DURATION = 1000000; // 1ms
130 static constexpr int64_t WRITE_TO_HDI_AHEAD_TIME = -1000000; // ahead 1ms
131 static constexpr int32_t UPDATE_THREAD_TIMEOUT = 1000; // 1000ms
132 enum ThreadStatus : uint32_t {
133 WAITTING = 0,
134 SLEEPING,
135 INRUNNING
136 };
137 // SamplingRate EncodingType SampleFormat Channel
138 DeviceInfo deviceInfo_;
139 AudioStreamInfo dstStreamInfo_;
140 EndpointType endpointType_;
141
142 std::mutex listLock_;
143 std::vector<IAudioProcessStream *> processList_;
144 std::vector<std::shared_ptr<OHAudioBuffer>> processBufferList_;
145
146 std::atomic<bool> isInited_ = false;
147
148 IMmapAudioRendererSink *fastSink_ = nullptr;
149 IMmapAudioCapturerSource *fastSource_ = nullptr;
150
151 LinearPosTimeModel readTimeModel_;
152 LinearPosTimeModel writeTimeModel_;
153
154 int64_t spanDuration_ = 0; // nano second
155 int64_t serverAheadReadTime_ = 0;
156 int dstBufferFd_ = -1; // -1: invalid fd.
157 uint32_t dstTotalSizeInframe_ = 0;
158 uint32_t dstSpanSizeInframe_ = 0;
159 uint32_t dstByteSizePerFrame_ = 0;
160 std::shared_ptr<OHAudioBuffer> dstAudioBuffer_ = nullptr;
161
162 std::atomic<EndpointStatus> endpointStatus_ = INVALID;
163
164 std::atomic<ThreadStatus> threadStatus_ = WAITTING;
165 std::thread endpointWorkThread_;
166 std::mutex loopThreadLock_;
167 std::condition_variable workThreadCV_;
168 int64_t lastHandleProcessTime_ = 0;
169
170 std::thread updatePosTimeThread_;
171 std::mutex updateThreadLock_;
172 std::condition_variable updateThreadCV_;
173 std::atomic<bool> stopUpdateThread_ = false;
174
175 std::atomic<uint64_t> posInFrame_ = 0;
176 std::atomic<int64_t> timeInNano_ = 0;
177
178 bool isDeviceRunningInIdel_ = true; // will call start sink when linked.
179 bool needReSyncPosition_ = true;
180 #ifdef DUMP_PROCESS_FILE
181 FILE *dcp_ = nullptr;
182 FILE *dump_hdi_ = nullptr;
183 #endif
184 };
185
GetInstance(EndpointType type,const DeviceInfo & deviceInfo)186 std::shared_ptr<AudioEndpoint> AudioEndpoint::GetInstance(EndpointType type, const DeviceInfo &deviceInfo)
187 {
188 std::shared_ptr<AudioEndpointInner> audioEndpoint = std::make_shared<AudioEndpointInner>(type);
189 CHECK_AND_RETURN_RET_LOG(audioEndpoint != nullptr, nullptr, "Create AudioEndpoint failed.");
190
191 if (!audioEndpoint->Config(deviceInfo)) {
192 AUDIO_ERR_LOG("Config AudioEndpoint failed.");
193 audioEndpoint = nullptr;
194 }
195 return audioEndpoint;
196 }
197
AudioEndpointInner(EndpointType type)198 AudioEndpointInner::AudioEndpointInner(EndpointType type) : endpointType_(type)
199 {
200 AUDIO_INFO_LOG("AudioEndpoint type:%{public}d", endpointType_);
201 }
202
GetEndpointName()203 std::string AudioEndpointInner::GetEndpointName()
204 {
205 // temp method to get deivce key, should be same with AudioService::GetAudioEndpointForDevice.
206 return deviceInfo_.networkId + std::to_string(deviceInfo_.deviceId);
207 }
208
GetStatus()209 AudioEndpoint::EndpointStatus AudioEndpointInner::GetStatus()
210 {
211 AUDIO_INFO_LOG("AudioEndpoint get status:%{public}s", GetStatusStr(endpointStatus_).c_str());
212 return endpointStatus_.load();
213 }
214
Release()215 void AudioEndpointInner::Release()
216 {
217 // Wait for thread end and then clear other data to avoid using any cleared data in thread.
218 AUDIO_INFO_LOG("%{public}s enter.", __func__);
219 if (!isInited_.load()) {
220 AUDIO_WARNING_LOG("already released");
221 return;
222 }
223
224 isInited_.store(false);
225 workThreadCV_.notify_all();
226 if (endpointWorkThread_.joinable()) {
227 AUDIO_DEBUG_LOG("AudioEndpoint join work thread start");
228 endpointWorkThread_.join();
229 AUDIO_DEBUG_LOG("AudioEndpoint join work thread end");
230 }
231
232 stopUpdateThread_.store(true);
233 updateThreadCV_.notify_all();
234 if (updatePosTimeThread_.joinable()) {
235 AUDIO_DEBUG_LOG("AudioEndpoint join update thread start");
236 updatePosTimeThread_.join();
237 AUDIO_DEBUG_LOG("AudioEndpoint join update thread end");
238 }
239
240 if (fastSink_ != nullptr) {
241 fastSink_->DeInit();
242 fastSink_ = nullptr;
243 }
244
245 if (fastSource_ != nullptr) {
246 fastSource_->DeInit();
247 fastSource_ = nullptr;
248 }
249
250 endpointStatus_.store(INVALID);
251
252 if (dstAudioBuffer_ != nullptr) {
253 AUDIO_INFO_LOG("Set device buffer null");
254 dstAudioBuffer_ = nullptr;
255 }
256 #ifdef DUMP_PROCESS_FILE
257 if (dcp_) {
258 fclose(dcp_);
259 dcp_ = nullptr;
260 }
261 if (dump_hdi_) {
262 fclose(dump_hdi_);
263 dump_hdi_ = nullptr;
264 }
265 #endif
266 }
267
~AudioEndpointInner()268 AudioEndpointInner::~AudioEndpointInner()
269 {
270 if (isInited_.load()) {
271 AudioEndpointInner::Release();
272 }
273 AUDIO_INFO_LOG("~AudioEndpoint()");
274 }
275
Dump(std::stringstream & dumpStringStream)276 void AudioEndpointInner::Dump(std::stringstream &dumpStringStream)
277 {
278 // dump endpoint stream info
279 dumpStringStream << std::endl << "Endpoint stream info:" << std::endl;
280 dumpStringStream << " samplingRate:" << dstStreamInfo_.samplingRate << std::endl;
281 dumpStringStream << " channels:" << dstStreamInfo_.channels << std::endl;
282 dumpStringStream << " format:" << dstStreamInfo_.format << std::endl;
283
284 // dump status info
285 dumpStringStream << " Current endpoint status:" << GetStatusStr(endpointStatus_) << std::endl;
286 if (dstAudioBuffer_ != nullptr) {
287 dumpStringStream << " Currend hdi read position:" << dstAudioBuffer_->GetCurReadFrame() << std::endl;
288 dumpStringStream << " Currend hdi write position:" << dstAudioBuffer_->GetCurWriteFrame() << std::endl;
289 }
290
291 // dump linked process info
292 std::lock_guard<std::mutex> lock(listLock_);
293 dumpStringStream << processBufferList_.size() << " linked process:" << std::endl;
294 for (auto item : processBufferList_) {
295 dumpStringStream << " process read position:" << item->GetCurReadFrame() << std::endl;
296 dumpStringStream << " process write position:" << item->GetCurWriteFrame() << std::endl << std::endl;
297 }
298 dumpStringStream << std::endl;
299 }
300
ConfigInputPoint(const DeviceInfo & deviceInfo)301 bool AudioEndpointInner::ConfigInputPoint(const DeviceInfo &deviceInfo)
302 {
303 AUDIO_INFO_LOG("%{public}s enter.", __func__);
304 IAudioSourceAttr attr = {};
305 attr.sampleRate = dstStreamInfo_.samplingRate;
306 attr.channel = dstStreamInfo_.channels;
307 attr.format = dstStreamInfo_.format;
308 attr.deviceNetworkId = deviceInfo.networkId.c_str();
309
310 if (deviceInfo.networkId == LOCAL_NETWORK_ID) {
311 attr.adapterName = "primary";
312 fastSource_ = FastAudioCapturerSource::GetInstance();
313 } else {
314 attr.adapterName = "remote";
315 fastSource_ = RemoteFastAudioCapturerSource::GetInstance(deviceInfo.networkId);
316 }
317 CHECK_AND_RETURN_RET_LOG(fastSource_ != nullptr, false, "ConfigInputPoint GetInstance failed.");
318
319 int32_t err = fastSource_->Init(attr);
320 if (err != SUCCESS || !fastSource_->IsInited()) {
321 AUDIO_ERR_LOG("%{public}s init remote fast fail, err %{public}d.", __func__, err);
322 fastSource_ = nullptr;
323 return false;
324 }
325 if (PrepareDeviceBuffer(deviceInfo) != SUCCESS) {
326 fastSource_->DeInit();
327 fastSource_ = nullptr;
328 return false;
329 }
330
331 bool ret = writeTimeModel_.ConfigSampleRate(dstStreamInfo_.samplingRate);
332 CHECK_AND_RETURN_RET_LOG(ret != false, false, "Config LinearPosTimeModel failed.");
333
334 endpointStatus_ = UNLINKED;
335 isInited_.store(true);
336 endpointWorkThread_ = std::thread(&AudioEndpointInner::RecordEndpointWorkLoopFuc, this);
337 pthread_setname_np(endpointWorkThread_.native_handle(), "AudioEndpointLoop");
338
339 #ifdef DUMP_PROCESS_FILE
340 dump_hdi_ = fopen("/data/data/server-capture-hdi.pcm", "a+");
341 if (dump_hdi_ == nullptr) {
342 AUDIO_ERR_LOG("Error opening pcm test file!");
343 }
344 #endif
345 return true;
346 }
347
Config(const DeviceInfo & deviceInfo)348 bool AudioEndpointInner::Config(const DeviceInfo &deviceInfo)
349 {
350 AUDIO_INFO_LOG("%{public}s enter, deviceRole %{public}d.", __func__, deviceInfo.deviceRole);
351 deviceInfo_ = deviceInfo;
352 dstStreamInfo_ = deviceInfo.audioStreamInfo;
353 if (deviceInfo.deviceRole == INPUT_DEVICE) {
354 return ConfigInputPoint(deviceInfo);
355 }
356
357 fastSink_ = deviceInfo.networkId == REMOTE_NETWORK_ID ?
358 RemoteFastAudioRendererSink::GetInstance(deviceInfo.networkId) : FastAudioRendererSink::GetInstance();
359 IAudioSinkAttr attr = {};
360 attr.adapterName = "primary";
361 attr.sampleRate = dstStreamInfo_.samplingRate; // 48000hz
362 attr.channel = dstStreamInfo_.channels; // STEREO = 2
363 attr.format = dstStreamInfo_.format; // SAMPLE_S16LE = 1
364 attr.sampleFmt = dstStreamInfo_.format;
365 attr.deviceNetworkId = deviceInfo.networkId.c_str();
366 attr.deviceType = static_cast<int32_t>(deviceInfo.deviceType);
367
368 fastSink_->Init(attr);
369 if (!fastSink_->IsInited()) {
370 fastSink_ = nullptr;
371 return false;
372 }
373 if (PrepareDeviceBuffer(deviceInfo) != SUCCESS) {
374 fastSink_->DeInit();
375 fastSink_ = nullptr;
376 return false;
377 }
378
379 float initVolume = 1.0; // init volume to 1.0
380 fastSink_->SetVolume(initVolume, initVolume);
381
382 bool ret = readTimeModel_.ConfigSampleRate(dstStreamInfo_.samplingRate);
383 CHECK_AND_RETURN_RET_LOG(ret != false, false, "Config LinearPosTimeModel failed.");
384
385 endpointStatus_ = UNLINKED;
386 isInited_.store(true);
387 endpointWorkThread_ = std::thread(&AudioEndpointInner::EndpointWorkLoopFuc, this);
388 pthread_setname_np(endpointWorkThread_.native_handle(), "AudioEndpointLoop");
389
390 // note: add this in record.
391 updatePosTimeThread_ = std::thread(&AudioEndpointInner::AsyncGetPosTime, this);
392 pthread_setname_np(updatePosTimeThread_.native_handle(), "AudioEndpointUpdate");
393
394 #ifdef DUMP_PROCESS_FILE
395 dcp_ = fopen("/data/data/server-read-client.pcm", "a+");
396 dump_hdi_ = fopen("/data/data/server-hdi.pcm", "a+");
397 if (dcp_ == nullptr || dump_hdi_ == nullptr) {
398 AUDIO_ERR_LOG("Error opening pcm test file!");
399 }
400 #endif
401 return true;
402 }
403
GetAdapterBufferInfo(const DeviceInfo & deviceInfo)404 int32_t AudioEndpointInner::GetAdapterBufferInfo(const DeviceInfo &deviceInfo)
405 {
406 int32_t ret = 0;
407 AUDIO_INFO_LOG("%{public}s enter, deviceRole %{public}d.", __func__, deviceInfo.deviceRole);
408 if (deviceInfo.deviceRole == INPUT_DEVICE) {
409 CHECK_AND_RETURN_RET_LOG(fastSource_ != nullptr, ERR_INVALID_HANDLE,
410 "%{public}s fast source is null.", __func__);
411 ret = fastSource_->GetMmapBufferInfo(dstBufferFd_, dstTotalSizeInframe_, dstSpanSizeInframe_,
412 dstByteSizePerFrame_);
413 } else {
414 CHECK_AND_RETURN_RET_LOG(fastSink_ != nullptr, ERR_INVALID_HANDLE, "%{public}s fast sink is null.", __func__);
415 ret = fastSink_->GetMmapBufferInfo(dstBufferFd_, dstTotalSizeInframe_, dstSpanSizeInframe_,
416 dstByteSizePerFrame_);
417 }
418
419 if (ret != SUCCESS || dstBufferFd_ == -1 || dstTotalSizeInframe_ == 0 || dstSpanSizeInframe_ == 0 ||
420 dstByteSizePerFrame_ == 0) {
421 AUDIO_ERR_LOG("%{public}s get mmap buffer info fail, ret %{public}d, dstBufferFd %{public}d, \
422 dstTotalSizeInframe %{public}d, dstSpanSizeInframe %{public}d, dstByteSizePerFrame %{public}d.",
423 __func__, ret, dstBufferFd_, dstTotalSizeInframe_, dstSpanSizeInframe_, dstByteSizePerFrame_);
424 return ERR_ILLEGAL_STATE;
425 }
426 AUDIO_DEBUG_LOG("%{public}s end, fd %{public}d.", __func__, dstBufferFd_);
427 return SUCCESS;
428 }
429
PrepareDeviceBuffer(const DeviceInfo & deviceInfo)430 int32_t AudioEndpointInner::PrepareDeviceBuffer(const DeviceInfo &deviceInfo)
431 {
432 AUDIO_INFO_LOG("%{public}s enter, deviceRole %{public}d.", __func__, deviceInfo.deviceRole);
433 if (dstAudioBuffer_ != nullptr) {
434 AUDIO_INFO_LOG("%{public}s endpoint buffer is preapred, fd:%{public}d", __func__, dstBufferFd_);
435 return SUCCESS;
436 }
437
438 int32_t ret = GetAdapterBufferInfo(deviceInfo);
439 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ERR_OPERATION_FAILED,
440 "%{public}s get adapter buffer Info fail, ret %{public}d.", __func__, ret);
441
442 // spanDuration_ may be less than the correct time of dstSpanSizeInframe_.
443 spanDuration_ = dstSpanSizeInframe_ * AUDIO_NS_PER_SECOND / dstStreamInfo_.samplingRate;
444 int64_t temp = spanDuration_ / 5 * 3; // 3/5 spanDuration
445 serverAheadReadTime_ = temp < ONE_MILLISECOND_DURATION ? ONE_MILLISECOND_DURATION : temp; // at least 1ms ahead.
446 AUDIO_DEBUG_LOG("%{public}s spanDuration %{public}" PRIu64" ns, serverAheadReadTime %{public}" PRIu64" ns.",
447 __func__, spanDuration_, serverAheadReadTime_);
448
449 if (spanDuration_ <= 0 || spanDuration_ >= MAX_SPAN_DURATION_IN_NANO) {
450 AUDIO_ERR_LOG("%{public}s mmap span info error, spanDuration %{public}" PRIu64".", __func__, spanDuration_);
451 return ERR_INVALID_PARAM;
452 }
453 dstAudioBuffer_ = OHAudioBuffer::CreateFromRemote(dstTotalSizeInframe_, dstSpanSizeInframe_, dstByteSizePerFrame_,
454 dstBufferFd_);
455 if (dstAudioBuffer_ == nullptr || dstAudioBuffer_->GetBufferHolder() != AudioBufferHolder::AUDIO_SERVER_ONLY) {
456 AUDIO_ERR_LOG("%{public}s create buffer from remote fail.", __func__);
457 return ERR_ILLEGAL_STATE;
458 }
459 dstAudioBuffer_->GetStreamStatus()->store(StreamStatus::STREAM_IDEL);
460
461 // clear data buffer
462 ret = memset_s(dstAudioBuffer_->GetDataBase(), dstAudioBuffer_->GetDataSize(), 0, dstAudioBuffer_->GetDataSize());
463 CHECK_AND_BREAK_LOG(ret == EOK, "%{public}s memset buffer fail, ret %{public}d, fd %{public}d.",
464 __func__, ret, dstBufferFd_);
465 InitAudiobuffer(true);
466
467 AUDIO_DEBUG_LOG("%{public}s end, fd %{public}d.", __func__, dstBufferFd_);
468 return SUCCESS;
469 }
470
InitAudiobuffer(bool resetReadWritePos)471 void AudioEndpointInner::InitAudiobuffer(bool resetReadWritePos)
472 {
473 CHECK_AND_RETURN_LOG((dstAudioBuffer_ != nullptr), "%{public}s: dst audio buffer is null.", __func__);
474 if (resetReadWritePos) {
475 dstAudioBuffer_->ResetCurReadWritePos(0, 0);
476 }
477
478 uint32_t spanCount = dstAudioBuffer_->GetSpanCount();
479 for (uint32_t i = 0; i < spanCount; i++) {
480 SpanInfo *spanInfo = dstAudioBuffer_->GetSpanInfoByIndex(i);
481 if (spanInfo == nullptr) {
482 AUDIO_ERR_LOG("InitAudiobuffer failed.");
483 return;
484 }
485 if (deviceInfo_.deviceRole == INPUT_DEVICE) {
486 spanInfo->spanStatus = SPAN_WRITE_DONE;
487 } else {
488 spanInfo->spanStatus = SPAN_READ_DONE;
489 }
490 spanInfo->offsetInFrame = 0;
491
492 spanInfo->readStartTime = 0;
493 spanInfo->readDoneTime = 0;
494
495 spanInfo->writeStartTime = 0;
496 spanInfo->writeDoneTime = 0;
497
498 spanInfo->volumeStart = 1 << VOLUME_SHIFT_NUMBER; // 65536 for initialize
499 spanInfo->volumeEnd = 1 << VOLUME_SHIFT_NUMBER; // 65536 for initialize
500 spanInfo->isMute = false;
501 }
502 return;
503 }
504
GetPreferBufferInfo(uint32_t & totalSizeInframe,uint32_t & spanSizeInframe)505 int32_t AudioEndpointInner::GetPreferBufferInfo(uint32_t &totalSizeInframe, uint32_t &spanSizeInframe)
506 {
507 totalSizeInframe = dstTotalSizeInframe_;
508 spanSizeInframe = dstSpanSizeInframe_;
509 return SUCCESS;
510 }
511
IsAnyProcessRunning()512 bool AudioEndpointInner::IsAnyProcessRunning()
513 {
514 std::lock_guard<std::mutex> lock(listLock_);
515 bool isRunning = false;
516 for (size_t i = 0; i < processBufferList_.size(); i++) {
517 if (processBufferList_[i]->GetStreamStatus()->load() == STREAM_RUNNING) {
518 isRunning = true;
519 break;
520 }
521 }
522 return isRunning;
523 }
524
RecordReSyncPosition()525 void AudioEndpointInner::RecordReSyncPosition()
526 {
527 AUDIO_INFO_LOG("%{public}s enter.", __func__);
528 uint64_t curHdiWritePos = 0;
529 int64_t writeTime = 0;
530 CHECK_AND_RETURN_LOG(GetDeviceHandleInfo(curHdiWritePos, writeTime),
531 "%{public}s get device handle info fail.", __func__);
532 AUDIO_DEBUG_LOG("%{public}s get capturer info, curHdiWritePos %{public}" PRIu64", writeTime %{public}" PRId64".",
533 __func__, curHdiWritePos, writeTime);
534 int64_t temp = ClockTime::GetCurNano() - writeTime;
535 if (temp > spanDuration_) {
536 AUDIO_WARNING_LOG("%{public}s GetDeviceHandleInfo cost long time %{public}" PRIu64".", __func__, temp);
537 }
538
539 writeTimeModel_.ResetFrameStamp(curHdiWritePos, writeTime);
540 uint64_t nextDstReadPos = curHdiWritePos;
541 uint64_t nextDstWritePos = curHdiWritePos;
542 InitAudiobuffer(false);
543 int32_t ret = dstAudioBuffer_->ResetCurReadWritePos(nextDstReadPos, nextDstWritePos);
544 if (ret != SUCCESS) {
545 AUDIO_ERR_LOG("ResetCurReadWritePos failed.");
546 return;
547 }
548
549 SpanInfo *nextReadSapn = dstAudioBuffer_->GetSpanInfo(nextDstReadPos);
550 CHECK_AND_RETURN_LOG(nextReadSapn != nullptr, "GetSpanInfo failed.");
551 nextReadSapn->offsetInFrame = nextDstReadPos;
552 nextReadSapn->spanStatus = SpanStatus::SPAN_WRITE_DONE;
553 }
554
ReSyncPosition()555 void AudioEndpointInner::ReSyncPosition()
556 {
557 Trace loopTrace("AudioEndpoint::ReSyncPosition");
558 uint64_t curHdiReadPos = 0;
559 int64_t readTime = 0;
560 if (!GetDeviceHandleInfo(curHdiReadPos, readTime)) {
561 AUDIO_ERR_LOG("ReSyncPosition call GetDeviceHandleInfo failed.");
562 return;
563 }
564 int64_t curTime = ClockTime::GetCurNano();
565 int64_t temp = curTime - readTime;
566 if (temp > spanDuration_) {
567 AUDIO_ERR_LOG("GetDeviceHandleInfo may cost long time.");
568 }
569
570 readTimeModel_.ResetFrameStamp(curHdiReadPos, readTime);
571 uint64_t nextDstWritePos = curHdiReadPos + dstSpanSizeInframe_;
572 InitAudiobuffer(false);
573 int32_t ret = dstAudioBuffer_->ResetCurReadWritePos(nextDstWritePos, nextDstWritePos);
574 if (ret != SUCCESS) {
575 AUDIO_ERR_LOG("ResetCurReadWritePos failed.");
576 return;
577 }
578
579 SpanInfo *nextWriteSapn = dstAudioBuffer_->GetSpanInfo(nextDstWritePos);
580 CHECK_AND_RETURN_LOG(nextWriteSapn != nullptr, "GetSpanInfo failed.");
581 nextWriteSapn->offsetInFrame = nextDstWritePos;
582 nextWriteSapn->spanStatus = SpanStatus::SPAN_READ_DONE;
583 return;
584 }
585
StartDevice()586 bool AudioEndpointInner::StartDevice()
587 {
588 AUDIO_INFO_LOG("%{public}s enter.", __func__);
589 // how to modify the status while unlinked and started?
590 if (endpointStatus_ != IDEL) {
591 AUDIO_ERR_LOG("Endpoint status is not IDEL");
592 return false;
593 }
594 endpointStatus_ = STARTING;
595 if (deviceInfo_.deviceRole == INPUT_DEVICE) {
596 if (fastSource_ == nullptr || fastSource_->Start() != SUCCESS) {
597 AUDIO_ERR_LOG("Source start failed.");
598 return false;
599 }
600 } else {
601 if (fastSink_ == nullptr || fastSink_->Start() != SUCCESS) {
602 AUDIO_ERR_LOG("Sink start failed.");
603 return false;
604 }
605 }
606
607 std::unique_lock<std::mutex> lock(loopThreadLock_);
608 needReSyncPosition_ = true;
609 endpointStatus_ = IsAnyProcessRunning() ? RUNNING : IDEL;
610 workThreadCV_.notify_all();
611 AUDIO_DEBUG_LOG("StartDevice out, status is %{public}s", GetStatusStr(endpointStatus_).c_str());
612 return true;
613 }
614
StopDevice()615 bool AudioEndpointInner::StopDevice()
616 {
617 AUDIO_INFO_LOG("StopDevice with status:%{public}s", GetStatusStr(endpointStatus_).c_str());
618 // todo
619 endpointStatus_ = STOPPING;
620 // Clear data buffer to avoid noise in some case.
621 if (dstAudioBuffer_ != nullptr) {
622 int32_t ret = memset_s(dstAudioBuffer_->GetDataBase(), dstAudioBuffer_->GetDataSize(), 0,
623 dstAudioBuffer_->GetDataSize());
624 AUDIO_INFO_LOG("StopDevice clear buffer ret:%{public}d", ret);
625 }
626 if (deviceInfo_.deviceRole == INPUT_DEVICE) {
627 if (fastSource_ == nullptr || fastSource_->Stop() != SUCCESS) {
628 AUDIO_ERR_LOG("Source stop failed.");
629 return false;
630 }
631 } else {
632 if (fastSink_ == nullptr || fastSink_->Stop() != SUCCESS) {
633 AUDIO_ERR_LOG("Sink stop failed.");
634 return false;
635 }
636 }
637 endpointStatus_ = STOPPED;
638 return true;
639 }
640
OnStart(IAudioProcessStream * processStream)641 int32_t AudioEndpointInner::OnStart(IAudioProcessStream *processStream)
642 {
643 AUDIO_INFO_LOG("OnStart endpoint status:%{public}s", GetStatusStr(endpointStatus_).c_str());
644 if (endpointStatus_ == RUNNING) {
645 AUDIO_INFO_LOG("OnStart find endpoint already in RUNNING.");
646 return SUCCESS;
647 }
648 if (endpointStatus_ == IDEL && !isDeviceRunningInIdel_) {
649 // call sink start
650 StartDevice();
651 endpointStatus_ = RUNNING;
652 }
653 return SUCCESS;
654 }
655
OnPause(IAudioProcessStream * processStream)656 int32_t AudioEndpointInner::OnPause(IAudioProcessStream *processStream)
657 {
658 AUDIO_INFO_LOG("OnPause endpoint status:%{public}s", GetStatusStr(endpointStatus_).c_str());
659 if (endpointStatus_ == RUNNING) {
660 endpointStatus_ = IsAnyProcessRunning() ? RUNNING : IDEL;
661 }
662 if (endpointStatus_ == IDEL && !isDeviceRunningInIdel_) {
663 // call sink stop when no process running?
664 AUDIO_INFO_LOG("OnPause status is IDEL, call stop");
665 }
666 // todo
667 return SUCCESS;
668 }
669
GetProcLastWriteDoneInfo(const std::shared_ptr<OHAudioBuffer> processBuffer,uint64_t curWriteFrame,uint64_t & proHandleFrame,int64_t & proHandleTime)670 int32_t AudioEndpointInner::GetProcLastWriteDoneInfo(const std::shared_ptr<OHAudioBuffer> processBuffer,
671 uint64_t curWriteFrame, uint64_t &proHandleFrame, int64_t &proHandleTime)
672 {
673 CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_INVALID_HANDLE, "Process found but buffer is null");
674 uint64_t curReadFrame = processBuffer->GetCurReadFrame();
675 SpanInfo *curWriteSpan = processBuffer->GetSpanInfo(curWriteFrame);
676 CHECK_AND_RETURN_RET_LOG(curWriteSpan != nullptr, ERR_INVALID_HANDLE,
677 "%{public}s curWriteSpan of curWriteFrame %{public}" PRIu64" is null", __func__, curWriteFrame);
678 if (curWriteSpan->spanStatus == SpanStatus::SPAN_WRITE_DONE || curWriteFrame < dstSpanSizeInframe_ ||
679 curWriteFrame < curReadFrame) {
680 proHandleFrame = curWriteFrame;
681 proHandleTime = curWriteSpan->writeDoneTime;
682 } else {
683 int32_t ret = GetProcLastWriteDoneInfo(processBuffer, curWriteFrame - dstSpanSizeInframe_,
684 proHandleFrame, proHandleTime);
685 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret,
686 "%{public}s get process last write done info fail, ret %{public}d.", __func__, ret);
687 }
688
689 AUDIO_INFO_LOG("%{public}s end, curWriteFrame %{public}" PRIu64", proHandleFrame %{public}" PRIu64", "
690 "proHandleTime %{public}" PRId64".", __func__, curWriteFrame, proHandleFrame, proHandleTime);
691 return SUCCESS;
692 }
693
OnUpdateHandleInfo(IAudioProcessStream * processStream)694 int32_t AudioEndpointInner::OnUpdateHandleInfo(IAudioProcessStream *processStream)
695 {
696 Trace trace("AudioEndpoint::OnUpdateHandleInfo");
697 bool isFind = false;
698 std::lock_guard<std::mutex> lock(listLock_);
699 auto processItr = processList_.begin();
700 while (processItr != processList_.end()) {
701 if (*processItr != processStream) {
702 processItr++;
703 continue;
704 }
705 std::shared_ptr<OHAudioBuffer> processBuffer = (*processItr)->GetStreamBuffer();
706 CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_OPERATION_FAILED, "Process found but buffer is null");
707 uint64_t proHandleFrame = 0;
708 int64_t proHandleTime = 0;
709 if (deviceInfo_.deviceRole == INPUT_DEVICE) {
710 uint64_t curWriteFrame = processBuffer->GetCurWriteFrame();
711 int32_t ret = GetProcLastWriteDoneInfo(processBuffer, curWriteFrame, proHandleFrame, proHandleTime);
712 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret,
713 "%{public}s get process last write done info fail, ret %{public}d.", __func__, ret);
714 processBuffer->SetHandleInfo(proHandleFrame, proHandleTime);
715 } else {
716 // For output device, handle info is updated in CheckAllBufferReady
717 processBuffer->GetHandleInfo(proHandleFrame, proHandleTime);
718 }
719 AUDIO_INFO_LOG("OnUpdateHandleInfo set process handle pos[%{public}" PRIu64"] time [%{public}" PRId64"], "
720 "deviceRole %{public}d.", proHandleFrame, proHandleTime, deviceInfo_.deviceRole);
721 isFind = true;
722 break;
723 }
724 if (!isFind) {
725 AUDIO_ERR_LOG("Can not find any process to UpdateHandleInfo");
726 return ERR_OPERATION_FAILED;
727 }
728 return SUCCESS;
729 }
730
LinkProcessStream(IAudioProcessStream * processStream)731 int32_t AudioEndpointInner::LinkProcessStream(IAudioProcessStream *processStream)
732 {
733 CHECK_AND_RETURN_RET_LOG(processStream != nullptr, ERR_INVALID_PARAM, "IAudioProcessStream is null");
734 std::shared_ptr<OHAudioBuffer> processBuffer = processStream->GetStreamBuffer();
735 CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_INVALID_PARAM, "processBuffer is null");
736
737 CHECK_AND_RETURN_RET_LOG(processList_.size() < MAX_LINKED_PROCESS, ERR_OPERATION_FAILED, "reach link limit.");
738
739 AUDIO_INFO_LOG("LinkProcessStream endpoint status:%{public}s.", GetStatusStr(endpointStatus_).c_str());
740
741 bool needEndpointRunning = processBuffer->GetStreamStatus()->load() == STREAM_RUNNING;
742
743 if (endpointStatus_ == STARTING) {
744 AUDIO_INFO_LOG("LinkProcessStream wait start begin.");
745 std::unique_lock<std::mutex> lock(loopThreadLock_);
746 workThreadCV_.wait(lock, [this] {
747 return endpointStatus_ != STARTING;
748 });
749 AUDIO_DEBUG_LOG("LinkProcessStream wait start end.");
750 }
751
752 if (endpointStatus_ == RUNNING) {
753 std::lock_guard<std::mutex> lock(listLock_);
754 processList_.push_back(processStream);
755 processBufferList_.push_back(processBuffer);
756 AUDIO_DEBUG_LOG("LinkProcessStream success.");
757 return SUCCESS;
758 }
759
760 if (endpointStatus_ == UNLINKED) {
761 endpointStatus_ = IDEL; // handle push_back in IDEL
762 if (isDeviceRunningInIdel_) {
763 StartDevice();
764 }
765 }
766
767 if (endpointStatus_ == IDEL) {
768 {
769 std::lock_guard<std::mutex> lock(listLock_);
770 processList_.push_back(processStream);
771 processBufferList_.push_back(processBuffer);
772 }
773 if (!needEndpointRunning) {
774 AUDIO_DEBUG_LOG("LinkProcessStream success, process stream status is not running.");
775 return SUCCESS;
776 }
777 // needEndpointRunning = true
778 if (isDeviceRunningInIdel_) {
779 endpointStatus_ = IsAnyProcessRunning() ? RUNNING : IDEL;
780 } else {
781 // needEndpointRunning = true & isDeviceRunningInIdel_ = false
782 // KeepWorkloopRunning will wait on IDEL
783 StartDevice();
784 }
785 AUDIO_DEBUG_LOG("LinkProcessStream success.");
786 return SUCCESS;
787 }
788
789 return SUCCESS;
790 }
791
UnlinkProcessStream(IAudioProcessStream * processStream)792 int32_t AudioEndpointInner::UnlinkProcessStream(IAudioProcessStream *processStream)
793 {
794 AUDIO_INFO_LOG("UnlinkProcessStream in status:%{public}s.", GetStatusStr(endpointStatus_).c_str());
795 CHECK_AND_RETURN_RET_LOG(processStream != nullptr, ERR_INVALID_PARAM, "IAudioProcessStream is null");
796 std::shared_ptr<OHAudioBuffer> processBuffer = processStream->GetStreamBuffer();
797 CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_INVALID_PARAM, "processBuffer is null");
798
799 bool isFind = false;
800 std::lock_guard<std::mutex> lock(listLock_);
801 auto processItr = processList_.begin();
802 auto bufferItr = processBufferList_.begin();
803 while (processItr != processList_.end()) {
804 if (*processItr == processStream && *bufferItr == processBuffer) {
805 processList_.erase(processItr);
806 processBufferList_.erase(bufferItr);
807 isFind = true;
808 break;
809 } else {
810 processItr++;
811 bufferItr++;
812 }
813 }
814 if (processList_.size() == 0) {
815 StopDevice();
816 endpointStatus_ = UNLINKED;
817 }
818
819 AUDIO_DEBUG_LOG("UnlinkProcessStream end, %{public}s the process.", (isFind ? "find and remove" : "not find"));
820 return SUCCESS;
821 }
822
CheckAllBufferReady(int64_t checkTime,uint64_t curWritePos)823 bool AudioEndpointInner::CheckAllBufferReady(int64_t checkTime, uint64_t curWritePos)
824 {
825 bool isAllReady = true;
826 {
827 // lock list without sleep
828 std::lock_guard<std::mutex> lock(listLock_);
829 for (size_t i = 0; i < processBufferList_.size(); i++) {
830 std::shared_ptr<OHAudioBuffer> tempBuffer = processBufferList_[i];
831 uint64_t eachCurReadPos = processBufferList_[i]->GetCurReadFrame();
832 lastHandleProcessTime_ = checkTime;
833 processBufferList_[i]->SetHandleInfo(eachCurReadPos, lastHandleProcessTime_); // update handle info
834 if (tempBuffer->GetStreamStatus()->load() != StreamStatus::STREAM_RUNNING) {
835 // Process is not running, server will continue to check the same location in the next cycle.
836 int64_t duration = 5000000; // 5ms
837 processBufferList_[i]->SetHandleInfo(eachCurReadPos, lastHandleProcessTime_ + duration);
838 continue; // process not running
839 }
840 uint64_t curRead = tempBuffer->GetCurReadFrame();
841 SpanInfo *curReadSpan = tempBuffer->GetSpanInfo(curRead);
842 if (curReadSpan == nullptr || curReadSpan->spanStatus != SpanStatus::SPAN_WRITE_DONE) {
843 AUDIO_WARNING_LOG("Find one process not ready"); // print uid of the process?
844 isAllReady = false;
845 break;
846 }
847 }
848 }
849
850 if (!isAllReady) {
851 Trace trace("AudioEndpoint::WaitAllProcessReady");
852 int64_t tempWakeupTime = readTimeModel_.GetTimeOfPos(curWritePos) + WRITE_TO_HDI_AHEAD_TIME;
853 if (tempWakeupTime - ClockTime::GetCurNano() < ONE_MILLISECOND_DURATION) {
854 ClockTime::RelativeSleep(ONE_MILLISECOND_DURATION);
855 } else {
856 ClockTime::AbsoluteSleep(tempWakeupTime); // sleep to hdi read time ahead 1ms.
857 }
858 }
859 return isAllReady;
860 }
861
ProcessData(const std::vector<AudioStreamData> & srcDataList,const AudioStreamData & dstData)862 void AudioEndpointInner::ProcessData(const std::vector<AudioStreamData> &srcDataList, const AudioStreamData &dstData)
863 {
864 size_t srcListSize = srcDataList.size();
865
866 for (size_t i = 0; i < srcListSize; i++) {
867 if (srcDataList[i].streamInfo.format != SAMPLE_S16LE || srcDataList[i].streamInfo.channels != STEREO ||
868 srcDataList[i].bufferDesc.bufLength != dstData.bufferDesc.bufLength ||
869 srcDataList[i].bufferDesc.dataLength != dstData.bufferDesc.dataLength) {
870 AUDIO_ERR_LOG("ProcessData failed, streamInfo are different");
871 return;
872 }
873 }
874
875 // Assum using the same format and same size
876 if (dstData.streamInfo.format != SAMPLE_S16LE || dstData.streamInfo.channels != STEREO) {
877 AUDIO_ERR_LOG("ProcessData failed, streamInfo are not support");
878 return;
879 }
880
881 size_t dataLength = dstData.bufferDesc.dataLength;
882 dataLength /= 2; // SAMPLE_S16LE--> 2 byte
883 int16_t *dstPtr = reinterpret_cast<int16_t *>(dstData.bufferDesc.buffer);
884 for (size_t offset = 0; dataLength > 0; dataLength--) {
885 int32_t sum = 0;
886 for (size_t i = 0; i < srcListSize; i++) {
887 int32_t vol = srcDataList[i].volumeStart; // change to modify volume of each channel
888 int16_t *srcPtr = reinterpret_cast<int16_t *>(srcDataList[i].bufferDesc.buffer) + offset;
889 sum += (*srcPtr * static_cast<int64_t>(vol)) >> VOLUME_SHIFT_NUMBER; // 1/65536
890 }
891 offset++;
892 *dstPtr++ = sum > INT16_MAX ? INT16_MAX : (sum < INT16_MIN ? INT16_MIN : sum);
893 }
894 }
895
896 // call with listLock_ hold
GetAllReadyProcessData(std::vector<AudioStreamData> & audioDataList)897 void AudioEndpointInner::GetAllReadyProcessData(std::vector<AudioStreamData> &audioDataList)
898 {
899 for (size_t i = 0; i < processBufferList_.size(); i++) {
900 uint64_t curRead = processBufferList_[i]->GetCurReadFrame();
901 Trace trace("AudioEndpoint::ReadProcessData->" + std::to_string(curRead));
902 SpanInfo *curReadSpan = processBufferList_[i]->GetSpanInfo(curRead);
903 if (curReadSpan == nullptr) {
904 AUDIO_ERR_LOG("GetSpanInfo failed, can not get client curReadSpan");
905 continue;
906 }
907 AudioStreamData streamData;
908 Volume vol = {true, 1.0f, 0};
909 AudioStreamType streamType = processList_[i]->GetAudioStreamType();
910 DeviceType deviceType = PolicyHandler::GetInstance().GetActiveOutPutDevice();
911 if (deviceInfo_.networkId == LOCAL_NETWORK_ID &&
912 PolicyHandler::GetInstance().GetSharedVolume(streamType, deviceType, vol)) {
913 streamData.volumeStart = vol.isMute ? 0 : static_cast<int32_t>(curReadSpan->volumeStart * vol.volumeFloat);
914 } else {
915 streamData.volumeStart = curReadSpan->volumeStart;
916 }
917 streamData.volumeEnd = curReadSpan->volumeEnd;
918 streamData.streamInfo = processList_[i]->GetStreamInfo();
919 SpanStatus targetStatus = SpanStatus::SPAN_WRITE_DONE;
920 if (curReadSpan->spanStatus.compare_exchange_strong(targetStatus, SpanStatus::SPAN_READING)) {
921 processBufferList_[i]->GetReadbuffer(curRead, streamData.bufferDesc); // check return?
922 audioDataList.push_back(streamData);
923 curReadSpan->readStartTime = ClockTime::GetCurNano();
924 #ifdef DUMP_PROCESS_FILE
925 if (dcp_ != nullptr) {
926 fwrite(static_cast<void *>(streamData.bufferDesc.buffer), 1, streamData.bufferDesc.bufLength, dcp_);
927 }
928 #endif
929 }
930 }
931 }
932
ProcessToEndpointDataHandle(uint64_t curWritePos)933 bool AudioEndpointInner::ProcessToEndpointDataHandle(uint64_t curWritePos)
934 {
935 std::lock_guard<std::mutex> lock(listLock_);
936
937 std::vector<AudioStreamData> audioDataList;
938 GetAllReadyProcessData(audioDataList);
939
940 AudioStreamData dstStreamData;
941 dstStreamData.streamInfo = dstStreamInfo_;
942 int32_t ret = dstAudioBuffer_->GetWriteBuffer(curWritePos, dstStreamData.bufferDesc);
943 CHECK_AND_RETURN_RET_LOG(((ret == SUCCESS && dstStreamData.bufferDesc.buffer != nullptr)), false,
944 "GetWriteBuffer failed, ret:%{public}d", ret);
945
946 SpanInfo *curWriteSpan = dstAudioBuffer_->GetSpanInfo(curWritePos);
947 CHECK_AND_RETURN_RET_LOG(curWriteSpan != nullptr, false, "GetSpanInfo failed, can not get curWriteSpan");
948
949 dstStreamData.volumeStart = curWriteSpan->volumeStart;
950 dstStreamData.volumeEnd = curWriteSpan->volumeEnd;
951
952 Trace trace("AudioEndpoint::WriteDstBuffer=>" + std::to_string(curWritePos));
953 // do write work
954 if (audioDataList.size() == 0) {
955 memset_s(dstStreamData.bufferDesc.buffer, dstStreamData.bufferDesc.bufLength, 0,
956 dstStreamData.bufferDesc.bufLength);
957 } else {
958 ProcessData(audioDataList, dstStreamData);
959 }
960
961 #ifdef DUMP_PROCESS_FILE
962 if (dump_hdi_ != nullptr) {
963 fwrite(static_cast<void *>(dstStreamData.bufferDesc.buffer), 1, dstStreamData.bufferDesc.bufLength, dump_hdi_);
964 }
965 #endif
966 return true;
967 }
968
GetPredictNextReadTime(uint64_t posInFrame)969 int64_t AudioEndpointInner::GetPredictNextReadTime(uint64_t posInFrame)
970 {
971 Trace trace("AudioEndpoint::GetPredictNextRead");
972 uint64_t handleSpanCnt = posInFrame / dstSpanSizeInframe_;
973 uint32_t startPeriodCnt = 20; // sync each time when start
974 uint32_t oneBigPeriodCnt = 40; // 200ms
975 if (handleSpanCnt < startPeriodCnt || handleSpanCnt % oneBigPeriodCnt == 0) {
976 updateThreadCV_.notify_all();
977 }
978 uint64_t readFrame = 0;
979 int64_t readtime = 0;
980 if (readTimeModel_.GetFrameStamp(readFrame, readtime)) {
981 if (readFrame != posInFrame_) {
982 readTimeModel_.UpdataFrameStamp(posInFrame_, timeInNano_);
983 }
984 }
985
986 int64_t nextHdiReadTime = readTimeModel_.GetTimeOfPos(posInFrame);
987 return nextHdiReadTime;
988 }
989
GetPredictNextWriteTime(uint64_t posInFrame)990 int64_t AudioEndpointInner::GetPredictNextWriteTime(uint64_t posInFrame)
991 {
992 uint64_t handleSpanCnt = posInFrame / dstSpanSizeInframe_;
993 uint32_t startPeriodCnt = 20;
994 uint32_t oneBigPeriodCnt = 40;
995 if (handleSpanCnt < startPeriodCnt || handleSpanCnt % oneBigPeriodCnt == 0) {
996 // todo sleep random little time but less than nextHdiReadTime - 2ms
997 uint64_t writeFrame = 0;
998 int64_t writeTime = 0;
999 if (GetDeviceHandleInfo(writeFrame, writeTime)) {
1000 writeTimeModel_.UpdataFrameStamp(writeFrame, writeTime);
1001 }
1002 }
1003 int64_t nextHdiWriteTime = writeTimeModel_.GetTimeOfPos(posInFrame);
1004 return nextHdiWriteTime;
1005 }
1006
RecordPrepareNextLoop(uint64_t curReadPos,int64_t & wakeUpTime)1007 bool AudioEndpointInner::RecordPrepareNextLoop(uint64_t curReadPos, int64_t &wakeUpTime)
1008 {
1009 uint64_t nextHandlePos = curReadPos + dstSpanSizeInframe_;
1010 int64_t nextHdiWriteTime = GetPredictNextWriteTime(nextHandlePos);
1011 int64_t tempDelay = 12000000; // 12ms
1012 wakeUpTime = nextHdiWriteTime + tempDelay;
1013
1014 int32_t ret = dstAudioBuffer_->SetCurWriteFrame(nextHandlePos);
1015 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, false, "%{public}s set dst buffer write frame fail, ret %{public}d.",
1016 __func__, ret);
1017 ret = dstAudioBuffer_->SetCurReadFrame(nextHandlePos);
1018 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, false, "%{public}s set dst buffer read frame fail, ret %{public}d.",
1019 __func__, ret);
1020
1021 return true;
1022 }
1023
PrepareNextLoop(uint64_t curWritePos,int64_t & wakeUpTime)1024 bool AudioEndpointInner::PrepareNextLoop(uint64_t curWritePos, int64_t &wakeUpTime)
1025 {
1026 uint64_t nextHandlePos = curWritePos + dstSpanSizeInframe_;
1027 Trace prepareTrace("AudioEndpoint::PrepareNextLoop " + std::to_string(nextHandlePos));
1028 int64_t nextHdiReadTime = GetPredictNextReadTime(nextHandlePos);
1029 wakeUpTime = nextHdiReadTime - serverAheadReadTime_;
1030
1031 SpanInfo *nextWriteSpan = dstAudioBuffer_->GetSpanInfo(nextHandlePos);
1032 if (nextWriteSpan == nullptr) {
1033 AUDIO_ERR_LOG("GetSpanInfo failed, can not get next write span");
1034 return false;
1035 }
1036
1037 int32_t ret1 = dstAudioBuffer_->SetCurWriteFrame(nextHandlePos);
1038 int32_t ret2 = dstAudioBuffer_->SetCurReadFrame(nextHandlePos);
1039 if (ret1 != SUCCESS || ret2 != SUCCESS) {
1040 AUDIO_ERR_LOG("SetCurWriteFrame or SetCurReadFrame failed, ret1:%{public}d ret2:%{public}d", ret1, ret2);
1041 return false;
1042 }
1043 // handl each process buffer info
1044 int64_t curReadDoneTime = ClockTime::GetCurNano();
1045 for (size_t i = 0; i < processBufferList_.size(); i++) {
1046 uint64_t eachCurReadPos = processBufferList_[i]->GetCurReadFrame();
1047 SpanInfo *tempSpan = processBufferList_[i]->GetSpanInfo(eachCurReadPos);
1048 if (tempSpan == nullptr) {
1049 AUDIO_ERR_LOG("GetSpanInfo failed, can not get process read span");
1050 return false;
1051 }
1052 SpanStatus targetStatus = SpanStatus::SPAN_READING;
1053 if (tempSpan->spanStatus.compare_exchange_strong(targetStatus, SpanStatus::SPAN_READ_DONE)) {
1054 tempSpan->readDoneTime = curReadDoneTime;
1055 BufferDesc bufferReadDone = { nullptr, 0, 0};
1056 processBufferList_[i]->GetReadbuffer(eachCurReadPos, bufferReadDone);
1057 if (bufferReadDone.buffer != nullptr && bufferReadDone.bufLength != 0) {
1058 memset_s(bufferReadDone.buffer, bufferReadDone.bufLength, 0, bufferReadDone.bufLength);
1059 }
1060 processBufferList_[i]->SetCurReadFrame(eachCurReadPos + dstSpanSizeInframe_); // use client span size
1061 } else if (processBufferList_[i]->GetStreamStatus()->load() == StreamStatus::STREAM_RUNNING) {
1062 AUDIO_WARNING_LOG("Current %{public}" PRIu64" span not ready:%{public}d", eachCurReadPos, targetStatus);
1063 }
1064 }
1065 return true;
1066 }
1067
GetDeviceHandleInfo(uint64_t & frames,int64_t & nanoTime)1068 bool AudioEndpointInner::GetDeviceHandleInfo(uint64_t &frames, int64_t &nanoTime)
1069 {
1070 Trace trace("AudioEndpoint::GetMmapHandlePosition");
1071 int64_t timeSec = 0;
1072 int64_t timeNanoSec = 0;
1073 int32_t ret = 0;
1074 if (deviceInfo_.deviceRole == INPUT_DEVICE) {
1075 if (fastSource_ == nullptr || !fastSource_->IsInited()) {
1076 AUDIO_ERR_LOG("Source start failed.");
1077 return false;
1078 }
1079 // GetMmapHandlePosition will call using ipc.
1080 ret = fastSource_->GetMmapHandlePosition(frames, timeSec, timeNanoSec);
1081 } else {
1082 if (fastSink_ == nullptr || !fastSink_->IsInited()) {
1083 AUDIO_ERR_LOG("GetDeviceHandleInfo failed: sink is not inited.");
1084 return false;
1085 }
1086 // GetMmapHandlePosition will call using ipc.
1087 ret = fastSink_->GetMmapHandlePosition(frames, timeSec, timeNanoSec);
1088 }
1089 if (ret != SUCCESS) {
1090 AUDIO_ERR_LOG("Call adapter GetMmapHandlePosition failed: %{public}d", ret);
1091 return false;
1092 }
1093 trace.End();
1094 nanoTime = timeNanoSec + timeSec * AUDIO_NS_PER_SECOND;
1095 Trace infoTrace("AudioEndpoint::GetDeviceHandleInfo frames=>" + std::to_string(frames) + " " +
1096 std::to_string(nanoTime) + " at " + std::to_string(ClockTime::GetCurNano()));
1097 nanoTime += DELTA_TO_REAL_READ_START_TIME; // global delay in server
1098 return true;
1099 }
1100
AsyncGetPosTime()1101 void AudioEndpointInner::AsyncGetPosTime()
1102 {
1103 AUDIO_INFO_LOG("AsyncGetPosTime thread start.");
1104 while (!stopUpdateThread_) {
1105 std::unique_lock<std::mutex> lock(updateThreadLock_);
1106 updateThreadCV_.wait_for(lock, std::chrono::milliseconds(UPDATE_THREAD_TIMEOUT));
1107 if (stopUpdateThread_) {
1108 break;
1109 }
1110 // get signaled, call get pos-time
1111 uint64_t curHdiHandlePos = posInFrame_;
1112 int64_t handleTime = timeInNano_;
1113 if (!GetDeviceHandleInfo(curHdiHandlePos, handleTime)) {
1114 AUDIO_WARNING_LOG("AsyncGetPosTime call GetDeviceHandleInfo failed.");
1115 continue;
1116 }
1117 // keep it
1118 if (posInFrame_ != curHdiHandlePos) {
1119 posInFrame_ = curHdiHandlePos;
1120 timeInNano_ = handleTime;
1121 }
1122 }
1123 }
1124
GetStatusStr(EndpointStatus status)1125 std::string AudioEndpointInner::GetStatusStr(EndpointStatus status)
1126 {
1127 switch (status) {
1128 case INVALID:
1129 return "INVALID";
1130 case UNLINKED:
1131 return "UNLINKED";
1132 case IDEL:
1133 return "IDEL";
1134 case STARTING:
1135 return "STARTING";
1136 case RUNNING:
1137 return "RUNNING";
1138 case STOPPING:
1139 return "STOPPING";
1140 case STOPPED:
1141 return "STOPPED";
1142 default:
1143 break;
1144 }
1145 return "NO_SUCH_STATUS";
1146 }
1147
KeepWorkloopRunning()1148 bool AudioEndpointInner::KeepWorkloopRunning()
1149 {
1150 EndpointStatus targetStatus = INVALID;
1151 switch (endpointStatus_.load()) {
1152 case RUNNING:
1153 return true;
1154 case IDEL:
1155 if (isDeviceRunningInIdel_) {
1156 return true;
1157 } else {
1158 targetStatus = STARTING;
1159 }
1160 break;
1161 case UNLINKED:
1162 targetStatus = IDEL;
1163 break;
1164 case STARTING:
1165 targetStatus = RUNNING;
1166 break;
1167 case STOPPING:
1168 targetStatus = STOPPED;
1169 break;
1170 default:
1171 break;
1172 }
1173
1174 // when return false, EndpointWorkLoopFuc will continue loop immediately. Wait to avoid a inifity loop.
1175 std::unique_lock<std::mutex> lock(loopThreadLock_);
1176 AUDIO_INFO_LOG("Status is %{public}s now, wait for %{public}s...", GetStatusStr(endpointStatus_).c_str(),
1177 GetStatusStr(targetStatus).c_str());
1178 threadStatus_ = WAITTING;
1179 workThreadCV_.wait_for(lock, std::chrono::milliseconds(SLEEP_TIME_IN_DEFAULT));
1180 AUDIO_DEBUG_LOG("Wait end. Cur is %{public}s now, target is %{public}s...", GetStatusStr(endpointStatus_).c_str(),
1181 GetStatusStr(targetStatus).c_str());
1182
1183 return false;
1184 }
1185
WriteToSpecialProcBuf(const std::shared_ptr<OHAudioBuffer> & procBuf,const BufferDesc & readBuf)1186 int32_t AudioEndpointInner::WriteToSpecialProcBuf(const std::shared_ptr<OHAudioBuffer> &procBuf,
1187 const BufferDesc &readBuf)
1188 {
1189 CHECK_AND_RETURN_RET_LOG(procBuf != nullptr, ERR_INVALID_HANDLE, "%{public}s process buffer is null.", __func__);
1190 uint64_t curWritePos = procBuf->GetCurWriteFrame();
1191 Trace trace("AudioEndpoint::WriteProcessData-<" + std::to_string(curWritePos));
1192 SpanInfo *curWriteSpan = procBuf->GetSpanInfo(curWritePos);
1193 CHECK_AND_RETURN_RET_LOG(curWriteSpan != nullptr, ERR_INVALID_HANDLE,
1194 "%{public}s get write span info of procBuf fail.", __func__);
1195
1196 AUDIO_DEBUG_LOG("%{public}s process buffer write start, curWritePos %{public}" PRIu64".", __func__, curWritePos);
1197 curWriteSpan->spanStatus.store(SpanStatus::SPAN_WRITTING);
1198 curWriteSpan->writeStartTime = ClockTime::GetCurNano();
1199
1200 BufferDesc writeBuf;
1201 int32_t ret = procBuf->GetWriteBuffer(curWritePos, writeBuf);
1202 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret, "%{public}s get write buffer fail, ret %{public}d.", __func__, ret);
1203 ret = memcpy_s(static_cast<void *>(writeBuf.buffer), writeBuf.bufLength,
1204 static_cast<void *>(readBuf.buffer), readBuf.bufLength);
1205 CHECK_AND_RETURN_RET_LOG(ret == EOK, ERR_WRITE_FAILED, "%{public}s memcpy data to process buffer fail, "
1206 "curWritePos %{public}" PRIu64", ret %{public}d.", __func__, curWritePos, ret);
1207
1208 curWriteSpan->writeDoneTime = ClockTime::GetCurNano();
1209 procBuf->SetHandleInfo(curWritePos, curWriteSpan->writeDoneTime);
1210 ret = procBuf->SetCurWriteFrame(curWritePos + dstSpanSizeInframe_);
1211 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret, "%{public}s set procBuf next write frame fail, ret %{public}d.",
1212 __func__, ret);
1213 curWriteSpan->spanStatus.store(SpanStatus::SPAN_WRITE_DONE);
1214 return SUCCESS;
1215 }
1216
WriteToProcessBuffers(const BufferDesc & readBuf)1217 void AudioEndpointInner::WriteToProcessBuffers(const BufferDesc &readBuf)
1218 {
1219 std::lock_guard<std::mutex> lock(listLock_);
1220 for (size_t i = 0; i < processBufferList_.size(); i++) {
1221 if (processBufferList_[i] == nullptr) {
1222 AUDIO_ERR_LOG("%{public}s process buffer %{public}zu is null.", __func__, i);
1223 continue;
1224 }
1225 if (processBufferList_[i]->GetStreamStatus()->load() != STREAM_RUNNING) {
1226 AUDIO_WARNING_LOG("%{public}s process buffer %{public}zu not running, stream status %{public}d.",
1227 __func__, i, processBufferList_[i]->GetStreamStatus()->load());
1228 continue;
1229 }
1230
1231 int32_t ret = WriteToSpecialProcBuf(processBufferList_[i], readBuf);
1232 if (ret != SUCCESS) {
1233 AUDIO_ERR_LOG("%{public}s endpoint write to process buffer %{public}zu fail, ret %{public}d.",
1234 __func__, i, ret);
1235 continue;
1236 }
1237 AUDIO_DEBUG_LOG("%{public}s endpoint process buffer %{public}zu write success.", __func__, i);
1238 }
1239 }
1240
ReadFromEndpoint(uint64_t curReadPos)1241 int32_t AudioEndpointInner::ReadFromEndpoint(uint64_t curReadPos)
1242 {
1243 Trace trace("AudioEndpoint::ReadDstBuffer=<" + std::to_string(curReadPos));
1244 AUDIO_DEBUG_LOG("%{public}s enter, dstAudioBuffer curReadPos %{public}" PRIu64".", __func__, curReadPos);
1245 CHECK_AND_RETURN_RET_LOG(dstAudioBuffer_ != nullptr, ERR_INVALID_HANDLE,
1246 "%{public}s dst audio buffer is null.", __func__);
1247 SpanInfo *curReadSpan = dstAudioBuffer_->GetSpanInfo(curReadPos);
1248 CHECK_AND_RETURN_RET_LOG(curReadSpan != nullptr, ERR_INVALID_HANDLE,
1249 "%{public}s get source read span info of source adapter fail.", __func__);
1250 curReadSpan->readStartTime = ClockTime::GetCurNano();
1251 curReadSpan->spanStatus.store(SpanStatus::SPAN_READING);
1252 BufferDesc readBuf;
1253 int32_t ret = dstAudioBuffer_->GetReadbuffer(curReadPos, readBuf);
1254 CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret, "%{public}s get read buffer fail, ret %{public}d.", __func__, ret);
1255 #ifdef DUMP_PROCESS_FILE
1256 if (dump_hdi_ != nullptr) {
1257 fwrite(static_cast<void *>(readBuf.buffer), 1, readBuf.bufLength, dump_hdi_);
1258 }
1259 #endif
1260
1261 WriteToProcessBuffers(readBuf);
1262 curReadSpan->readDoneTime = ClockTime::GetCurNano();
1263 curReadSpan->spanStatus.store(SpanStatus::SPAN_READ_DONE);
1264 return SUCCESS;
1265 }
1266
RecordEndpointWorkLoopFuc()1267 void AudioEndpointInner::RecordEndpointWorkLoopFuc()
1268 {
1269 ScheduleReportData(getpid(), gettid(), "pulseaudio");
1270 int64_t curTime = 0;
1271 uint64_t curReadPos = 0;
1272 int64_t wakeUpTime = ClockTime::GetCurNano();
1273 AUDIO_INFO_LOG("Record endpoint work loop fuc start.");
1274 while (isInited_.load()) {
1275 if (!KeepWorkloopRunning()) {
1276 continue;
1277 }
1278 threadStatus_ = INRUNNING;
1279 if (needReSyncPosition_) {
1280 RecordReSyncPosition();
1281 wakeUpTime = ClockTime::GetCurNano();
1282 needReSyncPosition_ = false;
1283 continue;
1284 }
1285 curTime = ClockTime::GetCurNano();
1286 Trace loopTrace("Record_loop_trace");
1287 if (curTime - wakeUpTime > ONE_MILLISECOND_DURATION) {
1288 AUDIO_WARNING_LOG("%{public}s Wake up too late!", __func__);
1289 }
1290
1291 curReadPos = dstAudioBuffer_->GetCurReadFrame();
1292 if (ReadFromEndpoint(curReadPos) != SUCCESS) {
1293 AUDIO_ERR_LOG("%{public}s read from endpoint to process service fail.", __func__);
1294 break;
1295 }
1296
1297 if (!RecordPrepareNextLoop(curReadPos, wakeUpTime)) {
1298 AUDIO_ERR_LOG("PrepareNextLoop failed!");
1299 break;
1300 }
1301
1302 loopTrace.End();
1303 threadStatus_ = SLEEPING;
1304 ClockTime::AbsoluteSleep(wakeUpTime);
1305 }
1306 }
1307
EndpointWorkLoopFuc()1308 void AudioEndpointInner::EndpointWorkLoopFuc()
1309 {
1310 ScheduleReportData(getpid(), gettid(), "pulseaudio");
1311 int64_t curTime = 0;
1312 uint64_t curWritePos = 0;
1313 int64_t wakeUpTime = ClockTime::GetCurNano();
1314 AUDIO_INFO_LOG("Endpoint work loop fuc start");
1315 int32_t ret = 0;
1316 while (isInited_.load()) {
1317 if (!KeepWorkloopRunning()) {
1318 continue;
1319 }
1320 ret = 0;
1321 threadStatus_ = INRUNNING;
1322 curTime = ClockTime::GetCurNano();
1323 Trace loopTrace("AudioEndpoint::loop_trace");
1324 if (needReSyncPosition_) {
1325 ReSyncPosition();
1326 wakeUpTime = curTime;
1327 needReSyncPosition_ = false;
1328 continue;
1329 }
1330 if (curTime - wakeUpTime > ONE_MILLISECOND_DURATION) {
1331 AUDIO_WARNING_LOG("Wake up too late!");
1332 }
1333
1334 // First, wake up at client may-write-done time, and check if all process write done.
1335 // If not, do another sleep to the possible latest write time.
1336 curWritePos = dstAudioBuffer_->GetCurWriteFrame();
1337 if (!CheckAllBufferReady(wakeUpTime, curWritePos)) {
1338 curTime = ClockTime::GetCurNano();
1339 }
1340
1341 // then do mix & write to hdi buffer and prepare next loop
1342 if (!ProcessToEndpointDataHandle(curWritePos)) {
1343 AUDIO_ERR_LOG("ProcessToEndpointDataHandle failed!");
1344 break;
1345 }
1346
1347 // prepare info of next loop
1348 if (!PrepareNextLoop(curWritePos, wakeUpTime)) {
1349 AUDIO_ERR_LOG("PrepareNextLoop failed!");
1350 break;
1351 }
1352
1353 loopTrace.End();
1354 // start sleep
1355 threadStatus_ = SLEEPING;
1356 ClockTime::AbsoluteSleep(wakeUpTime);
1357 }
1358 AUDIO_DEBUG_LOG("Endpoint work loop fuc end, ret %{public}d", ret);
1359 }
1360 } // namespace AudioStandard
1361 } // namespace OHOS
1362