1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "FileFdSourcePlugin"
17
#include <cerrno>
#include <cstring>
#include <regex>
#include <memory>
#include <new>
#ifdef WIN32
#include <fcntl.h>
#else
#include <sys/types.h>
#include <unistd.h>
#endif
#include <sys/ioctl.h>
#include <sys/stat.h>
#include "common/log.h"
#include "osal/filesystem/file_system.h"
#include "file_fd_source_plugin.h"
#include "common/media_core.h"
34
35 namespace {
36 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "FileFdSourcePlugin" };
StrToLong(const std::string_view & str,int64_t & value)37 bool StrToLong(const std::string_view& str, int64_t& value)
38 {
39 FALSE_RETURN_V_MSG_E(!str.empty() && (isdigit(str.front()) || (str.front() == '-')),
40 false, "no valid string.");
41 std::string valStr(str);
42 char* end = nullptr;
43 errno = 0;
44 long long result = strtoll(valStr.c_str(), &end, 10); /* 10 means decimal */
45 FALSE_RETURN_V_MSG_E(result >= LLONG_MIN && result <= LLONG_MAX, false,
46 "call StrToLong func false, input str is: %{public}s!", valStr.c_str());
47 FALSE_RETURN_V_MSG_E(end != valStr.c_str() && end[0] == '\0' && errno != ERANGE, false,
48 "call StrToLong func false, input str is: %{public}s!", valStr.c_str());
49 value = result;
50 return true;
51 }
52 }
53
54 namespace OHOS {
55 namespace Media {
56 namespace Plugins {
57 namespace FileFdSource {
58 namespace {
// Minimum number of regex sub-matches ParseUriInfo requires (whole match + fd group).
constexpr int32_t FDPOS = 2;
// Reads/caches that take longer than this many milliseconds are logged.
constexpr int32_t READ_TIME = 3;
// Capacity handed to the RingBuffer created for cloud files, in bytes.
constexpr size_t CACHE_SIZE = 40 * 1024 * 1024;
// Upper bound of a single read() issued by CacheDataLoop, in bytes.
constexpr size_t PER_CACHE_SIZE = 48 * 10 * 1024;;
// Sleep granularity used with WaitForInterrupt, in milliseconds.
constexpr int32_t TEN_MILLISECOUNDS = 10;
// Maximum time HandleBuffering waits per call, in milliseconds.
constexpr int32_t ONE_SECONDS = 1 * 1000;
// Cache time returned by GetCacheTime for slow downloads (presumably seconds — TODO confirm).
constexpr int32_t CACHE_TIME_DEFAULT = 5;
// IsValidTime accepts a gap between two starved reads strictly inside (LOWER, UPPER) milliseconds.
constexpr int32_t SEEK_TIME_LOWER = 20;
constexpr int32_t SEEK_TIME_UPPER = 1000;
// Minimum interval between two download-speed samples in GetCurrentSpeed, in milliseconds.
constexpr int32_t RECORD_TIME_INTERVAL = 1 * 1000;
// Divider converting milliseconds to seconds.
constexpr int32_t MILLISECOUND_TO_SECOND = 1 * 1000;
// Number of failed reads after which HandleReadResult reports an error and interrupts.
constexpr int32_t RETRY_TIMES = 3;
// Divider applied to the bit rate in SetCurrentBitRate (bits per byte).
constexpr int32_t TO_BYTE = 8;
// Upper clamp of the buffering percentage.
constexpr int32_t PERCENT_100 = 100;
// Rank assigned to this plugin at registration.
constexpr int32_t MAX_RANK = 100;
// Third argument passed to RingBuffer::ReadBuffer (looks like a retry count — TODO confirm).
constexpr int32_t READ_RETRY = 2;
// errno values that make ReadOfflineFile report ERROR_INVALID_DATA for uncached cloud files.
constexpr int32_t READ_ERROR_IO = EIO;
constexpr int32_t READ_ERROR_NOMEM = ENOMEM;
// Short cache time (0.3) used for the default water line and for fast downloads.
constexpr float CACHE_LEVEL_1 = 0.3;

// ioctl magic number of the hmdfs requests below.
constexpr unsigned int HMDFS_IOC = 0xf2;
// Location value reported by HMDFS_IOC_GET_LOCATION for a cloud file.
#define IOCTL_CLOUD 2
// Queries whether a byte range is available in the cache (used by HasCacheData).
#define HMDFS_IOC_HAS_CACHE _IOW(HMDFS_IOC, 6, struct HmdfsHasCache)
// Retrieves the file location (used by CheckFileType).
#define HMDFS_IOC_GET_LOCATION _IOR(HMDFS_IOC, 7, __u32)
// Issued by SetInterruptState to break a pending read.
#define HMDFS_IOC_CANCEL_READ _IO(HMDFS_IOC, 8)
// Issued by CheckFileType for cloud files before reading starts.
#define HMDFS_IOC_RESTORE_READ _IO(HMDFS_IOC, 9)
85
GetFileSize(int32_t fd)86 uint64_t GetFileSize(int32_t fd)
87 {
88 uint64_t fileSize = 0;
89 struct stat s {};
90 int ret = fstat(fd, &s);
91 if (ret == 0) {
92 fileSize = static_cast<uint64_t>(s.st_size);
93 FALSE_RETURN_V_MSG_E(fileSize != 0, fileSize, "fileSize 0, fstat ret 0");
94 return fileSize;
95 } else {
96 MEDIA_LOG_W("GetFileSize error ret " PUBLIC_LOG_D32 ", errno " PUBLIC_LOG_D32, ret, errno);
97 }
98 return fileSize;
99 }
/**
 * Tells whether str is a non-empty sequence of ASCII decimal digits.
 *
 * An empty string is rejected: it is not a number, and accepting it would let callers pass it on
 * to conversion routines such as std::stoi, which throw on empty input.
 *
 * @param str text to inspect.
 * @return true when str contains at least one character and only characters '0'..'9'.
 */
bool isNumber(const std::string& str)
{
    return !str.empty() && str.find_first_not_of("0123456789") == std::string::npos;
}
104 }
FileFdSourceRegister(const std::shared_ptr<Register> & reg)105 Status FileFdSourceRegister(const std::shared_ptr<Register>& reg)
106 {
107 MEDIA_LOG_I("fileSourceRegister is started");
108 SourcePluginDef definition;
109 definition.name = "FileFdSource";
110 definition.description = "File Fd source";
111 definition.rank = MAX_RANK; // 100: max rank
112 Capability capability;
113 capability.AppendFixedKey<std::vector<ProtocolType>>(Tag::MEDIA_PROTOCOL_TYPE, {ProtocolType::FD});
114 definition.AddInCaps(capability);
115 auto func = [](const std::string& name) -> std::shared_ptr<SourcePlugin> {
116 return std::make_shared<FileFdSourcePlugin>(name);
117 };
118 definition.SetCreator(func);
119 return reg->AddPlugin(definition);
120 }
121
// Plugin entry point: declares FileFdSource (Apache-2.0) with FileFdSourceRegister as its registration
// function. The trailing empty lambda is presumably the unregister hook — TODO confirm against the macro.
PLUGIN_DEFINITION(FileFdSource, LicenseType::APACHE_V2, FileFdSourceRegister, [] {});
123
/**
 * Creates the plugin and forwards its name to the SourcePlugin base class.
 *
 * @param name plugin instance name; moved into the base class.
 */
FileFdSourcePlugin::FileFdSourcePlugin(std::string name)
    : SourcePlugin(std::move(name))
{
}
128
/**
 * Tears the plugin down: resets the clock, raises the interrupt state and stops the download
 * task when one was created.
 */
FileFdSourcePlugin::~FileFdSourcePlugin()
{
    MEDIA_LOG_I("~FileFdSourcePlugin in.");
    steadyClock_.Reset();
    // Raise the interrupt state before the download task is stopped below.
    SetInterruptState(true);
    MEDIA_LOG_I("~FileFdSourcePlugin isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
    // No download task exists unless SetSource created one for a cloud file.
    FALSE_RETURN_MSG(downloadTask_ != nullptr, "~FileFdSourcePlugin out.");
    downloadTask_->Stop();
    MEDIA_LOG_I("~FileFdSourcePlugin out.");
}
139
/**
 * Stores the callback used to report buffering and error events.
 *
 * @param cb callback pointer; nullptr is accepted and simply disables event delivery, since every
 *           notifier checks callback_ before use.
 * @return always Status::OK.
 */
Status FileFdSourcePlugin::SetCallback(Callback* cb)
{
    MEDIA_LOG_D("SetCallback in " PUBLIC_LOG_D32, cb != nullptr);
    callback_ = cb;
    return Status::OK;
}
146
SetSource(std::shared_ptr<MediaSource> source)147 Status FileFdSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
148 {
149 MEDIA_LOG_I("SetSource in. %{private}s", source->GetSourceUri().c_str());
150 auto err = ParseUriInfo(source->GetSourceUri());
151 if (err != Status::OK) {
152 MEDIA_LOG_E("Parse file name from uri fail, uri %{private}s", source->GetSourceUri().c_str());
153 return err;
154 }
155 CheckFileType();
156 if (isCloudFile_) {
157 ringBuffer_ = std::make_shared<RingBuffer>(CACHE_SIZE);
158 FALSE_RETURN_V_MSG_E(!(ringBuffer_ == nullptr || !ringBuffer_->Init()),
159 Status::ERROR_NO_MEMORY, "memory is not enough ringBuffer_");
160 downloadTask_ = std::make_shared<Task>(std::string("downloadTaskFD"));
161 FALSE_RETURN_V_MSG_E(downloadTask_ != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
162 downloadTask_->RegisterJob([this] {
163 CacheDataLoop();
164 return 0;
165 });
166 downloadTask_->Start();
167 steadyClock_.Reset();
168 }
169 return Status::OK;
170 }
171
/**
 * Reads up to expectedLen bytes into buffer; convenience overload that uses stream id 0.
 *
 * @param buffer      destination buffer, see the stream-id overload.
 * @param offset      forwarded unchanged to the stream-id overload.
 * @param expectedLen maximum number of bytes wanted.
 * @return the status of the stream-id overload.
 */
Status FileFdSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
{
    return Read(0, buffer, offset, expectedLen);
}
176
Read(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)177 Status FileFdSourcePlugin::Read(int32_t streamId, std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
178 {
179 FALSE_RETURN_V_MSG_E(fd_ != -1, Status::ERROR_WRONG_STATE, "no valid fd");
180 if (isCloudFile_) {
181 return ReadOnlineFile(0, buffer, offset, expectedLen);
182 } else {
183 return ReadOfflineFile(0, buffer, offset, expectedLen);
184 }
185 }
186
ReadOfflineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)187 Status FileFdSourcePlugin::ReadOfflineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
188 uint64_t offset, size_t expectedLen)
189 {
190 std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
191 FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
192 expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
193 expectedLen = std::min(bufData->GetCapacity(), expectedLen);
194 MEDIA_LOG_D("ReadLocal buffer pos: " PUBLIC_LOG_U64 " , len:" PUBLIC_LOG_ZU, position_.load(), expectedLen);
195
196 int64_t offsetCur = lseek(fd_, 0, SEEK_CUR);
197 if (offsetCur < 0) {
198 MEDIA_LOG_E("Fd get offset failed ");
199 } else if (static_cast<uint64_t>(offsetCur) != position_.load()) {
200 MEDIA_LOG_E("Fd offsetCur has changed. offsetCur " PUBLIC_LOG_D64 ", offsetOld " PUBLIC_LOG_U64,
201 offsetCur, position_.load());
202 }
203 auto size = read(fd_, bufData->GetWritableAddr(expectedLen), expectedLen);
204 if (size <= 0) {
205 HandleReadResult(expectedLen, size);
206 FALSE_RETURN_V((loc_ != IOCTL_CLOUD || isEnableFdCache_ ||
207 (errno != READ_ERROR_NOMEM && errno != READ_ERROR_IO)), Status::ERROR_INVALID_DATA);
208 MEDIA_LOG_D("ReadLocal END_OF_STREAM");
209 return Status::END_OF_STREAM;
210 }
211 bufData->UpdateDataSize(size);
212 position_ += static_cast<uint64_t>(size);
213 if (buffer->GetMemory() != nullptr) {
214 MEDIA_LOG_D("ReadLocal position_ " PUBLIC_LOG_U64 ", readSize " PUBLIC_LOG_ZU,
215 position_.load(), buffer->GetMemory()->GetSize());
216 }
217 return Status::OK;
218 }
219
/**
 * Serves a read for a cloud file from the ring buffer filled by CacheDataLoop.
 *
 * While buffering is active, or when the ring buffer cannot satisfy the request and the data is
 * not reported as cached by HasCacheData, the call does not read and normally asks the caller to
 * retry with Status::ERROR_AGAIN. If the plugin is interrupted or read blocking is disabled,
 * Status::OK is returned instead so the caller does not retry.
 *
 * @param streamId    unused.
 * @param buffer      destination buffer; allocated by GetBufferPtr when empty.
 * @param offset      only used for the cache query and for logging.
 * @param expectedLen maximum number of bytes wanted; clipped to the remaining source size and to
 *                    the capacity of the destination memory.
 * @return Status::OK (possibly with a data size of 0), Status::ERROR_AGAIN, Status::ERROR_NO_MEMORY
 *         or Status::END_OF_STREAM as described above and below.
 */
Status FileFdSourcePlugin::ReadOnlineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
    uint64_t offset, size_t expectedLen)
{
    if (isBuffering_) {
        // HandleBuffering waits up to ONE_SECONDS and returns true if buffering is still active.
        if (HandleBuffering()) {
            FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
            FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
            MEDIA_LOG_I("is buffering, return error again.");
            return Status::ERROR_AGAIN;
        }
    }

    std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
    FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
    expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
    expectedLen = std::min(bufData->GetCapacity(), expectedLen);

    // The ring buffer is empty right after a seek; CheckReadTime only notifies buffering when the
    // starvation has lasted longer than SEEK_TIME_LOWER (20 ms).
    curReadTime_ = steadyClock2_.ElapsedMilliseconds();
    // Starved: frames are being read, the ring buffer holds less than requested, the data is not
    // reported as cached and more than expectedLen bytes remain in the source.
    if (isReadFrame_ && ringBuffer_->GetSize() < expectedLen && !HasCacheData(expectedLen, offset) &&
        static_cast<size_t>(GetLastSize(position_)) > expectedLen) {
        MEDIA_LOG_I("ringBuffer.size() " PUBLIC_LOG_ZU " curReadTime_ " PUBLIC_LOG_D64
            " lastReadTime_ " PUBLIC_LOG_D64, ringBuffer_->GetSize(), curReadTime_, lastReadTime_);
        CheckReadTime();
        FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
        FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
        return Status::ERROR_AGAIN;
    }

    size_t size = ringBuffer_->ReadBuffer(bufData->GetWritableAddr(expectedLen), expectedLen, READ_RETRY);
    if (size == 0) {
        MEDIA_LOG_I("read size 0, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_D64 ", size:" PUBLIC_LOG_U64 ", pos:"
            PUBLIC_LOG_U64 ", readBlock:" PUBLIC_LOG_D32, fd_, offset, size_, position_.load(), isReadBlocking_.load());
        // Nothing left in the source window means end of stream; otherwise report an empty read.
        FALSE_RETURN_V_MSG_E(GetLastSize(position_) != 0, Status::END_OF_STREAM, "ReadCloud END_OF_STREAM");
        bufData->UpdateDataSize(0);
        return Status::OK;
    }
    bufData->UpdateDataSize(size);
    // Log reads that took longer than READ_TIME milliseconds.
    int64_t ct = steadyClock2_.ElapsedMilliseconds() - curReadTime_;
    if (ct > READ_TIME) {
        MEDIA_LOG_I("ReadCloud buffer position " PUBLIC_LOG_U64 ", expectedLen " PUBLIC_LOG_ZU
            " costTime: " PUBLIC_LOG_U64, position_.load(), expectedLen, ct);
    }
    position_ += static_cast<uint64_t>(size);
    MEDIA_LOG_D("ringBuffer.size() " PUBLIC_LOG_ZU, ringBuffer_->GetSize());
    return Status::OK;
}
267
SeekTo(uint64_t offset)268 Status FileFdSourcePlugin::SeekTo(uint64_t offset)
269 {
270 FALSE_RETURN_V_MSG_E(fd_ != -1 && seekable_ == Seekable::SEEKABLE,
271 Status::ERROR_WRONG_STATE, "no valid fd or no seekable.");
272
273 MEDIA_LOG_D("SeekTo offset: " PUBLIC_LOG_U64, offset);
274 if (isCloudFile_) {
275 return SeekToOnlineFile(offset);
276 } else {
277 return SeekToOfflineFile(offset);
278 }
279 }
280
SeekToOfflineFile(uint64_t offset)281 Status FileFdSourcePlugin::SeekToOfflineFile(uint64_t offset)
282 {
283 int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
284 if (ret == -1) {
285 MEDIA_LOG_E("SeekLocal failed, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
286 PUBLIC_LOG_S, fd_, offset, strerror(errno));
287 return Status::ERROR_UNKNOWN;
288 }
289 position_ = offset + static_cast<uint64_t>(offset_);
290 MEDIA_LOG_D("SeekLocal end ret " PUBLIC_LOG_D32 ", position_ " PUBLIC_LOG_U64, ret, position_.load());
291 return Status::OK;
292 }
293
SeekToOnlineFile(uint64_t offset)294 Status FileFdSourcePlugin::SeekToOnlineFile(uint64_t offset)
295 {
296 FALSE_RETURN_V_MSG_E(ringBuffer_ != nullptr, Status::ERROR_WRONG_STATE, "SeekCloud ringBuffer_ is nullptr");
297 MEDIA_LOG_D("SeekCloud, ringBuffer.size: " PUBLIC_LOG_ZU ", offset " PUBLIC_LOG_U64,
298 ringBuffer_->GetSize(), offset);
299 if (ringBuffer_->Seek(offset)) {
300 position_ = offset + static_cast<uint64_t>(offset_);
301 MEDIA_LOG_I("SeekCloud ringBuffer_ seek hit, offset " PUBLIC_LOG_U64, offset);
302 return Status::OK;
303 }
304 // First clear buffer, avoid no available buffer then task pause never exit.
305 ringBuffer_->SetActive(false);
306 inSeek_ = true;
307 if (downloadTask_ != nullptr) {
308 downloadTask_->Pause();
309 inSeek_ = false;
310 }
311 ringBuffer_->Clear();
312 ringBuffer_->SetMediaOffset(offset);
313 {
314 std::lock_guard<std::mutex> lock(interruptMutex_);
315 FALSE_RETURN_V(!isInterrupted_, Status::OK);
316 ringBuffer_->SetActive(true);
317 }
318
319 int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
320 if (ret == -1) {
321 MEDIA_LOG_E("SeekCloud failed, fd_ " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
322 PUBLIC_LOG_S, fd_, offset, strerror(errno));
323 return Status::ERROR_UNKNOWN;
324 }
325 position_ = offset + static_cast<uint64_t>(offset_);
326 cachePosition_ = position_.load();
327
328 MEDIA_LOG_D("SeekCloud end, fd_ " PUBLIC_LOG_D32 ", size_ " PUBLIC_LOG_U64 ", offset_ " PUBLIC_LOG_D64
329 ", position_ " PUBLIC_LOG_U64, fd_, size_, offset_, position_.load());
330 if (downloadTask_ != nullptr) {
331 downloadTask_->Start();
332 }
333 return Status::OK;
334 }
335
ParseUriInfo(const std::string & uri)336 Status FileFdSourcePlugin::ParseUriInfo(const std::string& uri)
337 {
338 if (uri.empty()) {
339 MEDIA_LOG_E("uri is empty");
340 return Status::ERROR_INVALID_PARAMETER;
341 }
342 std::smatch fdUriMatch;
343 FALSE_RETURN_V_MSG_E(std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)\\?offset=(.*)&size=(.*)")) ||
344 std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)")),
345 Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
346 FALSE_RETURN_V_MSG_E(fdUriMatch.size() >= FDPOS && isNumber(fdUriMatch[1].str()),
347 Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
348 fd_ = std::stoi(fdUriMatch[1].str()); // 1: sub match fd subscript
349 FALSE_RETURN_V_MSG_E(fd_ != -1 && FileSystem::IsRegularFile(fd_),
350 Status::ERROR_INVALID_PARAMETER, "Invalid fd: " PUBLIC_LOG_D32, fd_);
351 fileSize_ = GetFileSize(fd_);
352 if (fdUriMatch.size() == 4) { // 4:4 sub match
353 std::string offsetStr = fdUriMatch[2].str(); // 2: sub match offset subscript
354 FALSE_RETURN_V_MSG_E(StrToLong(offsetStr, offset_), Status::ERROR_INVALID_PARAMETER,
355 "Failed to read offset.");
356 if (static_cast<uint64_t>(offset_) > fileSize_) {
357 offset_ = static_cast<int64_t>(fileSize_);
358 }
359 size_ = static_cast<uint64_t>(std::stoll(fdUriMatch[3].str())); // 3: sub match size subscript
360 uint64_t remainingSize = fileSize_ - static_cast<uint64_t>(offset_);
361 if (size_ > remainingSize) {
362 size_ = remainingSize;
363 }
364 } else {
365 size_ = fileSize_;
366 offset_ = 0;
367 }
368 position_ = offset_;
369 seekable_ = FileSystem::IsSeekable(fd_) ? Seekable::SEEKABLE : Seekable::UNSEEKABLE;
370 if (seekable_ == Seekable::SEEKABLE) {
371 NOK_LOG(SeekTo(0));
372 }
373 MEDIA_LOG_I("Fd: " PUBLIC_LOG_D32 ", offset: " PUBLIC_LOG_D64 ", size: " PUBLIC_LOG_U64, fd_, offset_, size_);
374 return Status::OK;
375 }
376
CacheDataLoop()377 void FileFdSourcePlugin::CacheDataLoop()
378 {
379 if (isInterrupted_) {
380 MEDIA_LOG_E("CacheData break");
381 WaitForInterrupt(TEN_MILLISECOUNDS);
382 return;
383 }
384
385 int64_t curTime = steadyClock_.ElapsedMilliseconds();
386 GetCurrentSpeed(curTime);
387
388 size_t bufferSize = std::min(PER_CACHE_SIZE, static_cast<size_t>(GetLastSize(cachePosition_.load())));
389 if (bufferSize < 0) {
390 MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
391 WaitForInterrupt(TEN_MILLISECOUNDS);
392 return;
393 }
394
395 char* cacheBuffer = new char[bufferSize];
396 if (cacheBuffer == nullptr) {
397 MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
398 WaitForInterrupt(TEN_MILLISECOUNDS);
399 return;
400 }
401 int size = read(fd_, cacheBuffer, bufferSize);
402 if (size <= 0) {
403 DeleteCacheBuffer(cacheBuffer, bufferSize);
404 HandleReadResult(bufferSize, size);
405 return;
406 }
407 MEDIA_LOG_D("Cache fd: " PUBLIC_LOG_D32 ", cachePos_ " PUBLIC_LOG_U64 ", ringBuffer_.size() " PUBLIC_LOG_ZU
408 ", size_ " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_);
409 while (!ringBuffer_->WriteBuffer(cacheBuffer, size)) {
410 MEDIA_LOG_I("CacheData ringbuffer write failed");
411 if (inSeek_ || isInterrupted_) {
412 DeleteCacheBuffer(cacheBuffer, bufferSize);
413 return;
414 }
415 WaitForInterrupt(TEN_MILLISECOUNDS);
416 }
417 cachePosition_ += static_cast<uint64_t>(size);
418 downloadSize_ += static_cast<uint64_t>(size);
419
420 int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
421 if (ct > READ_TIME) {
422 MEDIA_LOG_I("Cache fd: " PUBLIC_LOG_D32 " cachePos:" PUBLIC_LOG_U64 " ringBuffer.size() " PUBLIC_LOG_ZU
423 ", size " PUBLIC_LOG_U64 " cTime: " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(),
424 size_, ct);
425 }
426
427 DeleteCacheBuffer(cacheBuffer, bufferSize);
428
429 if (isBuffering_ && (static_cast<int64_t>(ringBuffer_->GetSize()) > waterLineAbove_ ||
430 GetLastSize(cachePosition_.load()) == 0)) {
431 NotifyBufferingEnd();
432 }
433 }
434
HasCacheData(size_t bufferSize,uint64_t offset)435 bool FileFdSourcePlugin::HasCacheData(size_t bufferSize, uint64_t offset)
436 {
437 HmdfsHasCache ioctlData;
438 ioctlData.offset = static_cast<int64_t>(offset);
439 ioctlData.readSize = static_cast<int64_t>(bufferSize);
440 int32_t ioResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData); // 0在 -1不在
441
442 ioctlData.offset = static_cast<int64_t>(cachePosition_);
443 ioctlData.readSize = static_cast<int64_t>(PER_CACHE_SIZE);
444 int32_t ioCacheResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData);
445 // ioctl has cache
446 if (ioResult == 0 && ioCacheResult == 0) {
447 return true;
448 } else {
449 MEDIA_LOG_I("ioctl has no cache with errno " PUBLIC_LOG_D32, errno);
450 }
451 return false;
452 }
453
Stop()454 Status FileFdSourcePlugin::Stop()
455 {
456 MEDIA_LOG_I("Stop enter.");
457 isInterrupted_ = true;
458 MEDIA_LOG_I("Stop isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
459 FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
460 downloadTask_->StopAsync();
461 return Status::OK;
462 }
463
Reset()464 Status FileFdSourcePlugin::Reset()
465 {
466 MEDIA_LOG_I("Reset enter.");
467 isInterrupted_ = true;
468 MEDIA_LOG_I("Reset isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
469 FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
470 downloadTask_->StopAsync();
471 return Status::OK;
472 }
473
PauseDownloadTask(bool isAsync)474 void FileFdSourcePlugin::PauseDownloadTask(bool isAsync)
475 {
476 FALSE_RETURN(downloadTask_ != nullptr);
477 if (isAsync) {
478 downloadTask_->PauseAsync();
479 } else {
480 downloadTask_->Pause();
481 }
482 }
483
HandleBuffering()484 bool FileFdSourcePlugin::HandleBuffering()
485 {
486 MEDIA_LOG_I("HandleBuffering in.");
487 int32_t sleepTime = 0;
488 // return error again 1 time 1s, avoid ffmpeg error
489 while (sleepTime < ONE_SECONDS && !isInterrupted_ && isReadBlocking_) {
490 NotifyBufferingPercent();
491 if (!isBuffering_) {
492 break;
493 }
494 MEDIA_LOG_I("isBuffering.");
495 WaitForInterrupt(TEN_MILLISECOUNDS);
496 sleepTime += TEN_MILLISECOUNDS;
497 }
498 MEDIA_LOG_I("HandleBuffering out.");
499 return isBuffering_;
500 }
501
HandleReadResult(size_t bufferSize,int size)502 void FileFdSourcePlugin::HandleReadResult(size_t bufferSize, int size)
503 {
504 MEDIA_LOG_I("HandleReadResult size " PUBLIC_LOG_D32 ", fd " PUBLIC_LOG_D32 ", cachePosition_" PUBLIC_LOG_U64
505 ", position_ " PUBLIC_LOG_U64 ", bufferSize " PUBLIC_LOG_ZU ", size_ " PUBLIC_LOG_U64 ", offset_ "
506 PUBLIC_LOG_D64, size, fd_, cachePosition_.load(), position_.load(), bufferSize, size_, offset_);
507 if (size < 0) {
508 // errno EIO 5
509 MEDIA_LOG_E("read fail, errno " PUBLIC_LOG_D32, errno);
510
511 // read fail with errno, retry 3 * 10ms
512 retryTimes_++;
513 if (retryTimes_ >= RETRY_TIMES || isInterrupted_.load()) {
514 NotifyReadFail();
515 SetInterruptState(true);
516 }
517 WaitForInterrupt(TEN_MILLISECOUNDS);
518 } else {
519 cachePosition_ = 0;
520 PauseDownloadTask(false);
521 }
522 }
523
NotifyBufferingStart()524 void FileFdSourcePlugin::NotifyBufferingStart()
525 {
526 MEDIA_LOG_I("NotifyBufferingStart, ringBuffer.size() " PUBLIC_LOG_ZU
527 ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
528 isBuffering_ = true;
529 if (callback_ != nullptr && !isInterrupted_) {
530 MEDIA_LOG_I("Read OnEvent BUFFERING_START.");
531 callback_->OnEvent({PluginEventType::BUFFERING_START, {BufferingInfoType::BUFFERING_START}, "start"});
532 } else {
533 MEDIA_LOG_E("BUFFERING_START callback_ is nullptr or isInterrupted_ is true");
534 }
535 }
536
NotifyBufferingPercent()537 void FileFdSourcePlugin::NotifyBufferingPercent()
538 {
539 if (waterLineAbove_ != 0) {
540 int64_t bp = static_cast<float>(ringBuffer_->GetSize()) / waterLineAbove_ * PERCENT_100;
541 bp = bp > PERCENT_100 ? PERCENT_100 : bp;
542 if (isBuffering_ && callback_ != nullptr && !isInterrupted_) {
543 MEDIA_LOG_I("NotifyBufferingPercent, ringBuffer.size() " PUBLIC_LOG_ZU ", waterLineAbove_ " PUBLIC_LOG_U64
544 ", PERCENT " PUBLIC_LOG_D32, ringBuffer_->GetSize(), waterLineAbove_, static_cast<int32_t>(bp));
545 callback_->OnEvent({PluginEventType::EVENT_BUFFER_PROGRESS,
546 {BufferingInfoType::BUFFERING_PERCENT}, std::to_string(bp)});
547 } else {
548 MEDIA_LOG_E("EVENT_BUFFER_PROGRESS callback_ is nullptr or isInterrupted_ \
549 is true or isBuffering_ is false");
550 }
551 }
552 }
553
NotifyBufferingEnd()554 void FileFdSourcePlugin::NotifyBufferingEnd()
555 {
556 NotifyBufferingPercent();
557 MEDIA_LOG_I("NotifyBufferingEnd, ringBuffer.size() " PUBLIC_LOG_ZU
558 ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
559 isBuffering_ = false;
560 lastReadTime_ = 0;
561 if (callback_ != nullptr && !isInterrupted_) {
562 MEDIA_LOG_I("NotifyBufferingEnd success .");
563 callback_->OnEvent({PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
564 } else {
565 MEDIA_LOG_E("BUFFERING_END callback_ is nullptr or isInterrupted_ is true");
566 }
567 }
568
NotifyReadFail()569 void FileFdSourcePlugin::NotifyReadFail()
570 {
571 MEDIA_LOG_I("NotifyReadFail in.");
572 if (callback_ != nullptr && !isInterrupted_) {
573 MEDIA_LOG_I("Read OnEvent read fail");
574 callback_->OnEvent({PluginEventType::CLIENT_ERROR, {NetworkClientErrorCode::ERROR_TIME_OUT}, "read"});
575 } else {
576 MEDIA_LOG_E("CLIENT_ERROR callback_ is nullptr or isInterrupted_ is true");
577 }
578 }
579
/**
 * Marks that frame reading has started; ReadOnlineFile only applies its starvation check once
 * isReadFrame_ is set.
 *
 * @param streamId unused.
 */
void FileFdSourcePlugin::SetDemuxerState(int32_t streamId)
{
    MEDIA_LOG_I("SetDemuxerState");
    isReadFrame_ = true;
}
585
/**
 * Stores the current bit rate and derives the initial buffering water line from it.
 *
 * @param bitRate  bit rate of the media; divided by TO_BYTE (8) before it is stored.
 * @param streamID unused.
 * @return always Status::OK.
 */
Status FileFdSourcePlugin::SetCurrentBitRate(int32_t bitRate, int32_t streamID)
{
    currentBitRate_ = bitRate / TO_BYTE; // 8b
    MEDIA_LOG_I("currentBitRate: " PUBLIC_LOG_D32, currentBitRate_);
    // default cache 0.3s
    waterLineAbove_ = CACHE_LEVEL_1 * currentBitRate_;
    return Status::OK;
}
594
/**
 * Logs the bundle name; the value is not stored by this plugin.
 *
 * @param bundleName name to log.
 */
void FileFdSourcePlugin::SetBundleName(const std::string& bundleName)
{
    MEDIA_LOG_I("SetBundleName bundleName: " PUBLIC_LOG_S, bundleName.c_str());
}
599
SetReadBlockingFlag(bool isAllowed)600 Status FileFdSourcePlugin::SetReadBlockingFlag(bool isAllowed)
601 {
602 MEDIA_LOG_I("SetReadBlockingFlag entered, IsReadBlockingAllowed %{public}d", isAllowed);
603 if (ringBuffer_) {
604 ringBuffer_->SetReadBlocking(isAllowed);
605 }
606 isReadBlocking_ = isAllowed;
607 return Status::OK;
608 }
609
SetInterruptState(bool isInterruptNeeded)610 void FileFdSourcePlugin::SetInterruptState(bool isInterruptNeeded)
611 {
612 bool isInterruptAllowed = true;
613 MEDIA_LOG_I("SetInterruptState isInterrupted_" PUBLIC_LOG_D32, isInterruptNeeded);
614 {
615 std::lock_guard<std::mutex> lock(interruptMutex_);
616 isInterruptAllowed = !(isInterrupted_ && isInterruptNeeded);
617 isInterrupted_ = isInterruptNeeded;
618 bufferCond_.notify_all();
619 }
620 if (ringBuffer_ != nullptr) {
621 if (isInterrupted_) {
622 ringBuffer_->SetActive(false);
623 } else {
624 ringBuffer_->SetActive(true);
625 }
626 }
627
628 if (isInterrupted_ && isInterruptAllowed && isCloudFile_) {
629 if (downloadTask_ != nullptr) {
630 downloadTask_->StopAsync();
631 }
632 int ret = ioctl(fd_, HMDFS_IOC_CANCEL_READ);
633 MEDIA_LOG_I("ioctl break read, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
634 }
635 }
636
/**
 * Reports the size of the source window (size_) as set by ParseUriInfo.
 *
 * @param size receives size_.
 * @return always Status::OK.
 */
Status FileFdSourcePlugin::GetSize(uint64_t& size)
{
    size = size_;
    return Status::OK;
}
642
/**
 * @return the seekability determined by ParseUriInfo for the current fd.
 */
Seekable FileFdSourcePlugin::GetSeekable()
{
    MEDIA_LOG_D("GetSeekable in");
    return seekable_;
}
648
CheckFileType()649 void FileFdSourcePlugin::CheckFileType()
650 {
651 int loc; // 1本地,2云端
652 int ioResult = ioctl(fd_, HMDFS_IOC_GET_LOCATION, &loc);
653 MEDIA_LOG_I("SetSource ioctl loc, ret " PUBLIC_LOG_D32 ", loc " PUBLIC_LOG_D32 ", errno"
654 PUBLIC_LOG_D32, ioResult, loc, errno);
655
656 if (ioResult == 0) {
657 loc_ = loc;
658 }
659 if (!isEnableFdCache_) {
660 isCloudFile_ = false;
661 return;
662 }
663
664 if (ioResult == 0) {
665 if (loc == IOCTL_CLOUD) {
666 isCloudFile_ = true;
667 MEDIA_LOG_I("ioctl file is cloud");
668 int ret = ioctl(fd_, HMDFS_IOC_RESTORE_READ);
669 MEDIA_LOG_I("ioctl restore fd, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
670 return;
671 } else {
672 isCloudFile_ = false;
673 MEDIA_LOG_I("ioctl file is local");
674 }
675 } else {
676 isCloudFile_ = false;
677 MEDIA_LOG_I("ioctl failed to get file type");
678 }
679 }
680
GetBufferPtr(std::shared_ptr<Buffer> & buffer,size_t expectedLen)681 std::shared_ptr<Memory> FileFdSourcePlugin::GetBufferPtr(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
682 {
683 if (!buffer) {
684 buffer = std::make_shared<Buffer>();
685 }
686 std::shared_ptr<Memory> bufData;
687 if (buffer->IsEmpty()) {
688 bufData = buffer->AllocMemory(nullptr, expectedLen);
689 } else {
690 bufData = buffer->GetMemory();
691 }
692 return bufData;
693 }
694
GetLastSize(uint64_t position)695 int64_t FileFdSourcePlugin::GetLastSize(uint64_t position)
696 {
697 int64_t ret = static_cast<int64_t>(size_) + offset_ - static_cast<int64_t>(position);
698 if (ret < 0) {
699 MEDIA_LOG_E("GetLastSize error, fd_ " PUBLIC_LOG_D32 ", offset_ " PUBLIC_LOG_D64 ", size_ "
700 PUBLIC_LOG_U64 ", position " PUBLIC_LOG_U64, fd_, offset_, size_, position);
701 }
702 return ret;
703 }
704
GetCurrentSpeed(int64_t curTime)705 void FileFdSourcePlugin::GetCurrentSpeed(int64_t curTime)
706 {
707 if ((curTime - lastCheckTime_) > RECORD_TIME_INTERVAL) {
708 MEDIA_LOG_I("CacheDataLoop curTime_: " PUBLIC_LOG_U64 " lastCheckTime_: "
709 PUBLIC_LOG_U64 " downloadSize_: " PUBLIC_LOG_U64, curTime, lastCheckTime_, downloadSize_);
710 float duration = static_cast<double>(curTime - lastCheckTime_) / MILLISECOUND_TO_SECOND;
711 avgDownloadSpeed_ = downloadSize_ / duration; //b/s
712 MEDIA_LOG_I("downloadDuration: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F,
713 duration, avgDownloadSpeed_);
714 downloadSize_ = 0;
715 lastCheckTime_ = curTime;
716 FALSE_RETURN(currentBitRate_ > 0);
717 UpdateWaterLineAbove();
718 }
719 }
720
UpdateWaterLineAbove()721 void FileFdSourcePlugin::UpdateWaterLineAbove()
722 {
723 FALSE_RETURN_MSG(currentBitRate_ > 0, "currentBitRate_ <= 0");
724 float cacheTime = GetCacheTime(avgDownloadSpeed_ / currentBitRate_);
725 MEDIA_LOG_I("cacheTime: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F
726 " currentBitRate: " PUBLIC_LOG_D32, cacheTime, avgDownloadSpeed_, currentBitRate_);
727 waterLineAbove_ = cacheTime * currentBitRate_ > GetLastSize(cachePosition_.load()) ?
728 GetLastSize(cachePosition_.load()) : cacheTime * currentBitRate_;
729 MEDIA_LOG_I("waterLineAbove_: " PUBLIC_LOG_U64, waterLineAbove_);
730 }
731
GetCacheTime(float num)732 float FileFdSourcePlugin::GetCacheTime(float num)
733 {
734 MEDIA_LOG_I("GetCacheTime with num: " PUBLIC_LOG_F, num);
735 if (num < 0) {
736 return CACHE_LEVEL_1;
737 }
738 if (num > 0 && num < 0.5) { // (0, 0.5)
739 return CACHE_TIME_DEFAULT;
740 } else if (num >= 0.5 && num < 1) { //[0.5, 1)
741 return CACHE_TIME_DEFAULT;
742 } else if (num >= 1) { //[1, 2)
743 return CACHE_LEVEL_1;
744 }
745 return CACHE_TIME_DEFAULT;
746 }
747
DeleteCacheBuffer(char * buffer,size_t bufferSize)748 void FileFdSourcePlugin::DeleteCacheBuffer(char* buffer, size_t bufferSize)
749 {
750 if (buffer != nullptr && bufferSize > 0) {
751 delete[] buffer;
752 }
753 }
754
CheckReadTime()755 void FileFdSourcePlugin::CheckReadTime()
756 {
757 if (IsValidTime(curReadTime_, lastReadTime_)) {
758 NotifyBufferingStart();
759 lastReadTime_ = 0;
760 } else {
761 if (lastReadTime_ == 0) {
762 lastReadTime_ = curReadTime_;
763 }
764 }
765 }
766
IsValidTime(int64_t curTime,int64_t lastTime)767 bool FileFdSourcePlugin::IsValidTime(int64_t curTime, int64_t lastTime)
768 {
769 return lastReadTime_ != 0 && curReadTime_ - lastReadTime_ < SEEK_TIME_UPPER &&
770 curReadTime_ - lastReadTime_ > SEEK_TIME_LOWER;
771 }
772
/**
 * Enables or disables the fd cache; CheckFileType only treats a file as a cloud file while the
 * cache is enabled.
 *
 * @param isEnableFdCache new value of isEnableFdCache_.
 */
void FileFdSourcePlugin::SetEnableOnlineFdCache(bool isEnableFdCache)
{
    isEnableFdCache_ = isEnableFdCache;
}
777
WaitForInterrupt(int32_t waitTimeMS)778 void FileFdSourcePlugin::WaitForInterrupt(int32_t waitTimeMS)
779 {
780 std::unique_lock<std::mutex> lock(interruptMutex_);
781 bufferCond_.wait_for(lock, std::chrono::milliseconds(waitTimeMS), [&] { return isInterrupted_.load(); });
782 }
783
/**
 * @return true unless CheckFileType classified the fd as a cloud file.
 */
bool FileFdSourcePlugin::IsLocalFd()
{
    return !isCloudFile_;
}
788 } // namespace FileFdSource
} // namespace Plugins
790 } // namespace Media
791 } // namespace OHOS