/* * Copyright (c) 2021-2025 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include "event_manager.h" #include "napi_utils.h" #include "netstack_log.h" namespace OHOS::NetStack { static constexpr const int CALLBACK_PARAM_NUM = 1; static constexpr const int ASYNC_CALLBACK_PARAM_NUM = 2; static constexpr const int CALLBACK_TWO_PARAM_NUM = 2; static constexpr const int ASYNC_CALLBACK_TWO_PARAM_NUM = 3; static constexpr const char *ON_HEADER_RECEIVE = "headerReceive"; static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive"; EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false), proxyData_{nullptr} {} EventManager::~EventManager() { NETSTACK_LOGD("EventManager is destructed by the destructor"); } void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once, bool asyncCallback) { std::unique_lock lock(mutexForListenersAndEmitByUv_); auto it = std::remove_if(listeners_.begin(), listeners_.end(), [type](const std::shared_ptr &listener) -> bool { return listener->MatchType(type); }); if (it != listeners_.end()) { listeners_.erase(it, listeners_.end()); } auto listener = std::make_shared(GetCurrentThreadId(), env, type, callback, once, asyncCallback); listeners_.emplace_back(std::move(listener)); } void EventManager::DeleteListener(const std::string &type, napi_value callback) { std::unique_lock lock(mutexForListenersAndEmitByUv_); auto it = std::remove_if(listeners_.begin(), listeners_.end(), [type, callback] (const std::shared_ptr &listener) -> bool { return listener->Match(type, callback); }); listeners_.erase(it, listeners_.end()); } void EventManager::Emit(const std::string &type, const std::pair &argv) { std::shared_lock lock(mutexForListenersAndEmitByUv_); auto listeners = listeners_; lock.unlock(); std::for_each(listeners.begin(), listeners.end(), [type, argv] (const std::shared_ptr &listener) { if (listener->IsAsyncCallback()) { /* AsyncCallback(BusinessError error, T data) */ napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second}; listener->Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg); } else { /* Callback(T data) */ napi_value arg[CALLBACK_PARAM_NUM] = {argv.second}; listener->Emit(type, CALLBACK_PARAM_NUM, arg); } }); std::unique_lock lock2(mutexForListenersAndEmitByUv_); auto it = std::remove_if(listeners_.begin(), listeners_.end(), [type] (const std::shared_ptr &listener) -> bool { return listener->MatchOnce(type); }); listeners_.erase(it, listeners_.end()); } void EventManager::EmitWithTwoPara(const std::string &type, const std::tuple &argv) { std::shared_lock lock(mutexForListenersAndEmitByUv_); auto listeners = listeners_; lock.unlock(); std::for_each(listeners.begin(), listeners.end(), [type, argv] (const std::shared_ptr &listener) { if (listener->IsAsyncCallback()) { /* AsyncCallback(BusinessError error, T data) */ napi_value arg[ASYNC_CALLBACK_TWO_PARAM_NUM] = {std::get<0>(argv), std::get<1>(argv), std::get<2>(argv)}; listener->Emit(type, ASYNC_CALLBACK_TWO_PARAM_NUM, arg); } else { /* Callback(T data) */ napi_value arg[CALLBACK_TWO_PARAM_NUM] = {std::get<1>(argv), std::get<2>(argv)}; listener->Emit(type, CALLBACK_TWO_PARAM_NUM, arg); } }); std::unique_lock lock2(mutexForListenersAndEmitByUv_); auto it = std::remove_if(listeners_.begin(), listeners_.end(), [type] (const std::shared_ptr &listener) -> bool { return listener->MatchOnce(type); }); listeners_.erase(it, listeners_.end()); } void EventManager::SetData(void *data) { data_ = data; } void *EventManager::GetData() { return data_; } void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int)) { std::shared_lock lock(mutexForListenersAndEmitByUv_); bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [] (const std::shared_ptr &listener) { return listener->MatchType(ON_HEADER_RECEIVE); }) != listeners_.end(); bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [] (const std::shared_ptr &listener) { return listener->MatchType(ON_HEADERS_RECEIVE);}) != listeners_.end(); if (!foundHeader && !foundHeaders) { if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) { auto tempMap = static_cast *>(data); delete tempMap; return; } } else if (foundHeader && !foundHeaders) { if (type == ON_HEADERS_RECEIVE) { auto tempMap = static_cast *>(data); delete tempMap; return; } } else if (!foundHeader) { if (type == ON_HEADER_RECEIVE) { auto tempMap = static_cast *>(data); delete tempMap; return; } } std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this] (const std::shared_ptr &listener) { if (listener->MatchType(type) && listener->GetCallbackRef() != nullptr) { auto workWrapper = new UvWorkWrapperShared(data, listener->GetEnv(), type, shared_from_this()); NapiUtils::CreateUvQueueWork(listener->GetEnv(), workWrapper, Handler); } }); } void EventManager::SetQueueData(void *data) { std::lock_guard lock(dataQueueMutex_); dataQueue_.push(data); } void *EventManager::GetQueueData() { std::lock_guard lock(dataQueueMutex_); if (!dataQueue_.empty()) { auto data = dataQueue_.front(); dataQueue_.pop(); return data; } NETSTACK_LOGE("eventManager data queue is empty"); return nullptr; } void EventManager::SetServerQueueData(void *wsi, void *data) { std::unique_lock lock(dataServerQueueMutex_); serverDataQueue_[wsi].push(data); } void *EventManager::GetServerQueueData(void *wsi) { if (wsi == nullptr) { NETSTACK_LOGE("wsi is nullptr"); return nullptr; } { std::shared_lock lock(dataServerQueueMutex_); if (serverDataQueue_.empty() || serverDataQueue_.find(wsi) == serverDataQueue_.end()) { NETSTACK_LOGE("eventManager server data queue is empty"); return nullptr; } auto data = serverDataQueue_[wsi].front(); serverDataQueue_[wsi].pop(); return data; } } bool EventManager::HasEventListener(const std::string &type) { std::shared_lock lock(mutexForListenersAndEmitByUv_); return std::any_of(listeners_.begin(), listeners_.end(), [&type] (const std::shared_ptr &listener) -> bool { return listener->MatchType(type); }); } void EventManager::DeleteListener(const std::string &type) { std::unique_lock lock(mutexForListenersAndEmitByUv_); auto it = std::remove_if(listeners_.begin(), listeners_.end(), [type] (const std::shared_ptr &listener) -> bool { return listener->MatchType(type); }); listeners_.erase(it, listeners_.end()); } std::mutex EventManager::mutexForManager_; EventManagerMagic EventManager::magic_; void EventManager::CreateEventReference(napi_env env, napi_value value) { if (env != nullptr && value != nullptr) { eventRef_ = NapiUtils::CreateReference(env, value); } } void EventManager::DeleteEventReference(napi_env env) { if (env != nullptr && eventRef_ != nullptr) { NapiUtils::DeleteReference(env, eventRef_); } eventRef_ = nullptr; } void EventManager::SetEventDestroy(bool flag) { isDestroy_.store(flag); } bool EventManager::IsEventDestroy() { return isDestroy_.load(); } const std::string &EventManager::GetWebSocketTextData() { return webSocketTextData_; } void EventManager::AppendWebSocketTextData(void *data, size_t length) { webSocketTextData_.append(reinterpret_cast(data), length); } const std::string &EventManager::GetWebSocketBinaryData() { return webSocketBinaryData_; } void EventManager::AppendWebSocketBinaryData(void *data, size_t length) { webSocketBinaryData_.append(reinterpret_cast(data), length); } const std::string &EventManager::GetWsServerBinaryData(void *wsi) { return wsServerBinaryData_[wsi]; } const std::string &EventManager::GetWsServerTextData(void *wsi) { return wsServerTextData_[wsi]; } void EventManager::AppendWsServerBinaryData(void *wsi, void *data, size_t length) { wsServerBinaryData_[wsi].append(reinterpret_cast(data), length); } void EventManager::AppendWsServerTextData(void *wsi, void *data, size_t length) { wsServerTextData_[wsi].append(reinterpret_cast(data), length); } void EventManager::ClearWsServerBinaryData(void *wsi) { wsServerBinaryData_[wsi].clear(); } void EventManager::ClearWsServerTextData(void *wsi) { wsServerTextData_[wsi].clear(); } void EventManager::SetMaxConnClientCnt(const uint32_t &cnt) { maxConnClientCnt_ = cnt; } void EventManager::SetMaxConnForOneClient(const uint32_t &cnt) { maxConnForOneClient_ = cnt; } uint32_t EventManager::GetMaxConcurrentClientCnt() const { return maxConnClientCnt_; } uint32_t EventManager::GetMaxConnForOneClient() const { return maxConnForOneClient_; } void EventManager::AddClientUserData(void *wsi, std::shared_ptr &data) { std::lock_guard lock(mapMutex_); userDataMap_[wsi] = data; } void EventManager::RemoveClientUserData(void *wsi) { std::lock_guard lock(mapMutex_); auto it = userDataMap_.find(wsi); if (it != userDataMap_.end()) { userDataMap_.erase(it); } } void EventManager::ClearWebSocketTextData() { webSocketTextData_.clear(); } void EventManager::ClearWebSocketBinaryData() { webSocketBinaryData_.clear(); } std::shared_mutex &EventManager::GetDataMutex() { return dataMutex_; } void EventManager::NotifyRcvThdExit() { std::unique_lock lock(sockRcvThdMtx_); sockRcvExit_ = true; sockRcvThdCon_.notify_one(); } void EventManager::WaitForRcvThdExit() { std::unique_lock lock(sockRcvThdMtx_); sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; }); } void EventManager::SetReuseAddr(bool reuse) { isReuseAddr_.store(reuse); } bool EventManager::GetReuseAddr() { return isReuseAddr_.load(); } void EventManager::SetContextState(bool enable) { isOpened_ = enable; } bool EventManager::GetContextState() { return isOpened_; } std::shared_ptr EventManager::GetProxyData() { return proxyData_; } void EventManager::SetProxyData(std::shared_ptr data) { proxyData_ = data; } void EventManager::SetWebSocketUserData(const std::shared_ptr &userData) { webSocketUserData_ = userData; } std::shared_ptr EventManager::GetWebSocketUserData() { return webSocketUserData_; } UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType, const std::shared_ptr &eventManager) : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager) { } } // namespace OHOS::NetStack