1 /*
2 * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <algorithm>
17 #include <map>
18
19 #include "event_manager.h"
20
21 #include "napi_utils.h"
22 #include "netstack_log.h"
23
24 namespace OHOS::NetStack {
25 static constexpr const int CALLBACK_PARAM_NUM = 1;
26 static constexpr const int ASYNC_CALLBACK_PARAM_NUM = 2;
27 static constexpr const char *ON_HEADER_RECEIVE = "headerReceive";
28 static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive";
29
EventManager()30 EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false), proxyData_{nullptr} {}
31
~EventManager()32 EventManager::~EventManager()
33 {
34 NETSTACK_LOGD("EventManager is destructed by the destructor");
35 }
36
AddListener(napi_env env,const std::string & type,napi_value callback,bool once,bool asyncCallback)37 void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once,
38 bool asyncCallback)
39 {
40 std::unique_lock lock(mutexForListenersAndEmitByUv_);
41 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
42 [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
43 if (it != listeners_.end()) {
44 listeners_.erase(it, listeners_.end());
45 }
46
47 listeners_.emplace_back(GetCurrentThreadId(), env, type, callback, once, asyncCallback);
48 }
49
DeleteListener(const std::string & type,napi_value callback)50 void EventManager::DeleteListener(const std::string &type, napi_value callback)
51 {
52 std::unique_lock lock(mutexForListenersAndEmitByUv_);
53 auto it =
54 std::remove_if(listeners_.begin(), listeners_.end(), [type, callback](const EventListener &listener) -> bool {
55 return listener.Match(type, callback);
56 });
57 listeners_.erase(it, listeners_.end());
58 }
59
Emit(const std::string & type,const std::pair<napi_value,napi_value> & argv)60 void EventManager::Emit(const std::string &type, const std::pair<napi_value, napi_value> &argv)
61 {
62 std::shared_lock lock2(mutexForListenersAndEmitByUv_);
63 auto listeners = listeners_;
64 lock2.unlock();
65 std::for_each(listeners.begin(), listeners.end(), [type, argv](const EventListener &listener) {
66 if (listener.IsAsyncCallback()) {
67 /* AsyncCallback(BusinessError error, T data) */
68 napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second};
69 listener.Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg);
70 } else {
71 /* Callback(T data) */
72 napi_value arg[CALLBACK_PARAM_NUM] = {argv.second};
73 listener.Emit(type, CALLBACK_PARAM_NUM, arg);
74 }
75 });
76
77 std::unique_lock lock(mutexForListenersAndEmitByUv_);
78 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
79 [type](const EventListener &listener) -> bool { return listener.MatchOnce(type); });
80 listeners_.erase(it, listeners_.end());
81 }
82
SetData(void * data)83 void EventManager::SetData(void *data)
84 {
85 data_ = data;
86 }
87
GetData()88 void *EventManager::GetData()
89 {
90 return data_;
91 }
92
EmitByUvWithoutCheckShared(const std::string & type,void * data,void (* Handler)(uv_work_t *,int))93 void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int))
94 {
95 std::shared_lock lock2(mutexForListenersAndEmitByUv_);
96 bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
97 return listener.MatchType(ON_HEADER_RECEIVE);
98 }) != listeners_.end();
99
100 bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
101 return listener.MatchType(ON_HEADERS_RECEIVE);
102 }) != listeners_.end();
103 if (!foundHeader && !foundHeaders) {
104 if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
105 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
106 delete tempMap;
107 }
108 } else if (foundHeader && !foundHeaders) {
109 if (type == ON_HEADERS_RECEIVE) {
110 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
111 delete tempMap;
112 }
113 } else if (!foundHeader) {
114 if (type == ON_HEADER_RECEIVE) {
115 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
116 delete tempMap;
117 }
118 }
119
120 std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
121 if (listener.MatchType(type)) {
122 auto workWrapper = new UvWorkWrapperShared(data, listener.GetEnv(), type, shared_from_this());
123 listener.EmitByUv(type, workWrapper, Handler);
124 }
125 });
126 }
127
SetQueueData(void * data)128 void EventManager::SetQueueData(void *data)
129 {
130 std::lock_guard<std::mutex> lock(dataQueueMutex_);
131 dataQueue_.push(data);
132 }
133
GetQueueData()134 void *EventManager::GetQueueData()
135 {
136 std::lock_guard<std::mutex> lock(dataQueueMutex_);
137 if (!dataQueue_.empty()) {
138 auto data = dataQueue_.front();
139 dataQueue_.pop();
140 return data;
141 }
142 NETSTACK_LOGE("eventManager data queue is empty");
143 return nullptr;
144 }
145
HasEventListener(const std::string & type)146 bool EventManager::HasEventListener(const std::string &type)
147 {
148 std::shared_lock lock(mutexForListenersAndEmitByUv_);
149 return std::any_of(listeners_.begin(), listeners_.end(),
150 [&type](const EventListener &listener) -> bool { return listener.MatchType(type); });
151 }
152
DeleteListener(const std::string & type)153 void EventManager::DeleteListener(const std::string &type)
154 {
155 std::unique_lock lock(mutexForListenersAndEmitByUv_);
156 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
157 [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
158 listeners_.erase(it, listeners_.end());
159 }
160
161 std::mutex EventManager::mutexForManager_;
162 EventManagerMagic EventManager::magic_;
163
CreateEventReference(napi_env env,napi_value value)164 void EventManager::CreateEventReference(napi_env env, napi_value value)
165 {
166 if (env != nullptr && value != nullptr) {
167 eventRef_ = NapiUtils::CreateReference(env, value);
168 }
169 }
170
DeleteEventReference(napi_env env)171 void EventManager::DeleteEventReference(napi_env env)
172 {
173 if (env != nullptr && eventRef_ != nullptr) {
174 NapiUtils::DeleteReference(env, eventRef_);
175 }
176 eventRef_ = nullptr;
177 }
178
SetEventDestroy(bool flag)179 void EventManager::SetEventDestroy(bool flag)
180 {
181 isDestroy_.store(flag);
182 }
183
IsEventDestroy()184 bool EventManager::IsEventDestroy()
185 {
186 return isDestroy_.load();
187 }
188
GetWebSocketTextData()189 const std::string &EventManager::GetWebSocketTextData()
190 {
191 return webSocketTextData_;
192 }
193
AppendWebSocketTextData(void * data,size_t length)194 void EventManager::AppendWebSocketTextData(void *data, size_t length)
195 {
196 webSocketTextData_.append(reinterpret_cast<char *>(data), length);
197 }
198
GetWebSocketBinaryData()199 const std::string &EventManager::GetWebSocketBinaryData()
200 {
201 return webSocketBinaryData_;
202 }
203
AppendWebSocketBinaryData(void * data,size_t length)204 void EventManager::AppendWebSocketBinaryData(void *data, size_t length)
205 {
206 webSocketBinaryData_.append(reinterpret_cast<char *>(data), length);
207 }
208
ClearWebSocketTextData()209 void EventManager::ClearWebSocketTextData()
210 {
211 webSocketTextData_.clear();
212 }
213
ClearWebSocketBinaryData()214 void EventManager::ClearWebSocketBinaryData()
215 {
216 webSocketBinaryData_.clear();
217 }
218
GetDataMutex()219 std::shared_mutex &EventManager::GetDataMutex()
220 {
221 return dataMutex_;
222 }
223
NotifyRcvThdExit()224 void EventManager::NotifyRcvThdExit()
225 {
226 std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
227 sockRcvExit_ = true;
228 sockRcvThdCon_.notify_one();
229 }
230
WaitForRcvThdExit()231 void EventManager::WaitForRcvThdExit()
232 {
233 std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
234 sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; });
235 }
236
SetReuseAddr(bool reuse)237 void EventManager::SetReuseAddr(bool reuse)
238 {
239 isReuseAddr_.store(reuse);
240 }
241
GetReuseAddr()242 bool EventManager::GetReuseAddr()
243 {
244 return isReuseAddr_.load();
245 }
246
GetProxyData()247 std::shared_ptr<Socks5::Socks5Instance> EventManager::GetProxyData()
248 {
249 std::unique_lock<std::shared_mutex> lock(dataMutex_);
250 return proxyData_;
251 }
252
SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)253 void EventManager::SetProxyData(std::shared_ptr<Socks5::Socks5Instance> data)
254 {
255 std::unique_lock<std::shared_mutex> lock(dataMutex_);
256 proxyData_ = data;
257 }
258
SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> & userData)259 void EventManager::SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> &userData)
260 {
261 std::unique_lock<std::shared_mutex> lock(dataMutex_);
262 webSocketUserData_ = userData;
263 }
264
GetWebSocketUserData()265 std::shared_ptr<Websocket::UserData> EventManager::GetWebSocketUserData()
266 {
267 std::unique_lock<std::shared_mutex> lock(dataMutex_);
268 return webSocketUserData_;
269 }
270
UvWorkWrapperShared(void * theData,napi_env theEnv,std::string eventType,const std::shared_ptr<EventManager> & eventManager)271 UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType,
272 const std::shared_ptr<EventManager> &eventManager)
273 : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
274 {
275 }
276 } // namespace OHOS::NetStack
277