1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "epoll_multi_driver.h"
17
18 #include "netstack_log.h"
19 #include "request_info.h"
20 #if HAS_NETSTACK_CHR
21 #include "netstack_chr_client.h"
22 #endif
23 #ifdef HTTP_HANDOVER_FEATURE
24 #include "http_handover_handler.h"
25 #endif
26
27 namespace OHOS::NetStack::HttpOverCurl {
28
// Capacity of the on-stack epoll_event buffer used by Step(); also the upper
// bound on how many ready events a single poller_.Wait() call may hand back.
static constexpr size_t MAX_EPOLL_EVENTS = 10;
30
EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo * >> & incomingQueue)31 EpollMultiDriver::EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo *>> &incomingQueue)
32 : incomingQueue_(incomingQueue)
33 {
34 Initialize();
35 }
36
Initialize()37 void EpollMultiDriver::Initialize()
38 {
39 #ifdef HTTP_HANDOVER_FEATURE
40 netHandoverHandler_ = std::make_shared<HttpHandoverHandler>();
41 if (netHandoverHandler_->IsInitSuccess()) {
42 netHandoverHandler_->RegisterForPolling(poller_);
43 } else {
44 netHandoverHandler_ = nullptr;
45 }
46 #endif
47 timeoutTimer_.RegisterForPolling(poller_);
48 incomingQueue_->GetSyncEvent().RegisterForPolling(poller_);
49 multi_ = curl_multi_init();
50 if (!multi_) {
51 NETSTACK_LOGE("Failed to initialize curl_multi handle");
52 return;
53 }
54
55 static auto socketCallback = +[](CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
56 auto instance = static_cast<EpollMultiDriver *>(userp);
57 return instance->MultiSocketCallback(s, action, static_cast<CurlSocketContext *>(socketp));
58 };
59 curl_multi_setopt(multi_, CURLMOPT_SOCKETDATA, this);
60 curl_multi_setopt(multi_, CURLMOPT_SOCKETFUNCTION, socketCallback);
61
62 static auto timerCallback = +[](CURLM *multi, long timeout_ms, void *userp) -> int {
63 auto instance = static_cast<EpollMultiDriver *>(userp);
64 return instance->MultiTimeoutCallback(timeout_ms);
65 };
66 curl_multi_setopt(multi_, CURLMOPT_TIMERDATA, this);
67 curl_multi_setopt(multi_, CURLMOPT_TIMERFUNCTION, timerCallback);
68 curl_multi_setopt(multi_, CURLMOPT_MAX_HOST_CONNECTIONS, 6); // 单个主机的最大连接数
69 curl_multi_setopt(multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS, 64); // 最大同时打开连接数
70 curl_multi_setopt(multi_, CURLMOPT_MAXCONNECTS, 64); // 连接缓冲池的大小
71 }
72
~EpollMultiDriver()73 EpollMultiDriver::~EpollMultiDriver()
74 {
75 if (multi_) {
76 curl_multi_cleanup(multi_);
77 multi_ = nullptr;
78 }
79 }
80
Step(int waitEventsTimeoutMs)81 void EpollMultiDriver::Step(int waitEventsTimeoutMs)
82 {
83 epoll_event events[MAX_EPOLL_EVENTS];
84 int eventsToHandle = poller_.Wait(events, MAX_EPOLL_EVENTS, waitEventsTimeoutMs);
85 if (eventsToHandle == -1) {
86 if (errno != EINTR) {
87 NETSTACK_LOGE("epoll wait error : %{public}d", errno);
88 }
89 return;
90 }
91 if (eventsToHandle == 0) {
92 if (errno != EINTR && errno != EAGAIN && errno != 0) {
93 NETSTACK_LOGE("epoll wait event 0 err: %{public}d", errno);
94 }
95 CheckMultiInfo();
96 }
97 for (int idx = 0; idx < eventsToHandle; ++idx) {
98 if (incomingQueue_->GetSyncEvent().IsItYours(events[idx].data.fd)) {
99 IncomingRequestCallback();
100 } else if (timeoutTimer_.IsItYours(events[idx].data.fd)) {
101 EpollTimerCallback();
102 #ifdef HTTP_HANDOVER_FEATURE
103 } else if (netHandoverHandler_ && netHandoverHandler_->IsItHandoverEvent(events[idx].data.fd)) {
104 netHandoverHandler_->HandoverRequestCallback(ongoingRequests_, multi_);
105 } else if (netHandoverHandler_ && netHandoverHandler_->IsItHandoverTimeoutEvent(events[idx].data.fd)) {
106 netHandoverHandler_->HandoverTimeoutCallback();
107 #endif
108 } else { // curl socket event
109 EpollSocketCallback(events[idx].data.fd);
110 }
111 }
112 }
113
IncomingRequestCallback()114 void EpollMultiDriver::IncomingRequestCallback()
115 {
116 auto requestsToAdd = incomingQueue_->Flush();
117 for (auto &request : requestsToAdd) {
118 #ifdef HTTP_HANDOVER_FEATURE
119 if (netHandoverHandler_ &&
120 netHandoverHandler_->TryFlowControl(request, HandoverRequestType::INCOMING)) {
121 continue;
122 }
123 #endif
124 ongoingRequests_[request->easyHandle] = request;
125 auto ret = curl_multi_add_handle(multi_, request->easyHandle);
126 if (ret != CURLM_OK) {
127 NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d %{public}s", ret, curl_multi_strerror(ret));
128 continue;
129 }
130
131 if (request->callbacks.startedCallback) {
132 request->callbacks.startedCallback(request->easyHandle, request->opaqueData);
133 }
134 }
135 }
136
137 // Update the timer after curl_multi library does its thing. Curl will
138 // inform us through this callback what it wants the new timeout to be,
139 // after it does some work.
MultiTimeoutCallback(long timeoutMs)140 int EpollMultiDriver::MultiTimeoutCallback(long timeoutMs)
141 {
142 if (timeoutMs > 0) {
143 timeoutTimer_.SetTimeoutMs(timeoutMs);
144 } else if (timeoutMs == 0) {
145 // libcurl wants us to timeout now, however setting both fields of
146 // new_value.it_value to zero disarms the timer. The closest we can
147 // do is to schedule the timer to fire in 1 ns.
148 timeoutTimer_.SetTimeoutNs(1);
149 }
150
151 return 0;
152 }
153
154 // Called by main loop when our timeout expires
EpollTimerCallback()155 void EpollMultiDriver::EpollTimerCallback()
156 {
157 timeoutTimer_.ResetEvent();
158 auto rc = curl_multi_socket_action(multi_, CURL_SOCKET_TIMEOUT, 0, &stillRunning);
159 if (rc != CURLM_OK) {
160 NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
161 }
162 CheckMultiInfo();
163 }
164
CheckMultiInfo()165 __attribute__((no_sanitize("cfi"))) void EpollMultiDriver::CheckMultiInfo()
166 {
167 CURLMsg *message;
168 int pending;
169
170 while ((message = curl_multi_info_read(multi_, &pending))) {
171 switch (message->msg) {
172 case CURLMSG_DONE: {
173 auto easyHandle = message->easy_handle;
174 #if HAS_NETSTACK_CHR
175 ChrClient::NetStackChrClient::GetInstance().GetDfxInfoFromCurlHandleAndReport(easyHandle,
176 message->data.result);
177 #endif
178 if (!easyHandle) {
179 break;
180 }
181 curl_multi_remove_handle(multi_, easyHandle);
182 auto requestInfo = ongoingRequests_[easyHandle];
183 ongoingRequests_.erase(easyHandle);
184 #ifdef HTTP_HANDOVER_FEATURE
185 if (netHandoverHandler_ &&
186 netHandoverHandler_->ProcessRequestErr(ongoingRequests_, multi_, requestInfo, message)) {
187 break;
188 }
189 #endif
190 if (requestInfo != nullptr && requestInfo->callbacks.doneCallback) {
191 requestInfo->callbacks.doneCallback(message, requestInfo->opaqueData);
192 }
193 delete requestInfo;
194 break;
195 }
196 default:
197 NETSTACK_LOGD("CURLMSG default");
198 break;
199 }
200 }
201 }
202
MultiSocketCallback(curl_socket_t socket,int action,CurlSocketContext * socketContext)203 int EpollMultiDriver::MultiSocketCallback(curl_socket_t socket, int action, CurlSocketContext *socketContext)
204 {
205 switch (action) {
206 case CURL_POLL_IN:
207 case CURL_POLL_OUT:
208 case CURL_POLL_INOUT:
209 if (!socketContext) {
210 auto curlSocket = new (std::nothrow) CurlSocketContext(poller_, socket, action);
211 if (curlSocket == nullptr) {
212 return -1;
213 }
214 curl_multi_assign(multi_, socket, curlSocket);
215 } else {
216 socketContext->Reassign(socket, action);
217 }
218 break;
219 case CURL_POLL_REMOVE:
220 delete socketContext;
221 break;
222 default:
223 NETSTACK_LOGE("Unexpected socket action = %{public}d", action);
224 }
225
226 return 0;
227 }
228
CurlPollToEpoll(int action)229 static int CurlPollToEpoll(int action)
230 {
231 int kind = (((static_cast<unsigned int>(action)) & CURL_POLL_IN) ? EPOLLIN : (EPOLLIN & ~EPOLLIN)) |
232 (((static_cast<unsigned int>(action)) & CURL_POLL_OUT) ? EPOLLOUT : (EPOLLOUT & ~EPOLLOUT));
233 return kind;
234 }
235
CurlSocketContext(HttpOverCurl::Epoller & poller,curl_socket_t sockDescriptor,int action)236 EpollMultiDriver::CurlSocketContext::CurlSocketContext(HttpOverCurl::Epoller &poller, curl_socket_t sockDescriptor,
237 int action)
238 : poller_(poller), socketDescriptor_(sockDescriptor)
239 {
240 int kind = CurlPollToEpoll(action);
241 poller_.RegisterMe(socketDescriptor_, kind);
242 }
243
Reassign(curl_socket_t sockDescriptor,int action)244 void EpollMultiDriver::CurlSocketContext::Reassign(curl_socket_t sockDescriptor, int action)
245 {
246 poller_.UnregisterMe(socketDescriptor_);
247 socketDescriptor_ = sockDescriptor;
248 int kind = CurlPollToEpoll(action);
249 poller_.RegisterMe(socketDescriptor_, kind);
250 }
251
~CurlSocketContext()252 EpollMultiDriver::CurlSocketContext::~CurlSocketContext()
253 {
254 poller_.UnregisterMe(socketDescriptor_);
255 }
256
257 // Called by main loop when we get action on a multi socket file descriptor
EpollSocketCallback(int fd)258 void EpollMultiDriver::EpollSocketCallback(int fd)
259 {
260 int action = CURL_CSELECT_IN | CURL_CSELECT_OUT;
261 auto rc = curl_multi_socket_action(multi_, fd, action, &stillRunning);
262 if (rc != CURLM_OK) {
263 NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
264 }
265 CheckMultiInfo();
266
267 if (stillRunning <= 0) {
268 timeoutTimer_.Stop();
269 }
270 }
271
272 } // namespace OHOS::NetStack::HttpOverCurl
273