• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "epoll_multi_driver.h"
17 
18 #include "netstack_log.h"
19 #include "request_info.h"
20 #if HAS_NETSTACK_CHR
21 #include "netstack_chr_client.h"
22 #endif
23 #ifdef HTTP_HANDOVER_FEATURE
24 #include "http_handover_handler.h"
25 #endif
26 
27 namespace OHOS::NetStack::HttpOverCurl {
28 
// Capacity of the event buffer that Step() hands to the poller on each wait.
static constexpr size_t MAX_EPOLL_EVENTS{10};
30 
EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo * >> & incomingQueue)31 EpollMultiDriver::EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo *>> &incomingQueue)
32     : incomingQueue_(incomingQueue)
33 {
34     Initialize();
35 }
36 
Initialize()37 void EpollMultiDriver::Initialize()
38 {
39 #ifdef HTTP_HANDOVER_FEATURE
40     netHandoverHandler_ = std::make_shared<HttpHandoverHandler>();
41     if (netHandoverHandler_->IsInitSuccess()) {
42         netHandoverHandler_->RegisterForPolling(poller_);
43     } else {
44         netHandoverHandler_ = nullptr;
45     }
46 #endif
47     timeoutTimer_.RegisterForPolling(poller_);
48     incomingQueue_->GetSyncEvent().RegisterForPolling(poller_);
49     multi_ = curl_multi_init();
50     if (!multi_) {
51         NETSTACK_LOGE("Failed to initialize curl_multi handle");
52         return;
53     }
54 
55     static auto socketCallback = +[](CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
56         auto instance = static_cast<EpollMultiDriver *>(userp);
57         return instance->MultiSocketCallback(s, action, static_cast<CurlSocketContext *>(socketp));
58     };
59     curl_multi_setopt(multi_, CURLMOPT_SOCKETDATA, this);
60     curl_multi_setopt(multi_, CURLMOPT_SOCKETFUNCTION, socketCallback);
61 
62     static auto timerCallback = +[](CURLM *multi, long timeout_ms, void *userp) -> int {
63         auto instance = static_cast<EpollMultiDriver *>(userp);
64         return instance->MultiTimeoutCallback(timeout_ms);
65     };
66     curl_multi_setopt(multi_, CURLMOPT_TIMERDATA, this);
67     curl_multi_setopt(multi_, CURLMOPT_TIMERFUNCTION, timerCallback);
68     curl_multi_setopt(multi_, CURLMOPT_MAX_HOST_CONNECTIONS, 6); // 单个主机的最大连接数
69     curl_multi_setopt(multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS, 64); // 最大同时打开连接数
70     curl_multi_setopt(multi_, CURLMOPT_MAXCONNECTS, 64); // 连接缓冲池的大小
71 }
72 
~EpollMultiDriver()73 EpollMultiDriver::~EpollMultiDriver()
74 {
75     if (multi_) {
76         curl_multi_cleanup(multi_);
77         multi_ = nullptr;
78     }
79 }
80 
Step(int waitEventsTimeoutMs)81 void EpollMultiDriver::Step(int waitEventsTimeoutMs)
82 {
83     epoll_event events[MAX_EPOLL_EVENTS];
84     int eventsToHandle = poller_.Wait(events, MAX_EPOLL_EVENTS, waitEventsTimeoutMs);
85     if (eventsToHandle == -1) {
86         if (errno != EINTR) {
87             NETSTACK_LOGE("epoll wait error : %{public}d", errno);
88         }
89         return;
90     }
91     if (eventsToHandle == 0) {
92         if (errno != EINTR && errno != EAGAIN && errno != 0) {
93             NETSTACK_LOGE("epoll wait event 0 err: %{public}d", errno);
94         }
95         CheckMultiInfo();
96     }
97     for (int idx = 0; idx < eventsToHandle; ++idx) {
98         if (incomingQueue_->GetSyncEvent().IsItYours(events[idx].data.fd)) {
99             IncomingRequestCallback();
100         } else if (timeoutTimer_.IsItYours(events[idx].data.fd)) {
101             EpollTimerCallback();
102 #ifdef HTTP_HANDOVER_FEATURE
103         } else if (netHandoverHandler_ && netHandoverHandler_->IsItHandoverEvent(events[idx].data.fd)) {
104             netHandoverHandler_->HandoverRequestCallback(ongoingRequests_, multi_);
105         } else if (netHandoverHandler_ && netHandoverHandler_->IsItHandoverTimeoutEvent(events[idx].data.fd)) {
106             netHandoverHandler_->HandoverTimeoutCallback();
107 #endif
108         } else { // curl socket event
109             EpollSocketCallback(events[idx].data.fd);
110         }
111     }
112 }
113 
IncomingRequestCallback()114 void EpollMultiDriver::IncomingRequestCallback()
115 {
116     auto requestsToAdd = incomingQueue_->Flush();
117     for (auto &request : requestsToAdd) {
118 #ifdef HTTP_HANDOVER_FEATURE
119         if (netHandoverHandler_ &&
120             netHandoverHandler_->TryFlowControl(request, HandoverRequestType::INCOMING)) {
121             continue;
122         }
123 #endif
124         ongoingRequests_[request->easyHandle] = request;
125         auto ret = curl_multi_add_handle(multi_, request->easyHandle);
126         if (ret != CURLM_OK) {
127             NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d %{public}s", ret, curl_multi_strerror(ret));
128             continue;
129         }
130 
131         if (request->callbacks.startedCallback) {
132             request->callbacks.startedCallback(request->easyHandle, request->opaqueData);
133         }
134     }
135 }
136 
137 // Update the timer after curl_multi library does its thing. Curl will
138 // inform us through this callback what it wants the new timeout to be,
139 // after it does some work.
MultiTimeoutCallback(long timeoutMs)140 int EpollMultiDriver::MultiTimeoutCallback(long timeoutMs)
141 {
142     if (timeoutMs > 0) {
143         timeoutTimer_.SetTimeoutMs(timeoutMs);
144     } else if (timeoutMs == 0) {
145         // libcurl wants us to timeout now, however setting both fields of
146         // new_value.it_value to zero disarms the timer. The closest we can
147         // do is to schedule the timer to fire in 1 ns.
148         timeoutTimer_.SetTimeoutNs(1);
149     }
150 
151     return 0;
152 }
153 
154 // Called by main loop when our timeout expires
EpollTimerCallback()155 void EpollMultiDriver::EpollTimerCallback()
156 {
157     timeoutTimer_.ResetEvent();
158     auto rc = curl_multi_socket_action(multi_, CURL_SOCKET_TIMEOUT, 0, &stillRunning);
159     if (rc != CURLM_OK) {
160         NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
161     }
162     CheckMultiInfo();
163 }
164 
CheckMultiInfo()165 __attribute__((no_sanitize("cfi"))) void EpollMultiDriver::CheckMultiInfo()
166 {
167     CURLMsg *message;
168     int pending;
169 
170     while ((message = curl_multi_info_read(multi_, &pending))) {
171         switch (message->msg) {
172             case CURLMSG_DONE: {
173                 auto easyHandle = message->easy_handle;
174 #if HAS_NETSTACK_CHR
175                 ChrClient::NetStackChrClient::GetInstance().GetDfxInfoFromCurlHandleAndReport(easyHandle,
176                                                                                               message->data.result);
177 #endif
178                 if (!easyHandle) {
179                     break;
180                 }
181                 curl_multi_remove_handle(multi_, easyHandle);
182                 auto requestInfo = ongoingRequests_[easyHandle];
183                 ongoingRequests_.erase(easyHandle);
184 #ifdef HTTP_HANDOVER_FEATURE
185                 if (netHandoverHandler_ &&
186                     netHandoverHandler_->ProcessRequestErr(ongoingRequests_, multi_, requestInfo, message)) {
187                     break;
188                 }
189 #endif
190                 if (requestInfo != nullptr && requestInfo->callbacks.doneCallback) {
191                     requestInfo->callbacks.doneCallback(message, requestInfo->opaqueData);
192                 }
193                 delete requestInfo;
194                 break;
195             }
196             default:
197                 NETSTACK_LOGD("CURLMSG default");
198                 break;
199         }
200     }
201 }
202 
MultiSocketCallback(curl_socket_t socket,int action,CurlSocketContext * socketContext)203 int EpollMultiDriver::MultiSocketCallback(curl_socket_t socket, int action, CurlSocketContext *socketContext)
204 {
205     switch (action) {
206         case CURL_POLL_IN:
207         case CURL_POLL_OUT:
208         case CURL_POLL_INOUT:
209             if (!socketContext) {
210                 auto curlSocket = new (std::nothrow) CurlSocketContext(poller_, socket, action);
211                 if (curlSocket == nullptr) {
212                     return -1;
213                 }
214                 curl_multi_assign(multi_, socket, curlSocket);
215             } else {
216                 socketContext->Reassign(socket, action);
217             }
218             break;
219         case CURL_POLL_REMOVE:
220             delete socketContext;
221             break;
222         default:
223             NETSTACK_LOGE("Unexpected socket action = %{public}d", action);
224     }
225 
226     return 0;
227 }
228 
CurlPollToEpoll(int action)229 static int CurlPollToEpoll(int action)
230 {
231     int kind = (((static_cast<unsigned int>(action)) & CURL_POLL_IN) ? EPOLLIN : (EPOLLIN & ~EPOLLIN)) |
232                (((static_cast<unsigned int>(action)) & CURL_POLL_OUT) ? EPOLLOUT : (EPOLLOUT & ~EPOLLOUT));
233     return kind;
234 }
235 
CurlSocketContext(HttpOverCurl::Epoller & poller,curl_socket_t sockDescriptor,int action)236 EpollMultiDriver::CurlSocketContext::CurlSocketContext(HttpOverCurl::Epoller &poller, curl_socket_t sockDescriptor,
237                                                        int action)
238     : poller_(poller), socketDescriptor_(sockDescriptor)
239 {
240     int kind = CurlPollToEpoll(action);
241     poller_.RegisterMe(socketDescriptor_, kind);
242 }
243 
Reassign(curl_socket_t sockDescriptor,int action)244 void EpollMultiDriver::CurlSocketContext::Reassign(curl_socket_t sockDescriptor, int action)
245 {
246     poller_.UnregisterMe(socketDescriptor_);
247     socketDescriptor_ = sockDescriptor;
248     int kind = CurlPollToEpoll(action);
249     poller_.RegisterMe(socketDescriptor_, kind);
250 }
251 
// Unregisters the tracked socket from the poller when the context is destroyed
// (MultiSocketCallback() deletes the context on CURL_POLL_REMOVE).
EpollMultiDriver::CurlSocketContext::~CurlSocketContext()
{
    poller_.UnregisterMe(socketDescriptor_);
}
256 
257 // Called by main loop when we get action on a multi socket file descriptor
EpollSocketCallback(int fd)258 void EpollMultiDriver::EpollSocketCallback(int fd)
259 {
260     int action = CURL_CSELECT_IN | CURL_CSELECT_OUT;
261     auto rc = curl_multi_socket_action(multi_, fd, action, &stillRunning);
262     if (rc != CURLM_OK) {
263         NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
264     }
265     CheckMultiInfo();
266 
267     if (stillRunning <= 0) {
268         timeoutTimer_.Stop();
269     }
270 }
271 
272 } // namespace OHOS::NetStack::HttpOverCurl
273