• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #define HST_LOG_TAG "FileFdSourcePlugin"
17 
#include <cerrno>
#include <cstring>
#include <memory>
#include <new>
#include <regex>
#ifdef WIN32
#include <fcntl.h>
#else
#include <sys/types.h>
#include <unistd.h>
#endif
#include <sys/ioctl.h>
#include <sys/stat.h>
#include "common/log.h"
#include "osal/filesystem/file_system.h"
#include "file_fd_source_plugin.h"
#include "common/media_core.h"
34 
35 namespace {
// HiLog label for this file: core log type, system-player domain, tag "FileFdSourcePlugin".
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "FileFdSourcePlugin" };
StrToLong(const std::string_view & str,int64_t & value)37 bool StrToLong(const std::string_view& str, int64_t& value)
38 {
39     FALSE_RETURN_V_MSG_E(!str.empty() && (isdigit(str.front()) || (str.front() == '-')),
40         false, "no valid string.");
41     std::string valStr(str);
42     char* end = nullptr;
43     errno = 0;
44     long long result = strtoll(valStr.c_str(), &end, 10); /* 10 means decimal */
45     FALSE_RETURN_V_MSG_E(result >= LLONG_MIN && result <= LLONG_MAX, false,
46         "call StrToLong func false,  input str is: %{public}s!", valStr.c_str());
47     FALSE_RETURN_V_MSG_E(end != valStr.c_str() && end[0] == '\0' && errno != ERANGE, false,
48         "call StrToLong func false,  input str is: %{public}s!", valStr.c_str());
49     value = result;
50     return true;
51 }
52 }
53 
54 namespace OHOS {
55 namespace Media {
56 namespace Plugins {
57 namespace FileFdSource {
58 namespace {
// Minimum number of regex sub-matches (whole match + fd group) a valid fd URI must produce.
constexpr int32_t FDPOS                         = 2;
// Elapsed-time threshold, in milliseconds, above which a read/cache step is logged as slow.
constexpr int32_t READ_TIME                     = 3;
// Capacity, in bytes, of the ring buffer created for cloud files.
constexpr size_t CACHE_SIZE                     = 40 * 1024 * 1024;
// Upper bound, in bytes, of a single read issued by the cache loop.
constexpr size_t PER_CACHE_SIZE                 = 48 * 10 * 1024;
// Polling interval, in milliseconds, used with WaitForInterrupt().
constexpr int32_t TEN_MILLISECOUNDS             = 10;
// Maximum time, in milliseconds, HandleBuffering() waits before returning.
constexpr int32_t ONE_SECONDS                   = 1 * 1000;
// Cache time returned by GetCacheTime() when the download speed is below the bit rate.
constexpr int32_t CACHE_TIME_DEFAULT            = 5;
// Exclusive lower/upper bounds, in milliseconds, used by IsValidTime().
constexpr int32_t SEEK_TIME_LOWER               = 20;
constexpr int32_t SEEK_TIME_UPPER               = 1000;
// Minimum interval, in milliseconds, between two download-speed measurements.
constexpr int32_t RECORD_TIME_INTERVAL          = 1 * 1000;
constexpr int32_t MILLISECOUND_TO_SECOND        = 1 * 1000;
// Number of failed reads after which HandleReadResult() reports a read failure.
constexpr int32_t RETRY_TIMES                   = 3;
// Divisor converting a bit rate into a byte rate.
constexpr int32_t TO_BYTE                       = 8;
constexpr int32_t PERCENT_100                   = 100;
// Rank assigned to this plugin at registration.
constexpr int32_t MAX_RANK                      = 100;
// Retry argument passed to RingBuffer::ReadBuffer().
constexpr int32_t READ_RETRY                    = 2;
// errno values checked after a failed read in ReadOfflineFile().
constexpr int32_t READ_ERROR_IO                 = EIO;
constexpr int32_t READ_ERROR_NOMEM              = ENOMEM;
// Initial water-line factor (see SetCurrentBitRate) and the cache time for fast downloads.
constexpr float CACHE_LEVEL_1                   = 0.3;
78 
// ioctl "type" value shared by the HMDFS request codes defined below.
constexpr unsigned int HMDFS_IOC = 0xf2;
// Location value that CheckFileType() treats as "file is stored in the cloud".
#define IOCTL_CLOUD 2
// Query used by HasCacheData(): takes a HmdfsHasCache describing an offset/readSize range.
#define HMDFS_IOC_HAS_CACHE _IOW(HMDFS_IOC, 6, struct HmdfsHasCache)
// Query used by CheckFileType() to read the file location.
#define HMDFS_IOC_GET_LOCATION _IOR(HMDFS_IOC, 7, __u32)
// Issued by SetInterruptState() when an interrupt is requested on a cloud file.
#define HMDFS_IOC_CANCEL_READ _IO(HMDFS_IOC, 8)
// Issued by CheckFileType() once a file has been identified as a cloud file.
#define HMDFS_IOC_RESTORE_READ _IO(HMDFS_IOC, 9)
85 
GetFileSize(int32_t fd)86 uint64_t GetFileSize(int32_t fd)
87 {
88     uint64_t fileSize = 0;
89     struct stat s {};
90     int ret = fstat(fd, &s);
91     if (ret == 0) {
92         fileSize = static_cast<uint64_t>(s.st_size);
93         FALSE_RETURN_V_MSG_E(fileSize != 0, fileSize, "fileSize 0, fstat ret 0");
94         return fileSize;
95     } else {
96         MEDIA_LOG_W("GetFileSize error ret " PUBLIC_LOG_D32 ", errno " PUBLIC_LOG_D32, ret, errno);
97     }
98     return fileSize;
99 }
/**
 * Tells whether `str` is a non-empty sequence of ASCII decimal digits.
 *
 * An empty string is rejected: the result gates a numeric conversion of the fd part of the URI,
 * and there is nothing to convert in an empty string.
 */
bool isNumber(const std::string& str)
{
    return !str.empty() && str.find_first_not_of("0123456789") == std::string::npos;
}
104 }
FileFdSourceRegister(const std::shared_ptr<Register> & reg)105 Status FileFdSourceRegister(const std::shared_ptr<Register>& reg)
106 {
107     MEDIA_LOG_I("fileSourceRegister is started");
108     SourcePluginDef definition;
109     definition.name = "FileFdSource";
110     definition.description = "File Fd source";
111     definition.rank = MAX_RANK; // 100: max rank
112     Capability capability;
113     capability.AppendFixedKey<std::vector<ProtocolType>>(Tag::MEDIA_PROTOCOL_TYPE, {ProtocolType::FD});
114     definition.AddInCaps(capability);
115     auto func = [](const std::string& name) -> std::shared_ptr<SourcePlugin> {
116         return std::make_shared<FileFdSourcePlugin>(name);
117     };
118     definition.SetCreator(func);
119     return reg->AddPlugin(definition);
120 }
121 
// Plugin entry: FileFdSourceRegister is the register callback; the final argument is an empty lambda.
PLUGIN_DEFINITION(FileFdSource, LicenseType::APACHE_V2, FileFdSourceRegister, [] {});
123 
FileFdSourcePlugin(std::string name)124 FileFdSourcePlugin::FileFdSourcePlugin(std::string name)
125     : SourcePlugin(std::move(name))
126 {
127 }
128 
/**
 * Requests an interrupt, then stops the download task if one was created.
 */
FileFdSourcePlugin::~FileFdSourcePlugin()
{
    MEDIA_LOG_I("~FileFdSourcePlugin in.");
    steadyClock_.Reset();
    // Raise the interrupt before stopping the task; WaitForInterrupt() callers are notified by it.
    SetInterruptState(true);
    MEDIA_LOG_I("~FileFdSourcePlugin isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
    // No download task exists unless SetSource() created one for a cloud file.
    FALSE_RETURN_MSG(downloadTask_ != nullptr, "~FileFdSourcePlugin out.");
    downloadTask_->Stop();
    MEDIA_LOG_I("~FileFdSourcePlugin out.");
}
139 
SetCallback(Callback * cb)140 Status FileFdSourcePlugin::SetCallback(Callback* cb)
141 {
142     MEDIA_LOG_D("SetCallback in " PUBLIC_LOG_D32, cb != nullptr);
143     callback_ = cb;
144     return Status::OK;
145 }
146 
SetSource(std::shared_ptr<MediaSource> source)147 Status FileFdSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
148 {
149     MEDIA_LOG_I("SetSource in. %{private}s", source->GetSourceUri().c_str());
150     auto err = ParseUriInfo(source->GetSourceUri());
151     if (err != Status::OK) {
152         MEDIA_LOG_E("Parse file name from uri fail, uri %{private}s", source->GetSourceUri().c_str());
153         return err;
154     }
155     CheckFileType();
156     if (isCloudFile_ && isEnableFdCache_) {
157         ringBuffer_ = std::make_shared<RingBuffer>(CACHE_SIZE);
158         FALSE_RETURN_V_MSG_E(!(ringBuffer_ == nullptr || !ringBuffer_->Init()),
159             Status::ERROR_NO_MEMORY, "memory is not enough ringBuffer_");
160         downloadTask_ = std::make_shared<Task>(std::string("downloadTaskFD"));
161         FALSE_RETURN_V_MSG_E(downloadTask_ != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
162         downloadTask_->RegisterJob([this] {
163             CacheDataLoop();
164             return 0;
165         });
166         downloadTask_->Start();
167         steadyClock_.Reset();
168     }
169     return Status::OK;
170 }
171 
Read(std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)172 Status FileFdSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
173 {
174     return Read(0, buffer, offset, expectedLen);
175 }
176 
Read(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)177 Status FileFdSourcePlugin::Read(int32_t streamId, std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
178 {
179     FALSE_RETURN_V_MSG_E(fd_ != -1, Status::ERROR_WRONG_STATE, "no valid fd");
180     if (isCloudFile_) {
181         return ReadOnlineFile(0, buffer, offset, expectedLen);
182     } else {
183         return ReadOfflineFile(0, buffer, offset, expectedLen);
184     }
185 }
186 
ReadOfflineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)187 Status FileFdSourcePlugin::ReadOfflineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
188     uint64_t offset, size_t expectedLen)
189 {
190     std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
191     FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
192     expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
193     expectedLen = std::min(bufData->GetCapacity(), expectedLen);
194     MEDIA_LOG_D("ReadLocal buffer pos: " PUBLIC_LOG_U64 " , len:" PUBLIC_LOG_ZU, position_.load(), expectedLen);
195 
196     int64_t offsetCur = lseek(fd_, 0, SEEK_CUR);
197     if (offsetCur < 0) {
198         MEDIA_LOG_E("Fd get offset failed ");
199     } else if (static_cast<uint64_t>(offsetCur) != position_.load()) {
200         MEDIA_LOG_E("Fd offsetCur has changed. offsetCur " PUBLIC_LOG_D64 ", offsetOld " PUBLIC_LOG_U64,
201             offsetCur, position_.load());
202     }
203     auto size = read(fd_, bufData->GetWritableAddr(expectedLen), expectedLen);
204     if (size <= 0) {
205         HandleReadResult(expectedLen, size);
206         FALSE_RETURN_V((loc_ != IOCTL_CLOUD || isEnableFdCache_ ||
207             (errno != READ_ERROR_NOMEM && errno != READ_ERROR_IO)), Status::ERROR_INVALID_DATA);
208         MEDIA_LOG_D("ReadLocal END_OF_STREAM");
209         return Status::END_OF_STREAM;
210     }
211     bufData->UpdateDataSize(size);
212     position_ += static_cast<uint64_t>(size);
213     if (buffer->GetMemory() != nullptr) {
214         MEDIA_LOG_D("ReadLocal position_ " PUBLIC_LOG_U64 ", readSize " PUBLIC_LOG_ZU,
215             position_.load(), buffer->GetMemory()->GetSize());
216     }
217     return Status::OK;
218 }
219 
ReadOnlineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)220 Status FileFdSourcePlugin::ReadOnlineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
221     uint64_t offset, size_t expectedLen)
222 {
223     if (isBuffering_) {
224         if (HandleBuffering()) {
225             FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
226             FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
227             MEDIA_LOG_I("is buffering, return error again.");
228             return Status::ERROR_AGAIN;
229         }
230     }
231 
232     std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
233     FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
234     expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
235     expectedLen = std::min(bufData->GetCapacity(), expectedLen);
236 
237     // ringbuffer 0 after seek in 20ms, don't notify buffering
238     curReadTime_ = steadyClock2_.ElapsedMilliseconds();
239     if (isReadFrame_ &&  ringBuffer_->GetSize() < expectedLen && !HasCacheData(expectedLen, offset) &&
240         static_cast<size_t>(GetLastSize(position_)) > expectedLen) {
241         MEDIA_LOG_I("ringBuffer.size() " PUBLIC_LOG_ZU " curReadTime_ " PUBLIC_LOG_D64
242             " lastReadTime_ " PUBLIC_LOG_D64 " pos_ " PUBLIC_LOG_U64 " cachePos_ " PUBLIC_LOG_U64
243             " expectedLen  " PUBLIC_LOG_ZU " offset_ " PUBLIC_LOG_D64 " offset " PUBLIC_LOG_U64,
244             ringBuffer_->GetSize(), curReadTime_, lastReadTime_, position_.load(), cachePosition_.load(),
245             expectedLen, offset_, offset);
246         CheckReadTime();
247         FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
248         FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
249         return Status::ERROR_AGAIN;
250     }
251 
252     size_t size = ringBuffer_->ReadBuffer(bufData->GetWritableAddr(expectedLen), expectedLen, READ_RETRY);
253     if (size == 0) {
254         MEDIA_LOG_I("read size 0, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_D64 ", size:" PUBLIC_LOG_U64 ", pos:"
255             PUBLIC_LOG_U64 ", readBlock:" PUBLIC_LOG_D32, fd_, offset, size_, position_.load(), isReadBlocking_.load());
256         FALSE_RETURN_V_MSG_E(GetLastSize(position_) != 0, Status::END_OF_STREAM, "ReadCloud END_OF_STREAM");
257         bufData->UpdateDataSize(0);
258         return Status::OK;
259     }
260     bufData->UpdateDataSize(size);
261     int64_t ct = steadyClock2_.ElapsedMilliseconds() - curReadTime_;
262     if (ct > READ_TIME) {
263         MEDIA_LOG_I("ReadCloud buffer position " PUBLIC_LOG_U64 ", expectedLen " PUBLIC_LOG_ZU
264         " costTime: " PUBLIC_LOG_U64, position_.load(), expectedLen, ct);
265     }
266     position_ += static_cast<uint64_t>(size);
267     MEDIA_LOG_D("ringBuffer.size() " PUBLIC_LOG_ZU, ringBuffer_->GetSize());
268     return Status::OK;
269 }
270 
SeekTo(uint64_t offset)271 Status FileFdSourcePlugin::SeekTo(uint64_t offset)
272 {
273     FALSE_RETURN_V_MSG_E(fd_ != -1 && seekable_ == Seekable::SEEKABLE,
274         Status::ERROR_WRONG_STATE, "no valid fd or no seekable.");
275 
276     MEDIA_LOG_D("SeekTo offset: " PUBLIC_LOG_U64, offset);
277     if (isCloudFile_) {
278         return SeekToOnlineFile(offset);
279     } else {
280         return SeekToOfflineFile(offset);
281     }
282 }
283 
SeekToOfflineFile(uint64_t offset)284 Status FileFdSourcePlugin::SeekToOfflineFile(uint64_t offset)
285 {
286     int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
287     if (ret == -1) {
288         MEDIA_LOG_E("SeekLocal failed, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
289             PUBLIC_LOG_S, fd_, offset, strerror(errno));
290         return Status::ERROR_UNKNOWN;
291     }
292     position_ = offset + static_cast<uint64_t>(offset_);
293     MEDIA_LOG_D("SeekLocal end ret " PUBLIC_LOG_D32 ", position_ " PUBLIC_LOG_U64, ret, position_.load());
294     return Status::OK;
295 }
296 
SeekToOnlineFile(uint64_t offset)297 Status FileFdSourcePlugin::SeekToOnlineFile(uint64_t offset)
298 {
299     FALSE_RETURN_V_MSG_E(ringBuffer_ != nullptr, Status::ERROR_WRONG_STATE, "SeekCloud ringBuffer_ is nullptr");
300     MEDIA_LOG_D("SeekCloud, ringBuffer.size: " PUBLIC_LOG_ZU ", offset " PUBLIC_LOG_U64,
301         ringBuffer_->GetSize(), offset);
302     if (ringBuffer_->Seek(offset)) {
303         position_ = offset + static_cast<uint64_t>(offset_);
304         MEDIA_LOG_I("SeekCloud ringBuffer_ seek hit, offset " PUBLIC_LOG_U64, offset);
305         return Status::OK;
306     }
307     // First clear buffer, avoid no available buffer then task pause never exit.
308     ringBuffer_->SetActive(false);
309     inSeek_ = true;
310     if (downloadTask_ != nullptr) {
311         downloadTask_->Pause();
312         inSeek_ = false;
313     }
314     ringBuffer_->Clear();
315     ringBuffer_->SetMediaOffset(offset);
316     {
317         std::lock_guard<std::mutex> lock(interruptMutex_);
318         FALSE_RETURN_V(!isInterrupted_, Status::OK);
319         ringBuffer_->SetActive(true);
320     }
321 
322     int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
323     if (ret == -1) {
324         MEDIA_LOG_E("SeekCloud failed, fd_ " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
325             PUBLIC_LOG_S, fd_, offset, strerror(errno));
326         return Status::ERROR_UNKNOWN;
327     }
328     position_ = offset + static_cast<uint64_t>(offset_);
329     cachePosition_ = position_.load();
330 
331     MEDIA_LOG_D("SeekCloud end, fd_ " PUBLIC_LOG_D32 ", size_ " PUBLIC_LOG_U64 ", offset_ " PUBLIC_LOG_D64
332         ", position_ " PUBLIC_LOG_U64, fd_, size_, offset_, position_.load());
333     if (downloadTask_ != nullptr) {
334         downloadTask_->Start();
335     }
336     return Status::OK;
337 }
338 
ParseUriInfo(const std::string & uri)339 Status FileFdSourcePlugin::ParseUriInfo(const std::string& uri)
340 {
341     if (uri.empty()) {
342         MEDIA_LOG_E("uri is empty");
343         return Status::ERROR_INVALID_PARAMETER;
344     }
345     std::smatch fdUriMatch;
346     FALSE_RETURN_V_MSG_E(std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)\\?offset=(.*)&size=(.*)")) ||
347         std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)")),
348         Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
349     FALSE_RETURN_V_MSG_E(fdUriMatch.size() >= FDPOS && isNumber(fdUriMatch[1].str()),
350         Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
351     fd_ = std::stoi(fdUriMatch[1].str()); // 1: sub match fd subscript
352     FALSE_RETURN_V_MSG_E(fd_ != -1 && FileSystem::IsRegularFile(fd_),
353         Status::ERROR_INVALID_PARAMETER, "Invalid fd: " PUBLIC_LOG_D32, fd_);
354     fileSize_ = GetFileSize(fd_);
355     if (fdUriMatch.size() == 4) { // 4:4 sub match
356         std::string offsetStr = fdUriMatch[2].str(); // 2: sub match offset subscript
357         FALSE_RETURN_V_MSG_E(StrToLong(offsetStr, offset_), Status::ERROR_INVALID_PARAMETER,
358             "Failed to read offset.");
359         if (static_cast<uint64_t>(offset_) > fileSize_) {
360             offset_ = static_cast<int64_t>(fileSize_);
361         }
362         size_ = static_cast<uint64_t>(std::stoll(fdUriMatch[3].str())); // 3: sub match size subscript
363         uint64_t remainingSize = fileSize_ - static_cast<uint64_t>(offset_);
364         if (size_ > remainingSize) {
365             size_ = remainingSize;
366         }
367     } else {
368         size_ = fileSize_;
369         offset_ = 0;
370     }
371     position_ = offset_;
372     seekable_ = FileSystem::IsSeekable(fd_) ? Seekable::SEEKABLE : Seekable::UNSEEKABLE;
373     if (seekable_ == Seekable::SEEKABLE) {
374         NOK_LOG(SeekTo(0));
375     }
376     MEDIA_LOG_I("Fd: " PUBLIC_LOG_D32 ", offset: " PUBLIC_LOG_D64 ", size: " PUBLIC_LOG_U64, fd_, offset_, size_);
377     return Status::OK;
378 }
379 
CacheDataLoop()380 void FileFdSourcePlugin::CacheDataLoop()
381 {
382     if (isInterrupted_) {
383         MEDIA_LOG_E("CacheData break");
384         WaitForInterrupt(TEN_MILLISECOUNDS);
385         return;
386     }
387 
388     int64_t curTime = steadyClock_.ElapsedMilliseconds();
389     GetCurrentSpeed(curTime);
390 
391     size_t bufferSize = std::min(PER_CACHE_SIZE, static_cast<size_t>(GetLastSize(cachePosition_.load())));
392     if (bufferSize < 0) {
393         MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
394         WaitForInterrupt(TEN_MILLISECOUNDS);
395         return;
396     }
397 
398     char* cacheBuffer = new char[bufferSize];
399     if (cacheBuffer == nullptr) {
400         MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
401         WaitForInterrupt(TEN_MILLISECOUNDS);
402         return;
403     }
404     int size = read(fd_, cacheBuffer, bufferSize);
405     if (size <= 0) {
406         int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
407         MEDIA_LOG_I("read failed, cost time: " PUBLIC_LOG_U64, ct);
408         DeleteCacheBuffer(cacheBuffer, bufferSize);
409         HandleReadResult(bufferSize, size);
410         return;
411     }
412     MEDIA_LOG_D("Cache fd: " PUBLIC_LOG_D32 ", cachePos_ " PUBLIC_LOG_U64 ", ringBuffer_.size() " PUBLIC_LOG_ZU
413         ", size_ " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_);
414     while (!ringBuffer_->WriteBuffer(cacheBuffer, size)) {
415         MEDIA_LOG_I("CacheData ringbuffer write failed");
416         if (inSeek_ || isInterrupted_) {
417             DeleteCacheBuffer(cacheBuffer, bufferSize);
418             return;
419         }
420         WaitForInterrupt(TEN_MILLISECOUNDS);
421     }
422     cachePosition_ += static_cast<uint64_t>(size);
423     downloadSize_ += static_cast<uint64_t>(size);
424 
425     int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
426     if (ct > READ_TIME) {
427         MEDIA_LOG_I("Cache fd:" PUBLIC_LOG_D32 " pos:" PUBLIC_LOG_U64 " rb:" PUBLIC_LOG_ZU
428         " size:" PUBLIC_LOG_U64 " ct:" PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(),
429         size_, ct);
430     }
431 
432     DeleteCacheBuffer(cacheBuffer, bufferSize);
433 
434     CheckAndNotifyBufferingEnd();
435 }
436 
CheckAndNotifyBufferingEnd()437 void FileFdSourcePlugin::CheckAndNotifyBufferingEnd()
438 {
439     if (isBuffering_ && (static_cast<int64_t>(ringBuffer_->GetSize()) > waterLineAbove_ ||
440         GetLastSize(cachePosition_.load()) == 0)) {
441         NotifyBufferingEnd();
442     }
443 }
444 
HasCacheData(size_t bufferSize,uint64_t offset)445 bool FileFdSourcePlugin::HasCacheData(size_t bufferSize, uint64_t offset)
446 {
447     HmdfsHasCache ioctlData;
448     ioctlData.offset = static_cast<int64_t>(offset);
449     ioctlData.readSize = static_cast<int64_t>(bufferSize);
450     int32_t ioResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData); // 0 has, -1 hasnot
451 
452     ioctlData.offset = static_cast<int64_t>(cachePosition_);
453     ioctlData.readSize = static_cast<int64_t>(PER_CACHE_SIZE);
454     int32_t ioCacheResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData);
455     // ioctl has cache
456     if (ioResult == 0 && ioCacheResult == 0) {
457         return true;
458     } else {
459         MEDIA_LOG_I("ioctl has no cache with errno " PUBLIC_LOG_D32 " cachePosition_ " PUBLIC_LOG_U64
460             " offset " PUBLIC_LOG_U64 " bufferSize " PUBLIC_LOG_ZU, errno, cachePosition_.load(), offset, bufferSize);
461     }
462     return false;
463 }
464 
Stop()465 Status FileFdSourcePlugin::Stop()
466 {
467     MEDIA_LOG_I("Stop enter.");
468     isInterrupted_ = true;
469     MEDIA_LOG_I("Stop isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
470     FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
471     downloadTask_->StopAsync();
472     return Status::OK;
473 }
474 
Reset()475 Status FileFdSourcePlugin::Reset()
476 {
477     MEDIA_LOG_I("Reset enter.");
478     isInterrupted_ = true;
479     MEDIA_LOG_I("Reset isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
480     FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
481     downloadTask_->StopAsync();
482     return Status::OK;
483 }
484 
PauseDownloadTask(bool isAsync)485 void FileFdSourcePlugin::PauseDownloadTask(bool isAsync)
486 {
487     FALSE_RETURN(downloadTask_ != nullptr);
488     if (isAsync) {
489         downloadTask_->PauseAsync();
490     } else {
491         downloadTask_->Pause();
492     }
493 }
494 
HandleBuffering()495 bool FileFdSourcePlugin::HandleBuffering()
496 {
497     MEDIA_LOG_I("HandleBuffering in.");
498     int32_t sleepTime = 0;
499     // return error again 1 time 1s, avoid ffmpeg error
500     while (sleepTime < ONE_SECONDS && !isInterrupted_ && isReadBlocking_) {
501         NotifyBufferingPercent();
502         if (!isBuffering_) {
503             break;
504         }
505         MEDIA_LOG_I("isBuffering.");
506         WaitForInterrupt(TEN_MILLISECOUNDS);
507         sleepTime += TEN_MILLISECOUNDS;
508     }
509     MEDIA_LOG_I("HandleBuffering out.");
510     return isBuffering_;
511 }
512 
HandleReadResult(size_t bufferSize,int size)513 void FileFdSourcePlugin::HandleReadResult(size_t bufferSize, int size)
514 {
515     MEDIA_LOG_I("HandleReadResult size " PUBLIC_LOG_D32 ", fd " PUBLIC_LOG_D32 ", cachePosition_" PUBLIC_LOG_U64
516         ", position_ " PUBLIC_LOG_U64 ", bufferSize " PUBLIC_LOG_ZU ", size_ " PUBLIC_LOG_U64 ", offset_ "
517         PUBLIC_LOG_D64, size, fd_, cachePosition_.load(), position_.load(), bufferSize, size_, offset_);
518     if (size < 0) {
519         // errno EIO  5
520         MEDIA_LOG_E("read fail, errno " PUBLIC_LOG_D32, errno);
521 
522         // read fail with errno, retry 3 * 10ms
523         retryTimes_++;
524         if (retryTimes_ >= RETRY_TIMES || isInterrupted_.load()) {
525             NotifyReadFail();
526             SetInterruptState(true);
527         }
528         WaitForInterrupt(TEN_MILLISECOUNDS);
529     } else {
530         cachePosition_ = 0;
531         PauseDownloadTask(false);
532     }
533 }
534 
NotifyBufferingStart()535 void FileFdSourcePlugin::NotifyBufferingStart()
536 {
537     MEDIA_LOG_I("NotifyBufferingStart, ringBuffer.size() " PUBLIC_LOG_ZU
538         ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
539     isBuffering_ = true;
540     if (callback_ != nullptr && !isInterrupted_) {
541         MEDIA_LOG_I("Read OnEvent BUFFERING_START.");
542         callback_->OnEvent({PluginEventType::BUFFERING_START, {BufferingInfoType::BUFFERING_START}, "start"});
543     } else {
544         MEDIA_LOG_E("BUFFERING_START callback_ is nullptr or isInterrupted_ is true");
545     }
546 }
547 
NotifyBufferingPercent()548 void FileFdSourcePlugin::NotifyBufferingPercent()
549 {
550     if (waterLineAbove_ != 0) {
551         int64_t bp = static_cast<float>(ringBuffer_->GetSize()) / waterLineAbove_ * PERCENT_100;
552         bp = bp > PERCENT_100 ? PERCENT_100 : bp;
553         if (isBuffering_ && callback_ != nullptr && !isInterrupted_) {
554             MEDIA_LOG_I("NotifyBufferingPercent, ringBuffer.size() " PUBLIC_LOG_ZU ", waterLineAbove_ " PUBLIC_LOG_U64
555                 ", PERCENT " PUBLIC_LOG_D32, ringBuffer_->GetSize(), waterLineAbove_, static_cast<int32_t>(bp));
556             callback_->OnEvent({PluginEventType::EVENT_BUFFER_PROGRESS,
557                 {BufferingInfoType::BUFFERING_PERCENT}, std::to_string(bp)});
558         } else {
559             MEDIA_LOG_E("EVENT_BUFFER_PROGRESS callback_ is nullptr or isInterrupted_ \
560                 is true or isBuffering_ is false");
561         }
562     }
563 }
564 
NotifyBufferingEnd()565 void FileFdSourcePlugin::NotifyBufferingEnd()
566 {
567     NotifyBufferingPercent();
568     MEDIA_LOG_I("NotifyBufferingEnd, ringBuffer.size() " PUBLIC_LOG_ZU
569         ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
570     isBuffering_ = false;
571     lastReadTime_ = 0;
572     if (callback_ != nullptr && !isInterrupted_) {
573         MEDIA_LOG_I("NotifyBufferingEnd success .");
574         callback_->OnEvent({PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
575     } else {
576         MEDIA_LOG_E("BUFFERING_END callback_ is nullptr or isInterrupted_ is true");
577     }
578 }
579 
NotifyReadFail()580 void FileFdSourcePlugin::NotifyReadFail()
581 {
582     MEDIA_LOG_I("NotifyReadFail in.");
583     if (callback_ != nullptr && !isInterrupted_) {
584         MEDIA_LOG_I("Read OnEvent read fail");
585         callback_->OnEvent({PluginEventType::CLIENT_ERROR,
586             static_cast<int32_t>(NetworkClientErrorCode::ERROR_TIME_OUT), "read"});
587     } else {
588         MEDIA_LOG_E("CLIENT_ERROR callback_ is nullptr or isInterrupted_ is true");
589     }
590 }
591 
SetDemuxerState(int32_t streamId)592 void FileFdSourcePlugin::SetDemuxerState(int32_t streamId)
593 {
594     MEDIA_LOG_I("SetDemuxerState");
595     isReadFrame_ = true;
596 }
597 
SetCurrentBitRate(int32_t bitRate,int32_t streamID)598 Status FileFdSourcePlugin::SetCurrentBitRate(int32_t bitRate, int32_t streamID)
599 {
600     currentBitRate_ = bitRate / TO_BYTE; // 8b
601     MEDIA_LOG_I("currentBitRate: " PUBLIC_LOG_D32, currentBitRate_);
602     // default cache 0.3s
603     waterLineAbove_ = CACHE_LEVEL_1 * currentBitRate_;
604     return Status::OK;
605 }
606 
/**
 * Logs the bundle name; the value is not stored.
 */
void FileFdSourcePlugin::SetBundleName(const std::string& bundleName)
{
    MEDIA_LOG_I("SetBundleName bundleName: " PUBLIC_LOG_S, bundleName.c_str());
}
611 
SetReadBlockingFlag(bool isAllowed)612 Status FileFdSourcePlugin::SetReadBlockingFlag(bool isAllowed)
613 {
614     MEDIA_LOG_I("SetReadBlockingFlag entered, IsReadBlockingAllowed %{public}d", isAllowed);
615     if (ringBuffer_) {
616         ringBuffer_->SetReadBlocking(isAllowed);
617     }
618     isReadBlocking_ = isAllowed;
619     return Status::OK;
620 }
621 
SetInterruptState(bool isInterruptNeeded)622 void FileFdSourcePlugin::SetInterruptState(bool isInterruptNeeded)
623 {
624     bool isInterruptAllowed = true;
625     MEDIA_LOG_I("SetInterruptState isInterrupted_" PUBLIC_LOG_D32, isInterruptNeeded);
626     {
627         std::lock_guard<std::mutex> lock(interruptMutex_);
628         isInterruptAllowed = !(isInterrupted_ && isInterruptNeeded);
629         isInterrupted_ = isInterruptNeeded;
630         bufferCond_.notify_all();
631     }
632     if (ringBuffer_ != nullptr) {
633         if (isInterrupted_) {
634             ringBuffer_->SetActive(false);
635         } else {
636             ringBuffer_->SetActive(true);
637         }
638     }
639 
640     if (isInterrupted_ && isInterruptAllowed && isCloudFile_) {
641         if (downloadTask_ != nullptr) {
642             downloadTask_->StopAsync();
643         }
644         int ret = ioctl(fd_, HMDFS_IOC_CANCEL_READ);
645         MEDIA_LOG_I("ioctl break read, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
646     }
647 }
648 
/**
 * Reports the size of the media window (size_), as established by ParseUriInfo().
 *
 * @param size Receives the size in bytes.
 * @return Always Status::OK.
 */
Status FileFdSourcePlugin::GetSize(uint64_t& size)
{
    size = size_;
    return Status::OK;
}
654 
/**
 * Returns the seekability determined by ParseUriInfo().
 */
Seekable FileFdSourcePlugin::GetSeekable()
{
    MEDIA_LOG_D("GetSeekable in");
    return seekable_;
}
660 
CheckFileType()661 void FileFdSourcePlugin::CheckFileType()
662 {
663     int loc; // 1本地,2云端
664     int ioResult = ioctl(fd_, HMDFS_IOC_GET_LOCATION, &loc);
665     MEDIA_LOG_I("SetSource ioctl loc, ret " PUBLIC_LOG_D32 ", loc " PUBLIC_LOG_D32 ", errno"
666         PUBLIC_LOG_D32, ioResult, loc, errno);
667 
668     if (ioResult == 0) {
669         loc_ = loc;
670     }
671     if (!isEnableFdCache_) {
672         isCloudFile_ = false;
673         return;
674     }
675 
676     if (ioResult == 0) {
677         if (loc == IOCTL_CLOUD) {
678             isCloudFile_ = true;
679             MEDIA_LOG_I("ioctl file is cloud");
680             int ret = ioctl(fd_, HMDFS_IOC_RESTORE_READ);
681             MEDIA_LOG_I("ioctl restore fd, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
682             return;
683         } else {
684             isCloudFile_ = false;
685             MEDIA_LOG_I("ioctl file is local");
686         }
687     } else {
688         isCloudFile_ = false;
689         MEDIA_LOG_I("ioctl failed to get file type");
690     }
691 }
692 
GetBufferPtr(std::shared_ptr<Buffer> & buffer,size_t expectedLen)693 std::shared_ptr<Memory> FileFdSourcePlugin::GetBufferPtr(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
694 {
695     if (!buffer) {
696         buffer = std::make_shared<Buffer>();
697     }
698     std::shared_ptr<Memory> bufData;
699     if (buffer->IsEmpty()) {
700         bufData = buffer->AllocMemory(nullptr, expectedLen);
701     } else {
702         bufData = buffer->GetMemory();
703     }
704     return bufData;
705 }
706 
GetLastSize(uint64_t position)707 int64_t FileFdSourcePlugin::GetLastSize(uint64_t position)
708 {
709     int64_t ret = static_cast<int64_t>(size_) + offset_ - static_cast<int64_t>(position);
710     if (ret < 0) {
711         MEDIA_LOG_E("GetLastSize error, fd_ " PUBLIC_LOG_D32 ", offset_ " PUBLIC_LOG_D64 ", size_ "
712             PUBLIC_LOG_U64 ", position " PUBLIC_LOG_U64, fd_, offset_, size_, position);
713     }
714     return ret;
715 }
716 
GetCurrentSpeed(int64_t curTime)717 void FileFdSourcePlugin::GetCurrentSpeed(int64_t curTime)
718 {
719     if ((curTime - lastCheckTime_) > RECORD_TIME_INTERVAL) {
720         MEDIA_LOG_I("CacheDataLoop curTime_: " PUBLIC_LOG_U64 " lastCheckTime_: "
721         PUBLIC_LOG_U64 " downloadSize_: " PUBLIC_LOG_U64, curTime, lastCheckTime_, downloadSize_);
722         float duration = static_cast<double>(curTime - lastCheckTime_) / MILLISECOUND_TO_SECOND;
723         avgDownloadSpeed_ = downloadSize_ / duration; //b/s
724         MEDIA_LOG_I("downloadDuration: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F,
725             duration, avgDownloadSpeed_);
726         downloadSize_ = 0;
727         lastCheckTime_ = curTime;
728         FALSE_RETURN(currentBitRate_ > 0);
729         UpdateWaterLineAbove();
730     }
731 }
732 
UpdateWaterLineAbove()733 void FileFdSourcePlugin::UpdateWaterLineAbove()
734 {
735     FALSE_RETURN_MSG(currentBitRate_ > 0, "currentBitRate_ <= 0");
736     float cacheTime = GetCacheTime(avgDownloadSpeed_ / currentBitRate_);
737     MEDIA_LOG_I("cacheTime: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F
738         " currentBitRate: " PUBLIC_LOG_D32, cacheTime, avgDownloadSpeed_, currentBitRate_);
739     waterLineAbove_ = cacheTime * currentBitRate_ > GetLastSize(cachePosition_.load()) ?
740         GetLastSize(cachePosition_.load()) : cacheTime * currentBitRate_;
741     MEDIA_LOG_I("waterLineAbove_: " PUBLIC_LOG_U64, waterLineAbove_);
742 }
743 
GetCacheTime(float num)744 float FileFdSourcePlugin::GetCacheTime(float num)
745 {
746     MEDIA_LOG_I("GetCacheTime with num: " PUBLIC_LOG_F, num);
747     if (num < 0) {
748         return CACHE_LEVEL_1;
749     }
750     if (num > 0 && num < 0.5) { // (0, 0.5)
751         return CACHE_TIME_DEFAULT;
752     } else if (num >= 0.5 && num < 1) { //[0.5, 1)
753         return CACHE_TIME_DEFAULT;
754     } else if (num >= 1) { //[1, 2)
755         return CACHE_LEVEL_1;
756     }
757     return CACHE_TIME_DEFAULT;
758 }
759 
DeleteCacheBuffer(char * buffer,size_t bufferSize)760 void FileFdSourcePlugin::DeleteCacheBuffer(char* buffer, size_t bufferSize)
761 {
762     if (buffer != nullptr && bufferSize >= 0) {
763         delete[] buffer;
764     }
765 }
766 
CheckReadTime()767 void FileFdSourcePlugin::CheckReadTime()
768 {
769     if (IsValidTime(curReadTime_, lastReadTime_)) {
770         NotifyBufferingStart();
771         lastReadTime_ = 0;
772     } else {
773         if (lastReadTime_ == 0) {
774             lastReadTime_ = curReadTime_;
775         }
776     }
777 }
778 
IsValidTime(int64_t curTime,int64_t lastTime)779 bool FileFdSourcePlugin::IsValidTime(int64_t curTime, int64_t lastTime)
780 {
781     return lastReadTime_ != 0 && curReadTime_ - lastReadTime_ < SEEK_TIME_UPPER &&
782         curReadTime_ - lastReadTime_ > SEEK_TIME_LOWER;
783 }
784 
/**
 * Enables or disables fd caching. CheckFileType() and SetSource() read this flag, so it only has
 * an effect when set before SetSource().
 */
void FileFdSourcePlugin::SetEnableOnlineFdCache(bool isEnableFdCache)
{
    isEnableFdCache_ = isEnableFdCache;
}
789 
WaitForInterrupt(int32_t waitTimeMS)790 void FileFdSourcePlugin::WaitForInterrupt(int32_t waitTimeMS)
791 {
792     std::unique_lock<std::mutex> lock(interruptMutex_);
793     bufferCond_.wait_for(lock, std::chrono::milliseconds(waitTimeMS), [&] { return isInterrupted_.load(); });
794 }
795 
/**
 * Returns true when the fd is not being handled as a cloud file (see CheckFileType()).
 */
bool FileFdSourcePlugin::IsLocalFd()
{
    return !isCloudFile_;
}
800 } // namespace FileFdSource
} // namespace Plugins
802 } // namespace Media
803 } // namespace OHOS