/* * Copyright (c) 2023-2024 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include "http_client.h" #include "netstack_log.h" namespace OHOS { namespace NetStack { namespace HttpClient { static constexpr int CURL_MAX_WAIT_MSECS = 10; static constexpr int CURL_TIMEOUT_MS = 50; static constexpr int CONDITION_TIMEOUT_S = 3600; static constexpr int TASK_MAXNUM_PER_TIME = 30; class HttpGlobal { public: HttpGlobal() { if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK) { NETSTACK_LOGE("Failed to initialize 'curl'"); } } ~HttpGlobal() { NETSTACK_LOGD("Deinit curl_global_cleanup()"); curl_global_cleanup(); } }; static HttpGlobal g_httpGlobal; HttpSession::HttpSession() : curlMulti_(nullptr), initialized_(false), runThread_(false) { } HttpSession::~HttpSession() { NETSTACK_LOGD("~HttpSession::enter"); Deinit(); } HttpSession &HttpSession::GetInstance() { static HttpSession gInstance; return gInstance; } void HttpSession::AddRequestInfo() { NETSTACK_LOGD("Enter:"); std::unique_lock lock(taskQueueMutex_); int pop_num = 0; while (!taskQueue_.empty() && pop_num <= TASK_MAXNUM_PER_TIME) { std::shared_ptr task = taskQueue_.top(); taskQueue_.pop(); ++pop_num; lock.unlock(); if (task != nullptr) { NETSTACK_LOGD("taskQueue_ read GetTaskId : %{public}d", task->GetTaskId()); std::lock_guard guard(curlMultiMutex_); if (nullptr == curlMulti_) { NETSTACK_LOGE("curlMulti_ is nullptr"); StopTask(task); break; } auto ret = curl_multi_add_handle(curlMulti_, task->GetCurlHandle()); if (ret != CURLM_OK) { NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d", ret); StopTask(task); } } lock.lock(); } NETSTACK_LOGD("Add %{public}d tasks, Exit", pop_num); } CURLMcode HttpSession::PerformRequest(int &runningHandle) { CURLMcode ret = CURLM_LAST; std::unique_lock lock(curlMultiMutex_); if (curlMulti_ == nullptr) { NETSTACK_LOGE("curlMulti_ is nullptr"); return ret; } // send request ret = curl_multi_perform(curlMulti_, &runningHandle); if (ret != CURLM_OK) { NETSTACK_LOGE("curl_multi_perform() error! ret = %{public}d", ret); return ret; } // wait for response ret = curl_multi_poll(curlMulti_, nullptr, 0, CURL_MAX_WAIT_MSECS, nullptr); if (ret != CURLM_OK) { NETSTACK_LOGE("curl_multi_poll() error! ret = %{public}d", ret); return ret; } return ret; } void HttpSession::RequestAndResponse() { NETSTACK_LOGD("HttpSession::RequestAndResponse() start"); CURLMcode ret = CURLM_LAST; int runningHandle = 0; do { AddRequestInfo(); ret = PerformRequest(runningHandle); if (ret == CURLM_OK) { ReadResponse(); } else { // others are error } // read response } while (runningHandle > 0 && runThread_); NETSTACK_LOGD("HttpSession::RequestAndResponse() end"); } void HttpSession::ReadResponse() { std::unique_lock lock(curlMultiMutex_); struct CURLMsg *m = nullptr; do { int msgq = 0; m = curl_multi_info_read(curlMulti_, &msgq); if (m) { NETSTACK_LOGD("curl_multi_info_read() m->msg = %{public}d", m->msg); if (m->msg != CURLMSG_DONE) { NETSTACK_LOGD("curl_multi_info_read failed, m->msg = %{public}d", m->msg); continue; } curl_multi_remove_handle(curlMulti_, m->easy_handle); lock.unlock(); auto task = GetTaskByCurlHandle(m->easy_handle); if (task) { task->ProcessResponse(m); } StopTask(task); lock.lock(); } } while (m && runThread_); } void HttpSession::RunThread() { NETSTACK_LOGD("RunThread start runThread_ = %{public}s", runThread_ ? "true" : "false"); while (runThread_) { RequestAndResponse(); NETSTACK_LOGD("RunThread in loop runThread_ = %{public}s", runThread_ ? "true" : "false"); std::function f = [this]() -> bool { NETSTACK_LOGD("RunThread in loop wait_for taskQueue_.empty() = %{public}d runThread_ = %{public}s", !taskQueue_.empty(), runThread_ ? "true" : "false"); return !taskQueue_.empty() || !runThread_; }; std::unique_lock lock(taskMutex_); conditionVariable_.wait_for(lock, std::chrono::seconds(CONDITION_TIMEOUT_S), f); } NETSTACK_LOGD("RunThread exit()"); } bool HttpSession::Init() { std::lock_guard guarder(initMutex_); NETSTACK_LOGD("HttpSession::Init enter"); if (!initialized_) { NETSTACK_LOGD("HttpSession::Init"); std::lock_guard guard(curlMultiMutex_); curlMulti_ = curl_multi_init(); if (curlMulti_ == nullptr) { NETSTACK_LOGE("Failed to initialize 'curl_multi'"); return false; } initialized_ = true; runThread_ = true; auto f = std::bind(&HttpSession::RunThread, this); workThread_ = std::thread(f); } return true; } void HttpSession::Deinit() { NETSTACK_LOGD("HttpSession::Deinit"); std::lock_guard guard(initMutex_); if (!initialized_) { NETSTACK_LOGE("HttpSession::Deinit not initialized"); return; } runThread_ = false; do { std::unique_lock lock(taskMutex_); conditionVariable_.notify_all(); } while (0); if (workThread_.joinable()) { NETSTACK_LOGD("HttpSession::Deinit workThread_.join()"); workThread_.join(); } do { std::lock_guard guard(taskMapMutex_); std::lock_guard lock(taskQueueMutex_); curlTaskMap_.clear(); taskIdMap_.clear(); while (!taskQueue_.empty()) { taskQueue_.pop(); } } while (0); do { std::lock_guard guard(curlMultiMutex_); if (curlMulti_ != nullptr) { NETSTACK_LOGD("Deinit curl_multi_cleanup()"); curl_multi_cleanup(curlMulti_); curlMulti_ = nullptr; } } while (0); initialized_ = false; std::this_thread::sleep_for(std::chrono::milliseconds(CURL_TIMEOUT_MS)); } std::shared_ptr HttpSession::CreateTask(const HttpClientRequest &request) { std::shared_ptr ptr = std::make_shared(request); if (ptr->GetCurlHandle() == nullptr) { NETSTACK_LOGE("CreateTask A error!"); return nullptr; } return ptr; } std::shared_ptr HttpSession::CreateTask(const HttpClientRequest &request, TaskType type, const std::string &filePath) { std::shared_ptr ptr = std::make_shared(request, type, filePath); if (ptr->GetCurlHandle() == nullptr) { NETSTACK_LOGE("CreateTask B error!"); return nullptr; } return ptr; } void HttpSession::ResumTask() { std::lock_guard lock(taskMutex_); conditionVariable_.notify_all(); } void HttpSession::StartTask(std::shared_ptr ptr) { if (nullptr == ptr) { NETSTACK_LOGE("HttpSession::StartTask shared_ptr = nullptr! Error!"); return; } Init(); /* add task to queue */ do { std::lock_guard taskGuard(taskMapMutex_); std::lock_guard lock(taskQueueMutex_); taskQueue_.push(ptr); taskIdMap_[ptr->GetTaskId()] = ptr; curlTaskMap_[ptr->GetCurlHandle()] = ptr; ptr->SetStatus(TaskStatus::RUNNING); } while (0); ResumTask(); } void HttpSession::StopTask(std::shared_ptr ptr) { std::lock_guard guard(taskMapMutex_); if (nullptr == ptr) { NETSTACK_LOGE("HttpSession::StopTask shared_ptr = nullptr! Error!"); return; } NETSTACK_LOGD("HttpSession::StopTask taskId = %{public}d", ptr->GetTaskId()); ptr->SetStatus(TaskStatus::IDLE); if (ptr->GetCurlHandle() != nullptr) { curlTaskMap_.erase(ptr->GetCurlHandle()); } taskIdMap_.erase(ptr->GetTaskId()); ResumTask(); } std::shared_ptr HttpSession::GetTaskById(uint32_t taskId) { std::lock_guard guard(taskMapMutex_); auto iter = taskIdMap_.find(taskId); if (iter != taskIdMap_.end()) { return iter->second; } else { return nullptr; } } std::shared_ptr HttpSession::GetTaskByCurlHandle(CURL *curlHandle) { std::lock_guard guard(taskMapMutex_); auto iter = curlTaskMap_.find(curlHandle); if (iter != curlTaskMap_.end()) { return iter->second; } else { return nullptr; } } } // namespace HttpClient } // namespace NetStack } // namespace OHOS