• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define HST_LOG_TAG "StreamingExecutor"
16 
17 #include "streaming_executor.h"
18 #include <algorithm>
19 #include <functional>
20 #include "securec.h"
21 #include "osal/utils/util.h"
22 #include "steady_clock.h"
23 
24 namespace OHOS {
25 namespace Media {
26 namespace Plugin {
27 namespace HttpPlugin {
// Sizing of the download pipeline.
// Amount requested from the HTTP client per RequestData() call.
constexpr int PER_REQUEST_SIZE = 48 * 1024;
// Capacity handed to the ring buffer: room for five full requests.
constexpr int RING_BUFFER_SIZE = 5 * PER_REQUEST_SIZE;
// Fill level (10% of the ring buffer) at which above/below-waterline events are emitted.
constexpr int WATER_LINE = RING_BUFFER_SIZE / 10;
31 
StreamingExecutor()32 StreamingExecutor::StreamingExecutor() noexcept
33 {
34     buffer_ = std::make_shared<RingBuffer>(RING_BUFFER_SIZE);
35     buffer_->Init();
36 
37     factory_ = std::make_shared<ClientFactory>(&RxHeaderData, &RxBodyData, this);
38 
39     task_ = std::make_shared<OSAL::Task>(std::string("StreamingExecutor"));
40     task_->RegisterHandler(std::bind(&StreamingExecutor::HttpDownloadThread, this));
41 
42     memset_s(&headerInfo_, sizeof(HeaderInfo), 0x00, sizeof(HeaderInfo));
43     headerInfo_.fileContentLen = 0;
44     startPos_ = 0;
45     isDownloading_ = false;
46 }
47 
~StreamingExecutor()48 StreamingExecutor::~StreamingExecutor() {}
49 
Open(const std::string & url)50 bool StreamingExecutor::Open(const std::string &url)
51 {
52     MEDIA_LOG_D("Open in");
53     FALSE_RETURN_V(!url.empty(), false);
54 
55     client_ = factory_->CreateClient(url);
56     FALSE_RETURN_V(client_ != nullptr, false);
57 
58     client_->Open(url);
59 
60     requestSize_ = PER_REQUEST_SIZE;
61     startPos_ = 0;
62     isEos_ = false;
63     task_->Start();
64     return true;
65 }
66 
67 
Close()68 void StreamingExecutor::Close()
69 {
70     task_->Stop();
71     startPos_ = 0;
72     if (client_ != nullptr) {
73         client_->Close();
74         client_ = nullptr;
75     }
76 }
77 
Read(unsigned char * buff,unsigned int wantReadLength,unsigned int & realReadLength,bool & isEos)78 bool StreamingExecutor::Read(unsigned char *buff, unsigned int wantReadLength,
79                              unsigned int &realReadLength, bool &isEos)
80 {
81     FALSE_RETURN_V(buffer_ != nullptr, false);
82     isEos = false;
83     realReadLength = buffer_->ReadBuffer(buff, wantReadLength, 2); // wait 2 times
84     if (isEos_ && realReadLength == 0) {
85         isEos = true;
86     }
87     MEDIA_LOG_D("Read: wantReadLength %" PUBLIC_LOG "d, realReadLength %" PUBLIC_LOG "d, isEos %"
88         PUBLIC_LOG "d", wantReadLength, realReadLength, isEos);
89     return true;
90 }
91 
Seek(int offset)92 bool StreamingExecutor::Seek(int offset)
93 {
94     FALSE_RETURN_V(buffer_ != nullptr, false);
95     MEDIA_LOG_I("Seek: buffer size %" PUBLIC_LOG "d, offset %" PUBLIC_LOG "d", buffer_->GetSize(), offset);
96     if (buffer_->Seek(offset)) {
97         return true;
98     }
99     buffer_->Clear(); // First clear buffer, avoid no available buffer then task pause never exit.
100     task_->Pause();
101     buffer_->Clear();
102     startPos_ = offset;
103     int64_t temp = headerInfo_.fileContentLen - offset;
104     temp = temp >= 0 ? temp : PER_REQUEST_SIZE;
105     requestSize_ = static_cast<int>(std::min(temp, static_cast<int64_t>(PER_REQUEST_SIZE)));
106     task_->Start();
107     isEos_ = false;
108     return true;
109 }
110 
GetContentLength() const111 size_t StreamingExecutor::GetContentLength() const
112 {
113     return headerInfo_.fileContentLen;
114 }
115 
IsStreaming() const116 bool StreamingExecutor::IsStreaming() const
117 {
118     return headerInfo_.isChunked;
119 }
120 
SetCallback(Callback * cb)121 void StreamingExecutor::SetCallback(Callback* cb)
122 {
123     callback_ = cb;
124 }
125 
HttpDownloadThread()126 void StreamingExecutor::HttpDownloadThread()
127 {
128     NetworkClientErrorCode clientCode;
129     NetworkServerErrorCode serverCode;
130     Status ret = client_->RequestData(startPos_, requestSize_, serverCode, clientCode);
131 
132     if (ret == Status::ERROR_CLIENT) {
133         MEDIA_LOG_I("Send http client error, code %" PUBLIC_LOG_D32, clientCode);
134         callback_->OnEvent({PluginEventType::CLIENT_ERROR, {clientCode}, "http"});
135     } else if (ret == Status::ERROR_SERVER) {
136         MEDIA_LOG_I("Send http server error, code %" PUBLIC_LOG_D32, serverCode);
137         callback_->OnEvent({PluginEventType::SERVER_ERROR, {serverCode}, "http"});
138     }
139     FALSE_LOG(ret == Status::OK);
140 
141     int64_t remaining = headerInfo_.fileContentLen - startPos_;
142     if (headerInfo_.fileContentLen > 0 && remaining <= 0) { // 检查是否播放结束
143         MEDIA_LOG_I("http transfer reach end, startPos_ %" PUBLIC_LOG "d", startPos_);
144         isEos_ = true;
145         task_->PauseAsync();
146         requestSize_ = PER_REQUEST_SIZE;
147         return;
148     }
149     if(remaining < PER_REQUEST_SIZE){
150         requestSize_ = remaining;
151     }
152 }
153 
RxBodyData(void * buffer,size_t size,size_t nitems,void * userParam)154 size_t StreamingExecutor::RxBodyData(void *buffer, size_t size, size_t nitems, void *userParam)
155 {
156     auto executor = static_cast<StreamingExecutor *>(userParam);
157     HeaderInfo *header = &(executor->headerInfo_);
158     size_t dataLen = size * nitems;
159 
160     if (header->fileContentLen == 0) {
161         if (header->contentLen > 0) {
162             MEDIA_LOG_W("Unsupported range, use content length as content file length");
163             header->fileContentLen = header->contentLen;
164         } else {
165             MEDIA_LOG_E("fileContentLen and contentLen are both zero.");
166             return 0;
167         }
168     }
169     if (!executor->isDownloading_) {
170         executor->isDownloading_ = true;
171     }
172     executor->buffer_->WriteBuffer(buffer, dataLen, executor->startPos_);
173     executor->isDownloading_ = false;
174     MEDIA_LOG_I("RxBodyData: dataLen %" PUBLIC_LOG "d, startPos_ %" PUBLIC_LOG "d, buffer size %"
175         PUBLIC_LOG "d", dataLen, executor->startPos_, executor->buffer_->GetSize());
176     executor->startPos_ = executor->startPos_ + dataLen;
177 
178     int bufferSize = executor->buffer_->GetSize();
179     double ratio = (static_cast<double>(bufferSize)) / RING_BUFFER_SIZE;
180     if (bufferSize >= WATER_LINE && !executor->aboveWaterline_) {
181         executor->aboveWaterline_ = true;
182         MEDIA_LOG_I("Send http aboveWaterline event, ringbuffer ratio %" PUBLIC_LOG_F, ratio);
183         executor->callback_->OnEvent({PluginEventType::ABOVE_LOW_WATERLINE, {ratio}, "http"});
184     } else if (bufferSize < WATER_LINE && executor->aboveWaterline_) {
185         executor->aboveWaterline_ = false;
186         MEDIA_LOG_I("Send http belowWaterline event, ringbuffer ratio %" PUBLIC_LOG_F, ratio);
187         executor->callback_->OnEvent({PluginEventType::BELOW_LOW_WATERLINE, {ratio}, "http"});
188     }
189 
190     return dataLen;
191 }
192 
namespace {
/**
 * Trims whitespace (as classified by isspace) from both ends of a NUL-terminated
 * string, in place.
 *
 * Leading whitespace is skipped by advancing the returned pointer; trailing
 * whitespace is overwritten with NUL bytes.
 *
 * @param str String to trim; may be nullptr.
 * @return Pointer to the first non-whitespace character inside str (or to its
 *         terminator when the string is empty or all whitespace); nullptr if str is nullptr.
 */
char *StringTrim(char *str)
{
    if (str == nullptr) {
        return nullptr;
    }

    // isspace() is only defined for values representable as unsigned char (or EOF),
    // so bytes >= 0x80 must not be passed as a negative char.
    char *head = str;
    while (*head != '\0' && isspace(static_cast<unsigned char>(*head))) {
        head++;
    }

    // Walk back from the terminator; "tail" never moves before "head", so an empty
    // string never produces a pointer in front of the buffer.
    char *tail = head + strlen(head);
    while (tail > head && isspace(static_cast<unsigned char>(*(tail - 1)))) {
        *--tail = '\0';
    }
    return head;
}
}
211 
RxHeaderData(void * buffer,size_t size,size_t nitems,void * userParam)212 size_t StreamingExecutor::RxHeaderData(void *buffer, size_t size, size_t nitems, void *userParam)
213 {
214     auto executor = reinterpret_cast<StreamingExecutor *>(userParam);
215     HeaderInfo *info = &(executor->headerInfo_);
216     char *key = strtok(reinterpret_cast<char *>(buffer), ":");
217     FALSE_RETURN_V(key != nullptr, size * nitems);
218     if (!strncmp(key, "Content-Type", strlen("Content-Type"))) {
219         char *token = strtok(NULL, ":");
220         FALSE_RETURN_V(token != nullptr, size * nitems);
221         char *type = StringTrim(token);
222         memcpy_s(info->contentType, sizeof(info->contentType), type, sizeof(info->contentType));
223     }
224 
225     if (!strncmp(key, "Content-Length", strlen("Content-Length")) ||
226         !strncmp(key, "content-length", strlen("content-length"))) {
227         char *token = strtok(NULL, ":");
228         FALSE_RETURN_V(token != nullptr, size * nitems);
229         char *contLen = StringTrim(token);
230         info->contentLen = atol(contLen);
231     }
232 
233     if (!strncmp(key, "Transfer-Encoding", strlen("Transfer-Encoding")) ||
234         !strncmp(key, "transfer-encoding", strlen("transfer-encoding"))) {
235         char *token = strtok(NULL, ":");
236         FALSE_RETURN_V(token != nullptr, size * nitems);
237         char *transEncode = StringTrim(token);
238         if (!strncmp(transEncode, "chunked", strlen("chunked"))) {
239             info->isChunked = true;
240         }
241     }
242 
243     if (!strncmp(key, "Content-Range", strlen("Content-Range")) ||
244         !strncmp(key, "content-range", strlen("content-range"))) {
245         char *token = strtok(NULL, ":");
246         FALSE_RETURN_V(token != nullptr, size * nitems);
247         char *strRange = StringTrim(token);
248         long start, end, fileLen;
249         FALSE_LOG_MSG_E(sscanf_s(strRange, "bytes %ld-%ld/%ld", &start, &end, &fileLen) != -1,
250                         "sscanf get range failed");
251         if (info->fileContentLen > 0 && info->fileContentLen != fileLen) {
252             MEDIA_LOG_E("FileContentLen doesn't equal to fileLen");
253         }
254         if (info->fileContentLen == 0) {
255             info->fileContentLen = fileLen;
256         }
257     }
258     return size * nitems;
259 }
260 }
261 }
262 }
263 }