1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <iostream>
17 #include <curl/curl.h>
18
19 #include "http_client.h"
20 #include "netstack_log.h"
21
22 namespace OHOS {
23 namespace NetStack {
24 namespace HttpClient {
25 std::atomic_bool HttpSession::runThread_(false);
26 std::mutex HttpSession::curlMultiMutex_;
27 CURLM *HttpSession::curlMulti_;
28 std::atomic_bool HttpSession::initialized_(false);
29 std::thread HttpSession::workThread_;
30 std::condition_variable HttpSession::conditionVariable_;
31 std::mutex HttpSession::taskQueueMutex_;
32 std::priority_queue<std::shared_ptr<HttpClientTask>, std::vector<std::shared_ptr<HttpClientTask>>,
33 HttpSession::CompareTasks>
34 HttpSession::taskQueue_;
35 HttpSession HttpSession::instance_;
36
37 static constexpr int CURL_MAX_WAIT_MSECS = 10;
38 static constexpr int CURL_TIMEOUT_MS = 50;
39 static constexpr int CONDITION_TIMEOUT_S = 3600;
40
~HttpSession()41 HttpSession::~HttpSession()
42 {
43 Deinit();
44 }
45
ExecRequest()46 void HttpSession::ExecRequest()
47 {
48 AddRequestInfo();
49 RequestAndResponse();
50 }
51
AddRequestInfo()52 void HttpSession::AddRequestInfo()
53 {
54 std::shared_ptr<HttpClientTask> task = nullptr;
55 std::lock_guard<std::mutex> lock(taskQueueMutex_);
56 NETSTACK_LOGD("HttpSession::AddRequestInfo() start");
57
58 while (!taskQueue_.empty()) {
59 task = taskQueue_.top();
60 taskQueue_.pop();
61
62 if (task != nullptr) {
63 NETSTACK_LOGD("taskQueue_ read GetTaskId : %{public}d", task->GetTaskId());
64 std::lock_guard<std::mutex> guard(curlMultiMutex_);
65 if (nullptr == curlMulti_) {
66 NETSTACK_LOGE("HttpSession::AddRequestInfo() curlMulti_ is nullptr");
67 StopTask(task);
68 return;
69 }
70 auto ret = curl_multi_add_handle(curlMulti_, task->GetCurlHandle());
71 if (ret != CURLM_OK) {
72 NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d", ret);
73 StopTask(task);
74 return;
75 }
76 }
77
78 task = nullptr;
79 }
80 }
81
RequestAndResponse()82 void HttpSession::RequestAndResponse()
83 {
84 int runningHandle = 0;
85 NETSTACK_LOGD("HttpSession::RequestAndResponse() start");
86
87 do {
88 if (runningHandle > 0) {
89 AddRequestInfo();
90 }
91 std::lock_guard<std::mutex> guard(curlMultiMutex_);
92 if (!runThread_ || curlMulti_ == nullptr) {
93 NETSTACK_LOGE("RequestAndResponse() runThread_ or curlMulti_ nullptr");
94 break;
95 }
96
97 // send request
98 auto ret = curl_multi_perform(curlMulti_, &runningHandle);
99 if (ret != CURLM_OK) {
100 NETSTACK_LOGE("curl_multi_perform() error! ret = %{public}d", ret);
101 continue;
102 }
103
104 // wait for response
105 ret = curl_multi_poll(curlMulti_, nullptr, 0, CURL_MAX_WAIT_MSECS, nullptr);
106 if (ret != CURLM_OK) {
107 NETSTACK_LOGE("curl_multi_poll() error! ret = %{public}d", ret);
108 continue;
109 }
110
111 // read response
112 ReadResponse();
113 } while (runningHandle > 0);
114
115 NETSTACK_LOGD("HttpSession::RequestAndResponse() end");
116 }
117
ReadResponse()118 void HttpSession::ReadResponse()
119 {
120 struct CURLMsg *m;
121 do {
122 int msgq = 0;
123 m = curl_multi_info_read(curlMulti_, &msgq);
124 if (m) {
125 NETSTACK_LOGI("curl_multi_info_read() m->msg = %{public}d", m->msg);
126 if (m->msg != CURLMSG_DONE) {
127 continue;
128 }
129 auto task = GetTaskByCurlHandle(m->easy_handle);
130 if (task) {
131 task->ProcessResponse(m);
132 }
133 curl_multi_remove_handle(curlMulti_, m->easy_handle);
134 StopTask(task);
135 }
136 } while (m);
137 }
138
RunThread()139 void HttpSession::RunThread()
140 {
141 NETSTACK_LOGI("RunThread start runThread_ = %{public}s", runThread_ ? "true" : "false");
142 while (runThread_) {
143 HttpSession::GetInstance().ExecRequest();
144 std::this_thread::sleep_for(std::chrono::milliseconds(CURL_TIMEOUT_MS));
145 NETSTACK_LOGD("RunThread in loop runThread_ = %{public}s", runThread_ ? "true" : "false");
146 std::unique_lock<std::mutex> lock(taskQueueMutex_);
147 conditionVariable_.wait_for(lock, std::chrono::seconds(CONDITION_TIMEOUT_S), [] {
148 NETSTACK_LOGD("RunThread in loop wait_for taskQueue_.empty() = %{public}d runThread_ = %{public}s",
149 taskQueue_.empty(), runThread_ ? "true" : "false");
150 return !taskQueue_.empty() || !runThread_;
151 });
152 }
153
154 NETSTACK_LOGI("RunThread exit()");
155 }
156
Init()157 bool HttpSession::Init()
158 {
159 if (!initialized_) {
160 NETSTACK_LOGD("HttpSession::Init");
161
162 std::lock_guard<std::mutex> lock(taskQueueMutex_);
163
164 if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK) {
165 NETSTACK_LOGE("Failed to initialize 'curl'");
166 return false;
167 }
168
169 std::lock_guard<std::mutex> guard(curlMultiMutex_);
170 curlMulti_ = curl_multi_init();
171 if (curlMulti_ == nullptr) {
172 NETSTACK_LOGE("Failed to initialize 'curl_multi'");
173 return false;
174 }
175
176 workThread_ = std::thread(RunThread);
177 runThread_ = true;
178 initialized_ = true;
179 }
180
181 return true;
182 }
183
IsInited()184 bool HttpSession::IsInited()
185 {
186 return initialized_;
187 }
188
Deinit()189 void HttpSession::Deinit()
190 {
191 NETSTACK_LOGD("HttpSession::Deinit");
192 if (!initialized_) {
193 NETSTACK_LOGD("HttpSession::Deinit not initialized");
194 return;
195 }
196
197 std::unique_lock<std::mutex> lock(taskQueueMutex_);
198 runThread_ = false;
199 conditionVariable_.notify_all();
200 lock.unlock();
201 if (workThread_.joinable()) {
202 NETSTACK_LOGD("HttpSession::Deinit workThread_.join()");
203 workThread_.join();
204 }
205
206 std::lock_guard<std::mutex> guard(curlMultiMutex_);
207 if (curlMulti_ != nullptr) {
208 NETSTACK_LOGD("Deinit curl_multi_cleanup()");
209 curl_multi_cleanup(curlMulti_);
210 curlMulti_ = nullptr;
211 }
212
213 NETSTACK_LOGD("Deinit curl_global_cleanup()");
214 curl_global_cleanup();
215
216 initialized_ = false;
217
218 std::this_thread::sleep_for(std::chrono::milliseconds(CURL_TIMEOUT_MS));
219 }
220
CreateTask(const HttpClientRequest & request)221 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request)
222 {
223 std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request);
224 if (ptr->GetCurlHandle() == nullptr) {
225 NETSTACK_LOGE("CreateTask A error!");
226 return nullptr;
227 }
228
229 return ptr;
230 }
231
CreateTask(const HttpClientRequest & request,TaskType type,const std::string & filePath)232 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request, TaskType type,
233 const std::string &filePath)
234 {
235 std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request, type, filePath);
236 if (ptr->GetCurlHandle() == nullptr) {
237 NETSTACK_LOGE("CreateTask B error!");
238 return nullptr;
239 }
240
241 return ptr;
242 }
243
StartTask(std::shared_ptr<HttpClientTask> ptr)244 void HttpSession::StartTask(std::shared_ptr<HttpClientTask> ptr)
245 {
246 if (nullptr == ptr) {
247 NETSTACK_LOGE("HttpSession::StartTask shared_ptr = nullptr! Error!");
248 return;
249 }
250 NETSTACK_LOGD("HttpSession::StartTask taskId = %{public}d", ptr->GetTaskId());
251
252 if (!IsInited()) {
253 Init();
254 }
255
256 std::lock_guard<std::mutex> lock(taskQueueMutex_);
257 std::lock_guard<std::mutex> guard(taskMapMutex_);
258 ptr->SetStatus(TaskStatus::RUNNING);
259 taskIdMap_[ptr->GetTaskId()] = ptr;
260 curlTaskMap_[ptr->GetCurlHandle()] = ptr;
261 taskQueue_.push(ptr);
262 conditionVariable_.notify_all();
263 }
264
StopTask(std::shared_ptr<HttpClientTask> ptr)265 void HttpSession::StopTask(std::shared_ptr<HttpClientTask> ptr)
266 {
267 if (nullptr == ptr) {
268 NETSTACK_LOGE("HttpSession::StopTask shared_ptr = nullptr! Error!");
269 return;
270 }
271
272 NETSTACK_LOGD("HttpSession::StopTask taskId = %{public}d", ptr->GetTaskId());
273 std::lock_guard<std::mutex> lock(taskQueueMutex_);
274 std::lock_guard<std::mutex> guard(taskMapMutex_);
275 ptr->SetStatus(TaskStatus::IDLE);
276
277 if (ptr->GetCurlHandle() != nullptr) {
278 curlTaskMap_.erase(ptr->GetCurlHandle());
279 }
280 taskIdMap_.erase(ptr->GetTaskId());
281 conditionVariable_.notify_all();
282 }
283
GetTaskById(uint32_t taskId)284 std::shared_ptr<HttpClientTask> HttpSession::GetTaskById(uint32_t taskId)
285 {
286 std::lock_guard<std::mutex> guard(taskMapMutex_);
287 auto iter = taskIdMap_.find(taskId);
288 if (iter != taskIdMap_.end()) {
289 return iter->second;
290 } else {
291 return nullptr;
292 }
293 }
294
GetTaskByCurlHandle(CURL * curlHandle)295 std::shared_ptr<HttpClientTask> HttpSession::GetTaskByCurlHandle(CURL *curlHandle)
296 {
297 std::lock_guard<std::mutex> guard(taskMapMutex_);
298 auto iter = curlTaskMap_.find(curlHandle);
299 if (iter != curlTaskMap_.end()) {
300 return iter->second;
301 } else {
302 return nullptr;
303 }
304 }
305
306 } // namespace HttpClient
307 } // namespace NetStack
308 } // namespace OHOS