• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "epoll_multi_driver.h"
17 
18 #include "netstack_log.h"
19 #include "request_info.h"
20 
21 namespace OHOS::NetStack::HttpOverCurl {
22 
// Capacity of the event buffer handed to the poller on each Step() call.
static constexpr size_t MAX_EPOLL_EVENTS{10};
24 
EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo * >> & incomingQueue)25 EpollMultiDriver::EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo *>> &incomingQueue)
26     : incomingQueue_(incomingQueue)
27 {
28     Initialize();
29 }
30 
Initialize()31 void EpollMultiDriver::Initialize()
32 {
33     timeoutTimer_.RegisterForPolling(poller_);
34     incomingQueue_->GetSyncEvent().RegisterForPolling(poller_);
35     multi_ = curl_multi_init();
36     if (!multi_) {
37         NETSTACK_LOGE("Failed to initialize curl_multi handle");
38         return;
39     }
40 
41     static auto socketCallback = +[](CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
42         auto instance = static_cast<EpollMultiDriver *>(userp);
43         return instance->MultiSocketCallback(s, action, static_cast<CurlSocketContext *>(socketp));
44     };
45     curl_multi_setopt(multi_, CURLMOPT_SOCKETDATA, this);
46     curl_multi_setopt(multi_, CURLMOPT_SOCKETFUNCTION, socketCallback);
47 
48     static auto timerCallback = +[](CURLM *multi, long timeout_ms, void *userp) -> int {
49         auto instance = static_cast<EpollMultiDriver *>(userp);
50         return instance->MultiTimeoutCallback(timeout_ms);
51     };
52     curl_multi_setopt(multi_, CURLMOPT_TIMERDATA, this);
53     curl_multi_setopt(multi_, CURLMOPT_TIMERFUNCTION, timerCallback);
54 }
55 
~EpollMultiDriver()56 EpollMultiDriver::~EpollMultiDriver()
57 {
58     if (multi_) {
59         curl_multi_cleanup(multi_);
60         multi_ = nullptr;
61     }
62 }
63 
Step(int waitEventsTimeoutMs)64 void EpollMultiDriver::Step(int waitEventsTimeoutMs)
65 {
66     epoll_event events[MAX_EPOLL_EVENTS];
67     int eventsToHandle = poller_.Wait(events, MAX_EPOLL_EVENTS, waitEventsTimeoutMs);
68     if (eventsToHandle == -1) {
69         if (errno != EINTR) {
70             NETSTACK_LOGE("epoll wait error : %{public}d", errno);
71         }
72         return;
73     }
74     if (eventsToHandle == 0) {
75         if (errno != EINTR && errno != EAGAIN && errno != 0) {
76             NETSTACK_LOGE("epoll wait event 0 err: %{public}d", errno);
77         }
78         CheckMultiInfo();
79     }
80     for (int idx = 0; idx < eventsToHandle; ++idx) {
81         if (incomingQueue_->GetSyncEvent().IsItYours(events[idx].data.fd)) {
82             IncomingRequestCallback();
83         } else if (timeoutTimer_.IsItYours(events[idx].data.fd)) {
84             EpollTimerCallback();
85         } else { // curl socket event
86             EpollSocketCallback(events[idx].data.fd);
87         }
88     }
89 }
90 
IncomingRequestCallback()91 void EpollMultiDriver::IncomingRequestCallback()
92 {
93     auto requestsToAdd = incomingQueue_->Flush();
94     for (auto &request : requestsToAdd) {
95         ongoingRequests_[request->easyHandle] = request;
96         auto ret = curl_multi_add_handle(multi_, request->easyHandle);
97         if (ret != CURLM_OK) {
98             NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d %{public}s", ret, curl_multi_strerror(ret));
99             continue;
100         }
101 
102         if (request->startedCallback) {
103             request->startedCallback(request->easyHandle, request->opaqueData);
104         }
105     }
106 }
107 
108 // Update the timer after curl_multi library does its thing. Curl will
109 // inform us through this callback what it wants the new timeout to be,
110 // after it does some work.
MultiTimeoutCallback(long timeoutMs)111 int EpollMultiDriver::MultiTimeoutCallback(long timeoutMs)
112 {
113     if (timeoutMs > 0) {
114         timeoutTimer_.SetTimeoutMs(timeoutMs);
115     } else if (timeoutMs == 0) {
116         // libcurl wants us to timeout now, however setting both fields of
117         // new_value.it_value to zero disarms the timer. The closest we can
118         // do is to schedule the timer to fire in 1 ns.
119         timeoutTimer_.SetTimeoutNs(1);
120     }
121 
122     return 0;
123 }
124 
125 // Called by main loop when our timeout expires
EpollTimerCallback()126 void EpollMultiDriver::EpollTimerCallback()
127 {
128     timeoutTimer_.ResetEvent();
129     auto rc = curl_multi_socket_action(multi_, CURL_SOCKET_TIMEOUT, 0, &stillRunning);
130     if (rc != CURLM_OK) {
131         NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
132     }
133     CheckMultiInfo();
134 }
135 
CheckMultiInfo()136 __attribute__((no_sanitize("cfi"))) void EpollMultiDriver::CheckMultiInfo()
137 {
138     CURLMsg *message;
139     int pending;
140 
141     while ((message = curl_multi_info_read(multi_, &pending))) {
142         switch (message->msg) {
143             case CURLMSG_DONE: {
144                 auto easyHandle = message->easy_handle;
145                 curl_multi_remove_handle(multi_, easyHandle);
146                 auto requestInfo = ongoingRequests_[easyHandle];
147                 ongoingRequests_.erase(easyHandle);
148                 if (requestInfo != nullptr && requestInfo->doneCallback) {
149                     requestInfo->doneCallback(message, requestInfo->opaqueData);
150                 }
151                 delete requestInfo;
152                 break;
153             }
154             default:
155                 NETSTACK_LOGD("CURLMSG default");
156                 break;
157         }
158     }
159 }
160 
MultiSocketCallback(curl_socket_t socket,int action,CurlSocketContext * socketContext)161 int EpollMultiDriver::MultiSocketCallback(curl_socket_t socket, int action, CurlSocketContext *socketContext)
162 {
163     switch (action) {
164         case CURL_POLL_IN:
165         case CURL_POLL_OUT:
166         case CURL_POLL_INOUT:
167             if (!socketContext) {
168                 auto curlSocket = new (std::nothrow) CurlSocketContext(poller_, socket, action);
169                 if (curlSocket == nullptr) {
170                     return -1;
171                 }
172                 curl_multi_assign(multi_, socket, curlSocket);
173             } else {
174                 socketContext->Reassign(socket, action);
175             }
176             break;
177         case CURL_POLL_REMOVE:
178             delete socketContext;
179             break;
180         default:
181             NETSTACK_LOGE("Unexpected socket action = %{public}d", action);
182     }
183 
184     return 0;
185 }
186 
CurlPollToEpoll(int action)187 static int CurlPollToEpoll(int action)
188 {
189     int kind = ((action & CURL_POLL_IN) ? EPOLLIN : (EPOLLIN & ~EPOLLIN)) |
190                ((action & CURL_POLL_OUT) ? EPOLLOUT : (EPOLLOUT & ~EPOLLOUT));
191     return kind;
192 }
193 
CurlSocketContext(HttpOverCurl::Epoller & poller,curl_socket_t sockDescriptor,int action)194 EpollMultiDriver::CurlSocketContext::CurlSocketContext(HttpOverCurl::Epoller &poller, curl_socket_t sockDescriptor,
195                                                        int action)
196     : poller_(poller), socketDescriptor_(sockDescriptor)
197 {
198     int kind = CurlPollToEpoll(action);
199     poller_.RegisterMe(socketDescriptor_, kind);
200 }
201 
Reassign(curl_socket_t sockDescriptor,int action)202 void EpollMultiDriver::CurlSocketContext::Reassign(curl_socket_t sockDescriptor, int action)
203 {
204     poller_.UnregisterMe(socketDescriptor_);
205     socketDescriptor_ = sockDescriptor;
206     int kind = CurlPollToEpoll(action);
207     poller_.RegisterMe(socketDescriptor_, kind);
208 }
209 
~CurlSocketContext()210 EpollMultiDriver::CurlSocketContext::~CurlSocketContext()
211 {
212     poller_.UnregisterMe(socketDescriptor_);
213 }
214 
215 // Called by main loop when we get action on a multi socket file descriptor
EpollSocketCallback(int fd)216 void EpollMultiDriver::EpollSocketCallback(int fd)
217 {
218     int action = CURL_CSELECT_IN | CURL_CSELECT_OUT;
219     auto rc = curl_multi_socket_action(multi_, fd, action, &stillRunning);
220     if (rc != CURLM_OK) {
221         NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
222     }
223     CheckMultiInfo();
224 
225     if (stillRunning <= 0) {
226         timeoutTimer_.Stop();
227     }
228 }
229 
230 } // namespace OHOS::NetStack::HttpOverCurl
231