1 /*
2 * Copyright (c) 2022-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define HST_LOG_TAG "StreamingExecutor"
16
17 #include "streaming_executor.h"
18 #include <algorithm>
19 #include <functional>
20 #include "securec.h"
21 #include "osal/utils/util.h"
22 #include "steady_clock.h"
23
24 namespace OHOS {
25 namespace Media {
26 namespace Plugin {
27 namespace HttpPlugin {
// Size of a single ranged HTTP request (presumably in bytes; it is compared against content lengths).
constexpr int PER_REQUEST_SIZE = 48 * 1024;
// Capacity handed to the ring buffer: room for five full requests.
constexpr int RING_BUFFER_SIZE = 5 * PER_REQUEST_SIZE;
// Fill level (10% of the ring buffer) at which the above/below waterline events flip.
// Computed with integer arithmetic so no double -> int truncation is involved.
constexpr int WATER_LINE = RING_BUFFER_SIZE / 10;
31
StreamingExecutor()32 StreamingExecutor::StreamingExecutor() noexcept
33 {
34 buffer_ = std::make_shared<RingBuffer>(RING_BUFFER_SIZE);
35 buffer_->Init();
36
37 factory_ = std::make_shared<ClientFactory>(&RxHeaderData, &RxBodyData, this);
38
39 task_ = std::make_shared<OSAL::Task>(std::string("StreamingExecutor"));
40 task_->RegisterHandler(std::bind(&StreamingExecutor::HttpDownloadThread, this));
41
42 memset_s(&headerInfo_, sizeof(HeaderInfo), 0x00, sizeof(HeaderInfo));
43 headerInfo_.fileContentLen = 0;
44 startPos_ = 0;
45 isDownloading_ = false;
46 }
47
~StreamingExecutor()48 StreamingExecutor::~StreamingExecutor() {}
49
Open(const std::string & url)50 bool StreamingExecutor::Open(const std::string &url)
51 {
52 MEDIA_LOG_D("Open in");
53 FALSE_RETURN_V(!url.empty(), false);
54
55 client_ = factory_->CreateClient(url);
56 FALSE_RETURN_V(client_ != nullptr, false);
57
58 client_->Open(url);
59
60 requestSize_ = PER_REQUEST_SIZE;
61 startPos_ = 0;
62 isEos_ = false;
63 task_->Start();
64 return true;
65 }
66
67
Close()68 void StreamingExecutor::Close()
69 {
70 task_->Stop();
71 startPos_ = 0;
72 if (client_ != nullptr) {
73 client_->Close();
74 client_ = nullptr;
75 }
76 }
77
Read(unsigned char * buff,unsigned int wantReadLength,unsigned int & realReadLength,bool & isEos)78 bool StreamingExecutor::Read(unsigned char *buff, unsigned int wantReadLength,
79 unsigned int &realReadLength, bool &isEos)
80 {
81 FALSE_RETURN_V(buffer_ != nullptr, false);
82 isEos = false;
83 realReadLength = buffer_->ReadBuffer(buff, wantReadLength, 2); // wait 2 times
84 if (isEos_ && realReadLength == 0) {
85 isEos = true;
86 }
87 MEDIA_LOG_D("Read: wantReadLength %" PUBLIC_LOG "d, realReadLength %" PUBLIC_LOG "d, isEos %"
88 PUBLIC_LOG "d", wantReadLength, realReadLength, isEos);
89 return true;
90 }
91
Seek(int offset)92 bool StreamingExecutor::Seek(int offset)
93 {
94 FALSE_RETURN_V(buffer_ != nullptr, false);
95 MEDIA_LOG_I("Seek: buffer size %" PUBLIC_LOG "d, offset %" PUBLIC_LOG "d", buffer_->GetSize(), offset);
96 if (buffer_->Seek(offset)) {
97 return true;
98 }
99 buffer_->Clear(); // First clear buffer, avoid no available buffer then task pause never exit.
100 task_->Pause();
101 buffer_->Clear();
102 startPos_ = offset;
103 int64_t temp = headerInfo_.fileContentLen - offset;
104 temp = temp >= 0 ? temp : PER_REQUEST_SIZE;
105 requestSize_ = static_cast<int>(std::min(temp, static_cast<int64_t>(PER_REQUEST_SIZE)));
106 task_->Start();
107 isEos_ = false;
108 return true;
109 }
110
GetContentLength() const111 size_t StreamingExecutor::GetContentLength() const
112 {
113 return headerInfo_.fileContentLen;
114 }
115
IsStreaming() const116 bool StreamingExecutor::IsStreaming() const
117 {
118 return headerInfo_.isChunked;
119 }
120
SetCallback(Callback * cb)121 void StreamingExecutor::SetCallback(Callback* cb)
122 {
123 callback_ = cb;
124 }
125
HttpDownloadThread()126 void StreamingExecutor::HttpDownloadThread()
127 {
128 NetworkClientErrorCode clientCode;
129 NetworkServerErrorCode serverCode;
130 Status ret = client_->RequestData(startPos_, requestSize_, serverCode, clientCode);
131
132 if (ret == Status::ERROR_CLIENT) {
133 MEDIA_LOG_I("Send http client error, code %" PUBLIC_LOG_D32, clientCode);
134 callback_->OnEvent({PluginEventType::CLIENT_ERROR, {clientCode}, "http"});
135 } else if (ret == Status::ERROR_SERVER) {
136 MEDIA_LOG_I("Send http server error, code %" PUBLIC_LOG_D32, serverCode);
137 callback_->OnEvent({PluginEventType::SERVER_ERROR, {serverCode}, "http"});
138 }
139 FALSE_LOG(ret == Status::OK);
140
141 int64_t remaining = headerInfo_.fileContentLen - startPos_;
142 if (headerInfo_.fileContentLen > 0 && remaining <= 0) { // 检查是否播放结束
143 MEDIA_LOG_I("http transfer reach end, startPos_ %" PUBLIC_LOG "d", startPos_);
144 isEos_ = true;
145 task_->PauseAsync();
146 requestSize_ = PER_REQUEST_SIZE;
147 return;
148 }
149 if(remaining < PER_REQUEST_SIZE){
150 requestSize_ = remaining;
151 }
152 }
153
RxBodyData(void * buffer,size_t size,size_t nitems,void * userParam)154 size_t StreamingExecutor::RxBodyData(void *buffer, size_t size, size_t nitems, void *userParam)
155 {
156 auto executor = static_cast<StreamingExecutor *>(userParam);
157 HeaderInfo *header = &(executor->headerInfo_);
158 size_t dataLen = size * nitems;
159
160 if (header->fileContentLen == 0) {
161 if (header->contentLen > 0) {
162 MEDIA_LOG_W("Unsupported range, use content length as content file length");
163 header->fileContentLen = header->contentLen;
164 } else {
165 MEDIA_LOG_E("fileContentLen and contentLen are both zero.");
166 return 0;
167 }
168 }
169 if (!executor->isDownloading_) {
170 executor->isDownloading_ = true;
171 }
172 executor->buffer_->WriteBuffer(buffer, dataLen, executor->startPos_);
173 executor->isDownloading_ = false;
174 MEDIA_LOG_I("RxBodyData: dataLen %" PUBLIC_LOG "d, startPos_ %" PUBLIC_LOG "d, buffer size %"
175 PUBLIC_LOG "d", dataLen, executor->startPos_, executor->buffer_->GetSize());
176 executor->startPos_ = executor->startPos_ + dataLen;
177
178 int bufferSize = executor->buffer_->GetSize();
179 double ratio = (static_cast<double>(bufferSize)) / RING_BUFFER_SIZE;
180 if (bufferSize >= WATER_LINE && !executor->aboveWaterline_) {
181 executor->aboveWaterline_ = true;
182 MEDIA_LOG_I("Send http aboveWaterline event, ringbuffer ratio %" PUBLIC_LOG_F, ratio);
183 executor->callback_->OnEvent({PluginEventType::ABOVE_LOW_WATERLINE, {ratio}, "http"});
184 } else if (bufferSize < WATER_LINE && executor->aboveWaterline_) {
185 executor->aboveWaterline_ = false;
186 MEDIA_LOG_I("Send http belowWaterline event, ringbuffer ratio %" PUBLIC_LOG_F, ratio);
187 executor->callback_->OnEvent({PluginEventType::BELOW_LOW_WATERLINE, {ratio}, "http"});
188 }
189
190 return dataLen;
191 }
192
namespace {
// Trims whitespace from both ends of the NUL-terminated string `str`, in place.
// Trailing whitespace is overwritten with '\0'; the returned pointer is the first
// non-whitespace character inside `str` (or its terminator when nothing else is left).
// Returns nullptr when `str` is nullptr.
char *StringTrim(char *str)
{
    if (str == nullptr) {
        return nullptr;
    }

    // isspace() requires a value representable as unsigned char; a plain (possibly negative)
    // char must be converted first.
    char *head = str;
    while (*head != '\0' && isspace(static_cast<unsigned char>(*head))) {
        head++;
    }

    // `tail` is one past the last kept character, so it never points before the buffer,
    // even for an empty string.
    char *tail = head + strlen(head);
    while (tail > head && isspace(static_cast<unsigned char>(*(tail - 1)))) {
        *--tail = '\0';
    }
    return head;
}
}
211
RxHeaderData(void * buffer,size_t size,size_t nitems,void * userParam)212 size_t StreamingExecutor::RxHeaderData(void *buffer, size_t size, size_t nitems, void *userParam)
213 {
214 auto executor = reinterpret_cast<StreamingExecutor *>(userParam);
215 HeaderInfo *info = &(executor->headerInfo_);
216 char *key = strtok(reinterpret_cast<char *>(buffer), ":");
217 FALSE_RETURN_V(key != nullptr, size * nitems);
218 if (!strncmp(key, "Content-Type", strlen("Content-Type"))) {
219 char *token = strtok(NULL, ":");
220 FALSE_RETURN_V(token != nullptr, size * nitems);
221 char *type = StringTrim(token);
222 memcpy_s(info->contentType, sizeof(info->contentType), type, sizeof(info->contentType));
223 }
224
225 if (!strncmp(key, "Content-Length", strlen("Content-Length")) ||
226 !strncmp(key, "content-length", strlen("content-length"))) {
227 char *token = strtok(NULL, ":");
228 FALSE_RETURN_V(token != nullptr, size * nitems);
229 char *contLen = StringTrim(token);
230 info->contentLen = atol(contLen);
231 }
232
233 if (!strncmp(key, "Transfer-Encoding", strlen("Transfer-Encoding")) ||
234 !strncmp(key, "transfer-encoding", strlen("transfer-encoding"))) {
235 char *token = strtok(NULL, ":");
236 FALSE_RETURN_V(token != nullptr, size * nitems);
237 char *transEncode = StringTrim(token);
238 if (!strncmp(transEncode, "chunked", strlen("chunked"))) {
239 info->isChunked = true;
240 }
241 }
242
243 if (!strncmp(key, "Content-Range", strlen("Content-Range")) ||
244 !strncmp(key, "content-range", strlen("content-range"))) {
245 char *token = strtok(NULL, ":");
246 FALSE_RETURN_V(token != nullptr, size * nitems);
247 char *strRange = StringTrim(token);
248 long start, end, fileLen;
249 FALSE_LOG_MSG_E(sscanf_s(strRange, "bytes %ld-%ld/%ld", &start, &end, &fileLen) != -1,
250 "sscanf get range failed");
251 if (info->fileContentLen > 0 && info->fileContentLen != fileLen) {
252 MEDIA_LOG_E("FileContentLen doesn't equal to fileLen");
253 }
254 if (info->fileContentLen == 0) {
255 info->fileContentLen = fileLen;
256 }
257 }
258 return size * nitems;
259 }
260 }
261 }
262 }
263 }