1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <iostream>
17 #include <functional>
18 #include <atomic>
19 #include <curl/curl.h>
20
21 #include "http_client.h"
22 #include "netstack_log.h"
23
24 namespace OHOS {
25 namespace NetStack {
26 namespace HttpClient {
27
28 static constexpr int CURL_MAX_WAIT_MSECS = 10;
29 static constexpr int CURL_TIMEOUT_MS = 50;
30 static constexpr int CONDITION_TIMEOUT_S = 3600;
31 static constexpr int TASK_MAXNUM_PER_TIME = 30;
32
33 class HttpGlobal {
34 public:
HttpGlobal()35 HttpGlobal()
36 {
37 if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK) {
38 NETSTACK_LOGE("Failed to initialize 'curl'");
39 }
40 }
~HttpGlobal()41 ~HttpGlobal()
42 {
43 NETSTACK_LOGD("Deinit curl_global_cleanup()");
44 curl_global_cleanup();
45 }
46 };
47 static HttpGlobal g_httpGlobal;
48
HttpSession()49 HttpSession::HttpSession() : curlMulti_(nullptr),
50 initialized_(false),
51 runThread_(false) {
52 }
53
~HttpSession()54 HttpSession::~HttpSession()
55 {
56 NETSTACK_LOGD("~HttpSession::enter");
57 Deinit();
58 }
59
GetInstance()60 HttpSession &HttpSession::GetInstance()
61 {
62 static HttpSession gInstance;
63 return gInstance;
64 }
65
AddRequestInfo()66 void HttpSession::AddRequestInfo()
67 {
68 NETSTACK_LOGD("Enter:");
69 std::unique_lock<std::mutex> lock(taskQueueMutex_);
70 int pop_num = 0;
71 while (!taskQueue_.empty() && pop_num <= TASK_MAXNUM_PER_TIME) {
72 std::shared_ptr<HttpClientTask> task = taskQueue_.top();
73 taskQueue_.pop();
74 ++pop_num;
75 lock.unlock();
76
77 if (task != nullptr) {
78 NETSTACK_LOGD("taskQueue_ read GetTaskId : %{public}d", task->GetTaskId());
79 std::lock_guard<std::mutex> guard(curlMultiMutex_);
80 if (nullptr == curlMulti_) {
81 NETSTACK_LOGE("curlMulti_ is nullptr");
82 StopTask(task);
83 break;
84 }
85 auto ret = curl_multi_add_handle(curlMulti_, task->GetCurlHandle());
86 if (ret != CURLM_OK) {
87 NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d", ret);
88 StopTask(task);
89 }
90 }
91 lock.lock();
92 }
93 NETSTACK_LOGD("Add %{public}d tasks, Exit", pop_num);
94 }
95
PerformRequest(int & runningHandle)96 CURLMcode HttpSession::PerformRequest(int &runningHandle)
97 {
98 CURLMcode ret = CURLM_LAST;
99 std::unique_lock<std::mutex> lock(curlMultiMutex_);
100 if (curlMulti_ == nullptr) {
101 NETSTACK_LOGE("curlMulti_ is nullptr");
102 return ret;
103 }
104
105 // send request
106 ret = curl_multi_perform(curlMulti_, &runningHandle);
107 if (ret != CURLM_OK) {
108 NETSTACK_LOGE("curl_multi_perform() error! ret = %{public}d", ret);
109 return ret;
110 }
111
112 // wait for response
113 ret = curl_multi_poll(curlMulti_, nullptr, 0, CURL_MAX_WAIT_MSECS, nullptr);
114 if (ret != CURLM_OK) {
115 NETSTACK_LOGE("curl_multi_poll() error! ret = %{public}d", ret);
116 return ret;
117 }
118 return ret;
119 }
120
RequestAndResponse()121 void HttpSession::RequestAndResponse()
122 {
123 NETSTACK_LOGD("HttpSession::RequestAndResponse() start");
124
125 CURLMcode ret = CURLM_LAST;
126 int runningHandle = 0;
127 do {
128 AddRequestInfo();
129 ret = PerformRequest(runningHandle);
130 if (ret == CURLM_OK) {
131 ReadResponse();
132 } else {
133 // others are error
134 }
135 // read response
136 } while (runningHandle > 0 && runThread_);
137 NETSTACK_LOGD("HttpSession::RequestAndResponse() end");
138 }
139
ReadResponse()140 void HttpSession::ReadResponse()
141 {
142 std::unique_lock<std::mutex> lock(curlMultiMutex_);
143 struct CURLMsg *m = nullptr;
144 do {
145 int msgq = 0;
146 m = curl_multi_info_read(curlMulti_, &msgq);
147 if (m) {
148 NETSTACK_LOGD("curl_multi_info_read() m->msg = %{public}d", m->msg);
149 if (m->msg != CURLMSG_DONE) {
150 NETSTACK_LOGD("curl_multi_info_read failed, m->msg = %{public}d", m->msg);
151 continue;
152 }
153 curl_multi_remove_handle(curlMulti_, m->easy_handle);
154 lock.unlock();
155 auto task = GetTaskByCurlHandle(m->easy_handle);
156 if (task) {
157 task->ProcessResponse(m);
158 }
159 StopTask(task);
160 lock.lock();
161 }
162 } while (m && runThread_);
163 }
164
RunThread()165 void HttpSession::RunThread()
166 {
167 NETSTACK_LOGD("RunThread start runThread_ = %{public}s", runThread_ ? "true" : "false");
168
169 while (runThread_) {
170 RequestAndResponse();
171 NETSTACK_LOGD("RunThread in loop runThread_ = %{public}s",
172 runThread_ ? "true" : "false");
173
174 std::function<bool()> f = [this]() -> bool {
175 NETSTACK_LOGD("RunThread in loop wait_for taskQueue_.empty() = %{public}d runThread_ = %{public}s",
176 !taskQueue_.empty(), runThread_ ? "true" : "false");
177 return !taskQueue_.empty() || !runThread_;
178 };
179 std::unique_lock<std::mutex> lock(taskMutex_);
180 conditionVariable_.wait_for(lock, std::chrono::seconds(CONDITION_TIMEOUT_S), f);
181 }
182
183 NETSTACK_LOGD("RunThread exit()");
184 }
185
Init()186 bool HttpSession::Init()
187 {
188 std::lock_guard<std::mutex> guarder(initMutex_);
189 NETSTACK_LOGD("HttpSession::Init enter");
190 if (!initialized_) {
191 NETSTACK_LOGD("HttpSession::Init");
192
193 std::lock_guard<std::mutex> guard(curlMultiMutex_);
194 curlMulti_ = curl_multi_init();
195 if (curlMulti_ == nullptr) {
196 NETSTACK_LOGE("Failed to initialize 'curl_multi'");
197 return false;
198 }
199 initialized_ = true;
200 runThread_ = true;
201 auto f = std::bind(&HttpSession::RunThread, this);
202 workThread_ = std::thread(f);
203 }
204 return true;
205 }
206
Deinit()207 void HttpSession::Deinit()
208 {
209 NETSTACK_LOGD("HttpSession::Deinit");
210
211 std::lock_guard<std::mutex> guard(initMutex_);
212 if (!initialized_) {
213 NETSTACK_LOGE("HttpSession::Deinit not initialized");
214 return;
215 }
216 runThread_ = false;
217 do {
218 std::unique_lock<std::mutex> lock(taskMutex_);
219 conditionVariable_.notify_all();
220 } while (0);
221
222 if (workThread_.joinable()) {
223 NETSTACK_LOGD("HttpSession::Deinit workThread_.join()");
224 workThread_.join();
225 }
226
227 do {
228 std::lock_guard<std::mutex> guard(taskMapMutex_);
229 std::lock_guard<std::mutex> lock(taskQueueMutex_);
230 curlTaskMap_.clear();
231 taskIdMap_.clear();
232 while (!taskQueue_.empty()) {
233 taskQueue_.pop();
234 }
235 } while (0);
236
237 do {
238 std::lock_guard<std::mutex> guard(curlMultiMutex_);
239 if (curlMulti_ != nullptr) {
240 NETSTACK_LOGD("Deinit curl_multi_cleanup()");
241 curl_multi_cleanup(curlMulti_);
242 curlMulti_ = nullptr;
243 }
244 } while (0);
245
246 initialized_ = false;
247 std::this_thread::sleep_for(std::chrono::milliseconds(CURL_TIMEOUT_MS));
248 }
249
CreateTask(const HttpClientRequest & request)250 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request)
251 {
252 std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request);
253 if (ptr->GetCurlHandle() == nullptr) {
254 NETSTACK_LOGE("CreateTask A error!");
255 return nullptr;
256 }
257
258 return ptr;
259 }
260
CreateTask(const HttpClientRequest & request,TaskType type,const std::string & filePath)261 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request, TaskType type,
262 const std::string &filePath)
263 {
264 std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request, type, filePath);
265 if (ptr->GetCurlHandle() == nullptr) {
266 NETSTACK_LOGE("CreateTask B error!");
267 return nullptr;
268 }
269 return ptr;
270 }
271
ResumTask()272 void HttpSession::ResumTask()
273 {
274 std::lock_guard<std::mutex> lock(taskMutex_);
275 conditionVariable_.notify_all();
276 }
277
StartTask(std::shared_ptr<HttpClientTask> ptr)278 void HttpSession::StartTask(std::shared_ptr<HttpClientTask> ptr)
279 {
280 if (nullptr == ptr) {
281 NETSTACK_LOGE("HttpSession::StartTask shared_ptr = nullptr! Error!");
282 return;
283 }
284
285 Init();
286
287 /* add task to queue */
288 do {
289 std::lock_guard<std::mutex> taskGuard(taskMapMutex_);
290 std::lock_guard<std::mutex> lock(taskQueueMutex_);
291 taskQueue_.push(ptr);
292 taskIdMap_[ptr->GetTaskId()] = ptr;
293 curlTaskMap_[ptr->GetCurlHandle()] = ptr;
294 ptr->SetStatus(TaskStatus::RUNNING);
295 } while (0);
296
297 ResumTask();
298 }
299
StopTask(std::shared_ptr<HttpClientTask> ptr)300 void HttpSession::StopTask(std::shared_ptr<HttpClientTask> ptr)
301 {
302 std::lock_guard<std::mutex> guard(taskMapMutex_);
303 if (nullptr == ptr) {
304 NETSTACK_LOGE("HttpSession::StopTask shared_ptr = nullptr! Error!");
305 return;
306 }
307 NETSTACK_LOGD("HttpSession::StopTask taskId = %{public}d", ptr->GetTaskId());
308
309 ptr->SetStatus(TaskStatus::IDLE);
310 if (ptr->GetCurlHandle() != nullptr) {
311 curlTaskMap_.erase(ptr->GetCurlHandle());
312 }
313 taskIdMap_.erase(ptr->GetTaskId());
314 ResumTask();
315 }
316
GetTaskById(uint32_t taskId)317 std::shared_ptr<HttpClientTask> HttpSession::GetTaskById(uint32_t taskId)
318 {
319 std::lock_guard<std::mutex> guard(taskMapMutex_);
320 auto iter = taskIdMap_.find(taskId);
321 if (iter != taskIdMap_.end()) {
322 return iter->second;
323 } else {
324 return nullptr;
325 }
326 }
327
GetTaskByCurlHandle(CURL * curlHandle)328 std::shared_ptr<HttpClientTask> HttpSession::GetTaskByCurlHandle(CURL *curlHandle)
329 {
330 std::lock_guard<std::mutex> guard(taskMapMutex_);
331 auto iter = curlTaskMap_.find(curlHandle);
332 if (iter != curlTaskMap_.end()) {
333 return iter->second;
334 } else {
335 return nullptr;
336 }
337 }
338
339 } // namespace HttpClient
340 } // namespace NetStack
341 } // namespace OHOS
342