• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #ifndef OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H
17 #define OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H
18 
19 #include <condition_variable>
20 #include <memory>
21 #include <set>
22 #include <sys/prctl.h>
23 #include <thread>
24 #include <unordered_map>
25 #include <mutex>
26 
27 #include "event_handler.h"
28 
29 #include "dh_log.h"
30 #include "anonymous_string.h"
31 #include "event.h"
32 #include "eventbus_handler.h"
33 #include "event_registration.h"
34 
35 #ifndef DH_LOG_TAG
36 #define DH_LOG_TAG "DHEventBus"
37 #endif
38 
39 namespace OHOS {
40 namespace DistributedHardware {
/**
 * Delivery mode accepted by EventBus::PostEvent(T &, POSTMODE).
 *
 * The enum is intentionally left unscoped so that both the qualified
 * (POSTMODE::POST_SYNC) and unqualified (POST_SYNC) spellings keep compiling.
 */
enum POSTMODE : uint32_t {
    /** Wrap the dispatch in a task and post it to the bus's event handler. */
    POST_ASYNC = 0,
    /** Dispatch to the registered handlers directly from the calling thread. */
    POST_SYNC = 1,
};
45 
46 class EventBus final {
47 public:
EventBus()48     EventBus()
49     {
50         ULOGI("ctor EventBus");
51         if (eventbusHandler_ == nullptr) {
52             eventThread_ = std::thread(&EventBus::StartEvent, this);
53             std::unique_lock<std::mutex> lock(eventMutex_);
54             eventCon_.wait(lock, [this] {
55                 return eventbusHandler_ != nullptr;
56             });
57         }
58     }
59 
EventBus(const std::string & threadName)60     explicit EventBus(const std::string &threadName)
61     {
62         ULOGI("ctor EventBus threadName: %s", threadName.c_str());
63         if (eventbusHandler_ == nullptr) {
64             eventThread_ = std::thread(&EventBus::StartEventWithName, this, threadName);
65             std::unique_lock<std::mutex> lock(eventMutex_);
66             eventCon_.wait(lock, [this] {
67                 return eventbusHandler_ != nullptr;
68             });
69         }
70     }
71 
~EventBus()72     ~EventBus()
73     {
74         ULOGI("dtor EventBus");
75         if ((eventbusHandler_ != nullptr) && (eventbusHandler_->GetEventRunner() != nullptr)) {
76             eventbusHandler_->GetEventRunner()->Stop();
77         }
78         eventThread_.join();
79         eventbusHandler_ = nullptr;
80     }
81 
82     template<class T>
AddHandler(const std::string & typeId,DistributedHardware::EventBusHandler<T> & handler)83     std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId,
84         DistributedHardware::EventBusHandler<T> &handler)
85     {
86         std::lock_guard<std::mutex> lock(handlerMtx);
87         Registrations *registrations = handlers[typeId];
88 
89         if (registrations == nullptr) {
90             registrations = new(std::nothrow) EventRegistration::Registrations();
91             if (registrations == nullptr) {
92                 ULOGE("registrations is null, because applying memory fail!");
93                 return nullptr;
94             }
95             handlers[typeId] = registrations;
96         }
97 
98         for (auto &reg : *registrations) {
99             if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == nullptr) {
100                 return reg;
101             }
102         }
103 
104         std::shared_ptr<EventRegistration> registration =
105             std::make_shared<EventRegistration>(static_cast<void *>(&handler), nullptr);
106         registrations->insert(registration);
107 
108         return registration;
109     }
110 
111     template<class T>
AddHandler(const std::string & typeId,EventBusHandler<T> & handler,EventSender & sender)112     std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId, EventBusHandler<T> &handler,
113         EventSender &sender)
114     {
115         std::lock_guard<std::mutex> lock(handlerMtx);
116         Registrations *registrations = handlers[typeId];
117 
118         if (registrations == nullptr) {
119             registrations = new(std::nothrow) EventRegistration::Registrations();
120             if (registrations == nullptr) {
121                 ULOGE("registrations is null, because applying memory fail!");
122                 return nullptr;
123             }
124             handlers[typeId] = registrations;
125         }
126 
127         for (auto &reg : *registrations) {
128             if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == &sender) {
129                 return reg;
130             }
131         }
132 
133         std::shared_ptr<EventRegistration> registration =
134             std::make_shared<EventRegistration>(static_cast<void *>(&handler), &sender);
135         registrations->insert(registration);
136 
137         return registration;
138     }
139 
140     template<class T>
RemoveHandler(const std::string & typeId,std::shared_ptr<EventRegistration> & EvenReg)141     bool RemoveHandler(const std::string &typeId, std::shared_ptr<EventRegistration> &EvenReg)
142     {
143         std::lock_guard<std::mutex> lock(handlerMtx);
144         Registrations *registrations = handlers[typeId];
145         if (registrations == nullptr) {
146             return false;
147         }
148 
149         bool ret = false;
150         auto regIter = registrations->find(EvenReg);
151         if (regIter != registrations->end()) {
152             registrations->erase(regIter);
153             ret = true;
154         }
155 
156         return ret;
157     }
158 
159     template<class T>
160     void PostEvent(T &e, POSTMODE mode = POSTMODE::POST_ASYNC)
161     {
162         if (mode == POSTMODE::POST_SYNC) {
163             PostEventInner(e);
164         } else {
165             auto eventFunc = [this, e]() mutable {
166                 PostEventInner(e);
167             };
168             if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc))) {
169                 ULOGE("Eventbus::PostEvent Async PostTask fail");
170             }
171         }
172     }
173 
174     template<class T>
PostEvent(T & e,int64_t delayTime)175     void PostEvent(T &e, int64_t delayTime)
176     {
177         auto eventFunc = [this, e]() mutable {
178             PostEventInner(e);
179         };
180         if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc, e->getType(), delayTime))) {
181             ULOGE("Eventbus::PostEvent Async PostTask fail");
182         }
183     }
184 
185     template<class T>
RemoveEvent(T & e)186     void RemoveEvent(T &e)
187     {
188         if (!(eventbusHandler_ && eventbusHandler_->RemoveTask(e->getType()))) {
189             ULOGE("Eventbus::RemoveEvent fail");
190         }
191     }
192 
193     void PostTask(const OHOS::AppExecFwk::InnerEvent::Callback& callback,
194         const std::string& name,
195         int64_t delayTimeInMs = 0)
196     {
197         ULOGI("Eventbus::PostTask Async PostTask, taskName:%{public}s.", GetAnonyString(name).c_str());
198         if (eventbusHandler_ != nullptr) {
199             eventbusHandler_->PostTask(callback, name, delayTimeInMs);
200         }
201     }
202 
RemoveTask(const std::string & name)203     void RemoveTask(const std::string& name)
204     {
205         ULOGI("Eventbus::RemoveTask, taskName:%{public}s.", GetAnonyString(name).c_str());
206         if (eventbusHandler_ != nullptr) {
207             eventbusHandler_->RemoveTask(name);
208         }
209     }
210 
211 private:
212     template<class T>
PostEventInner(T & e)213     void PostEventInner(T &e)
214     {
215         std::lock_guard<std::mutex> lock(handlerMtx);
216         Registrations *registrations = handlers[e.GetType()];
217         if (registrations == nullptr) {
218             return;
219         }
220 
221         for (auto &reg : *registrations) {
222             if ((reg->GetSender() == nullptr) || (reg->GetSender() == &e.GetSender())) {
223                 static_cast<EventBusHandler<Event> *>(const_cast<void *>(reg->GetHandler()))->Dispatch(e);
224             }
225         }
226     }
227 
StartEvent()228     void StartEvent()
229     {
230         auto busRunner = AppExecFwk::EventRunner::Create(false);
231         {
232             std::lock_guard<std::mutex> lock(eventMutex_);
233             eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner);
234         }
235         eventCon_.notify_all();
236         busRunner->Run();
237     }
238 
StartEventWithName(const std::string & threadName)239     void StartEventWithName(const std::string &threadName)
240     {
241         prctl(PR_SET_NAME, threadName.c_str());
242         auto busRunner = AppExecFwk::EventRunner::Create(false);
243         {
244             std::lock_guard<std::mutex> lock(eventMutex_);
245             eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner);
246         }
247         eventCon_.notify_all();
248         busRunner->Run();
249     }
250 
251 private:
252     std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventbusHandler_;
253 
254     using Registrations = std::set<std::shared_ptr<EventRegistration>>;
255     std::mutex handlerMtx;
256     using TypeMap = std::unordered_map<std::string, std::set<std::shared_ptr<EventRegistration>> *>;
257     TypeMap handlers;
258     std::thread eventThread_;
259     std::condition_variable eventCon_;
260     std::mutex eventMutex_;
261 };
262 } // namespace DistributedHardware
263 } // namespace OHOS
264 #endif
265