1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_loop_impl.h"
17
18 #include <ctime>
19
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "event_impl.h"
23
24 namespace DistributedDB {
25 class EventRequest {
26 public:
27 enum {
28 ADD_EVENT = 1,
29 REMOVE_EVENT,
30 SET_TIMEOUT,
31 MOD_EVENTS_ADD,
32 MOD_EVENTS_REMOVE,
33 };
34
EventRequest(int type,EventImpl * event,EventsMask events)35 EventRequest(int type, EventImpl *event, EventsMask events)
36 : type_(type),
37 event_(event),
38 events_(events),
39 timeout_(0)
40 {
41 if (event != nullptr) {
42 event->IncObjRef(event);
43 }
44 }
45
EventRequest(int type,EventImpl * event,EventTime timeout)46 EventRequest(int type, EventImpl *event, EventTime timeout)
47 : type_(type),
48 event_(event),
49 events_(0),
50 timeout_(timeout)
51 {
52 if (event != nullptr) {
53 event->IncObjRef(event);
54 }
55 }
56
~EventRequest()57 ~EventRequest()
58 {
59 if (event_ != nullptr) {
60 event_->DecObjRef(event_);
61 event_ = nullptr;
62 }
63 }
64
IsValidType(int type)65 static bool IsValidType(int type)
66 {
67 if (type < ADD_EVENT || type > MOD_EVENTS_REMOVE) {
68 return false;
69 }
70 return true;
71 }
72
GetType() const73 int GetType() const
74 {
75 return type_;
76 }
77
GetEvent(EventImpl * & event) const78 void GetEvent(EventImpl *&event) const
79 {
80 event = event_;
81 }
82
GetEvents() const83 EventsMask GetEvents() const
84 {
85 return events_;
86 }
87
GetTimeout() const88 EventTime GetTimeout() const
89 {
90 return timeout_;
91 }
92
93 private:
94 int type_;
95 EventImpl *event_;
96 EventsMask events_;
97 EventTime timeout_;
98 };
99
EventLoopImpl()100 EventLoopImpl::EventLoopImpl()
101 : pollingSetChanged_(false)
102 {
103 OnKill([this](){ OnKillLoop(); });
104 }
105
~EventLoopImpl()106 EventLoopImpl::~EventLoopImpl()
107 {}
108
Add(IEvent * event)109 int EventLoopImpl::Add(IEvent *event)
110 {
111 if (event == nullptr) {
112 return -E_INVALID_ARGS;
113 }
114
115 auto eventImpl = static_cast<EventImpl *>(event);
116 if (!eventImpl->SetLoop(this)) {
117 LOGE("Add ev to loop failed, already attached.");
118 return -E_INVALID_ARGS;
119 }
120
121 EventTime timeout = 0;
122 int errCode = QueueRequest(EventRequest::ADD_EVENT, eventImpl, timeout);
123 if (errCode != E_OK) {
124 eventImpl->SetLoop(nullptr);
125 LOGE("Add ev to loop failed. err: '%d'.", errCode);
126 }
127 return errCode;
128 }
129
Remove(IEvent * event)130 int EventLoopImpl::Remove(IEvent *event)
131 {
132 if (event == nullptr) {
133 return -E_INVALID_ARGS;
134 }
135
136 auto eventImpl = static_cast<EventImpl *>(event);
137 bool isLoopConfused = false;
138 if (!eventImpl->Attached(this, isLoopConfused)) {
139 if (isLoopConfused) {
140 LOGE("Remove ev' from loop failed, loop confused.");
141 return -E_UNEXPECTED_DATA;
142 }
143 return E_OK;
144 }
145
146 EventTime timeout = 0;
147 int errCode = QueueRequest(EventRequest::REMOVE_EVENT, eventImpl, timeout);
148 if (errCode != E_OK) {
149 LOGE("Remove ev from loop failed. err: '%d'.", errCode);
150 }
151 return errCode;
152 }
153
Run()154 int EventLoopImpl::Run()
155 {
156 {
157 RefObject::AutoLock lockGuard(this);
158 if (IsKilled()) {
159 LOGE("Try to run a killed loop.");
160 return -E_OBJ_IS_KILLED;
161 }
162 if (loopThread_ != std::thread::id()) {
163 LOGE("Try to run a threaded loop.");
164 return -E_BUSY;
165 }
166 loopThread_ = std::this_thread::get_id();
167 }
168
169 int errCode = E_OK;
170 IncObjRef(this);
171
172 while (true) {
173 errCode = ProcessRequest();
174 if (errCode != E_OK) {
175 break;
176 }
177
178 errCode = Prepare(polling_);
179 if (errCode != E_OK) {
180 break;
181 }
182
183 EventTime sleepTime = CalSleepTime();
184 errCode = Poll(sleepTime);
185 if (errCode != E_OK) {
186 break;
187 }
188
189 errCode = ProcessRequest();
190 if (errCode != E_OK) {
191 break;
192 }
193
194 errCode = DispatchAll();
195 if (errCode != E_OK) {
196 break;
197 }
198 }
199
200 CleanLoop();
201 DecObjRef(this);
202 if (errCode == -E_OBJ_IS_KILLED) {
203 LOGD("Loop exited.");
204 } else {
205 LOGE("Loop exited, err:'%d'.", errCode);
206 }
207 return errCode;
208 }
209
Modify(EventImpl * event,bool isAdd,EventsMask events)210 int EventLoopImpl::Modify(EventImpl *event, bool isAdd, EventsMask events)
211 {
212 if (event == nullptr) {
213 return -E_INVALID_ARGS;
214 }
215
216 int type = isAdd ? EventRequest::MOD_EVENTS_ADD :
217 EventRequest::MOD_EVENTS_REMOVE;
218 int errCode = QueueRequest(type, event, events);
219 if (errCode != E_OK) {
220 LOGE("Modify loop ev events failed. err: '%d'.", errCode);
221 }
222 return errCode;
223 }
224
Modify(EventImpl * event,EventTime time)225 int EventLoopImpl::Modify(EventImpl *event, EventTime time)
226 {
227 if (event == nullptr) {
228 return -E_INVALID_ARGS;
229 }
230
231 int errCode = QueueRequest(EventRequest::SET_TIMEOUT, event, time);
232 if (errCode != E_OK) {
233 LOGE("Mod loop ev time failed. err: '%d'.", errCode);
234 }
235 return errCode;
236 }
237
GetTime() const238 EventTime EventLoopImpl::GetTime() const
239 {
240 uint64_t microsecond = 0;
241 OS::GetMonotonicRelativeTimeInMicrosecond(microsecond); // It is not very possible to fail, if so use 0 as default
242 return static_cast<EventTime>(microsecond / 1000); // 1000 is the multiple between microsecond and millisecond
243 }
244
SendRequestToLoop(EventRequest * eventRequest)245 int EventLoopImpl::SendRequestToLoop(EventRequest *eventRequest)
246 {
247 if (eventRequest == nullptr) {
248 return -E_INVALID_ARGS;
249 }
250
251 RefObject::AutoLock lockGuard(this);
252 if (IsKilled()) {
253 return -E_OBJ_IS_KILLED;
254 }
255 requests_.push_back(eventRequest);
256 WakeUp();
257 return E_OK;
258 }
259
260 template<typename T>
QueueRequest(int type,EventImpl * event,T argument)261 int EventLoopImpl::QueueRequest(int type, EventImpl *event, T argument)
262 {
263 if (!EventRequest::IsValidType(type)) {
264 return -E_INVALID_ARGS;
265 }
266 if (event == nullptr ||
267 !event->IsValidArg(argument)) {
268 return -E_INVALID_ARGS;
269 }
270
271 if (IsKilled()) { // pre-check
272 return -E_OBJ_IS_KILLED;
273 }
274
275 int errCode = event->CheckStatus();
276 if (errCode != E_OK) {
277 if (errCode != -E_OBJ_IS_KILLED ||
278 type != EventRequest::REMOVE_EVENT) {
279 return errCode;
280 }
281 }
282
283 auto eventRequest = new (std::nothrow) EventRequest(type, event, argument);
284 if (eventRequest == nullptr) {
285 return -E_OUT_OF_MEMORY;
286 }
287
288 errCode = SendRequestToLoop(eventRequest);
289 if (errCode != E_OK) {
290 delete eventRequest;
291 eventRequest = nullptr;
292 }
293 return errCode;
294 }
295
IsInLoopThread(bool & started) const296 bool EventLoopImpl::IsInLoopThread(bool &started) const
297 {
298 if (loopThread_ == std::thread::id()) {
299 started = false;
300 } else {
301 started = true;
302 }
303 return std::this_thread::get_id() == loopThread_;
304 }
305
EventObjectExists(EventImpl * event) const306 bool EventLoopImpl::EventObjectExists(EventImpl *event) const
307 {
308 return polling_.find(event) != polling_.end();
309 }
310
EventFdExists(const EventImpl * event) const311 bool EventLoopImpl::EventFdExists(const EventImpl *event) const
312 {
313 if (!event->IsValidFd()) {
314 return false;
315 }
316 for (auto ev : polling_) {
317 if (ev->GetEventFd() == event->GetEventFd()) {
318 return true;
319 }
320 }
321 return false;
322 }
323
AddEventObject(EventImpl * event,EventTime now)324 int EventLoopImpl::AddEventObject(EventImpl *event, EventTime now)
325 {
326 if (event == nullptr) {
327 return -E_INVALID_ARGS;
328 }
329 if (EventObjectExists(event)) {
330 LOGE("Add event object failed. ev already exists.");
331 return -EEXIST;
332 }
333 if (EventFdExists(event)) {
334 LOGE("Add event object failed. ev fd already exists.");
335 return -EEXIST;
336 }
337
338 int errCode = E_OK;
339 if (!event->IsTimer()) {
340 errCode = AddEvent(event);
341 }
342
343 if (errCode == E_OK) {
344 polling_.insert(event);
345 event->SetStartTime(now);
346 event->SetRevents(0);
347 event->IncObjRef(event);
348 pollingSetChanged_ = true;
349 } else {
350 LOGE("Add event failed. err: '%d'.", errCode);
351 }
352 return errCode;
353 }
354
RemoveEventObject(EventImpl * event)355 int EventLoopImpl::RemoveEventObject(EventImpl *event)
356 {
357 if (event == nullptr) {
358 return -E_INVALID_ARGS;
359 }
360 if (!EventObjectExists(event)) {
361 return -E_NO_SUCH_ENTRY;
362 }
363
364 int errCode = E_OK;
365 if (!event->IsTimer()) {
366 errCode = RemoveEvent(event);
367 }
368
369 if (errCode == E_OK) {
370 polling_.erase(event);
371 event->SetLoop(nullptr);
372 event->DecObjRef(event);
373 pollingSetChanged_ = true;
374 } else {
375 LOGE("Remove event failed. err: '%d'.", errCode);
376 }
377 return errCode;
378 }
379
ModifyEventObject(EventImpl * event,bool isAdd,EventsMask events)380 int EventLoopImpl::ModifyEventObject(EventImpl *event, bool isAdd, EventsMask events)
381 {
382 if (event == nullptr) {
383 return -E_INVALID_ARGS;
384 }
385 if (!EventObjectExists(event)) {
386 return -EEXIST;
387 }
388
389 int errCode = E_OK;
390 if (!event->IsTimer()) {
391 EventsMask genericEvents = events & (~IEvent::ET_TIMEOUT);
392 if (genericEvents) {
393 errCode = ModifyEvent(event, isAdd, genericEvents);
394 }
395 }
396
397 if (errCode == E_OK) {
398 event->SetEvents(isAdd, events);
399 } else {
400 LOGE("Modify event' failed. err: '%d'.", errCode);
401 }
402 return errCode;
403 }
404
ModifyEventObject(EventImpl * event,EventTime timeout)405 int EventLoopImpl::ModifyEventObject(EventImpl *event, EventTime timeout)
406 {
407 if (event == nullptr) {
408 return -E_INVALID_ARGS;
409 }
410 if (!EventObjectExists(event)) {
411 return -E_NO_SUCH_ENTRY;
412 }
413 event->SetTimeoutPeriod(timeout);
414 return E_OK;
415 }
416
ProcessRequest(std::list<EventRequest * > & requests)417 void EventLoopImpl::ProcessRequest(std::list<EventRequest *> &requests)
418 {
419 EventTime now = GetTime();
420 while (true) {
421 if (requests.empty()) {
422 break;
423 }
424
425 EventRequest *request = requests.front();
426 requests.pop_front();
427 if (request == nullptr) {
428 continue;
429 }
430
431 if (!IsKilled()) {
432 EventImpl *event = nullptr;
433 request->GetEvent(event);
434 EventsMask events = request->GetEvents();
435 EventTime timeout = request->GetTimeout();
436
437 switch (request->GetType()) {
438 case EventRequest::ADD_EVENT:
439 (void)(AddEventObject(event, now));
440 break;
441
442 case EventRequest::REMOVE_EVENT:
443 (void)(RemoveEventObject(event));
444 break;
445
446 case EventRequest::MOD_EVENTS_ADD:
447 (void)(ModifyEventObject(event, true, events));
448 break;
449
450 case EventRequest::MOD_EVENTS_REMOVE:
451 (void)(ModifyEventObject(event, false, events));
452 break;
453
454 case EventRequest::SET_TIMEOUT:
455 (void)(ModifyEventObject(event, timeout));
456 break;
457
458 default:
459 break;
460 }
461 }
462
463 delete request;
464 request = nullptr;
465 }
466 }
467
ProcessRequest()468 int EventLoopImpl::ProcessRequest()
469 {
470 int errCode = E_OK;
471 std::list<EventRequest *> requests;
472 {
473 RefObject::AutoLock lockGuard(this);
474 if (IsKilled()) {
475 errCode = -E_OBJ_IS_KILLED;
476 }
477 if (requests_.empty()) {
478 return errCode;
479 }
480 std::swap(requests, requests_);
481 }
482
483 ProcessRequest(requests);
484 return errCode;
485 }
486
CalSleepTime() const487 EventTime EventLoopImpl::CalSleepTime() const
488 {
489 EventTime now = GetTime();
490 EventTime minInterval = EventImpl::MAX_TIME_VALUE;
491
492 for (auto event : polling_) {
493 if (event == nullptr) {
494 continue;
495 }
496
497 EventTime t;
498 bool valid = event->GetTimeoutPoint(t);
499 if (!valid) {
500 continue;
501 }
502
503 if (t <= now) {
504 return 0;
505 }
506
507 EventTime interval = t - now;
508 if (interval < minInterval) {
509 minInterval = interval;
510 }
511 }
512
513 return minInterval;
514 }
515
DispatchAll()516 int EventLoopImpl::DispatchAll()
517 {
518 do {
519 EventTime now = GetTime();
520 pollingSetChanged_ = false;
521
522 for (auto event : polling_) {
523 if (IsKilled()) {
524 return -E_OBJ_IS_KILLED;
525 }
526 if (event == nullptr) {
527 continue;
528 }
529
530 event->IncObjRef(event);
531 event->UpdateElapsedTime(now);
532 int errCode = event->Dispatch();
533 if (errCode != E_OK) {
534 RemoveEventObject(event);
535 } else {
536 event->SetRevents(0);
537 }
538 event->DecObjRef(event);
539
540 if (pollingSetChanged_) {
541 break;
542 }
543 }
544 } while (pollingSetChanged_);
545 return E_OK;
546 }
547
CleanLoop()548 void EventLoopImpl::CleanLoop()
549 {
550 if (!IsKilled()) {
551 return;
552 }
553
554 ProcessRequest();
555 std::set<EventImpl *> polling = std::move(polling_);
556 int errCode = Exit(polling);
557 if (errCode != E_OK) {
558 LOGE("Exit loop failed when cleanup, err:'%d'.", errCode);
559 }
560
561 for (auto event : polling) {
562 if (event != nullptr) {
563 event->KillAndDecObjRef(event);
564 }
565 }
566 }
567
OnKillLoop()568 void EventLoopImpl::OnKillLoop()
569 {
570 bool started = true;
571 if (IsInLoopThread(started)) {
572 // Loop object is set to state: killed,
573 // everything will be done in loop.Run()
574 return;
575 }
576
577 if (started) {
578 // Ditto
579 WakeUp();
580 } else {
581 // Drop the lock.
582 UnlockObj();
583 CleanLoop();
584 LockObj();
585 }
586 }
587 }
588