1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H 17 #define OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H 18 19 #include <condition_variable> 20 #include <memory> 21 #include <set> 22 #include <sys/prctl.h> 23 #include <thread> 24 #include <unordered_map> 25 #include <mutex> 26 27 #include "event_handler.h" 28 29 #include "dh_log.h" 30 #include "anonymous_string.h" 31 #include "event.h" 32 #include "eventbus_handler.h" 33 #include "event_registration.h" 34 35 #ifndef DH_LOG_TAG 36 #define DH_LOG_TAG "DHEventBus" 37 #endif 38 39 namespace OHOS { 40 namespace DistributedHardware { 41 enum POSTMODE : uint32_t { 42 POST_ASYNC = 0, 43 POST_SYNC, 44 }; 45 46 class EventBus final { 47 public: EventBus()48 EventBus() 49 { 50 ULOGI("ctor EventBus"); 51 if (eventbusHandler_ == nullptr) { 52 eventThread_ = std::thread(&EventBus::StartEvent, this); 53 std::unique_lock<std::mutex> lock(eventMutex_); 54 eventCon_.wait(lock, [this] { 55 return eventbusHandler_ != nullptr; 56 }); 57 } 58 } 59 EventBus(const std::string & threadName)60 explicit EventBus(const std::string &threadName) 61 { 62 ULOGI("ctor EventBus threadName: %s", threadName.c_str()); 63 if (eventbusHandler_ == nullptr) { 64 eventThread_ = std::thread(&EventBus::StartEventWithName, this, threadName); 65 std::unique_lock<std::mutex> lock(eventMutex_); 66 eventCon_.wait(lock, [this] { 67 return eventbusHandler_ != nullptr; 68 }); 69 } 70 } 71 ~EventBus()72 ~EventBus() 73 { 74 ULOGI("dtor EventBus"); 75 if ((eventbusHandler_ != nullptr) && (eventbusHandler_->GetEventRunner() != nullptr)) { 76 eventbusHandler_->GetEventRunner()->Stop(); 77 } 78 eventThread_.join(); 79 eventbusHandler_ = nullptr; 80 } 81 82 template<class T> AddHandler(const std::string & typeId,DistributedHardware::EventBusHandler<T> & handler)83 std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId, 84 DistributedHardware::EventBusHandler<T> &handler) 85 { 86 std::lock_guard<std::mutex> lock(handlerMtx); 87 Registrations *registrations = handlers[typeId]; 88 89 if (registrations == nullptr) { 90 registrations = new(std::nothrow) EventRegistration::Registrations(); 91 if (registrations == nullptr) { 92 ULOGE("registrations is null, because applying memory fail!"); 93 return nullptr; 94 } 95 handlers[typeId] = registrations; 96 } 97 98 for (auto ® : *registrations) { 99 if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == nullptr) { 100 return reg; 101 } 102 } 103 104 std::shared_ptr<EventRegistration> registration = 105 std::make_shared<EventRegistration>(static_cast<void *>(&handler), nullptr); 106 registrations->insert(registration); 107 108 return registration; 109 } 110 111 template<class T> AddHandler(const std::string & typeId,EventBusHandler<T> & handler,EventSender & sender)112 std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId, EventBusHandler<T> &handler, 113 EventSender &sender) 114 { 115 std::lock_guard<std::mutex> lock(handlerMtx); 116 Registrations *registrations = handlers[typeId]; 117 118 if (registrations == nullptr) { 119 registrations = new(std::nothrow) EventRegistration::Registrations(); 120 if (registrations == nullptr) { 121 ULOGE("registrations is null, because applying memory fail!"); 122 return nullptr; 123 } 124 handlers[typeId] = registrations; 125 } 126 127 for (auto ® : *registrations) { 128 if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == &sender) { 129 return reg; 130 } 131 } 132 133 std::shared_ptr<EventRegistration> registration = 134 std::make_shared<EventRegistration>(static_cast<void *>(&handler), &sender); 135 registrations->insert(registration); 136 137 return registration; 138 } 139 140 template<class T> RemoveHandler(const std::string & typeId,std::shared_ptr<EventRegistration> & EvenReg)141 bool RemoveHandler(const std::string &typeId, std::shared_ptr<EventRegistration> &EvenReg) 142 { 143 std::lock_guard<std::mutex> lock(handlerMtx); 144 Registrations *registrations = handlers[typeId]; 145 if (registrations == nullptr) { 146 return false; 147 } 148 149 bool ret = false; 150 auto regIter = registrations->find(EvenReg); 151 if (regIter != registrations->end()) { 152 registrations->erase(regIter); 153 ret = true; 154 } 155 156 return ret; 157 } 158 159 template<class T> 160 void PostEvent(T &e, POSTMODE mode = POSTMODE::POST_ASYNC) 161 { 162 if (mode == POSTMODE::POST_SYNC) { 163 PostEventInner(e); 164 } else { 165 auto eventFunc = [this, e]() mutable { 166 PostEventInner(e); 167 }; 168 if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc))) { 169 ULOGE("Eventbus::PostEvent Async PostTask fail"); 170 } 171 } 172 } 173 174 template<class T> PostEvent(T & e,int64_t delayTime)175 void PostEvent(T &e, int64_t delayTime) 176 { 177 auto eventFunc = [this, e]() mutable { 178 PostEventInner(e); 179 }; 180 if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc, e->getType(), delayTime))) { 181 ULOGE("Eventbus::PostEvent Async PostTask fail"); 182 } 183 } 184 185 template<class T> RemoveEvent(T & e)186 void RemoveEvent(T &e) 187 { 188 if (!(eventbusHandler_ && eventbusHandler_->RemoveTask(e->getType()))) { 189 ULOGE("Eventbus::RemoveEvent fail"); 190 } 191 } 192 193 void PostTask(const OHOS::AppExecFwk::InnerEvent::Callback& callback, 194 const std::string& name, 195 int64_t delayTimeInMs = 0) 196 { 197 ULOGI("Eventbus::PostTask Async PostTask, taskName:%{public}s.", GetAnonyString(name).c_str()); 198 if (eventbusHandler_ != nullptr) { 199 eventbusHandler_->PostTask(callback, name, delayTimeInMs); 200 } 201 } 202 RemoveTask(const std::string & name)203 void RemoveTask(const std::string& name) 204 { 205 ULOGI("Eventbus::RemoveTask, taskName:%{public}s.", GetAnonyString(name).c_str()); 206 if (eventbusHandler_ != nullptr) { 207 eventbusHandler_->RemoveTask(name); 208 } 209 } 210 211 private: 212 template<class T> PostEventInner(T & e)213 void PostEventInner(T &e) 214 { 215 std::lock_guard<std::mutex> lock(handlerMtx); 216 Registrations *registrations = handlers[e.GetType()]; 217 if (registrations == nullptr) { 218 return; 219 } 220 221 for (auto ® : *registrations) { 222 if ((reg->GetSender() == nullptr) || (reg->GetSender() == &e.GetSender())) { 223 static_cast<EventBusHandler<Event> *>(const_cast<void *>(reg->GetHandler()))->Dispatch(e); 224 } 225 } 226 } 227 StartEvent()228 void StartEvent() 229 { 230 auto busRunner = AppExecFwk::EventRunner::Create(false); 231 { 232 std::lock_guard<std::mutex> lock(eventMutex_); 233 eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner); 234 } 235 eventCon_.notify_all(); 236 busRunner->Run(); 237 } 238 StartEventWithName(const std::string & threadName)239 void StartEventWithName(const std::string &threadName) 240 { 241 prctl(PR_SET_NAME, threadName.c_str()); 242 auto busRunner = AppExecFwk::EventRunner::Create(false); 243 { 244 std::lock_guard<std::mutex> lock(eventMutex_); 245 eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner); 246 } 247 eventCon_.notify_all(); 248 busRunner->Run(); 249 } 250 251 private: 252 std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventbusHandler_; 253 254 using Registrations = std::set<std::shared_ptr<EventRegistration>>; 255 std::mutex handlerMtx; 256 using TypeMap = std::unordered_map<std::string, std::set<std::shared_ptr<EventRegistration>> *>; 257 TypeMap handlers; 258 std::thread eventThread_; 259 std::condition_variable eventCon_; 260 std::mutex eventMutex_; 261 }; 262 } // namespace DistributedHardware 263 } // namespace OHOS 264 #endif 265