• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #define HST_LOG_TAG "FileFdSourcePlugin"
17 
18 #include <cerrno>
19 #include <cstring>
20 #include <regex>
21 #include <memory>
22 #ifdef WIN32
23 #include <fcntl.h>
24 #else
25 #include <sys/types.h>
26 #include <unistd.h>
27 #endif
28 #include <sys/ioctl.h>
29 #include <sys/stat.h>
30 #include "common/log.h"
31 #include "osal/filesystem/file_system.h"
32 #include "file_fd_source_plugin.h"
33 #include "common/media_core.h"
34 
35 namespace {
36 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "FileFdSourcePlugin" };
StrToLong(const std::string_view & str,int64_t & value)37 bool StrToLong(const std::string_view& str, int64_t& value)
38 {
39     FALSE_RETURN_V_MSG_E(!str.empty() && (isdigit(str.front()) || (str.front() == '-')),
40         false, "no valid string.");
41     std::string valStr(str);
42     char* end = nullptr;
43     errno = 0;
44     long long result = strtoll(valStr.c_str(), &end, 10); /* 10 means decimal */
45     FALSE_RETURN_V_MSG_E(result >= LLONG_MIN && result <= LLONG_MAX, false,
46         "call StrToLong func false,  input str is: %{public}s!", valStr.c_str());
47     FALSE_RETURN_V_MSG_E(end != valStr.c_str() && end[0] == '\0' && errno != ERANGE, false,
48         "call StrToLong func false,  input str is: %{public}s!", valStr.c_str());
49     value = result;
50     return true;
51 }
52 }
53 
54 namespace OHOS {
55 namespace Media {
56 namespace Plugins {
57 namespace FileFdSource {
58 namespace {
// Minimum number of regex sub-matches required for an fd uri (whole match + fd).
constexpr int32_t FDPOS                         = 2;
// Read/caching cost (ms) above which the elapsed time is logged.
constexpr int32_t READ_TIME                     = 3;
// Capacity handed to the ring buffer that caches cloud files (40 MiB).
constexpr size_t CACHE_SIZE                     = 40 * 1024 * 1024;
// Upper bound of bytes fetched from the fd per caching iteration (480 KiB).
constexpr size_t PER_CACHE_SIZE                 = 48 * 10 * 1024;
// Polling interval (ms) used for interruptible waits. (Identifier spelling kept for compatibility.)
constexpr int32_t TEN_MILLISECOUNDS             = 10;
// Longest time (ms) a single buffering wait may block a read.
constexpr int32_t ONE_SECONDS                   = 1 * 1000;
// Cache time returned by GetCacheTime() when the download is slower than the bit rate.
constexpr int32_t CACHE_TIME_DEFAULT            = 5;
// Lower/upper bounds (ms) of the window checked by IsValidTime().
constexpr int32_t SEEK_TIME_LOWER               = 20;
constexpr int32_t SEEK_TIME_UPPER               = 1000;
// Interval (ms) between two download speed samples.
constexpr int32_t RECORD_TIME_INTERVAL          = 1 * 1000;
constexpr int32_t MILLISECOUND_TO_SECOND        = 1 * 1000;
// Number of failed reads after which the failure is reported and the plugin is interrupted.
constexpr int32_t RETRY_TIMES                   = 3;
// Divisor applied to the bit rate in SetCurrentBitRate() (bits -> bytes).
constexpr int32_t TO_BYTE                       = 8;
constexpr int32_t PERCENT_100                   = 100;
// Rank registered for this plugin.
constexpr int32_t MAX_RANK                      = 100;
// Retry argument passed to RingBuffer::ReadBuffer().
constexpr int32_t READ_RETRY                    = 2;
// errno values that turn a failed local read into ERROR_INVALID_DATA.
constexpr int32_t READ_ERROR_IO                 = EIO;
constexpr int32_t READ_ERROR_NOMEM              = ENOMEM;
// Default cache level; also the cache time used when the download keeps up with the bit rate.
constexpr float CACHE_LEVEL_1                   = 0.3;

// ioctl "magic" number used to build the HMDFS request codes below.
constexpr unsigned int HMDFS_IOC = 0xf2;
// Location value compared against the HMDFS_IOC_GET_LOCATION result to detect a cloud file.
#define IOCTL_CLOUD 2
#define HMDFS_IOC_HAS_CACHE _IOW(HMDFS_IOC, 6, struct HmdfsHasCache)
#define HMDFS_IOC_GET_LOCATION _IOR(HMDFS_IOC, 7, __u32)
#define HMDFS_IOC_CANCEL_READ _IO(HMDFS_IOC, 8)
#define HMDFS_IOC_RESTORE_READ _IO(HMDFS_IOC, 9)

85 
GetFileSize(int32_t fd)86 uint64_t GetFileSize(int32_t fd)
87 {
88     uint64_t fileSize = 0;
89     struct stat s {};
90     int ret = fstat(fd, &s);
91     if (ret == 0) {
92         fileSize = static_cast<uint64_t>(s.st_size);
93         FALSE_RETURN_V_MSG_E(fileSize != 0, fileSize, "fileSize 0, fstat ret 0");
94         return fileSize;
95     } else {
96         MEDIA_LOG_W("GetFileSize error ret " PUBLIC_LOG_D32 ", errno " PUBLIC_LOG_D32, ret, errno);
97     }
98     return fileSize;
99 }
// Returns true when `str` is a non-empty sequence of decimal digits.
// An empty string is rejected: it is not a number, and accepting it would let callers hand
// "" to std::stoi, which throws.
bool isNumber(const std::string& str)
{
    return !str.empty() && str.find_first_not_of("0123456789") == std::string::npos;
}
104 }
FileFdSourceRegister(const std::shared_ptr<Register> & reg)105 Status FileFdSourceRegister(const std::shared_ptr<Register>& reg)
106 {
107     MEDIA_LOG_I("fileSourceRegister is started");
108     SourcePluginDef definition;
109     definition.name = "FileFdSource";
110     definition.description = "File Fd source";
111     definition.rank = MAX_RANK; // 100: max rank
112     Capability capability;
113     capability.AppendFixedKey<std::vector<ProtocolType>>(Tag::MEDIA_PROTOCOL_TYPE, {ProtocolType::FD});
114     definition.AddInCaps(capability);
115     auto func = [](const std::string& name) -> std::shared_ptr<SourcePlugin> {
116         return std::make_shared<FileFdSourcePlugin>(name);
117     };
118     definition.SetCreator(func);
119     return reg->AddPlugin(definition);
120 }
121 
__anon840d632c0402null122 PLUGIN_DEFINITION(FileFdSource, LicenseType::APACHE_V2, FileFdSourceRegister, [] {});
123 
FileFdSourcePlugin(std::string name)124 FileFdSourcePlugin::FileFdSourcePlugin(std::string name)
125     : SourcePlugin(std::move(name))
126 {
127 }
128 
~FileFdSourcePlugin()129 FileFdSourcePlugin::~FileFdSourcePlugin()
130 {
131     MEDIA_LOG_I("~FileFdSourcePlugin in.");
132     steadyClock_.Reset();
133     SetInterruptState(true);
134     MEDIA_LOG_I("~FileFdSourcePlugin isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
135     FALSE_RETURN_MSG(downloadTask_ != nullptr, "~FileFdSourcePlugin out.");
136     downloadTask_->Stop();
137     MEDIA_LOG_I("~FileFdSourcePlugin out.");
138 }
139 
SetCallback(Callback * cb)140 Status FileFdSourcePlugin::SetCallback(Callback* cb)
141 {
142     MEDIA_LOG_D("SetCallback in " PUBLIC_LOG_D32, cb != nullptr);
143     callback_ = cb;
144     return Status::OK;
145 }
146 
SetSource(std::shared_ptr<MediaSource> source)147 Status FileFdSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
148 {
149     MEDIA_LOG_I("SetSource in. %{private}s", source->GetSourceUri().c_str());
150     auto err = ParseUriInfo(source->GetSourceUri());
151     if (err != Status::OK) {
152         MEDIA_LOG_E("Parse file name from uri fail, uri %{private}s", source->GetSourceUri().c_str());
153         return err;
154     }
155     CheckFileType();
156     if (isCloudFile_) {
157         ringBuffer_ = std::make_shared<RingBuffer>(CACHE_SIZE);
158         FALSE_RETURN_V_MSG_E(!(ringBuffer_ == nullptr || !ringBuffer_->Init()),
159             Status::ERROR_NO_MEMORY, "memory is not enough ringBuffer_");
160         downloadTask_ = std::make_shared<Task>(std::string("downloadTaskFD"));
161         FALSE_RETURN_V_MSG_E(downloadTask_ != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
162         downloadTask_->RegisterJob([this] {
163             CacheDataLoop();
164             return 0;
165         });
166         downloadTask_->Start();
167         steadyClock_.Reset();
168     }
169     return Status::OK;
170 }
171 
Read(std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)172 Status FileFdSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
173 {
174     return Read(0, buffer, offset, expectedLen);
175 }
176 
Read(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)177 Status FileFdSourcePlugin::Read(int32_t streamId, std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
178 {
179     FALSE_RETURN_V_MSG_E(fd_ != -1, Status::ERROR_WRONG_STATE, "no valid fd");
180     if (isCloudFile_) {
181         return ReadOnlineFile(0, buffer, offset, expectedLen);
182     } else {
183         return ReadOfflineFile(0, buffer, offset, expectedLen);
184     }
185 }
186 
ReadOfflineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)187 Status FileFdSourcePlugin::ReadOfflineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
188     uint64_t offset, size_t expectedLen)
189 {
190     std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
191     FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
192     expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
193     expectedLen = std::min(bufData->GetCapacity(), expectedLen);
194     MEDIA_LOG_D("ReadLocal buffer pos: " PUBLIC_LOG_U64 " , len:" PUBLIC_LOG_ZU, position_.load(), expectedLen);
195 
196     int64_t offsetCur = lseek(fd_, 0, SEEK_CUR);
197     if (offsetCur < 0) {
198         MEDIA_LOG_E("Fd get offset failed ");
199     } else if (static_cast<uint64_t>(offsetCur) != position_.load()) {
200         MEDIA_LOG_E("Fd offsetCur has changed. offsetCur " PUBLIC_LOG_D64 ", offsetOld " PUBLIC_LOG_U64,
201             offsetCur, position_.load());
202     }
203     auto size = read(fd_, bufData->GetWritableAddr(expectedLen), expectedLen);
204     if (size <= 0) {
205         HandleReadResult(expectedLen, size);
206         FALSE_RETURN_V((loc_ != IOCTL_CLOUD || isEnableFdCache_ ||
207             (errno != READ_ERROR_NOMEM && errno != READ_ERROR_IO)), Status::ERROR_INVALID_DATA);
208         MEDIA_LOG_D("ReadLocal END_OF_STREAM");
209         return Status::END_OF_STREAM;
210     }
211     bufData->UpdateDataSize(size);
212     position_ += static_cast<uint64_t>(size);
213     if (buffer->GetMemory() != nullptr) {
214         MEDIA_LOG_D("ReadLocal position_ " PUBLIC_LOG_U64 ", readSize " PUBLIC_LOG_ZU,
215             position_.load(), buffer->GetMemory()->GetSize());
216     }
217     return Status::OK;
218 }
219 
ReadOnlineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)220 Status FileFdSourcePlugin::ReadOnlineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
221     uint64_t offset, size_t expectedLen)
222 {
223     if (isBuffering_) {
224         if (HandleBuffering()) {
225             FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
226             FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
227             MEDIA_LOG_I("is buffering, return error again.");
228             return Status::ERROR_AGAIN;
229         }
230     }
231 
232     std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
233     FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
234     expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
235     expectedLen = std::min(bufData->GetCapacity(), expectedLen);
236 
237     // ringbuffer 0 after seek in 20ms, don't notify buffering
238     curReadTime_ = steadyClock2_.ElapsedMilliseconds();
239     if (isReadFrame_ &&  ringBuffer_->GetSize() < expectedLen && !HasCacheData(expectedLen, offset) &&
240         static_cast<size_t>(GetLastSize(position_)) > expectedLen) {
241         MEDIA_LOG_I("ringBuffer.size() " PUBLIC_LOG_ZU " curReadTime_ " PUBLIC_LOG_D64
242             " lastReadTime_ " PUBLIC_LOG_D64, ringBuffer_->GetSize(), curReadTime_, lastReadTime_);
243         CheckReadTime();
244         FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
245         FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
246         return Status::ERROR_AGAIN;
247     }
248 
249     size_t size = ringBuffer_->ReadBuffer(bufData->GetWritableAddr(expectedLen), expectedLen, READ_RETRY);
250     if (size == 0) {
251         MEDIA_LOG_I("read size 0, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_D64 ", size:" PUBLIC_LOG_U64 ", pos:"
252             PUBLIC_LOG_U64 ", readBlock:" PUBLIC_LOG_D32, fd_, offset, size_, position_.load(), isReadBlocking_.load());
253         FALSE_RETURN_V_MSG_E(GetLastSize(position_) != 0, Status::END_OF_STREAM, "ReadCloud END_OF_STREAM");
254         bufData->UpdateDataSize(0);
255         return Status::OK;
256     }
257     bufData->UpdateDataSize(size);
258     int64_t ct = steadyClock2_.ElapsedMilliseconds() - curReadTime_;
259     if (ct > READ_TIME) {
260         MEDIA_LOG_I("ReadCloud buffer position " PUBLIC_LOG_U64 ", expectedLen " PUBLIC_LOG_ZU
261         " costTime: " PUBLIC_LOG_U64, position_.load(), expectedLen, ct);
262     }
263     position_ += static_cast<uint64_t>(size);
264     MEDIA_LOG_D("ringBuffer.size() " PUBLIC_LOG_ZU, ringBuffer_->GetSize());
265     return Status::OK;
266 }
267 
SeekTo(uint64_t offset)268 Status FileFdSourcePlugin::SeekTo(uint64_t offset)
269 {
270     FALSE_RETURN_V_MSG_E(fd_ != -1 && seekable_ == Seekable::SEEKABLE,
271         Status::ERROR_WRONG_STATE, "no valid fd or no seekable.");
272 
273     MEDIA_LOG_D("SeekTo offset: " PUBLIC_LOG_U64, offset);
274     if (isCloudFile_) {
275         return SeekToOnlineFile(offset);
276     } else {
277         return SeekToOfflineFile(offset);
278     }
279 }
280 
SeekToOfflineFile(uint64_t offset)281 Status FileFdSourcePlugin::SeekToOfflineFile(uint64_t offset)
282 {
283     int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
284     if (ret == -1) {
285         MEDIA_LOG_E("SeekLocal failed, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
286             PUBLIC_LOG_S, fd_, offset, strerror(errno));
287         return Status::ERROR_UNKNOWN;
288     }
289     position_ = offset + static_cast<uint64_t>(offset_);
290     MEDIA_LOG_D("SeekLocal end ret " PUBLIC_LOG_D32 ", position_ " PUBLIC_LOG_U64, ret, position_.load());
291     return Status::OK;
292 }
293 
SeekToOnlineFile(uint64_t offset)294 Status FileFdSourcePlugin::SeekToOnlineFile(uint64_t offset)
295 {
296     FALSE_RETURN_V_MSG_E(ringBuffer_ != nullptr, Status::ERROR_WRONG_STATE, "SeekCloud ringBuffer_ is nullptr");
297     MEDIA_LOG_D("SeekCloud, ringBuffer.size: " PUBLIC_LOG_ZU ", offset " PUBLIC_LOG_U64,
298         ringBuffer_->GetSize(), offset);
299     if (ringBuffer_->Seek(offset)) {
300         position_ = offset + static_cast<uint64_t>(offset_);
301         MEDIA_LOG_I("SeekCloud ringBuffer_ seek hit, offset " PUBLIC_LOG_U64, offset);
302         return Status::OK;
303     }
304     // First clear buffer, avoid no available buffer then task pause never exit.
305     ringBuffer_->SetActive(false);
306     inSeek_ = true;
307     if (downloadTask_ != nullptr) {
308         downloadTask_->Pause();
309         inSeek_ = false;
310     }
311     ringBuffer_->Clear();
312     ringBuffer_->SetMediaOffset(offset);
313     {
314         std::lock_guard<std::mutex> lock(interruptMutex_);
315         FALSE_RETURN_V(!isInterrupted_, Status::OK);
316         ringBuffer_->SetActive(true);
317     }
318 
319     int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
320     if (ret == -1) {
321         MEDIA_LOG_E("SeekCloud failed, fd_ " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
322             PUBLIC_LOG_S, fd_, offset, strerror(errno));
323         return Status::ERROR_UNKNOWN;
324     }
325     position_ = offset + static_cast<uint64_t>(offset_);
326     cachePosition_ = position_.load();
327 
328     MEDIA_LOG_D("SeekCloud end, fd_ " PUBLIC_LOG_D32 ", size_ " PUBLIC_LOG_U64 ", offset_ " PUBLIC_LOG_D64
329         ", position_ " PUBLIC_LOG_U64, fd_, size_, offset_, position_.load());
330     if (downloadTask_ != nullptr) {
331         downloadTask_->Start();
332     }
333     return Status::OK;
334 }
335 
ParseUriInfo(const std::string & uri)336 Status FileFdSourcePlugin::ParseUriInfo(const std::string& uri)
337 {
338     if (uri.empty()) {
339         MEDIA_LOG_E("uri is empty");
340         return Status::ERROR_INVALID_PARAMETER;
341     }
342     std::smatch fdUriMatch;
343     FALSE_RETURN_V_MSG_E(std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)\\?offset=(.*)&size=(.*)")) ||
344         std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)")),
345         Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
346     FALSE_RETURN_V_MSG_E(fdUriMatch.size() >= FDPOS && isNumber(fdUriMatch[1].str()),
347         Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
348     fd_ = std::stoi(fdUriMatch[1].str()); // 1: sub match fd subscript
349     FALSE_RETURN_V_MSG_E(fd_ != -1 && FileSystem::IsRegularFile(fd_),
350         Status::ERROR_INVALID_PARAMETER, "Invalid fd: " PUBLIC_LOG_D32, fd_);
351     fileSize_ = GetFileSize(fd_);
352     if (fdUriMatch.size() == 4) { // 4:4 sub match
353         std::string offsetStr = fdUriMatch[2].str(); // 2: sub match offset subscript
354         FALSE_RETURN_V_MSG_E(StrToLong(offsetStr, offset_), Status::ERROR_INVALID_PARAMETER,
355             "Failed to read offset.");
356         if (static_cast<uint64_t>(offset_) > fileSize_) {
357             offset_ = static_cast<int64_t>(fileSize_);
358         }
359         size_ = static_cast<uint64_t>(std::stoll(fdUriMatch[3].str())); // 3: sub match size subscript
360         uint64_t remainingSize = fileSize_ - static_cast<uint64_t>(offset_);
361         if (size_ > remainingSize) {
362             size_ = remainingSize;
363         }
364     } else {
365         size_ = fileSize_;
366         offset_ = 0;
367     }
368     position_ = offset_;
369     seekable_ = FileSystem::IsSeekable(fd_) ? Seekable::SEEKABLE : Seekable::UNSEEKABLE;
370     if (seekable_ == Seekable::SEEKABLE) {
371         NOK_LOG(SeekTo(0));
372     }
373     MEDIA_LOG_I("Fd: " PUBLIC_LOG_D32 ", offset: " PUBLIC_LOG_D64 ", size: " PUBLIC_LOG_U64, fd_, offset_, size_);
374     return Status::OK;
375 }
376 
CacheDataLoop()377 void FileFdSourcePlugin::CacheDataLoop()
378 {
379     if (isInterrupted_) {
380         MEDIA_LOG_E("CacheData break");
381         WaitForInterrupt(TEN_MILLISECOUNDS);
382         return;
383     }
384 
385     int64_t curTime = steadyClock_.ElapsedMilliseconds();
386     GetCurrentSpeed(curTime);
387 
388     size_t bufferSize = std::min(PER_CACHE_SIZE, static_cast<size_t>(GetLastSize(cachePosition_.load())));
389     if (bufferSize < 0) {
390         MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
391         WaitForInterrupt(TEN_MILLISECOUNDS);
392         return;
393     }
394 
395     char* cacheBuffer = new char[bufferSize];
396     if (cacheBuffer == nullptr) {
397         MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
398         WaitForInterrupt(TEN_MILLISECOUNDS);
399         return;
400     }
401     int size = read(fd_, cacheBuffer, bufferSize);
402     if (size <= 0) {
403         DeleteCacheBuffer(cacheBuffer, bufferSize);
404         HandleReadResult(bufferSize, size);
405         return;
406     }
407     MEDIA_LOG_D("Cache fd: " PUBLIC_LOG_D32 ", cachePos_ " PUBLIC_LOG_U64 ", ringBuffer_.size() " PUBLIC_LOG_ZU
408         ", size_ " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_);
409     while (!ringBuffer_->WriteBuffer(cacheBuffer, size)) {
410         MEDIA_LOG_I("CacheData ringbuffer write failed");
411         if (inSeek_ || isInterrupted_) {
412             DeleteCacheBuffer(cacheBuffer, bufferSize);
413             return;
414         }
415         WaitForInterrupt(TEN_MILLISECOUNDS);
416     }
417     cachePosition_ += static_cast<uint64_t>(size);
418     downloadSize_ += static_cast<uint64_t>(size);
419 
420     int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
421     if (ct > READ_TIME) {
422         MEDIA_LOG_I("Cache fd: " PUBLIC_LOG_D32 " cachePos:" PUBLIC_LOG_U64 " ringBuffer.size() " PUBLIC_LOG_ZU
423         ", size " PUBLIC_LOG_U64 " cTime: " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(),
424         size_, ct);
425     }
426 
427     DeleteCacheBuffer(cacheBuffer, bufferSize);
428 
429     if (isBuffering_ && (static_cast<int64_t>(ringBuffer_->GetSize()) > waterLineAbove_ ||
430         GetLastSize(cachePosition_.load()) == 0)) {
431         NotifyBufferingEnd();
432     }
433 }
434 
HasCacheData(size_t bufferSize,uint64_t offset)435 bool FileFdSourcePlugin::HasCacheData(size_t bufferSize, uint64_t offset)
436 {
437     HmdfsHasCache ioctlData;
438     ioctlData.offset = static_cast<int64_t>(offset);
439     ioctlData.readSize = static_cast<int64_t>(bufferSize);
440     int32_t ioResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData); // 0在 -1不在
441 
442     ioctlData.offset = static_cast<int64_t>(cachePosition_);
443     ioctlData.readSize = static_cast<int64_t>(PER_CACHE_SIZE);
444     int32_t ioCacheResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData);
445     // ioctl has cache
446     if (ioResult == 0 && ioCacheResult == 0) {
447         return true;
448     } else {
449         MEDIA_LOG_I("ioctl has no cache with errno " PUBLIC_LOG_D32, errno);
450     }
451     return false;
452 }
453 
Stop()454 Status FileFdSourcePlugin::Stop()
455 {
456     MEDIA_LOG_I("Stop enter.");
457     isInterrupted_ = true;
458     MEDIA_LOG_I("Stop isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
459     FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
460     downloadTask_->StopAsync();
461     return Status::OK;
462 }
463 
Reset()464 Status FileFdSourcePlugin::Reset()
465 {
466     MEDIA_LOG_I("Reset enter.");
467     isInterrupted_ = true;
468     MEDIA_LOG_I("Reset isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
469     FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
470     downloadTask_->StopAsync();
471     return Status::OK;
472 }
473 
PauseDownloadTask(bool isAsync)474 void FileFdSourcePlugin::PauseDownloadTask(bool isAsync)
475 {
476     FALSE_RETURN(downloadTask_ != nullptr);
477     if (isAsync) {
478         downloadTask_->PauseAsync();
479     } else {
480         downloadTask_->Pause();
481     }
482 }
483 
HandleBuffering()484 bool FileFdSourcePlugin::HandleBuffering()
485 {
486     MEDIA_LOG_I("HandleBuffering in.");
487     int32_t sleepTime = 0;
488     // return error again 1 time 1s, avoid ffmpeg error
489     while (sleepTime < ONE_SECONDS && !isInterrupted_ && isReadBlocking_) {
490         NotifyBufferingPercent();
491         if (!isBuffering_) {
492             break;
493         }
494         MEDIA_LOG_I("isBuffering.");
495         WaitForInterrupt(TEN_MILLISECOUNDS);
496         sleepTime += TEN_MILLISECOUNDS;
497     }
498     MEDIA_LOG_I("HandleBuffering out.");
499     return isBuffering_;
500 }
501 
HandleReadResult(size_t bufferSize,int size)502 void FileFdSourcePlugin::HandleReadResult(size_t bufferSize, int size)
503 {
504     MEDIA_LOG_I("HandleReadResult size " PUBLIC_LOG_D32 ", fd " PUBLIC_LOG_D32 ", cachePosition_" PUBLIC_LOG_U64
505         ", position_ " PUBLIC_LOG_U64 ", bufferSize " PUBLIC_LOG_ZU ", size_ " PUBLIC_LOG_U64 ", offset_ "
506         PUBLIC_LOG_D64, size, fd_, cachePosition_.load(), position_.load(), bufferSize, size_, offset_);
507     if (size < 0) {
508         // errno EIO  5
509         MEDIA_LOG_E("read fail, errno " PUBLIC_LOG_D32, errno);
510 
511         // read fail with errno, retry 3 * 10ms
512         retryTimes_++;
513         if (retryTimes_ >= RETRY_TIMES || isInterrupted_.load()) {
514             NotifyReadFail();
515             SetInterruptState(true);
516         }
517         WaitForInterrupt(TEN_MILLISECOUNDS);
518     } else {
519         cachePosition_ = 0;
520         PauseDownloadTask(false);
521     }
522 }
523 
NotifyBufferingStart()524 void FileFdSourcePlugin::NotifyBufferingStart()
525 {
526     MEDIA_LOG_I("NotifyBufferingStart, ringBuffer.size() " PUBLIC_LOG_ZU
527         ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
528     isBuffering_ = true;
529     if (callback_ != nullptr && !isInterrupted_) {
530         MEDIA_LOG_I("Read OnEvent BUFFERING_START.");
531         callback_->OnEvent({PluginEventType::BUFFERING_START, {BufferingInfoType::BUFFERING_START}, "start"});
532     } else {
533         MEDIA_LOG_E("BUFFERING_START callback_ is nullptr or isInterrupted_ is true");
534     }
535 }
536 
NotifyBufferingPercent()537 void FileFdSourcePlugin::NotifyBufferingPercent()
538 {
539     if (waterLineAbove_ != 0) {
540         int64_t bp = static_cast<float>(ringBuffer_->GetSize()) / waterLineAbove_ * PERCENT_100;
541         bp = bp > PERCENT_100 ? PERCENT_100 : bp;
542         if (isBuffering_ && callback_ != nullptr && !isInterrupted_) {
543             MEDIA_LOG_I("NotifyBufferingPercent, ringBuffer.size() " PUBLIC_LOG_ZU ", waterLineAbove_ " PUBLIC_LOG_U64
544                 ", PERCENT " PUBLIC_LOG_D32, ringBuffer_->GetSize(), waterLineAbove_, static_cast<int32_t>(bp));
545             callback_->OnEvent({PluginEventType::EVENT_BUFFER_PROGRESS,
546                 {BufferingInfoType::BUFFERING_PERCENT}, std::to_string(bp)});
547         } else {
548             MEDIA_LOG_E("EVENT_BUFFER_PROGRESS callback_ is nullptr or isInterrupted_ \
549                 is true or isBuffering_ is false");
550         }
551     }
552 }
553 
NotifyBufferingEnd()554 void FileFdSourcePlugin::NotifyBufferingEnd()
555 {
556     NotifyBufferingPercent();
557     MEDIA_LOG_I("NotifyBufferingEnd, ringBuffer.size() " PUBLIC_LOG_ZU
558         ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
559     isBuffering_ = false;
560     lastReadTime_ = 0;
561     if (callback_ != nullptr && !isInterrupted_) {
562         MEDIA_LOG_I("NotifyBufferingEnd success .");
563         callback_->OnEvent({PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
564     } else {
565         MEDIA_LOG_E("BUFFERING_END callback_ is nullptr or isInterrupted_ is true");
566     }
567 }
568 
NotifyReadFail()569 void FileFdSourcePlugin::NotifyReadFail()
570 {
571     MEDIA_LOG_I("NotifyReadFail in.");
572     if (callback_ != nullptr && !isInterrupted_) {
573         MEDIA_LOG_I("Read OnEvent read fail");
574         callback_->OnEvent({PluginEventType::CLIENT_ERROR, {NetworkClientErrorCode::ERROR_TIME_OUT}, "read"});
575     } else {
576         MEDIA_LOG_E("CLIENT_ERROR callback_ is nullptr or isInterrupted_ is true");
577     }
578 }
579 
SetDemuxerState(int32_t streamId)580 void FileFdSourcePlugin::SetDemuxerState(int32_t streamId)
581 {
582     MEDIA_LOG_I("SetDemuxerState");
583     isReadFrame_ = true;
584 }
585 
SetCurrentBitRate(int32_t bitRate,int32_t streamID)586 Status FileFdSourcePlugin::SetCurrentBitRate(int32_t bitRate, int32_t streamID)
587 {
588     currentBitRate_ = bitRate / TO_BYTE; // 8b
589     MEDIA_LOG_I("currentBitRate: " PUBLIC_LOG_D32, currentBitRate_);
590     // default cache 0.3s
591     waterLineAbove_ = CACHE_LEVEL_1 * currentBitRate_;
592     return Status::OK;
593 }
594 
SetBundleName(const std::string & bundleName)595 void FileFdSourcePlugin::SetBundleName(const std::string& bundleName)
596 {
597     MEDIA_LOG_I("SetBundleName bundleName: " PUBLIC_LOG_S, bundleName.c_str());
598 }
599 
SetReadBlockingFlag(bool isAllowed)600 Status FileFdSourcePlugin::SetReadBlockingFlag(bool isAllowed)
601 {
602     MEDIA_LOG_I("SetReadBlockingFlag entered, IsReadBlockingAllowed %{public}d", isAllowed);
603     if (ringBuffer_) {
604         ringBuffer_->SetReadBlocking(isAllowed);
605     }
606     isReadBlocking_ = isAllowed;
607     return Status::OK;
608 }
609 
SetInterruptState(bool isInterruptNeeded)610 void FileFdSourcePlugin::SetInterruptState(bool isInterruptNeeded)
611 {
612     bool isInterruptAllowed = true;
613     MEDIA_LOG_I("SetInterruptState isInterrupted_" PUBLIC_LOG_D32, isInterruptNeeded);
614     {
615         std::lock_guard<std::mutex> lock(interruptMutex_);
616         isInterruptAllowed = !(isInterrupted_ && isInterruptNeeded);
617         isInterrupted_ = isInterruptNeeded;
618         bufferCond_.notify_all();
619     }
620     if (ringBuffer_ != nullptr) {
621         if (isInterrupted_) {
622             ringBuffer_->SetActive(false);
623         } else {
624             ringBuffer_->SetActive(true);
625         }
626     }
627 
628     if (isInterrupted_ && isInterruptAllowed && isCloudFile_) {
629         if (downloadTask_ != nullptr) {
630             downloadTask_->StopAsync();
631         }
632         int ret = ioctl(fd_, HMDFS_IOC_CANCEL_READ);
633         MEDIA_LOG_I("ioctl break read, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
634     }
635 }
636 
GetSize(uint64_t & size)637 Status FileFdSourcePlugin::GetSize(uint64_t& size)
638 {
639     size = size_;
640     return Status::OK;
641 }
642 
GetSeekable()643 Seekable FileFdSourcePlugin::GetSeekable()
644 {
645     MEDIA_LOG_D("GetSeekable in");
646     return seekable_;
647 }
648 
CheckFileType()649 void FileFdSourcePlugin::CheckFileType()
650 {
651     int loc; // 1本地,2云端
652     int ioResult = ioctl(fd_, HMDFS_IOC_GET_LOCATION, &loc);
653     MEDIA_LOG_I("SetSource ioctl loc, ret " PUBLIC_LOG_D32 ", loc " PUBLIC_LOG_D32 ", errno"
654         PUBLIC_LOG_D32, ioResult, loc, errno);
655 
656     if (ioResult == 0) {
657         loc_ = loc;
658     }
659     if (!isEnableFdCache_) {
660         isCloudFile_ = false;
661         return;
662     }
663 
664     if (ioResult == 0) {
665         if (loc == IOCTL_CLOUD) {
666             isCloudFile_ = true;
667             MEDIA_LOG_I("ioctl file is cloud");
668             int ret = ioctl(fd_, HMDFS_IOC_RESTORE_READ);
669             MEDIA_LOG_I("ioctl restore fd, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
670             return;
671         } else {
672             isCloudFile_ = false;
673             MEDIA_LOG_I("ioctl file is local");
674         }
675     } else {
676         isCloudFile_ = false;
677         MEDIA_LOG_I("ioctl failed to get file type");
678     }
679 }
680 
GetBufferPtr(std::shared_ptr<Buffer> & buffer,size_t expectedLen)681 std::shared_ptr<Memory> FileFdSourcePlugin::GetBufferPtr(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
682 {
683     if (!buffer) {
684         buffer = std::make_shared<Buffer>();
685     }
686     std::shared_ptr<Memory> bufData;
687     if (buffer->IsEmpty()) {
688         bufData = buffer->AllocMemory(nullptr, expectedLen);
689     } else {
690         bufData = buffer->GetMemory();
691     }
692     return bufData;
693 }
694 
GetLastSize(uint64_t position)695 int64_t FileFdSourcePlugin::GetLastSize(uint64_t position)
696 {
697     int64_t ret = static_cast<int64_t>(size_) + offset_ - static_cast<int64_t>(position);
698     if (ret < 0) {
699         MEDIA_LOG_E("GetLastSize error, fd_ " PUBLIC_LOG_D32 ", offset_ " PUBLIC_LOG_D64 ", size_ "
700             PUBLIC_LOG_U64 ", position " PUBLIC_LOG_U64, fd_, offset_, size_, position);
701     }
702     return ret;
703 }
704 
GetCurrentSpeed(int64_t curTime)705 void FileFdSourcePlugin::GetCurrentSpeed(int64_t curTime)
706 {
707     if ((curTime - lastCheckTime_) > RECORD_TIME_INTERVAL) {
708         MEDIA_LOG_I("CacheDataLoop curTime_: " PUBLIC_LOG_U64 " lastCheckTime_: "
709         PUBLIC_LOG_U64 " downloadSize_: " PUBLIC_LOG_U64, curTime, lastCheckTime_, downloadSize_);
710         float duration = static_cast<double>(curTime - lastCheckTime_) / MILLISECOUND_TO_SECOND;
711         avgDownloadSpeed_ = downloadSize_ / duration; //b/s
712         MEDIA_LOG_I("downloadDuration: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F,
713             duration, avgDownloadSpeed_);
714         downloadSize_ = 0;
715         lastCheckTime_ = curTime;
716         FALSE_RETURN(currentBitRate_ > 0);
717         UpdateWaterLineAbove();
718     }
719 }
720 
UpdateWaterLineAbove()721 void FileFdSourcePlugin::UpdateWaterLineAbove()
722 {
723     FALSE_RETURN_MSG(currentBitRate_ > 0, "currentBitRate_ <= 0");
724     float cacheTime = GetCacheTime(avgDownloadSpeed_ / currentBitRate_);
725     MEDIA_LOG_I("cacheTime: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F
726         " currentBitRate: " PUBLIC_LOG_D32, cacheTime, avgDownloadSpeed_, currentBitRate_);
727     waterLineAbove_ = cacheTime * currentBitRate_ > GetLastSize(cachePosition_.load()) ?
728         GetLastSize(cachePosition_.load()) : cacheTime * currentBitRate_;
729     MEDIA_LOG_I("waterLineAbove_: " PUBLIC_LOG_U64, waterLineAbove_);
730 }
731 
GetCacheTime(float num)732 float FileFdSourcePlugin::GetCacheTime(float num)
733 {
734     MEDIA_LOG_I("GetCacheTime with num: " PUBLIC_LOG_F, num);
735     if (num < 0) {
736         return CACHE_LEVEL_1;
737     }
738     if (num > 0 && num < 0.5) { // (0, 0.5)
739         return CACHE_TIME_DEFAULT;
740     } else if (num >= 0.5 && num < 1) { //[0.5, 1)
741         return CACHE_TIME_DEFAULT;
742     } else if (num >= 1) { //[1, 2)
743         return CACHE_LEVEL_1;
744     }
745     return CACHE_TIME_DEFAULT;
746 }
747 
DeleteCacheBuffer(char * buffer,size_t bufferSize)748 void FileFdSourcePlugin::DeleteCacheBuffer(char* buffer, size_t bufferSize)
749 {
750     if (buffer != nullptr && bufferSize > 0) {
751         delete[] buffer;
752     }
753 }
754 
CheckReadTime()755 void FileFdSourcePlugin::CheckReadTime()
756 {
757     if (IsValidTime(curReadTime_, lastReadTime_)) {
758         NotifyBufferingStart();
759         lastReadTime_ = 0;
760     } else {
761         if (lastReadTime_ == 0) {
762             lastReadTime_ = curReadTime_;
763         }
764     }
765 }
766 
IsValidTime(int64_t curTime,int64_t lastTime)767 bool FileFdSourcePlugin::IsValidTime(int64_t curTime, int64_t lastTime)
768 {
769     return lastReadTime_ != 0 && curReadTime_ - lastReadTime_ < SEEK_TIME_UPPER &&
770         curReadTime_ - lastReadTime_ > SEEK_TIME_LOWER;
771 }
772 
SetEnableOnlineFdCache(bool isEnableFdCache)773 void FileFdSourcePlugin::SetEnableOnlineFdCache(bool isEnableFdCache)
774 {
775     isEnableFdCache_ = isEnableFdCache;
776 }
777 
WaitForInterrupt(int32_t waitTimeMS)778 void FileFdSourcePlugin::WaitForInterrupt(int32_t waitTimeMS)
779 {
780     std::unique_lock<std::mutex> lock(interruptMutex_);
781     bufferCond_.wait_for(lock, std::chrono::milliseconds(waitTimeMS), [&] { return isInterrupted_.load(); });
782 }
783 
IsLocalFd()784 bool FileFdSourcePlugin::IsLocalFd()
785 {
786     return !isCloudFile_;
787 }
788 } // namespace FileFdSource
789 } // namespace Plugin
790 } // namespace Media
791 } // namespace OHOS