1 /*
2 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "StreamDemuxer"
17
18 #include "stream_demuxer.h"
19
20 #include <algorithm>
21 #include <map>
22 #include <memory>
23
24 #include "avcodec_common.h"
25 #include "avcodec_trace.h"
26 #include "cpp_ext/type_traits_ext.h"
27 #include "buffer/avallocator.h"
28 #include "common/event.h"
29 #include "common/log.h"
30 #include "meta/media_types.h"
31 #include "meta/meta.h"
32 #include "osal/utils/dump_buffer.h"
33 #include "plugin/plugin_buffer.h"
34 #include "plugin/plugin_info.h"
35 #include "plugin/plugin_time.h"
36 #include "source/source.h"
37 #include "scoped_timer.h"
38
39 namespace {
40 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "StreamDemuxer" };
41 }
42
43 namespace OHOS {
44 namespace Media {
45
// Wait between two read attempts when the source delivered no data, in milliseconds
// (used as the wait_for timeout in ReadRetry).
constexpr int32_t TRY_READ_SLEEP_TIME = 10;
// ReadRetry stops retrying once its retry counter exceeds this value.
constexpr int32_t TRY_READ_TIMES = 10;
// Threshold in milliseconds handed to the ScopedTimer that wraps each source read.
// NOTE(review): presumably the duration above which the timer emits a warning - confirm.
constexpr int64_t SOURCE_READ_WARNING_MS = 100;
StreamDemuxer()49 StreamDemuxer::StreamDemuxer() : position_(0)
50 {
51 MEDIA_LOG_D("VodStreamDemuxer called");
52 }
53
~StreamDemuxer()54 StreamDemuxer::~StreamDemuxer()
55 {
56 MEDIA_LOG_D("~VodStreamDemuxer called");
57 ResetAllCache();
58 }
59
ReadFrameData(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)60 Status StreamDemuxer::ReadFrameData(int32_t streamID, uint64_t offset, size_t size,
61 std::shared_ptr<Buffer>& bufferPtr)
62 {
63 if (IsDash() || GetIsDataSrcNoSeek()) {
64 MEDIA_LOG_D("GetPeekRange read cache, offset: " PUBLIC_LOG_U64 " streamID: " PUBLIC_LOG_D32, offset, streamID);
65 if (cacheDataMap_.find(streamID) != cacheDataMap_.end() && cacheDataMap_[streamID].CheckCacheExist(offset)) {
66 MEDIA_LOG_D("GetPeekRange read cache, offset: " PUBLIC_LOG_U64, offset);
67 auto memory = cacheDataMap_[streamID].GetData()->GetMemory();
68 if (memory != nullptr && memory->GetSize() > 0) {
69 MEDIA_LOG_D("GetPeekRange read cache, Read data from cache data. streamID: " PUBLIC_LOG_D32, streamID);
70 return PullDataWithCache(streamID, offset, size, bufferPtr);
71 }
72 }
73 }
74 return PullData(streamID, offset, size, bufferPtr);
75 }
76
ReadHeaderData(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)77 Status StreamDemuxer::ReadHeaderData(int32_t streamID, uint64_t offset, size_t size,
78 std::shared_ptr<Buffer>& bufferPtr)
79 {
80 if (cacheDataMap_.find(streamID) != cacheDataMap_.end() && cacheDataMap_[streamID].CheckCacheExist(offset)) {
81 MEDIA_LOG_D("GetPeekRange read cache, offset: " PUBLIC_LOG_U64, offset);
82 auto memory = cacheDataMap_[streamID].GetData()->GetMemory();
83 if (memory != nullptr && memory->GetSize() > 0) {
84 MEDIA_LOG_D("GetPeekRange read cache, Read data from cache data.");
85 return PullDataWithCache(streamID, offset, size, bufferPtr);
86 }
87 }
88 return PullDataWithoutCache(streamID, offset, size, bufferPtr);
89 }
90
GetPeekRange(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)91 Status StreamDemuxer::GetPeekRange(int32_t streamID, uint64_t offset, size_t size, std::shared_ptr<Buffer>& bufferPtr)
92 {
93 FALSE_RETURN_V_MSG_E(!isInterruptNeeded_.load(), Status::ERROR_WRONG_STATE,
94 "GetPeekRange interrupt " PUBLIC_LOG_D32 " " PUBLIC_LOG_U64 " " PUBLIC_LOG_ZU, streamID, offset, size);
95 FALSE_RETURN_V_MSG_E(streamID >= 0, Status::ERROR_INVALID_PARAMETER,
96 "Invalid streamId, id = " PUBLIC_LOG_D32, streamID);
97 if (bufferPtr == nullptr) {
98 MEDIA_LOG_E("GetPeekRange bufferPtr invalid.");
99 return Status::ERROR_INVALID_PARAMETER;
100 }
101 bufferPtr->streamID = streamID;
102 Status ret = Status::OK;
103 if (pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FRAME) {
104 ret = ReadFrameData(streamID, offset, size, bufferPtr);
105 } else {
106 ret = ReadHeaderData(streamID, offset, size, bufferPtr);
107 }
108 if (ret != Status::OK) {
109 return ret;
110 }
111 return CheckChangeStreamID(streamID, bufferPtr);
112 }
113
Init(const std::string & uri)114 Status StreamDemuxer::Init(const std::string& uri)
115 {
116 MediaAVCodec::AVCodecTrace trace("StreamDemuxer::Init");
117 MEDIA_LOG_D("StreamDemuxer::Init called");
118 checkRange_ = [](int32_t streamID, uint64_t offset, uint32_t size) {
119 return Status::OK;
120 };
121 peekRange_ = [this](int32_t streamID, uint64_t offset, size_t size, std::shared_ptr<Buffer>& bufferPtr) -> Status {
122 return GetPeekRange(streamID, offset, size, bufferPtr);
123 };
124 getRange_ = peekRange_;
125 uri_ = uri;
126 return Status::OK;
127 }
128
PullDataWithCache(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)129 Status StreamDemuxer::PullDataWithCache(int32_t streamID, uint64_t offset, size_t size,
130 std::shared_ptr<Buffer>& bufferPtr)
131 {
132 FALSE_RETURN_V_MSG_E(bufferPtr->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "bufferPtr invalid");
133 auto memory = cacheDataMap_[streamID].GetData()->GetMemory();
134 FALSE_RETURN_V_MSG_E(memory != nullptr, Status::ERROR_UNKNOWN, "memory invalid");
135 MEDIA_LOG_D("PullDataWithCache, Read data from cache data. streamID: " PUBLIC_LOG_D32, streamID);
136 uint64_t offsetInCache = offset - cacheDataMap_[streamID].GetOffset();
137 if (size <= memory->GetSize() - offsetInCache) {
138 MEDIA_LOG_D("Readfromcache. streamID: " PUBLIC_LOG_D32, streamID);
139 bufferPtr->GetMemory()->Write(memory->GetReadOnlyData() + offsetInCache, size, 0);
140 return Status::OK;
141 }
142 bufferPtr->GetMemory()->Write(memory->GetReadOnlyData() + offsetInCache, memory->GetSize() - offsetInCache, 0);
143 uint64_t remainOffset = cacheDataMap_[streamID].GetOffset() + memory->GetSize();
144 uint64_t remainSize = size - (memory->GetSize() - offsetInCache);
145 std::shared_ptr<Buffer> tempBuffer = Buffer::CreateDefaultBuffer(remainSize);
146 if (tempBuffer == nullptr || tempBuffer->GetMemory() == nullptr) {
147 MEDIA_LOG_W("PullDataWithCache, Read data from cache data. only get partial data.");
148 return Status::ERROR_UNKNOWN;
149 }
150 Status ret = PullData(streamID, remainOffset, remainSize, tempBuffer);
151 if (ret == Status::OK) {
152 FALSE_RETURN_V_MSG_E(tempBuffer->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "tempBuffer invalid");
153 bufferPtr->GetMemory()->Write(tempBuffer->GetMemory()->GetReadOnlyData(),
154 tempBuffer->GetMemory()->GetSize(), memory->GetSize() - offsetInCache);
155 if (pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FRAME) {
156 MEDIA_LOG_W("PullDataWithCache, not cache begin.");
157 return ret;
158 }
159 std::shared_ptr<Buffer> mergedBuffer = Buffer::CreateDefaultBuffer(
160 tempBuffer->GetMemory()->GetSize() + memory->GetSize());
161 FALSE_RETURN_V_MSG_E(mergedBuffer != nullptr, Status::ERROR_UNKNOWN, "mergedBuffer invalid");
162 FALSE_RETURN_V_MSG_E(mergedBuffer->GetMemory() != nullptr, Status::ERROR_UNKNOWN,
163 "mergedBuffer->GetMemory invalid");
164 mergedBuffer->GetMemory()->Write(memory->GetReadOnlyData(), memory->GetSize(), 0);
165 mergedBuffer->GetMemory()->Write(tempBuffer->GetMemory()->GetReadOnlyData(),
166 tempBuffer->GetMemory()->GetSize(), memory->GetSize());
167 cacheDataMap_[streamID].SetData(mergedBuffer);
168 memory = cacheDataMap_[streamID].GetData()->GetMemory();
169 FALSE_RETURN_V_MSG_E(memory != nullptr, Status::ERROR_UNKNOWN, "memory invalid");
170 MEDIA_LOG_I("PullDataWithCache, offset: " PUBLIC_LOG_U64 ", cache offset: " PUBLIC_LOG_U64
171 ", cache size: " PUBLIC_LOG_ZU, offset, cacheDataMap_[streamID].GetOffset(), memory->GetSize());
172 }
173 FALSE_RETURN_V_MSG_E(ret != Status::END_OF_STREAM, Status::OK, "eos&data return data");
174 return ret;
175 }
176
ProcInnerDash(int32_t streamID,uint64_t offset,std::shared_ptr<Buffer> & bufferPtr)177 Status StreamDemuxer::ProcInnerDash(int32_t streamID, uint64_t offset, std::shared_ptr<Buffer>& bufferPtr)
178 {
179 FALSE_RETURN_V_MSG_E(bufferPtr != nullptr, Status::ERROR_UNKNOWN, "bufferPtr invalid");
180 if (IsDash()) {
181 MEDIA_LOG_D("dash PullDataWithoutCache, cacheDataMap_ exist streamID , merge it.");
182 FALSE_RETURN_V_MSG_E(cacheDataMap_[streamID].GetData() != nullptr, Status::ERROR_UNKNOWN, "getdata invalid");
183 auto cacheMemory = cacheDataMap_[streamID].GetData()->GetMemory();
184 auto bufferMemory = bufferPtr->GetMemory();
185 FALSE_RETURN_V_MSG_E(bufferMemory != nullptr, Status::ERROR_UNKNOWN, "bufferPtr invalid");
186 FALSE_RETURN_V_MSG_E(cacheMemory != nullptr, Status::ERROR_UNKNOWN, "cacheMemory invalid");
187 std::shared_ptr<Buffer> mergedBuffer = Buffer::CreateDefaultBuffer(
188 bufferMemory->GetSize() + cacheMemory->GetSize());
189 FALSE_RETURN_V_MSG_E(mergedBuffer != nullptr, Status::ERROR_UNKNOWN, "mergedBuffer invalid");
190 auto mergeMemory = mergedBuffer->GetMemory();
191 FALSE_RETURN_V_MSG_E(mergeMemory != nullptr, Status::ERROR_UNKNOWN, "mergeMemory invalid");
192 MEDIA_LOG_I("dash PullDataWithoutCache merge before: cache offset: " PUBLIC_LOG_U64
193 ", cache size: " PUBLIC_LOG_ZU, cacheDataMap_[streamID].GetOffset(), cacheMemory->GetSize());
194 mergeMemory->Write(cacheMemory->GetReadOnlyData(), cacheMemory->GetSize(), 0);
195 mergeMemory->Write(bufferMemory->GetReadOnlyData(), bufferMemory->GetSize(), cacheMemory->GetSize());
196 cacheDataMap_[streamID].SetData(mergedBuffer);
197 MEDIA_LOG_I("dash PullDataWithoutCache merge after: " PUBLIC_LOG_U64 ", cache offset: " PUBLIC_LOG_U64,
198 offset, cacheDataMap_[streamID].GetOffset());
199 }
200 return Status::OK;
201 }
202
PullDataWithoutCache(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)203 Status StreamDemuxer::PullDataWithoutCache(int32_t streamID, uint64_t offset, size_t size,
204 std::shared_ptr<Buffer>& bufferPtr)
205 {
206 Status ret = PullData(streamID, offset, size, bufferPtr);
207 if (ret != Status::OK) {
208 MEDIA_LOG_D("PullDataWithoutCache, PullData error " PUBLIC_LOG_D32, static_cast<int32_t>(ret));
209 return ret;
210 }
211 if (cacheDataMap_.find(streamID) != cacheDataMap_.end()) {
212 MEDIA_LOG_D("PullDataWithoutCache, cacheDataMap_ exist streamID , do nothing.");
213 ret = ProcInnerDash(streamID, offset, bufferPtr);
214 if (ret != Status::OK) {
215 MEDIA_LOG_E("ProcInnerDash error " PUBLIC_LOG_D32, static_cast<int32_t>(ret));
216 return ret;
217 }
218 } else {
219 CacheData cacheTmp;
220 cacheDataMap_[streamID] = cacheTmp;
221 }
222 if (cacheDataMap_[streamID].GetData() == nullptr || cacheDataMap_[streamID].GetData()->GetMemory() == nullptr) {
223 MEDIA_LOG_D("PullDataWithoutCache, write cache data.");
224 if (bufferPtr->GetMemory() == nullptr) {
225 MEDIA_LOG_W("PullDataWithoutCache, write cache data error. memory is nullptr!");
226 } else {
227 auto buffer = Buffer::CreateDefaultBuffer(bufferPtr->GetMemory()->GetSize());
228 if (buffer != nullptr && buffer->GetMemory() != nullptr) {
229 buffer->GetMemory()->Write(bufferPtr->GetMemory()->GetReadOnlyData(),
230 bufferPtr->GetMemory()->GetSize(), 0);
231 cacheDataMap_[streamID].Init(buffer, offset);
232 MEDIA_LOG_D("PullDataWithoutCache, write cache data success. offset=" PUBLIC_LOG_U64, offset);
233 } else {
234 MEDIA_LOG_W("PullDataWithoutCache, write cache data failed. memory is nullptr!");
235 }
236 }
237 }
238 return ret;
239 }
240
ReadRetry(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Plugins::Buffer> & data)241 Status StreamDemuxer::ReadRetry(int32_t streamID, uint64_t offset, size_t size,
242 std::shared_ptr<Plugins::Buffer>& data)
243 {
244 FALSE_RETURN_V_MSG_E(data->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "getmemory invalid");
245 Status err = Status::OK;
246 int32_t retryTimes = 0;
247 while (true && !isInterruptNeeded_.load()) {
248 {
249 ScopedTimer timer("Source Read", SOURCE_READ_WARNING_MS);
250 err = source_->Read(streamID, data, offset, size);
251 }
252 if (IsDash() && streamID != data->streamID) {
253 break;
254 }
255 FALSE_RETURN_V_MSG_E(err != Status::ERROR_UNKNOWN, Status::ERROR_UNKNOWN, "error unknown");
256 if (err != Status::END_OF_STREAM && data->GetMemory()->GetSize() == 0) {
257 {
258 std::unique_lock<std::mutex> lock(mutex_);
259 readCond_.wait_for(lock, std::chrono::milliseconds(TRY_READ_SLEEP_TIME),
260 [&] { return isInterruptNeeded_.load(); });
261 }
262 retryTimes++;
263 if (retryTimes > TRY_READ_TIMES || isInterruptNeeded_.load()) {
264 break;
265 }
266 continue;
267 }
268 break;
269 }
270 FALSE_LOG_MSG(!isInterruptNeeded_.load(), "ReadRetry interrupted");
271 return err;
272 }
273
PullData(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Plugins::Buffer> & data)274 Status StreamDemuxer::PullData(int32_t streamID, uint64_t offset, size_t size,
275 std::shared_ptr<Plugins::Buffer>& data)
276 {
277 MEDIA_LOG_DD("IN, offset: " PUBLIC_LOG_U64 ", size: " PUBLIC_LOG_ZU
278 ", position: " PUBLIC_LOG_U64, offset, size, position_);
279 if (!source_) {
280 return Status::ERROR_INVALID_OPERATION;
281 }
282 Status err;
283 auto readSize = size;
284 if (source_->IsSeekToTimeSupported() || source_->GetSeekable() == Plugins::Seekable::UNSEEKABLE) {
285 err = ReadRetry(streamID, offset, readSize, data);
286 FALSE_LOG_MSG(err == Status::OK, "hls, plugin read failed.");
287 return err;
288 }
289
290 uint64_t totalSize = 0;
291 if ((source_->GetSize(totalSize) == Status::OK) && (totalSize != 0)) {
292 if (offset >= totalSize) {
293 MEDIA_LOG_D("Offset: " PUBLIC_LOG_U64 " is larger than totalSize: " PUBLIC_LOG_U64, offset, totalSize);
294 return Status::END_OF_STREAM;
295 }
296 if ((offset + readSize) > totalSize) {
297 readSize = totalSize - offset;
298 }
299 if (data->GetMemory() != nullptr) {
300 auto realSize = data->GetMemory()->GetCapacity();
301 readSize = (readSize > realSize) ? realSize : readSize;
302 }
303 MEDIA_LOG_DD("TotalSize_: " PUBLIC_LOG_U64, totalSize);
304 }
305
306 if (position_ != offset || GetIsDataSrc()) {
307 err = source_->SeekTo(offset);
308 FALSE_RETURN_V_MSG_E(err == Status::OK, err, "Seek to " PUBLIC_LOG_U64 " fail", offset);
309 position_ = offset;
310 }
311
312 err = ReadRetry(streamID, offset, readSize, data);
313 if (err == Status::OK) {
314 FALSE_RETURN_V_MSG_E(data->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "data->GetMemory invalid");
315 position_ += data->GetMemory()->GetSize();
316 }
317 return err;
318 }
319
ResetCache(int32_t streamID)320 Status StreamDemuxer::ResetCache(int32_t streamID)
321 {
322 if (cacheDataMap_.find(streamID) != cacheDataMap_.end()) {
323 cacheDataMap_[streamID].Reset();
324 cacheDataMap_.erase(streamID);
325 }
326 return Status::OK;
327 }
328
ResetAllCache()329 Status StreamDemuxer::ResetAllCache()
330 {
331 for (auto& iter : cacheDataMap_) {
332 iter.second.Reset();
333 }
334 cacheDataMap_.clear();
335 return Status::OK;
336 }
337
Start()338 Status StreamDemuxer::Start()
339 {
340 return Status::OK;
341 }
342
343
Stop()344 Status StreamDemuxer::Stop()
345 {
346 return Status::OK;
347 }
348
349
Resume()350 Status StreamDemuxer::Resume()
351 {
352 return Status::OK;
353 }
354
355
Pause()356 Status StreamDemuxer::Pause()
357 {
358 return Status::OK;
359 }
360
Flush()361 Status StreamDemuxer::Flush()
362 {
363 return Status::OK;
364 }
365
HandleReadHeader(int32_t streamID,int64_t offset,std::shared_ptr<Buffer> & buffer,size_t expectedLen)366 Status StreamDemuxer::HandleReadHeader(int32_t streamID, int64_t offset, std::shared_ptr<Buffer>& buffer,
367 size_t expectedLen)
368 {
369 MEDIA_LOG_D("Demuxer parse DEMUXER_STATE_PARSE_HEADER, offset: " PUBLIC_LOG_D64
370 ", expectedLen: " PUBLIC_LOG_ZU, offset, expectedLen);
371 if (expectedLen == 0) {
372 return Status::END_OF_STREAM;
373 }
374 Status ret = getRange_(streamID, static_cast<uint64_t>(offset), expectedLen, buffer);
375 if (ret == Status::OK) {
376 DUMP_BUFFER2FILE(DEMUXER_INPUT_PEEK, buffer);
377 return ret;
378 }
379 // Under the current specifications, change buffer->streamID only in the scenario of switching tracks.
380 FALSE_RETURN_V_NOLOG(!IsDash() || buffer == nullptr || buffer->streamID == streamID, Status::END_OF_STREAM);
381
382 MEDIA_LOG_W("Demuxer parse DEMUXER_STATE_PARSE_HEADER, getRange_ failed, ret = " PUBLIC_LOG_D32, ret);
383 return ret;
384 }
385
CheckChangeStreamID(int32_t streamID,std::shared_ptr<Buffer> & buffer)386 Status StreamDemuxer::CheckChangeStreamID(int32_t streamID, std::shared_ptr<Buffer>& buffer)
387 {
388 if (IsDash()) {
389 if (buffer != nullptr && buffer->streamID != streamID) {
390 if (GetNewVideoStreamID() == streamID) {
391 SetNewVideoStreamID(buffer->streamID);
392 } else if (GetNewAudioStreamID() == streamID) {
393 SetNewAudioStreamID(buffer->streamID);
394 } else if (GetNewSubtitleStreamID() == streamID) {
395 SetNewSubtitleStreamID(buffer->streamID);
396 } else {}
397 MEDIA_LOG_I("Demuxer parse dash change, oldStreamID = " PUBLIC_LOG_D32
398 ", newStreamID = " PUBLIC_LOG_D32, streamID, buffer->streamID);
399 return Status::END_OF_STREAM;
400 }
401 }
402 return Status::OK;
403 }
404
HandleReadPacket(int32_t streamID,int64_t offset,std::shared_ptr<Buffer> & buffer,size_t expectedLen)405 Status StreamDemuxer::HandleReadPacket(int32_t streamID, int64_t offset, std::shared_ptr<Buffer>& buffer,
406 size_t expectedLen)
407 {
408 MEDIA_LOG_D("Demuxer parse DEMUXER_STATE_PARSE_FRAME");
409 Status ret = getRange_(streamID, static_cast<uint64_t>(offset), expectedLen, buffer);
410 if (ret == Status::OK) {
411 DUMP_BUFFER2LOG("Demuxer GetRange", buffer, offset);
412 DUMP_BUFFER2FILE(DEMUXER_INPUT_GET, buffer);
413 if (buffer != nullptr && buffer->GetMemory() != nullptr &&
414 buffer->GetMemory()->GetSize() == 0) {
415 MEDIA_LOG_I("Demuxer parse DEMUXER_STATE_PARSE_FRAME in pausing(isIgnoreParse),"
416 " Read fail and try again");
417 return Status::ERROR_AGAIN;
418 }
419 return ret;
420 }
421 MEDIA_LOG_D("Demuxer parse DEMUXER_STATE_PARSE_FRAME, getRange_ failed, ret = " PUBLIC_LOG_D32, ret);
422 return ret;
423 }
424
CallbackReadAt(int32_t streamID,int64_t offset,std::shared_ptr<Buffer> & buffer,size_t expectedLen)425 Status StreamDemuxer::CallbackReadAt(int32_t streamID, int64_t offset, std::shared_ptr<Buffer>& buffer,
426 size_t expectedLen)
427 {
428 FALSE_RETURN_V(!isInterruptNeeded_.load(), Status::ERROR_WRONG_STATE);
429 switch (pluginStateMap_[streamID]) {
430 case DemuxerState::DEMUXER_STATE_NULL:
431 return Status::ERROR_WRONG_STATE;
432 case DemuxerState::DEMUXER_STATE_PARSE_HEADER: {
433 auto ret = HandleReadHeader(streamID, offset, buffer, expectedLen);
434 if (ret != Status::OK) {
435 return ret;
436 }
437 break;
438 }
439 case DemuxerState::DEMUXER_STATE_PARSE_FIRST_FRAME:
440 case DemuxerState::DEMUXER_STATE_PARSE_FRAME: {
441 auto ret = HandleReadPacket(streamID, offset, buffer, expectedLen);
442 if (ret == Status::END_OF_STREAM &&
443 pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FIRST_FRAME) {
444 SetDemuxerState(streamID, DemuxerState::DEMUXER_STATE_PARSE_FRAME);
445 return ret;
446 }
447 if (ret != Status::OK) {
448 return ret;
449 }
450 if (pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FIRST_FRAME) {
451 SetDemuxerState(streamID, DemuxerState::DEMUXER_STATE_PARSE_FRAME);
452 }
453 break;
454 }
455 default:
456 break;
457 }
458 return Status::OK;
459 }
460
SetInterruptState(bool isInterruptNeeded)461 void StreamDemuxer::SetInterruptState(bool isInterruptNeeded)
462 {
463 MEDIA_LOG_D("StreamDemuxer OnInterrupted %{public}d", isInterruptNeeded);
464 {
465 std::unique_lock<std::mutex> lock(mutex_);
466 isInterruptNeeded_ = isInterruptNeeded;
467 readCond_.notify_all();
468 }
469 TypeFinderInterrupt(isInterruptNeeded);
470 }
471 } // namespace Media
472 } // namespace OHOS
473