• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H
17 #define OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H
18 
19 #include <condition_variable>
20 #include <pthread.h>
21 #include <memory>
22 #include <set>
23 #include <sys/prctl.h>
24 #include <thread>
25 #include <unordered_map>
26 #include <mutex>
27 
28 #include "event_handler.h"
29 
30 #include "constants.h"
31 #include "dh_log.h"
32 #include "anonymous_string.h"
33 #include "distributed_hardware_errno.h"
34 #include "distributed_hardware_log.h"
35 #include "event.h"
36 #include "eventbus_handler.h"
37 #include "event_registration.h"
38 
39 #ifndef DH_LOG_TAG
40 #define DH_LOG_TAG "DHEventBus"
41 #endif
42 
43 namespace OHOS {
44 namespace DistributedHardware {
/**
 * Delivery mode accepted by EventBus::PostEvent(T &, POSTMODE).
 *
 * Kept as an unscoped enum with a fixed uint32_t underlying type so existing
 * callers that spell either POST_SYNC or POSTMODE::POST_SYNC keep compiling.
 */
enum POSTMODE : uint32_t {
    // Queue the event as a task on the bus's event handler (the default mode).
    POST_ASYNC = 0,
    // Dispatch the event to the registered handlers directly from the calling thread.
    POST_SYNC,
};
49 
50 class EventBus final {
51 public:
EventBus()52     EventBus()
53     {
54         ULOGI("ctor EventBus");
55         if (eventbusHandler_ == nullptr) {
56             eventThread_ = std::thread(&EventBus::StartEvent, this);
57             std::unique_lock<std::mutex> lock(eventMutex_);
58             eventCon_.wait(lock, [this] {
59                 return eventbusHandler_ != nullptr;
60             });
61         }
62     }
63 
EventBus(const std::string & threadName)64     explicit EventBus(const std::string &threadName)
65     {
66         ULOGI("ctor EventBus threadName: %s", threadName.c_str());
67         if (eventbusHandler_ == nullptr) {
68             eventThread_ = std::thread(&EventBus::StartEventWithName, this, threadName);
69             std::unique_lock<std::mutex> lock(eventMutex_);
70             eventCon_.wait(lock, [this] {
71                 return eventbusHandler_ != nullptr;
72             });
73         }
74     }
75 
~EventBus()76     ~EventBus()
77     {
78         ULOGI("dtor EventBus");
79         if ((eventbusHandler_ != nullptr) && (eventbusHandler_->GetEventRunner() != nullptr)) {
80             eventbusHandler_->GetEventRunner()->Stop();
81         }
82         eventThread_.join();
83         eventbusHandler_ = nullptr;
84     }
85 
86     template<class T>
AddHandler(const std::string & typeId,DistributedHardware::EventBusHandler<T> & handler)87     std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId,
88         DistributedHardware::EventBusHandler<T> &handler)
89     {
90         std::lock_guard<std::mutex> lock(handlerMtx);
91         Registrations *registrations = handlers[typeId];
92 
93         if (registrations == nullptr) {
94             registrations = new(std::nothrow) EventRegistration::Registrations();
95             if (registrations == nullptr) {
96                 ULOGE("registrations is null, because applying memory fail!");
97                 return nullptr;
98             }
99             handlers[typeId] = registrations;
100         }
101 
102         for (auto &reg : *registrations) {
103             if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == nullptr) {
104                 return reg;
105             }
106         }
107 
108         std::shared_ptr<EventRegistration> registration =
109             std::make_shared<EventRegistration>(static_cast<void *>(&handler), nullptr);
110         registrations->insert(registration);
111 
112         return registration;
113     }
114 
115     template<class T>
AddHandler(const std::string & typeId,EventBusHandler<T> & handler,EventSender & sender)116     std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId, EventBusHandler<T> &handler,
117         EventSender &sender)
118     {
119         std::lock_guard<std::mutex> lock(handlerMtx);
120         Registrations *registrations = handlers[typeId];
121 
122         if (registrations == nullptr) {
123             registrations = new(std::nothrow) EventRegistration::Registrations();
124             if (registrations == nullptr) {
125                 ULOGE("registrations is null, because applying memory fail!");
126                 return nullptr;
127             }
128             handlers[typeId] = registrations;
129         }
130 
131         for (auto &reg : *registrations) {
132             if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == &sender) {
133                 return reg;
134             }
135         }
136 
137         std::shared_ptr<EventRegistration> registration =
138             std::make_shared<EventRegistration>(static_cast<void *>(&handler), &sender);
139         registrations->insert(registration);
140 
141         return registration;
142     }
143 
144     template<class T>
RemoveHandler(const std::string & typeId,std::shared_ptr<EventRegistration> & EvenReg)145     bool RemoveHandler(const std::string &typeId, std::shared_ptr<EventRegistration> &EvenReg)
146     {
147         std::lock_guard<std::mutex> lock(handlerMtx);
148         Registrations *registrations = handlers[typeId];
149         if (registrations == nullptr) {
150             return false;
151         }
152 
153         bool ret = false;
154         auto regIter = registrations->find(EvenReg);
155         if (regIter != registrations->end()) {
156             registrations->erase(regIter);
157             ret = true;
158         }
159 
160         return ret;
161     }
162 
163     template<class T>
164     void PostEvent(T &e, POSTMODE mode = POSTMODE::POST_ASYNC)
165     {
166         if (mode == POSTMODE::POST_SYNC) {
167             PostEventInner(e);
168         } else {
169             auto eventFunc = [this, e]() mutable {
170                 PostEventInner(e);
171             };
172             if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc))) {
173                 ULOGE("Eventbus::PostEvent Async PostTask fail");
174             }
175         }
176     }
177 
178     template<class T>
PostEvent(T & e,int64_t delayTime)179     void PostEvent(T &e, int64_t delayTime)
180     {
181         auto eventFunc = [this, e]() mutable {
182             PostEventInner(e);
183         };
184         if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc, e->getType(), delayTime))) {
185             ULOGE("Eventbus::PostEvent Async PostTask fail");
186         }
187     }
188 
189     template<class T>
RemoveEvent(T & e)190     void RemoveEvent(T &e)
191     {
192         if (!(eventbusHandler_ && eventbusHandler_->RemoveTask(e->getType()))) {
193             ULOGE("Eventbus::RemoveEvent fail");
194         }
195     }
196 
197     void PostTask(const OHOS::AppExecFwk::InnerEvent::Callback& callback,
198         const std::string& name,
199         int64_t delayTimeInMs = 0)
200     {
201         ULOGI("Eventbus::PostTask Async PostTask, taskName:%{public}s.", GetAnonyString(name).c_str());
202         if (eventbusHandler_ != nullptr) {
203             eventbusHandler_->PostTask(callback, name, delayTimeInMs);
204         }
205     }
206 
RemoveTask(const std::string & name)207     void RemoveTask(const std::string& name)
208     {
209         ULOGI("Eventbus::RemoveTask, taskName:%{public}s.", GetAnonyString(name).c_str());
210         if (eventbusHandler_ != nullptr) {
211             eventbusHandler_->RemoveTask(name);
212         }
213     }
214 
215 private:
216     template<class T>
PostEventInner(T & e)217     void PostEventInner(T &e)
218     {
219         std::lock_guard<std::mutex> lock(handlerMtx);
220         Registrations *registrations = handlers[e.GetType()];
221         if (registrations == nullptr) {
222             return;
223         }
224 
225         for (auto &reg : *registrations) {
226             if (reg->GetHandler() == nullptr) {
227                 continue;
228             }
229             if ((reg->GetSender() == nullptr) || (reg->GetSender() == &e.GetSender())) {
230                 static_cast<EventBusHandler<Event> *>(const_cast<void *>(reg->GetHandler()))->Dispatch(e);
231             }
232         }
233     }
234 
StartEvent()235     void StartEvent()
236     {
237         int32_t ret = pthread_setname_np(pthread_self(), START_EVENT);
238         if (ret != DH_FWK_SUCCESS) {
239             DHLOGE("StartEvent setname failed.");
240         }
241         auto busRunner = AppExecFwk::EventRunner::Create(false);
242         {
243             std::lock_guard<std::mutex> lock(eventMutex_);
244             eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner);
245         }
246         eventCon_.notify_all();
247         busRunner->Run();
248     }
249 
StartEventWithName(const std::string & threadName)250     void StartEventWithName(const std::string &threadName)
251     {
252         prctl(PR_SET_NAME, threadName.c_str());
253         auto busRunner = AppExecFwk::EventRunner::Create(false);
254         {
255             std::lock_guard<std::mutex> lock(eventMutex_);
256             eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner);
257         }
258         eventCon_.notify_all();
259         busRunner->Run();
260     }
261 
262 private:
263     std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventbusHandler_;
264 
265     using Registrations = std::set<std::shared_ptr<EventRegistration>>;
266     std::mutex handlerMtx;
267     using TypeMap = std::unordered_map<std::string, std::set<std::shared_ptr<EventRegistration>> *>;
268     TypeMap handlers;
269     std::thread eventThread_;
270     std::condition_variable eventCon_;
271     std::mutex eventMutex_;
272 };
273 } // namespace DistributedHardware
274 } // namespace OHOS
275 #endif
276