1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_loop_impl.h"
17
18 #include <ctime>
19
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "event_impl.h"
23
24 namespace DistributedDB {
25 class EventRequest {
26 public:
27 enum {
28 ADD_EVENT = 1,
29 REMOVE_EVENT,
30 SET_TIMEOUT,
31 MOD_EVENTS_ADD,
32 MOD_EVENTS_REMOVE,
33 };
34
EventRequest(int type,EventImpl * event,EventsMask events)35 EventRequest(int type, EventImpl *event, EventsMask events)
36 : type_(type),
37 event_(event),
38 events_(events),
39 timeout_(0)
40 {
41 if (event != nullptr) {
42 event->IncObjRef(event);
43 }
44 }
45
EventRequest(int type,EventImpl * event,EventTime timeout)46 EventRequest(int type, EventImpl *event, EventTime timeout)
47 : type_(type),
48 event_(event),
49 events_(0),
50 timeout_(timeout)
51 {
52 if (event != nullptr) {
53 event->IncObjRef(event);
54 }
55 }
56
~EventRequest()57 ~EventRequest()
58 {
59 if (event_ != nullptr) {
60 event_->DecObjRef(event_);
61 event_ = nullptr;
62 }
63 }
64
IsValidType(int type)65 static bool IsValidType(int type)
66 {
67 if (type < ADD_EVENT || type > MOD_EVENTS_REMOVE) {
68 return false;
69 }
70 return true;
71 }
72
GetType() const73 int GetType() const
74 {
75 return type_;
76 }
77
GetEvent(EventImpl * & event) const78 void GetEvent(EventImpl *&event) const
79 {
80 event = event_;
81 }
82
GetEvents() const83 EventsMask GetEvents() const
84 {
85 return events_;
86 }
87
GetTimeout() const88 EventTime GetTimeout() const
89 {
90 return timeout_;
91 }
92
93 private:
94 int type_;
95 EventImpl *event_;
96 EventsMask events_;
97 EventTime timeout_;
98 };
99
EventLoopImpl()100 EventLoopImpl::EventLoopImpl()
101 : pollingSetChanged_(false)
102 {
103 OnKill([this](){ OnKillLoop(); });
104 }
105
~EventLoopImpl()106 EventLoopImpl::~EventLoopImpl()
107 {}
108
Add(IEvent * event)109 int EventLoopImpl::Add(IEvent *event)
110 {
111 if (event == nullptr) {
112 return -E_INVALID_ARGS;
113 }
114
115 auto eventImpl = static_cast<EventImpl *>(event);
116 if (!eventImpl->SetLoop(this)) {
117 LOGE("Add ev to loop failed, already attached.");
118 return -E_INVALID_ARGS;
119 }
120
121 EventTime timeout = 0;
122 int errCode = QueueRequest(EventRequest::ADD_EVENT, eventImpl, timeout);
123 if (errCode != E_OK) {
124 eventImpl->SetLoop(nullptr);
125 LOGE("Add ev to loop failed. err: '%d'.", errCode);
126 }
127 return errCode;
128 }
129
Remove(IEvent * event)130 int EventLoopImpl::Remove(IEvent *event)
131 {
132 if (event == nullptr) {
133 return -E_INVALID_ARGS;
134 }
135
136 auto eventImpl = static_cast<EventImpl *>(event);
137 bool isLoopConfused = false;
138 if (!eventImpl->Attached(this, isLoopConfused)) {
139 if (isLoopConfused) {
140 LOGE("Remove ev' from loop failed, loop confused.");
141 return -E_UNEXPECTED_DATA;
142 }
143 return E_OK;
144 }
145
146 EventTime timeout = 0;
147 int errCode = QueueRequest(EventRequest::REMOVE_EVENT, eventImpl, timeout);
148 if (errCode != E_OK) {
149 LOGE("Remove ev from loop failed. err: '%d'.", errCode);
150 }
151 return errCode;
152 }
153
Run()154 int EventLoopImpl::Run()
155 {
156 {
157 RefObject::AutoLock lockGuard(this);
158 if (IsKilled()) {
159 LOGE("Try to run a killed loop.");
160 return -E_OBJ_IS_KILLED;
161 }
162 if (loopThread_ != std::thread::id()) {
163 LOGE("Try to run a threaded loop.");
164 return -E_BUSY;
165 }
166 loopThread_ = std::this_thread::get_id();
167 }
168
169 int errCode;
170 IncObjRef(this);
171
172 while (true) {
173 errCode = ProcessRequest();
174 if (errCode != E_OK) {
175 break;
176 }
177
178 errCode = Prepare(polling_);
179 if (errCode != E_OK) {
180 break;
181 }
182
183 EventTime sleepTime = CalSleepTime();
184 errCode = Poll(sleepTime);
185 if (errCode != E_OK) {
186 break;
187 }
188
189 errCode = ProcessRequest();
190 if (errCode != E_OK) {
191 break;
192 }
193
194 errCode = DispatchAll();
195 if (errCode != E_OK) {
196 break;
197 }
198 }
199
200 CleanLoop();
201 DecObjRef(this);
202 if (errCode == -E_OBJ_IS_KILLED) {
203 LOGD("Loop exited.");
204 } else {
205 LOGE("Loop exited, err:'%d'.", errCode);
206 }
207 return errCode;
208 }
209
Modify(EventImpl * event,bool isAdd,EventsMask events)210 int EventLoopImpl::Modify(EventImpl *event, bool isAdd, EventsMask events)
211 {
212 if (event == nullptr) {
213 return -E_INVALID_ARGS;
214 }
215
216 int type = isAdd ? EventRequest::MOD_EVENTS_ADD :
217 EventRequest::MOD_EVENTS_REMOVE;
218 int errCode = QueueRequest(type, event, events);
219 if (errCode != E_OK) {
220 LOGE("Modify loop ev events failed. err: '%d'.", errCode);
221 }
222 return errCode;
223 }
224
Modify(EventImpl * event,EventTime time)225 int EventLoopImpl::Modify(EventImpl *event, EventTime time)
226 {
227 if (event == nullptr) {
228 return -E_INVALID_ARGS;
229 }
230
231 int errCode = QueueRequest(EventRequest::SET_TIMEOUT, event, time);
232 if (errCode != E_OK) {
233 LOGE("Mod loop ev time failed. err: '%d'.", errCode);
234 }
235 return errCode;
236 }
237
GetTime() const238 EventTime EventLoopImpl::GetTime() const
239 {
240 uint64_t microsecond = 0;
241 OS::GetMonotonicRelativeTimeInMicrosecond(microsecond); // It is not very possible to fail, if so use 0 as default
242 return static_cast<EventTime>(microsecond / 1000); // 1000 is the multiple between microsecond and millisecond
243 }
244
SendRequestToLoop(EventRequest * eventRequest)245 int EventLoopImpl::SendRequestToLoop(EventRequest *eventRequest)
246 {
247 if (eventRequest == nullptr) {
248 return -E_INVALID_ARGS;
249 }
250
251 RefObject::AutoLock lockGuard(this);
252 if (IsKilled()) {
253 return -E_OBJ_IS_KILLED;
254 }
255 requests_.push_back(eventRequest);
256 WakeUp();
257 return E_OK;
258 }
259
260 template<typename T>
QueueRequest(int type,EventImpl * event,T argument)261 int EventLoopImpl::QueueRequest(int type, EventImpl *event, T argument)
262 {
263 if (!EventRequest::IsValidType(type)) {
264 return -E_INVALID_ARGS;
265 }
266 if (event == nullptr ||
267 !event->IsValidArg(argument)) {
268 return -E_INVALID_ARGS;
269 }
270
271 if (IsKilled()) { // pre-check
272 return -E_OBJ_IS_KILLED;
273 }
274
275 int errCode;
276 if (event != nullptr) {
277 errCode = event->CheckStatus();
278 if (errCode != E_OK) {
279 if (errCode != -E_OBJ_IS_KILLED ||
280 type != EventRequest::REMOVE_EVENT) {
281 return errCode;
282 }
283 }
284 }
285
286 auto eventRequest = new (std::nothrow) EventRequest(type, event, argument);
287 if (eventRequest == nullptr) {
288 return -E_OUT_OF_MEMORY;
289 }
290
291 errCode = SendRequestToLoop(eventRequest);
292 if (errCode != E_OK) {
293 delete eventRequest;
294 eventRequest = nullptr;
295 }
296 return errCode;
297 }
298
IsInLoopThread(bool & started) const299 bool EventLoopImpl::IsInLoopThread(bool &started) const
300 {
301 if (loopThread_ == std::thread::id()) {
302 started = false;
303 } else {
304 started = true;
305 }
306 return std::this_thread::get_id() == loopThread_;
307 }
308
EventObjectExists(EventImpl * event) const309 bool EventLoopImpl::EventObjectExists(EventImpl *event) const
310 {
311 auto it = polling_.find(event);
312 if (it != polling_.end()) {
313 return true;
314 }
315 return false;
316 }
317
EventFdExists(const EventImpl * event) const318 bool EventLoopImpl::EventFdExists(const EventImpl *event) const
319 {
320 if (!event->IsValidFd()) {
321 return false;
322 }
323 for (auto ev : polling_) {
324 if (ev->GetEventFd() == event->GetEventFd()) {
325 return true;
326 }
327 }
328 return false;
329 }
330
AddEventObject(EventImpl * event,EventTime now)331 int EventLoopImpl::AddEventObject(EventImpl *event, EventTime now)
332 {
333 if (event == nullptr) {
334 return -E_INVALID_ARGS;
335 }
336 if (EventObjectExists(event)) {
337 LOGE("Add event object failed. ev already exists.");
338 return -EEXIST;
339 }
340 if (EventFdExists(event)) {
341 LOGE("Add event object failed. ev fd already exists.");
342 return -EEXIST;
343 }
344
345 int errCode = E_OK;
346 if (!event->IsTimer()) {
347 errCode = AddEvent(event);
348 }
349
350 if (errCode == E_OK) {
351 polling_.insert(event);
352 event->SetStartTime(now);
353 event->SetRevents(0);
354 event->IncObjRef(event);
355 pollingSetChanged_ = true;
356 } else {
357 LOGE("Add event failed. err: '%d'.", errCode);
358 }
359 return errCode;
360 }
361
RemoveEventObject(EventImpl * event)362 int EventLoopImpl::RemoveEventObject(EventImpl *event)
363 {
364 if (event == nullptr) {
365 return -E_INVALID_ARGS;
366 }
367 if (!EventObjectExists(event)) {
368 return -E_NO_SUCH_ENTRY;
369 }
370
371 int errCode = E_OK;
372 if (!event->IsTimer()) {
373 errCode = RemoveEvent(event);
374 }
375
376 if (errCode == E_OK) {
377 polling_.erase(event);
378 event->SetLoop(nullptr);
379 event->DecObjRef(event);
380 pollingSetChanged_ = true;
381 } else {
382 LOGE("Remove event failed. err: '%d'.", errCode);
383 }
384 return errCode;
385 }
386
ModifyEventObject(EventImpl * event,bool isAdd,EventsMask events)387 int EventLoopImpl::ModifyEventObject(EventImpl *event, bool isAdd, EventsMask events)
388 {
389 if (event == nullptr) {
390 return -E_INVALID_ARGS;
391 }
392 if (!EventObjectExists(event)) {
393 return -EEXIST;
394 }
395
396 int errCode = E_OK;
397 if (!event->IsTimer()) {
398 EventsMask genericEvents = events & (~IEvent::ET_TIMEOUT);
399 if (genericEvents) {
400 errCode = ModifyEvent(event, isAdd, genericEvents);
401 }
402 }
403
404 if (errCode == E_OK) {
405 event->SetEvents(isAdd, events);
406 } else {
407 LOGE("Modify event' failed. err: '%d'.", errCode);
408 }
409 return errCode;
410 }
411
ModifyEventObject(EventImpl * event,EventTime timeout)412 int EventLoopImpl::ModifyEventObject(EventImpl *event, EventTime timeout)
413 {
414 if (event == nullptr) {
415 return -E_INVALID_ARGS;
416 }
417 if (!EventObjectExists(event)) {
418 return -E_NO_SUCH_ENTRY;
419 }
420 event->SetTimeoutPeriod(timeout);
421 return E_OK;
422 }
423
ProcessRequest(std::list<EventRequest * > & requests)424 void EventLoopImpl::ProcessRequest(std::list<EventRequest *> &requests)
425 {
426 EventTime now = GetTime();
427 while (true) {
428 if (requests.empty()) {
429 break;
430 }
431
432 EventRequest *request = requests.front();
433 requests.pop_front();
434 if (request == nullptr) {
435 continue;
436 }
437
438 if (!IsKilled()) {
439 EventImpl *event = nullptr;
440 request->GetEvent(event);
441 EventsMask events = request->GetEvents();
442 EventTime timeout = request->GetTimeout();
443
444 switch (request->GetType()) {
445 case EventRequest::ADD_EVENT:
446 (void)(AddEventObject(event, now));
447 break;
448
449 case EventRequest::REMOVE_EVENT:
450 (void)(RemoveEventObject(event));
451 break;
452
453 case EventRequest::MOD_EVENTS_ADD:
454 (void)(ModifyEventObject(event, true, events));
455 break;
456
457 case EventRequest::MOD_EVENTS_REMOVE:
458 (void)(ModifyEventObject(event, false, events));
459 break;
460
461 case EventRequest::SET_TIMEOUT:
462 (void)(ModifyEventObject(event, timeout));
463 break;
464
465 default:
466 break;
467 }
468 }
469
470 delete request;
471 request = nullptr;
472 }
473 }
474
ProcessRequest()475 int EventLoopImpl::ProcessRequest()
476 {
477 int errCode = E_OK;
478 std::list<EventRequest *> requests;
479 {
480 RefObject::AutoLock lockGuard(this);
481 if (IsKilled()) {
482 errCode = -E_OBJ_IS_KILLED;
483 }
484 if (requests_.empty()) {
485 return errCode;
486 }
487 std::swap(requests, requests_);
488 }
489
490 ProcessRequest(requests);
491 return errCode;
492 }
493
CalSleepTime() const494 EventTime EventLoopImpl::CalSleepTime() const
495 {
496 EventTime now = GetTime();
497 EventTime minInterval = EventImpl::MAX_TIME_VALUE;
498
499 for (auto event : polling_) {
500 if (event == nullptr) {
501 continue;
502 }
503
504 EventTime t;
505 bool valid = event->GetTimeoutPoint(t);
506 if (!valid) {
507 continue;
508 }
509
510 if (t <= now) {
511 return 0;
512 }
513
514 EventTime interval = t - now;
515 if (interval < minInterval) {
516 minInterval = interval;
517 }
518 }
519
520 return minInterval;
521 }
522
DispatchAll()523 int EventLoopImpl::DispatchAll()
524 {
525 do {
526 EventTime now = GetTime();
527 pollingSetChanged_ = false;
528
529 for (auto event : polling_) {
530 if (IsKilled()) {
531 return -E_OBJ_IS_KILLED;
532 }
533 if (event == nullptr) {
534 continue;
535 }
536
537 event->IncObjRef(event);
538 event->UpdateElapsedTime(now);
539 int errCode = event->Dispatch();
540 if (errCode != E_OK) {
541 RemoveEventObject(event);
542 } else {
543 event->SetRevents(0);
544 }
545 event->DecObjRef(event);
546
547 if (pollingSetChanged_) {
548 break;
549 }
550 }
551 } while (pollingSetChanged_);
552 return E_OK;
553 }
554
CleanLoop()555 void EventLoopImpl::CleanLoop()
556 {
557 if (!IsKilled()) {
558 return;
559 }
560
561 ProcessRequest();
562 std::set<EventImpl *> polling = std::move(polling_);
563 int errCode = Exit(polling);
564 if (errCode != E_OK) {
565 LOGE("Exit loop failed when cleanup, err:'%d'.", errCode);
566 }
567
568 for (auto event : polling) {
569 if (event != nullptr) {
570 event->KillAndDecObjRef(event);
571 }
572 }
573 }
574
OnKillLoop()575 void EventLoopImpl::OnKillLoop()
576 {
577 bool started = true;
578 if (IsInLoopThread(started)) {
579 // Loop object is set to state: killed,
580 // everything will be done in loop.Run()
581 return;
582 }
583
584 if (started) {
585 // Ditto
586 WakeUp();
587 } else {
588 // Drop the lock.
589 UnlockObj();
590 CleanLoop();
591 LockObj();
592 }
593 }
594 }
595