1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "softbus_base_listener.h"
17
18 #include <securec.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include "common_list.h"
23 #include "softbus_adapter_errcode.h"
24 #include "softbus_adapter_mem.h"
25 #include "softbus_adapter_socket.h"
26 #include "softbus_conn_common.h"
27 #include "softbus_conn_interface.h"
28 #include "softbus_def.h"
29 #include "softbus_errcode.h"
30 #include "softbus_feature_config.h"
31 #include "softbus_log.h"
32 #include "softbus_socket.h"
33 #include "softbus_utils.h"
34
// Cap on the number of fds a single listener module may wait on; enforced in AddTrigger.
#define MAX_LISTEN_EVENTS 1024
// Backlog handed to SoftBusSocketListen() when a server socket starts listening.
#define DEFAULT_BACKLOG 4
// NOTE(review): not referenced in the visible part of this file; presumably the growth factor used when an
// fd array has to be enlarged - confirm against its users.
#define FDARR_EXPAND_BASE 2

// Prefix prepended to every log message emitted from this file.
#define LISTENER_TAG "[base-listener] "
40
// Lifecycle state of a listener module. A freshly calloc'ed node starts as LISTENER_IDLE (value 0).
enum BaseListenerStatus {
    LISTENER_IDLE = 0, // not started yet, or stopped again by ShutdownBaseListener
    LISTENER_RUNNING,  // set by StartBaseClient/StartBaseListener once StartSelectThread succeeded
};
45
// One fd a listener module waits on, together with the triggers requested for it.
typedef struct {
    ListNode node;       // link in SoftbusBaseListenerInfo.waitEventFds
    int32_t fd;          // the watched fd; never closed by this file when the entry is dropped
    uint32_t triggerSet; // bitwise OR of the TriggerType values added through AddTrigger
} FdNode;
51
// Mutable runtime state of one listener module. The functions in this file read and write it while holding
// the lock of the SoftbusListenerNode that embeds it.
typedef struct {
    ListNode waitEventFds;    // list of FdNode entries registered via AddTrigger
    uint32_t waitEventFdsLen; // number of entries currently in waitEventFds

    ModeType modeType;              // UNSET_MODE until StartServerListenUnsafe switches it to SERVER_MODE
    LocalListenerInfo listenerInfo; // copy of the info passed to StartBaseListener; zeroed on shutdown
    int32_t listenFd;               // server socket fd, -1 when there is none
    int32_t listenPort;             // port the server socket is bound to, -1 when there is none

    enum BaseListenerStatus status;
} SoftbusBaseListenerInfo;
63
// Reference-counted per-module listener object stored in g_listenerList.
typedef struct {
    ListenerModule module;           // index of this node in g_listenerList
    SoftBusMutex lock;               // non-recursive mutex protecting the fields below
    SoftbusBaseListener listener;    // callbacks registered by StartBaseClient/StartBaseListener
    const SocketInterface *socketIf; // protocol implementation selected in StartServerListenUnsafe, NULL before that
    SoftbusBaseListenerInfo info;
    // Starts at 1 (the reference held by g_listenerList), +1 per GetListenerNode, -1 per
    // ReturnListenerNode/RemoveListenerNode; the node is freed when it drops to <= 0.
    int32_t objectRc;
} SoftbusListenerNode;
72
// State shared with the select thread.
// NOTE(review): the code that creates and uses this state is outside the visible part of the file; field
// notes other than the pipe fds are assumptions to be confirmed there.
typedef struct {
    uint32_t traceId; // printed by CleanupSelectThreadState; presumably identifies one select thread instance
    // pipe fds, to wakeup select thread in time
    int32_t ctrlRfd;
    int32_t ctrlWfd;

    SoftBusMutex lock;      // destroyed in CleanupSelectThreadState
    int32_t referenceCount; // presumably the number of users keeping the select thread alive - confirm
} SelectThreadState;
82
// Forward declarations of helpers that are used before their definition.
static int32_t ShutdownBaseListener(SoftbusListenerNode *node);
static int32_t StartSelectThread(void);
static int32_t StopSelectThread(void);
static void WakeupSelectThread(void);

// Guards g_listenerList; initialized as a non-recursive mutex in InitBaseListener.
static SoftBusMutex g_listenerListLock = { 0 };
// One listener node per module, indexed by ListenerModule; NULL when the module has no node.
static SoftbusListenerNode *g_listenerList[UNUSE_BUTT] = { 0 };
// Initialized as a recursive mutex in InitBaseListener.
// NOTE(review): presumably guards g_selectThreadState; its users are outside the visible part of the file.
static SoftBusMutex g_selectThreadStateLock = { 0 };
static SelectThreadState *g_selectThreadState = NULL;
92
GetListenerNode(ListenerModule module)93 static SoftbusListenerNode *GetListenerNode(ListenerModule module)
94 {
95 int32_t status = SoftBusMutexLock(&g_listenerListLock);
96 CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, NULL,
97 LISTENER_TAG "ATTENTION UNEXPECTED ERROR! request listener node failed: try to lock listener lists failed, "
98 "module=%d, error=%d",
99 module, status);
100 SoftbusListenerNode *node = g_listenerList[module];
101 do {
102 if (node == NULL) {
103 break;
104 }
105 status = SoftBusMutexLock(&node->lock);
106 if (status != SOFTBUS_OK) {
107 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! request listener node failed: try to lock listener failed, "
108 "module=%d, error=%d",
109 module, status);
110 node = NULL;
111 break;
112 }
113 node->objectRc += 1;
114 SoftBusMutexUnlock(&node->lock);
115 } while (false);
116 (void)SoftBusMutexUnlock(&g_listenerListLock);
117 return node;
118 }
119
/*
 * Unregisters `node` from g_listenerList and drops the reference the list held on it.
 *
 * Nothing is freed here: the caller still owns its own reference and releases it with
 * ReturnListenerNode. If the list slot no longer points at `node`, the call only logs a
 * warning and changes nothing.
 */
static void RemoveListenerNode(SoftbusListenerNode *node)
{
    int32_t rc = SoftBusMutexLock(&g_listenerListLock);
    CONN_CHECK_AND_RETURN_LOG(rc == SOFTBUS_OK,
        LISTENER_TAG "ATTENTION UNEXPECTED ERROR! remove listener node failed: try to lock listener lists failed, "
                     "module=%d, error=%d",
        node->module, rc);

    if (g_listenerList[node->module] != node) {
        CLOGW(LISTENER_TAG "ATTENTION! remove listener node warning: listener node is not in listener list,"
                           " repeat remove? just skip, module=%d", node->module);
    } else if (SoftBusMutexLock(&node->lock) != SOFTBUS_OK) {
        CLOGE(LISTENER_TAG "ATTENTION UNEXPECT ERROR! remove listener node failed: "
                           "try to lock listener node failed, module=%d", node->module);
    } else {
        // give up the reference that belonged to the list, then clear the slot
        node->objectRc -= 1;
        g_listenerList[node->module] = NULL;
        (void)SoftBusMutexUnlock(&node->lock);
    }
    (void)SoftBusMutexUnlock(&g_listenerListLock);
}
146
/*
 * Drops one reference on *nodePtr and clears the caller's pointer.
 *
 * When the count falls to zero or below, the node is torn down: the listener is shut
 * down, the node's mutex is destroyed and the node memory is freed. If the node lock
 * cannot be taken the count is left untouched and only the caller's pointer is cleared.
 *
 * nodePtr: address of a pointer obtained from GetListenerNode (or of the root reference
 *          during cleanup). A NULL nodePtr or a NULL *nodePtr is ignored.
 */
static void ReturnListenerNode(SoftbusListenerNode **nodePtr)
{
    if (nodePtr == NULL || *nodePtr == NULL) {
        return;
    }
    SoftbusListenerNode *node = *nodePtr;
    do {
        int32_t status = SoftBusMutexLock(&node->lock);
        if (status != SOFTBUS_OK) {
            CLOGE(LISTENER_TAG "ATTENTION UNEXPECT ERROR! return listener node failed: "
                               "try to lock listener node failed, module=%d", node->module);
            break;
        }
        node->objectRc -= 1;
        // snapshot under the lock; the decision below must not re-read shared state
        int32_t objectRc = node->objectRc;
        (void)SoftBusMutexUnlock(&node->lock);

        if (objectRc > 0) {
            break;
        }
        CLOGI(LISTENER_TAG
            "release listener node, object reference count <= 0, free listener node, module=%d, object reference=%d",
            node->module, objectRc);
        (void)ShutdownBaseListener(node);
        // the lock was set up with SoftBusMutexInit when the node was created; release its
        // resources before the memory that contains it goes away
        (void)SoftBusMutexDestroy(&node->lock);
        SoftBusFree(node);
    } while (false);

    *nodePtr = NULL;
}
173
CreateSpecifiedListenerModule(ListenerModule module)174 static SoftbusListenerNode *CreateSpecifiedListenerModule(ListenerModule module)
175 {
176 SoftbusListenerNode *node = (SoftbusListenerNode *)SoftBusCalloc(sizeof(SoftbusListenerNode));
177 CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, NULL,
178 LISTENER_TAG
179 "ATTENTION UNEXPECTED ERROR! create specified listener module failed: calloc listener node object failed, "
180 "module=%d",
181 module);
182
183 node->module = module;
184 // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
185 int32_t status = SoftBusMutexInit(&node->lock, NULL);
186 if (status != SOFTBUS_OK) {
187 CLOGE(LISTENER_TAG
188 "ATTENTION UNEXPECTED ERROR! create specified listener module failed: init lock failed, module=%d, "
189 "error=%d",
190 module, status);
191 SoftBusFree(node);
192 return NULL;
193 }
194 node->listener.onConnectEvent = NULL;
195 node->listener.onDataEvent = NULL;
196
197 node->socketIf = NULL;
198
199 ListInit(&node->info.waitEventFds);
200 node->info.waitEventFdsLen = 0;
201 node->info.modeType = UNSET_MODE;
202 (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));
203 node->info.listenFd = -1;
204 node->info.listenPort = -1;
205 // set root object reference count 1
206 node->objectRc = 1;
207 return node;
208 }
209
CreateStaticModulesUnsafe(void)210 static int32_t CreateStaticModulesUnsafe(void)
211 {
212 ListenerModule module = 0;
213 for (; module < LISTENER_MODULE_DYNAMIC_START; module++) {
214 SoftbusListenerNode *node = CreateSpecifiedListenerModule(module);
215 if (node == NULL) {
216 CLOGW(LISTENER_TAG "create static module failed: create module listener node failed, module=%d", module);
217 goto CLEANUP;
218 }
219 CLOGI(LISTENER_TAG "create static module, create module listener node success, module=%d", module);
220 g_listenerList[module] = node;
221 }
222 return SOFTBUS_OK;
223 CLEANUP:
224 for (ListenerModule i = module - 1; i >= 0; i--) {
225 SoftbusListenerNode *node = g_listenerList[i];
226 g_listenerList[i] = NULL;
227 // cleanup
228 ReturnListenerNode(&node);
229 CLOGI(LISTENER_TAG "create static module failed: clean up listener node done, module=%d", module);
230 }
231 return SOFTBUS_ERR;
232 }
233
InitBaseListener(void)234 int32_t InitBaseListener(void)
235 {
236 // stop select thread need re-enter lock
237 SoftBusMutexAttr attr = {
238 .type = SOFTBUS_MUTEX_RECURSIVE,
239 };
240 int32_t status = SoftBusMutexInit(&g_selectThreadStateLock, &attr);
241 if (status != SOFTBUS_OK) {
242 CLOGE(LISTENER_TAG
243 "ATTENTION UNEXPECTED ERROR! init base listener failed: init select thread lock failed, error=%d",
244 status);
245 return SOFTBUS_LOCK_ERR;
246 }
247 // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
248 status = SoftBusMutexInit(&g_listenerListLock, NULL);
249 if (status != SOFTBUS_OK) {
250 SoftBusMutexDestroy(&g_selectThreadStateLock);
251 CLOGE(LISTENER_TAG
252 "ATTENTION UNEXPECTED ERROR! init base listener failed: init listener list lock failed, error=%d",
253 status);
254 return SOFTBUS_LOCK_ERR;
255 }
256
257 status = SoftBusMutexLock(&g_listenerListLock);
258 if (status != SOFTBUS_OK) {
259 CLOGE(LISTENER_TAG
260 "ATTENTION UNEXPECTED ERROR! init base listener failed: try to lock listener list failed, error=%d",
261 status);
262 SoftBusMutexDestroy(&g_selectThreadStateLock);
263 SoftBusMutexDestroy(&g_listenerListLock);
264 return SOFTBUS_LOCK_ERR;
265 }
266 (void)memset_s(g_listenerList, sizeof(g_listenerList), 0, sizeof(g_listenerList));
267 status = CreateStaticModulesUnsafe();
268 (void)SoftBusMutexUnlock(&g_listenerListLock);
269 if (status != SOFTBUS_OK) {
270 CLOGE(LISTENER_TAG "init base listener failed: create static module listener failed, error=%d", status);
271 SoftBusMutexDestroy(&g_selectThreadStateLock);
272 SoftBusMutexDestroy(&g_listenerListLock);
273 return status;
274 }
275
276 return SOFTBUS_OK;
277 }
278
DeinitBaseListener(void)279 NO_SANITIZE("cfi") void DeinitBaseListener(void)
280 {
281 for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
282 SoftbusListenerNode *node = GetListenerNode(module);
283 if (node == NULL) {
284 continue;
285 }
286 RemoveListenerNode(node);
287 ReturnListenerNode(&node);
288 }
289 }
290
CreateListenerModule(void)291 NO_SANITIZE("cfi") uint32_t CreateListenerModule(void)
292 {
293 int32_t status = SoftBusMutexLock(&g_listenerListLock);
294 CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, UNUSE_BUTT,
295 LISTENER_TAG
296 "ATTENTION UNEXPECTED ERROR! create listener module failed: try to lock listener list failed, error=%d",
297 status);
298
299 ListenerModule module = LISTENER_MODULE_DYNAMIC_START;
300 for (; module <= LISTENER_MODULE_DYNAMIC_END; module++) {
301 if (g_listenerList[module] != NULL) {
302 continue;
303 }
304 SoftbusListenerNode *node = CreateSpecifiedListenerModule(module);
305 if (node == NULL) {
306 CLOGE(LISTENER_TAG "create listener module failed, create specified listener module failed, module=%d",
307 module);
308 module = UNUSE_BUTT;
309 } else {
310 CLOGE(LISTENER_TAG "create listener module success, module=%d", module);
311 g_listenerList[module] = node;
312 }
313 break;
314 }
315 (void)SoftBusMutexUnlock(&g_listenerListLock);
316 return module;
317 }
318
DestroyBaseListener(ListenerModule module)319 NO_SANITIZE("cfi") void DestroyBaseListener(ListenerModule module)
320 {
321 CONN_CHECK_AND_RETURN_LOG(module >= LISTENER_MODULE_DYNAMIC_START && module <= LISTENER_MODULE_DYNAMIC_END,
322 LISTENER_TAG "destroy base listener failed: only dynamic module support destroy, module=%d", module);
323
324 CLOGI(LISTENER_TAG "receive destroy base listener request, module=%d", module);
325 SoftbusListenerNode *node = GetListenerNode(module);
326 if (node == NULL) {
327 CLOGW(LISTENER_TAG "destroy base listener warning, listener not exist, module=%d", module);
328 return;
329 }
330 RemoveListenerNode(node);
331 ReturnListenerNode(&node);
332 }
333
/*
 * Starts a module in client mode: registers its callbacks and makes sure the select
 * thread is running. No server socket is opened.
 *
 * module:   module to start, must be inside [0, UNUSE_BUTT) and have a listener node.
 * listener: callbacks to register; both onConnectEvent and onDataEvent must be set.
 *
 * Returns SOFTBUS_OK on success, SOFTBUS_INVALID_PARAM for bad arguments,
 * SOFTBUS_NOT_FIND when the module has no node, SOFTBUS_LOCK_ERR when the node lock
 * cannot be taken, SOFTBUS_ERR when the module is not idle, or the error reported by
 * StartSelectThread.
 */
int32_t StartBaseClient(ListenerModule module, const SoftbusBaseListener *listener)
{
    CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
        LISTENER_TAG "start base client listener failed: invalid module, module=%d", module);
    CONN_CHECK_AND_RETURN_RET_LOG(listener != NULL, SOFTBUS_INVALID_PARAM,
        LISTENER_TAG "start base client listener failed: listener is null, module=%d", module);
    CONN_CHECK_AND_RETURN_RET_LOG(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM,
        LISTENER_TAG "start base client listener failed: listener onConnectEvent is null, module=%d", module);
    CONN_CHECK_AND_RETURN_RET_LOG(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM,
        LISTENER_TAG "start base client listener failed: listener onDataEvent is null, module=%d", module);

    CLOGI(LISTENER_TAG "receive start base client listener request, module=%d", module);

    SoftbusListenerNode *target = GetListenerNode(module);
    CONN_CHECK_AND_RETURN_RET_LOG(target != NULL, SOFTBUS_NOT_FIND,
        LISTENER_TAG "start base client listener failed: get listener node failed, dynamic module forgot register "
                     "first? or static module start before base listener init? module=%d",
        module);

    int32_t result = SoftBusMutexLock(&target->lock);
    if (result != SOFTBUS_OK) {
        CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! start base client listener failed: try to lock listener node "
                           "failed, module=%d, error=%d",
            module, result);
        ReturnListenerNode(&target);
        return SOFTBUS_LOCK_ERR;
    }

    if (target->info.status != LISTENER_IDLE) {
        CLOGE(LISTENER_TAG "start base client listener failed: listener is not idle status, module=%d, status=%d",
            module, target->info.status);
        result = SOFTBUS_ERR;
    } else {
        // callbacks are stored before the select thread is started
        target->listener.onConnectEvent = listener->onConnectEvent;
        target->listener.onDataEvent = listener->onDataEvent;
        result = StartSelectThread();
        if (result == SOFTBUS_OK) {
            target->info.status = LISTENER_RUNNING;
            CLOGI(LISTENER_TAG "start base client listener success, module=%d", module);
        } else {
            CLOGE(LISTENER_TAG "start base client listener failed: start select thread failed, module=%d, status=%d",
                module, result);
        }
    }

    (void)SoftBusMutexUnlock(&target->lock);
    ReturnListenerNode(&target);
    return result;
}
383
/*
 * Opens the server socket described by `info`, starts listening on it and records the
 * result in `node`.
 *
 * "Unsafe": node->lock is not taken here; the caller holds it.
 *
 * On success node->info.modeType becomes SERVER_MODE and listenFd/listenPort/listenerInfo
 * are filled in. On failure the socket opened here is shut down again and those fields
 * are left as they were. node->socketIf is set as soon as the protocol implementation is
 * found, even if a later step fails.
 *
 * Returns the port the socket is bound to on success, otherwise an error status
 * (SOFTBUS_NOT_FIND, SOFTBUS_TCP_SOCKET_ERR, SOFTBUS_MEM_ERR or the error reported by
 * SoftBusSocketListen).
 */
static int32_t StartServerListenUnsafe(SoftbusListenerNode *node, const LocalListenerInfo *info)
{
    ListenerModule module = node->module;
    ProtocolType protocol = info->socketOption.protocol;
    const SocketInterface *socketIf = GetSocketInterface(protocol);
    if (socketIf == NULL) {
        CLOGE(LISTENER_TAG
            "not find protocal implement, protocal implement should register first, module=%d, protocal=%d",
            module, protocol);
        return SOFTBUS_NOT_FIND;
    }
    node->socketIf = socketIf;

    int32_t listenFd = -1;
    int32_t listenPort = -1;
    int32_t status = SOFTBUS_OK;
    do {
        listenFd = socketIf->OpenServerSocket(info);
        if (listenFd < 0) {
            CLOGE(LISTENER_TAG "create server socket failed: module=%d, invalid listen fd=%d", module, listenFd);
            status = SOFTBUS_TCP_SOCKET_ERR;
            break;
        }
        status = SoftBusSocketListen(listenFd, DEFAULT_BACKLOG);
        if (status != SOFTBUS_OK) {
            CLOGE(LISTENER_TAG "listen server socket failed: module=%d, error=%d", module, status);
            break;
        }
        listenPort = socketIf->GetSockPort(listenFd);
        if (listenPort < 0) {
            // status is still SOFTBUS_OK at this point; the failing value is the port itself
            CLOGE(LISTENER_TAG "get listen server port failed: module=%d, listen fd=%d, error=%d", module, listenFd,
                listenPort);
            status = SOFTBUS_TCP_SOCKET_ERR;
            break;
        }
        if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
            CLOGE(LISTENER_TAG "memcpy_s local listener info object failed: module=%d", module);
            status = SOFTBUS_MEM_ERR;
            break;
        }
        node->info.modeType = SERVER_MODE;
        node->info.listenFd = listenFd;
        node->info.listenPort = listenPort;
    } while (false);
    // every fd accepted by the "listenFd < 0" check above was opened here and must not leak
    if (status != SOFTBUS_OK && listenFd >= 0) {
        ConnShutdownSocket(listenFd);
    }
    return status == SOFTBUS_OK ? listenPort : status;
}
433
/*
 * Undoes what StartServerListenUnsafe recorded in `node`: shuts the listening socket
 * down and resets the server related fields (mode, fd, port, copied listener info).
 *
 * "Unsafe": node->lock is not taken here; the caller holds it.
 */
static void CleanupServerListenInfoUnsafe(SoftbusListenerNode *node)
{
    // shut the socket down first, while the fd is still known
    if (node->info.listenFd > 0) {
        ConnShutdownSocket(node->info.listenFd);
    }
    node->info.modeType = UNSET_MODE;
    node->info.listenFd = -1;
    node->info.listenPort = -1;
    // Clear only the listenerInfo member. Sizing this with the enclosing
    // SoftbusBaseListenerInfo would run past the end of node->info and overwrite the
    // fields that follow it, including the node's reference count.
    (void)memset_s(&node->info.listenerInfo, sizeof(node->info.listenerInfo), 0, sizeof(node->info.listenerInfo));
}
443
StartBaseListener(const LocalListenerInfo * info,const SoftbusBaseListener * listener)444 int32_t StartBaseListener(const LocalListenerInfo *info, const SoftbusBaseListener *listener)
445 {
446 CONN_CHECK_AND_RETURN_RET_LOG(
447 info != NULL, SOFTBUS_INVALID_PARAM, LISTENER_TAG "start base listener failed: info is null");
448 CONN_CHECK_AND_RETURN_RET_LOG(info->type == CONNECT_TCP || info->type == CONNECT_P2P, SOFTBUS_INVALID_PARAM,
449 LISTENER_TAG "start base listener failed: only CONNECT_TCP(%d) and CONNECT_P2P(%d) is permitted, type=%d",
450 CONNECT_TCP, CONNECT_P2P, info->type);
451 CONN_CHECK_AND_RETURN_RET_LOG(info->socketOption.port >= 0, SOFTBUS_INVALID_PARAM,
452 LISTENER_TAG "start base listener failed: port is invalid, port=%d", info->socketOption.port);
453 CONN_CHECK_AND_RETURN_RET_LOG(info->socketOption.moduleId >= 0 && info->socketOption.moduleId < UNUSE_BUTT,
454 SOFTBUS_INVALID_PARAM, LISTENER_TAG "start base client listener failed: invalid module, module=%d",
455 info->socketOption.moduleId);
456 CONN_CHECK_AND_RETURN_RET_LOG(listener != NULL, SOFTBUS_INVALID_PARAM,
457 LISTENER_TAG "start base listener failed: listener is null, module=%d", info->socketOption.moduleId);
458 CONN_CHECK_AND_RETURN_RET_LOG(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM,
459 LISTENER_TAG "start base listener failed: listener is null, module=%d", info->socketOption.moduleId);
460 CONN_CHECK_AND_RETURN_RET_LOG(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM,
461 LISTENER_TAG "start base listener failed: listener is null, module=%d", info->socketOption.moduleId);
462
463 ListenerModule module = info->socketOption.moduleId;
464 CLOGI(LISTENER_TAG "receive start base listener request, module=%d", module);
465 SoftbusListenerNode *node = GetListenerNode(module);
466 CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
467 LISTENER_TAG
468 "start base listener failed: get listener node failed, dynamic module should register first, module=%d",
469 module);
470 int32_t status = SoftBusMutexLock(&node->lock);
471 if (status != SOFTBUS_OK) {
472 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! start base listener failed: try to lock listener node failed, "
473 "module=%d, error=%d",
474 module, status);
475 ReturnListenerNode(&node);
476 return SOFTBUS_LOCK_ERR;
477 }
478
479 int32_t listenPort = -1;
480 do {
481 if (node->info.status != LISTENER_IDLE) {
482 CLOGE(LISTENER_TAG "start base listener failed: listener is not idle status, module=%d, status=%d", module,
483 node->info.status);
484 status = SOFTBUS_ERR;
485 break;
486 }
487
488 node->listener.onConnectEvent = listener->onConnectEvent;
489 node->listener.onDataEvent = listener->onDataEvent;
490 if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
491 CLOGE(LISTENER_TAG "start base listener failed: memcpy_s listener info failed, module=%d", node->module);
492 status = SOFTBUS_LOCK_ERR;
493 break;
494 }
495 listenPort = StartServerListenUnsafe(node, info);
496 if (listenPort <= 0) {
497 CLOGE(LISTENER_TAG "start base listener failed: start server failed, module=%d, invalid listen port=%d",
498 module, listenPort);
499 status = SOFTBUS_ERR;
500 break;
501 }
502
503 status = StartSelectThread();
504 if (status != SOFTBUS_OK) {
505 CLOGE(LISTENER_TAG "start base listener failed: start listener thread failed, module=%d, status=%d", module,
506 status);
507 CleanupServerListenInfoUnsafe(node);
508 break;
509 }
510 node->info.status = LISTENER_RUNNING;
511 CLOGI(LISTENER_TAG "start base listener success, module=%d, listen fd=%d, listen port=%d", module,
512 node->info.listenFd, listenPort);
513 } while (false);
514 (void)SoftBusMutexUnlock(&node->lock);
515 ReturnListenerNode(&node);
516 return status == SOFTBUS_OK ? listenPort : status;
517 }
518
/*
 * Stops the listener of `module` while keeping its node registered.
 *
 * Returns SOFTBUS_INVALID_PARAM when `module` is outside [0, UNUSE_BUTT),
 * SOFTBUS_NOT_FIND when the module has no node, otherwise whatever
 * ShutdownBaseListener reports.
 */
int32_t StopBaseListener(ListenerModule module)
{
    CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
        LISTENER_TAG "stop base listener failed: invalid module, module=%d", module);

    CLOGI(LISTENER_TAG "receive stop base listener request, module=%d", module);
    SoftbusListenerNode *target = GetListenerNode(module);
    CONN_CHECK_AND_RETURN_RET_LOG(target != NULL, SOFTBUS_NOT_FIND,
        LISTENER_TAG "stop base listener failed: listener node not exist, module=%d", module);

    const int32_t result = ShutdownBaseListener(target);
    if (result != SOFTBUS_OK) {
        CLOGE(LISTENER_TAG "stop base listener failed: stop listen thread failed, module=%d, error=%d",
            module, result);
    }
    // hand back the reference taken by the lookup above
    ReturnListenerNode(&target);
    return result;
}
537
/*
 * Moves a running listener back to LISTENER_IDLE.
 *
 * Takes node->lock itself. If the listener is not running nothing is done and
 * SOFTBUS_OK is returned. Otherwise the select thread is asked to stop, all remaining
 * fd nodes are dropped from the wait list (the fds themselves are NOT closed), the
 * server socket is closed when the node is in server mode, and the server related
 * fields are reset.
 *
 * Returns SOFTBUS_OK, SOFTBUS_LOCK_ERR when the node lock cannot be taken, or the error
 * reported by StopSelectThread (in which case the node is left unchanged).
 */
static int32_t ShutdownBaseListener(SoftbusListenerNode *node)
{
    int32_t status = SoftBusMutexLock(&node->lock);
    CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
        LISTENER_TAG "ATTENTION UNEXPECTED ERROR! shutdown base listener failed: try to lock listener node failed, "
                     "module=%d, error=%d",
        node->module, status);

    do {
        if (node->info.status != LISTENER_RUNNING) {
            // the value printed here is the listener status, not an error code
            CLOGW(LISTENER_TAG
                "shutdown base listener warning, listener is not running, just skip, module=%d, status=%d",
                node->module, node->info.status);
            break;
        }
        status = StopSelectThread();
        if (status != SOFTBUS_OK) {
            CLOGE(LISTENER_TAG "shutdown base listener failed: stop select thread failed, module=%d, error=%d",
                node->module, status);
            break;
        }
        node->info.status = LISTENER_IDLE;

        // any fd still registered at this point was never removed via DelTrigger
        FdNode *it = NULL;
        FdNode *next = NULL;
        LIST_FOR_EACH_ENTRY_SAFE(it, next, &node->info.waitEventFds, FdNode, node) {
            CLOGE(LISTENER_TAG
                "ATTENTION, shutdown base listener warning: listener node there is fd not close, module=%d, fd=%d, "
                "trigger set=%u",
                node->module, it->fd, it->triggerSet);
            // not close fd, repeat close will crash process
            ListDelete(&it->node);
            SoftBusFree(it);
        }
        node->info.waitEventFdsLen = 0;

        int32_t listenFd = node->info.listenFd;
        int32_t listenPort = node->info.listenPort;
        if (node->info.modeType == SERVER_MODE && listenFd > 0) {
            // closing the server socket is the expected path, so it is logged as information
            CLOGI(LISTENER_TAG "shutdown base listener, close server, module=%d, listen fd=%d, port=%d", node->module,
                listenFd, listenPort);
            ConnCloseSocket(listenFd);
        }
        node->info.modeType = UNSET_MODE;
        node->info.listenFd = -1;
        node->info.listenPort = -1;
        (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));
    } while (false);

    (void)SoftBusMutexUnlock(&node->lock);
    return status;
}
590
/*
 * Tells whether `trigger` is one of the trigger kinds this file accepts:
 * READ_TRIGGER, WRITE_TRIGGER, EXCEPT_TRIGGER or RW_TRIGGER.
 */
static bool IsValidTriggerType(TriggerType trigger)
{
    return trigger == READ_TRIGGER || trigger == WRITE_TRIGGER || trigger == EXCEPT_TRIGGER ||
        trigger == RW_TRIGGER;
}
603
AddTrigger(ListenerModule module,int32_t fd,TriggerType trigger)604 NO_SANITIZE("cfi") int32_t AddTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
605 {
606 CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
607 LISTENER_TAG "add trigger failed: invalid module, module=%d", module);
608 CONN_CHECK_AND_RETURN_RET_LOG(
609 fd > 0, SOFTBUS_INVALID_PARAM, LISTENER_TAG "add trigger failed: invalid fd, module=%d, fd=%d", module, fd);
610 CONN_CHECK_AND_RETURN_RET_LOG(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM,
611 LISTENER_TAG "add trigger failed: invalid trigger, module=%d, fd=%d, trigger=%d", module, fd, trigger);
612
613 CLOGI(LISTENER_TAG "receive add trigger request, module=%d, fd=%d, trigger=%d", module, fd, trigger);
614 SoftbusListenerNode *node = GetListenerNode(module);
615 CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
616 LISTENER_TAG "add trigger failed: listener node not exist, module=%d, fd=%d, trigger=%d", module, fd, trigger);
617
618 int32_t status = SoftBusMutexLock(&node->lock);
619 if (status != SOFTBUS_OK) {
620 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! add trigger failed: try to lock listener node failed, "
621 "module=%d, fd=%d, trigger=%d, error=%d",
622 module, fd, trigger, status);
623 ReturnListenerNode(&node);
624 return SOFTBUS_LOCK_ERR;
625 }
626
627 bool wakeup = false;
628 do {
629 if (node->info.status != LISTENER_RUNNING) {
630 CLOGE(LISTENER_TAG "add trigger failed: module is not running, call 'StartBaseListener' or "
631 "'StartBaseClient' first, module=%d, fd=%d, trigger=%d",
632 module, fd, trigger);
633 status = SOFTBUS_ERR;
634 break;
635 }
636
637 if (node->info.waitEventFdsLen > MAX_LISTEN_EVENTS) {
638 CLOGE(LISTENER_TAG "add trigger failed: can not trigger more, fd exceed more than %d, module=%d, fd=%d, "
639 "trigger=%d, wait event fds len=%d",
640 MAX_LISTEN_EVENTS, module, fd, trigger, node->info.waitEventFdsLen);
641 status = SOFTBUS_ERR;
642 break;
643 }
644
645 FdNode *target = NULL;
646 FdNode *it = NULL;
647 LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
648 if (fd == it->fd) {
649 target = it;
650 break;
651 }
652 }
653
654 if (target != NULL) {
655 if ((it->triggerSet & trigger) == trigger) {
656 CLOGW(LISTENER_TAG
657 "repeat add trigger, just skip, module=%d, fd=%d, want add trigger=%d, exist trigger set=%u",
658 module, fd, trigger, it->triggerSet);
659 break;
660 }
661 it->triggerSet |= trigger;
662 CLOGI(LISTENER_TAG "add trigger success, module=%d, fd=%d, new add trigger=%d, all trigger set=%u", module,
663 fd, trigger, it->triggerSet);
664 wakeup = true;
665 break;
666 }
667
668 FdNode *fdNode = (FdNode *)SoftBusCalloc(sizeof(FdNode));
669 if (fdNode == NULL) {
670 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! add trigger failed: calloc fd node object failed, "
671 "module=%d, fd=%d, trigger=%d",
672 module, fd, trigger);
673 status = SOFTBUS_MALLOC_ERR;
674 break;
675 }
676 ListInit(&fdNode->node);
677 fdNode->fd = fd;
678 fdNode->triggerSet = trigger;
679 ListAdd(&node->info.waitEventFds, &fdNode->node);
680 node->info.waitEventFdsLen += 1;
681 wakeup = true;
682 CLOGI(LISTENER_TAG "add trigger success, module=%d, fd=%d, trigger=%d", module, fd, trigger);
683 } while (false);
684
685 (void)SoftBusMutexUnlock(&node->lock);
686 ReturnListenerNode(&node);
687
688 if (status == SOFTBUS_OK && wakeup) {
689 WakeupSelectThread();
690 }
691 return status;
692 }
693
DelTrigger(ListenerModule module,int32_t fd,TriggerType trigger)694 NO_SANITIZE("cfi") int32_t DelTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
695 {
696 CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
697 LISTENER_TAG "delete trigger failed: invalid module, module=%d", module);
698 CONN_CHECK_AND_RETURN_RET_LOG(
699 fd > 0, SOFTBUS_INVALID_PARAM, LISTENER_TAG "delete trigger failed: invalid fd, module=%d, fd=%d", module, fd);
700 CONN_CHECK_AND_RETURN_RET_LOG(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM,
701 LISTENER_TAG "delete trigger failed: invalid trigger, module=%d, fd=%d, trigger=%d", module, fd, trigger);
702
703 CLOGI(LISTENER_TAG "receive delete trigger request, module=%d, fd=%d, trigger=%d", module, fd, trigger);
704 SoftbusListenerNode *node = GetListenerNode(module);
705 CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
706 LISTENER_TAG "delete trigger failed: listener node not exist, module=%d, fd=%d, trigger=%d", module, fd,
707 trigger);
708
709 int32_t status = SoftBusMutexLock(&node->lock);
710 if (status != SOFTBUS_OK) {
711 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! delete trigger failed: try to lock listener node failed, "
712 "module=%d, fd=%d, trigger=%d, error=%d",
713 module, fd, trigger, status);
714 ReturnListenerNode(&node);
715 return SOFTBUS_LOCK_ERR;
716 }
717
718 bool wakeup = false;
719 do {
720 FdNode *target = NULL;
721 FdNode *it = NULL;
722 LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
723 if (fd == it->fd) {
724 target = it;
725 break;
726 }
727 }
728
729 if (target == NULL) {
730 CLOGW(LISTENER_TAG
731 "delete trigger warning, fd node not exist, call delete trigger without add trigger before or mismatch "
732 "module. You are warned, fix it quickly! module=%d, fd=%d, trigger=%d",
733 module, fd, trigger);
734 // consider delete trigger success,
735 status = SOFTBUS_OK;
736 break;
737 }
738
739 if ((target->triggerSet & trigger) == 0) {
740 CLOGW(LISTENER_TAG
741 "delete trigger warning, without add trigger before, repeat delete trigger or mismatch module. You are "
742 "warned, fix it quickly! module=%d, fd=%d, want delete trigger=%d, exist trigger set=%u",
743 module, fd, trigger, it->triggerSet);
744 // consider delete trigger success,
745 status = SOFTBUS_OK;
746 break;
747 }
748
749 target->triggerSet &= ~trigger;
750 wakeup = true;
751 if (target->triggerSet != 0) {
752 CLOGI(LISTENER_TAG "delete trigger success, module=%d, fd=%d, trigger=%d, exist trigger set=%u", module, fd,
753 trigger, target->triggerSet);
754 status = SOFTBUS_OK;
755 break;
756 }
757 CLOGI(LISTENER_TAG
758 "delete trigger success, there is not exist any trigger, free fd node now, module=%d, fd=%d, trigger=%d",
759 module, fd, trigger);
760 ListDelete(&target->node);
761 SoftBusFree(target);
762 node->info.waitEventFdsLen -= 1;
763 } while (false);
764
765 SoftBusMutexUnlock(&node->lock);
766 ReturnListenerNode(&node);
767
768 if (status == SOFTBUS_OK && wakeup) {
769 WakeupSelectThread();
770 }
771 return status;
772 }
773
/*
 * Releases a select thread state object: closes both control fds (any value other than
 * 0 is treated as open), destroys its lock, frees the object and clears the caller's
 * pointer.
 *
 * statePtr: address of a non-NULL state pointer; it is dereferenced without a check.
 */
static void CleanupSelectThreadState(SelectThreadState **statePtr)
{
    SelectThreadState *doomed = *statePtr;

    const int32_t readFd = doomed->ctrlRfd;
    if (readFd != 0) {
        ConnCloseSocket(readFd);
    }
    const int32_t writeFd = doomed->ctrlWfd;
    if (writeFd != 0) {
        ConnCloseSocket(writeFd);
    }

    CLOGI(LISTENER_TAG "cleanup select thread state, traceId=%d, ctrl read fd=%d, ctrl write fd=%d", doomed->traceId,
        readFd, writeFd);
    (void)SoftBusMutexDestroy(&doomed->lock);
    SoftBusFree(doomed);
    *statePtr = NULL;
}
790
ProcessCtrlFdEvent(int32_t fd,int32_t wakeupTrace)791 static void ProcessCtrlFdEvent(int32_t fd, int32_t wakeupTrace)
792 {
793 #ifndef __LITEOS__
794 while (true) {
795 int32_t ctrlTraceId = 0;
796 ssize_t len = read(fd, &ctrlTraceId, sizeof(ctrlTraceId));
797 if (len < 0) {
798 int32_t status = errno;
799 if (status == EINTR) {
800 continue;
801 } else if (status == EAGAIN) {
802 break;
803 } else {
804 CLOGE(LISTENER_TAG
805 "process ctrl fd event failed: wakeup trace=%d, fd=%d, invalid read len=%d, error=%d",
806 wakeupTrace, fd, len, status);
807 break;
808 }
809 }
810 CLOGI(LISTENER_TAG "process ctrl fd event, wakeup ctrl message received, wakeup trace=%d, fd=%d, "
811 "ctrl trace=%d, read length=%d",
812 wakeupTrace, fd, ctrlTraceId, len);
813 }
814 #endif
815 }
816
/*
 * Hands one fd event to the module's data callback.
 *
 * If no onDataEvent callback is registered the fd is closed instead, so that the same
 * event does not wake the select loop again and again.
 *
 * wakeupTrace is used for logging only.
 */
static void DispatchFdEvent(
    int32_t fd, ListenerModule module, enum SocketEvent event, const SoftbusBaseListener *listener, int32_t wakeupTrace)
{
    if (listener->onDataEvent == NULL) {
        CLOGE(LISTENER_TAG
            "process fd event, new event coming, but event listener not registered, to avoid repeat wakeup "
            "select(LEVEL MODE), close it, wakeup trace=%d, module=%d, fd=%d, event=%d",
            wakeupTrace, module, fd, event);
        ConnCloseSocket(fd);
        return;
    }
    listener->onDataEvent(module, event, fd);
}
830
/*
 * Accepts every pending client connection on `listenFd` and reports each one through
 * listener->onConnectEvent. A client accepted while no callback is registered is closed
 * right away.
 *
 * The accept loop only ends when AcceptClient reports a status other than SOFTBUS_OK,
 * and that status is the return value. SOFTBUS_INVALID_PARAM is returned when `socketIf`
 * or its AcceptClient method is missing.
 *
 * wakeupTrace is used for logging only.
 */
static int32_t ProcessSpecifiedServerAcceptEvent(ListenerModule module, int32_t listenFd, ConnectType connectType,
    const SocketInterface *socketIf, const SoftbusBaseListener *listener, int32_t wakeupTrace)
{
    CONN_CHECK_AND_RETURN_RET_LOG(socketIf != NULL, SOFTBUS_INVALID_PARAM,
        LISTENER_TAG
        "process spectified listener node event failed: socket interface implement is null, wakeup trace=%d module=%d",
        wakeupTrace, module);
    CONN_CHECK_AND_RETURN_RET_LOG(socketIf->AcceptClient != NULL, SOFTBUS_INVALID_PARAM,
        LISTENER_TAG "process spectified listener node event failed: socket interface implement not support "
                     "AcceptClient method, wakeup trace=%d, module=%d",
        wakeupTrace, module);

    for (;;) {
        int32_t acceptedFd = -1;
        ConnectOption peer = {
            .type = connectType,
            .socketOption = {
                .addr = { 0 },
                .port = 0,
                .moduleId = module,
                .protocol = 0,
                .keepAlive = 0,
            },
        };
        int32_t acceptRet = SOFTBUS_TEMP_FAILURE_RETRY(socketIf->AcceptClient(listenFd, &peer, &acceptedFd));
        if (acceptRet != SOFTBUS_OK) {
            // the only way out of the loop
            return acceptRet;
        }

        // only the anonymized form of the peer address goes into the log
        char maskedIp[IP_LEN] = { 0 };
        ConvertAnonymizeIpAddress(maskedIp, IP_LEN, peer.socketOption.addr, IP_LEN);
        if (listener->onConnectEvent == NULL) {
            CLOGE(LISTENER_TAG "process spectified listener node event trace, trigger ACCEPT event, but event listener "
                               "not registered, wakeup trace=%d, module=%d, listen fd=%d, client ip=%s, client fd=%d",
                wakeupTrace, module, listenFd, maskedIp, acceptedFd);
            ConnCloseSocket(acceptedFd);
            continue;
        }
        CLOGI(LISTENER_TAG "process spectified listener node event trace, trigger ACCEPT event, wakeup trace=%d, "
                           "module=%d, listen fd=%d, client ip=%s, client fd=%d",
            wakeupTrace, module, listenFd, maskedIp, acceptedFd);
        listener->onConnectEvent(module, acceptedFd, &peer);
    }
}
877
/*
 * Snapshot the wait-event fd list of a listener node into a heap array.
 *
 * "Unsafe": the function itself takes no lock; ProcessSpecifiedListenerNodeEvent calls it while holding
 * node->lock.
 *
 * node:        listener node whose info.waitEventFds list is copied
 * outArray:    receives the copied array (only fd and triggerSet of each entry are filled), or NULL when
 *              there is nothing to copy; a non-NULL result must be released with SoftBusFree by the caller
 * outArrayLen: receives the number of valid entries in *outArray
 *
 * Returns SOFTBUS_OK on success, SOFTBUS_MALLOC_ERR when an allocation fails (outputs are left untouched).
 *
 * The array is sized from info.waitEventFdsLen; if the list turns out to be longer the array is grown, and
 * any mismatch between the counter and the real list length is reported in the log.
 */
static int32_t CopyWaitEventFdsUnsafe(const SoftbusListenerNode *node, FdNode **outArray, uint32_t *outArrayLen)
{
    if (node->info.waitEventFdsLen == 0) {
        *outArray = NULL;
        // write through the pointer; assigning to the pointer itself would leave the caller's length stale
        *outArrayLen = 0;
        return SOFTBUS_OK;
    }

    uint32_t fdArrayLen = node->info.waitEventFdsLen;
    FdNode *fdArray = (FdNode *)SoftBusCalloc(fdArrayLen * sizeof(FdNode));
    CONN_CHECK_AND_RETURN_RET_LOG(fdArray != NULL, SOFTBUS_MALLOC_ERR,
        LISTENER_TAG
        "ATTENTION UNEXPECTED ERROR! copy wait event fds failed: calloc fd node array object failed, module=%d, wait "
        "event len=%u",
        node->module, fdArrayLen);

    uint32_t i = 0;
    FdNode *item = NULL;
    bool expand = false;
    LIST_FOR_EACH_ENTRY(item, &node->info.waitEventFds, FdNode, node) {
        if (i >= fdArrayLen) {
            // the list is longer than the recorded length: grow the array and carry over what was copied
            uint32_t tmpLen = fdArrayLen * FDARR_EXPAND_BASE;
            FdNode *tmp = (FdNode *)SoftBusCalloc(tmpLen * sizeof(FdNode));
            if (tmp == NULL) {
                CLOGE(LISTENER_TAG
                    "copy wait event fds failed: expand calloc fd node array object failed, module=%d, wait event "
                    "len=%u",
                    node->module, tmpLen);
                SoftBusFree(fdArray);
                return SOFTBUS_MALLOC_ERR;
            }
            for (uint32_t j = 0; j < fdArrayLen; j++) {
                tmp[j].fd = fdArray[j].fd;
                tmp[j].triggerSet = fdArray[j].triggerSet;
            }
            SoftBusFree(fdArray);
            fdArray = tmp;
            fdArrayLen = tmpLen;
            expand = true;
        }
        fdArray[i].fd = item->fd;
        fdArray[i].triggerSet = item->triggerSet;
        i++;
    }

    // diagnose by the way
    if (expand) {
        CLOGE(LISTENER_TAG
            "ATTENTION! copy wait event fds warining, listener node 'waitEventFdsLen' field is unexpected, actual "
            "wait event fd size larget than it, forgot maintain while update fd? fix it quickly. module=%d, "
            "wait event fds len=%u, actual wait event fds len=%u",
            node->module, node->info.waitEventFdsLen, i);
    } else if (i != fdArrayLen) {
        CLOGE(LISTENER_TAG
            "ATTENTION! copy wait event fds warining, listener node 'waitEventFdsLen' field is unexpected, actual "
            "wait event fd size lower than it, forgot maintain while update fd? fix it quickly. module=%d, "
            "wait event fds len=%u, actual wait event fds len=%u",
            node->module, node->info.waitEventFdsLen, i);
    }

    if (i == 0) {
        // the list was actually empty: return "nothing" instead of a zero-entry heap block, which a caller
        // that bails out on length 0 would never free
        SoftBusFree(fdArray);
        fdArray = NULL;
    }

    *outArrayLen = i;
    *outArray = fdArray;
    return SOFTBUS_OK;
}
942
/*
 * Close the listen socket of a node after accept reported it as unusable.
 *
 * The socket is only closed when, under node->lock, the node is still a running server whose current listen
 * fd is the one passed in; otherwise nothing is touched. After closing, listenFd and listenPort of the node
 * are reset to -1.
 *
 * node:       listener node owning the listen socket
 * listenFd:   the listen fd that was found to be invalid
 * anomizedIp: anonymized listen address, only used in log output
 * reason:     error that triggered the forced close, only used in log output
 */
static void CloseInvalidListenForcely(SoftbusListenerNode *node, int32_t listenFd, const char *anomizedIp,
    int32_t reason)
{
    CONN_CHECK_AND_RETURN_LOG(SoftBusMutexLock(&node->lock) == SOFTBUS_OK,
        LISTENER_TAG
        "ATTENTION UNEXPECTED ERROR! close invalid listen forcely failed: try to lock listener node failed, module=%d",
        node->module);

    bool sameRunningServer = node->info.status == LISTENER_RUNNING && node->info.modeType == SERVER_MODE &&
        node->info.listenFd == listenFd;
    if (sameRunningServer) {
        CLOGW(LISTENER_TAG "ATTENTION, forcely close to prevent repeat wakeup select, module=%d, invalid listen fd=%d "
            "port=%d, ip=%s, error=%d", node->module, node->info.listenFd, node->info.listenPort, anomizedIp, reason);
        ConnCloseSocket(node->info.listenFd);
        node->info.listenFd = -1;
        node->info.listenPort = -1;
    }
    SoftBusMutexUnlock(&node->lock);
}
963
/*
 * Handle everything select reported for one listener node.
 *
 * Under node->lock the function snapshots what it needs (listen fd/port, socket interface, a by-value copy of
 * the callback table, connect type and the wait-event fd list). The lock is released before any callback is
 * invoked. Afterwards it
 *   1. accepts pending clients when the node is a server and its listen fd is readable, and
 *   2. dispatches IN / OUT / EXCEPTION events for every copied fd that is set in the matching fd set and
 *      whose trigger set asks for that event.
 * Nodes that are not in LISTENER_RUNNING state are ignored.
 *
 * node:        listener node to process
 * readSet, writeSet, exceptSet: fd sets filled in by select
 * wakeupTrace: trace id of the current select wakeup, only used in log output
 */
static void ProcessSpecifiedListenerNodeEvent(SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet,
    SoftBusFdSet *exceptSet, int32_t wakeupTrace)
{
    CONN_CHECK_AND_RETURN_LOG(SoftBusMutexLock(&node->lock) == SOFTBUS_OK,
        LISTENER_TAG "ATTENTION UNEXPECTED ERROR! try to lock listener node failed, wakeup trace=%d, module=%d",
        wakeupTrace, node->module);
    if (node->info.status != LISTENER_RUNNING) {
        SoftBusMutexUnlock(&node->lock);
        return;
    }

    int32_t listenFd = -1;
    int32_t listenPort = -1;
    char animizedIp[IP_LEN] = { 0 };
    if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
        listenFd = node->info.listenFd;
        listenPort = node->info.listenPort;
        ConvertAnonymizeIpAddress(animizedIp, IP_LEN, node->info.listenerInfo.socketOption.addr, IP_LEN);
    }
    const SocketInterface *socketIf = node->socketIf;
    // copied by value so the callbacks can be used after the lock is dropped
    SoftbusBaseListener listener = node->listener;
    ConnectType connectType = node->info.listenerInfo.type;

    FdNode *fdArray = NULL;
    uint32_t fdArrayLen = 0;
    int32_t status = CopyWaitEventFdsUnsafe(node, &fdArray, &fdArrayLen);
    SoftBusMutexUnlock(&node->lock);
    if (status != SOFTBUS_OK) {
        CLOGE(LISTENER_TAG
            "process spectified listener node event failed: copy wait event fds failed, wakeup trace=%d, module=%d, "
            "error=%d",
            wakeupTrace, node->module, status);
        return;
    }

    if (listenFd > 0 && SoftBusSocketFdIsset(listenFd, readSet)) {
        status =
            ProcessSpecifiedServerAcceptEvent(node->module, listenFd, connectType, socketIf, &listener, wakeupTrace);
        switch (status) {
            case SOFTBUS_OK:
            case SOFTBUS_ADAPTER_SOCKET_EAGAIN:
                break;
            case SOFTBUS_ADAPTER_SOCKET_EINVAL:
            case SOFTBUS_ADAPTER_SOCKET_EBADF:
                // the listen fd itself is broken; close it so it stops waking up select
                CloseInvalidListenForcely(node, listenFd, animizedIp, status);
                break;
            default:
                CLOGD(LISTENER_TAG
                    "process spectified listener node event failed: accept client failed, wakeup trace=%d, "
                    "module=%d, listen fd=%d, port=%d, ip=%s, error=%d",
                    wakeupTrace, node->module, listenFd, listenPort, animizedIp, status);
                break;
        }
    }

    if (fdArray == NULL) {
        // nothing was copied, so there is nothing to dispatch or free
        return;
    }

    // when fdArrayLen is 0 but a buffer was still allocated, the loop is skipped and the buffer is freed below
    for (uint32_t i = 0; i < fdArrayLen; i++) {
        if ((fdArray[i].triggerSet & READ_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, readSet)) {
            CLOGD(LISTENER_TAG
                "process spectified listener node event trace, trigger IN event, wakeup trace=%d, module=%d, fd=%d, "
                "trigger set=%u",
                wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
            DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_IN, &listener, wakeupTrace);
        }
        if ((fdArray[i].triggerSet & WRITE_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, writeSet)) {
            CLOGD(LISTENER_TAG
                "process spectified listener node event trace, trigger OUT event, wakeup trace=%d, module=%d, fd=%d, "
                "trigger set=%u",
                wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
            DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_OUT, &listener, wakeupTrace);
        }
        if ((fdArray[i].triggerSet & EXCEPT_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, exceptSet)) {
            CLOGW(LISTENER_TAG
                "process spectified listener node event trace, trigger EXCEPTION(out-of-band data) event, wakeup "
                "trace=%d, module=%d, fd=%d, trigger set=%u",
                wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
            DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_EXCEPTION, &listener, wakeupTrace);
        }
        // RW_TRIGGER is already triggered in READ_TRIGGER and WRITE_TRIGGER, just skip it
    }
    SoftBusFree(fdArray);
}
1049
/*
 * Process one select wakeup: let every existing listener node handle its ready fds, then drain the control
 * pipe if its read end was reported readable.
 *
 * readSet, writeSet, exceptSet: fd sets filled in by select
 * selectState: state of the select thread, provides the control pipe read fd
 * wakeupTrace: trace id of the current select wakeup, only used in log output
 */
static void ProcessEvent(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet,
    const SelectThreadState *selectState, int32_t wakeupTrace)
{
    for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
        SoftbusListenerNode *current = GetListenerNode(module);
        if (current != NULL) {
            ProcessSpecifiedListenerNodeEvent(current, readSet, writeSet, exceptSet, wakeupTrace);
            // every node obtained from GetListenerNode is handed back once it has been processed
            ReturnListenerNode(&current);
        }
    }

    int32_t ctrlFd = selectState->ctrlRfd;
    if (SoftBusSocketFdIsset(ctrlFd, readSet)) {
        ProcessCtrlFdEvent(ctrlFd, wakeupTrace);
    }
}
1066
/*
 * Add the fds one listener node wants to be watched to the given select fd sets.
 *
 * With node->lock held: the listen fd of a running server node goes into readSet, and every fd of the
 * wait-event list goes into the read / write / except set its trigger set asks for.
 *
 * Returns the largest fd seen (0 when the node is not running or has no fds). When the node cannot be
 * locked, SOFTBUS_LOCK_ERR is passed to CONN_CHECK_AND_RETURN_RET_LOG as the return value; the caller
 * treats a negative result as an error.
 */
static int32_t CollectSpecifiedModuleListenerEvents(
    SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
{
    CONN_CHECK_AND_RETURN_RET_LOG(SoftBusMutexLock(&node->lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
        LISTENER_TAG
        "ATTENTION UNEXPECTED ERROR! collect wait event fd set failed: try to lock listener node failed, module=%d",
        node->module);

    int32_t highestFd = 0;
    if (node->info.status == LISTENER_RUNNING) {
        if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
            SoftBusSocketFdSet(node->info.listenFd, readSet);
            highestFd = node->info.listenFd;
        }

        FdNode *entry = NULL;
        LIST_FOR_EACH_ENTRY(entry, &node->info.waitEventFds, FdNode, node) {
            uint32_t wanted = entry->triggerSet;
            if ((wanted & READ_TRIGGER) != 0) {
                SoftBusSocketFdSet(entry->fd, readSet);
            }
            if ((wanted & WRITE_TRIGGER) != 0) {
                SoftBusSocketFdSet(entry->fd, writeSet);
            }
            if ((wanted & EXCEPT_TRIGGER) != 0) {
                SoftBusSocketFdSet(entry->fd, exceptSet);
            }
            // RW_TRIGGER needs no branch of its own, the READ and WRITE checks above already cover it
            if (entry->fd > highestFd) {
                highestFd = entry->fd;
            }
        }
    }
    (void)SoftBusMutexUnlock(&node->lock);
    return highestFd;
}
1103
/*
 * Build the select fd sets from all existing listener nodes.
 *
 * Walks every module id below UNUSE_BUTT and lets each existing node add its fds. Collection stops at the
 * first node that reports a negative result; in that case all three sets are cleared again and that error
 * is returned. Otherwise the largest fd found across all nodes is returned (0 when there is none).
 */
static int32_t CollectWaitEventFdSet(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
{
    int32_t largestFd = 0;
    int32_t status = SOFTBUS_OK;
    for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
        SoftbusListenerNode *node = GetListenerNode(module);
        if (node == NULL) {
            continue;
        }
        int32_t collected = CollectSpecifiedModuleListenerEvents(node, readSet, writeSet, exceptSet);
        ReturnListenerNode(&node);
        if (collected < 0) {
            // a negative value is an error code, not an fd
            status = collected;
            CLOGE(LISTENER_TAG "collect wait event fd set failed: module=%d, error=%d", module, status);
            break;
        }
        if (collected > largestFd) {
            largestFd = collected;
        }
    }

    if (status == SOFTBUS_OK) {
        return largestFd;
    }

    // do not hand a half-filled set back to the caller
    SoftBusSocketFdZero(readSet);
    SoftBusSocketFdZero(writeSet);
    SoftBusSocketFdZero(exceptSet);
    return status;
}
1135
SelectTask(void * arg)1136 static void *SelectTask(void *arg)
1137 {
1138 #define SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS (3 * 1000)
1139 static int32_t wakeupTraceIdGenerator = 0;
1140
1141 SelectThreadState *selectState = (SelectThreadState *)arg;
1142 CLOGI(LISTENER_TAG "select task start, select trace=%d, ctrl read fd=%d, ctrl write fd=%d", selectState->traceId,
1143 selectState->ctrlRfd, selectState->ctrlWfd);
1144 while (true) {
1145 int status = SoftBusMutexLock(&selectState->lock);
1146 if (status != SOFTBUS_OK) {
1147 CLOGE(LISTENER_TAG
1148 "ATTENTION UNEXPECTED ERROR! try to lock select thread state self failed, retry after %d ms, "
1149 "select trace=%d, error=%d",
1150 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, status);
1151 SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1152 continue;
1153 }
1154 int32_t referenceCount = selectState->referenceCount;
1155 (void)SoftBusMutexUnlock(&selectState->lock);
1156
1157 if (referenceCount <= 0) {
1158 CLOGW(LISTENER_TAG "select task, select task is not reference by others any more, select trace=%d, exit...",
1159 selectState->traceId);
1160 break;
1161 }
1162
1163 SoftBusFdSet readSet;
1164 SoftBusFdSet writeSet;
1165 SoftBusFdSet exceptSet;
1166 SoftBusSocketFdZero(&readSet);
1167 SoftBusSocketFdZero(&writeSet);
1168 SoftBusSocketFdZero(&exceptSet);
1169 int32_t maxFdOrStatus = CollectWaitEventFdSet(&readSet, &writeSet, &exceptSet);
1170 if (maxFdOrStatus < 0) {
1171 CLOGE(LISTENER_TAG
1172 "select task failed: collect wait event fd set failed, retry after %d ms, select trace=%d, error=%d",
1173 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, maxFdOrStatus);
1174 SoftBusSocketFdZero(&readSet);
1175 SoftBusSocketFdZero(&writeSet);
1176 SoftBusSocketFdZero(&exceptSet);
1177 SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1178 continue;
1179 }
1180 SoftBusSocketFdSet(selectState->ctrlRfd, &readSet);
1181 int32_t maxFd = maxFdOrStatus > selectState->ctrlRfd ? maxFdOrStatus : selectState->ctrlRfd;
1182 int32_t nEvents = SoftBusSocketSelect(maxFd + 1, &readSet, &writeSet, &exceptSet, NULL);
1183 int32_t wakeupTraceId = ++wakeupTraceIdGenerator;
1184 if (nEvents <= 0) {
1185 CLOGE(LISTENER_TAG
1186 "ATTENTION, select task failed: unexpect wakeup, retry after %d ms, wakeup trace id=%d, events=%d",
1187 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, wakeupTraceId, nEvents);
1188 SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1189 continue;
1190 }
1191 CLOGI(LISTENER_TAG "select task, wakeup from select, select trace=%d, wakeup trace=%d, events=%d",
1192 selectState->traceId, wakeupTraceId, nEvents);
1193 ProcessEvent(&readSet, &writeSet, &exceptSet, selectState, wakeupTraceId);
1194 }
1195 CleanupSelectThreadState(&selectState);
1196 return NULL;
1197 }
1198
StartSelectThread(void)1199 static int32_t StartSelectThread(void)
1200 {
1201 static int32_t selectThreadTraceIdGenerator = 1;
1202
1203 int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1204 CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
1205 LISTENER_TAG
1206 "ATTENTION UNEXPECTED ERROR! start select thread failed: try to lock global select thread state failed");
1207
1208 do {
1209 if (g_selectThreadState != NULL) {
1210 status = SoftBusMutexLock(&g_selectThreadState->lock);
1211 if (status != SOFTBUS_OK) {
1212 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! start select thread failed: try to lock select thread "
1213 "state self failed, error=%d",
1214 status);
1215 status = SOFTBUS_LOCK_ERR;
1216 break;
1217 }
1218 int32_t referenceCount = ++g_selectThreadState->referenceCount;
1219 (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1220 WakeupSelectThread();
1221
1222 CLOGW(LISTENER_TAG
1223 "start select thread, select thread is already start, select trace=%d, ctrl read fd=%d, ctrl write "
1224 "fd=%d, reference count=%d",
1225 g_selectThreadState->traceId, g_selectThreadState->ctrlRfd, g_selectThreadState->ctrlWfd,
1226 referenceCount);
1227 break;
1228 }
1229
1230 SelectThreadState *state = SoftBusCalloc(sizeof(SelectThreadState));
1231 if (state == NULL) {
1232 status = SOFTBUS_ERR;
1233 break;
1234 }
1235 state->traceId = ++selectThreadTraceIdGenerator;
1236 int32_t fds[2] = { 0 };
1237 int32_t rc = 0;
1238 #ifndef __LITEOS__
1239 rc = pipe2(fds, O_CLOEXEC | O_NONBLOCK);
1240 #endif
1241 if (rc != 0) {
1242 CLOGE(LISTENER_TAG "start select thread failed: create ctrl pipe failed, error=%s", strerror(errno));
1243 SoftBusFree(state);
1244 status = SOFTBUS_ERR;
1245 break;
1246 }
1247 state->ctrlRfd = fds[0];
1248 state->ctrlWfd = fds[1];
1249
1250 status = SoftBusMutexInit(&state->lock, NULL);
1251 if (status != SOFTBUS_OK) {
1252 CLOGE(LISTENER_TAG "start select thread failed: start select task async failed, error=%d", status);
1253 CleanupSelectThreadState(&state);
1254 break;
1255 }
1256 state->referenceCount = 1;
1257 status = ConnStartActionAsync(state, SelectTask);
1258 if (status != SOFTBUS_OK) {
1259 CLOGE(LISTENER_TAG "start select thread failed: init lock failed, error=%d", status);
1260 CleanupSelectThreadState(&state);
1261 break;
1262 }
1263 CLOGI(LISTENER_TAG "start select thread success, trace id=%d, ctrl read fd=%d, ctrl write fd=%d",
1264 state->traceId, state->ctrlRfd, state->ctrlWfd);
1265 g_selectThreadState = state;
1266 } while (false);
1267 (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1268 return status;
1269 }
1270
StopSelectThread(void)1271 static int32_t StopSelectThread(void)
1272 {
1273 int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1274 CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
1275 LISTENER_TAG "ATTENTION UNEXPECTED ERROR! stop select thread failed: try to lock global select thread state "
1276 "failed");
1277 do {
1278 if (g_selectThreadState == NULL) {
1279 CLOGW(LISTENER_TAG "stop select thread, select thread is already stop or never start");
1280 break;
1281 }
1282
1283 status = SoftBusMutexLock(&g_selectThreadState->lock);
1284 if (status != SOFTBUS_OK) {
1285 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! stop select thread, try to lock select thread state self");
1286 break;
1287 }
1288 g_selectThreadState->referenceCount -= 1;
1289 int32_t referenceCount = g_selectThreadState->referenceCount;
1290 (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1291 if (referenceCount <= 0) {
1292 CLOGI(LISTENER_TAG "stop select thread, select thread is not used by other module any more, notify exit, "
1293 "thread reference count=%d",
1294 referenceCount);
1295 WakeupSelectThread();
1296 g_selectThreadState = NULL;
1297 }
1298 } while (false);
1299 (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1300 return status;
1301 }
1302
WakeupSelectThread(void)1303 static void WakeupSelectThread(void)
1304 {
1305 #ifndef __LITEOS__
1306 static int32_t selectWakeupTraceIdGenerator = 0;
1307
1308 int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1309 CONN_CHECK_AND_RETURN_LOG(status == SOFTBUS_OK,
1310 LISTENER_TAG "ATTENTION UNEXPECTED ERROR! wakeup select thread failed: try to lock global select thread state "
1311 "failed, error=%d",
1312 status);
1313 do {
1314 if (g_selectThreadState == NULL) {
1315 CLOGW(LISTENER_TAG "wakeup select thread warning: select thread is not running, just skip");
1316 break;
1317 }
1318 int32_t ctrlTraceId = selectWakeupTraceIdGenerator++;
1319 ssize_t len = write(g_selectThreadState->ctrlWfd, &ctrlTraceId, sizeof(ctrlTraceId));
1320 CLOGI(LISTENER_TAG "wakeup select thread, wakeup ctrl message sent, write length=%d, ctrl trace=%d", len,
1321 ctrlTraceId);
1322 } while (false);
1323 SoftBusMutexUnlock(&g_selectThreadStateLock);
1324 #endif
1325 }