1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <atomic>
17 #include <mutex>
18 #include <ostream>
19 #include <queue>
20 #include <climits>
21 #include <iostream>
22 #include "utils_log.h"
23 #include "common_event_sys_errors.h"
24 #include "io_event_epoll.h"
25 #include "io_event_reactor.h"
26
27 namespace OHOS {
28 namespace Utils {
29
IOEventReactor()30 IOEventReactor::IOEventReactor()
31 :loopReady_(false), enabled_(false), count_(0), ioHandlers_(INIT_FD_NUMS), backend_(new IOEventEpoll()) {}
32
~IOEventReactor()33 IOEventReactor::~IOEventReactor()
34 {
35 CleanUp();
36 }
37
SetUp()38 ErrCode IOEventReactor::SetUp()
39 {
40 if (backend_ == nullptr) {
41 backend_ = std::make_unique<IOEventEpoll>();
42 }
43
44 ErrCode res = backend_->SetUp();
45 if (res != EVENT_SYS_ERR_OK) {
46 UTILS_LOGE("%{public}s: Backend start failed.", __FUNCTION__);
47 return res;
48 }
49
50 loopReady_ = true;
51 return res;
52 }
53
InsertNodeFront(int fd,IOEventHandler * target)54 void IOEventReactor::InsertNodeFront(int fd, IOEventHandler* target)
55 {
56 IOEventHandler* h = ioHandlers_[fd].head.get();
57 target->next_ = h->next_;
58 target->prev_ = h;
59 if (h->next_ != nullptr) {
60 h->next_->prev_ = target;
61 }
62 h->next_ = target;
63 }
64
RemoveNode(IOEventHandler * target)65 void IOEventReactor::RemoveNode(IOEventHandler* target)
66 {
67 target->prev_->next_ = target->next_;
68
69 if (target->next_ != nullptr) {
70 target->next_->prev_ = target->prev_;
71 }
72
73 target->prev_ = nullptr;
74 target->next_ = nullptr;
75 }
76
AddHandler(IOEventHandler * target)77 ErrCode IOEventReactor::AddHandler(IOEventHandler* target)
78 {
79 if (target == nullptr) {
80 return EVENT_SYS_ERR_NOT_FOUND;
81 }
82
83 if (target->fd_ == -1) {
84 UTILS_LOGE("%{public}s: Failed, Bad fd.", __FUNCTION__);
85 return EVENT_SYS_ERR_BADF;
86 }
87 if (target->prev_!=nullptr) {
88 UTILS_LOGW("%{public}s: Warning, already started.", __FUNCTION__);
89 return EVENT_SYS_ERR_ALREADY_STARTED;
90 }
91
92 std::lock_guard<std::mutex> lock(mutex_);
93 int fd = target->fd_;
94 if (static_cast<size_t>(fd) > ioHandlers_.size() - 1u) {
95 UTILS_LOGD("%{public}s: Resize when fd: %{public}d", __FUNCTION__, fd);
96 ioHandlers_.resize(fd * EXPANSION_COEFF);
97 }
98
99 InsertNodeFront(fd, target);
100
101 if ((ioHandlers_[fd].events & target->events_) != target->events_) {
102 if (backend_ == nullptr || !UpdateToDemultiplexer(target->fd_)) {
103 UTILS_LOGE("%{public}s: Update fd: %{public}d to backend failed.", __FUNCTION__, target->fd_);
104 return EVENT_SYS_ERR_FAILED;
105 }
106 }
107
108 target->enabled_ = true;
109 count_++;
110 return EVENT_SYS_ERR_OK;
111 }
112
UpdateHandler(IOEventHandler * target)113 ErrCode IOEventReactor::UpdateHandler(IOEventHandler* target)
114 {
115 if (target == nullptr) {
116 return EVENT_SYS_ERR_NOT_FOUND;
117 }
118
119 if (target->fd_ == -1) {
120 UTILS_LOGE("%{public}s: Failed, Bad fd.", __FUNCTION__);
121 return EVENT_SYS_ERR_BADF;
122 }
123
124 if (target->prev_!=nullptr) {
125 if (!HasHandler(target)) {
126 UTILS_LOGE("%{public}s: Failed, handler not found.", __FUNCTION__);
127 return EVENT_SYS_ERR_NOT_FOUND;
128 }
129 if (backend_ == nullptr || !UpdateToDemultiplexer(target->fd_)) {
130 UTILS_LOGE("%{public}s: Update fd: %{public}d to backend failed.", __FUNCTION__, target->fd_);
131 return EVENT_SYS_ERR_FAILED;
132 }
133 return EVENT_SYS_ERR_OK;
134 }
135
136 return AddHandler(target);
137 }
138
RemoveHandler(IOEventHandler * target)139 ErrCode IOEventReactor::RemoveHandler(IOEventHandler* target)
140 {
141 if (target == nullptr) {
142 return EVENT_SYS_ERR_NOT_FOUND;
143 }
144
145 if (target->fd_ == -1) {
146 UTILS_LOGE("%{public}s: Failed, Bad fd.", __FUNCTION__);
147 return EVENT_SYS_ERR_BADF;
148 }
149
150 target->enabled_ = false;
151 std::lock_guard<std::mutex> lock(mutex_);
152
153 if (!HasHandler(target)) {
154 UTILS_LOGE("%{public}s Failed. Handler not found.", __FUNCTION__);
155 target->enabled_=true;
156 return EVENT_SYS_ERR_NOT_FOUND;
157 }
158
159 RemoveNode(target);
160
161 if (backend_ == nullptr || !UpdateToDemultiplexer(target->fd_)) {
162 UTILS_LOGE("%{public}s: Update fd: %{public}d to backend failed.", __FUNCTION__, target->fd_);
163 target->enabled_=true;
164 return EVENT_SYS_ERR_FAILED;
165 }
166
167 count_--;
168 return EVENT_SYS_ERR_OK;
169 }
170
HasHandler(IOEventHandler * target)171 bool IOEventReactor::HasHandler(IOEventHandler* target)
172 {
173 for (IOEventHandler* cur = ioHandlers_[target->fd_].head.get(); cur != nullptr; cur = cur->next_) {
174 if (cur == target) {
175 return true;
176 }
177 }
178
179 return false;
180 }
181
FindHandler(IOEventHandler * target)182 ErrCode IOEventReactor::FindHandler(IOEventHandler* target)
183 {
184 if (target == nullptr) {
185 return EVENT_SYS_ERR_NOT_FOUND;
186 }
187
188 if (target->fd_ == -1) {
189 UTILS_LOGD("%{public}s: Failed, Bad fd.", __FUNCTION__);
190 return EVENT_SYS_ERR_BADF;
191 }
192
193 std::lock_guard<std::mutex> lock(mutex_);
194
195 if (!HasHandler(target)) {
196 UTILS_LOGD("%{public}s: Handler not found.", __FUNCTION__);
197 return EVENT_SYS_ERR_NOT_FOUND;
198 }
199
200 return EVENT_SYS_ERR_OK;
201 }
202
UpdateToDemultiplexer(int fd)203 bool IOEventReactor::UpdateToDemultiplexer(int fd)
204 {
205 uint32_t emask = 0u;
206 for (IOEventHandler* cur = ioHandlers_[fd].head.get(); cur != nullptr; cur = cur->next_) {
207 emask |= cur->events_;
208 }
209
210 if (emask == ioHandlers_[fd].events) {
211 UTILS_LOGW("%{public}s: Warning, Interested events not changed.", __FUNCTION__);
212 return true;
213 }
214
215 ErrCode res = backend_->ModifyEvents(fd, emask);
216 if (res != EVENT_SYS_ERR_OK) {
217 UTILS_LOGE("%{public}s: Modify events on backend failed. fd: %{public}d, \
218 new event: %{public}d, error code: %{public}d", __FUNCTION__, fd, emask, res);
219 return false;
220 }
221
222 ioHandlers_[fd].events = emask;
223 return true;
224 }
225
Execute(const std::vector<EventCallback> & tasks)226 void IOEventReactor::Execute(const std::vector<EventCallback>& tasks)
227 {
228 for (const EventCallback& cb : tasks) {
229 cb();
230 }
231 }
232
HandleEvents(int fd,EventId event)233 ErrCode IOEventReactor::HandleEvents(int fd, EventId event)
234 {
235 std::vector<EventCallback> taskQue;
236 {
237 std::lock_guard<std::mutex> lock(mutex_);
238 if (!(ioHandlers_[fd].events & event)) {
239 UTILS_LOGD("%{public}s: Non-interested event: %{public}d with fd: %{public}d, interested events: \
240 %{public}d", __FUNCTION__, event, fd, ioHandlers_[fd].events);
241 return EVENT_SYS_ERR_BADEVENT;
242 }
243
244 for (IOEventHandler* cur = ioHandlers_[fd].head.get()->next_; cur != nullptr; cur = cur->next_) {
245 if (cur->events_ != Events::EVENT_NONE && cur->enabled_ && (cur->events_ & event) && cur->cb_) {
246 taskQue.push_back(cur->cb_);
247 UTILS_LOGD("%{public}s: Handling event success: %{public}d with fd: %{public}d; \
248 handler interested events: %{public}d, active-status: %{public}d", \
249 __FUNCTION__, event, fd, cur->events_, cur->enabled_);
250 } else {
251 UTILS_LOGD("%{public}s: Handling event ignore: %{public}d with fd: %{public}d; \
252 handler interested events: %{public}d, active-status: %{public}d", \
253 __FUNCTION__, event, fd, cur->events_, cur->enabled_);
254 }
255 }
256 }
257
258 Execute(taskQue);
259 return EVENT_SYS_ERR_OK;
260 }
261
HandleAll(const std::vector<std::pair<int,EventId>> & events)262 void IOEventReactor::HandleAll(const std::vector<std::pair<int, EventId>>& events)
263 {
264 for (size_t idx = 0u; idx < events.size(); idx++) {
265 int fd = events[idx].first;
266 EventId event = events[idx].second;
267
268 UTILS_LOGD("%{public}s: Processing. Handling event: %{public}d, with fd: %{public}d.", \
269 __FUNCTION__, event, fd);
270
271 if (HandleEvents(fd, event) == EVENT_SYS_ERR_BADEVENT) {
272 UTILS_LOGD("%{public}s: Received non-interested events-%{public}d.", __FUNCTION__, event);
273 }
274 }
275 }
276
Run(int timeout)277 void IOEventReactor::Run(int timeout)
278 {
279 std::vector<std::pair<int, EventId>> gotEvents;
280 while (loopReady_) {
281 if (!enabled_) {
282 continue;
283 }
284 ErrCode res;
285 if (timeout == -1) {
286 std::lock_guard<std::mutex> lock(mutex_);
287 if (count_ ==0) {
288 continue;
289 }
290 res = backend_->Polling(timeout, gotEvents);
291 } else {
292 res = backend_->Polling(timeout, gotEvents);
293 }
294
295 switch (res) {
296 case EVENT_SYS_ERR_OK:
297 HandleAll(gotEvents);
298 gotEvents.clear();
299 break;
300 case EVENT_SYS_ERR_NOEVENT:
301 UTILS_LOGD("%{public}s: No events captured.", __FUNCTION__);
302 break;
303 case EVENT_SYS_ERR_FAILED:
304 UTILS_LOGE("%{public}s: Backends failed.", __FUNCTION__);
305 break;
306 default:
307 break;
308 }
309 }
310 }
311
DoClean(int fd)312 bool IOEventReactor::DoClean(int fd)
313 {
314 if (ioHandlers_[fd].head->next_ == nullptr) {
315 return true;
316 }
317
318 for (IOEventHandler* cur = ioHandlers_[fd].head->next_; cur != nullptr; cur = cur->next_) {
319 cur->prev_->next_ = nullptr;
320 cur->prev_ = nullptr;
321 cur->enabled_ = false;
322 }
323
324 if (!UpdateToDemultiplexer(fd)) {
325 UTILS_LOGD("%{public}s: Clear handler list success, while updating backend failed.", __FUNCTION__);
326 return false;
327 }
328
329 return true;
330 }
331
CleanUp()332 ErrCode IOEventReactor::CleanUp()
333 {
334 std::lock_guard<std::mutex> lock(mutex_);
335 ErrCode res = EVENT_SYS_ERR_OK;
336 for (size_t fd = 0u; fd < ioHandlers_.size() && fd <= INT_MAX; fd++) {
337 if (!DoClean(fd)) {
338 UTILS_LOGD("%{public}s Failed.", __FUNCTION__);
339 res = EVENT_SYS_ERR_FAILED;
340 }
341 }
342
343 return res;
344 }
345
Clean(int fd)346 ErrCode IOEventReactor::Clean(int fd)
347 {
348 if (fd == -1) {
349 UTILS_LOGD("%{public}s: Failed, bad fd.", __FUNCTION__);
350 return EVENT_SYS_ERR_BADF;
351 }
352
353 std::lock_guard<std::mutex> lock(mutex_);
354 if (!DoClean(fd)) {
355 UTILS_LOGD("%{public}s: Failed.", __FUNCTION__);
356 return EVENT_SYS_ERR_FAILED;
357 }
358
359 return EVENT_SYS_ERR_OK;
360 }
361
362 } // namespace Utils
363 } // namespace OHOS
364