• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "softbus_base_listener.h"
17 
18 #include <securec.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21 
22 #include "common_list.h"
23 #include "softbus_adapter_errcode.h"
24 #include "softbus_adapter_mem.h"
25 #include "softbus_adapter_socket.h"
26 #include "softbus_conn_common.h"
27 #include "softbus_conn_interface.h"
28 #include "softbus_def.h"
29 #include "softbus_errcode.h"
30 #include "softbus_feature_config.h"
31 #include "softbus_log.h"
32 #include "softbus_socket.h"
33 #include "softbus_utils.h"
34 
35 #define MAX_LISTEN_EVENTS 1024
36 #define DEFAULT_BACKLOG   4
37 #define FDARR_EXPAND_BASE 2
38 
39 #define LISTENER_TAG "[base-listener] "
40 
41 enum BaseListenerStatus {
42     LISTENER_IDLE = 0,
43     LISTENER_RUNNING,
44 };
45 
46 typedef struct {
47     ListNode node;
48     int32_t fd;
49     uint32_t triggerSet;
50 } FdNode;
51 
52 typedef struct {
53     ListNode waitEventFds;
54     uint32_t waitEventFdsLen;
55 
56     ModeType modeType;
57     LocalListenerInfo listenerInfo;
58     int32_t listenFd;
59     int32_t listenPort;
60 
61     enum BaseListenerStatus status;
62 } SoftbusBaseListenerInfo;
63 
64 typedef struct {
65     ListenerModule module;
66     SoftBusMutex lock;
67     SoftbusBaseListener listener;
68     const SocketInterface *socketIf;
69     SoftbusBaseListenerInfo info;
70     int32_t objectRc;
71 } SoftbusListenerNode;
72 
73 typedef struct {
74     uint32_t traceId;
75     // pipe fds, to wakeup select thread in time
76     int32_t ctrlRfd;
77     int32_t ctrlWfd;
78 
79     SoftBusMutex lock;
80     int32_t referenceCount;
81 } SelectThreadState;
82 
83 static int32_t ShutdownBaseListener(SoftbusListenerNode *node);
84 static int32_t StartSelectThread(void);
85 static int32_t StopSelectThread(void);
86 static void WakeupSelectThread(void);
87 
88 static SoftBusMutex g_listenerListLock = { 0 };
89 static SoftbusListenerNode *g_listenerList[UNUSE_BUTT] = { 0 };
90 static SoftBusMutex g_selectThreadStateLock = { 0 };
91 static SelectThreadState *g_selectThreadState = NULL;
92 
GetListenerNode(ListenerModule module)93 static SoftbusListenerNode *GetListenerNode(ListenerModule module)
94 {
95     int32_t status = SoftBusMutexLock(&g_listenerListLock);
96     CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, NULL,
97         LISTENER_TAG "ATTENTION UNEXPECTED ERROR! request listener node failed: try to lock listener lists failed, "
98                      "module=%d, error=%d",
99         module, status);
100     SoftbusListenerNode *node = g_listenerList[module];
101     do {
102         if (node == NULL) {
103             break;
104         }
105         status = SoftBusMutexLock(&node->lock);
106         if (status != SOFTBUS_OK) {
107             CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! request listener node failed: try to lock listener failed, "
108                                "module=%d, error=%d",
109                 module, status);
110             node = NULL;
111             break;
112         }
113         node->objectRc += 1;
114         SoftBusMutexUnlock(&node->lock);
115     } while (false);
116     (void)SoftBusMutexUnlock(&g_listenerListLock);
117     return node;
118 }
119 
RemoveListenerNode(SoftbusListenerNode * node)120 static void RemoveListenerNode(SoftbusListenerNode *node)
121 {
122     int32_t status = SoftBusMutexLock(&g_listenerListLock);
123     CONN_CHECK_AND_RETURN_LOG(status == SOFTBUS_OK,
124         LISTENER_TAG "ATTENTION UNEXPECTED ERROR! remove listener node failed: try to lock listener lists failed, "
125                      "module=%d, error=%d",
126         node->module, status);
127     do {
128         if (g_listenerList[node->module] != node) {
129             CLOGW(LISTENER_TAG "ATTENTION! remove listener node warning: listener node is not in listener list,"
130                 " repeat remove? just skip, module=%d", node->module);
131             break;
132         }
133         status = SoftBusMutexLock(&node->lock);
134         if (status != SOFTBUS_OK) {
135             CLOGE(LISTENER_TAG "ATTENTION UNEXPECT ERROR! remove listener node failed: "
136                 "try to lock listener node failed, module=%d", node->module);
137             break;
138         }
139         // decrease root object reference
140         node->objectRc -= 1;
141         g_listenerList[node->module] = NULL;
142         (void)SoftBusMutexUnlock(&node->lock);
143     } while (false);
144     (void)SoftBusMutexUnlock(&g_listenerListLock);
145 }
146 
ReturnListenerNode(SoftbusListenerNode ** nodePtr)147 static void ReturnListenerNode(SoftbusListenerNode **nodePtr)
148 {
149     SoftbusListenerNode *node = *nodePtr;
150     do {
151         int32_t status = SoftBusMutexLock(&node->lock);
152         if (status != SOFTBUS_OK) {
153             CLOGE(LISTENER_TAG "ATTENTION UNEXPECT ERROR! return listener node failed: "
154                 "try to lock listener node failed, module=%d", node->module);
155             break;
156         }
157         node->objectRc -= 1;
158         int32_t objectRc = node->objectRc;
159         (void)SoftBusMutexUnlock(&node->lock);
160 
161         if (objectRc > 0) {
162             break;
163         }
164         CLOGI(LISTENER_TAG
165             "release listener node, object reference count <= 0, free listener node, module=%d, object reference=%d",
166             node->module, objectRc);
167         (void)ShutdownBaseListener(node);
168         SoftBusFree(node);
169     } while (false);
170 
171     *nodePtr = NULL;
172 }
173 
CreateSpecifiedListenerModule(ListenerModule module)174 static SoftbusListenerNode *CreateSpecifiedListenerModule(ListenerModule module)
175 {
176     SoftbusListenerNode *node = (SoftbusListenerNode *)SoftBusCalloc(sizeof(SoftbusListenerNode));
177     CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, NULL,
178         LISTENER_TAG
179         "ATTENTION UNEXPECTED ERROR! create specified listener module failed: calloc listener node object failed, "
180         "module=%d",
181         module);
182 
183     node->module = module;
184     // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
185     int32_t status = SoftBusMutexInit(&node->lock, NULL);
186     if (status != SOFTBUS_OK) {
187         CLOGE(LISTENER_TAG
188             "ATTENTION UNEXPECTED ERROR! create specified listener module failed: init lock failed, module=%d, "
189             "error=%d",
190             module, status);
191         SoftBusFree(node);
192         return NULL;
193     }
194     node->listener.onConnectEvent = NULL;
195     node->listener.onDataEvent = NULL;
196 
197     node->socketIf = NULL;
198 
199     ListInit(&node->info.waitEventFds);
200     node->info.waitEventFdsLen = 0;
201     node->info.modeType = UNSET_MODE;
202     (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));
203     node->info.listenFd = -1;
204     node->info.listenPort = -1;
205     // set root object reference count 1
206     node->objectRc = 1;
207     return node;
208 }
209 
CreateStaticModulesUnsafe(void)210 static int32_t CreateStaticModulesUnsafe(void)
211 {
212     ListenerModule module = 0;
213     for (; module < LISTENER_MODULE_DYNAMIC_START; module++) {
214         SoftbusListenerNode *node = CreateSpecifiedListenerModule(module);
215         if (node == NULL) {
216             CLOGW(LISTENER_TAG "create static module failed: create module listener node failed, module=%d", module);
217             goto CLEANUP;
218         }
219         CLOGI(LISTENER_TAG "create static module, create module listener node success, module=%d", module);
220         g_listenerList[module] = node;
221     }
222     return SOFTBUS_OK;
223 CLEANUP:
224     for (ListenerModule i = module - 1; i >= 0; i--) {
225         SoftbusListenerNode *node = g_listenerList[i];
226         g_listenerList[i] = NULL;
227         // cleanup
228         ReturnListenerNode(&node);
229         CLOGI(LISTENER_TAG "create static module failed: clean up listener node done, module=%d", module);
230     }
231     return SOFTBUS_ERR;
232 }
233 
InitBaseListener(void)234 int32_t InitBaseListener(void)
235 {
236     // stop select thread need re-enter lock
237     SoftBusMutexAttr attr = {
238         .type = SOFTBUS_MUTEX_RECURSIVE,
239     };
240     int32_t status = SoftBusMutexInit(&g_selectThreadStateLock, &attr);
241     if (status != SOFTBUS_OK) {
242         CLOGE(LISTENER_TAG
243             "ATTENTION UNEXPECTED ERROR! init base listener failed: init select thread lock failed, error=%d",
244             status);
245         return SOFTBUS_LOCK_ERR;
246     }
247     // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
248     status = SoftBusMutexInit(&g_listenerListLock, NULL);
249     if (status != SOFTBUS_OK) {
250         SoftBusMutexDestroy(&g_selectThreadStateLock);
251         CLOGE(LISTENER_TAG
252             "ATTENTION UNEXPECTED ERROR! init base listener failed: init listener list lock failed, error=%d",
253             status);
254         return SOFTBUS_LOCK_ERR;
255     }
256 
257     status = SoftBusMutexLock(&g_listenerListLock);
258     if (status != SOFTBUS_OK) {
259         CLOGE(LISTENER_TAG
260             "ATTENTION UNEXPECTED ERROR! init base listener failed: try to lock listener list failed, error=%d",
261             status);
262         SoftBusMutexDestroy(&g_selectThreadStateLock);
263         SoftBusMutexDestroy(&g_listenerListLock);
264         return SOFTBUS_LOCK_ERR;
265     }
266     (void)memset_s(g_listenerList, sizeof(g_listenerList), 0, sizeof(g_listenerList));
267     status = CreateStaticModulesUnsafe();
268     (void)SoftBusMutexUnlock(&g_listenerListLock);
269     if (status != SOFTBUS_OK) {
270         CLOGE(LISTENER_TAG "init base listener failed: create static module listener failed, error=%d", status);
271         SoftBusMutexDestroy(&g_selectThreadStateLock);
272         SoftBusMutexDestroy(&g_listenerListLock);
273         return status;
274     }
275 
276     return SOFTBUS_OK;
277 }
278 
DeinitBaseListener(void)279 NO_SANITIZE("cfi") void DeinitBaseListener(void)
280 {
281     for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
282         SoftbusListenerNode *node = GetListenerNode(module);
283         if (node == NULL) {
284             continue;
285         }
286         RemoveListenerNode(node);
287         ReturnListenerNode(&node);
288     }
289 }
290 
CreateListenerModule(void)291 NO_SANITIZE("cfi") uint32_t CreateListenerModule(void)
292 {
293     int32_t status = SoftBusMutexLock(&g_listenerListLock);
294     CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, UNUSE_BUTT,
295         LISTENER_TAG
296         "ATTENTION UNEXPECTED ERROR! create listener module failed: try to lock listener list failed, error=%d",
297         status);
298 
299     ListenerModule module = LISTENER_MODULE_DYNAMIC_START;
300     for (; module <= LISTENER_MODULE_DYNAMIC_END; module++) {
301         if (g_listenerList[module] != NULL) {
302             continue;
303         }
304         SoftbusListenerNode *node = CreateSpecifiedListenerModule(module);
305         if (node == NULL) {
306             CLOGE(LISTENER_TAG "create listener module failed, create specified listener module failed, module=%d",
307                 module);
308             module = UNUSE_BUTT;
309         } else {
310             CLOGE(LISTENER_TAG "create listener module success, module=%d", module);
311             g_listenerList[module] = node;
312         }
313         break;
314     }
315     (void)SoftBusMutexUnlock(&g_listenerListLock);
316     return module;
317 }
318 
DestroyBaseListener(ListenerModule module)319 NO_SANITIZE("cfi") void DestroyBaseListener(ListenerModule module)
320 {
321     CONN_CHECK_AND_RETURN_LOG(module >= LISTENER_MODULE_DYNAMIC_START && module <= LISTENER_MODULE_DYNAMIC_END,
322         LISTENER_TAG "destroy base listener failed: only dynamic module support destroy, module=%d", module);
323 
324     CLOGI(LISTENER_TAG "receive destroy base listener request, module=%d", module);
325     SoftbusListenerNode *node = GetListenerNode(module);
326     if (node == NULL) {
327         CLOGW(LISTENER_TAG "destroy base listener warning, listener not exist, module=%d", module);
328         return;
329     }
330     RemoveListenerNode(node);
331     ReturnListenerNode(&node);
332 }
333 
StartBaseClient(ListenerModule module,const SoftbusBaseListener * listener)334 int32_t StartBaseClient(ListenerModule module, const SoftbusBaseListener *listener)
335 {
336     CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
337         LISTENER_TAG "start base client listener failed: invalid module, module=%d", module);
338     CONN_CHECK_AND_RETURN_RET_LOG(listener != NULL, SOFTBUS_INVALID_PARAM,
339         LISTENER_TAG "start base client listener failed: listener is null, module=%d", module);
340     CONN_CHECK_AND_RETURN_RET_LOG(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM,
341         LISTENER_TAG "start base client listener failed: listener onConnectEvent is null, module=%d", module);
342     CONN_CHECK_AND_RETURN_RET_LOG(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM,
343         LISTENER_TAG "start base client listener failed: listener onDataEvent is null, module=%d", module);
344 
345     CLOGI(LISTENER_TAG "receive start base client listener request, module=%d", module);
346 
347     SoftbusListenerNode *node = GetListenerNode(module);
348     CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
349         LISTENER_TAG "start base client listener failed: get listener node failed, dynamic module forgot register "
350                      "first? or static module start before base listener init? module=%d",
351         module);
352 
353     int32_t status = SoftBusMutexLock(&node->lock);
354     if (status != SOFTBUS_OK) {
355         CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! start base client listener failed: try to lock listener node "
356                            "failed, module=%d, error=%d",
357             module, status);
358         ReturnListenerNode(&node);
359         return SOFTBUS_LOCK_ERR;
360     }
361     do {
362         if (node->info.status != LISTENER_IDLE) {
363             CLOGE(LISTENER_TAG "start base client listener failed: listener is not idle status, module=%d, status=%d",
364                 module, node->info.status);
365             status = SOFTBUS_ERR;
366             break;
367         }
368         node->listener.onConnectEvent = listener->onConnectEvent;
369         node->listener.onDataEvent = listener->onDataEvent;
370         status = StartSelectThread();
371         if (status != SOFTBUS_OK) {
372             CLOGE(LISTENER_TAG "start base client listener failed: start select thread failed, module=%d, status=%d",
373                 module, status);
374             break;
375         }
376         node->info.status = LISTENER_RUNNING;
377         CLOGI(LISTENER_TAG "start base client listener success, module=%d", module);
378     } while (false);
379     (void)SoftBusMutexUnlock(&node->lock);
380     ReturnListenerNode(&node);
381     return status;
382 }
383 
StartServerListenUnsafe(SoftbusListenerNode * node,const LocalListenerInfo * info)384 static int32_t StartServerListenUnsafe(SoftbusListenerNode *node, const LocalListenerInfo *info)
385 {
386     ListenerModule module = node->module;
387     ProtocolType protocol = info->socketOption.protocol;
388     const SocketInterface *socketIf = GetSocketInterface(protocol);
389     if (socketIf == NULL) {
390         CLOGE(LISTENER_TAG
391             "not find protocal implement, protocal implement should register first, module=%d, protocal=%d",
392             module, protocol);
393         return SOFTBUS_NOT_FIND;
394     }
395     node->socketIf = socketIf;
396 
397     int32_t listenFd = -1;
398     int32_t listenPort = -1;
399     int32_t status = SOFTBUS_OK;
400     do {
401         listenFd = socketIf->OpenServerSocket(info);
402         if (listenFd < 0) {
403             CLOGE(LISTENER_TAG "create server socket failed: module=%d, invalid listen fd=%d", module, listenFd);
404             status = SOFTBUS_TCP_SOCKET_ERR;
405             break;
406         }
407         status = SoftBusSocketListen(listenFd, DEFAULT_BACKLOG);
408         if (status != SOFTBUS_OK) {
409             CLOGE(LISTENER_TAG "listen server socket failed: module=%d, error=%d", module, status);
410             break;
411         }
412         listenPort = socketIf->GetSockPort(listenFd);
413         if (listenPort < 0) {
414             CLOGE(LISTENER_TAG "get listen server port failed: module=%d, listen fd=%d, error=%d", module, listenFd,
415                 status);
416             status = SOFTBUS_TCP_SOCKET_ERR;
417             break;
418         }
419         if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
420             CLOGE(LISTENER_TAG "memcpy_s local listener info object failed: module=%d", module);
421             status = SOFTBUS_MEM_ERR;
422             break;
423         }
424         node->info.modeType = SERVER_MODE;
425         node->info.listenFd = listenFd;
426         node->info.listenPort = listenPort;
427     } while (false);
428     if (status != SOFTBUS_OK && listenFd > 0) {
429         ConnShutdownSocket(listenFd);
430     }
431     return status == SOFTBUS_OK ? listenPort : status;
432 }
433 
CleanupServerListenInfoUnsafe(SoftbusListenerNode * node)434 static void CleanupServerListenInfoUnsafe(SoftbusListenerNode *node)
435 {
436     memset_s(&node->info.listenerInfo, sizeof(SoftbusBaseListenerInfo), 0, sizeof(SoftbusBaseListenerInfo));
437     if (node->info.listenFd > 0) {
438         ConnShutdownSocket(node->info.listenFd);
439     }
440     node->info.listenFd = -1;
441     node->info.listenPort = -1;
442 }
443 
StartBaseListener(const LocalListenerInfo * info,const SoftbusBaseListener * listener)444 int32_t StartBaseListener(const LocalListenerInfo *info, const SoftbusBaseListener *listener)
445 {
446     CONN_CHECK_AND_RETURN_RET_LOG(
447         info != NULL, SOFTBUS_INVALID_PARAM, LISTENER_TAG "start base listener failed: info is null");
448     CONN_CHECK_AND_RETURN_RET_LOG(info->type == CONNECT_TCP || info->type == CONNECT_P2P, SOFTBUS_INVALID_PARAM,
449         LISTENER_TAG "start base listener failed: only CONNECT_TCP(%d) and CONNECT_P2P(%d) is permitted, type=%d",
450         CONNECT_TCP, CONNECT_P2P, info->type);
451     CONN_CHECK_AND_RETURN_RET_LOG(info->socketOption.port >= 0, SOFTBUS_INVALID_PARAM,
452         LISTENER_TAG "start base listener failed: port is invalid, port=%d", info->socketOption.port);
453     CONN_CHECK_AND_RETURN_RET_LOG(info->socketOption.moduleId >= 0 && info->socketOption.moduleId < UNUSE_BUTT,
454         SOFTBUS_INVALID_PARAM, LISTENER_TAG "start base client listener failed: invalid module, module=%d",
455         info->socketOption.moduleId);
456     CONN_CHECK_AND_RETURN_RET_LOG(listener != NULL, SOFTBUS_INVALID_PARAM,
457         LISTENER_TAG "start base listener failed: listener is null, module=%d", info->socketOption.moduleId);
458     CONN_CHECK_AND_RETURN_RET_LOG(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM,
459         LISTENER_TAG "start base listener failed: listener is null, module=%d", info->socketOption.moduleId);
460     CONN_CHECK_AND_RETURN_RET_LOG(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM,
461         LISTENER_TAG "start base listener failed: listener is null, module=%d", info->socketOption.moduleId);
462 
463     ListenerModule module = info->socketOption.moduleId;
464     CLOGI(LISTENER_TAG "receive start base listener request, module=%d", module);
465     SoftbusListenerNode *node = GetListenerNode(module);
466     CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
467         LISTENER_TAG
468         "start base listener failed: get listener node failed, dynamic module should register first, module=%d",
469         module);
470     int32_t status = SoftBusMutexLock(&node->lock);
471     if (status != SOFTBUS_OK) {
472         CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! start base listener failed: try to lock listener node failed, "
473                            "module=%d, error=%d",
474             module, status);
475         ReturnListenerNode(&node);
476         return SOFTBUS_LOCK_ERR;
477     }
478 
479     int32_t listenPort = -1;
480     do {
481         if (node->info.status != LISTENER_IDLE) {
482             CLOGE(LISTENER_TAG "start base listener failed: listener is not idle status, module=%d, status=%d", module,
483                 node->info.status);
484             status = SOFTBUS_ERR;
485             break;
486         }
487 
488         node->listener.onConnectEvent = listener->onConnectEvent;
489         node->listener.onDataEvent = listener->onDataEvent;
490         if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
491             CLOGE(LISTENER_TAG "start base listener failed: memcpy_s listener info failed, module=%d", node->module);
492             status = SOFTBUS_LOCK_ERR;
493             break;
494         }
495         listenPort = StartServerListenUnsafe(node, info);
496         if (listenPort <= 0) {
497             CLOGE(LISTENER_TAG "start base listener failed: start server failed, module=%d, invalid listen port=%d",
498                 module, listenPort);
499             status = SOFTBUS_ERR;
500             break;
501         }
502 
503         status = StartSelectThread();
504         if (status != SOFTBUS_OK) {
505             CLOGE(LISTENER_TAG "start base listener failed: start listener thread failed, module=%d, status=%d", module,
506                 status);
507             CleanupServerListenInfoUnsafe(node);
508             break;
509         }
510         node->info.status = LISTENER_RUNNING;
511         CLOGI(LISTENER_TAG "start base listener success, module=%d, listen fd=%d, listen port=%d", module,
512             node->info.listenFd, listenPort);
513     } while (false);
514     (void)SoftBusMutexUnlock(&node->lock);
515     ReturnListenerNode(&node);
516     return status == SOFTBUS_OK ? listenPort : status;
517 }
518 
StopBaseListener(ListenerModule module)519 int32_t StopBaseListener(ListenerModule module)
520 {
521     CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
522         LISTENER_TAG "stop base listener failed: invalid module, module=%d", module);
523 
524     CLOGI(LISTENER_TAG "receive stop base listener request, module=%d", module);
525     SoftbusListenerNode *node = GetListenerNode(module);
526     CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
527         LISTENER_TAG "stop base listener failed: listener node not exist, module=%d", module);
528 
529     int32_t status = ShutdownBaseListener(node);
530     if (status != SOFTBUS_OK) {
531         CLOGE(LISTENER_TAG "stop base listener failed: stop listen thread failed, module=%d, error=%d",
532             module, status);
533     }
534     ReturnListenerNode(&node);
535     return status;
536 }
537 
ShutdownBaseListener(SoftbusListenerNode * node)538 static int32_t ShutdownBaseListener(SoftbusListenerNode *node)
539 {
540     int32_t status = SoftBusMutexLock(&node->lock);
541     CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
542         LISTENER_TAG "ATTENTION UNEXPECTED ERROR! shutdown base listener failed: try to lock listener node failed, "
543                      "module=%d, error=%d",
544         node->module, status);
545 
546     do {
547         if (node->info.status != LISTENER_RUNNING) {
548             CLOGW(LISTENER_TAG
549                 "shutdown base listener warning, listener is not running, just skip, module=%d, error=%d",
550                 node->module, node->info.status);
551             break;
552         }
553         status = StopSelectThread();
554         if (status != SOFTBUS_OK) {
555             CLOGE(LISTENER_TAG "shutdown base listener failed: stop select thread failed, module=%d, error=%d",
556                 node->module, status);
557             break;
558         }
559         node->info.status = LISTENER_IDLE;
560 
561         FdNode *it = NULL;
562         FdNode *next = NULL;
563         LIST_FOR_EACH_ENTRY_SAFE(it, next, &node->info.waitEventFds, FdNode, node) {
564             CLOGE(LISTENER_TAG
565                 "ATTENTION, shutdown base listener warning: listener node there is fd not close, module=%d, fd=%d, "
566                 "trigger set=%u",
567                 node->module, it->fd, it->triggerSet);
568             // not close fd, repeat close will crash process
569             ListDelete(&it->node);
570             SoftBusFree(it);
571         }
572         node->info.waitEventFdsLen = 0;
573 
574         int32_t listenFd = node->info.listenFd;
575         int32_t listenPort = node->info.listenPort;
576         if (node->info.modeType == SERVER_MODE && listenFd > 0) {
577             CLOGE(LISTENER_TAG "shutdown base listener, close server, module=%d, listen fd=%d, port=%d", node->module,
578                 listenFd, listenPort);
579             ConnCloseSocket(listenFd);
580         }
581         node->info.modeType = UNSET_MODE;
582         node->info.listenFd = -1;
583         node->info.listenPort = -1;
584         (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));
585     } while (false);
586 
587     SoftBusMutexUnlock(&node->lock);
588     return status;
589 }
590 
IsValidTriggerType(TriggerType trigger)591 static bool IsValidTriggerType(TriggerType trigger)
592 {
593     switch (trigger) {
594         case READ_TRIGGER:
595         case WRITE_TRIGGER:
596         case EXCEPT_TRIGGER:
597         case RW_TRIGGER:
598             return true;
599         default:
600             return false;
601     }
602 }
603 
AddTrigger(ListenerModule module,int32_t fd,TriggerType trigger)604 NO_SANITIZE("cfi") int32_t AddTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
605 {
606     CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
607         LISTENER_TAG "add trigger failed: invalid module, module=%d", module);
608     CONN_CHECK_AND_RETURN_RET_LOG(
609         fd > 0, SOFTBUS_INVALID_PARAM, LISTENER_TAG "add trigger failed: invalid fd, module=%d, fd=%d", module, fd);
610     CONN_CHECK_AND_RETURN_RET_LOG(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM,
611         LISTENER_TAG "add trigger failed: invalid trigger, module=%d, fd=%d, trigger=%d", module, fd, trigger);
612 
613     CLOGI(LISTENER_TAG "receive add trigger request, module=%d, fd=%d, trigger=%d", module, fd, trigger);
614     SoftbusListenerNode *node = GetListenerNode(module);
615     CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
616         LISTENER_TAG "add trigger failed: listener node not exist, module=%d, fd=%d, trigger=%d", module, fd, trigger);
617 
618     int32_t status = SoftBusMutexLock(&node->lock);
619     if (status != SOFTBUS_OK) {
620         CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! add trigger failed: try to lock listener node failed, "
621                            "module=%d, fd=%d, trigger=%d, error=%d",
622             module, fd, trigger, status);
623         ReturnListenerNode(&node);
624         return SOFTBUS_LOCK_ERR;
625     }
626 
627     bool wakeup = false;
628     do {
629         if (node->info.status != LISTENER_RUNNING) {
630             CLOGE(LISTENER_TAG "add trigger failed: module is not running, call 'StartBaseListener' or "
631                                "'StartBaseClient' first, module=%d, fd=%d, trigger=%d",
632                 module, fd, trigger);
633             status = SOFTBUS_ERR;
634             break;
635         }
636 
637         if (node->info.waitEventFdsLen > MAX_LISTEN_EVENTS) {
638             CLOGE(LISTENER_TAG "add trigger failed: can not trigger more, fd exceed more than %d, module=%d, fd=%d, "
639                                "trigger=%d, wait event fds len=%d",
640                 MAX_LISTEN_EVENTS, module, fd, trigger, node->info.waitEventFdsLen);
641             status = SOFTBUS_ERR;
642             break;
643         }
644 
645         FdNode *target = NULL;
646         FdNode *it = NULL;
647         LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
648             if (fd == it->fd) {
649                 target = it;
650                 break;
651             }
652         }
653 
654         if (target != NULL) {
655             if ((it->triggerSet & trigger) == trigger) {
656                 CLOGW(LISTENER_TAG
657                     "repeat add trigger, just skip, module=%d, fd=%d, want add trigger=%d, exist trigger set=%u",
658                     module, fd, trigger, it->triggerSet);
659                 break;
660             }
661             it->triggerSet |= trigger;
662             CLOGI(LISTENER_TAG "add trigger success, module=%d, fd=%d, new add trigger=%d, all trigger set=%u", module,
663                 fd, trigger, it->triggerSet);
664             wakeup = true;
665             break;
666         }
667 
668         FdNode *fdNode = (FdNode *)SoftBusCalloc(sizeof(FdNode));
669         if (fdNode == NULL) {
670             CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! add trigger failed: calloc fd node object failed, "
671                                "module=%d, fd=%d, trigger=%d",
672                 module, fd, trigger);
673             status = SOFTBUS_MALLOC_ERR;
674             break;
675         }
676         ListInit(&fdNode->node);
677         fdNode->fd = fd;
678         fdNode->triggerSet = trigger;
679         ListAdd(&node->info.waitEventFds, &fdNode->node);
680         node->info.waitEventFdsLen += 1;
681         wakeup = true;
682         CLOGI(LISTENER_TAG "add trigger success, module=%d, fd=%d, trigger=%d", module, fd, trigger);
683     } while (false);
684 
685     (void)SoftBusMutexUnlock(&node->lock);
686     ReturnListenerNode(&node);
687 
688     if (status == SOFTBUS_OK && wakeup) {
689         WakeupSelectThread();
690     }
691     return status;
692 }
693 
DelTrigger(ListenerModule module,int32_t fd,TriggerType trigger)694 NO_SANITIZE("cfi") int32_t DelTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
695 {
696     CONN_CHECK_AND_RETURN_RET_LOG(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM,
697         LISTENER_TAG "delete trigger failed: invalid module, module=%d", module);
698     CONN_CHECK_AND_RETURN_RET_LOG(
699         fd > 0, SOFTBUS_INVALID_PARAM, LISTENER_TAG "delete trigger failed: invalid fd, module=%d, fd=%d", module, fd);
700     CONN_CHECK_AND_RETURN_RET_LOG(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM,
701         LISTENER_TAG "delete trigger failed: invalid trigger, module=%d, fd=%d, trigger=%d", module, fd, trigger);
702 
703     CLOGI(LISTENER_TAG "receive delete trigger request, module=%d, fd=%d, trigger=%d", module, fd, trigger);
704     SoftbusListenerNode *node = GetListenerNode(module);
705     CONN_CHECK_AND_RETURN_RET_LOG(node != NULL, SOFTBUS_NOT_FIND,
706         LISTENER_TAG "delete trigger failed: listener node not exist, module=%d, fd=%d, trigger=%d", module, fd,
707         trigger);
708 
709     int32_t status = SoftBusMutexLock(&node->lock);
710     if (status != SOFTBUS_OK) {
711         CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! delete trigger failed: try to lock listener node failed, "
712                            "module=%d, fd=%d, trigger=%d, error=%d",
713             module, fd, trigger, status);
714         ReturnListenerNode(&node);
715         return SOFTBUS_LOCK_ERR;
716     }
717 
718     bool wakeup = false;
719     do {
720         FdNode *target = NULL;
721         FdNode *it = NULL;
722         LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
723             if (fd == it->fd) {
724                 target = it;
725                 break;
726             }
727         }
728 
729         if (target == NULL) {
730             CLOGW(LISTENER_TAG
731                 "delete trigger warning, fd node not exist, call delete trigger without add trigger before or mismatch "
732                 "module. You are warned, fix it quickly! module=%d, fd=%d, trigger=%d",
733                 module, fd, trigger);
734             // consider delete trigger success,
735             status = SOFTBUS_OK;
736             break;
737         }
738 
739         if ((target->triggerSet & trigger) == 0) {
740             CLOGW(LISTENER_TAG
741                 "delete trigger warning, without add trigger before, repeat delete trigger or mismatch module. You are "
742                 "warned, fix it quickly! module=%d, fd=%d,  want delete trigger=%d, exist trigger set=%u",
743                 module, fd, trigger, it->triggerSet);
744             // consider delete trigger success,
745             status = SOFTBUS_OK;
746             break;
747         }
748 
749         target->triggerSet &= ~trigger;
750         wakeup = true;
751         if (target->triggerSet != 0) {
752             CLOGI(LISTENER_TAG "delete trigger success, module=%d, fd=%d, trigger=%d, exist trigger set=%u", module, fd,
753                 trigger, target->triggerSet);
754             status = SOFTBUS_OK;
755             break;
756         }
757         CLOGI(LISTENER_TAG
758             "delete trigger success, there is not exist any trigger, free fd node now, module=%d, fd=%d, trigger=%d",
759             module, fd, trigger);
760         ListDelete(&target->node);
761         SoftBusFree(target);
762         node->info.waitEventFdsLen -= 1;
763     } while (false);
764 
765     SoftBusMutexUnlock(&node->lock);
766     ReturnListenerNode(&node);
767 
768     if (status == SOFTBUS_OK && wakeup) {
769         WakeupSelectThread();
770     }
771     return status;
772 }
773 
CleanupSelectThreadState(SelectThreadState ** statePtr)774 static void CleanupSelectThreadState(SelectThreadState **statePtr)
775 {
776     SelectThreadState *state = *statePtr;
777     if (state->ctrlRfd != 0) {
778         ConnCloseSocket(state->ctrlRfd);
779     }
780     if (state->ctrlWfd != 0) {
781         ConnCloseSocket(state->ctrlWfd);
782     }
783 
784     CLOGI(LISTENER_TAG "cleanup select thread state, traceId=%d, ctrl read fd=%d, ctrl write fd=%d", state->traceId,
785         state->ctrlRfd, state->ctrlWfd);
786     (void)SoftBusMutexDestroy(&state->lock);
787     SoftBusFree(state);
788     *statePtr = NULL;
789 }
790 
ProcessCtrlFdEvent(int32_t fd,int32_t wakeupTrace)791 static void ProcessCtrlFdEvent(int32_t fd, int32_t wakeupTrace)
792 {
793 #ifndef __LITEOS__
794     while (true) {
795         int32_t ctrlTraceId = 0;
796         ssize_t len = read(fd, &ctrlTraceId, sizeof(ctrlTraceId));
797         if (len < 0) {
798             int32_t status = errno;
799             if (status == EINTR) {
800                 continue;
801             } else if (status == EAGAIN) {
802                 break;
803             } else {
804                 CLOGE(LISTENER_TAG
805                     "process ctrl fd event failed: wakeup trace=%d, fd=%d, invalid read len=%d, error=%d",
806                     wakeupTrace, fd, len, status);
807                 break;
808             }
809         }
810         CLOGI(LISTENER_TAG "process ctrl fd event, wakeup ctrl message received, wakeup trace=%d, fd=%d, "
811                            "ctrl trace=%d, read length=%d",
812             wakeupTrace, fd, ctrlTraceId, len);
813     }
814 #endif
815 }
816 
DispatchFdEvent(int32_t fd,ListenerModule module,enum SocketEvent event,const SoftbusBaseListener * listener,int32_t wakeupTrace)817 static void DispatchFdEvent(
818     int32_t fd, ListenerModule module, enum SocketEvent event, const SoftbusBaseListener *listener, int32_t wakeupTrace)
819 {
820     if (listener->onDataEvent != NULL) {
821         listener->onDataEvent(module, event, fd);
822     } else {
823         CLOGE(LISTENER_TAG
824             "process fd event, new event coming, but event listener not registered, to avoid repeat wakeup "
825             "select(LEVEL MODE), close it, wakeup trace=%d, module=%d, fd=%d, event=%d",
826             wakeupTrace, module, fd, event);
827         ConnCloseSocket(fd);
828     }
829 }
830 
ProcessSpecifiedServerAcceptEvent(ListenerModule module,int32_t listenFd,ConnectType connectType,const SocketInterface * socketIf,const SoftbusBaseListener * listener,int32_t wakeupTrace)831 static int32_t ProcessSpecifiedServerAcceptEvent(ListenerModule module, int32_t listenFd, ConnectType connectType,
832     const SocketInterface *socketIf, const SoftbusBaseListener *listener, int32_t wakeupTrace)
833 {
834     CONN_CHECK_AND_RETURN_RET_LOG(socketIf != NULL, SOFTBUS_INVALID_PARAM,
835         LISTENER_TAG
836         "process spectified listener node event failed: socket interface implement is null, wakeup trace=%d module=%d",
837         wakeupTrace, module);
838     CONN_CHECK_AND_RETURN_RET_LOG(socketIf->AcceptClient != NULL, SOFTBUS_INVALID_PARAM,
839         LISTENER_TAG "process spectified listener node event failed: socket interface implement not support "
840                      "AcceptClient method, wakeup trace=%d, module=%d",
841         wakeupTrace, module);
842 
843     int32_t status = SOFTBUS_OK;
844     while (true) {
845         int32_t clientFd = -1;
846         ConnectOption clientAddr = {
847             .type = connectType,
848             .socketOption = {
849                 .addr = { 0 },
850                 .port = 0,
851                 .moduleId = module,
852                 .protocol = 0,
853                 .keepAlive = 0,
854             },
855         };
856         status = SOFTBUS_TEMP_FAILURE_RETRY(socketIf->AcceptClient(listenFd, &clientAddr, &clientFd));
857         if (status != SOFTBUS_OK) {
858             break;
859         }
860 
861         char animizedIp[IP_LEN] = { 0 };
862         ConvertAnonymizeIpAddress(animizedIp, IP_LEN, clientAddr.socketOption.addr, IP_LEN);
863         if (listener->onConnectEvent != NULL) {
864             CLOGI(LISTENER_TAG "process spectified listener node event trace, trigger ACCEPT event, wakeup trace=%d, "
865                                "module=%d, listen fd=%d, client ip=%s, client fd=%d",
866                 wakeupTrace, module, listenFd, animizedIp, clientFd);
867             listener->onConnectEvent(module, clientFd, &clientAddr);
868         } else {
869             CLOGE(LISTENER_TAG "process spectified listener node event trace, trigger ACCEPT event, but event listener "
870                                "not registered, wakeup trace=%d, module=%d, listen fd=%d, client ip=%s, client fd=%d",
871                 wakeupTrace, module, listenFd, animizedIp, clientFd);
872             ConnCloseSocket(clientFd);
873         }
874     }
875     return status;
876 }
877 
CopyWaitEventFdsUnsafe(const SoftbusListenerNode * node,FdNode ** outArray,uint32_t * outArrayLen)878 static int32_t CopyWaitEventFdsUnsafe(const SoftbusListenerNode *node, FdNode **outArray, uint32_t *outArrayLen)
879 {
880     if (node->info.waitEventFdsLen == 0) {
881         *outArray = NULL;
882         outArrayLen = 0;
883         return SOFTBUS_OK;
884     }
885 
886     uint32_t fdArrayLen = node->info.waitEventFdsLen;
887     FdNode *fdArray = (FdNode *)SoftBusCalloc(fdArrayLen * sizeof(FdNode));
888     CONN_CHECK_AND_RETURN_RET_LOG(fdArray != NULL, SOFTBUS_MALLOC_ERR,
889         LISTENER_TAG
890         "ATTENTION UNEXPECTED ERROR! copy wait event fds failed: calloc fd node array object failed, module=%d, wait "
891         "event len=%u",
892         node->module, fdArrayLen);
893 
894     uint32_t i = 0;
895     FdNode *item = NULL;
896     bool expand = false;
897     LIST_FOR_EACH_ENTRY(item, &node->info.waitEventFds, FdNode, node) {
898         if (i >= fdArrayLen) {
899             uint32_t tmpLen = fdArrayLen * FDARR_EXPAND_BASE;
900             FdNode *tmp = (FdNode *)SoftBusCalloc(tmpLen * sizeof(FdNode));
901             if (tmp == NULL) {
902                 CLOGE(LISTENER_TAG
903                     "copy wait event fds failed: expand calloc fd node array object failed, module=%d, wait event "
904                     "len=%u",
905                     node->module, tmpLen);
906                 SoftBusFree(fdArray);
907                 return SOFTBUS_MALLOC_ERR;
908             }
909             for (uint32_t j = 0; j < fdArrayLen; j++) {
910                 tmp[j].fd = fdArray[j].fd;
911                 tmp[j].triggerSet = fdArray[j].triggerSet;
912             }
913             SoftBusFree(fdArray);
914             fdArray = tmp;
915             fdArrayLen = tmpLen;
916             expand = true;
917         }
918         fdArray[i].fd = item->fd;
919         fdArray[i].triggerSet = item->triggerSet;
920         i++;
921     }
922 
923     // diagnose by the way
924     if (expand) {
925         CLOGE(LISTENER_TAG
926             "ATTENTION! copy wait event fds warining, listener node 'waitEventFdsLen' field is unexpected, actual "
927             "wait event fd size larget than it, forgot maintain while update fd? fix it quickly. module=%d, "
928             "wait event fds len=%u, actual wait event fds len=%u",
929             node->module, node->info.waitEventFdsLen, i);
930     } else if (i != fdArrayLen) {
931         CLOGE(LISTENER_TAG
932             "ATTENTION! copy wait event fds warining, listener node 'waitEventFdsLen' field is unexpected, actual "
933             "wait event fd size lower than it, forgot maintain while update fd? fix it quickly. module=%d, "
934             "wait event fds len=%u, actual wait event fds len=%u",
935             node->module, node->info.waitEventFdsLen, i);
936     }
937 
938     *outArrayLen = i;
939     *outArray = fdArray;
940     return SOFTBUS_OK;
941 }
942 
CloseInvalidListenForcely(SoftbusListenerNode * node,int32_t listenFd,const char * anomizedIp,int32_t reason)943 static void CloseInvalidListenForcely(SoftbusListenerNode *node, int32_t listenFd, const char *anomizedIp,
944     int32_t reason)
945 {
946     CONN_CHECK_AND_RETURN_LOG(SoftBusMutexLock(&node->lock) == SOFTBUS_OK,
947         LISTENER_TAG
948         "ATTENTION UNEXPECTED ERROR! close invalid listen forcely failed: try to lock listener node failed, module=%d",
949         node->module);
950     do {
951         if (node->info.status != LISTENER_RUNNING || node->info.modeType != SERVER_MODE ||
952             node->info.listenFd != listenFd) {
953             break;
954         }
955         CLOGW(LISTENER_TAG "ATTENTION, forcely close to prevent repeat wakeup select, module=%d, invalid listen fd=%d "
956             "port=%d, ip=%s, error=%d", node->module, node->info.listenFd, node->info.listenPort, anomizedIp, reason);
957         ConnCloseSocket(node->info.listenFd);
958         node->info.listenFd = -1;
959         node->info.listenPort = -1;
960     } while (false);
961     SoftBusMutexUnlock(&node->lock);
962 }
963 
ProcessSpecifiedListenerNodeEvent(SoftbusListenerNode * node,SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet,int32_t wakeupTrace)964 static void ProcessSpecifiedListenerNodeEvent(SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet,
965     SoftBusFdSet *exceptSet, int32_t wakeupTrace)
966 {
967     CONN_CHECK_AND_RETURN_LOG(SoftBusMutexLock(&node->lock) == SOFTBUS_OK,
968         LISTENER_TAG "ATTENTION UNEXPECTED ERROR! try to lock listener node failed, wakeup trace=%d, module=%d",
969         wakeupTrace, node->module);
970     if (node->info.status != LISTENER_RUNNING) {
971         SoftBusMutexUnlock(&node->lock);
972         return;
973     }
974 
975     int32_t listenFd = -1;
976     int32_t listenPort = -1;
977     char animizedIp[IP_LEN] = { 0 };
978     if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
979         listenFd = node->info.listenFd;
980         listenPort = node->info.listenPort;
981         ConvertAnonymizeIpAddress(animizedIp, IP_LEN, node->info.listenerInfo.socketOption.addr, IP_LEN);
982     }
983     const SocketInterface *socketIf = node->socketIf;
984     SoftbusBaseListener listener = node->listener;
985     ConnectType connectType = node->info.listenerInfo.type;
986 
987     FdNode *fdArray = NULL;
988     uint32_t fdArrayLen = 0;
989     int32_t status = CopyWaitEventFdsUnsafe(node, &fdArray, &fdArrayLen);
990     SoftBusMutexUnlock(&node->lock);
991     if (status != SOFTBUS_OK) {
992         CLOGE(LISTENER_TAG
993             "process spectified listener node event failed: copy wait event fds failed, wakeup trace=%d, module=%d, "
994             "error=%d",
995             wakeupTrace, node->module, status);
996         return;
997     }
998 
999     if (listenFd > 0 && SoftBusSocketFdIsset(listenFd, readSet)) {
1000         status =
1001             ProcessSpecifiedServerAcceptEvent(node->module, listenFd, connectType, socketIf, &listener, wakeupTrace);
1002         switch (status) {
1003             case SOFTBUS_OK:
1004             case SOFTBUS_ADAPTER_SOCKET_EAGAIN:
1005                 break;
1006             case SOFTBUS_ADAPTER_SOCKET_EINVAL:
1007             case SOFTBUS_ADAPTER_SOCKET_EBADF:
1008                 CloseInvalidListenForcely(node, listenFd, animizedIp, status);
1009                 break;
1010             default:
1011                 CLOGD(LISTENER_TAG
1012                     "process spectified listener node event failed: accept client failed, wakeup trace=%d, "
1013                     "module=%d, listen fd=%d, port=%d, ip=%s, error=%d",
1014                     wakeupTrace, node->module, listenFd, listenPort, animizedIp, status);
1015                 break;
1016         }
1017     }
1018 
1019     if (fdArrayLen == 0) {
1020         return;
1021     }
1022 
1023     for (uint32_t i = 0; i < fdArrayLen; i++) {
1024         if ((fdArray[i].triggerSet & READ_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, readSet)) {
1025             CLOGD(LISTENER_TAG
1026                 "process spectified listener node event trace, trigger IN event, wakeup trace=%d, module=%d, fd=%d, "
1027                 "trigger set=%u",
1028                 wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
1029             DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_IN, &listener, wakeupTrace);
1030         }
1031         if ((fdArray[i].triggerSet & WRITE_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, writeSet)) {
1032             CLOGD(LISTENER_TAG
1033                 "process spectified listener node event trace, trigger OUT event, wakeup trace=%d, module=%d, fd=%d, "
1034                 "trigger set=%u",
1035                 wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
1036             DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_OUT, &listener, wakeupTrace);
1037         }
1038         if ((fdArray[i].triggerSet & EXCEPT_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, exceptSet)) {
1039             CLOGW(LISTENER_TAG
1040                 "process spectified listener node event trace, trigger EXCEPTION(out-of-band data) event, wakeup "
1041                 "trace=%d, module=%d, fd=%d, trigger set=%u",
1042                 wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
1043             DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_EXCEPTION, &listener, wakeupTrace);
1044         }
1045         // RW_TRIGGER is already triggered in READ_TRIGGER and WRITE_TRIGGER, just skip it
1046     }
1047     SoftBusFree(fdArray);
1048 }
1049 
ProcessEvent(SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet,const SelectThreadState * selectState,int32_t wakeupTrace)1050 static void ProcessEvent(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet,
1051     const SelectThreadState *selectState, int32_t wakeupTrace)
1052 {
1053     for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
1054         SoftbusListenerNode *node = GetListenerNode(module);
1055         if (node == NULL) {
1056             continue;
1057         }
1058         ProcessSpecifiedListenerNodeEvent(node, readSet, writeSet, exceptSet, wakeupTrace);
1059         ReturnListenerNode(&node);
1060     }
1061 
1062     if (SoftBusSocketFdIsset(selectState->ctrlRfd, readSet)) {
1063         ProcessCtrlFdEvent(selectState->ctrlRfd, wakeupTrace);
1064     }
1065 }
1066 
CollectSpecifiedModuleListenerEvents(SoftbusListenerNode * node,SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet)1067 static int32_t CollectSpecifiedModuleListenerEvents(
1068     SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
1069 {
1070     CONN_CHECK_AND_RETURN_RET_LOG(SoftBusMutexLock(&node->lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
1071         LISTENER_TAG
1072         "ATTENTION UNEXPECTED ERROR! collect wait event fd set failed: try to lock listener node failed, module=%d",
1073         node->module);
1074 
1075     if (node->info.status != LISTENER_RUNNING) {
1076         (void)SoftBusMutexUnlock(&node->lock);
1077         return 0;
1078     }
1079 
1080     int32_t maxFd = 0;
1081     if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
1082         SoftBusSocketFdSet(node->info.listenFd, readSet);
1083         maxFd = node->info.listenFd;
1084     }
1085 
1086     FdNode *it = NULL;
1087     LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
1088         if ((it->triggerSet & READ_TRIGGER) != 0) {
1089             SoftBusSocketFdSet(it->fd, readSet);
1090         }
1091         if ((it->triggerSet & WRITE_TRIGGER) != 0) {
1092             SoftBusSocketFdSet(it->fd, writeSet);
1093         }
1094         if ((it->triggerSet & EXCEPT_TRIGGER) != 0) {
1095             SoftBusSocketFdSet(it->fd, exceptSet);
1096         }
1097         // RW_TRIGGER is already collected in READ_TRIGGER and WRITE_TRIGGER, just skip it
1098         maxFd = it->fd > maxFd ? it->fd : maxFd;
1099     }
1100     (void)SoftBusMutexUnlock(&node->lock);
1101     return maxFd;
1102 }
1103 
CollectWaitEventFdSet(SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet)1104 static int32_t CollectWaitEventFdSet(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
1105 {
1106     int32_t maxFd = 0;
1107     int32_t statusOrFd = 0;
1108     int32_t status = SOFTBUS_OK;
1109     do {
1110         for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
1111             SoftbusListenerNode *node = GetListenerNode(module);
1112             if (node == NULL) {
1113                 continue;
1114             }
1115             statusOrFd = CollectSpecifiedModuleListenerEvents(node, readSet, writeSet, exceptSet);
1116             ReturnListenerNode(&node);
1117             if (statusOrFd < 0) {
1118                 status = statusOrFd;
1119                 CLOGE(LISTENER_TAG "collect wait event fd set failed: module=%d, error=%d", module, status);
1120                 break;
1121             }
1122             maxFd = statusOrFd > maxFd ? statusOrFd : maxFd;
1123         }
1124     } while (false);
1125 
1126     if (status != SOFTBUS_OK) {
1127         SoftBusSocketFdZero(readSet);
1128         SoftBusSocketFdZero(writeSet);
1129         SoftBusSocketFdZero(exceptSet);
1130         return status;
1131     }
1132 
1133     return maxFd;
1134 }
1135 
SelectTask(void * arg)1136 static void *SelectTask(void *arg)
1137 {
1138 #define SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS (3 * 1000)
1139     static int32_t wakeupTraceIdGenerator = 0;
1140 
1141     SelectThreadState *selectState = (SelectThreadState *)arg;
1142     CLOGI(LISTENER_TAG "select task start, select trace=%d, ctrl read fd=%d, ctrl write fd=%d", selectState->traceId,
1143         selectState->ctrlRfd, selectState->ctrlWfd);
1144     while (true) {
1145         int status = SoftBusMutexLock(&selectState->lock);
1146         if (status != SOFTBUS_OK) {
1147             CLOGE(LISTENER_TAG
1148                 "ATTENTION UNEXPECTED ERROR! try to lock select thread state self failed, retry after %d ms, "
1149                 "select trace=%d, error=%d",
1150                 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, status);
1151             SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1152             continue;
1153         }
1154         int32_t referenceCount = selectState->referenceCount;
1155         (void)SoftBusMutexUnlock(&selectState->lock);
1156 
1157         if (referenceCount <= 0) {
1158             CLOGW(LISTENER_TAG "select task, select task is not reference by others any more, select trace=%d, exit...",
1159                 selectState->traceId);
1160             break;
1161         }
1162 
1163         SoftBusFdSet readSet;
1164         SoftBusFdSet writeSet;
1165         SoftBusFdSet exceptSet;
1166         SoftBusSocketFdZero(&readSet);
1167         SoftBusSocketFdZero(&writeSet);
1168         SoftBusSocketFdZero(&exceptSet);
1169         int32_t maxFdOrStatus = CollectWaitEventFdSet(&readSet, &writeSet, &exceptSet);
1170         if (maxFdOrStatus < 0) {
1171             CLOGE(LISTENER_TAG
1172                 "select task failed: collect wait event fd set failed, retry after %d ms, select trace=%d, error=%d",
1173                 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, maxFdOrStatus);
1174             SoftBusSocketFdZero(&readSet);
1175             SoftBusSocketFdZero(&writeSet);
1176             SoftBusSocketFdZero(&exceptSet);
1177             SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1178             continue;
1179         }
1180         SoftBusSocketFdSet(selectState->ctrlRfd, &readSet);
1181         int32_t maxFd = maxFdOrStatus > selectState->ctrlRfd ? maxFdOrStatus : selectState->ctrlRfd;
1182         int32_t nEvents = SoftBusSocketSelect(maxFd + 1, &readSet, &writeSet, &exceptSet, NULL);
1183         int32_t wakeupTraceId = ++wakeupTraceIdGenerator;
1184         if (nEvents <= 0) {
1185             CLOGE(LISTENER_TAG
1186                 "ATTENTION, select task failed: unexpect wakeup, retry after %d ms, wakeup trace id=%d, events=%d",
1187                 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, wakeupTraceId, nEvents);
1188             SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1189             continue;
1190         }
1191         CLOGI(LISTENER_TAG "select task, wakeup from select, select trace=%d, wakeup trace=%d, events=%d",
1192             selectState->traceId, wakeupTraceId, nEvents);
1193         ProcessEvent(&readSet, &writeSet, &exceptSet, selectState, wakeupTraceId);
1194     }
1195     CleanupSelectThreadState(&selectState);
1196     return NULL;
1197 }
1198 
StartSelectThread(void)1199 static int32_t StartSelectThread(void)
1200 {
1201     static int32_t selectThreadTraceIdGenerator = 1;
1202 
1203     int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1204     CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
1205         LISTENER_TAG
1206         "ATTENTION UNEXPECTED ERROR! start select thread failed: try to lock global select thread state failed");
1207 
1208     do {
1209         if (g_selectThreadState != NULL) {
1210             status = SoftBusMutexLock(&g_selectThreadState->lock);
1211             if (status != SOFTBUS_OK) {
1212                 CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! start select thread failed: try to lock select thread "
1213                                    "state self failed, error=%d",
1214                     status);
1215                 status = SOFTBUS_LOCK_ERR;
1216                 break;
1217             }
1218             int32_t referenceCount = ++g_selectThreadState->referenceCount;
1219             (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1220             WakeupSelectThread();
1221 
1222             CLOGW(LISTENER_TAG
1223                 "start select thread, select thread is already start, select trace=%d, ctrl read fd=%d, ctrl write "
1224                 "fd=%d, reference count=%d",
1225                 g_selectThreadState->traceId, g_selectThreadState->ctrlRfd, g_selectThreadState->ctrlWfd,
1226                 referenceCount);
1227             break;
1228         }
1229 
1230         SelectThreadState *state = SoftBusCalloc(sizeof(SelectThreadState));
1231         if (state == NULL) {
1232             status = SOFTBUS_ERR;
1233             break;
1234         }
1235         state->traceId = ++selectThreadTraceIdGenerator;
1236         int32_t fds[2] = { 0 };
1237         int32_t rc = 0;
1238 #ifndef __LITEOS__
1239         rc = pipe2(fds, O_CLOEXEC | O_NONBLOCK);
1240 #endif
1241         if (rc != 0) {
1242             CLOGE(LISTENER_TAG "start select thread failed: create ctrl pipe failed, error=%s", strerror(errno));
1243             SoftBusFree(state);
1244             status = SOFTBUS_ERR;
1245             break;
1246         }
1247         state->ctrlRfd = fds[0];
1248         state->ctrlWfd = fds[1];
1249 
1250         status = SoftBusMutexInit(&state->lock, NULL);
1251         if (status != SOFTBUS_OK) {
1252             CLOGE(LISTENER_TAG "start select thread failed: start select task async failed, error=%d", status);
1253             CleanupSelectThreadState(&state);
1254             break;
1255         }
1256         state->referenceCount = 1;
1257         status = ConnStartActionAsync(state, SelectTask);
1258         if (status != SOFTBUS_OK) {
1259             CLOGE(LISTENER_TAG "start select thread failed: init lock failed, error=%d", status);
1260             CleanupSelectThreadState(&state);
1261             break;
1262         }
1263         CLOGI(LISTENER_TAG "start select thread success, trace id=%d, ctrl read fd=%d, ctrl write fd=%d",
1264             state->traceId, state->ctrlRfd, state->ctrlWfd);
1265         g_selectThreadState = state;
1266     } while (false);
1267     (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1268     return status;
1269 }
1270 
StopSelectThread(void)1271 static int32_t StopSelectThread(void)
1272 {
1273     int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1274     CONN_CHECK_AND_RETURN_RET_LOG(status == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
1275         LISTENER_TAG "ATTENTION UNEXPECTED ERROR! stop select thread failed: try to lock global select thread state "
1276                      "failed");
1277     do {
1278         if (g_selectThreadState == NULL) {
1279             CLOGW(LISTENER_TAG "stop select thread, select thread is already stop or never start");
1280             break;
1281         }
1282 
1283         status = SoftBusMutexLock(&g_selectThreadState->lock);
1284         if (status != SOFTBUS_OK) {
1285             CLOGE(LISTENER_TAG "ATTENTION UNEXPECTED ERROR! stop select thread, try to lock select thread state self");
1286             break;
1287         }
1288         g_selectThreadState->referenceCount -= 1;
1289         int32_t referenceCount = g_selectThreadState->referenceCount;
1290         (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1291         if (referenceCount <= 0) {
1292             CLOGI(LISTENER_TAG "stop select thread, select thread is not used by other module any more, notify exit, "
1293                                "thread reference count=%d",
1294                 referenceCount);
1295             WakeupSelectThread();
1296             g_selectThreadState = NULL;
1297         }
1298     } while (false);
1299     (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1300     return status;
1301 }
1302 
WakeupSelectThread(void)1303 static void WakeupSelectThread(void)
1304 {
1305 #ifndef __LITEOS__
1306     static int32_t selectWakeupTraceIdGenerator = 0;
1307 
1308     int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1309     CONN_CHECK_AND_RETURN_LOG(status == SOFTBUS_OK,
1310         LISTENER_TAG "ATTENTION UNEXPECTED ERROR! wakeup select thread failed: try to lock global select thread state "
1311                      "failed, error=%d",
1312         status);
1313     do {
1314         if (g_selectThreadState == NULL) {
1315             CLOGW(LISTENER_TAG "wakeup select thread warning: select thread is not running, just skip");
1316             break;
1317         }
1318         int32_t ctrlTraceId = selectWakeupTraceIdGenerator++;
1319         ssize_t len = write(g_selectThreadState->ctrlWfd, &ctrlTraceId, sizeof(ctrlTraceId));
1320         CLOGI(LISTENER_TAG "wakeup select thread, wakeup ctrl message sent, write length=%d, ctrl trace=%d", len,
1321             ctrlTraceId);
1322     } while (false);
1323     SoftBusMutexUnlock(&g_selectThreadStateLock);
1324 #endif
1325 }