1 /* 2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H 17 #define OHOS_DISTRIBUTED_HARDWARE_EVENT_BUS_H 18 19 #include <condition_variable> 20 #include <pthread.h> 21 #include <memory> 22 #include <set> 23 #include <sys/prctl.h> 24 #include <thread> 25 #include <unordered_map> 26 #include <mutex> 27 28 #include "event_handler.h" 29 30 #include "constants.h" 31 #include "dh_log.h" 32 #include "anonymous_string.h" 33 #include "distributed_hardware_errno.h" 34 #include "distributed_hardware_log.h" 35 #include "event.h" 36 #include "eventbus_handler.h" 37 #include "event_registration.h" 38 39 #ifndef DH_LOG_TAG 40 #define DH_LOG_TAG "DHEventBus" 41 #endif 42 43 namespace OHOS { 44 namespace DistributedHardware { 45 enum POSTMODE : uint32_t { 46 POST_ASYNC = 0, 47 POST_SYNC, 48 }; 49 50 class EventBus final { 51 public: EventBus()52 EventBus() 53 { 54 ULOGI("ctor EventBus"); 55 if (eventbusHandler_ == nullptr) { 56 eventThread_ = std::thread(&EventBus::StartEvent, this); 57 std::unique_lock<std::mutex> lock(eventMutex_); 58 eventCon_.wait(lock, [this] { 59 return eventbusHandler_ != nullptr; 60 }); 61 } 62 } 63 EventBus(const std::string & threadName)64 explicit EventBus(const std::string &threadName) 65 { 66 ULOGI("ctor EventBus threadName: %s", threadName.c_str()); 67 if (eventbusHandler_ == nullptr) { 68 eventThread_ = std::thread(&EventBus::StartEventWithName, this, threadName); 69 std::unique_lock<std::mutex> lock(eventMutex_); 70 eventCon_.wait(lock, [this] { 71 return eventbusHandler_ != nullptr; 72 }); 73 } 74 } 75 ~EventBus()76 ~EventBus() 77 { 78 ULOGI("dtor EventBus"); 79 if ((eventbusHandler_ != nullptr) && (eventbusHandler_->GetEventRunner() != nullptr)) { 80 eventbusHandler_->GetEventRunner()->Stop(); 81 } 82 eventThread_.join(); 83 eventbusHandler_ = nullptr; 84 } 85 86 template<class T> AddHandler(const std::string & typeId,DistributedHardware::EventBusHandler<T> & handler)87 std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId, 88 DistributedHardware::EventBusHandler<T> &handler) 89 { 90 std::lock_guard<std::mutex> lock(handlerMtx); 91 Registrations *registrations = handlers[typeId]; 92 93 if (registrations == nullptr) { 94 registrations = new(std::nothrow) EventRegistration::Registrations(); 95 if (registrations == nullptr) { 96 ULOGE("registrations is null, because applying memory fail!"); 97 return nullptr; 98 } 99 handlers[typeId] = registrations; 100 } 101 102 for (auto ® : *registrations) { 103 if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == nullptr) { 104 return reg; 105 } 106 } 107 108 std::shared_ptr<EventRegistration> registration = 109 std::make_shared<EventRegistration>(static_cast<void *>(&handler), nullptr); 110 registrations->insert(registration); 111 112 return registration; 113 } 114 115 template<class T> AddHandler(const std::string & typeId,EventBusHandler<T> & handler,EventSender & sender)116 std::shared_ptr<EventRegistration> AddHandler(const std::string &typeId, EventBusHandler<T> &handler, 117 EventSender &sender) 118 { 119 std::lock_guard<std::mutex> lock(handlerMtx); 120 Registrations *registrations = handlers[typeId]; 121 122 if (registrations == nullptr) { 123 registrations = new(std::nothrow) EventRegistration::Registrations(); 124 if (registrations == nullptr) { 125 ULOGE("registrations is null, because applying memory fail!"); 126 return nullptr; 127 } 128 handlers[typeId] = registrations; 129 } 130 131 for (auto ® : *registrations) { 132 if (reg->GetHandler() == static_cast<void *>(&handler) && reg->GetSender() == &sender) { 133 return reg; 134 } 135 } 136 137 std::shared_ptr<EventRegistration> registration = 138 std::make_shared<EventRegistration>(static_cast<void *>(&handler), &sender); 139 registrations->insert(registration); 140 141 return registration; 142 } 143 144 template<class T> RemoveHandler(const std::string & typeId,std::shared_ptr<EventRegistration> & EvenReg)145 bool RemoveHandler(const std::string &typeId, std::shared_ptr<EventRegistration> &EvenReg) 146 { 147 std::lock_guard<std::mutex> lock(handlerMtx); 148 Registrations *registrations = handlers[typeId]; 149 if (registrations == nullptr) { 150 return false; 151 } 152 153 bool ret = false; 154 auto regIter = registrations->find(EvenReg); 155 if (regIter != registrations->end()) { 156 registrations->erase(regIter); 157 ret = true; 158 } 159 160 return ret; 161 } 162 163 template<class T> 164 void PostEvent(T &e, POSTMODE mode = POSTMODE::POST_ASYNC) 165 { 166 if (mode == POSTMODE::POST_SYNC) { 167 PostEventInner(e); 168 } else { 169 auto eventFunc = [this, e]() mutable { 170 PostEventInner(e); 171 }; 172 if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc))) { 173 ULOGE("Eventbus::PostEvent Async PostTask fail"); 174 } 175 } 176 } 177 178 template<class T> PostEvent(T & e,int64_t delayTime)179 void PostEvent(T &e, int64_t delayTime) 180 { 181 auto eventFunc = [this, e]() mutable { 182 PostEventInner(e); 183 }; 184 if (!(eventbusHandler_ && eventbusHandler_->PostTask(eventFunc, e->getType(), delayTime))) { 185 ULOGE("Eventbus::PostEvent Async PostTask fail"); 186 } 187 } 188 189 template<class T> RemoveEvent(T & e)190 void RemoveEvent(T &e) 191 { 192 if (!(eventbusHandler_ && eventbusHandler_->RemoveTask(e->getType()))) { 193 ULOGE("Eventbus::RemoveEvent fail"); 194 } 195 } 196 197 void PostTask(const OHOS::AppExecFwk::InnerEvent::Callback& callback, 198 const std::string& name, 199 int64_t delayTimeInMs = 0) 200 { 201 ULOGI("Eventbus::PostTask Async PostTask, taskName:%{public}s.", GetAnonyString(name).c_str()); 202 if (eventbusHandler_ != nullptr) { 203 eventbusHandler_->PostTask(callback, name, delayTimeInMs); 204 } 205 } 206 RemoveTask(const std::string & name)207 void RemoveTask(const std::string& name) 208 { 209 ULOGI("Eventbus::RemoveTask, taskName:%{public}s.", GetAnonyString(name).c_str()); 210 if (eventbusHandler_ != nullptr) { 211 eventbusHandler_->RemoveTask(name); 212 } 213 } 214 215 private: 216 template<class T> PostEventInner(T & e)217 void PostEventInner(T &e) 218 { 219 std::lock_guard<std::mutex> lock(handlerMtx); 220 Registrations *registrations = handlers[e.GetType()]; 221 if (registrations == nullptr) { 222 return; 223 } 224 225 for (auto ® : *registrations) { 226 if (reg->GetHandler() == nullptr) { 227 continue; 228 } 229 if ((reg->GetSender() == nullptr) || (reg->GetSender() == &e.GetSender())) { 230 static_cast<EventBusHandler<Event> *>(const_cast<void *>(reg->GetHandler()))->Dispatch(e); 231 } 232 } 233 } 234 StartEvent()235 void StartEvent() 236 { 237 int32_t ret = pthread_setname_np(pthread_self(), START_EVENT); 238 if (ret != DH_FWK_SUCCESS) { 239 DHLOGE("StartEvent setname failed."); 240 } 241 auto busRunner = AppExecFwk::EventRunner::Create(false); 242 { 243 std::lock_guard<std::mutex> lock(eventMutex_); 244 eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner); 245 } 246 eventCon_.notify_all(); 247 busRunner->Run(); 248 } 249 StartEventWithName(const std::string & threadName)250 void StartEventWithName(const std::string &threadName) 251 { 252 prctl(PR_SET_NAME, threadName.c_str()); 253 auto busRunner = AppExecFwk::EventRunner::Create(false); 254 { 255 std::lock_guard<std::mutex> lock(eventMutex_); 256 eventbusHandler_ = std::make_shared<AppExecFwk::EventHandler>(busRunner); 257 } 258 eventCon_.notify_all(); 259 busRunner->Run(); 260 } 261 262 private: 263 std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventbusHandler_; 264 265 using Registrations = std::set<std::shared_ptr<EventRegistration>>; 266 std::mutex handlerMtx; 267 using TypeMap = std::unordered_map<std::string, std::set<std::shared_ptr<EventRegistration>> *>; 268 TypeMap handlers; 269 std::thread eventThread_; 270 std::condition_variable eventCon_; 271 std::mutex eventMutex_; 272 }; 273 } // namespace DistributedHardware 274 } // namespace OHOS 275 #endif 276