• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_loop_impl.h"
17 
18 #include <ctime>
19 
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "event_impl.h"
23 
24 namespace DistributedDB {
25 class EventRequest {
26 public:
27     enum {
28         ADD_EVENT = 1,
29         REMOVE_EVENT,
30         SET_TIMEOUT,
31         MOD_EVENTS_ADD,
32         MOD_EVENTS_REMOVE,
33     };
34 
EventRequest(int type,EventImpl * event,EventsMask events)35     EventRequest(int type, EventImpl *event, EventsMask events)
36         : type_(type),
37           event_(event),
38           events_(events),
39           timeout_(0)
40     {
41         if (event != nullptr) {
42             event->IncObjRef(event);
43         }
44     }
45 
EventRequest(int type,EventImpl * event,EventTime timeout)46     EventRequest(int type, EventImpl *event, EventTime timeout)
47         : type_(type),
48           event_(event),
49           events_(0),
50           timeout_(timeout)
51     {
52         if (event != nullptr) {
53             event->IncObjRef(event);
54         }
55     }
56 
~EventRequest()57     ~EventRequest()
58     {
59         if (event_ != nullptr) {
60             event_->DecObjRef(event_);
61             event_ = nullptr;
62         }
63     }
64 
IsValidType(int type)65     static bool IsValidType(int type)
66     {
67         if (type < ADD_EVENT || type > MOD_EVENTS_REMOVE) {
68             return false;
69         }
70         return true;
71     }
72 
GetType() const73     int GetType() const
74     {
75         return type_;
76     }
77 
GetEvent(EventImpl * & event) const78     void GetEvent(EventImpl *&event) const
79     {
80         event = event_;
81     }
82 
GetEvents() const83     EventsMask GetEvents() const
84     {
85         return events_;
86     }
87 
GetTimeout() const88     EventTime GetTimeout() const
89     {
90         return timeout_;
91     }
92 
93 private:
94     int type_;
95     EventImpl *event_;
96     EventsMask events_;
97     EventTime timeout_;
98 };
99 
EventLoopImpl()100 EventLoopImpl::EventLoopImpl()
101     : pollingSetChanged_(false)
102 {
103     OnKill([this](){ OnKillLoop(); });
104 }
105 
~EventLoopImpl()106 EventLoopImpl::~EventLoopImpl()
107 {}
108 
Add(IEvent * event)109 int EventLoopImpl::Add(IEvent *event)
110 {
111     if (event == nullptr) {
112         return -E_INVALID_ARGS;
113     }
114 
115     auto eventImpl = static_cast<EventImpl *>(event);
116     if (!eventImpl->SetLoop(this)) {
117         LOGE("Add ev to loop failed, already attached.");
118         return -E_INVALID_ARGS;
119     }
120 
121     EventTime timeout = 0;
122     int errCode = QueueRequest(EventRequest::ADD_EVENT, eventImpl, timeout);
123     if (errCode != E_OK) {
124         eventImpl->SetLoop(nullptr);
125         LOGE("Add ev to loop failed. err: '%d'.", errCode);
126     }
127     return errCode;
128 }
129 
Remove(IEvent * event)130 int EventLoopImpl::Remove(IEvent *event)
131 {
132     if (event == nullptr) {
133         return -E_INVALID_ARGS;
134     }
135 
136     auto eventImpl = static_cast<EventImpl *>(event);
137     bool isLoopConfused = false;
138     if (!eventImpl->Attached(this, isLoopConfused)) {
139         if (isLoopConfused) {
140             LOGE("Remove ev' from loop failed, loop confused.");
141             return -E_UNEXPECTED_DATA;
142         }
143         return E_OK;
144     }
145 
146     EventTime timeout = 0;
147     int errCode = QueueRequest(EventRequest::REMOVE_EVENT, eventImpl, timeout);
148     if (errCode != E_OK) {
149         LOGE("Remove ev from loop failed. err: '%d'.", errCode);
150     }
151     return errCode;
152 }
153 
Run()154 int EventLoopImpl::Run()
155 {
156     {
157         RefObject::AutoLock lockGuard(this);
158         if (IsKilled()) {
159             LOGE("Try to run a killed loop.");
160             return -E_OBJ_IS_KILLED;
161         }
162         if (loopThread_ != std::thread::id()) {
163             LOGE("Try to run a threaded loop.");
164             return -E_BUSY;
165         }
166         loopThread_ = std::this_thread::get_id();
167     }
168 
169     int errCode;
170     IncObjRef(this);
171 
172     while (true) {
173         errCode = ProcessRequest();
174         if (errCode != E_OK) {
175             break;
176         }
177 
178         errCode = Prepare(polling_);
179         if (errCode != E_OK) {
180             break;
181         }
182 
183         EventTime sleepTime = CalSleepTime();
184         errCode = Poll(sleepTime);
185         if (errCode != E_OK) {
186             break;
187         }
188 
189         errCode = ProcessRequest();
190         if (errCode != E_OK) {
191             break;
192         }
193 
194         errCode = DispatchAll();
195         if (errCode != E_OK) {
196             break;
197         }
198     }
199 
200     CleanLoop();
201     DecObjRef(this);
202     if (errCode == -E_OBJ_IS_KILLED) {
203         LOGD("Loop exited.");
204     } else {
205         LOGE("Loop exited, err:'%d'.", errCode);
206     }
207     return errCode;
208 }
209 
Modify(EventImpl * event,bool isAdd,EventsMask events)210 int EventLoopImpl::Modify(EventImpl *event, bool isAdd, EventsMask events)
211 {
212     if (event == nullptr) {
213         return -E_INVALID_ARGS;
214     }
215 
216     int type = isAdd ? EventRequest::MOD_EVENTS_ADD :
217         EventRequest::MOD_EVENTS_REMOVE;
218     int errCode = QueueRequest(type, event, events);
219     if (errCode != E_OK) {
220         LOGE("Modify loop ev events failed. err: '%d'.", errCode);
221     }
222     return errCode;
223 }
224 
Modify(EventImpl * event,EventTime time)225 int EventLoopImpl::Modify(EventImpl *event, EventTime time)
226 {
227     if (event == nullptr) {
228         return -E_INVALID_ARGS;
229     }
230 
231     int errCode = QueueRequest(EventRequest::SET_TIMEOUT, event, time);
232     if (errCode != E_OK) {
233         LOGE("Mod loop ev time failed. err: '%d'.", errCode);
234     }
235     return errCode;
236 }
237 
GetTime() const238 EventTime EventLoopImpl::GetTime() const
239 {
240     uint64_t microsecond = 0;
241     OS::GetMonotonicRelativeTimeInMicrosecond(microsecond); // It is not very possible to fail, if so use 0 as default
242     return static_cast<EventTime>(microsecond / 1000); // 1000 is the multiple between microsecond and millisecond
243 }
244 
SendRequestToLoop(EventRequest * eventRequest)245 int EventLoopImpl::SendRequestToLoop(EventRequest *eventRequest)
246 {
247     if (eventRequest == nullptr) {
248         return -E_INVALID_ARGS;
249     }
250 
251     RefObject::AutoLock lockGuard(this);
252     if (IsKilled()) {
253         return -E_OBJ_IS_KILLED;
254     }
255     requests_.push_back(eventRequest);
256     WakeUp();
257     return E_OK;
258 }
259 
260 template<typename T>
QueueRequest(int type,EventImpl * event,T argument)261 int EventLoopImpl::QueueRequest(int type, EventImpl *event, T argument)
262 {
263     if (!EventRequest::IsValidType(type)) {
264         return -E_INVALID_ARGS;
265     }
266     if (event == nullptr ||
267         !event->IsValidArg(argument)) {
268         return -E_INVALID_ARGS;
269     }
270 
271     if (IsKilled()) { // pre-check
272         return -E_OBJ_IS_KILLED;
273     }
274 
275     int errCode;
276     if (event != nullptr) {
277         errCode = event->CheckStatus();
278         if (errCode != E_OK) {
279             if (errCode != -E_OBJ_IS_KILLED ||
280                 type != EventRequest::REMOVE_EVENT) {
281                 return errCode;
282             }
283         }
284     }
285 
286     auto eventRequest = new (std::nothrow) EventRequest(type, event, argument);
287     if (eventRequest == nullptr) {
288         return -E_OUT_OF_MEMORY;
289     }
290 
291     errCode = SendRequestToLoop(eventRequest);
292     if (errCode != E_OK) {
293         delete eventRequest;
294         eventRequest = nullptr;
295     }
296     return errCode;
297 }
298 
IsInLoopThread(bool & started) const299 bool EventLoopImpl::IsInLoopThread(bool &started) const
300 {
301     if (loopThread_ == std::thread::id()) {
302         started = false;
303     } else {
304         started = true;
305     }
306     return std::this_thread::get_id() == loopThread_;
307 }
308 
EventObjectExists(EventImpl * event) const309 bool EventLoopImpl::EventObjectExists(EventImpl *event) const
310 {
311     auto it = polling_.find(event);
312     if (it != polling_.end()) {
313         return true;
314     }
315     return false;
316 }
317 
EventFdExists(const EventImpl * event) const318 bool EventLoopImpl::EventFdExists(const EventImpl *event) const
319 {
320     if (!event->IsValidFd()) {
321         return false;
322     }
323     for (auto ev : polling_) {
324         if (ev->GetEventFd() == event->GetEventFd()) {
325             return true;
326         }
327     }
328     return false;
329 }
330 
AddEventObject(EventImpl * event,EventTime now)331 int EventLoopImpl::AddEventObject(EventImpl *event, EventTime now)
332 {
333     if (event == nullptr) {
334         return -E_INVALID_ARGS;
335     }
336     if (EventObjectExists(event)) {
337         LOGE("Add event object failed. ev already exists.");
338         return -EEXIST;
339     }
340     if (EventFdExists(event)) {
341         LOGE("Add event object failed. ev fd already exists.");
342         return -EEXIST;
343     }
344 
345     int errCode = E_OK;
346     if (!event->IsTimer()) {
347         errCode = AddEvent(event);
348     }
349 
350     if (errCode == E_OK) {
351         polling_.insert(event);
352         event->SetStartTime(now);
353         event->SetRevents(0);
354         event->IncObjRef(event);
355         pollingSetChanged_ = true;
356     } else {
357         LOGE("Add event failed. err: '%d'.", errCode);
358     }
359     return errCode;
360 }
361 
RemoveEventObject(EventImpl * event)362 int EventLoopImpl::RemoveEventObject(EventImpl *event)
363 {
364     if (event == nullptr) {
365         return -E_INVALID_ARGS;
366     }
367     if (!EventObjectExists(event)) {
368         return -E_NO_SUCH_ENTRY;
369     }
370 
371     int errCode = E_OK;
372     if (!event->IsTimer()) {
373         errCode = RemoveEvent(event);
374     }
375 
376     if (errCode == E_OK) {
377         polling_.erase(event);
378         event->SetLoop(nullptr);
379         event->DecObjRef(event);
380         pollingSetChanged_ = true;
381     } else {
382         LOGE("Remove event failed. err: '%d'.", errCode);
383     }
384     return errCode;
385 }
386 
ModifyEventObject(EventImpl * event,bool isAdd,EventsMask events)387 int EventLoopImpl::ModifyEventObject(EventImpl *event, bool isAdd, EventsMask events)
388 {
389     if (event == nullptr) {
390         return -E_INVALID_ARGS;
391     }
392     if (!EventObjectExists(event)) {
393         return -EEXIST;
394     }
395 
396     int errCode = E_OK;
397     if (!event->IsTimer()) {
398         EventsMask genericEvents = events & (~IEvent::ET_TIMEOUT);
399         if (genericEvents) {
400             errCode = ModifyEvent(event, isAdd, genericEvents);
401         }
402     }
403 
404     if (errCode == E_OK) {
405         event->SetEvents(isAdd, events);
406     } else {
407         LOGE("Modify event' failed. err: '%d'.", errCode);
408     }
409     return errCode;
410 }
411 
ModifyEventObject(EventImpl * event,EventTime timeout)412 int EventLoopImpl::ModifyEventObject(EventImpl *event, EventTime timeout)
413 {
414     if (event == nullptr) {
415         return -E_INVALID_ARGS;
416     }
417     if (!EventObjectExists(event)) {
418         return -E_NO_SUCH_ENTRY;
419     }
420     event->SetTimeoutPeriod(timeout);
421     return E_OK;
422 }
423 
ProcessRequest(std::list<EventRequest * > & requests)424 void EventLoopImpl::ProcessRequest(std::list<EventRequest *> &requests)
425 {
426     EventTime now = GetTime();
427     while (true) {
428         if (requests.empty()) {
429             break;
430         }
431 
432         EventRequest *request = requests.front();
433         requests.pop_front();
434         if (request == nullptr) {
435             continue;
436         }
437 
438         if (!IsKilled()) {
439             EventImpl *event = nullptr;
440             request->GetEvent(event);
441             EventsMask events = request->GetEvents();
442             EventTime timeout = request->GetTimeout();
443 
444             switch (request->GetType()) {
445                 case EventRequest::ADD_EVENT:
446                     (void)(AddEventObject(event, now));
447                     break;
448 
449                 case EventRequest::REMOVE_EVENT:
450                     (void)(RemoveEventObject(event));
451                     break;
452 
453                 case EventRequest::MOD_EVENTS_ADD:
454                     (void)(ModifyEventObject(event, true, events));
455                     break;
456 
457                 case EventRequest::MOD_EVENTS_REMOVE:
458                     (void)(ModifyEventObject(event, false, events));
459                     break;
460 
461                 case EventRequest::SET_TIMEOUT:
462                     (void)(ModifyEventObject(event, timeout));
463                     break;
464 
465                 default:
466                     break;
467             }
468         }
469 
470         delete request;
471         request = nullptr;
472     }
473 }
474 
ProcessRequest()475 int EventLoopImpl::ProcessRequest()
476 {
477     int errCode = E_OK;
478     std::list<EventRequest *> requests;
479     {
480         RefObject::AutoLock lockGuard(this);
481         if (IsKilled()) {
482             errCode = -E_OBJ_IS_KILLED;
483         }
484         if (requests_.empty()) {
485             return errCode;
486         }
487         std::swap(requests, requests_);
488     }
489 
490     ProcessRequest(requests);
491     return errCode;
492 }
493 
CalSleepTime() const494 EventTime EventLoopImpl::CalSleepTime() const
495 {
496     EventTime now = GetTime();
497     EventTime minInterval = EventImpl::MAX_TIME_VALUE;
498 
499     for (auto event : polling_) {
500         if (event == nullptr) {
501             continue;
502         }
503 
504         EventTime t;
505         bool valid = event->GetTimeoutPoint(t);
506         if (!valid) {
507             continue;
508         }
509 
510         if (t <= now) {
511             return 0;
512         }
513 
514         EventTime interval = t - now;
515         if (interval < minInterval) {
516             minInterval = interval;
517         }
518     }
519 
520     return minInterval;
521 }
522 
DispatchAll()523 int EventLoopImpl::DispatchAll()
524 {
525     do {
526         EventTime now = GetTime();
527         pollingSetChanged_ = false;
528 
529         for (auto event : polling_) {
530             if (IsKilled()) {
531                 return -E_OBJ_IS_KILLED;
532             }
533             if (event == nullptr) {
534                 continue;
535             }
536 
537             event->IncObjRef(event);
538             event->UpdateElapsedTime(now);
539             int errCode = event->Dispatch();
540             if (errCode != E_OK) {
541                 RemoveEventObject(event);
542             } else {
543                 event->SetRevents(0);
544             }
545             event->DecObjRef(event);
546 
547             if (pollingSetChanged_) {
548                 break;
549             }
550         }
551     } while (pollingSetChanged_);
552     return E_OK;
553 }
554 
CleanLoop()555 void EventLoopImpl::CleanLoop()
556 {
557     if (!IsKilled()) {
558         return;
559     }
560 
561     ProcessRequest();
562     std::set<EventImpl *> polling = std::move(polling_);
563     int errCode = Exit(polling);
564     if (errCode != E_OK) {
565         LOGE("Exit loop failed when cleanup, err:'%d'.", errCode);
566     }
567 
568     for (auto event : polling) {
569         if (event != nullptr) {
570             event->KillAndDecObjRef(event);
571         }
572     }
573 }
574 
OnKillLoop()575 void EventLoopImpl::OnKillLoop()
576 {
577     bool started = true;
578     if (IsInLoopThread(started)) {
579         // Loop object is set to state: killed,
580         // everything will be done in loop.Run()
581         return;
582     }
583 
584     if (started) {
585         // Ditto
586         WakeUp();
587     } else {
588         // Drop the lock.
589         UnlockObj();
590         CleanLoop();
591         LockObj();
592     }
593 }
594 }
595