1 /*
2 * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <algorithm>
17 #include <map>
18
19 #include "event_manager.h"
20
21 #include "napi_utils.h"
22 #include "netstack_log.h"
23
24 namespace OHOS::NetStack {
25 static constexpr const int CALLBACK_PARAM_NUM = 1;
26 static constexpr const int ASYNC_CALLBACK_PARAM_NUM = 2;
27 static constexpr const int CALLBACK_TWO_PARAM_NUM = 2;
28 static constexpr const int ASYNC_CALLBACK_TWO_PARAM_NUM = 3;
29 static constexpr const char *ON_HEADER_RECEIVE = "headerReceive";
30 static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive";
31
EventManager()32 EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false), proxyData_{nullptr} {}
33
~EventManager()34 EventManager::~EventManager()
35 {
36 NETSTACK_LOGD("EventManager is destructed by the destructor");
37 }
38
AddListener(napi_env env,const std::string & type,napi_value callback,bool once,bool asyncCallback)39 void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once,
40 bool asyncCallback)
41 {
42 std::unique_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
43 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
44 [type](const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchType(type); });
45 if (it != listeners_.end()) {
46 listeners_.erase(it, listeners_.end());
47 }
48 auto listener = std::make_shared<EventListener>(GetCurrentThreadId(), env, type, callback, once, asyncCallback);
49 listeners_.emplace_back(std::move(listener));
50 }
51
DeleteListener(const std::string & type,napi_value callback)52 void EventManager::DeleteListener(const std::string &type, napi_value callback)
53 {
54 std::unique_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
55 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
56 [type, callback] (const std::shared_ptr<EventListener> &listener) -> bool {
57 return listener->Match(type, callback);
58 });
59 listeners_.erase(it, listeners_.end());
60 }
61
Emit(const std::string & type,const std::pair<napi_value,napi_value> & argv)62 void EventManager::Emit(const std::string &type, const std::pair<napi_value, napi_value> &argv)
63 {
64 std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
65 auto listeners = listeners_;
66 lock.unlock();
67 std::for_each(listeners.begin(), listeners.end(), [type, argv] (const std::shared_ptr<EventListener> &listener) {
68 if (listener->IsAsyncCallback()) {
69 /* AsyncCallback(BusinessError error, T data) */
70 napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second};
71 listener->Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg);
72 } else {
73 /* Callback(T data) */
74 napi_value arg[CALLBACK_PARAM_NUM] = {argv.second};
75 listener->Emit(type, CALLBACK_PARAM_NUM, arg);
76 }
77 });
78 std::unique_lock<std::shared_mutex> lock2(mutexForListenersAndEmitByUv_);
79 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
80 [type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchOnce(type); });
81 listeners_.erase(it, listeners_.end());
82 }
83
EmitWithTwoPara(const std::string & type,const std::tuple<napi_value,napi_value,napi_value> & argv)84 void EventManager::EmitWithTwoPara(const std::string &type, const std::tuple<napi_value, napi_value, napi_value> &argv)
85 {
86 std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
87 auto listeners = listeners_;
88 lock.unlock();
89 std::for_each(listeners.begin(), listeners.end(), [type, argv] (const std::shared_ptr<EventListener> &listener) {
90 if (listener->IsAsyncCallback()) {
91 /* AsyncCallback(BusinessError error, T data) */
92 napi_value arg[ASYNC_CALLBACK_TWO_PARAM_NUM] = {std::get<0>(argv), std::get<1>(argv), std::get<2>(argv)};
93 listener->Emit(type, ASYNC_CALLBACK_TWO_PARAM_NUM, arg);
94 } else {
95 /* Callback(T data) */
96 napi_value arg[CALLBACK_TWO_PARAM_NUM] = {std::get<1>(argv), std::get<2>(argv)};
97 listener->Emit(type, CALLBACK_TWO_PARAM_NUM, arg);
98 }
99 });
100 std::unique_lock<std::shared_mutex> lock2(mutexForListenersAndEmitByUv_);
101 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
102 [type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchOnce(type); });
103 listeners_.erase(it, listeners_.end());
104 }
105
SetData(void * data)106 void EventManager::SetData(void *data)
107 {
108 data_ = data;
109 }
110
GetData()111 void *EventManager::GetData()
112 {
113 return data_;
114 }
115
EmitByUvWithoutCheckShared(const std::string & type,void * data,void (* Handler)(uv_work_t *,int))116 void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int))
117 {
118 std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
119 bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(),
120 [] (const std::shared_ptr<EventListener> &listener) { return listener->MatchType(ON_HEADER_RECEIVE); }) !=
121 listeners_.end();
122
123 bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(),
124 [] (const std::shared_ptr<EventListener> &listener) { return listener->MatchType(ON_HEADERS_RECEIVE);}) !=
125 listeners_.end();
126
127 if (!foundHeader && !foundHeaders) {
128 if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
129 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
130 delete tempMap;
131 return;
132 }
133 } else if (foundHeader && !foundHeaders) {
134 if (type == ON_HEADERS_RECEIVE) {
135 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
136 delete tempMap;
137 return;
138 }
139 } else if (!foundHeader) {
140 if (type == ON_HEADER_RECEIVE) {
141 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
142 delete tempMap;
143 return;
144 }
145 }
146
147 std::for_each(listeners_.begin(), listeners_.end(),
148 [type, data, Handler, this] (const std::shared_ptr<EventListener> &listener) {
149 if (listener->MatchType(type) && listener->GetCallbackRef() != nullptr) {
150 auto workWrapper = new UvWorkWrapperShared(data, listener->GetEnv(), type, shared_from_this());
151 NapiUtils::CreateUvQueueWork(listener->GetEnv(), workWrapper, Handler);
152 }
153 });
154 }
155
SetQueueData(void * data)156 void EventManager::SetQueueData(void *data)
157 {
158 std::lock_guard<std::mutex> lock(dataQueueMutex_);
159 dataQueue_.push(data);
160 }
161
GetQueueData()162 void *EventManager::GetQueueData()
163 {
164 std::lock_guard<std::mutex> lock(dataQueueMutex_);
165 if (!dataQueue_.empty()) {
166 auto data = dataQueue_.front();
167 dataQueue_.pop();
168 return data;
169 }
170 NETSTACK_LOGE("eventManager data queue is empty");
171 return nullptr;
172 }
173
SetServerQueueData(void * wsi,void * data)174 void EventManager::SetServerQueueData(void *wsi, void *data)
175 {
176 std::unique_lock<std::shared_mutex> lock(dataServerQueueMutex_);
177 serverDataQueue_[wsi].push(data);
178 }
179
GetServerQueueData(void * wsi)180 void *EventManager::GetServerQueueData(void *wsi)
181 {
182 if (wsi == nullptr) {
183 NETSTACK_LOGE("wsi is nullptr");
184 return nullptr;
185 }
186 {
187 std::shared_lock<std::shared_mutex> lock(dataServerQueueMutex_);
188 if (serverDataQueue_.empty() || serverDataQueue_.find(wsi) == serverDataQueue_.end()) {
189 NETSTACK_LOGE("eventManager server data queue is empty");
190 return nullptr;
191 }
192 auto data = serverDataQueue_[wsi].front();
193 serverDataQueue_[wsi].pop();
194 return data;
195 }
196 }
197
HasEventListener(const std::string & type)198 bool EventManager::HasEventListener(const std::string &type)
199 {
200 std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
201 return std::any_of(listeners_.begin(), listeners_.end(),
202 [&type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchType(type); });
203 }
204
DeleteListener(const std::string & type)205 void EventManager::DeleteListener(const std::string &type)
206 {
207 std::unique_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
208 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
209 [type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchType(type); });
210 listeners_.erase(it, listeners_.end());
211 }
212
213 std::mutex EventManager::mutexForManager_;
214 EventManagerMagic EventManager::magic_;
215
CreateEventReference(napi_env env,napi_value value)216 void EventManager::CreateEventReference(napi_env env, napi_value value)
217 {
218 if (env != nullptr && value != nullptr) {
219 eventRef_ = NapiUtils::CreateReference(env, value);
220 }
221 }
222
DeleteEventReference(napi_env env)223 void EventManager::DeleteEventReference(napi_env env)
224 {
225 if (env != nullptr && eventRef_ != nullptr) {
226 NapiUtils::DeleteReference(env, eventRef_);
227 }
228 eventRef_ = nullptr;
229 }
230
SetEventDestroy(bool flag)231 void EventManager::SetEventDestroy(bool flag)
232 {
233 isDestroy_.store(flag);
234 }
235
IsEventDestroy()236 bool EventManager::IsEventDestroy()
237 {
238 return isDestroy_.load();
239 }
240
GetWebSocketTextData()241 const std::string &EventManager::GetWebSocketTextData()
242 {
243 return webSocketTextData_;
244 }
245
AppendWebSocketTextData(void * data,size_t length)246 void EventManager::AppendWebSocketTextData(void *data, size_t length)
247 {
248 webSocketTextData_.append(reinterpret_cast<char *>(data), length);
249 }
250
GetWebSocketBinaryData()251 const std::string &EventManager::GetWebSocketBinaryData()
252 {
253 return webSocketBinaryData_;
254 }
255
AppendWebSocketBinaryData(void * data,size_t length)256 void EventManager::AppendWebSocketBinaryData(void *data, size_t length)
257 {
258 webSocketBinaryData_.append(reinterpret_cast<char *>(data), length);
259 }
260
GetWsServerBinaryData(void * wsi)261 const std::string &EventManager::GetWsServerBinaryData(void *wsi)
262 {
263 return wsServerBinaryData_[wsi];
264 }
265
GetWsServerTextData(void * wsi)266 const std::string &EventManager::GetWsServerTextData(void *wsi)
267 {
268 return wsServerTextData_[wsi];
269 }
270
AppendWsServerBinaryData(void * wsi,void * data,size_t length)271 void EventManager::AppendWsServerBinaryData(void *wsi, void *data, size_t length)
272 {
273 wsServerBinaryData_[wsi].append(reinterpret_cast<char *>(data), length);
274 }
275
AppendWsServerTextData(void * wsi,void * data,size_t length)276 void EventManager::AppendWsServerTextData(void *wsi, void *data, size_t length)
277 {
278 wsServerTextData_[wsi].append(reinterpret_cast<char *>(data), length);
279 }
280
ClearWsServerBinaryData(void * wsi)281 void EventManager::ClearWsServerBinaryData(void *wsi)
282 {
283 wsServerBinaryData_[wsi].clear();
284 }
285
ClearWsServerTextData(void * wsi)286 void EventManager::ClearWsServerTextData(void *wsi)
287 {
288 wsServerTextData_[wsi].clear();
289 }
290
SetMaxConnClientCnt(const uint32_t & cnt)291 void EventManager::SetMaxConnClientCnt(const uint32_t &cnt)
292 {
293 maxConnClientCnt_ = cnt;
294 }
295
SetMaxConnForOneClient(const uint32_t & cnt)296 void EventManager::SetMaxConnForOneClient(const uint32_t &cnt)
297 {
298 maxConnForOneClient_ = cnt;
299 }
300
GetMaxConcurrentClientCnt() const301 uint32_t EventManager::GetMaxConcurrentClientCnt() const
302 {
303 return maxConnClientCnt_;
304 }
305
GetMaxConnForOneClient() const306 uint32_t EventManager::GetMaxConnForOneClient() const
307 {
308 return maxConnForOneClient_;
309 }
310
AddClientUserData(void * wsi,std::shared_ptr<Websocket::UserData> & data)311 void EventManager::AddClientUserData(void *wsi, std::shared_ptr<Websocket::UserData> &data)
312 {
313 std::lock_guard<std::mutex> lock(mapMutex_);
314 userDataMap_[wsi] = data;
315 }
316
RemoveClientUserData(void * wsi)317 void EventManager::RemoveClientUserData(void *wsi)
318 {
319 std::lock_guard<std::mutex> lock(mapMutex_);
320 auto it = userDataMap_.find(wsi);
321 if (it != userDataMap_.end()) {
322 userDataMap_.erase(it);
323 }
324 }
325
ClearWebSocketTextData()326 void EventManager::ClearWebSocketTextData()
327 {
328 webSocketTextData_.clear();
329 }
330
ClearWebSocketBinaryData()331 void EventManager::ClearWebSocketBinaryData()
332 {
333 webSocketBinaryData_.clear();
334 }
335
GetDataMutex()336 std::shared_mutex &EventManager::GetDataMutex()
337 {
338 return dataMutex_;
339 }
340
NotifyRcvThdExit()341 void EventManager::NotifyRcvThdExit()
342 {
343 std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
344 sockRcvExit_ = true;
345 sockRcvThdCon_.notify_one();
346 }
347
WaitForRcvThdExit()348 void EventManager::WaitForRcvThdExit()
349 {
350 std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
351 sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; });
352 }
353
SetReuseAddr(bool reuse)354 void EventManager::SetReuseAddr(bool reuse)
355 {
356 isReuseAddr_.store(reuse);
357 }
358
GetReuseAddr()359 bool EventManager::GetReuseAddr()
360 {
361 return isReuseAddr_.load();
362 }
363
SetContextState(bool enable)364 void EventManager::SetContextState(bool enable)
365 {
366 isOpened_ = enable;
367 }
368
GetContextState()369 bool EventManager::GetContextState()
370 {
371 return isOpened_;
372 }
373
GetProxyData()374 std::shared_ptr<Socks5::Socks5Instance> EventManager::GetProxyData()
375 {
376 return proxyData_;
377 }
378
SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)379 void EventManager::SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)
380 {
381 proxyData_ = data;
382 }
383
SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> & userData)384 void EventManager::SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> &userData)
385 {
386 webSocketUserData_ = userData;
387 }
388
GetWebSocketUserData()389 std::shared_ptr<Websocket::UserData> EventManager::GetWebSocketUserData()
390 {
391 return webSocketUserData_;
392 }
393
UvWorkWrapperShared(void * theData,napi_env theEnv,std::string eventType,const std::shared_ptr<EventManager> & eventManager)394 UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType,
395 const std::shared_ptr<EventManager> &eventManager)
396 : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
397 {
398 }
399 } // namespace OHOS::NetStack
400