• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "StreamDemuxer"
17 
18 #include "stream_demuxer.h"
19 
20 #include <algorithm>
21 #include <map>
22 #include <memory>
23 
24 #include "avcodec_common.h"
25 #include "avcodec_trace.h"
26 #include "cpp_ext/type_traits_ext.h"
27 #include "buffer/avallocator.h"
28 #include "common/event.h"
29 #include "common/log.h"
30 #include "meta/media_types.h"
31 #include "meta/meta.h"
32 #include "osal/utils/dump_buffer.h"
33 #include "plugin/plugin_buffer.h"
34 #include "plugin/plugin_info.h"
35 #include "plugin/plugin_time.h"
36 #include "source/source.h"
37 #include "scoped_timer.h"
38 
39 namespace {
40 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "StreamDemuxer" };
41 }
42 
43 namespace OHOS {
44 namespace Media {
45 
// Wait between two consecutive empty-read attempts in ReadRetry, in milliseconds.
constexpr int32_t TRY_READ_SLEEP_TIME = 10;
// Upper bound on the number of empty-read retries performed by ReadRetry.
constexpr int32_t TRY_READ_TIMES = 10;
// Threshold handed to the ScopedTimer that wraps each source read (milliseconds, judging by the name).
constexpr int64_t SOURCE_READ_WARNING_MS = 100;
StreamDemuxer()49 StreamDemuxer::StreamDemuxer() : position_(0)
50 {
51     MEDIA_LOG_D("VodStreamDemuxer called");
52 }
53 
~StreamDemuxer()54 StreamDemuxer::~StreamDemuxer()
55 {
56     MEDIA_LOG_D("~VodStreamDemuxer called");
57     ResetAllCache();
58 }
59 
ReadFrameData(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)60 Status StreamDemuxer::ReadFrameData(int32_t streamID, uint64_t offset, size_t size,
61     std::shared_ptr<Buffer>& bufferPtr)
62 {
63     if (IsDash() || GetIsDataSrcNoSeek()) {
64         MEDIA_LOG_D("GetPeekRange read cache, offset: " PUBLIC_LOG_U64 " streamID: " PUBLIC_LOG_D32, offset, streamID);
65         if (cacheDataMap_.find(streamID) != cacheDataMap_.end() && cacheDataMap_[streamID].CheckCacheExist(offset)) {
66             MEDIA_LOG_D("GetPeekRange read cache, offset: " PUBLIC_LOG_U64, offset);
67             auto memory = cacheDataMap_[streamID].GetData()->GetMemory();
68             if (memory != nullptr && memory->GetSize() > 0) {
69                 MEDIA_LOG_D("GetPeekRange read cache, Read data from cache data. streamID: " PUBLIC_LOG_D32, streamID);
70                 return PullDataWithCache(streamID, offset, size, bufferPtr);
71             }
72         }
73     }
74     return PullData(streamID, offset, size, bufferPtr);
75 }
76 
ReadHeaderData(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)77 Status StreamDemuxer::ReadHeaderData(int32_t streamID, uint64_t offset, size_t size,
78     std::shared_ptr<Buffer>& bufferPtr)
79 {
80     if (cacheDataMap_.find(streamID) != cacheDataMap_.end() && cacheDataMap_[streamID].CheckCacheExist(offset)) {
81         MEDIA_LOG_D("GetPeekRange read cache, offset: " PUBLIC_LOG_U64, offset);
82         auto memory = cacheDataMap_[streamID].GetData()->GetMemory();
83         if (memory != nullptr && memory->GetSize() > 0) {
84             MEDIA_LOG_D("GetPeekRange read cache, Read data from cache data.");
85             return PullDataWithCache(streamID, offset, size, bufferPtr);
86         }
87     }
88     return PullDataWithoutCache(streamID, offset, size, bufferPtr);
89 }
90 
GetPeekRange(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)91 Status StreamDemuxer::GetPeekRange(int32_t streamID, uint64_t offset, size_t size, std::shared_ptr<Buffer>& bufferPtr)
92 {
93     FALSE_RETURN_V_MSG_E(!isInterruptNeeded_.load(), Status::ERROR_WRONG_STATE,
94         "GetPeekRange interrupt " PUBLIC_LOG_D32 " " PUBLIC_LOG_U64 " " PUBLIC_LOG_ZU, streamID, offset, size);
95     FALSE_RETURN_V_MSG_E(streamID >= 0, Status::ERROR_INVALID_PARAMETER,
96         "Invalid streamId, id = " PUBLIC_LOG_D32, streamID);
97     if (bufferPtr == nullptr) {
98         MEDIA_LOG_E("GetPeekRange bufferPtr invalid.");
99         return Status::ERROR_INVALID_PARAMETER;
100     }
101     bufferPtr->streamID = streamID;
102     Status ret = Status::OK;
103     if (pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FRAME) {
104         ret = ReadFrameData(streamID, offset, size, bufferPtr);
105     } else {
106         ret = ReadHeaderData(streamID, offset, size, bufferPtr);
107     }
108     if (ret != Status::OK) {
109         return ret;
110     }
111     return CheckChangeStreamID(streamID, bufferPtr);
112 }
113 
Init(const std::string & uri)114 Status StreamDemuxer::Init(const std::string& uri)
115 {
116     MediaAVCodec::AVCodecTrace trace("StreamDemuxer::Init");
117     MEDIA_LOG_D("StreamDemuxer::Init called");
118     checkRange_ = [](int32_t streamID, uint64_t offset, uint32_t size) {
119         return Status::OK;
120     };
121     peekRange_ = [this](int32_t streamID, uint64_t offset, size_t size, std::shared_ptr<Buffer>& bufferPtr) -> Status {
122         return GetPeekRange(streamID, offset, size, bufferPtr);
123     };
124     getRange_ = peekRange_;
125     uri_ = uri;
126     return Status::OK;
127 }
128 
PullDataWithCache(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)129 Status StreamDemuxer::PullDataWithCache(int32_t streamID, uint64_t offset, size_t size,
130     std::shared_ptr<Buffer>& bufferPtr)
131 {
132     FALSE_RETURN_V_MSG_E(bufferPtr->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "bufferPtr invalid");
133     auto memory = cacheDataMap_[streamID].GetData()->GetMemory();
134     FALSE_RETURN_V_MSG_E(memory != nullptr, Status::ERROR_UNKNOWN, "memory invalid");
135     MEDIA_LOG_D("PullDataWithCache, Read data from cache data. streamID: " PUBLIC_LOG_D32, streamID);
136     uint64_t offsetInCache = offset - cacheDataMap_[streamID].GetOffset();
137     if (size <= memory->GetSize() - offsetInCache) {
138         MEDIA_LOG_D("Readfromcache. streamID: " PUBLIC_LOG_D32, streamID);
139         bufferPtr->GetMemory()->Write(memory->GetReadOnlyData() + offsetInCache, size, 0);
140         return Status::OK;
141     }
142     bufferPtr->GetMemory()->Write(memory->GetReadOnlyData() + offsetInCache, memory->GetSize() - offsetInCache, 0);
143     uint64_t remainOffset = cacheDataMap_[streamID].GetOffset() + memory->GetSize();
144     uint64_t remainSize = size - (memory->GetSize() - offsetInCache);
145     std::shared_ptr<Buffer> tempBuffer = Buffer::CreateDefaultBuffer(remainSize);
146     if (tempBuffer == nullptr || tempBuffer->GetMemory() == nullptr) {
147         MEDIA_LOG_W("PullDataWithCache, Read data from cache data. only get partial data.");
148         return Status::ERROR_UNKNOWN;
149     }
150     Status ret = PullData(streamID, remainOffset, remainSize, tempBuffer);
151     if (ret == Status::OK) {
152         FALSE_RETURN_V_MSG_E(tempBuffer->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "tempBuffer invalid");
153         bufferPtr->GetMemory()->Write(tempBuffer->GetMemory()->GetReadOnlyData(),
154             tempBuffer->GetMemory()->GetSize(), memory->GetSize() - offsetInCache);
155         if (pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FRAME) {
156             MEDIA_LOG_W("PullDataWithCache, not cache begin.");
157             return ret;
158         }
159         std::shared_ptr<Buffer> mergedBuffer = Buffer::CreateDefaultBuffer(
160             tempBuffer->GetMemory()->GetSize() + memory->GetSize());
161         FALSE_RETURN_V_MSG_E(mergedBuffer != nullptr, Status::ERROR_UNKNOWN, "mergedBuffer invalid");
162         FALSE_RETURN_V_MSG_E(mergedBuffer->GetMemory() != nullptr, Status::ERROR_UNKNOWN,
163             "mergedBuffer->GetMemory invalid");
164         mergedBuffer->GetMemory()->Write(memory->GetReadOnlyData(), memory->GetSize(), 0);
165         mergedBuffer->GetMemory()->Write(tempBuffer->GetMemory()->GetReadOnlyData(),
166             tempBuffer->GetMemory()->GetSize(), memory->GetSize());
167         cacheDataMap_[streamID].SetData(mergedBuffer);
168         memory = cacheDataMap_[streamID].GetData()->GetMemory();
169         FALSE_RETURN_V_MSG_E(memory != nullptr, Status::ERROR_UNKNOWN, "memory invalid");
170         MEDIA_LOG_I("PullDataWithCache, offset: " PUBLIC_LOG_U64 ", cache offset: " PUBLIC_LOG_U64
171             ", cache size: " PUBLIC_LOG_ZU, offset, cacheDataMap_[streamID].GetOffset(), memory->GetSize());
172     }
173     FALSE_RETURN_V_MSG_E(ret != Status::END_OF_STREAM, Status::OK, "eos&data return data");
174     return ret;
175 }
176 
ProcInnerDash(int32_t streamID,uint64_t offset,std::shared_ptr<Buffer> & bufferPtr)177 Status StreamDemuxer::ProcInnerDash(int32_t streamID,  uint64_t offset, std::shared_ptr<Buffer>& bufferPtr)
178 {
179     FALSE_RETURN_V_MSG_E(bufferPtr != nullptr, Status::ERROR_UNKNOWN, "bufferPtr invalid");
180     if (IsDash()) {
181         MEDIA_LOG_D("dash PullDataWithoutCache, cacheDataMap_ exist streamID , merge it.");
182         FALSE_RETURN_V_MSG_E(cacheDataMap_[streamID].GetData() != nullptr, Status::ERROR_UNKNOWN, "getdata invalid");
183         auto cacheMemory = cacheDataMap_[streamID].GetData()->GetMemory();
184         auto bufferMemory = bufferPtr->GetMemory();
185         FALSE_RETURN_V_MSG_E(bufferMemory != nullptr, Status::ERROR_UNKNOWN, "bufferPtr invalid");
186         FALSE_RETURN_V_MSG_E(cacheMemory != nullptr, Status::ERROR_UNKNOWN, "cacheMemory invalid");
187         std::shared_ptr<Buffer> mergedBuffer = Buffer::CreateDefaultBuffer(
188             bufferMemory->GetSize() + cacheMemory->GetSize());
189         FALSE_RETURN_V_MSG_E(mergedBuffer != nullptr, Status::ERROR_UNKNOWN, "mergedBuffer invalid");
190         auto mergeMemory = mergedBuffer->GetMemory();
191         FALSE_RETURN_V_MSG_E(mergeMemory != nullptr, Status::ERROR_UNKNOWN, "mergeMemory invalid");
192         MEDIA_LOG_I("dash PullDataWithoutCache merge before: cache offset: " PUBLIC_LOG_U64
193             ", cache size: " PUBLIC_LOG_ZU, cacheDataMap_[streamID].GetOffset(), cacheMemory->GetSize());
194         mergeMemory->Write(cacheMemory->GetReadOnlyData(), cacheMemory->GetSize(), 0);
195         mergeMemory->Write(bufferMemory->GetReadOnlyData(), bufferMemory->GetSize(), cacheMemory->GetSize());
196         cacheDataMap_[streamID].SetData(mergedBuffer);
197         MEDIA_LOG_I("dash PullDataWithoutCache merge after: " PUBLIC_LOG_U64 ", cache offset: " PUBLIC_LOG_U64,
198             offset, cacheDataMap_[streamID].GetOffset());
199     }
200     return Status::OK;
201 }
202 
PullDataWithoutCache(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Buffer> & bufferPtr)203 Status StreamDemuxer::PullDataWithoutCache(int32_t streamID, uint64_t offset, size_t size,
204     std::shared_ptr<Buffer>& bufferPtr)
205 {
206     Status ret = PullData(streamID, offset, size, bufferPtr);
207     if (ret != Status::OK) {
208         MEDIA_LOG_D("PullDataWithoutCache, PullData error " PUBLIC_LOG_D32, static_cast<int32_t>(ret));
209         return ret;
210     }
211     if (cacheDataMap_.find(streamID) != cacheDataMap_.end()) {
212         MEDIA_LOG_D("PullDataWithoutCache, cacheDataMap_ exist streamID , do nothing.");
213         ret = ProcInnerDash(streamID, offset, bufferPtr);
214         if (ret != Status::OK) {
215             MEDIA_LOG_E("ProcInnerDash error " PUBLIC_LOG_D32, static_cast<int32_t>(ret));
216             return ret;
217         }
218     } else {
219         CacheData cacheTmp;
220         cacheDataMap_[streamID] = cacheTmp;
221     }
222     if (cacheDataMap_[streamID].GetData() == nullptr || cacheDataMap_[streamID].GetData()->GetMemory() == nullptr) {
223         MEDIA_LOG_D("PullDataWithoutCache, write cache data.");
224         if (bufferPtr->GetMemory() == nullptr) {
225             MEDIA_LOG_W("PullDataWithoutCache, write cache data error. memory is nullptr!");
226         } else {
227             auto buffer = Buffer::CreateDefaultBuffer(bufferPtr->GetMemory()->GetSize());
228             if (buffer != nullptr && buffer->GetMemory() != nullptr) {
229                 buffer->GetMemory()->Write(bufferPtr->GetMemory()->GetReadOnlyData(),
230                     bufferPtr->GetMemory()->GetSize(), 0);
231                 cacheDataMap_[streamID].Init(buffer, offset);
232                 MEDIA_LOG_D("PullDataWithoutCache, write cache data success. offset=" PUBLIC_LOG_U64, offset);
233             } else {
234                 MEDIA_LOG_W("PullDataWithoutCache, write cache data failed. memory is nullptr!");
235             }
236         }
237     }
238     return ret;
239 }
240 
ReadRetry(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Plugins::Buffer> & data)241 Status StreamDemuxer::ReadRetry(int32_t streamID, uint64_t offset, size_t size,
242     std::shared_ptr<Plugins::Buffer>& data)
243 {
244     FALSE_RETURN_V_MSG_E(data->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "getmemory invalid");
245     Status err = Status::OK;
246     int32_t retryTimes = 0;
247     while (true && !isInterruptNeeded_.load()) {
248         {
249             ScopedTimer timer("Source Read", SOURCE_READ_WARNING_MS);
250             err = source_->Read(streamID, data, offset, size);
251         }
252         if (IsDash() && streamID != data->streamID) {
253             break;
254         }
255         FALSE_RETURN_V_MSG_E(err != Status::ERROR_UNKNOWN, Status::ERROR_UNKNOWN, "error unknown");
256         if (err != Status::END_OF_STREAM && data->GetMemory()->GetSize() == 0) {
257             {
258                 std::unique_lock<std::mutex> lock(mutex_);
259                 readCond_.wait_for(lock, std::chrono::milliseconds(TRY_READ_SLEEP_TIME),
260                                    [&] { return isInterruptNeeded_.load(); });
261             }
262             retryTimes++;
263             if (retryTimes > TRY_READ_TIMES || isInterruptNeeded_.load()) {
264                 break;
265             }
266             continue;
267         }
268         break;
269     }
270     FALSE_LOG_MSG(!isInterruptNeeded_.load(), "ReadRetry interrupted");
271     return err;
272 }
273 
PullData(int32_t streamID,uint64_t offset,size_t size,std::shared_ptr<Plugins::Buffer> & data)274 Status StreamDemuxer::PullData(int32_t streamID, uint64_t offset, size_t size,
275     std::shared_ptr<Plugins::Buffer>& data)
276 {
277     MEDIA_LOG_DD("IN, offset: " PUBLIC_LOG_U64 ", size: " PUBLIC_LOG_ZU
278         ", position: " PUBLIC_LOG_U64, offset, size, position_);
279     if (!source_) {
280         return Status::ERROR_INVALID_OPERATION;
281     }
282     Status err;
283     auto readSize = size;
284     if (source_->IsSeekToTimeSupported() || source_->GetSeekable() == Plugins::Seekable::UNSEEKABLE) {
285         err = ReadRetry(streamID, offset, readSize, data);
286         FALSE_LOG_MSG(err == Status::OK, "hls, plugin read failed.");
287         return err;
288     }
289 
290     uint64_t totalSize = 0;
291     if ((source_->GetSize(totalSize) == Status::OK) && (totalSize != 0)) {
292         if (offset >= totalSize) {
293             MEDIA_LOG_D("Offset: " PUBLIC_LOG_U64 " is larger than totalSize: " PUBLIC_LOG_U64, offset, totalSize);
294             return Status::END_OF_STREAM;
295         }
296         if ((offset + readSize) > totalSize) {
297             readSize = totalSize - offset;
298         }
299         if (data->GetMemory() != nullptr) {
300             auto realSize = data->GetMemory()->GetCapacity();
301             readSize = (readSize > realSize) ? realSize : readSize;
302         }
303         MEDIA_LOG_DD("TotalSize_: " PUBLIC_LOG_U64, totalSize);
304     }
305 
306     if (position_ != offset || GetIsDataSrc()) {
307         err = source_->SeekTo(offset);
308         FALSE_RETURN_V_MSG_E(err == Status::OK, err, "Seek to " PUBLIC_LOG_U64 " fail", offset);
309         position_ = offset;
310     }
311 
312     err = ReadRetry(streamID, offset, readSize, data);
313     if (err == Status::OK) {
314         FALSE_RETURN_V_MSG_E(data->GetMemory() != nullptr, Status::ERROR_UNKNOWN, "data->GetMemory invalid");
315         position_ += data->GetMemory()->GetSize();
316     }
317     return err;
318 }
319 
ResetCache(int32_t streamID)320 Status StreamDemuxer::ResetCache(int32_t streamID)
321 {
322     if (cacheDataMap_.find(streamID) != cacheDataMap_.end()) {
323         cacheDataMap_[streamID].Reset();
324         cacheDataMap_.erase(streamID);
325     }
326     return Status::OK;
327 }
328 
ResetAllCache()329 Status StreamDemuxer::ResetAllCache()
330 {
331     for (auto& iter : cacheDataMap_) {
332         iter.second.Reset();
333     }
334     cacheDataMap_.clear();
335     return Status::OK;
336 }
337 
Start()338 Status StreamDemuxer::Start()
339 {
340     return Status::OK;
341 }
342 
343 
Stop()344 Status StreamDemuxer::Stop()
345 {
346     return Status::OK;
347 }
348 
349 
Resume()350 Status StreamDemuxer::Resume()
351 {
352     return Status::OK;
353 }
354 
355 
Pause()356 Status StreamDemuxer::Pause()
357 {
358     return Status::OK;
359 }
360 
Flush()361 Status StreamDemuxer::Flush()
362 {
363     return Status::OK;
364 }
365 
HandleReadHeader(int32_t streamID,int64_t offset,std::shared_ptr<Buffer> & buffer,size_t expectedLen)366 Status StreamDemuxer::HandleReadHeader(int32_t streamID, int64_t offset, std::shared_ptr<Buffer>& buffer,
367     size_t expectedLen)
368 {
369     MEDIA_LOG_D("Demuxer parse DEMUXER_STATE_PARSE_HEADER, offset: " PUBLIC_LOG_D64
370         ", expectedLen: " PUBLIC_LOG_ZU, offset, expectedLen);
371     if (expectedLen == 0) {
372         return Status::END_OF_STREAM;
373     }
374     Status ret = getRange_(streamID, static_cast<uint64_t>(offset), expectedLen, buffer);
375     if (ret == Status::OK) {
376         DUMP_BUFFER2FILE(DEMUXER_INPUT_PEEK, buffer);
377         return ret;
378     }
379     // Under the current specifications, change buffer->streamID only in the scenario of switching tracks.
380     FALSE_RETURN_V_NOLOG(!IsDash() || buffer == nullptr || buffer->streamID == streamID, Status::END_OF_STREAM);
381 
382     MEDIA_LOG_W("Demuxer parse DEMUXER_STATE_PARSE_HEADER, getRange_ failed, ret = " PUBLIC_LOG_D32, ret);
383     return ret;
384 }
385 
CheckChangeStreamID(int32_t streamID,std::shared_ptr<Buffer> & buffer)386 Status StreamDemuxer::CheckChangeStreamID(int32_t streamID, std::shared_ptr<Buffer>& buffer)
387 {
388     if (IsDash()) {
389         if (buffer != nullptr && buffer->streamID != streamID) {
390             if (GetNewVideoStreamID() == streamID) {
391                 SetNewVideoStreamID(buffer->streamID);
392             } else if (GetNewAudioStreamID() == streamID) {
393                 SetNewAudioStreamID(buffer->streamID);
394             } else if (GetNewSubtitleStreamID() == streamID) {
395                 SetNewSubtitleStreamID(buffer->streamID);
396             } else {}
397             MEDIA_LOG_I("Demuxer parse dash change, oldStreamID = " PUBLIC_LOG_D32
398                 ", newStreamID = " PUBLIC_LOG_D32, streamID, buffer->streamID);
399             return Status::END_OF_STREAM;
400         }
401     }
402     return Status::OK;
403 }
404 
HandleReadPacket(int32_t streamID,int64_t offset,std::shared_ptr<Buffer> & buffer,size_t expectedLen)405 Status StreamDemuxer::HandleReadPacket(int32_t streamID, int64_t offset, std::shared_ptr<Buffer>& buffer,
406     size_t expectedLen)
407 {
408     MEDIA_LOG_D("Demuxer parse DEMUXER_STATE_PARSE_FRAME");
409     Status ret = getRange_(streamID, static_cast<uint64_t>(offset), expectedLen, buffer);
410     if (ret == Status::OK) {
411         DUMP_BUFFER2LOG("Demuxer GetRange", buffer, offset);
412         DUMP_BUFFER2FILE(DEMUXER_INPUT_GET, buffer);
413         if (buffer != nullptr && buffer->GetMemory() != nullptr &&
414             buffer->GetMemory()->GetSize() == 0) {
415             MEDIA_LOG_I("Demuxer parse DEMUXER_STATE_PARSE_FRAME in pausing(isIgnoreParse),"
416                         " Read fail and try again");
417             return Status::ERROR_AGAIN;
418         }
419         return ret;
420     }
421     MEDIA_LOG_D("Demuxer parse DEMUXER_STATE_PARSE_FRAME, getRange_ failed, ret = " PUBLIC_LOG_D32, ret);
422     return ret;
423 }
424 
CallbackReadAt(int32_t streamID,int64_t offset,std::shared_ptr<Buffer> & buffer,size_t expectedLen)425 Status StreamDemuxer::CallbackReadAt(int32_t streamID, int64_t offset, std::shared_ptr<Buffer>& buffer,
426     size_t expectedLen)
427 {
428     FALSE_RETURN_V(!isInterruptNeeded_.load(), Status::ERROR_WRONG_STATE);
429     switch (pluginStateMap_[streamID]) {
430         case DemuxerState::DEMUXER_STATE_NULL:
431             return Status::ERROR_WRONG_STATE;
432         case DemuxerState::DEMUXER_STATE_PARSE_HEADER: {
433             auto ret = HandleReadHeader(streamID, offset, buffer, expectedLen);
434             if (ret != Status::OK) {
435                 return ret;
436             }
437             break;
438         }
439         case DemuxerState::DEMUXER_STATE_PARSE_FIRST_FRAME:
440         case DemuxerState::DEMUXER_STATE_PARSE_FRAME: {
441             auto ret = HandleReadPacket(streamID, offset, buffer, expectedLen);
442             if (ret == Status::END_OF_STREAM &&
443                 pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FIRST_FRAME) {
444                 SetDemuxerState(streamID, DemuxerState::DEMUXER_STATE_PARSE_FRAME);
445                 return ret;
446             }
447             if (ret != Status::OK) {
448                 return ret;
449             }
450             if (pluginStateMap_[streamID] == DemuxerState::DEMUXER_STATE_PARSE_FIRST_FRAME) {
451                 SetDemuxerState(streamID, DemuxerState::DEMUXER_STATE_PARSE_FRAME);
452             }
453             break;
454         }
455         default:
456             break;
457     }
458     return Status::OK;
459 }
460 
SetInterruptState(bool isInterruptNeeded)461 void StreamDemuxer::SetInterruptState(bool isInterruptNeeded)
462 {
463     MEDIA_LOG_D("StreamDemuxer OnInterrupted %{public}d", isInterruptNeeded);
464     {
465         std::unique_lock<std::mutex> lock(mutex_);
466         isInterruptNeeded_ = isInterruptNeeded;
467         readCond_.notify_all();
468     }
469     TypeFinderInterrupt(isInterruptNeeded);
470 }
471 } // namespace Media
472 } // namespace OHOS
473