• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <algorithm>
17 #include <map>
18 
19 #include "event_manager.h"
20 
21 #include "napi_utils.h"
22 #include "netstack_log.h"
23 
24 namespace OHOS::NetStack {
25 static constexpr const int CALLBACK_PARAM_NUM = 1;
26 static constexpr const int ASYNC_CALLBACK_PARAM_NUM = 2;
27 static constexpr const int CALLBACK_TWO_PARAM_NUM = 2;
28 static constexpr const int ASYNC_CALLBACK_TWO_PARAM_NUM = 3;
29 static constexpr const char *ON_HEADER_RECEIVE = "headerReceive";
30 static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive";
31 
EventManager()32 EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false), proxyData_{nullptr} {}
33 
~EventManager()34 EventManager::~EventManager()
35 {
36     NETSTACK_LOGD("EventManager is destructed by the destructor");
37 }
38 
AddListener(napi_env env,const std::string & type,napi_value callback,bool once,bool asyncCallback)39 void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once,
40                                bool asyncCallback)
41 {
42     std::unique_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
43     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
44         [type](const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchType(type); });
45     if (it != listeners_.end()) {
46         listeners_.erase(it, listeners_.end());
47     }
48     auto listener = std::make_shared<EventListener>(GetCurrentThreadId(), env, type, callback, once, asyncCallback);
49     listeners_.emplace_back(std::move(listener));
50 }
51 
DeleteListener(const std::string & type,napi_value callback)52 void EventManager::DeleteListener(const std::string &type, napi_value callback)
53 {
54     std::unique_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
55     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
56         [type, callback] (const std::shared_ptr<EventListener> &listener) -> bool {
57             return listener->Match(type, callback);
58         });
59     listeners_.erase(it, listeners_.end());
60 }
61 
Emit(const std::string & type,const std::pair<napi_value,napi_value> & argv)62 void EventManager::Emit(const std::string &type, const std::pair<napi_value, napi_value> &argv)
63 {
64     std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
65     auto listeners = listeners_;
66     lock.unlock();
67     std::for_each(listeners.begin(), listeners.end(), [type, argv] (const std::shared_ptr<EventListener> &listener) {
68         if (listener->IsAsyncCallback()) {
69             /* AsyncCallback(BusinessError error, T data) */
70             napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second};
71             listener->Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg);
72         } else {
73             /* Callback(T data) */
74             napi_value arg[CALLBACK_PARAM_NUM] = {argv.second};
75             listener->Emit(type, CALLBACK_PARAM_NUM, arg);
76         }
77     });
78     std::unique_lock<std::shared_mutex> lock2(mutexForListenersAndEmitByUv_);
79     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
80         [type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchOnce(type); });
81     listeners_.erase(it, listeners_.end());
82 }
83 
EmitWithTwoPara(const std::string & type,const std::tuple<napi_value,napi_value,napi_value> & argv)84 void EventManager::EmitWithTwoPara(const std::string &type, const std::tuple<napi_value, napi_value, napi_value> &argv)
85 {
86     std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
87     auto listeners = listeners_;
88     lock.unlock();
89     std::for_each(listeners.begin(), listeners.end(), [type, argv] (const std::shared_ptr<EventListener> &listener) {
90         if (listener->IsAsyncCallback()) {
91             /* AsyncCallback(BusinessError error, T data) */
92             napi_value arg[ASYNC_CALLBACK_TWO_PARAM_NUM] = {std::get<0>(argv), std::get<1>(argv), std::get<2>(argv)};
93             listener->Emit(type, ASYNC_CALLBACK_TWO_PARAM_NUM, arg);
94         } else {
95             /* Callback(T data) */
96             napi_value arg[CALLBACK_TWO_PARAM_NUM] = {std::get<1>(argv), std::get<2>(argv)};
97             listener->Emit(type, CALLBACK_TWO_PARAM_NUM, arg);
98         }
99     });
100     std::unique_lock<std::shared_mutex> lock2(mutexForListenersAndEmitByUv_);
101     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
102         [type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchOnce(type); });
103     listeners_.erase(it, listeners_.end());
104 }
105 
SetData(void * data)106 void EventManager::SetData(void *data)
107 {
108     data_ = data;
109 }
110 
GetData()111 void *EventManager::GetData()
112 {
113     return data_;
114 }
115 
EmitByUvWithoutCheckShared(const std::string & type,void * data,void (* Handler)(uv_work_t *,int))116 void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int))
117 {
118     std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
119     bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(),
120         [] (const std::shared_ptr<EventListener> &listener) { return listener->MatchType(ON_HEADER_RECEIVE); }) !=
121         listeners_.end();
122 
123     bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(),
124         [] (const std::shared_ptr<EventListener> &listener) { return listener->MatchType(ON_HEADERS_RECEIVE);}) !=
125         listeners_.end();
126 
127     if (!foundHeader && !foundHeaders) {
128         if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
129             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
130             delete tempMap;
131             return;
132         }
133     } else if (foundHeader && !foundHeaders) {
134         if (type == ON_HEADERS_RECEIVE) {
135             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
136             delete tempMap;
137             return;
138         }
139     } else if (!foundHeader) {
140         if (type == ON_HEADER_RECEIVE) {
141             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
142             delete tempMap;
143             return;
144         }
145     }
146 
147     std::for_each(listeners_.begin(), listeners_.end(),
148         [type, data, Handler, this] (const std::shared_ptr<EventListener> &listener) {
149             if (listener->MatchType(type) && listener->GetCallbackRef() != nullptr) {
150                 auto workWrapper = new UvWorkWrapperShared(data, listener->GetEnv(), type, shared_from_this());
151                 NapiUtils::CreateUvQueueWork(listener->GetEnv(), workWrapper, Handler);
152             }
153         });
154 }
155 
SetQueueData(void * data)156 void EventManager::SetQueueData(void *data)
157 {
158     std::lock_guard<std::mutex> lock(dataQueueMutex_);
159     dataQueue_.push(data);
160 }
161 
GetQueueData()162 void *EventManager::GetQueueData()
163 {
164     std::lock_guard<std::mutex> lock(dataQueueMutex_);
165     if (!dataQueue_.empty()) {
166         auto data = dataQueue_.front();
167         dataQueue_.pop();
168         return data;
169     }
170     NETSTACK_LOGE("eventManager data queue is empty");
171     return nullptr;
172 }
173 
SetServerQueueData(void * wsi,void * data)174 void EventManager::SetServerQueueData(void *wsi, void *data)
175 {
176     std::unique_lock<std::shared_mutex> lock(dataServerQueueMutex_);
177     serverDataQueue_[wsi].push(data);
178 }
179 
GetServerQueueData(void * wsi)180 void *EventManager::GetServerQueueData(void *wsi)
181 {
182     if (wsi == nullptr) {
183         NETSTACK_LOGE("wsi is nullptr");
184         return nullptr;
185     }
186     {
187         std::shared_lock<std::shared_mutex> lock(dataServerQueueMutex_);
188         if (serverDataQueue_.empty() || serverDataQueue_.find(wsi) == serverDataQueue_.end()) {
189             NETSTACK_LOGE("eventManager server data queue is empty");
190             return nullptr;
191         }
192         auto data = serverDataQueue_[wsi].front();
193         serverDataQueue_[wsi].pop();
194         return data;
195     }
196 }
197 
HasEventListener(const std::string & type)198 bool EventManager::HasEventListener(const std::string &type)
199 {
200     std::shared_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
201     return std::any_of(listeners_.begin(), listeners_.end(),
202         [&type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchType(type); });
203 }
204 
DeleteListener(const std::string & type)205 void EventManager::DeleteListener(const std::string &type)
206 {
207     std::unique_lock<std::shared_mutex> lock(mutexForListenersAndEmitByUv_);
208     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
209         [type] (const std::shared_ptr<EventListener> &listener) -> bool { return listener->MatchType(type); });
210     listeners_.erase(it, listeners_.end());
211 }
212 
213 std::mutex EventManager::mutexForManager_;
214 EventManagerMagic EventManager::magic_;
215 
CreateEventReference(napi_env env,napi_value value)216 void EventManager::CreateEventReference(napi_env env, napi_value value)
217 {
218     if (env != nullptr && value != nullptr) {
219         eventRef_ = NapiUtils::CreateReference(env, value);
220     }
221 }
222 
DeleteEventReference(napi_env env)223 void EventManager::DeleteEventReference(napi_env env)
224 {
225     if (env != nullptr && eventRef_ != nullptr) {
226         NapiUtils::DeleteReference(env, eventRef_);
227     }
228     eventRef_ = nullptr;
229 }
230 
SetEventDestroy(bool flag)231 void EventManager::SetEventDestroy(bool flag)
232 {
233     isDestroy_.store(flag);
234 }
235 
IsEventDestroy()236 bool EventManager::IsEventDestroy()
237 {
238     return isDestroy_.load();
239 }
240 
GetWebSocketTextData()241 const std::string &EventManager::GetWebSocketTextData()
242 {
243     return webSocketTextData_;
244 }
245 
AppendWebSocketTextData(void * data,size_t length)246 void EventManager::AppendWebSocketTextData(void *data, size_t length)
247 {
248     webSocketTextData_.append(reinterpret_cast<char *>(data), length);
249 }
250 
GetWebSocketBinaryData()251 const std::string &EventManager::GetWebSocketBinaryData()
252 {
253     return webSocketBinaryData_;
254 }
255 
AppendWebSocketBinaryData(void * data,size_t length)256 void EventManager::AppendWebSocketBinaryData(void *data, size_t length)
257 {
258     webSocketBinaryData_.append(reinterpret_cast<char *>(data), length);
259 }
260 
GetWsServerBinaryData(void * wsi)261 const std::string &EventManager::GetWsServerBinaryData(void *wsi)
262 {
263     return wsServerBinaryData_[wsi];
264 }
265 
GetWsServerTextData(void * wsi)266 const std::string &EventManager::GetWsServerTextData(void *wsi)
267 {
268     return wsServerTextData_[wsi];
269 }
270 
AppendWsServerBinaryData(void * wsi,void * data,size_t length)271 void EventManager::AppendWsServerBinaryData(void *wsi, void *data, size_t length)
272 {
273     wsServerBinaryData_[wsi].append(reinterpret_cast<char *>(data), length);
274 }
275 
AppendWsServerTextData(void * wsi,void * data,size_t length)276 void EventManager::AppendWsServerTextData(void *wsi, void *data, size_t length)
277 {
278     wsServerTextData_[wsi].append(reinterpret_cast<char *>(data), length);
279 }
280 
ClearWsServerBinaryData(void * wsi)281 void EventManager::ClearWsServerBinaryData(void *wsi)
282 {
283     wsServerBinaryData_[wsi].clear();
284 }
285 
ClearWsServerTextData(void * wsi)286 void EventManager::ClearWsServerTextData(void *wsi)
287 {
288     wsServerTextData_[wsi].clear();
289 }
290 
SetMaxConnClientCnt(const uint32_t & cnt)291 void EventManager::SetMaxConnClientCnt(const uint32_t &cnt)
292 {
293     maxConnClientCnt_ = cnt;
294 }
295 
SetMaxConnForOneClient(const uint32_t & cnt)296 void EventManager::SetMaxConnForOneClient(const uint32_t &cnt)
297 {
298     maxConnForOneClient_ = cnt;
299 }
300 
GetMaxConcurrentClientCnt() const301 uint32_t EventManager::GetMaxConcurrentClientCnt() const
302 {
303     return maxConnClientCnt_;
304 }
305 
GetMaxConnForOneClient() const306 uint32_t EventManager::GetMaxConnForOneClient() const
307 {
308     return maxConnForOneClient_;
309 }
310 
AddClientUserData(void * wsi,std::shared_ptr<Websocket::UserData> & data)311 void EventManager::AddClientUserData(void *wsi, std::shared_ptr<Websocket::UserData> &data)
312 {
313     std::lock_guard<std::mutex> lock(mapMutex_);
314     userDataMap_[wsi] = data;
315 }
316 
RemoveClientUserData(void * wsi)317 void EventManager::RemoveClientUserData(void *wsi)
318 {
319     std::lock_guard<std::mutex> lock(mapMutex_);
320     auto it = userDataMap_.find(wsi);
321     if (it != userDataMap_.end()) {
322         userDataMap_.erase(it);
323     }
324 }
325 
ClearWebSocketTextData()326 void EventManager::ClearWebSocketTextData()
327 {
328     webSocketTextData_.clear();
329 }
330 
ClearWebSocketBinaryData()331 void EventManager::ClearWebSocketBinaryData()
332 {
333     webSocketBinaryData_.clear();
334 }
335 
GetDataMutex()336 std::shared_mutex &EventManager::GetDataMutex()
337 {
338     return dataMutex_;
339 }
340 
NotifyRcvThdExit()341 void EventManager::NotifyRcvThdExit()
342 {
343     std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
344     sockRcvExit_ = true;
345     sockRcvThdCon_.notify_one();
346 }
347 
WaitForRcvThdExit()348 void EventManager::WaitForRcvThdExit()
349 {
350     std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
351     sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; });
352 }
353 
SetReuseAddr(bool reuse)354 void EventManager::SetReuseAddr(bool reuse)
355 {
356     isReuseAddr_.store(reuse);
357 }
358 
GetReuseAddr()359 bool EventManager::GetReuseAddr()
360 {
361     return isReuseAddr_.load();
362 }
363 
SetContextState(bool enable)364 void EventManager::SetContextState(bool enable)
365 {
366     isOpened_ = enable;
367 }
368 
GetContextState()369 bool EventManager::GetContextState()
370 {
371     return isOpened_;
372 }
373 
GetProxyData()374 std::shared_ptr<Socks5::Socks5Instance> EventManager::GetProxyData()
375 {
376     return proxyData_;
377 }
378 
SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)379 void EventManager::SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)
380 {
381     proxyData_ = data;
382 }
383 
SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> & userData)384 void EventManager::SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> &userData)
385 {
386     webSocketUserData_ = userData;
387 }
388 
GetWebSocketUserData()389 std::shared_ptr<Websocket::UserData> EventManager::GetWebSocketUserData()
390 {
391     return webSocketUserData_;
392 }
393 
UvWorkWrapperShared(void * theData,napi_env theEnv,std::string eventType,const std::shared_ptr<EventManager> & eventManager)394 UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType,
395                                          const std::shared_ptr<EventManager> &eventManager)
396     : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
397 {
398 }
399 } // namespace OHOS::NetStack
400