1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "FileFdSourcePlugin"
17
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <regex>
#include <memory>
#include <new>
#ifdef WIN32
#include <fcntl.h>
#else
#include <sys/types.h>
#include <unistd.h>
#endif
#include <sys/ioctl.h>
#include <sys/stat.h>
#include "common/log.h"
#include "osal/filesystem/file_system.h"
#include "file_fd_source_plugin.h"
#include "common/media_core.h"
34
35 namespace {
// HiLog label for this translation unit: core log type, system-player domain, "FileFdSourcePlugin" tag.
// NOTE(review): not referenced by name in the visible code; presumably consumed by the logging macros — confirm.
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "FileFdSourcePlugin" };
StrToLong(const std::string_view & str,int64_t & value)37 bool StrToLong(const std::string_view& str, int64_t& value)
38 {
39 FALSE_RETURN_V_MSG_E(!str.empty() && (isdigit(str.front()) || (str.front() == '-')),
40 false, "no valid string.");
41 std::string valStr(str);
42 char* end = nullptr;
43 errno = 0;
44 long long result = strtoll(valStr.c_str(), &end, 10); /* 10 means decimal */
45 FALSE_RETURN_V_MSG_E(result >= LLONG_MIN && result <= LLONG_MAX, false,
46 "call StrToLong func false, input str is: %{public}s!", valStr.c_str());
47 FALSE_RETURN_V_MSG_E(end != valStr.c_str() && end[0] == '\0' && errno != ERANGE, false,
48 "call StrToLong func false, input str is: %{public}s!", valStr.c_str());
49 value = result;
50 return true;
51 }
52 }
53
54 namespace OHOS {
55 namespace Media {
56 namespace Plugins {
57 namespace FileFdSource {
58 namespace {
// Minimum number of regex sub-matches (whole match + fd group) ParseUriInfo requires.
constexpr int32_t FDPOS = 2;
// Cost-time threshold in milliseconds above which read/cache operations are logged.
constexpr int32_t READ_TIME = 3;
// Capacity of the ring buffer created for cloud files (40 MiB).
constexpr size_t CACHE_SIZE = 40 * 1024 * 1024;
// Upper bound of bytes read from the fd per CacheDataLoop iteration (480 KiB).
constexpr size_t PER_CACHE_SIZE = 48 * 10 * 1024;
// Wait step, in milliseconds, used with WaitForInterrupt.
constexpr int32_t TEN_MILLISECOUNDS = 10;
// Maximum total wait, in milliseconds, of one HandleBuffering call.
constexpr int32_t ONE_SECONDS = 1 * 1000;
// Cache time returned by GetCacheTime when download speed is below the bit rate.
constexpr int32_t CACHE_TIME_DEFAULT = 5;
// Lower/upper bounds, in milliseconds, of the interval checked by IsValidTime.
constexpr int32_t SEEK_TIME_LOWER = 20;
constexpr int32_t SEEK_TIME_UPPER = 1000;
// Minimum interval, in milliseconds, between two download speed samples.
constexpr int32_t RECORD_TIME_INTERVAL = 1 * 1000;
constexpr int32_t MILLISECOUND_TO_SECOND = 1 * 1000;
// Number of failed reads after which HandleReadResult reports a read failure.
constexpr int32_t RETRY_TIMES = 3;
// Divisor applied to the bit rate in SetCurrentBitRate (bits -> bytes).
constexpr int32_t TO_BYTE = 8;
constexpr int32_t PERCENT_100 = 100;
constexpr int32_t MAX_RANK = 100;
// Third argument passed to RingBuffer::ReadBuffer in ReadOnlineFile.
constexpr int32_t READ_RETRY = 2;
// errno values that make ReadOfflineFile report ERROR_INVALID_DATA for uncached cloud files.
constexpr int32_t READ_ERROR_IO = EIO;
constexpr int32_t READ_ERROR_NOMEM = ENOMEM;
// Cache time (0.3 s per the comment in SetCurrentBitRate) used as the default water line factor.
constexpr float CACHE_LEVEL_1 = 0.3;

// ioctl magic number and request codes issued against the fd in this file.
constexpr unsigned int HMDFS_IOC = 0xf2;
// Location value reported by HMDFS_IOC_GET_LOCATION for cloud files.
#define IOCTL_CLOUD 2
#define HMDFS_IOC_HAS_CACHE _IOW(HMDFS_IOC, 6, struct HmdfsHasCache)
#define HMDFS_IOC_GET_LOCATION _IOR(HMDFS_IOC, 7, __u32)
#define HMDFS_IOC_CANCEL_READ _IO(HMDFS_IOC, 8)
#define HMDFS_IOC_RESTORE_READ _IO(HMDFS_IOC, 9)
85
GetFileSize(int32_t fd)86 uint64_t GetFileSize(int32_t fd)
87 {
88 uint64_t fileSize = 0;
89 struct stat s {};
90 int ret = fstat(fd, &s);
91 if (ret == 0) {
92 fileSize = static_cast<uint64_t>(s.st_size);
93 FALSE_RETURN_V_MSG_E(fileSize != 0, fileSize, "fileSize 0, fstat ret 0");
94 return fileSize;
95 } else {
96 MEDIA_LOG_W("GetFileSize error ret " PUBLIC_LOG_D32 ", errno " PUBLIC_LOG_D32, ret, errno);
97 }
98 return fileSize;
99 }
// Returns true only when `str` is non-empty and consists solely of ASCII decimal digits.
// An empty string is rejected so that callers never hand "" to a numeric conversion.
bool isNumber(const std::string& str)
{
    return !str.empty() && str.find_first_not_of("0123456789") == std::string::npos;
}
104 }
FileFdSourceRegister(const std::shared_ptr<Register> & reg)105 Status FileFdSourceRegister(const std::shared_ptr<Register>& reg)
106 {
107 MEDIA_LOG_I("fileSourceRegister is started");
108 SourcePluginDef definition;
109 definition.name = "FileFdSource";
110 definition.description = "File Fd source";
111 definition.rank = MAX_RANK; // 100: max rank
112 Capability capability;
113 capability.AppendFixedKey<std::vector<ProtocolType>>(Tag::MEDIA_PROTOCOL_TYPE, {ProtocolType::FD});
114 definition.AddInCaps(capability);
115 auto func = [](const std::string& name) -> std::shared_ptr<SourcePlugin> {
116 return std::make_shared<FileFdSourcePlugin>(name);
117 };
118 definition.SetCreator(func);
119 return reg->AddPlugin(definition);
120 }
121
// Declares the "FileFdSource" plugin (Apache-2.0) with FileFdSourceRegister as its registration function.
// NOTE(review): the trailing empty lambda is presumably the unregister hook — confirm against PLUGIN_DEFINITION.
PLUGIN_DEFINITION(FileFdSource, LicenseType::APACHE_V2, FileFdSourceRegister, [] {});
123
FileFdSourcePlugin(std::string name)124 FileFdSourcePlugin::FileFdSourcePlugin(std::string name)
125 : SourcePlugin(std::move(name))
126 {
127 }
128
// Tears the plugin down: resets the steady clock, raises the interrupt state (which wakes waiters
// and deactivates the ring buffer, see SetInterruptState) and then stops the download task
// synchronously if one was created.
FileFdSourcePlugin::~FileFdSourcePlugin()
{
    MEDIA_LOG_I("~FileFdSourcePlugin in.");
    steadyClock_.Reset();
    SetInterruptState(true);
    MEDIA_LOG_I("~FileFdSourcePlugin isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
    // No download task exists unless SetSource set up the cloud cache path; nothing left to stop then.
    FALSE_RETURN_MSG(downloadTask_ != nullptr, "~FileFdSourcePlugin out.");
    downloadTask_->Stop();
    MEDIA_LOG_I("~FileFdSourcePlugin out.");
}
139
SetCallback(Callback * cb)140 Status FileFdSourcePlugin::SetCallback(Callback* cb)
141 {
142 MEDIA_LOG_D("SetCallback in " PUBLIC_LOG_D32, cb != nullptr);
143 callback_ = cb;
144 return Status::OK;
145 }
146
SetSource(std::shared_ptr<MediaSource> source)147 Status FileFdSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
148 {
149 MEDIA_LOG_I("SetSource in. %{private}s", source->GetSourceUri().c_str());
150 auto err = ParseUriInfo(source->GetSourceUri());
151 if (err != Status::OK) {
152 MEDIA_LOG_E("Parse file name from uri fail, uri %{private}s", source->GetSourceUri().c_str());
153 return err;
154 }
155 CheckFileType();
156 if (isCloudFile_ && isEnableFdCache_) {
157 ringBuffer_ = std::make_shared<RingBuffer>(CACHE_SIZE);
158 FALSE_RETURN_V_MSG_E(!(ringBuffer_ == nullptr || !ringBuffer_->Init()),
159 Status::ERROR_NO_MEMORY, "memory is not enough ringBuffer_");
160 downloadTask_ = std::make_shared<Task>(std::string("downloadTaskFD"));
161 FALSE_RETURN_V_MSG_E(downloadTask_ != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
162 downloadTask_->RegisterJob([this] {
163 CacheDataLoop();
164 return 0;
165 });
166 downloadTask_->Start();
167 steadyClock_.Reset();
168 }
169 return Status::OK;
170 }
171
Read(std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)172 Status FileFdSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
173 {
174 return Read(0, buffer, offset, expectedLen);
175 }
176
Read(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)177 Status FileFdSourcePlugin::Read(int32_t streamId, std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
178 {
179 FALSE_RETURN_V_MSG_E(fd_ != -1, Status::ERROR_WRONG_STATE, "no valid fd");
180 if (isCloudFile_) {
181 return ReadOnlineFile(0, buffer, offset, expectedLen);
182 } else {
183 return ReadOfflineFile(0, buffer, offset, expectedLen);
184 }
185 }
186
ReadOfflineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)187 Status FileFdSourcePlugin::ReadOfflineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
188 uint64_t offset, size_t expectedLen)
189 {
190 std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
191 FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
192 expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
193 expectedLen = std::min(bufData->GetCapacity(), expectedLen);
194 MEDIA_LOG_D("ReadLocal buffer pos: " PUBLIC_LOG_U64 " , len:" PUBLIC_LOG_ZU, position_.load(), expectedLen);
195
196 int64_t offsetCur = lseek(fd_, 0, SEEK_CUR);
197 if (offsetCur < 0) {
198 MEDIA_LOG_E("Fd get offset failed ");
199 } else if (static_cast<uint64_t>(offsetCur) != position_.load()) {
200 MEDIA_LOG_E("Fd offsetCur has changed. offsetCur " PUBLIC_LOG_D64 ", offsetOld " PUBLIC_LOG_U64,
201 offsetCur, position_.load());
202 }
203 auto size = read(fd_, bufData->GetWritableAddr(expectedLen), expectedLen);
204 if (size <= 0) {
205 HandleReadResult(expectedLen, size);
206 FALSE_RETURN_V((loc_ != IOCTL_CLOUD || isEnableFdCache_ ||
207 (errno != READ_ERROR_NOMEM && errno != READ_ERROR_IO)), Status::ERROR_INVALID_DATA);
208 MEDIA_LOG_D("ReadLocal END_OF_STREAM");
209 return Status::END_OF_STREAM;
210 }
211 bufData->UpdateDataSize(size);
212 position_ += static_cast<uint64_t>(size);
213 if (buffer->GetMemory() != nullptr) {
214 MEDIA_LOG_D("ReadLocal position_ " PUBLIC_LOG_U64 ", readSize " PUBLIC_LOG_ZU,
215 position_.load(), buffer->GetMemory()->GetSize());
216 }
217 return Status::OK;
218 }
219
ReadOnlineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)220 Status FileFdSourcePlugin::ReadOnlineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
221 uint64_t offset, size_t expectedLen)
222 {
223 if (isBuffering_) {
224 if (HandleBuffering()) {
225 FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
226 FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
227 MEDIA_LOG_I("is buffering, return error again.");
228 return Status::ERROR_AGAIN;
229 }
230 }
231
232 std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
233 FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
234 expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
235 expectedLen = std::min(bufData->GetCapacity(), expectedLen);
236
237 // ringbuffer 0 after seek in 20ms, don't notify buffering
238 curReadTime_ = steadyClock2_.ElapsedMilliseconds();
239 if (isReadFrame_ && ringBuffer_->GetSize() < expectedLen && !HasCacheData(expectedLen, offset) &&
240 static_cast<size_t>(GetLastSize(position_)) > expectedLen) {
241 MEDIA_LOG_I("ringBuffer.size() " PUBLIC_LOG_ZU " curReadTime_ " PUBLIC_LOG_D64
242 " lastReadTime_ " PUBLIC_LOG_D64 " pos_ " PUBLIC_LOG_U64 " cachePos_ " PUBLIC_LOG_U64
243 " expectedLen " PUBLIC_LOG_ZU " offset_ " PUBLIC_LOG_D64 " offset " PUBLIC_LOG_U64,
244 ringBuffer_->GetSize(), curReadTime_, lastReadTime_, position_.load(), cachePosition_.load(),
245 expectedLen, offset_, offset);
246 CheckReadTime();
247 FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
248 FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
249 return Status::ERROR_AGAIN;
250 }
251
252 size_t size = ringBuffer_->ReadBuffer(bufData->GetWritableAddr(expectedLen), expectedLen, READ_RETRY);
253 if (size == 0) {
254 MEDIA_LOG_I("read size 0, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_D64 ", size:" PUBLIC_LOG_U64 ", pos:"
255 PUBLIC_LOG_U64 ", readBlock:" PUBLIC_LOG_D32, fd_, offset, size_, position_.load(), isReadBlocking_.load());
256 FALSE_RETURN_V_MSG_E(GetLastSize(position_) != 0, Status::END_OF_STREAM, "ReadCloud END_OF_STREAM");
257 bufData->UpdateDataSize(0);
258 return Status::OK;
259 }
260 bufData->UpdateDataSize(size);
261 int64_t ct = steadyClock2_.ElapsedMilliseconds() - curReadTime_;
262 if (ct > READ_TIME) {
263 MEDIA_LOG_I("ReadCloud buffer position " PUBLIC_LOG_U64 ", expectedLen " PUBLIC_LOG_ZU
264 " costTime: " PUBLIC_LOG_U64, position_.load(), expectedLen, ct);
265 }
266 position_ += static_cast<uint64_t>(size);
267 MEDIA_LOG_D("ringBuffer.size() " PUBLIC_LOG_ZU, ringBuffer_->GetSize());
268 return Status::OK;
269 }
270
SeekTo(uint64_t offset)271 Status FileFdSourcePlugin::SeekTo(uint64_t offset)
272 {
273 FALSE_RETURN_V_MSG_E(fd_ != -1 && seekable_ == Seekable::SEEKABLE,
274 Status::ERROR_WRONG_STATE, "no valid fd or no seekable.");
275
276 MEDIA_LOG_D("SeekTo offset: " PUBLIC_LOG_U64, offset);
277 if (isCloudFile_) {
278 return SeekToOnlineFile(offset);
279 } else {
280 return SeekToOfflineFile(offset);
281 }
282 }
283
SeekToOfflineFile(uint64_t offset)284 Status FileFdSourcePlugin::SeekToOfflineFile(uint64_t offset)
285 {
286 int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
287 if (ret == -1) {
288 MEDIA_LOG_E("SeekLocal failed, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
289 PUBLIC_LOG_S, fd_, offset, strerror(errno));
290 return Status::ERROR_UNKNOWN;
291 }
292 position_ = offset + static_cast<uint64_t>(offset_);
293 MEDIA_LOG_D("SeekLocal end ret " PUBLIC_LOG_D32 ", position_ " PUBLIC_LOG_U64, ret, position_.load());
294 return Status::OK;
295 }
296
SeekToOnlineFile(uint64_t offset)297 Status FileFdSourcePlugin::SeekToOnlineFile(uint64_t offset)
298 {
299 FALSE_RETURN_V_MSG_E(ringBuffer_ != nullptr, Status::ERROR_WRONG_STATE, "SeekCloud ringBuffer_ is nullptr");
300 MEDIA_LOG_D("SeekCloud, ringBuffer.size: " PUBLIC_LOG_ZU ", offset " PUBLIC_LOG_U64,
301 ringBuffer_->GetSize(), offset);
302 if (ringBuffer_->Seek(offset)) {
303 position_ = offset + static_cast<uint64_t>(offset_);
304 MEDIA_LOG_I("SeekCloud ringBuffer_ seek hit, offset " PUBLIC_LOG_U64, offset);
305 return Status::OK;
306 }
307 // First clear buffer, avoid no available buffer then task pause never exit.
308 ringBuffer_->SetActive(false);
309 inSeek_ = true;
310 if (downloadTask_ != nullptr) {
311 downloadTask_->Pause();
312 inSeek_ = false;
313 }
314 ringBuffer_->Clear();
315 ringBuffer_->SetMediaOffset(offset);
316 {
317 std::lock_guard<std::mutex> lock(interruptMutex_);
318 FALSE_RETURN_V(!isInterrupted_, Status::OK);
319 ringBuffer_->SetActive(true);
320 }
321
322 int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
323 if (ret == -1) {
324 MEDIA_LOG_E("SeekCloud failed, fd_ " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
325 PUBLIC_LOG_S, fd_, offset, strerror(errno));
326 return Status::ERROR_UNKNOWN;
327 }
328 position_ = offset + static_cast<uint64_t>(offset_);
329 cachePosition_ = position_.load();
330
331 MEDIA_LOG_D("SeekCloud end, fd_ " PUBLIC_LOG_D32 ", size_ " PUBLIC_LOG_U64 ", offset_ " PUBLIC_LOG_D64
332 ", position_ " PUBLIC_LOG_U64, fd_, size_, offset_, position_.load());
333 if (downloadTask_ != nullptr) {
334 downloadTask_->Start();
335 }
336 return Status::OK;
337 }
338
ParseUriInfo(const std::string & uri)339 Status FileFdSourcePlugin::ParseUriInfo(const std::string& uri)
340 {
341 if (uri.empty()) {
342 MEDIA_LOG_E("uri is empty");
343 return Status::ERROR_INVALID_PARAMETER;
344 }
345 std::smatch fdUriMatch;
346 FALSE_RETURN_V_MSG_E(std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)\\?offset=(.*)&size=(.*)")) ||
347 std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)")),
348 Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
349 FALSE_RETURN_V_MSG_E(fdUriMatch.size() >= FDPOS && isNumber(fdUriMatch[1].str()),
350 Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
351 fd_ = std::stoi(fdUriMatch[1].str()); // 1: sub match fd subscript
352 FALSE_RETURN_V_MSG_E(fd_ != -1 && FileSystem::IsRegularFile(fd_),
353 Status::ERROR_INVALID_PARAMETER, "Invalid fd: " PUBLIC_LOG_D32, fd_);
354 fileSize_ = GetFileSize(fd_);
355 if (fdUriMatch.size() == 4) { // 4:4 sub match
356 std::string offsetStr = fdUriMatch[2].str(); // 2: sub match offset subscript
357 FALSE_RETURN_V_MSG_E(StrToLong(offsetStr, offset_), Status::ERROR_INVALID_PARAMETER,
358 "Failed to read offset.");
359 if (static_cast<uint64_t>(offset_) > fileSize_) {
360 offset_ = static_cast<int64_t>(fileSize_);
361 }
362 size_ = static_cast<uint64_t>(std::stoll(fdUriMatch[3].str())); // 3: sub match size subscript
363 uint64_t remainingSize = fileSize_ - static_cast<uint64_t>(offset_);
364 if (size_ > remainingSize) {
365 size_ = remainingSize;
366 }
367 } else {
368 size_ = fileSize_;
369 offset_ = 0;
370 }
371 position_ = offset_;
372 seekable_ = FileSystem::IsSeekable(fd_) ? Seekable::SEEKABLE : Seekable::UNSEEKABLE;
373 if (seekable_ == Seekable::SEEKABLE) {
374 NOK_LOG(SeekTo(0));
375 }
376 MEDIA_LOG_I("Fd: " PUBLIC_LOG_D32 ", offset: " PUBLIC_LOG_D64 ", size: " PUBLIC_LOG_U64, fd_, offset_, size_);
377 return Status::OK;
378 }
379
CacheDataLoop()380 void FileFdSourcePlugin::CacheDataLoop()
381 {
382 if (isInterrupted_) {
383 MEDIA_LOG_E("CacheData break");
384 WaitForInterrupt(TEN_MILLISECOUNDS);
385 return;
386 }
387
388 int64_t curTime = steadyClock_.ElapsedMilliseconds();
389 GetCurrentSpeed(curTime);
390
391 size_t bufferSize = std::min(PER_CACHE_SIZE, static_cast<size_t>(GetLastSize(cachePosition_.load())));
392 if (bufferSize < 0) {
393 MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
394 WaitForInterrupt(TEN_MILLISECOUNDS);
395 return;
396 }
397
398 char* cacheBuffer = new char[bufferSize];
399 if (cacheBuffer == nullptr) {
400 MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
401 WaitForInterrupt(TEN_MILLISECOUNDS);
402 return;
403 }
404 int size = read(fd_, cacheBuffer, bufferSize);
405 if (size <= 0) {
406 int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
407 MEDIA_LOG_I("read failed, cost time: " PUBLIC_LOG_U64, ct);
408 DeleteCacheBuffer(cacheBuffer, bufferSize);
409 HandleReadResult(bufferSize, size);
410 return;
411 }
412 MEDIA_LOG_D("Cache fd: " PUBLIC_LOG_D32 ", cachePos_ " PUBLIC_LOG_U64 ", ringBuffer_.size() " PUBLIC_LOG_ZU
413 ", size_ " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_);
414 while (!ringBuffer_->WriteBuffer(cacheBuffer, size)) {
415 MEDIA_LOG_I("CacheData ringbuffer write failed");
416 if (inSeek_ || isInterrupted_) {
417 DeleteCacheBuffer(cacheBuffer, bufferSize);
418 return;
419 }
420 WaitForInterrupt(TEN_MILLISECOUNDS);
421 }
422 cachePosition_ += static_cast<uint64_t>(size);
423 downloadSize_ += static_cast<uint64_t>(size);
424
425 int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
426 if (ct > READ_TIME) {
427 MEDIA_LOG_I("Cache fd:" PUBLIC_LOG_D32 " pos:" PUBLIC_LOG_U64 " rb:" PUBLIC_LOG_ZU
428 " size:" PUBLIC_LOG_U64 " ct:" PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(),
429 size_, ct);
430 }
431
432 DeleteCacheBuffer(cacheBuffer, bufferSize);
433
434 CheckAndNotifyBufferingEnd();
435 }
436
CheckAndNotifyBufferingEnd()437 void FileFdSourcePlugin::CheckAndNotifyBufferingEnd()
438 {
439 if (isBuffering_ && (static_cast<int64_t>(ringBuffer_->GetSize()) > waterLineAbove_ ||
440 GetLastSize(cachePosition_.load()) == 0)) {
441 NotifyBufferingEnd();
442 }
443 }
444
HasCacheData(size_t bufferSize,uint64_t offset)445 bool FileFdSourcePlugin::HasCacheData(size_t bufferSize, uint64_t offset)
446 {
447 HmdfsHasCache ioctlData;
448 ioctlData.offset = static_cast<int64_t>(offset);
449 ioctlData.readSize = static_cast<int64_t>(bufferSize);
450 int32_t ioResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData); // 0 has, -1 hasnot
451
452 ioctlData.offset = static_cast<int64_t>(cachePosition_);
453 ioctlData.readSize = static_cast<int64_t>(PER_CACHE_SIZE);
454 int32_t ioCacheResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData);
455 // ioctl has cache
456 if (ioResult == 0 && ioCacheResult == 0) {
457 return true;
458 } else {
459 MEDIA_LOG_I("ioctl has no cache with errno " PUBLIC_LOG_D32 " cachePosition_ " PUBLIC_LOG_U64
460 " offset " PUBLIC_LOG_U64 " bufferSize " PUBLIC_LOG_ZU, errno, cachePosition_.load(), offset, bufferSize);
461 }
462 return false;
463 }
464
Stop()465 Status FileFdSourcePlugin::Stop()
466 {
467 MEDIA_LOG_I("Stop enter.");
468 isInterrupted_ = true;
469 MEDIA_LOG_I("Stop isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
470 FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
471 downloadTask_->StopAsync();
472 return Status::OK;
473 }
474
Reset()475 Status FileFdSourcePlugin::Reset()
476 {
477 MEDIA_LOG_I("Reset enter.");
478 isInterrupted_ = true;
479 MEDIA_LOG_I("Reset isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
480 FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
481 downloadTask_->StopAsync();
482 return Status::OK;
483 }
484
PauseDownloadTask(bool isAsync)485 void FileFdSourcePlugin::PauseDownloadTask(bool isAsync)
486 {
487 FALSE_RETURN(downloadTask_ != nullptr);
488 if (isAsync) {
489 downloadTask_->PauseAsync();
490 } else {
491 downloadTask_->Pause();
492 }
493 }
494
HandleBuffering()495 bool FileFdSourcePlugin::HandleBuffering()
496 {
497 MEDIA_LOG_I("HandleBuffering in.");
498 int32_t sleepTime = 0;
499 // return error again 1 time 1s, avoid ffmpeg error
500 while (sleepTime < ONE_SECONDS && !isInterrupted_ && isReadBlocking_) {
501 NotifyBufferingPercent();
502 if (!isBuffering_) {
503 break;
504 }
505 MEDIA_LOG_I("isBuffering.");
506 WaitForInterrupt(TEN_MILLISECOUNDS);
507 sleepTime += TEN_MILLISECOUNDS;
508 }
509 MEDIA_LOG_I("HandleBuffering out.");
510 return isBuffering_;
511 }
512
HandleReadResult(size_t bufferSize,int size)513 void FileFdSourcePlugin::HandleReadResult(size_t bufferSize, int size)
514 {
515 MEDIA_LOG_I("HandleReadResult size " PUBLIC_LOG_D32 ", fd " PUBLIC_LOG_D32 ", cachePosition_" PUBLIC_LOG_U64
516 ", position_ " PUBLIC_LOG_U64 ", bufferSize " PUBLIC_LOG_ZU ", size_ " PUBLIC_LOG_U64 ", offset_ "
517 PUBLIC_LOG_D64, size, fd_, cachePosition_.load(), position_.load(), bufferSize, size_, offset_);
518 if (size < 0) {
519 // errno EIO 5
520 MEDIA_LOG_E("read fail, errno " PUBLIC_LOG_D32, errno);
521
522 // read fail with errno, retry 3 * 10ms
523 retryTimes_++;
524 if (retryTimes_ >= RETRY_TIMES || isInterrupted_.load()) {
525 NotifyReadFail();
526 SetInterruptState(true);
527 }
528 WaitForInterrupt(TEN_MILLISECOUNDS);
529 } else {
530 cachePosition_ = 0;
531 PauseDownloadTask(false);
532 }
533 }
534
NotifyBufferingStart()535 void FileFdSourcePlugin::NotifyBufferingStart()
536 {
537 MEDIA_LOG_I("NotifyBufferingStart, ringBuffer.size() " PUBLIC_LOG_ZU
538 ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
539 isBuffering_ = true;
540 if (callback_ != nullptr && !isInterrupted_) {
541 MEDIA_LOG_I("Read OnEvent BUFFERING_START.");
542 callback_->OnEvent({PluginEventType::BUFFERING_START, {BufferingInfoType::BUFFERING_START}, "start"});
543 } else {
544 MEDIA_LOG_E("BUFFERING_START callback_ is nullptr or isInterrupted_ is true");
545 }
546 }
547
NotifyBufferingPercent()548 void FileFdSourcePlugin::NotifyBufferingPercent()
549 {
550 if (waterLineAbove_ != 0) {
551 int64_t bp = static_cast<float>(ringBuffer_->GetSize()) / waterLineAbove_ * PERCENT_100;
552 bp = bp > PERCENT_100 ? PERCENT_100 : bp;
553 if (isBuffering_ && callback_ != nullptr && !isInterrupted_) {
554 MEDIA_LOG_I("NotifyBufferingPercent, ringBuffer.size() " PUBLIC_LOG_ZU ", waterLineAbove_ " PUBLIC_LOG_U64
555 ", PERCENT " PUBLIC_LOG_D32, ringBuffer_->GetSize(), waterLineAbove_, static_cast<int32_t>(bp));
556 callback_->OnEvent({PluginEventType::EVENT_BUFFER_PROGRESS,
557 {BufferingInfoType::BUFFERING_PERCENT}, std::to_string(bp)});
558 } else {
559 MEDIA_LOG_E("EVENT_BUFFER_PROGRESS callback_ is nullptr or isInterrupted_ \
560 is true or isBuffering_ is false");
561 }
562 }
563 }
564
NotifyBufferingEnd()565 void FileFdSourcePlugin::NotifyBufferingEnd()
566 {
567 NotifyBufferingPercent();
568 MEDIA_LOG_I("NotifyBufferingEnd, ringBuffer.size() " PUBLIC_LOG_ZU
569 ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
570 isBuffering_ = false;
571 lastReadTime_ = 0;
572 if (callback_ != nullptr && !isInterrupted_) {
573 MEDIA_LOG_I("NotifyBufferingEnd success .");
574 callback_->OnEvent({PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
575 } else {
576 MEDIA_LOG_E("BUFFERING_END callback_ is nullptr or isInterrupted_ is true");
577 }
578 }
579
NotifyReadFail()580 void FileFdSourcePlugin::NotifyReadFail()
581 {
582 MEDIA_LOG_I("NotifyReadFail in.");
583 if (callback_ != nullptr && !isInterrupted_) {
584 MEDIA_LOG_I("Read OnEvent read fail");
585 callback_->OnEvent({PluginEventType::CLIENT_ERROR,
586 static_cast<int32_t>(NetworkClientErrorCode::ERROR_TIME_OUT), "read"});
587 } else {
588 MEDIA_LOG_E("CLIENT_ERROR callback_ is nullptr or isInterrupted_ is true");
589 }
590 }
591
SetDemuxerState(int32_t streamId)592 void FileFdSourcePlugin::SetDemuxerState(int32_t streamId)
593 {
594 MEDIA_LOG_I("SetDemuxerState");
595 isReadFrame_ = true;
596 }
597
SetCurrentBitRate(int32_t bitRate,int32_t streamID)598 Status FileFdSourcePlugin::SetCurrentBitRate(int32_t bitRate, int32_t streamID)
599 {
600 currentBitRate_ = bitRate / TO_BYTE; // 8b
601 MEDIA_LOG_I("currentBitRate: " PUBLIC_LOG_D32, currentBitRate_);
602 // default cache 0.3s
603 waterLineAbove_ = CACHE_LEVEL_1 * currentBitRate_;
604 return Status::OK;
605 }
606
SetBundleName(const std::string & bundleName)607 void FileFdSourcePlugin::SetBundleName(const std::string& bundleName)
608 {
609 MEDIA_LOG_I("SetBundleName bundleName: " PUBLIC_LOG_S, bundleName.c_str());
610 }
611
SetReadBlockingFlag(bool isAllowed)612 Status FileFdSourcePlugin::SetReadBlockingFlag(bool isAllowed)
613 {
614 MEDIA_LOG_I("SetReadBlockingFlag entered, IsReadBlockingAllowed %{public}d", isAllowed);
615 if (ringBuffer_) {
616 ringBuffer_->SetReadBlocking(isAllowed);
617 }
618 isReadBlocking_ = isAllowed;
619 return Status::OK;
620 }
621
SetInterruptState(bool isInterruptNeeded)622 void FileFdSourcePlugin::SetInterruptState(bool isInterruptNeeded)
623 {
624 bool isInterruptAllowed = true;
625 MEDIA_LOG_I("SetInterruptState isInterrupted_" PUBLIC_LOG_D32, isInterruptNeeded);
626 {
627 std::lock_guard<std::mutex> lock(interruptMutex_);
628 isInterruptAllowed = !(isInterrupted_ && isInterruptNeeded);
629 isInterrupted_ = isInterruptNeeded;
630 bufferCond_.notify_all();
631 }
632 if (ringBuffer_ != nullptr) {
633 if (isInterrupted_) {
634 ringBuffer_->SetActive(false);
635 } else {
636 ringBuffer_->SetActive(true);
637 }
638 }
639
640 if (isInterrupted_ && isInterruptAllowed && isCloudFile_) {
641 if (downloadTask_ != nullptr) {
642 downloadTask_->StopAsync();
643 }
644 int ret = ioctl(fd_, HMDFS_IOC_CANCEL_READ);
645 MEDIA_LOG_I("ioctl break read, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
646 }
647 }
648
GetSize(uint64_t & size)649 Status FileFdSourcePlugin::GetSize(uint64_t& size)
650 {
651 size = size_;
652 return Status::OK;
653 }
654
GetSeekable()655 Seekable FileFdSourcePlugin::GetSeekable()
656 {
657 MEDIA_LOG_D("GetSeekable in");
658 return seekable_;
659 }
660
CheckFileType()661 void FileFdSourcePlugin::CheckFileType()
662 {
663 int loc; // 1本地,2云端
664 int ioResult = ioctl(fd_, HMDFS_IOC_GET_LOCATION, &loc);
665 MEDIA_LOG_I("SetSource ioctl loc, ret " PUBLIC_LOG_D32 ", loc " PUBLIC_LOG_D32 ", errno"
666 PUBLIC_LOG_D32, ioResult, loc, errno);
667
668 if (ioResult == 0) {
669 loc_ = loc;
670 }
671 if (!isEnableFdCache_) {
672 isCloudFile_ = false;
673 return;
674 }
675
676 if (ioResult == 0) {
677 if (loc == IOCTL_CLOUD) {
678 isCloudFile_ = true;
679 MEDIA_LOG_I("ioctl file is cloud");
680 int ret = ioctl(fd_, HMDFS_IOC_RESTORE_READ);
681 MEDIA_LOG_I("ioctl restore fd, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
682 return;
683 } else {
684 isCloudFile_ = false;
685 MEDIA_LOG_I("ioctl file is local");
686 }
687 } else {
688 isCloudFile_ = false;
689 MEDIA_LOG_I("ioctl failed to get file type");
690 }
691 }
692
GetBufferPtr(std::shared_ptr<Buffer> & buffer,size_t expectedLen)693 std::shared_ptr<Memory> FileFdSourcePlugin::GetBufferPtr(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
694 {
695 if (!buffer) {
696 buffer = std::make_shared<Buffer>();
697 }
698 std::shared_ptr<Memory> bufData;
699 if (buffer->IsEmpty()) {
700 bufData = buffer->AllocMemory(nullptr, expectedLen);
701 } else {
702 bufData = buffer->GetMemory();
703 }
704 return bufData;
705 }
706
GetLastSize(uint64_t position)707 int64_t FileFdSourcePlugin::GetLastSize(uint64_t position)
708 {
709 int64_t ret = static_cast<int64_t>(size_) + offset_ - static_cast<int64_t>(position);
710 if (ret < 0) {
711 MEDIA_LOG_E("GetLastSize error, fd_ " PUBLIC_LOG_D32 ", offset_ " PUBLIC_LOG_D64 ", size_ "
712 PUBLIC_LOG_U64 ", position " PUBLIC_LOG_U64, fd_, offset_, size_, position);
713 }
714 return ret;
715 }
716
GetCurrentSpeed(int64_t curTime)717 void FileFdSourcePlugin::GetCurrentSpeed(int64_t curTime)
718 {
719 if ((curTime - lastCheckTime_) > RECORD_TIME_INTERVAL) {
720 MEDIA_LOG_I("CacheDataLoop curTime_: " PUBLIC_LOG_U64 " lastCheckTime_: "
721 PUBLIC_LOG_U64 " downloadSize_: " PUBLIC_LOG_U64, curTime, lastCheckTime_, downloadSize_);
722 float duration = static_cast<double>(curTime - lastCheckTime_) / MILLISECOUND_TO_SECOND;
723 avgDownloadSpeed_ = downloadSize_ / duration; //b/s
724 MEDIA_LOG_I("downloadDuration: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F,
725 duration, avgDownloadSpeed_);
726 downloadSize_ = 0;
727 lastCheckTime_ = curTime;
728 FALSE_RETURN(currentBitRate_ > 0);
729 UpdateWaterLineAbove();
730 }
731 }
732
UpdateWaterLineAbove()733 void FileFdSourcePlugin::UpdateWaterLineAbove()
734 {
735 FALSE_RETURN_MSG(currentBitRate_ > 0, "currentBitRate_ <= 0");
736 float cacheTime = GetCacheTime(avgDownloadSpeed_ / currentBitRate_);
737 MEDIA_LOG_I("cacheTime: " PUBLIC_LOG_F " avgDownloadSpeed_: " PUBLIC_LOG_F
738 " currentBitRate: " PUBLIC_LOG_D32, cacheTime, avgDownloadSpeed_, currentBitRate_);
739 waterLineAbove_ = cacheTime * currentBitRate_ > GetLastSize(cachePosition_.load()) ?
740 GetLastSize(cachePosition_.load()) : cacheTime * currentBitRate_;
741 MEDIA_LOG_I("waterLineAbove_: " PUBLIC_LOG_U64, waterLineAbove_);
742 }
743
GetCacheTime(float num)744 float FileFdSourcePlugin::GetCacheTime(float num)
745 {
746 MEDIA_LOG_I("GetCacheTime with num: " PUBLIC_LOG_F, num);
747 if (num < 0) {
748 return CACHE_LEVEL_1;
749 }
750 if (num > 0 && num < 0.5) { // (0, 0.5)
751 return CACHE_TIME_DEFAULT;
752 } else if (num >= 0.5 && num < 1) { //[0.5, 1)
753 return CACHE_TIME_DEFAULT;
754 } else if (num >= 1) { //[1, 2)
755 return CACHE_LEVEL_1;
756 }
757 return CACHE_TIME_DEFAULT;
758 }
759
DeleteCacheBuffer(char * buffer,size_t bufferSize)760 void FileFdSourcePlugin::DeleteCacheBuffer(char* buffer, size_t bufferSize)
761 {
762 if (buffer != nullptr && bufferSize >= 0) {
763 delete[] buffer;
764 }
765 }
766
CheckReadTime()767 void FileFdSourcePlugin::CheckReadTime()
768 {
769 if (IsValidTime(curReadTime_, lastReadTime_)) {
770 NotifyBufferingStart();
771 lastReadTime_ = 0;
772 } else {
773 if (lastReadTime_ == 0) {
774 lastReadTime_ = curReadTime_;
775 }
776 }
777 }
778
IsValidTime(int64_t curTime,int64_t lastTime)779 bool FileFdSourcePlugin::IsValidTime(int64_t curTime, int64_t lastTime)
780 {
781 return lastReadTime_ != 0 && curReadTime_ - lastReadTime_ < SEEK_TIME_UPPER &&
782 curReadTime_ - lastReadTime_ > SEEK_TIME_LOWER;
783 }
784
SetEnableOnlineFdCache(bool isEnableFdCache)785 void FileFdSourcePlugin::SetEnableOnlineFdCache(bool isEnableFdCache)
786 {
787 isEnableFdCache_ = isEnableFdCache;
788 }
789
WaitForInterrupt(int32_t waitTimeMS)790 void FileFdSourcePlugin::WaitForInterrupt(int32_t waitTimeMS)
791 {
792 std::unique_lock<std::mutex> lock(interruptMutex_);
793 bufferCond_.wait_for(lock, std::chrono::milliseconds(waitTimeMS), [&] { return isInterrupted_.load(); });
794 }
795
IsLocalFd()796 bool FileFdSourcePlugin::IsLocalFd()
797 {
798 return !isCloudFile_;
799 }
800 } // namespace FileFdSource
} // namespace Plugins
802 } // namespace Media
803 } // namespace OHOS