• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <algorithm>
17 #include <map>
18 
19 #include "event_manager.h"
20 
21 #include "napi_utils.h"
22 #include "netstack_log.h"
23 
24 namespace OHOS::NetStack {
// Argument counts passed to EventListener::Emit for the two callback shapes.
static constexpr int CALLBACK_PARAM_NUM = 1;       // Callback(T data)
static constexpr int ASYNC_CALLBACK_PARAM_NUM = 2; // AsyncCallback(BusinessError error, T data)

// Event type names that receive special handling in EmitByUvWithoutCheckShared.
static constexpr const char *ON_HEADER_RECEIVE = "headerReceive";
static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive";
29 
EventManager()30 EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false), proxyData_{nullptr} {}
31 
~EventManager()32 EventManager::~EventManager()
33 {
34     NETSTACK_LOGD("EventManager is destructed by the destructor");
35 }
36 
AddListener(napi_env env,const std::string & type,napi_value callback,bool once,bool asyncCallback)37 void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once,
38                                bool asyncCallback)
39 {
40     std::unique_lock lock(mutexForListenersAndEmitByUv_);
41     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
42                              [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
43     if (it != listeners_.end()) {
44         listeners_.erase(it, listeners_.end());
45     }
46 
47     listeners_.emplace_back(GetCurrentThreadId(), env, type, callback, once, asyncCallback);
48 }
49 
DeleteListener(const std::string & type,napi_value callback)50 void EventManager::DeleteListener(const std::string &type, napi_value callback)
51 {
52     std::unique_lock lock(mutexForListenersAndEmitByUv_);
53     auto it =
54         std::remove_if(listeners_.begin(), listeners_.end(), [type, callback](const EventListener &listener) -> bool {
55             return listener.Match(type, callback);
56         });
57     listeners_.erase(it, listeners_.end());
58 }
59 
Emit(const std::string & type,const std::pair<napi_value,napi_value> & argv)60 void EventManager::Emit(const std::string &type, const std::pair<napi_value, napi_value> &argv)
61 {
62     std::shared_lock lock2(mutexForListenersAndEmitByUv_);
63     auto listeners = listeners_;
64     lock2.unlock();
65     std::for_each(listeners.begin(), listeners.end(), [type, argv](const EventListener &listener) {
66         if (listener.IsAsyncCallback()) {
67             /* AsyncCallback(BusinessError error, T data) */
68             napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second};
69             listener.Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg);
70         } else {
71             /* Callback(T data) */
72             napi_value arg[CALLBACK_PARAM_NUM] = {argv.second};
73             listener.Emit(type, CALLBACK_PARAM_NUM, arg);
74         }
75     });
76 
77     std::unique_lock lock(mutexForListenersAndEmitByUv_);
78     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
79                              [type](const EventListener &listener) -> bool { return listener.MatchOnce(type); });
80     listeners_.erase(it, listeners_.end());
81 }
82 
SetData(void * data)83 void EventManager::SetData(void *data)
84 {
85     data_ = data;
86 }
87 
GetData()88 void *EventManager::GetData()
89 {
90     return data_;
91 }
92 
EmitByUvWithoutCheckShared(const std::string & type,void * data,void (* Handler)(uv_work_t *,int))93 void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int))
94 {
95     std::shared_lock lock2(mutexForListenersAndEmitByUv_);
96     bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
97         return listener.MatchType(ON_HEADER_RECEIVE);
98     }) != listeners_.end();
99 
100     bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
101         return listener.MatchType(ON_HEADERS_RECEIVE);
102     }) != listeners_.end();
103     if (!foundHeader && !foundHeaders) {
104         if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
105             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
106             delete tempMap;
107         }
108     } else if (foundHeader && !foundHeaders) {
109         if (type == ON_HEADERS_RECEIVE) {
110             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
111             delete tempMap;
112         }
113     } else if (!foundHeader) {
114         if (type == ON_HEADER_RECEIVE) {
115             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
116             delete tempMap;
117         }
118     }
119 
120     std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
121         if (listener.MatchType(type)) {
122             auto workWrapper = new UvWorkWrapperShared(data, listener.GetEnv(), type, shared_from_this());
123             listener.EmitByUv(type, workWrapper, Handler);
124         }
125     });
126 }
127 
SetQueueData(void * data)128 void EventManager::SetQueueData(void *data)
129 {
130     std::lock_guard<std::mutex> lock(dataQueueMutex_);
131     dataQueue_.push(data);
132 }
133 
GetQueueData()134 void *EventManager::GetQueueData()
135 {
136     std::lock_guard<std::mutex> lock(dataQueueMutex_);
137     if (!dataQueue_.empty()) {
138         auto data = dataQueue_.front();
139         dataQueue_.pop();
140         return data;
141     }
142     NETSTACK_LOGE("eventManager data queue is empty");
143     return nullptr;
144 }
145 
HasEventListener(const std::string & type)146 bool EventManager::HasEventListener(const std::string &type)
147 {
148     std::shared_lock lock(mutexForListenersAndEmitByUv_);
149     return std::any_of(listeners_.begin(), listeners_.end(),
150                        [&type](const EventListener &listener) -> bool { return listener.MatchType(type); });
151 }
152 
DeleteListener(const std::string & type)153 void EventManager::DeleteListener(const std::string &type)
154 {
155     std::unique_lock lock(mutexForListenersAndEmitByUv_);
156     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
157                              [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
158     listeners_.erase(it, listeners_.end());
159 }
160 
161 std::mutex EventManager::mutexForManager_;
162 EventManagerMagic EventManager::magic_;
163 
CreateEventReference(napi_env env,napi_value value)164 void EventManager::CreateEventReference(napi_env env, napi_value value)
165 {
166     if (env != nullptr && value != nullptr) {
167         eventRef_ = NapiUtils::CreateReference(env, value);
168     }
169 }
170 
DeleteEventReference(napi_env env)171 void EventManager::DeleteEventReference(napi_env env)
172 {
173     if (env != nullptr && eventRef_ != nullptr) {
174         NapiUtils::DeleteReference(env, eventRef_);
175     }
176     eventRef_ = nullptr;
177 }
178 
SetEventDestroy(bool flag)179 void EventManager::SetEventDestroy(bool flag)
180 {
181     isDestroy_.store(flag);
182 }
183 
IsEventDestroy()184 bool EventManager::IsEventDestroy()
185 {
186     return isDestroy_.load();
187 }
188 
GetWebSocketTextData()189 const std::string &EventManager::GetWebSocketTextData()
190 {
191     return webSocketTextData_;
192 }
193 
AppendWebSocketTextData(void * data,size_t length)194 void EventManager::AppendWebSocketTextData(void *data, size_t length)
195 {
196     webSocketTextData_.append(reinterpret_cast<char *>(data), length);
197 }
198 
GetWebSocketBinaryData()199 const std::string &EventManager::GetWebSocketBinaryData()
200 {
201     return webSocketBinaryData_;
202 }
203 
AppendWebSocketBinaryData(void * data,size_t length)204 void EventManager::AppendWebSocketBinaryData(void *data, size_t length)
205 {
206     webSocketBinaryData_.append(reinterpret_cast<char *>(data), length);
207 }
208 
ClearWebSocketTextData()209 void EventManager::ClearWebSocketTextData()
210 {
211     webSocketTextData_.clear();
212 }
213 
ClearWebSocketBinaryData()214 void EventManager::ClearWebSocketBinaryData()
215 {
216     webSocketBinaryData_.clear();
217 }
218 
GetDataMutex()219 std::shared_mutex &EventManager::GetDataMutex()
220 {
221     return dataMutex_;
222 }
223 
NotifyRcvThdExit()224 void EventManager::NotifyRcvThdExit()
225 {
226     std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
227     sockRcvExit_ = true;
228     sockRcvThdCon_.notify_one();
229 }
230 
WaitForRcvThdExit()231 void EventManager::WaitForRcvThdExit()
232 {
233     std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
234     sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; });
235 }
236 
SetReuseAddr(bool reuse)237 void EventManager::SetReuseAddr(bool reuse)
238 {
239     isReuseAddr_.store(reuse);
240 }
241 
GetReuseAddr()242 bool EventManager::GetReuseAddr()
243 {
244     return isReuseAddr_.load();
245 }
246 
GetProxyData()247 std::shared_ptr<Socks5::Socks5Instance> EventManager::GetProxyData()
248 {
249     std::unique_lock<std::shared_mutex> lock(dataMutex_);
250     return proxyData_;
251 }
252 
SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)253 void EventManager::SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)
254 {
255     std::unique_lock<std::shared_mutex> lock(dataMutex_);
256     proxyData_ = data;
257 }
258 
SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> & userData)259 void EventManager::SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> &userData)
260 {
261     std::unique_lock<std::shared_mutex> lock(dataMutex_);
262     webSocketUserData_ = userData;
263 }
264 
GetWebSocketUserData()265 std::shared_ptr<Websocket::UserData> EventManager::GetWebSocketUserData()
266 {
267     std::unique_lock<std::shared_mutex> lock(dataMutex_);
268     return webSocketUserData_;
269 }
270 
UvWorkWrapperShared(void * theData,napi_env theEnv,std::string eventType,const std::shared_ptr<EventManager> & eventManager)271 UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType,
272                                          const std::shared_ptr<EventManager> &eventManager)
273     : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
274 {
275 }
276 } // namespace OHOS::NetStack
277