• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_loop_impl.h"
17 
18 #include <ctime>
19 
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "event_impl.h"
23 
24 namespace DistributedDB {
25 class EventRequest {
26 public:
27     enum {
28         ADD_EVENT = 1,
29         REMOVE_EVENT,
30         SET_TIMEOUT,
31         MOD_EVENTS_ADD,
32         MOD_EVENTS_REMOVE,
33     };
34 
EventRequest(int type,EventImpl * event,EventsMask events)35     EventRequest(int type, EventImpl *event, EventsMask events)
36         : type_(type),
37           event_(event),
38           events_(events),
39           timeout_(0)
40     {
41         if (event != nullptr) {
42             event->IncObjRef(event);
43         }
44     }
45 
EventRequest(int type,EventImpl * event,EventTime timeout)46     EventRequest(int type, EventImpl *event, EventTime timeout)
47         : type_(type),
48           event_(event),
49           events_(0),
50           timeout_(timeout)
51     {
52         if (event != nullptr) {
53             event->IncObjRef(event);
54         }
55     }
56 
~EventRequest()57     ~EventRequest()
58     {
59         if (event_ != nullptr) {
60             event_->DecObjRef(event_);
61             event_ = nullptr;
62         }
63     }
64 
IsValidType(int type)65     static bool IsValidType(int type)
66     {
67         if (type < ADD_EVENT || type > MOD_EVENTS_REMOVE) {
68             return false;
69         }
70         return true;
71     }
72 
GetType() const73     int GetType() const
74     {
75         return type_;
76     }
77 
GetEvent(EventImpl * & event) const78     void GetEvent(EventImpl *&event) const
79     {
80         event = event_;
81     }
82 
GetEvents() const83     EventsMask GetEvents() const
84     {
85         return events_;
86     }
87 
GetTimeout() const88     EventTime GetTimeout() const
89     {
90         return timeout_;
91     }
92 
93 private:
94     int type_;
95     EventImpl *event_;
96     EventsMask events_;
97     EventTime timeout_;
98 };
99 
EventLoopImpl()100 EventLoopImpl::EventLoopImpl()
101     : pollingSetChanged_(false)
102 {
103     OnKill([this](){ OnKillLoop(); });
104 }
105 
~EventLoopImpl()106 EventLoopImpl::~EventLoopImpl()
107 {}
108 
Add(IEvent * event)109 int EventLoopImpl::Add(IEvent *event)
110 {
111     if (event == nullptr) {
112         return -E_INVALID_ARGS;
113     }
114 
115     auto eventImpl = static_cast<EventImpl *>(event);
116     if (!eventImpl->SetLoop(this)) {
117         LOGE("Add ev to loop failed, already attached.");
118         return -E_INVALID_ARGS;
119     }
120 
121     EventTime timeout = 0;
122     int errCode = QueueRequest(EventRequest::ADD_EVENT, eventImpl, timeout);
123     if (errCode != E_OK) {
124         eventImpl->SetLoop(nullptr);
125         LOGE("Add ev to loop failed. err: '%d'.", errCode);
126     }
127     return errCode;
128 }
129 
Remove(IEvent * event)130 int EventLoopImpl::Remove(IEvent *event)
131 {
132     if (event == nullptr) {
133         return -E_INVALID_ARGS;
134     }
135 
136     auto eventImpl = static_cast<EventImpl *>(event);
137     bool isLoopConfused = false;
138     if (!eventImpl->Attached(this, isLoopConfused)) {
139         if (isLoopConfused) {
140             LOGE("Remove ev' from loop failed, loop confused.");
141             return -E_UNEXPECTED_DATA;
142         }
143         return E_OK;
144     }
145 
146     EventTime timeout = 0;
147     int errCode = QueueRequest(EventRequest::REMOVE_EVENT, eventImpl, timeout);
148     if (errCode != E_OK) {
149         LOGE("Remove ev from loop failed. err: '%d'.", errCode);
150     }
151     return errCode;
152 }
153 
Run()154 int EventLoopImpl::Run()
155 {
156     {
157         RefObject::AutoLock lockGuard(this);
158         if (IsKilled()) {
159             LOGE("Try to run a killed loop.");
160             return -E_OBJ_IS_KILLED;
161         }
162         if (loopThread_ != std::thread::id()) {
163             LOGE("Try to run a threaded loop.");
164             return -E_BUSY;
165         }
166         loopThread_ = std::this_thread::get_id();
167     }
168 
169     int errCode = E_OK;
170     IncObjRef(this);
171 
172     while (true) {
173         errCode = ProcessRequest();
174         if (errCode != E_OK) {
175             break;
176         }
177 
178         errCode = Prepare(polling_);
179         if (errCode != E_OK) {
180             break;
181         }
182 
183         EventTime sleepTime = CalSleepTime();
184         errCode = Poll(sleepTime);
185         if (errCode != E_OK) {
186             break;
187         }
188 
189         errCode = ProcessRequest();
190         if (errCode != E_OK) {
191             break;
192         }
193 
194         errCode = DispatchAll();
195         if (errCode != E_OK) {
196             break;
197         }
198     }
199 
200     CleanLoop();
201     DecObjRef(this);
202     if (errCode == -E_OBJ_IS_KILLED) {
203         LOGD("Loop exited.");
204     } else {
205         LOGE("Loop exited, err:'%d'.", errCode);
206     }
207     return errCode;
208 }
209 
Modify(EventImpl * event,bool isAdd,EventsMask events)210 int EventLoopImpl::Modify(EventImpl *event, bool isAdd, EventsMask events)
211 {
212     if (event == nullptr) {
213         return -E_INVALID_ARGS;
214     }
215 
216     int type = isAdd ? EventRequest::MOD_EVENTS_ADD :
217         EventRequest::MOD_EVENTS_REMOVE;
218     int errCode = QueueRequest(type, event, events);
219     if (errCode != E_OK) {
220         LOGE("Modify loop ev events failed. err: '%d'.", errCode);
221     }
222     return errCode;
223 }
224 
Modify(EventImpl * event,EventTime time)225 int EventLoopImpl::Modify(EventImpl *event, EventTime time)
226 {
227     if (event == nullptr) {
228         return -E_INVALID_ARGS;
229     }
230 
231     int errCode = QueueRequest(EventRequest::SET_TIMEOUT, event, time);
232     if (errCode != E_OK) {
233         LOGE("Mod loop ev time failed. err: '%d'.", errCode);
234     }
235     return errCode;
236 }
237 
GetTime() const238 EventTime EventLoopImpl::GetTime() const
239 {
240     uint64_t microsecond = 0;
241     OS::GetMonotonicRelativeTimeInMicrosecond(microsecond); // It is not very possible to fail, if so use 0 as default
242     return static_cast<EventTime>(microsecond / 1000); // 1000 is the multiple between microsecond and millisecond
243 }
244 
SendRequestToLoop(EventRequest * eventRequest)245 int EventLoopImpl::SendRequestToLoop(EventRequest *eventRequest)
246 {
247     if (eventRequest == nullptr) {
248         return -E_INVALID_ARGS;
249     }
250 
251     RefObject::AutoLock lockGuard(this);
252     if (IsKilled()) {
253         return -E_OBJ_IS_KILLED;
254     }
255     requests_.push_back(eventRequest);
256     WakeUp();
257     return E_OK;
258 }
259 
260 template<typename T>
QueueRequest(int type,EventImpl * event,T argument)261 int EventLoopImpl::QueueRequest(int type, EventImpl *event, T argument)
262 {
263     if (!EventRequest::IsValidType(type)) {
264         return -E_INVALID_ARGS;
265     }
266     if (event == nullptr ||
267         !event->IsValidArg(argument)) {
268         return -E_INVALID_ARGS;
269     }
270 
271     if (IsKilled()) { // pre-check
272         return -E_OBJ_IS_KILLED;
273     }
274 
275     int errCode = event->CheckStatus();
276     if (errCode != E_OK) {
277         if (errCode != -E_OBJ_IS_KILLED ||
278             type != EventRequest::REMOVE_EVENT) {
279             return errCode;
280         }
281     }
282 
283     auto eventRequest = new (std::nothrow) EventRequest(type, event, argument);
284     if (eventRequest == nullptr) {
285         return -E_OUT_OF_MEMORY;
286     }
287 
288     errCode = SendRequestToLoop(eventRequest);
289     if (errCode != E_OK) {
290         delete eventRequest;
291         eventRequest = nullptr;
292     }
293     return errCode;
294 }
295 
IsInLoopThread(bool & started) const296 bool EventLoopImpl::IsInLoopThread(bool &started) const
297 {
298     if (loopThread_ == std::thread::id()) {
299         started = false;
300     } else {
301         started = true;
302     }
303     return std::this_thread::get_id() == loopThread_;
304 }
305 
EventObjectExists(EventImpl * event) const306 bool EventLoopImpl::EventObjectExists(EventImpl *event) const
307 {
308     return polling_.find(event) != polling_.end();
309 }
310 
EventFdExists(const EventImpl * event) const311 bool EventLoopImpl::EventFdExists(const EventImpl *event) const
312 {
313     if (!event->IsValidFd()) {
314         return false;
315     }
316     for (auto ev : polling_) {
317         if (ev->GetEventFd() == event->GetEventFd()) {
318             return true;
319         }
320     }
321     return false;
322 }
323 
AddEventObject(EventImpl * event,EventTime now)324 int EventLoopImpl::AddEventObject(EventImpl *event, EventTime now)
325 {
326     if (event == nullptr) {
327         return -E_INVALID_ARGS;
328     }
329     if (EventObjectExists(event)) {
330         LOGE("Add event object failed. ev already exists.");
331         return -EEXIST;
332     }
333     if (EventFdExists(event)) {
334         LOGE("Add event object failed. ev fd already exists.");
335         return -EEXIST;
336     }
337 
338     int errCode = E_OK;
339     if (!event->IsTimer()) {
340         errCode = AddEvent(event);
341     }
342 
343     if (errCode == E_OK) {
344         polling_.insert(event);
345         event->SetStartTime(now);
346         event->SetRevents(0);
347         event->IncObjRef(event);
348         pollingSetChanged_ = true;
349     } else {
350         LOGE("Add event failed. err: '%d'.", errCode);
351     }
352     return errCode;
353 }
354 
RemoveEventObject(EventImpl * event)355 int EventLoopImpl::RemoveEventObject(EventImpl *event)
356 {
357     if (event == nullptr) {
358         return -E_INVALID_ARGS;
359     }
360     if (!EventObjectExists(event)) {
361         return -E_NO_SUCH_ENTRY;
362     }
363 
364     int errCode = E_OK;
365     if (!event->IsTimer()) {
366         errCode = RemoveEvent(event);
367     }
368 
369     if (errCode == E_OK) {
370         polling_.erase(event);
371         event->SetLoop(nullptr);
372         event->DecObjRef(event);
373         pollingSetChanged_ = true;
374     } else {
375         LOGE("Remove event failed. err: '%d'.", errCode);
376     }
377     return errCode;
378 }
379 
ModifyEventObject(EventImpl * event,bool isAdd,EventsMask events)380 int EventLoopImpl::ModifyEventObject(EventImpl *event, bool isAdd, EventsMask events)
381 {
382     if (event == nullptr) {
383         return -E_INVALID_ARGS;
384     }
385     if (!EventObjectExists(event)) {
386         return -EEXIST;
387     }
388 
389     int errCode = E_OK;
390     if (!event->IsTimer()) {
391         EventsMask genericEvents = events & (~IEvent::ET_TIMEOUT);
392         if (genericEvents) {
393             errCode = ModifyEvent(event, isAdd, genericEvents);
394         }
395     }
396 
397     if (errCode == E_OK) {
398         event->SetEvents(isAdd, events);
399     } else {
400         LOGE("Modify event' failed. err: '%d'.", errCode);
401     }
402     return errCode;
403 }
404 
ModifyEventObject(EventImpl * event,EventTime timeout)405 int EventLoopImpl::ModifyEventObject(EventImpl *event, EventTime timeout)
406 {
407     if (event == nullptr) {
408         return -E_INVALID_ARGS;
409     }
410     if (!EventObjectExists(event)) {
411         return -E_NO_SUCH_ENTRY;
412     }
413     event->SetTimeoutPeriod(timeout);
414     return E_OK;
415 }
416 
ProcessRequest(std::list<EventRequest * > & requests)417 void EventLoopImpl::ProcessRequest(std::list<EventRequest *> &requests)
418 {
419     EventTime now = GetTime();
420     while (true) {
421         if (requests.empty()) {
422             break;
423         }
424 
425         EventRequest *request = requests.front();
426         requests.pop_front();
427         if (request == nullptr) {
428             continue;
429         }
430 
431         if (!IsKilled()) {
432             EventImpl *event = nullptr;
433             request->GetEvent(event);
434             EventsMask events = request->GetEvents();
435             EventTime timeout = request->GetTimeout();
436 
437             switch (request->GetType()) {
438                 case EventRequest::ADD_EVENT:
439                     (void)(AddEventObject(event, now));
440                     break;
441 
442                 case EventRequest::REMOVE_EVENT:
443                     (void)(RemoveEventObject(event));
444                     break;
445 
446                 case EventRequest::MOD_EVENTS_ADD:
447                     (void)(ModifyEventObject(event, true, events));
448                     break;
449 
450                 case EventRequest::MOD_EVENTS_REMOVE:
451                     (void)(ModifyEventObject(event, false, events));
452                     break;
453 
454                 case EventRequest::SET_TIMEOUT:
455                     (void)(ModifyEventObject(event, timeout));
456                     break;
457 
458                 default:
459                     break;
460             }
461         }
462 
463         delete request;
464         request = nullptr;
465     }
466 }
467 
ProcessRequest()468 int EventLoopImpl::ProcessRequest()
469 {
470     int errCode = E_OK;
471     std::list<EventRequest *> requests;
472     {
473         RefObject::AutoLock lockGuard(this);
474         if (IsKilled()) {
475             errCode = -E_OBJ_IS_KILLED;
476         }
477         if (requests_.empty()) {
478             return errCode;
479         }
480         std::swap(requests, requests_);
481     }
482 
483     ProcessRequest(requests);
484     return errCode;
485 }
486 
CalSleepTime() const487 EventTime EventLoopImpl::CalSleepTime() const
488 {
489     EventTime now = GetTime();
490     EventTime minInterval = EventImpl::MAX_TIME_VALUE;
491 
492     for (auto event : polling_) {
493         if (event == nullptr) {
494             continue;
495         }
496 
497         EventTime t;
498         bool valid = event->GetTimeoutPoint(t);
499         if (!valid) {
500             continue;
501         }
502 
503         if (t <= now) {
504             return 0;
505         }
506 
507         EventTime interval = t - now;
508         if (interval < minInterval) {
509             minInterval = interval;
510         }
511     }
512 
513     return minInterval;
514 }
515 
DispatchAll()516 int EventLoopImpl::DispatchAll()
517 {
518     do {
519         EventTime now = GetTime();
520         pollingSetChanged_ = false;
521 
522         for (auto event : polling_) {
523             if (IsKilled()) {
524                 return -E_OBJ_IS_KILLED;
525             }
526             if (event == nullptr) {
527                 continue;
528             }
529 
530             event->IncObjRef(event);
531             event->UpdateElapsedTime(now);
532             int errCode = event->Dispatch();
533             if (errCode != E_OK) {
534                 RemoveEventObject(event);
535             } else {
536                 event->SetRevents(0);
537             }
538             event->DecObjRef(event);
539 
540             if (pollingSetChanged_) {
541                 break;
542             }
543         }
544     } while (pollingSetChanged_);
545     return E_OK;
546 }
547 
CleanLoop()548 void EventLoopImpl::CleanLoop()
549 {
550     if (!IsKilled()) {
551         return;
552     }
553 
554     ProcessRequest();
555     std::set<EventImpl *> polling = std::move(polling_);
556     int errCode = Exit(polling);
557     if (errCode != E_OK) {
558         LOGE("Exit loop failed when cleanup, err:'%d'.", errCode);
559     }
560 
561     for (auto event : polling) {
562         if (event != nullptr) {
563             event->KillAndDecObjRef(event);
564         }
565     }
566 }
567 
OnKillLoop()568 void EventLoopImpl::OnKillLoop()
569 {
570     bool started = true;
571     if (IsInLoopThread(started)) {
572         // Loop object is set to state: killed,
573         // everything will be done in loop.Run()
574         return;
575     }
576 
577     if (started) {
578         // Ditto
579         WakeUp();
580     } else {
581         // Drop the lock.
582         UnlockObj();
583         CleanLoop();
584         LockObj();
585     }
586 }
587 }
588