• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <iostream>
17 #include <curl/curl.h>
18 
19 #include "http_client.h"
20 #include "netstack_log.h"
21 
22 namespace OHOS {
23 namespace NetStack {
24 namespace HttpClient {
25 std::atomic_bool HttpSession::runThread_(false);
26 std::mutex HttpSession::curlMultiMutex_;
27 CURLM *HttpSession::curlMulti_;
28 std::atomic_bool HttpSession::initialized_(false);
29 std::thread HttpSession::workThread_;
30 std::condition_variable HttpSession::conditionVariable_;
31 std::mutex HttpSession::taskQueueMutex_;
32 std::priority_queue<std::shared_ptr<HttpClientTask>, std::vector<std::shared_ptr<HttpClientTask>>,
33                     HttpSession::CompareTasks>
34     HttpSession::taskQueue_;
35 HttpSession HttpSession::instance_;
36 
37 static constexpr int CURL_MAX_WAIT_MSECS = 10;
38 static constexpr int CURL_TIMEOUT_MS = 50;
39 static constexpr int CONDITION_TIMEOUT_S = 3600;
40 
~HttpSession()41 HttpSession::~HttpSession()
42 {
43     Deinit();
44 }
45 
ExecRequest()46 void HttpSession::ExecRequest()
47 {
48     AddRequestInfo();
49     RequestAndResponse();
50 }
51 
AddRequestInfo()52 void HttpSession::AddRequestInfo()
53 {
54     std::shared_ptr<HttpClientTask> task = nullptr;
55     std::lock_guard<std::mutex> lock(taskQueueMutex_);
56     NETSTACK_LOGD("HttpSession::AddRequestInfo() start");
57 
58     while (!taskQueue_.empty()) {
59         task = taskQueue_.top();
60         taskQueue_.pop();
61 
62         if (task != nullptr) {
63             NETSTACK_LOGD("taskQueue_ read GetTaskId : %{public}d", task->GetTaskId());
64             std::lock_guard<std::mutex> guard(curlMultiMutex_);
65             if (nullptr == curlMulti_) {
66                 NETSTACK_LOGE("HttpSession::AddRequestInfo() curlMulti_ is nullptr");
67                 StopTask(task);
68                 return;
69             }
70             auto ret = curl_multi_add_handle(curlMulti_, task->GetCurlHandle());
71             if (ret != CURLM_OK) {
72                 NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d", ret);
73                 StopTask(task);
74                 return;
75             }
76         }
77 
78         task = nullptr;
79     }
80 }
81 
RequestAndResponse()82 void HttpSession::RequestAndResponse()
83 {
84     int runningHandle = 0;
85     NETSTACK_LOGD("HttpSession::RequestAndResponse() start");
86 
87     do {
88         if (runningHandle > 0) {
89             AddRequestInfo();
90         }
91         std::lock_guard<std::mutex> guard(curlMultiMutex_);
92         if (!runThread_ || curlMulti_ == nullptr) {
93             NETSTACK_LOGE("RequestAndResponse() runThread_ or curlMulti_ nullptr");
94             break;
95         }
96 
97         // send request
98         auto ret = curl_multi_perform(curlMulti_, &runningHandle);
99         if (ret != CURLM_OK) {
100             NETSTACK_LOGE("curl_multi_perform() error! ret = %{public}d", ret);
101             continue;
102         }
103 
104         // wait for response
105         ret = curl_multi_poll(curlMulti_, nullptr, 0, CURL_MAX_WAIT_MSECS, nullptr);
106         if (ret != CURLM_OK) {
107             NETSTACK_LOGE("curl_multi_poll() error! ret = %{public}d", ret);
108             continue;
109         }
110 
111         // read response
112         ReadResponse();
113     } while (runningHandle > 0);
114 
115     NETSTACK_LOGD("HttpSession::RequestAndResponse() end");
116 }
117 
ReadResponse()118 void HttpSession::ReadResponse()
119 {
120     struct CURLMsg *m;
121     do {
122         int msgq = 0;
123         m = curl_multi_info_read(curlMulti_, &msgq);
124         if (m) {
125             NETSTACK_LOGI("curl_multi_info_read() m->msg = %{public}d", m->msg);
126             if (m->msg != CURLMSG_DONE) {
127                 continue;
128             }
129             auto task = GetTaskByCurlHandle(m->easy_handle);
130             if (task) {
131                 task->ProcessResponse(m);
132             }
133             curl_multi_remove_handle(curlMulti_, m->easy_handle);
134             StopTask(task);
135         }
136     } while (m);
137 }
138 
RunThread()139 void HttpSession::RunThread()
140 {
141     NETSTACK_LOGI("RunThread start runThread_ = %{public}s", runThread_ ? "true" : "false");
142     while (runThread_) {
143         HttpSession::GetInstance().ExecRequest();
144         std::this_thread::sleep_for(std::chrono::milliseconds(CURL_TIMEOUT_MS));
145         NETSTACK_LOGD("RunThread in loop runThread_ = %{public}s", runThread_ ? "true" : "false");
146         std::unique_lock<std::mutex> lock(taskQueueMutex_);
147         conditionVariable_.wait_for(lock, std::chrono::seconds(CONDITION_TIMEOUT_S), [] {
148             NETSTACK_LOGD("RunThread in loop wait_for taskQueue_.empty() = %{public}d runThread_ = %{public}s",
149                           taskQueue_.empty(), runThread_ ? "true" : "false");
150             return !taskQueue_.empty() || !runThread_;
151         });
152     }
153 
154     NETSTACK_LOGI("RunThread exit()");
155 }
156 
Init()157 bool HttpSession::Init()
158 {
159     if (!initialized_) {
160         NETSTACK_LOGD("HttpSession::Init");
161 
162         std::lock_guard<std::mutex> lock(taskQueueMutex_);
163 
164         if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK) {
165             NETSTACK_LOGE("Failed to initialize 'curl'");
166             return false;
167         }
168 
169         std::lock_guard<std::mutex> guard(curlMultiMutex_);
170         curlMulti_ = curl_multi_init();
171         if (curlMulti_ == nullptr) {
172             NETSTACK_LOGE("Failed to initialize 'curl_multi'");
173             return false;
174         }
175 
176         workThread_ = std::thread(RunThread);
177         runThread_ = true;
178         initialized_ = true;
179     }
180 
181     return true;
182 }
183 
IsInited()184 bool HttpSession::IsInited()
185 {
186     return initialized_;
187 }
188 
Deinit()189 void HttpSession::Deinit()
190 {
191     NETSTACK_LOGD("HttpSession::Deinit");
192     if (!initialized_) {
193         NETSTACK_LOGD("HttpSession::Deinit not initialized");
194         return;
195     }
196 
197     std::unique_lock<std::mutex> lock(taskQueueMutex_);
198     runThread_ = false;
199     conditionVariable_.notify_all();
200     lock.unlock();
201     if (workThread_.joinable()) {
202         NETSTACK_LOGD("HttpSession::Deinit workThread_.join()");
203         workThread_.join();
204     }
205 
206     std::lock_guard<std::mutex> guard(curlMultiMutex_);
207     if (curlMulti_ != nullptr) {
208         NETSTACK_LOGD("Deinit curl_multi_cleanup()");
209         curl_multi_cleanup(curlMulti_);
210         curlMulti_ = nullptr;
211     }
212 
213     NETSTACK_LOGD("Deinit curl_global_cleanup()");
214     curl_global_cleanup();
215 
216     initialized_ = false;
217 
218     std::this_thread::sleep_for(std::chrono::milliseconds(CURL_TIMEOUT_MS));
219 }
220 
CreateTask(const HttpClientRequest & request)221 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request)
222 {
223     std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request);
224     if (ptr->GetCurlHandle() == nullptr) {
225         NETSTACK_LOGE("CreateTask A error!");
226         return nullptr;
227     }
228 
229     return ptr;
230 }
231 
CreateTask(const HttpClientRequest & request,TaskType type,const std::string & filePath)232 std::shared_ptr<HttpClientTask> HttpSession::CreateTask(const HttpClientRequest &request, TaskType type,
233                                                         const std::string &filePath)
234 {
235     std::shared_ptr<HttpClientTask> ptr = std::make_shared<HttpClientTask>(request, type, filePath);
236     if (ptr->GetCurlHandle() == nullptr) {
237         NETSTACK_LOGE("CreateTask B error!");
238         return nullptr;
239     }
240 
241     return ptr;
242 }
243 
StartTask(std::shared_ptr<HttpClientTask> ptr)244 void HttpSession::StartTask(std::shared_ptr<HttpClientTask> ptr)
245 {
246     if (nullptr == ptr) {
247         NETSTACK_LOGE("HttpSession::StartTask  shared_ptr = nullptr! Error!");
248         return;
249     }
250     NETSTACK_LOGD("HttpSession::StartTask taskId = %{public}d", ptr->GetTaskId());
251 
252     if (!IsInited()) {
253         Init();
254     }
255 
256     std::lock_guard<std::mutex> lock(taskQueueMutex_);
257     std::lock_guard<std::mutex> guard(taskMapMutex_);
258     ptr->SetStatus(TaskStatus::RUNNING);
259     taskIdMap_[ptr->GetTaskId()] = ptr;
260     curlTaskMap_[ptr->GetCurlHandle()] = ptr;
261     taskQueue_.push(ptr);
262     conditionVariable_.notify_all();
263 }
264 
StopTask(std::shared_ptr<HttpClientTask> ptr)265 void HttpSession::StopTask(std::shared_ptr<HttpClientTask> ptr)
266 {
267     if (nullptr == ptr) {
268         NETSTACK_LOGE("HttpSession::StopTask  shared_ptr = nullptr! Error!");
269         return;
270     }
271 
272     NETSTACK_LOGD("HttpSession::StopTask taskId = %{public}d", ptr->GetTaskId());
273     std::lock_guard<std::mutex> lock(taskQueueMutex_);
274     std::lock_guard<std::mutex> guard(taskMapMutex_);
275     ptr->SetStatus(TaskStatus::IDLE);
276 
277     if (ptr->GetCurlHandle() != nullptr) {
278         curlTaskMap_.erase(ptr->GetCurlHandle());
279     }
280     taskIdMap_.erase(ptr->GetTaskId());
281     conditionVariable_.notify_all();
282 }
283 
GetTaskById(uint32_t taskId)284 std::shared_ptr<HttpClientTask> HttpSession::GetTaskById(uint32_t taskId)
285 {
286     std::lock_guard<std::mutex> guard(taskMapMutex_);
287     auto iter = taskIdMap_.find(taskId);
288     if (iter != taskIdMap_.end()) {
289         return iter->second;
290     } else {
291         return nullptr;
292     }
293 }
294 
GetTaskByCurlHandle(CURL * curlHandle)295 std::shared_ptr<HttpClientTask> HttpSession::GetTaskByCurlHandle(CURL *curlHandle)
296 {
297     std::lock_guard<std::mutex> guard(taskMapMutex_);
298     auto iter = curlTaskMap_.find(curlHandle);
299     if (iter != curlTaskMap_.end()) {
300         return iter->second;
301     } else {
302         return nullptr;
303     }
304 }
305 
306 } // namespace HttpClient
307 } // namespace NetStack
308 } // namespace OHOS