• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "audio_endpoint.h"
17 
18 #include <atomic>
19 #include <cinttypes>
20 #include <condition_variable>
21 #include <thread>
22 #include <vector>
23 #include <mutex>
24 
25 #include "securec.h"
26 
27 #include "audio_errors.h"
28 #include "audio_log.h"
29 #include "audio_schedule.h"
30 #include "audio_utils.h"
31 #include "fast_audio_renderer_sink.h"
32 #include "fast_audio_capturer_source.h"
33 #include "i_audio_capturer_source.h"
34 #include "linear_pos_time_model.h"
35 #include "policy_handler.h"
36 #include "remote_fast_audio_renderer_sink.h"
37 #include "remote_fast_audio_capturer_source.h"
38 
39 // DUMP_PROCESS_FILE // define it for dump file
40 namespace OHOS {
41 namespace AudioStandard {
42 namespace {
43     static constexpr int32_t VOLUME_SHIFT_NUMBER = 16; // 1 >> 16 = 65536, max volume
44     static constexpr int64_t MAX_SPAN_DURATION_IN_NANO = 100000000; // 100ms
45     static constexpr int32_t SLEEP_TIME_IN_DEFAULT = 400; // 400ms
46     static constexpr int64_t DELTA_TO_REAL_READ_START_TIME = 0; // 0ms
47 }
48 
49 class AudioEndpointInner : public AudioEndpoint {
50 public:
51     explicit AudioEndpointInner(EndpointType type);
52     ~AudioEndpointInner();
53 
54     bool Config(const DeviceInfo &deviceInfo);
55     bool StartDevice();
56     bool StopDevice();
57 
58     // when audio process start.
59     int32_t OnStart(IAudioProcessStream *processStream) override;
60     // when audio process pause.
61     int32_t OnPause(IAudioProcessStream *processStream) override;
62     // when audio process request update handle info.
63     int32_t OnUpdateHandleInfo(IAudioProcessStream *processStream) override;
64 
65     /**
66      * Call LinkProcessStream when first create process or link other process with this endpoint.
67      * Here are cases:
68      *   case1: endpointStatus_ = UNLINKED, link not running process; UNLINKED-->IDEL & godown
69      *   case2: endpointStatus_ = UNLINKED, link running process; UNLINKED-->IDEL & godown
70      *   case3: endpointStatus_ = IDEL, link not running process; IDEL-->IDEL
71      *   case4: endpointStatus_ = IDEL, link running process; IDEL-->STARTING-->RUNNING
72      *   case5: endpointStatus_ = RUNNING; RUNNING-->RUNNING
73     */
74     int32_t LinkProcessStream(IAudioProcessStream *processStream) override;
75     int32_t UnlinkProcessStream(IAudioProcessStream *processStream) override;
76 
77     int32_t GetPreferBufferInfo(uint32_t &totalSizeInframe, uint32_t &spanSizeInframe) override;
78 
79     void Dump(std::stringstream &dumpStringStream) override;
80 
81     std::string GetEndpointName() override;
82 
83     EndpointStatus GetStatus() override;
84 
85     void Release() override;
86 
87 private:
88     bool ConfigInputPoint(const DeviceInfo &deviceInfo);
89     int32_t PrepareDeviceBuffer(const DeviceInfo &deviceInfo);
90     int32_t GetAdapterBufferInfo(const DeviceInfo &deviceInfo);
91     void ReSyncPosition();
92     void RecordReSyncPosition();
93     void InitAudiobuffer(bool resetReadWritePos);
94     void ProcessData(const std::vector<AudioStreamData> &srcDataList, const AudioStreamData &dstData);
95     int64_t GetPredictNextReadTime(uint64_t posInFrame);
96     int64_t GetPredictNextWriteTime(uint64_t posInFrame);
97     bool PrepareNextLoop(uint64_t curWritePos, int64_t &wakeUpTime);
98     bool RecordPrepareNextLoop(uint64_t curReadPos, int64_t &wakeUpTime);
99 
100     /**
101      * @brief Get the current read position in frame and the read-time with it.
102      *
103      * @param frames the read position in frame
104      * @param nanoTime the time in nanosecond when device-sink start read the buffer
105     */
106     bool GetDeviceHandleInfo(uint64_t &frames, int64_t &nanoTime);
107     int32_t GetProcLastWriteDoneInfo(const std::shared_ptr<OHAudioBuffer> processBuffer, uint64_t curWriteFrame,
108         uint64_t &proHandleFrame, int64_t &proHandleTime);
109 
110     bool IsAnyProcessRunning();
111     bool CheckAllBufferReady(int64_t checkTime, uint64_t curWritePos);
112     bool ProcessToEndpointDataHandle(uint64_t curWritePos);
113     void GetAllReadyProcessData(std::vector<AudioStreamData> &audioDataList);
114 
115     std::string GetStatusStr(EndpointStatus status);
116 
117     int32_t WriteToSpecialProcBuf(const std::shared_ptr<OHAudioBuffer> &procBuf, const BufferDesc &readBuf);
118     void WriteToProcessBuffers(const BufferDesc &readBuf);
119     int32_t ReadFromEndpoint(uint64_t curReadPos);
120     bool KeepWorkloopRunning();
121 
122     void EndpointWorkLoopFuc();
123     void RecordEndpointWorkLoopFuc();
124 
125     // Call GetMmapHandlePosition in ipc may block more than a cycle, call it in another thread.
126     void AsyncGetPosTime();
127 
128 private:
129     static constexpr int64_t ONE_MILLISECOND_DURATION = 1000000; // 1ms
130     static constexpr int64_t WRITE_TO_HDI_AHEAD_TIME = -1000000; // ahead 1ms
131     static constexpr int32_t UPDATE_THREAD_TIMEOUT = 1000; // 1000ms
132     enum ThreadStatus : uint32_t {
133         WAITTING = 0,
134         SLEEPING,
135         INRUNNING
136     };
137     // SamplingRate EncodingType SampleFormat Channel
138     DeviceInfo deviceInfo_;
139     AudioStreamInfo dstStreamInfo_;
140     EndpointType endpointType_;
141 
142     std::mutex listLock_;
143     std::vector<IAudioProcessStream *> processList_;
144     std::vector<std::shared_ptr<OHAudioBuffer>> processBufferList_;
145 
146     std::atomic<bool> isInited_ = false;
147 
148     IMmapAudioRendererSink *fastSink_ = nullptr;
149     IMmapAudioCapturerSource *fastSource_ = nullptr;
150 
151     LinearPosTimeModel readTimeModel_;
152     LinearPosTimeModel writeTimeModel_;
153 
154     int64_t spanDuration_ = 0; // nano second
155     int64_t serverAheadReadTime_ = 0;
156     int dstBufferFd_ = -1; // -1: invalid fd.
157     uint32_t dstTotalSizeInframe_ = 0;
158     uint32_t dstSpanSizeInframe_ = 0;
159     uint32_t dstByteSizePerFrame_ = 0;
160     std::shared_ptr<OHAudioBuffer> dstAudioBuffer_ = nullptr;
161 
162     std::atomic<EndpointStatus> endpointStatus_ = INVALID;
163 
164     std::atomic<ThreadStatus> threadStatus_ = WAITTING;
165     std::thread endpointWorkThread_;
166     std::mutex loopThreadLock_;
167     std::condition_variable workThreadCV_;
168     int64_t lastHandleProcessTime_ = 0;
169 
170     std::thread updatePosTimeThread_;
171     std::mutex updateThreadLock_;
172     std::condition_variable updateThreadCV_;
173     std::atomic<bool> stopUpdateThread_ = false;
174 
175     std::atomic<uint64_t> posInFrame_ = 0;
176     std::atomic<int64_t> timeInNano_ = 0;
177 
178     bool isDeviceRunningInIdel_ = true; // will call start sink when linked.
179     bool needReSyncPosition_ = true;
180 #ifdef DUMP_PROCESS_FILE
181     FILE *dcp_ = nullptr;
182     FILE *dump_hdi_ = nullptr;
183 #endif
184 };
185 
GetInstance(EndpointType type,const DeviceInfo & deviceInfo)186 std::shared_ptr<AudioEndpoint> AudioEndpoint::GetInstance(EndpointType type, const DeviceInfo &deviceInfo)
187 {
188     std::shared_ptr<AudioEndpointInner> audioEndpoint = std::make_shared<AudioEndpointInner>(type);
189     CHECK_AND_RETURN_RET_LOG(audioEndpoint != nullptr, nullptr, "Create AudioEndpoint failed.");
190 
191     if (!audioEndpoint->Config(deviceInfo)) {
192         AUDIO_ERR_LOG("Config AudioEndpoint failed.");
193         audioEndpoint = nullptr;
194     }
195     return audioEndpoint;
196 }
197 
AudioEndpointInner(EndpointType type)198 AudioEndpointInner::AudioEndpointInner(EndpointType type) : endpointType_(type)
199 {
200     AUDIO_INFO_LOG("AudioEndpoint type:%{public}d", endpointType_);
201 }
202 
GetEndpointName()203 std::string AudioEndpointInner::GetEndpointName()
204 {
205     // temp method to get deivce key, should be same with AudioService::GetAudioEndpointForDevice.
206     return deviceInfo_.networkId + std::to_string(deviceInfo_.deviceId);
207 }
208 
GetStatus()209 AudioEndpoint::EndpointStatus AudioEndpointInner::GetStatus()
210 {
211     AUDIO_INFO_LOG("AudioEndpoint get status:%{public}s", GetStatusStr(endpointStatus_).c_str());
212     return endpointStatus_.load();
213 }
214 
Release()215 void AudioEndpointInner::Release()
216 {
217     // Wait for thread end and then clear other data to avoid using any cleared data in thread.
218     AUDIO_INFO_LOG("%{public}s enter.", __func__);
219     if (!isInited_.load()) {
220         AUDIO_WARNING_LOG("already released");
221         return;
222     }
223 
224     isInited_.store(false);
225     workThreadCV_.notify_all();
226     if (endpointWorkThread_.joinable()) {
227         AUDIO_DEBUG_LOG("AudioEndpoint join work thread start");
228         endpointWorkThread_.join();
229         AUDIO_DEBUG_LOG("AudioEndpoint join work thread end");
230     }
231 
232     stopUpdateThread_.store(true);
233     updateThreadCV_.notify_all();
234     if (updatePosTimeThread_.joinable()) {
235         AUDIO_DEBUG_LOG("AudioEndpoint join update thread start");
236         updatePosTimeThread_.join();
237         AUDIO_DEBUG_LOG("AudioEndpoint join update thread end");
238     }
239 
240     if (fastSink_ != nullptr) {
241         fastSink_->DeInit();
242         fastSink_ = nullptr;
243     }
244 
245     if (fastSource_ != nullptr) {
246         fastSource_->DeInit();
247         fastSource_ = nullptr;
248     }
249 
250     endpointStatus_.store(INVALID);
251 
252     if (dstAudioBuffer_ != nullptr) {
253         AUDIO_INFO_LOG("Set device buffer null");
254         dstAudioBuffer_ = nullptr;
255     }
256 #ifdef DUMP_PROCESS_FILE
257     if (dcp_) {
258         fclose(dcp_);
259         dcp_ = nullptr;
260     }
261     if (dump_hdi_) {
262         fclose(dump_hdi_);
263         dump_hdi_ = nullptr;
264     }
265 #endif
266 }
267 
~AudioEndpointInner()268 AudioEndpointInner::~AudioEndpointInner()
269 {
270     if (isInited_.load()) {
271         AudioEndpointInner::Release();
272     }
273     AUDIO_INFO_LOG("~AudioEndpoint()");
274 }
275 
Dump(std::stringstream & dumpStringStream)276 void AudioEndpointInner::Dump(std::stringstream &dumpStringStream)
277 {
278     // dump endpoint stream info
279     dumpStringStream << std::endl << "Endpoint stream info:" << std::endl;
280     dumpStringStream << " samplingRate:" << dstStreamInfo_.samplingRate << std::endl;
281     dumpStringStream << " channels:" << dstStreamInfo_.channels << std::endl;
282     dumpStringStream << " format:" << dstStreamInfo_.format << std::endl;
283 
284     // dump status info
285     dumpStringStream << " Current endpoint status:" << GetStatusStr(endpointStatus_) << std::endl;
286     if (dstAudioBuffer_ != nullptr) {
287         dumpStringStream << " Currend hdi read position:" << dstAudioBuffer_->GetCurReadFrame() << std::endl;
288         dumpStringStream << " Currend hdi write position:" << dstAudioBuffer_->GetCurWriteFrame() << std::endl;
289     }
290 
291     // dump linked process info
292     std::lock_guard<std::mutex> lock(listLock_);
293     dumpStringStream << processBufferList_.size() << " linked process:" << std::endl;
294     for (auto item : processBufferList_) {
295         dumpStringStream << " process read position:" << item->GetCurReadFrame() << std::endl;
296         dumpStringStream << " process write position:" << item->GetCurWriteFrame() << std::endl << std::endl;
297     }
298     dumpStringStream << std::endl;
299 }
300 
ConfigInputPoint(const DeviceInfo & deviceInfo)301 bool AudioEndpointInner::ConfigInputPoint(const DeviceInfo &deviceInfo)
302 {
303     AUDIO_INFO_LOG("%{public}s enter.", __func__);
304     IAudioSourceAttr attr = {};
305     attr.sampleRate = dstStreamInfo_.samplingRate;
306     attr.channel = dstStreamInfo_.channels;
307     attr.format = dstStreamInfo_.format;
308     attr.deviceNetworkId = deviceInfo.networkId.c_str();
309 
310     if (deviceInfo.networkId == LOCAL_NETWORK_ID) {
311         attr.adapterName = "primary";
312         fastSource_ = FastAudioCapturerSource::GetInstance();
313     } else {
314         attr.adapterName = "remote";
315         fastSource_ = RemoteFastAudioCapturerSource::GetInstance(deviceInfo.networkId);
316     }
317     CHECK_AND_RETURN_RET_LOG(fastSource_ != nullptr, false, "ConfigInputPoint GetInstance failed.");
318 
319     int32_t err = fastSource_->Init(attr);
320     if (err != SUCCESS || !fastSource_->IsInited()) {
321         AUDIO_ERR_LOG("%{public}s init remote fast fail, err %{public}d.", __func__, err);
322         fastSource_ = nullptr;
323         return false;
324     }
325     if (PrepareDeviceBuffer(deviceInfo) != SUCCESS) {
326         fastSource_->DeInit();
327         fastSource_ = nullptr;
328         return false;
329     }
330 
331     bool ret = writeTimeModel_.ConfigSampleRate(dstStreamInfo_.samplingRate);
332     CHECK_AND_RETURN_RET_LOG(ret != false, false, "Config LinearPosTimeModel failed.");
333 
334     endpointStatus_ = UNLINKED;
335     isInited_.store(true);
336     endpointWorkThread_ = std::thread(&AudioEndpointInner::RecordEndpointWorkLoopFuc, this);
337     pthread_setname_np(endpointWorkThread_.native_handle(), "AudioEndpointLoop");
338 
339 #ifdef DUMP_PROCESS_FILE
340     dump_hdi_ = fopen("/data/data/server-capture-hdi.pcm", "a+");
341     if (dump_hdi_ == nullptr) {
342         AUDIO_ERR_LOG("Error opening pcm test file!");
343     }
344 #endif
345     return true;
346 }
347 
Config(const DeviceInfo & deviceInfo)348 bool AudioEndpointInner::Config(const DeviceInfo &deviceInfo)
349 {
350     AUDIO_INFO_LOG("%{public}s enter, deviceRole %{public}d.", __func__, deviceInfo.deviceRole);
351     deviceInfo_ = deviceInfo;
352     dstStreamInfo_ = deviceInfo.audioStreamInfo;
353     if (deviceInfo.deviceRole == INPUT_DEVICE) {
354         return ConfigInputPoint(deviceInfo);
355     }
356 
357     fastSink_ = deviceInfo.networkId == REMOTE_NETWORK_ID ?
358         RemoteFastAudioRendererSink::GetInstance(deviceInfo.networkId) : FastAudioRendererSink::GetInstance();
359     IAudioSinkAttr attr = {};
360     attr.adapterName = "primary";
361     attr.sampleRate = dstStreamInfo_.samplingRate; // 48000hz
362     attr.channel = dstStreamInfo_.channels; // STEREO = 2
363     attr.format = dstStreamInfo_.format; // SAMPLE_S16LE = 1
364     attr.sampleFmt = dstStreamInfo_.format;
365     attr.deviceNetworkId = deviceInfo.networkId.c_str();
366     attr.deviceType = static_cast<int32_t>(deviceInfo.deviceType);
367 
368     fastSink_->Init(attr);
369     if (!fastSink_->IsInited()) {
370         fastSink_ = nullptr;
371         return false;
372     }
373     if (PrepareDeviceBuffer(deviceInfo) != SUCCESS) {
374         fastSink_->DeInit();
375         fastSink_ = nullptr;
376         return false;
377     }
378 
379     float initVolume = 1.0; // init volume to 1.0
380     fastSink_->SetVolume(initVolume, initVolume);
381 
382     bool ret = readTimeModel_.ConfigSampleRate(dstStreamInfo_.samplingRate);
383     CHECK_AND_RETURN_RET_LOG(ret != false, false, "Config LinearPosTimeModel failed.");
384 
385     endpointStatus_ = UNLINKED;
386     isInited_.store(true);
387     endpointWorkThread_ = std::thread(&AudioEndpointInner::EndpointWorkLoopFuc, this);
388     pthread_setname_np(endpointWorkThread_.native_handle(), "AudioEndpointLoop");
389 
390     // note: add this in record.
391     updatePosTimeThread_ = std::thread(&AudioEndpointInner::AsyncGetPosTime, this);
392     pthread_setname_np(updatePosTimeThread_.native_handle(), "AudioEndpointUpdate");
393 
394 #ifdef DUMP_PROCESS_FILE
395     dcp_ = fopen("/data/data/server-read-client.pcm", "a+");
396     dump_hdi_ = fopen("/data/data/server-hdi.pcm", "a+");
397     if (dcp_ == nullptr || dump_hdi_ == nullptr) {
398         AUDIO_ERR_LOG("Error opening pcm test file!");
399     }
400 #endif
401     return true;
402 }
403 
GetAdapterBufferInfo(const DeviceInfo & deviceInfo)404 int32_t AudioEndpointInner::GetAdapterBufferInfo(const DeviceInfo &deviceInfo)
405 {
406     int32_t ret = 0;
407     AUDIO_INFO_LOG("%{public}s enter, deviceRole %{public}d.", __func__, deviceInfo.deviceRole);
408     if (deviceInfo.deviceRole == INPUT_DEVICE) {
409         CHECK_AND_RETURN_RET_LOG(fastSource_ != nullptr, ERR_INVALID_HANDLE,
410             "%{public}s fast source is null.", __func__);
411         ret = fastSource_->GetMmapBufferInfo(dstBufferFd_, dstTotalSizeInframe_, dstSpanSizeInframe_,
412         dstByteSizePerFrame_);
413     } else {
414         CHECK_AND_RETURN_RET_LOG(fastSink_ != nullptr, ERR_INVALID_HANDLE, "%{public}s fast sink is null.", __func__);
415         ret = fastSink_->GetMmapBufferInfo(dstBufferFd_, dstTotalSizeInframe_, dstSpanSizeInframe_,
416         dstByteSizePerFrame_);
417     }
418 
419     if (ret != SUCCESS || dstBufferFd_ == -1 || dstTotalSizeInframe_ == 0 || dstSpanSizeInframe_ == 0 ||
420         dstByteSizePerFrame_ == 0) {
421         AUDIO_ERR_LOG("%{public}s get mmap buffer info fail, ret %{public}d, dstBufferFd %{public}d, \
422             dstTotalSizeInframe %{public}d, dstSpanSizeInframe %{public}d, dstByteSizePerFrame %{public}d.",
423             __func__, ret, dstBufferFd_, dstTotalSizeInframe_, dstSpanSizeInframe_, dstByteSizePerFrame_);
424         return ERR_ILLEGAL_STATE;
425     }
426     AUDIO_DEBUG_LOG("%{public}s end, fd %{public}d.", __func__, dstBufferFd_);
427     return SUCCESS;
428 }
429 
PrepareDeviceBuffer(const DeviceInfo & deviceInfo)430 int32_t AudioEndpointInner::PrepareDeviceBuffer(const DeviceInfo &deviceInfo)
431 {
432     AUDIO_INFO_LOG("%{public}s enter, deviceRole %{public}d.", __func__, deviceInfo.deviceRole);
433     if (dstAudioBuffer_ != nullptr) {
434         AUDIO_INFO_LOG("%{public}s endpoint buffer is preapred, fd:%{public}d", __func__, dstBufferFd_);
435         return SUCCESS;
436     }
437 
438     int32_t ret = GetAdapterBufferInfo(deviceInfo);
439     CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ERR_OPERATION_FAILED,
440         "%{public}s get adapter buffer Info fail, ret %{public}d.", __func__, ret);
441 
442     // spanDuration_ may be less than the correct time of dstSpanSizeInframe_.
443     spanDuration_ = dstSpanSizeInframe_ * AUDIO_NS_PER_SECOND / dstStreamInfo_.samplingRate;
444     int64_t temp = spanDuration_ / 5 * 3; // 3/5 spanDuration
445     serverAheadReadTime_ = temp < ONE_MILLISECOND_DURATION ? ONE_MILLISECOND_DURATION : temp; // at least 1ms ahead.
446     AUDIO_DEBUG_LOG("%{public}s spanDuration %{public}" PRIu64" ns, serverAheadReadTime %{public}" PRIu64" ns.",
447         __func__, spanDuration_, serverAheadReadTime_);
448 
449     if (spanDuration_ <= 0 || spanDuration_ >= MAX_SPAN_DURATION_IN_NANO) {
450         AUDIO_ERR_LOG("%{public}s mmap span info error, spanDuration %{public}" PRIu64".", __func__, spanDuration_);
451         return ERR_INVALID_PARAM;
452     }
453     dstAudioBuffer_ = OHAudioBuffer::CreateFromRemote(dstTotalSizeInframe_, dstSpanSizeInframe_, dstByteSizePerFrame_,
454         dstBufferFd_);
455     if (dstAudioBuffer_ == nullptr || dstAudioBuffer_->GetBufferHolder() != AudioBufferHolder::AUDIO_SERVER_ONLY) {
456         AUDIO_ERR_LOG("%{public}s create buffer from remote fail.", __func__);
457         return ERR_ILLEGAL_STATE;
458     }
459     dstAudioBuffer_->GetStreamStatus()->store(StreamStatus::STREAM_IDEL);
460 
461     // clear data buffer
462     ret = memset_s(dstAudioBuffer_->GetDataBase(), dstAudioBuffer_->GetDataSize(), 0, dstAudioBuffer_->GetDataSize());
463     CHECK_AND_BREAK_LOG(ret == EOK, "%{public}s memset buffer fail, ret %{public}d, fd %{public}d.",
464         __func__, ret, dstBufferFd_);
465     InitAudiobuffer(true);
466 
467     AUDIO_DEBUG_LOG("%{public}s end, fd %{public}d.", __func__, dstBufferFd_);
468     return SUCCESS;
469 }
470 
InitAudiobuffer(bool resetReadWritePos)471 void AudioEndpointInner::InitAudiobuffer(bool resetReadWritePos)
472 {
473     CHECK_AND_RETURN_LOG((dstAudioBuffer_ != nullptr), "%{public}s: dst audio buffer is null.", __func__);
474     if (resetReadWritePos) {
475         dstAudioBuffer_->ResetCurReadWritePos(0, 0);
476     }
477 
478     uint32_t spanCount = dstAudioBuffer_->GetSpanCount();
479     for (uint32_t i = 0; i < spanCount; i++) {
480         SpanInfo *spanInfo = dstAudioBuffer_->GetSpanInfoByIndex(i);
481         if (spanInfo == nullptr) {
482             AUDIO_ERR_LOG("InitAudiobuffer failed.");
483             return;
484         }
485         if (deviceInfo_.deviceRole == INPUT_DEVICE) {
486             spanInfo->spanStatus = SPAN_WRITE_DONE;
487         } else {
488             spanInfo->spanStatus = SPAN_READ_DONE;
489         }
490         spanInfo->offsetInFrame = 0;
491 
492         spanInfo->readStartTime = 0;
493         spanInfo->readDoneTime = 0;
494 
495         spanInfo->writeStartTime = 0;
496         spanInfo->writeDoneTime = 0;
497 
498         spanInfo->volumeStart = 1 << VOLUME_SHIFT_NUMBER; // 65536 for initialize
499         spanInfo->volumeEnd = 1 << VOLUME_SHIFT_NUMBER; // 65536 for initialize
500         spanInfo->isMute = false;
501     }
502     return;
503 }
504 
GetPreferBufferInfo(uint32_t & totalSizeInframe,uint32_t & spanSizeInframe)505 int32_t AudioEndpointInner::GetPreferBufferInfo(uint32_t &totalSizeInframe, uint32_t &spanSizeInframe)
506 {
507     totalSizeInframe = dstTotalSizeInframe_;
508     spanSizeInframe = dstSpanSizeInframe_;
509     return SUCCESS;
510 }
511 
IsAnyProcessRunning()512 bool AudioEndpointInner::IsAnyProcessRunning()
513 {
514     std::lock_guard<std::mutex> lock(listLock_);
515     bool isRunning = false;
516     for (size_t i = 0; i < processBufferList_.size(); i++) {
517         if (processBufferList_[i]->GetStreamStatus()->load() == STREAM_RUNNING) {
518             isRunning = true;
519             break;
520         }
521     }
522     return isRunning;
523 }
524 
RecordReSyncPosition()525 void AudioEndpointInner::RecordReSyncPosition()
526 {
527     AUDIO_INFO_LOG("%{public}s enter.", __func__);
528     uint64_t curHdiWritePos = 0;
529     int64_t writeTime = 0;
530     CHECK_AND_RETURN_LOG(GetDeviceHandleInfo(curHdiWritePos, writeTime),
531         "%{public}s get device handle info fail.", __func__);
532     AUDIO_DEBUG_LOG("%{public}s get capturer info, curHdiWritePos %{public}" PRIu64", writeTime %{public}" PRId64".",
533         __func__, curHdiWritePos, writeTime);
534     int64_t temp = ClockTime::GetCurNano() - writeTime;
535     if (temp > spanDuration_) {
536         AUDIO_WARNING_LOG("%{public}s GetDeviceHandleInfo cost long time %{public}" PRIu64".", __func__, temp);
537     }
538 
539     writeTimeModel_.ResetFrameStamp(curHdiWritePos, writeTime);
540     uint64_t nextDstReadPos = curHdiWritePos;
541     uint64_t nextDstWritePos = curHdiWritePos;
542     InitAudiobuffer(false);
543     int32_t ret = dstAudioBuffer_->ResetCurReadWritePos(nextDstReadPos, nextDstWritePos);
544     if (ret != SUCCESS) {
545         AUDIO_ERR_LOG("ResetCurReadWritePos failed.");
546         return;
547     }
548 
549     SpanInfo *nextReadSapn = dstAudioBuffer_->GetSpanInfo(nextDstReadPos);
550     CHECK_AND_RETURN_LOG(nextReadSapn != nullptr, "GetSpanInfo failed.");
551     nextReadSapn->offsetInFrame = nextDstReadPos;
552     nextReadSapn->spanStatus = SpanStatus::SPAN_WRITE_DONE;
553 }
554 
ReSyncPosition()555 void AudioEndpointInner::ReSyncPosition()
556 {
557     Trace loopTrace("AudioEndpoint::ReSyncPosition");
558     uint64_t curHdiReadPos = 0;
559     int64_t readTime = 0;
560     if (!GetDeviceHandleInfo(curHdiReadPos, readTime)) {
561         AUDIO_ERR_LOG("ReSyncPosition call GetDeviceHandleInfo failed.");
562         return;
563     }
564     int64_t curTime = ClockTime::GetCurNano();
565     int64_t temp = curTime - readTime;
566     if (temp > spanDuration_) {
567         AUDIO_ERR_LOG("GetDeviceHandleInfo may cost long time.");
568     }
569 
570     readTimeModel_.ResetFrameStamp(curHdiReadPos, readTime);
571     uint64_t nextDstWritePos = curHdiReadPos + dstSpanSizeInframe_;
572     InitAudiobuffer(false);
573     int32_t ret = dstAudioBuffer_->ResetCurReadWritePos(nextDstWritePos, nextDstWritePos);
574     if (ret != SUCCESS) {
575         AUDIO_ERR_LOG("ResetCurReadWritePos failed.");
576         return;
577     }
578 
579     SpanInfo *nextWriteSapn = dstAudioBuffer_->GetSpanInfo(nextDstWritePos);
580     CHECK_AND_RETURN_LOG(nextWriteSapn != nullptr, "GetSpanInfo failed.");
581     nextWriteSapn->offsetInFrame = nextDstWritePos;
582     nextWriteSapn->spanStatus = SpanStatus::SPAN_READ_DONE;
583     return;
584 }
585 
StartDevice()586 bool AudioEndpointInner::StartDevice()
587 {
588     AUDIO_INFO_LOG("%{public}s enter.", __func__);
589     // how to modify the status while unlinked and started?
590     if (endpointStatus_ != IDEL) {
591         AUDIO_ERR_LOG("Endpoint status is not IDEL");
592         return false;
593     }
594     endpointStatus_ = STARTING;
595     if (deviceInfo_.deviceRole == INPUT_DEVICE) {
596         if (fastSource_ == nullptr || fastSource_->Start() != SUCCESS) {
597             AUDIO_ERR_LOG("Source start failed.");
598             return false;
599         }
600     } else {
601         if (fastSink_ == nullptr || fastSink_->Start() != SUCCESS) {
602             AUDIO_ERR_LOG("Sink start failed.");
603             return false;
604         }
605     }
606 
607     std::unique_lock<std::mutex> lock(loopThreadLock_);
608     needReSyncPosition_ = true;
609     endpointStatus_ = IsAnyProcessRunning() ? RUNNING : IDEL;
610     workThreadCV_.notify_all();
611     AUDIO_DEBUG_LOG("StartDevice out, status is %{public}s", GetStatusStr(endpointStatus_).c_str());
612     return true;
613 }
614 
StopDevice()615 bool AudioEndpointInner::StopDevice()
616 {
617     AUDIO_INFO_LOG("StopDevice with status:%{public}s", GetStatusStr(endpointStatus_).c_str());
618     // todo
619     endpointStatus_ = STOPPING;
620     // Clear data buffer to avoid noise in some case.
621     if (dstAudioBuffer_ != nullptr) {
622         int32_t ret = memset_s(dstAudioBuffer_->GetDataBase(), dstAudioBuffer_->GetDataSize(), 0,
623             dstAudioBuffer_->GetDataSize());
624         AUDIO_INFO_LOG("StopDevice clear buffer ret:%{public}d", ret);
625     }
626     if (deviceInfo_.deviceRole == INPUT_DEVICE) {
627         if (fastSource_ == nullptr || fastSource_->Stop() != SUCCESS) {
628             AUDIO_ERR_LOG("Source stop failed.");
629             return false;
630         }
631     } else {
632         if (fastSink_ == nullptr || fastSink_->Stop() != SUCCESS) {
633             AUDIO_ERR_LOG("Sink stop failed.");
634             return false;
635         }
636     }
637     endpointStatus_ = STOPPED;
638     return true;
639 }
640 
OnStart(IAudioProcessStream * processStream)641 int32_t AudioEndpointInner::OnStart(IAudioProcessStream *processStream)
642 {
643     AUDIO_INFO_LOG("OnStart endpoint status:%{public}s", GetStatusStr(endpointStatus_).c_str());
644     if (endpointStatus_ == RUNNING) {
645         AUDIO_INFO_LOG("OnStart find endpoint already in RUNNING.");
646         return SUCCESS;
647     }
648     if (endpointStatus_ == IDEL && !isDeviceRunningInIdel_) {
649         // call sink start
650         StartDevice();
651         endpointStatus_ = RUNNING;
652     }
653     return SUCCESS;
654 }
655 
OnPause(IAudioProcessStream * processStream)656 int32_t AudioEndpointInner::OnPause(IAudioProcessStream *processStream)
657 {
658     AUDIO_INFO_LOG("OnPause endpoint status:%{public}s", GetStatusStr(endpointStatus_).c_str());
659     if (endpointStatus_ == RUNNING) {
660         endpointStatus_ = IsAnyProcessRunning() ? RUNNING : IDEL;
661     }
662     if (endpointStatus_ == IDEL && !isDeviceRunningInIdel_) {
663         // call sink stop when no process running?
664         AUDIO_INFO_LOG("OnPause status is IDEL, call stop");
665     }
666     // todo
667     return SUCCESS;
668 }
669 
GetProcLastWriteDoneInfo(const std::shared_ptr<OHAudioBuffer> processBuffer,uint64_t curWriteFrame,uint64_t & proHandleFrame,int64_t & proHandleTime)670 int32_t AudioEndpointInner::GetProcLastWriteDoneInfo(const std::shared_ptr<OHAudioBuffer> processBuffer,
671     uint64_t curWriteFrame, uint64_t &proHandleFrame, int64_t &proHandleTime)
672 {
673     CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_INVALID_HANDLE, "Process found but buffer is null");
674     uint64_t curReadFrame = processBuffer->GetCurReadFrame();
675     SpanInfo *curWriteSpan = processBuffer->GetSpanInfo(curWriteFrame);
676     CHECK_AND_RETURN_RET_LOG(curWriteSpan != nullptr, ERR_INVALID_HANDLE,
677         "%{public}s curWriteSpan of curWriteFrame %{public}" PRIu64" is null", __func__, curWriteFrame);
678     if (curWriteSpan->spanStatus == SpanStatus::SPAN_WRITE_DONE || curWriteFrame < dstSpanSizeInframe_ ||
679         curWriteFrame < curReadFrame) {
680         proHandleFrame = curWriteFrame;
681         proHandleTime = curWriteSpan->writeDoneTime;
682     } else {
683         int32_t ret = GetProcLastWriteDoneInfo(processBuffer, curWriteFrame - dstSpanSizeInframe_,
684             proHandleFrame, proHandleTime);
685         CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret,
686             "%{public}s get process last write done info fail, ret %{public}d.", __func__, ret);
687     }
688 
689     AUDIO_INFO_LOG("%{public}s end, curWriteFrame %{public}" PRIu64", proHandleFrame %{public}" PRIu64", "
690         "proHandleTime %{public}" PRId64".", __func__, curWriteFrame, proHandleFrame, proHandleTime);
691     return SUCCESS;
692 }
693 
OnUpdateHandleInfo(IAudioProcessStream * processStream)694 int32_t AudioEndpointInner::OnUpdateHandleInfo(IAudioProcessStream *processStream)
695 {
696     Trace trace("AudioEndpoint::OnUpdateHandleInfo");
697     bool isFind = false;
698     std::lock_guard<std::mutex> lock(listLock_);
699     auto processItr = processList_.begin();
700     while (processItr != processList_.end()) {
701         if (*processItr != processStream) {
702             processItr++;
703             continue;
704         }
705         std::shared_ptr<OHAudioBuffer> processBuffer = (*processItr)->GetStreamBuffer();
706         CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_OPERATION_FAILED, "Process found but buffer is null");
707         uint64_t proHandleFrame = 0;
708         int64_t proHandleTime = 0;
709         if (deviceInfo_.deviceRole == INPUT_DEVICE) {
710             uint64_t curWriteFrame = processBuffer->GetCurWriteFrame();
711             int32_t ret = GetProcLastWriteDoneInfo(processBuffer, curWriteFrame, proHandleFrame, proHandleTime);
712             CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret,
713                 "%{public}s get process last write done info fail, ret %{public}d.", __func__, ret);
714             processBuffer->SetHandleInfo(proHandleFrame, proHandleTime);
715         } else {
716             // For output device, handle info is updated in CheckAllBufferReady
717             processBuffer->GetHandleInfo(proHandleFrame, proHandleTime);
718         }
719         AUDIO_INFO_LOG("OnUpdateHandleInfo set process handle pos[%{public}" PRIu64"] time [%{public}" PRId64"], "
720             "deviceRole %{public}d.", proHandleFrame, proHandleTime, deviceInfo_.deviceRole);
721         isFind = true;
722         break;
723     }
724     if (!isFind) {
725         AUDIO_ERR_LOG("Can not find any process to UpdateHandleInfo");
726         return ERR_OPERATION_FAILED;
727     }
728     return SUCCESS;
729 }
730 
LinkProcessStream(IAudioProcessStream * processStream)731 int32_t AudioEndpointInner::LinkProcessStream(IAudioProcessStream *processStream)
732 {
733     CHECK_AND_RETURN_RET_LOG(processStream != nullptr, ERR_INVALID_PARAM, "IAudioProcessStream is null");
734     std::shared_ptr<OHAudioBuffer> processBuffer = processStream->GetStreamBuffer();
735     CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_INVALID_PARAM, "processBuffer is null");
736 
737     CHECK_AND_RETURN_RET_LOG(processList_.size() < MAX_LINKED_PROCESS, ERR_OPERATION_FAILED, "reach link limit.");
738 
739     AUDIO_INFO_LOG("LinkProcessStream endpoint status:%{public}s.", GetStatusStr(endpointStatus_).c_str());
740 
741     bool needEndpointRunning = processBuffer->GetStreamStatus()->load() == STREAM_RUNNING;
742 
743     if (endpointStatus_ == STARTING) {
744         AUDIO_INFO_LOG("LinkProcessStream wait start begin.");
745         std::unique_lock<std::mutex> lock(loopThreadLock_);
746         workThreadCV_.wait(lock, [this] {
747             return endpointStatus_ != STARTING;
748         });
749         AUDIO_DEBUG_LOG("LinkProcessStream wait start end.");
750     }
751 
752     if (endpointStatus_ == RUNNING) {
753         std::lock_guard<std::mutex> lock(listLock_);
754         processList_.push_back(processStream);
755         processBufferList_.push_back(processBuffer);
756         AUDIO_DEBUG_LOG("LinkProcessStream success.");
757         return SUCCESS;
758     }
759 
760     if (endpointStatus_ == UNLINKED) {
761         endpointStatus_ = IDEL; // handle push_back in IDEL
762         if (isDeviceRunningInIdel_) {
763             StartDevice();
764         }
765     }
766 
767     if (endpointStatus_ == IDEL) {
768         {
769             std::lock_guard<std::mutex> lock(listLock_);
770             processList_.push_back(processStream);
771             processBufferList_.push_back(processBuffer);
772         }
773         if (!needEndpointRunning) {
774             AUDIO_DEBUG_LOG("LinkProcessStream success, process stream status is not running.");
775             return SUCCESS;
776         }
777         // needEndpointRunning = true
778         if (isDeviceRunningInIdel_) {
779             endpointStatus_ = IsAnyProcessRunning() ? RUNNING : IDEL;
780         } else {
781             // needEndpointRunning = true & isDeviceRunningInIdel_ = false
782             // KeepWorkloopRunning will wait on IDEL
783             StartDevice();
784         }
785         AUDIO_DEBUG_LOG("LinkProcessStream success.");
786         return SUCCESS;
787     }
788 
789     return SUCCESS;
790 }
791 
UnlinkProcessStream(IAudioProcessStream * processStream)792 int32_t AudioEndpointInner::UnlinkProcessStream(IAudioProcessStream *processStream)
793 {
794     AUDIO_INFO_LOG("UnlinkProcessStream in status:%{public}s.", GetStatusStr(endpointStatus_).c_str());
795     CHECK_AND_RETURN_RET_LOG(processStream != nullptr, ERR_INVALID_PARAM, "IAudioProcessStream is null");
796     std::shared_ptr<OHAudioBuffer> processBuffer = processStream->GetStreamBuffer();
797     CHECK_AND_RETURN_RET_LOG(processBuffer != nullptr, ERR_INVALID_PARAM, "processBuffer is null");
798 
799     bool isFind = false;
800     std::lock_guard<std::mutex> lock(listLock_);
801     auto processItr = processList_.begin();
802     auto bufferItr = processBufferList_.begin();
803     while (processItr != processList_.end()) {
804         if (*processItr == processStream && *bufferItr == processBuffer) {
805             processList_.erase(processItr);
806             processBufferList_.erase(bufferItr);
807             isFind = true;
808             break;
809         } else {
810             processItr++;
811             bufferItr++;
812         }
813     }
814     if (processList_.size() == 0) {
815         StopDevice();
816         endpointStatus_ = UNLINKED;
817     }
818 
819     AUDIO_DEBUG_LOG("UnlinkProcessStream end, %{public}s the process.", (isFind ? "find and remove" : "not find"));
820     return SUCCESS;
821 }
822 
CheckAllBufferReady(int64_t checkTime,uint64_t curWritePos)823 bool AudioEndpointInner::CheckAllBufferReady(int64_t checkTime, uint64_t curWritePos)
824 {
825     bool isAllReady = true;
826     {
827         // lock list without sleep
828         std::lock_guard<std::mutex> lock(listLock_);
829         for (size_t i = 0; i < processBufferList_.size(); i++) {
830             std::shared_ptr<OHAudioBuffer> tempBuffer = processBufferList_[i];
831             uint64_t eachCurReadPos = processBufferList_[i]->GetCurReadFrame();
832             lastHandleProcessTime_ = checkTime;
833             processBufferList_[i]->SetHandleInfo(eachCurReadPos, lastHandleProcessTime_); // update handle info
834             if (tempBuffer->GetStreamStatus()->load() != StreamStatus::STREAM_RUNNING) {
835                 // Process is not running, server will continue to check the same location in the next cycle.
836                 int64_t duration = 5000000; // 5ms
837                 processBufferList_[i]->SetHandleInfo(eachCurReadPos, lastHandleProcessTime_ + duration);
838                 continue; // process not running
839             }
840             uint64_t curRead = tempBuffer->GetCurReadFrame();
841             SpanInfo *curReadSpan = tempBuffer->GetSpanInfo(curRead);
842             if (curReadSpan == nullptr || curReadSpan->spanStatus != SpanStatus::SPAN_WRITE_DONE) {
843                 AUDIO_WARNING_LOG("Find one process not ready"); // print uid of the process?
844                 isAllReady = false;
845                 break;
846             }
847         }
848     }
849 
850     if (!isAllReady) {
851         Trace trace("AudioEndpoint::WaitAllProcessReady");
852         int64_t tempWakeupTime = readTimeModel_.GetTimeOfPos(curWritePos) + WRITE_TO_HDI_AHEAD_TIME;
853         if (tempWakeupTime - ClockTime::GetCurNano() < ONE_MILLISECOND_DURATION) {
854             ClockTime::RelativeSleep(ONE_MILLISECOND_DURATION);
855         } else {
856             ClockTime::AbsoluteSleep(tempWakeupTime); // sleep to hdi read time ahead 1ms.
857         }
858     }
859     return isAllReady;
860 }
861 
ProcessData(const std::vector<AudioStreamData> & srcDataList,const AudioStreamData & dstData)862 void AudioEndpointInner::ProcessData(const std::vector<AudioStreamData> &srcDataList, const AudioStreamData &dstData)
863 {
864     size_t srcListSize = srcDataList.size();
865 
866     for (size_t i = 0; i < srcListSize; i++) {
867         if (srcDataList[i].streamInfo.format != SAMPLE_S16LE || srcDataList[i].streamInfo.channels != STEREO ||
868             srcDataList[i].bufferDesc.bufLength != dstData.bufferDesc.bufLength ||
869             srcDataList[i].bufferDesc.dataLength != dstData.bufferDesc.dataLength) {
870             AUDIO_ERR_LOG("ProcessData failed, streamInfo are different");
871             return;
872         }
873     }
874 
875     // Assum using the same format and same size
876     if (dstData.streamInfo.format != SAMPLE_S16LE || dstData.streamInfo.channels != STEREO) {
877         AUDIO_ERR_LOG("ProcessData failed, streamInfo are not support");
878         return;
879     }
880 
881     size_t dataLength = dstData.bufferDesc.dataLength;
882     dataLength /= 2; // SAMPLE_S16LE--> 2 byte
883     int16_t *dstPtr = reinterpret_cast<int16_t *>(dstData.bufferDesc.buffer);
884     for (size_t offset = 0; dataLength > 0; dataLength--) {
885         int32_t sum = 0;
886         for (size_t i = 0; i < srcListSize; i++) {
887             int32_t vol = srcDataList[i].volumeStart; // change to modify volume of each channel
888             int16_t *srcPtr = reinterpret_cast<int16_t *>(srcDataList[i].bufferDesc.buffer) + offset;
889             sum += (*srcPtr * static_cast<int64_t>(vol)) >> VOLUME_SHIFT_NUMBER; // 1/65536
890         }
891         offset++;
892         *dstPtr++ = sum > INT16_MAX ? INT16_MAX : (sum < INT16_MIN ? INT16_MIN : sum);
893     }
894 }
895 
896 // call with listLock_ hold
GetAllReadyProcessData(std::vector<AudioStreamData> & audioDataList)897 void AudioEndpointInner::GetAllReadyProcessData(std::vector<AudioStreamData> &audioDataList)
898 {
899     for (size_t i = 0; i < processBufferList_.size(); i++) {
900         uint64_t curRead = processBufferList_[i]->GetCurReadFrame();
901         Trace trace("AudioEndpoint::ReadProcessData->" + std::to_string(curRead));
902         SpanInfo *curReadSpan = processBufferList_[i]->GetSpanInfo(curRead);
903         if (curReadSpan == nullptr) {
904             AUDIO_ERR_LOG("GetSpanInfo failed, can not get client curReadSpan");
905             continue;
906         }
907         AudioStreamData streamData;
908         Volume vol = {true, 1.0f, 0};
909         AudioStreamType streamType = processList_[i]->GetAudioStreamType();
910         DeviceType deviceType = PolicyHandler::GetInstance().GetActiveOutPutDevice();
911         if (deviceInfo_.networkId == LOCAL_NETWORK_ID &&
912             PolicyHandler::GetInstance().GetSharedVolume(streamType, deviceType, vol)) {
913             streamData.volumeStart = vol.isMute ? 0 : static_cast<int32_t>(curReadSpan->volumeStart * vol.volumeFloat);
914         } else {
915             streamData.volumeStart = curReadSpan->volumeStart;
916         }
917         streamData.volumeEnd = curReadSpan->volumeEnd;
918         streamData.streamInfo = processList_[i]->GetStreamInfo();
919         SpanStatus targetStatus = SpanStatus::SPAN_WRITE_DONE;
920         if (curReadSpan->spanStatus.compare_exchange_strong(targetStatus, SpanStatus::SPAN_READING)) {
921             processBufferList_[i]->GetReadbuffer(curRead, streamData.bufferDesc); // check return?
922             audioDataList.push_back(streamData);
923             curReadSpan->readStartTime = ClockTime::GetCurNano();
924 #ifdef DUMP_PROCESS_FILE
925             if (dcp_ != nullptr) {
926                 fwrite(static_cast<void *>(streamData.bufferDesc.buffer), 1, streamData.bufferDesc.bufLength, dcp_);
927             }
928 #endif
929         }
930     }
931 }
932 
ProcessToEndpointDataHandle(uint64_t curWritePos)933 bool AudioEndpointInner::ProcessToEndpointDataHandle(uint64_t curWritePos)
934 {
935     std::lock_guard<std::mutex> lock(listLock_);
936 
937     std::vector<AudioStreamData> audioDataList;
938     GetAllReadyProcessData(audioDataList);
939 
940     AudioStreamData dstStreamData;
941     dstStreamData.streamInfo = dstStreamInfo_;
942     int32_t ret = dstAudioBuffer_->GetWriteBuffer(curWritePos, dstStreamData.bufferDesc);
943     CHECK_AND_RETURN_RET_LOG(((ret == SUCCESS && dstStreamData.bufferDesc.buffer != nullptr)), false,
944         "GetWriteBuffer failed, ret:%{public}d", ret);
945 
946     SpanInfo *curWriteSpan = dstAudioBuffer_->GetSpanInfo(curWritePos);
947     CHECK_AND_RETURN_RET_LOG(curWriteSpan != nullptr, false, "GetSpanInfo failed, can not get curWriteSpan");
948 
949     dstStreamData.volumeStart = curWriteSpan->volumeStart;
950     dstStreamData.volumeEnd = curWriteSpan->volumeEnd;
951 
952     Trace trace("AudioEndpoint::WriteDstBuffer=>" + std::to_string(curWritePos));
953     // do write work
954     if (audioDataList.size() == 0) {
955         memset_s(dstStreamData.bufferDesc.buffer, dstStreamData.bufferDesc.bufLength, 0,
956             dstStreamData.bufferDesc.bufLength);
957     } else {
958         ProcessData(audioDataList, dstStreamData);
959     }
960 
961 #ifdef DUMP_PROCESS_FILE
962     if (dump_hdi_ != nullptr) {
963         fwrite(static_cast<void *>(dstStreamData.bufferDesc.buffer), 1, dstStreamData.bufferDesc.bufLength, dump_hdi_);
964     }
965 #endif
966     return true;
967 }
968 
GetPredictNextReadTime(uint64_t posInFrame)969 int64_t AudioEndpointInner::GetPredictNextReadTime(uint64_t posInFrame)
970 {
971     Trace trace("AudioEndpoint::GetPredictNextRead");
972     uint64_t handleSpanCnt = posInFrame / dstSpanSizeInframe_;
973     uint32_t startPeriodCnt = 20; // sync each time when start
974     uint32_t oneBigPeriodCnt = 40; // 200ms
975     if (handleSpanCnt < startPeriodCnt || handleSpanCnt % oneBigPeriodCnt == 0) {
976         updateThreadCV_.notify_all();
977     }
978     uint64_t readFrame = 0;
979     int64_t readtime = 0;
980     if (readTimeModel_.GetFrameStamp(readFrame, readtime)) {
981         if (readFrame != posInFrame_) {
982             readTimeModel_.UpdataFrameStamp(posInFrame_, timeInNano_);
983         }
984     }
985 
986     int64_t nextHdiReadTime = readTimeModel_.GetTimeOfPos(posInFrame);
987     return nextHdiReadTime;
988 }
989 
GetPredictNextWriteTime(uint64_t posInFrame)990 int64_t AudioEndpointInner::GetPredictNextWriteTime(uint64_t posInFrame)
991 {
992     uint64_t handleSpanCnt = posInFrame / dstSpanSizeInframe_;
993     uint32_t startPeriodCnt = 20;
994     uint32_t oneBigPeriodCnt = 40;
995     if (handleSpanCnt < startPeriodCnt || handleSpanCnt % oneBigPeriodCnt == 0) {
996         // todo sleep random little time but less than nextHdiReadTime - 2ms
997         uint64_t writeFrame = 0;
998         int64_t writeTime = 0;
999         if (GetDeviceHandleInfo(writeFrame, writeTime)) {
1000             writeTimeModel_.UpdataFrameStamp(writeFrame, writeTime);
1001         }
1002     }
1003     int64_t nextHdiWriteTime = writeTimeModel_.GetTimeOfPos(posInFrame);
1004     return nextHdiWriteTime;
1005 }
1006 
RecordPrepareNextLoop(uint64_t curReadPos,int64_t & wakeUpTime)1007 bool AudioEndpointInner::RecordPrepareNextLoop(uint64_t curReadPos, int64_t &wakeUpTime)
1008 {
1009     uint64_t nextHandlePos = curReadPos + dstSpanSizeInframe_;
1010     int64_t nextHdiWriteTime = GetPredictNextWriteTime(nextHandlePos);
1011     int64_t tempDelay = 12000000; // 12ms
1012     wakeUpTime = nextHdiWriteTime + tempDelay;
1013 
1014     int32_t ret = dstAudioBuffer_->SetCurWriteFrame(nextHandlePos);
1015     CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, false, "%{public}s set dst buffer write frame fail, ret %{public}d.",
1016         __func__, ret);
1017     ret = dstAudioBuffer_->SetCurReadFrame(nextHandlePos);
1018     CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, false, "%{public}s set dst buffer read frame fail, ret %{public}d.",
1019         __func__, ret);
1020 
1021     return true;
1022 }
1023 
PrepareNextLoop(uint64_t curWritePos,int64_t & wakeUpTime)1024 bool AudioEndpointInner::PrepareNextLoop(uint64_t curWritePos, int64_t &wakeUpTime)
1025 {
1026     uint64_t nextHandlePos = curWritePos + dstSpanSizeInframe_;
1027     Trace prepareTrace("AudioEndpoint::PrepareNextLoop " + std::to_string(nextHandlePos));
1028     int64_t nextHdiReadTime = GetPredictNextReadTime(nextHandlePos);
1029     wakeUpTime = nextHdiReadTime - serverAheadReadTime_;
1030 
1031     SpanInfo *nextWriteSpan = dstAudioBuffer_->GetSpanInfo(nextHandlePos);
1032     if (nextWriteSpan == nullptr) {
1033         AUDIO_ERR_LOG("GetSpanInfo failed, can not get next write span");
1034         return false;
1035     }
1036 
1037     int32_t ret1 = dstAudioBuffer_->SetCurWriteFrame(nextHandlePos);
1038     int32_t ret2 = dstAudioBuffer_->SetCurReadFrame(nextHandlePos);
1039     if (ret1 != SUCCESS || ret2 != SUCCESS) {
1040         AUDIO_ERR_LOG("SetCurWriteFrame or SetCurReadFrame failed, ret1:%{public}d ret2:%{public}d", ret1, ret2);
1041         return false;
1042     }
1043     // handl each process buffer info
1044     int64_t curReadDoneTime = ClockTime::GetCurNano();
1045     for (size_t i = 0; i < processBufferList_.size(); i++) {
1046         uint64_t eachCurReadPos = processBufferList_[i]->GetCurReadFrame();
1047         SpanInfo *tempSpan = processBufferList_[i]->GetSpanInfo(eachCurReadPos);
1048         if (tempSpan == nullptr) {
1049             AUDIO_ERR_LOG("GetSpanInfo failed, can not get process read span");
1050             return false;
1051         }
1052         SpanStatus targetStatus = SpanStatus::SPAN_READING;
1053         if (tempSpan->spanStatus.compare_exchange_strong(targetStatus, SpanStatus::SPAN_READ_DONE)) {
1054             tempSpan->readDoneTime = curReadDoneTime;
1055             BufferDesc bufferReadDone = { nullptr, 0, 0};
1056             processBufferList_[i]->GetReadbuffer(eachCurReadPos, bufferReadDone);
1057             if (bufferReadDone.buffer != nullptr && bufferReadDone.bufLength != 0) {
1058                 memset_s(bufferReadDone.buffer, bufferReadDone.bufLength, 0, bufferReadDone.bufLength);
1059             }
1060             processBufferList_[i]->SetCurReadFrame(eachCurReadPos + dstSpanSizeInframe_); // use client span size
1061         } else if (processBufferList_[i]->GetStreamStatus()->load() == StreamStatus::STREAM_RUNNING) {
1062             AUDIO_WARNING_LOG("Current %{public}" PRIu64" span not ready:%{public}d", eachCurReadPos, targetStatus);
1063         }
1064     }
1065     return true;
1066 }
1067 
GetDeviceHandleInfo(uint64_t & frames,int64_t & nanoTime)1068 bool AudioEndpointInner::GetDeviceHandleInfo(uint64_t &frames, int64_t &nanoTime)
1069 {
1070     Trace trace("AudioEndpoint::GetMmapHandlePosition");
1071     int64_t timeSec = 0;
1072     int64_t timeNanoSec = 0;
1073     int32_t ret = 0;
1074     if (deviceInfo_.deviceRole == INPUT_DEVICE) {
1075         if (fastSource_ == nullptr || !fastSource_->IsInited()) {
1076             AUDIO_ERR_LOG("Source start failed.");
1077             return false;
1078         }
1079         // GetMmapHandlePosition will call using ipc.
1080         ret = fastSource_->GetMmapHandlePosition(frames, timeSec, timeNanoSec);
1081     } else {
1082         if (fastSink_ == nullptr || !fastSink_->IsInited()) {
1083             AUDIO_ERR_LOG("GetDeviceHandleInfo failed: sink is not inited.");
1084             return false;
1085         }
1086         // GetMmapHandlePosition will call using ipc.
1087         ret = fastSink_->GetMmapHandlePosition(frames, timeSec, timeNanoSec);
1088     }
1089     if (ret != SUCCESS) {
1090         AUDIO_ERR_LOG("Call adapter GetMmapHandlePosition failed: %{public}d", ret);
1091         return false;
1092     }
1093     trace.End();
1094     nanoTime = timeNanoSec + timeSec * AUDIO_NS_PER_SECOND;
1095     Trace infoTrace("AudioEndpoint::GetDeviceHandleInfo frames=>" + std::to_string(frames) + " " +
1096         std::to_string(nanoTime) + " at " + std::to_string(ClockTime::GetCurNano()));
1097     nanoTime += DELTA_TO_REAL_READ_START_TIME; // global delay in server
1098     return true;
1099 }
1100 
AsyncGetPosTime()1101 void AudioEndpointInner::AsyncGetPosTime()
1102 {
1103     AUDIO_INFO_LOG("AsyncGetPosTime thread start.");
1104     while (!stopUpdateThread_) {
1105         std::unique_lock<std::mutex> lock(updateThreadLock_);
1106         updateThreadCV_.wait_for(lock, std::chrono::milliseconds(UPDATE_THREAD_TIMEOUT));
1107         if (stopUpdateThread_) {
1108             break;
1109         }
1110         // get signaled, call get pos-time
1111         uint64_t curHdiHandlePos = posInFrame_;
1112         int64_t handleTime = timeInNano_;
1113         if (!GetDeviceHandleInfo(curHdiHandlePos, handleTime)) {
1114             AUDIO_WARNING_LOG("AsyncGetPosTime call GetDeviceHandleInfo failed.");
1115             continue;
1116         }
1117         // keep it
1118         if (posInFrame_ != curHdiHandlePos) {
1119             posInFrame_ = curHdiHandlePos;
1120             timeInNano_ = handleTime;
1121         }
1122     }
1123 }
1124 
GetStatusStr(EndpointStatus status)1125 std::string AudioEndpointInner::GetStatusStr(EndpointStatus status)
1126 {
1127     switch (status) {
1128         case INVALID:
1129             return "INVALID";
1130         case UNLINKED:
1131             return "UNLINKED";
1132         case IDEL:
1133             return "IDEL";
1134         case STARTING:
1135             return "STARTING";
1136         case RUNNING:
1137             return "RUNNING";
1138         case STOPPING:
1139             return "STOPPING";
1140         case STOPPED:
1141             return "STOPPED";
1142         default:
1143             break;
1144     }
1145     return "NO_SUCH_STATUS";
1146 }
1147 
KeepWorkloopRunning()1148 bool AudioEndpointInner::KeepWorkloopRunning()
1149 {
1150     EndpointStatus targetStatus = INVALID;
1151     switch (endpointStatus_.load()) {
1152         case RUNNING:
1153             return true;
1154         case IDEL:
1155             if (isDeviceRunningInIdel_) {
1156                 return true;
1157             } else {
1158                 targetStatus = STARTING;
1159             }
1160             break;
1161         case UNLINKED:
1162             targetStatus = IDEL;
1163             break;
1164         case STARTING:
1165             targetStatus = RUNNING;
1166             break;
1167         case STOPPING:
1168             targetStatus = STOPPED;
1169             break;
1170         default:
1171             break;
1172     }
1173 
1174     // when return false, EndpointWorkLoopFuc will continue loop immediately. Wait to avoid a inifity loop.
1175     std::unique_lock<std::mutex> lock(loopThreadLock_);
1176     AUDIO_INFO_LOG("Status is %{public}s now, wait for %{public}s...", GetStatusStr(endpointStatus_).c_str(),
1177         GetStatusStr(targetStatus).c_str());
1178     threadStatus_ = WAITTING;
1179     workThreadCV_.wait_for(lock, std::chrono::milliseconds(SLEEP_TIME_IN_DEFAULT));
1180     AUDIO_DEBUG_LOG("Wait end. Cur is %{public}s now, target is %{public}s...", GetStatusStr(endpointStatus_).c_str(),
1181         GetStatusStr(targetStatus).c_str());
1182 
1183     return false;
1184 }
1185 
WriteToSpecialProcBuf(const std::shared_ptr<OHAudioBuffer> & procBuf,const BufferDesc & readBuf)1186 int32_t AudioEndpointInner::WriteToSpecialProcBuf(const std::shared_ptr<OHAudioBuffer> &procBuf,
1187     const BufferDesc &readBuf)
1188 {
1189     CHECK_AND_RETURN_RET_LOG(procBuf != nullptr, ERR_INVALID_HANDLE, "%{public}s process buffer is null.", __func__);
1190     uint64_t curWritePos = procBuf->GetCurWriteFrame();
1191     Trace trace("AudioEndpoint::WriteProcessData-<" + std::to_string(curWritePos));
1192     SpanInfo *curWriteSpan = procBuf->GetSpanInfo(curWritePos);
1193     CHECK_AND_RETURN_RET_LOG(curWriteSpan != nullptr, ERR_INVALID_HANDLE,
1194         "%{public}s get write span info of procBuf fail.", __func__);
1195 
1196     AUDIO_DEBUG_LOG("%{public}s process buffer write start, curWritePos %{public}" PRIu64".", __func__, curWritePos);
1197     curWriteSpan->spanStatus.store(SpanStatus::SPAN_WRITTING);
1198     curWriteSpan->writeStartTime = ClockTime::GetCurNano();
1199 
1200     BufferDesc writeBuf;
1201     int32_t ret = procBuf->GetWriteBuffer(curWritePos, writeBuf);
1202     CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret, "%{public}s get write buffer fail, ret %{public}d.", __func__, ret);
1203     ret = memcpy_s(static_cast<void *>(writeBuf.buffer), writeBuf.bufLength,
1204         static_cast<void *>(readBuf.buffer), readBuf.bufLength);
1205     CHECK_AND_RETURN_RET_LOG(ret == EOK, ERR_WRITE_FAILED, "%{public}s memcpy data to process buffer fail, "
1206         "curWritePos %{public}" PRIu64", ret %{public}d.", __func__, curWritePos, ret);
1207 
1208     curWriteSpan->writeDoneTime = ClockTime::GetCurNano();
1209     procBuf->SetHandleInfo(curWritePos, curWriteSpan->writeDoneTime);
1210     ret = procBuf->SetCurWriteFrame(curWritePos + dstSpanSizeInframe_);
1211     CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret, "%{public}s set procBuf next write frame fail, ret %{public}d.",
1212         __func__, ret);
1213     curWriteSpan->spanStatus.store(SpanStatus::SPAN_WRITE_DONE);
1214     return SUCCESS;
1215 }
1216 
WriteToProcessBuffers(const BufferDesc & readBuf)1217 void AudioEndpointInner::WriteToProcessBuffers(const BufferDesc &readBuf)
1218 {
1219     std::lock_guard<std::mutex> lock(listLock_);
1220     for (size_t i = 0; i < processBufferList_.size(); i++) {
1221         if (processBufferList_[i] == nullptr) {
1222             AUDIO_ERR_LOG("%{public}s process buffer %{public}zu is null.", __func__, i);
1223             continue;
1224         }
1225         if (processBufferList_[i]->GetStreamStatus()->load() != STREAM_RUNNING) {
1226             AUDIO_WARNING_LOG("%{public}s process buffer %{public}zu not running, stream status %{public}d.",
1227                 __func__, i, processBufferList_[i]->GetStreamStatus()->load());
1228             continue;
1229         }
1230 
1231         int32_t ret = WriteToSpecialProcBuf(processBufferList_[i], readBuf);
1232         if (ret != SUCCESS) {
1233             AUDIO_ERR_LOG("%{public}s endpoint write to process buffer %{public}zu fail, ret %{public}d.",
1234                 __func__, i, ret);
1235             continue;
1236         }
1237         AUDIO_DEBUG_LOG("%{public}s endpoint process buffer %{public}zu write success.", __func__, i);
1238     }
1239 }
1240 
ReadFromEndpoint(uint64_t curReadPos)1241 int32_t AudioEndpointInner::ReadFromEndpoint(uint64_t curReadPos)
1242 {
1243     Trace trace("AudioEndpoint::ReadDstBuffer=<" + std::to_string(curReadPos));
1244     AUDIO_DEBUG_LOG("%{public}s enter, dstAudioBuffer curReadPos %{public}" PRIu64".", __func__, curReadPos);
1245     CHECK_AND_RETURN_RET_LOG(dstAudioBuffer_ != nullptr, ERR_INVALID_HANDLE,
1246         "%{public}s dst audio buffer is null.", __func__);
1247     SpanInfo *curReadSpan = dstAudioBuffer_->GetSpanInfo(curReadPos);
1248     CHECK_AND_RETURN_RET_LOG(curReadSpan != nullptr, ERR_INVALID_HANDLE,
1249         "%{public}s get source read span info of source adapter fail.", __func__);
1250     curReadSpan->readStartTime = ClockTime::GetCurNano();
1251     curReadSpan->spanStatus.store(SpanStatus::SPAN_READING);
1252     BufferDesc readBuf;
1253     int32_t ret = dstAudioBuffer_->GetReadbuffer(curReadPos, readBuf);
1254     CHECK_AND_RETURN_RET_LOG(ret == SUCCESS, ret, "%{public}s get read buffer fail, ret %{public}d.", __func__, ret);
1255 #ifdef DUMP_PROCESS_FILE
1256     if (dump_hdi_ != nullptr) {
1257         fwrite(static_cast<void *>(readBuf.buffer), 1, readBuf.bufLength, dump_hdi_);
1258     }
1259 #endif
1260 
1261     WriteToProcessBuffers(readBuf);
1262     curReadSpan->readDoneTime = ClockTime::GetCurNano();
1263     curReadSpan->spanStatus.store(SpanStatus::SPAN_READ_DONE);
1264     return SUCCESS;
1265 }
1266 
RecordEndpointWorkLoopFuc()1267 void AudioEndpointInner::RecordEndpointWorkLoopFuc()
1268 {
1269     ScheduleReportData(getpid(), gettid(), "pulseaudio");
1270     int64_t curTime = 0;
1271     uint64_t curReadPos = 0;
1272     int64_t wakeUpTime = ClockTime::GetCurNano();
1273     AUDIO_INFO_LOG("Record endpoint work loop fuc start.");
1274     while (isInited_.load()) {
1275         if (!KeepWorkloopRunning()) {
1276             continue;
1277         }
1278         threadStatus_ = INRUNNING;
1279         if (needReSyncPosition_) {
1280             RecordReSyncPosition();
1281             wakeUpTime = ClockTime::GetCurNano();
1282             needReSyncPosition_ = false;
1283             continue;
1284         }
1285         curTime = ClockTime::GetCurNano();
1286         Trace loopTrace("Record_loop_trace");
1287         if (curTime - wakeUpTime > ONE_MILLISECOND_DURATION) {
1288             AUDIO_WARNING_LOG("%{public}s Wake up too late!", __func__);
1289         }
1290 
1291         curReadPos = dstAudioBuffer_->GetCurReadFrame();
1292         if (ReadFromEndpoint(curReadPos) != SUCCESS) {
1293             AUDIO_ERR_LOG("%{public}s read from endpoint to process service fail.", __func__);
1294             break;
1295         }
1296 
1297         if (!RecordPrepareNextLoop(curReadPos, wakeUpTime)) {
1298             AUDIO_ERR_LOG("PrepareNextLoop failed!");
1299             break;
1300         }
1301 
1302         loopTrace.End();
1303         threadStatus_ = SLEEPING;
1304         ClockTime::AbsoluteSleep(wakeUpTime);
1305     }
1306 }
1307 
EndpointWorkLoopFuc()1308 void AudioEndpointInner::EndpointWorkLoopFuc()
1309 {
1310     ScheduleReportData(getpid(), gettid(), "pulseaudio");
1311     int64_t curTime = 0;
1312     uint64_t curWritePos = 0;
1313     int64_t wakeUpTime = ClockTime::GetCurNano();
1314     AUDIO_INFO_LOG("Endpoint work loop fuc start");
1315     int32_t ret = 0;
1316     while (isInited_.load()) {
1317         if (!KeepWorkloopRunning()) {
1318             continue;
1319         }
1320         ret = 0;
1321         threadStatus_ = INRUNNING;
1322         curTime = ClockTime::GetCurNano();
1323         Trace loopTrace("AudioEndpoint::loop_trace");
1324         if (needReSyncPosition_) {
1325             ReSyncPosition();
1326             wakeUpTime = curTime;
1327             needReSyncPosition_ = false;
1328             continue;
1329         }
1330         if (curTime - wakeUpTime > ONE_MILLISECOND_DURATION) {
1331             AUDIO_WARNING_LOG("Wake up too late!");
1332         }
1333 
1334         // First, wake up at client may-write-done time, and check if all process write done.
1335         // If not, do another sleep to the possible latest write time.
1336         curWritePos = dstAudioBuffer_->GetCurWriteFrame();
1337         if (!CheckAllBufferReady(wakeUpTime, curWritePos)) {
1338             curTime = ClockTime::GetCurNano();
1339         }
1340 
1341         // then do mix & write to hdi buffer and prepare next loop
1342         if (!ProcessToEndpointDataHandle(curWritePos)) {
1343             AUDIO_ERR_LOG("ProcessToEndpointDataHandle failed!");
1344             break;
1345         }
1346 
1347         // prepare info of next loop
1348         if (!PrepareNextLoop(curWritePos, wakeUpTime)) {
1349             AUDIO_ERR_LOG("PrepareNextLoop failed!");
1350             break;
1351         }
1352 
1353         loopTrace.End();
1354         // start sleep
1355         threadStatus_ = SLEEPING;
1356         ClockTime::AbsoluteSleep(wakeUpTime);
1357     }
1358     AUDIO_DEBUG_LOG("Endpoint work loop fuc end, ret %{public}d", ret);
1359 }
1360 } // namespace AudioStandard
1361 } // namespace OHOS
1362