• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <iostream>
17 #include <functional>
18 #include <atomic>
19 #include <curl/curl.h>
20 
21 #include "http_client.h"
22 #include "netstack_log.h"
23 
24 namespace OHOS {
25 namespace NetStack {
26 namespace HttpClient {
27 
// Timeout, in milliseconds, handed to curl_multi_poll() in PerformRequest().
static constexpr int CURL_MAX_WAIT_MSECS = 10;
// Duration, in milliseconds, that Deinit() sleeps for after tearing the session down.
static constexpr int CURL_TIMEOUT_MS = 50;
// Longest time, in seconds, that RunThread() waits on the condition variable per iteration.
static constexpr int CONDITION_TIMEOUT_S = 3600;
// Batch limit that AddRequestInfo() compares its pop counter against.
static constexpr int TASK_MAXNUM_PER_TIME = 30;
32 
33 class HttpGlobal {
34 public:
HttpGlobal()35     HttpGlobal()
36     {
37         if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK) {
38             NETSTACK_LOGE("Failed to initialize 'curl'");
39         }
40     }
~HttpGlobal()41     ~HttpGlobal()
42     {
43         NETSTACK_LOGD("Deinit curl_global_cleanup()");
44         curl_global_cleanup();
45     }
46 };
47 static HttpGlobal g_httpGlobal;
48 
HttpSession()49 HttpSession::HttpSession() : curlMulti_(nullptr),
50     initialized_(false),
51     runThread_(false) {
52 }
53 
~HttpSession()54 HttpSession::~HttpSession()
55 {
56     NETSTACK_LOGD("~HttpSession::enter");
57     Deinit();
58 }
59 
GetInstance()60 HttpSession &HttpSession::GetInstance()
61 {
62     static HttpSession gInstance;
63     return gInstance;
64 }
65 
AddRequestInfo()66 void HttpSession::AddRequestInfo()
67 {
68     NETSTACK_LOGD("Enter:");
69     std::unique_lock<std::mutex> lock(taskQueueMutex_);
70     int pop_num = 0;
71     while (!taskQueue_.empty() && pop_num <= TASK_MAXNUM_PER_TIME) {
72         std::shared_ptr<HttpClientTask> task = taskQueue_.top();
73         taskQueue_.pop();
74         ++pop_num;
75         lock.unlock();
76 
77         if (task != nullptr) {
78             NETSTACK_LOGD("taskQueue_ read GetTaskId : %{public}d", task->GetTaskId());
79             std::lock_guard<std::mutex> guard(curlMultiMutex_);
80             if (nullptr == curlMulti_) {
81                 NETSTACK_LOGE("curlMulti_ is nullptr");
82                 StopTask(task);
83                 break;
84             }
85             auto ret = curl_multi_add_handle(curlMulti_, task->GetCurlHandle());
86             if (ret != CURLM_OK) {
87                 NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d", ret);
88                 StopTask(task);
89             }
90         }
91         lock.lock();
92     }
93     NETSTACK_LOGD("Add %{public}d tasks, Exit", pop_num);
94 }
95 
PerformRequest(int & runningHandle)96 CURLMcode HttpSession::PerformRequest(int &runningHandle)
97 {
98     CURLMcode ret = CURLM_LAST;
99     std::unique_lock<std::mutex> lock(curlMultiMutex_);
100     if (curlMulti_ == nullptr) {
101         NETSTACK_LOGE("curlMulti_ is nullptr");
102         return ret;
103     }
104 
105     // send request
106     ret = curl_multi_perform(curlMulti_, &runningHandle);
107     if (ret != CURLM_OK) {
108         NETSTACK_LOGE("curl_multi_perform() error! ret = %{public}d", ret);
109         return ret;
110     }
111 
112     // wait for response
113     ret = curl_multi_poll(curlMulti_, nullptr, 0, CURL_MAX_WAIT_MSECS, nullptr);
114     if (ret != CURLM_OK) {
115         NETSTACK_LOGE("curl_multi_poll() error! ret = %{public}d", ret);
116         return ret;
117     }
118     return ret;
119 }
120 
RequestAndResponse()121 void HttpSession::RequestAndResponse()
122 {
123     NETSTACK_LOGD("HttpSession::RequestAndResponse() start");
124 
125     CURLMcode ret = CURLM_LAST;
126     int runningHandle = 0;
127     do {
128         AddRequestInfo();
129         ret = PerformRequest(runningHandle);
130         if (ret == CURLM_OK) {
131             ReadResponse();
132         } else {
133             // others are error
134         }
135         // read response
136     } while (runningHandle > 0 && runThread_);
137     NETSTACK_LOGD("HttpSession::RequestAndResponse() end");
138 }
139 
ReadResponse()140 void HttpSession::ReadResponse()
141 {
142     std::unique_lock<std::mutex> lock(curlMultiMutex_);
143     struct CURLMsg *m = nullptr;
144     do {
145         int msgq = 0;
146         m = curl_multi_info_read(curlMulti_, &msgq);
147         if (m) {
148             NETSTACK_LOGD("curl_multi_info_read() m->msg = %{public}d", m->msg);
149             if (m->msg != CURLMSG_DONE) {
150                 NETSTACK_LOGD("curl_multi_info_read failed, m->msg = %{public}d", m->msg);
151                 continue;
152             }
153             curl_multi_remove_handle(curlMulti_, m->easy_handle);
154             lock.unlock();
155             auto task = GetTaskByCurlHandle(m->easy_handle);
156             if (task) {
157                 task->ProcessResponse(m);
158             }
159             StopTask(task);
160             lock.lock();
161         }
162     } while (m && runThread_);
163 }
164 
RunThread()165 void HttpSession::RunThread()
166 {
167     NETSTACK_LOGD("RunThread start runThread_ = %{public}s", runThread_ ? "true" : "false");
168 
169     while (runThread_) {
170         RequestAndResponse();
171         NETSTACK_LOGD("RunThread in loop runThread_ = %{public}s",
172             runThread_ ? "true" : "false");
173 
174         std::function<bool()> f = [this]() -> bool {
175             NETSTACK_LOGD("RunThread in loop wait_for taskQueue_.empty() = %{public}d runThread_ = %{public}s",
176                 !taskQueue_.empty(), runThread_ ? "true" : "false");
177             return !taskQueue_.empty() || !runThread_;
178         };
179         std::unique_lock<std::mutex> lock(taskMutex_);
180         conditionVariable_.wait_for(lock, std::chrono::seconds(CONDITION_TIMEOUT_S), f);
181     }
182 
183     NETSTACK_LOGD("RunThread exit()");
184 }
185 
Init()186 bool HttpSession::Init()
187 {
188     std::lock_guard<std::mutex> guarder(initMutex_);
189     NETSTACK_LOGD("HttpSession::Init enter");
190     if (!initialized_) {
191         NETSTACK_LOGD("HttpSession::Init");
192 
193         std::lock_guard<std::mutex> guard(curlMultiMutex_);
194         curlMulti_ = curl_multi_init();
195         if (curlMulti_ == nullptr) {
196             NETSTACK_LOGE("Failed to initialize 'curl_multi'");
197             return false;
198         }
199         initialized_ = true;
200         runThread_ = true;
201         auto f = std::bind(&HttpSession::RunThread, this);
202         workThread_ = std::thread(f);
203     }
204     return true;
205 }
206 
Deinit()207 void HttpSession::Deinit()
208 {
209     NETSTACK_LOGD("HttpSession::Deinit");
210 
211     std::lock_guard<std::mutex> guard(initMutex_);
212     if (!initialized_) {
213         NETSTACK_LOGE("HttpSession::Deinit not initialized");
214         return;
215     }
216     runThread_ = false;
217     do {
218         std::unique_lock<std::mutex> lock(taskMutex_);
219         conditionVariable_.notify_all();
220     } while (0);
221 
222     if (workThread_.joinable()) {
223         NETSTACK_LOGD("HttpSession::Deinit workThread_.join()");
224         workThread_.join();
225     }
226 
227     do {
228         std::lock_guard<std::mutex> guard(taskMapMutex_);
229         std::lock_guard<std::mutex> lock(taskQueueMutex_);
230         curlTaskMap_.clear();
231         taskIdMap_.clear();
232         while (!taskQueue_.empty()) {
233             taskQueue_.pop();
234         }
235     } while (0);
236 
237     do {
238         std::lock_guard<std::mutex> guard(curlMultiMutex_);
239         if (curlMulti_ != nullptr) {
240             NETSTACK_LOGD("Deinit curl_multi_cleanup()");
241             curl_multi_cleanup(curlMulti_);
242             curlMulti_ = nullptr;
243         }
244     } while (0);
245 
246     initialized_ = false;
247     std::this_thread::sleep_for(std::chrono::milliseconds(CURL_TIMEOUT_MS));
248 }
249 
CreateTask(const HttpClientRequest & request)250 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request)
251 {
252     std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request);
253     if (ptr->GetCurlHandle() == nullptr) {
254         NETSTACK_LOGE("CreateTask A error!");
255         return nullptr;
256     }
257 
258     return ptr;
259 }
260 
CreateTask(const HttpClientRequest & request,TaskType type,const std::string & filePath)261 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request, TaskType type,
262                                                         const std::string &filePath)
263 {
264     std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request, type, filePath);
265     if (ptr->GetCurlHandle() == nullptr) {
266         NETSTACK_LOGE("CreateTask B error!");
267         return nullptr;
268     }
269     return ptr;
270 }
271 
ResumTask()272 void HttpSession::ResumTask()
273 {
274     std::lock_guard<std::mutex> lock(taskMutex_);
275     conditionVariable_.notify_all();
276 }
277 
StartTask(std::shared_ptr<HttpClientTask> ptr)278 void HttpSession::StartTask(std::shared_ptr<HttpClientTask> ptr)
279 {
280     if (nullptr == ptr) {
281         NETSTACK_LOGE("HttpSession::StartTask  shared_ptr = nullptr! Error!");
282         return;
283     }
284 
285     Init();
286 
287     /* add task to queue */
288     do {
289         std::lock_guard<std::mutex> taskGuard(taskMapMutex_);
290         std::lock_guard<std::mutex> lock(taskQueueMutex_);
291         taskQueue_.push(ptr);
292         taskIdMap_[ptr->GetTaskId()] = ptr;
293         curlTaskMap_[ptr->GetCurlHandle()] = ptr;
294         ptr->SetStatus(TaskStatus::RUNNING);
295     } while (0);
296 
297     ResumTask();
298 }
299 
StopTask(std::shared_ptr<HttpClientTask> ptr)300 void HttpSession::StopTask(std::shared_ptr<HttpClientTask> ptr)
301 {
302     std::lock_guard<std::mutex> guard(taskMapMutex_);
303     if (nullptr == ptr) {
304         NETSTACK_LOGE("HttpSession::StopTask  shared_ptr = nullptr! Error!");
305         return;
306     }
307     NETSTACK_LOGD("HttpSession::StopTask taskId = %{public}d", ptr->GetTaskId());
308 
309     ptr->SetStatus(TaskStatus::IDLE);
310     if (ptr->GetCurlHandle() != nullptr) {
311         curlTaskMap_.erase(ptr->GetCurlHandle());
312     }
313     taskIdMap_.erase(ptr->GetTaskId());
314     ResumTask();
315 }
316 
GetTaskById(uint32_t taskId)317 std::shared_ptr<HttpClientTask> HttpSession::GetTaskById(uint32_t taskId)
318 {
319     std::lock_guard<std::mutex> guard(taskMapMutex_);
320     auto iter = taskIdMap_.find(taskId);
321     if (iter != taskIdMap_.end()) {
322         return iter->second;
323     } else {
324         return nullptr;
325     }
326 }
327 
GetTaskByCurlHandle(CURL * curlHandle)328 std::shared_ptr<HttpClientTask> HttpSession::GetTaskByCurlHandle(CURL *curlHandle)
329 {
330     std::lock_guard<std::mutex> guard(taskMapMutex_);
331     auto iter = curlTaskMap_.find(curlHandle);
332     if (iter != curlTaskMap_.end()) {
333         return iter->second;
334     } else {
335         return nullptr;
336     }
337 }
338 
339 } // namespace HttpClient
340 } // namespace NetStack
341 } // namespace OHOS
342