• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "softbus_proxychannel_pipeline.h"
17 
18 #include <securec.h>
19 #include <stdatomic.h>
20 
21 #include "bus_center_manager.h"
22 #include "common_list.h"
23 #include "conn_log.h"
24 #include "g_enhance_conn_func.h"
25 #include "g_enhance_conn_func_pack.h"
26 #include "lnn_lane_interface.h"
27 #include "message_handler.h"
28 #include "softbus_adapter_mem.h"
29 #include "softbus_adapter_socket.h"
30 #include "softbus_error_code.h"
31 #include "softbus_transmission_interface.h"
32 #include "softbus_utils.h"
33 #include "softbus_init_common.h"
34 #include "trans_log.h"
35 
/* Session name used when opening networking channels and registering the channel listener in this file. */
#define SESSION_NAME "ohos.dsoftbus.inner.p2pchannel"
/* Name copied into the looper handler by TransProxyPipelineInit. */
#define PIPELINEHANDLER_NAME "ProxyChannelPipelineHandler"
/* Number of listener slots scanned when registering listeners or dispatching to them. */
#define MSG_CNT 2
39 
/* Message codes (SoftBusMessage.what) handled by TransProxyPipelineHandleMessage. */
enum PipelineLooperMsgType {
    // arg1 carries the requestId of the pending open request
    LOOPER_MSG_TYPE_OPEN_CHANNEL,
    // arg1 carries the channelId to close; posted with a delay
    LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL,

    // arg1 = channelId, arg2 = isServer, obj = heap copy of the peer uuid
    LOOPER_MSG_TYPE_ON_CHANNEL_OPENED,
    // arg1 = channelId
    LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED,
};
47 
/* One listener slot: the message type it serves plus the callbacks registered for that type. */
struct ListenerItem {
    TransProxyPipelineMsgType type;
    ITransProxyPipelineListener listener;
};
52 
/* Entry of g_manager.channels; describes either a pending open request or an opened channel. */
struct PipelineChannelItem {
    ListNode node;

    // for open channel request context
    int32_t requestId;
    char networkId[NETWORK_ID_BUF_LEN];
    TransProxyPipelineChannelOption option;
    ITransProxyPipelineCallback callback;

    // for channel opened context
    int32_t channelId;
    // reference count: set to 1 when the channel is opened, raised by TransProxyReuseByChannelId,
    // lowered by TransProxyPipelineCloseChannel
    int32_t ref;
    char uuid[UUID_BUF_LEN];
};
67 
/* Module-wide state of the proxy channel pipeline. */
struct PipelineManager {
    // set to true at the end of a successful TransProxyPipelineInit
    _Atomic bool inited;
    // taken while the listeners array is modified in TransProxyPipelineRegisterListener
    SoftBusMutex lock;
    struct ListenerItem listeners[MSG_TYPE_CNT];
    // list of struct PipelineChannelItem; accessed under channels->lock
    SoftBusList *channels;

    // looper and handler used to post and process the PipelineLooperMsgType messages
    SoftBusLooper *looper;
    SoftBusHandler handler;
};
77 
/* The single pipeline manager instance; the remaining fields are filled in by TransProxyPipelineInit. */
static struct PipelineManager g_manager = {
    .inited = false,
    .listeners = {},
    .looper = NULL,
    .handler = {},
};
84 
/* Predicate used by SearchChannelItemUnsafe: returns true when item matches the lookup key in param. */
typedef bool (*Comparable)(const struct PipelineChannelItem *item, const void *param);
/*
 * Scans the whole channel list and returns the last entry for which func(entry, param)
 * is true, or NULL when no entry matches.
 * Takes no lock itself; every caller in this file holds g_manager.channels->lock first.
 */
static struct PipelineChannelItem *SearchChannelItemUnsafe(const void *param, Comparable func)
{
    struct PipelineChannelItem *found = NULL;
    struct PipelineChannelItem *cursor = NULL;
    LIST_FOR_EACH_ENTRY(cursor, &g_manager.channels->list, struct PipelineChannelItem, node) {
        // No early exit: a later match replaces an earlier one.
        found = func(cursor, param) ? cursor : found;
    }
    return found;
}
97 
CompareByRequestId(const struct PipelineChannelItem * item,const void * param)98 static bool CompareByRequestId(const struct PipelineChannelItem *item, const void *param)
99 {
100     return item->requestId == *(int32_t *)param;
101 }
102 
CompareByChannelId(const struct PipelineChannelItem * item,const void * param)103 static bool CompareByChannelId(const struct PipelineChannelItem *item, const void *param)
104 {
105     return item->channelId == *(int32_t *)param;
106 }
107 
CompareByUuid(const struct PipelineChannelItem * item,const void * param)108 static bool CompareByUuid(const struct PipelineChannelItem *item, const void *param)
109 {
110     return strlen(item->uuid) != 0 && strcmp(item->uuid, (const char *)param) == 0;
111 }
112 
/*
 * FreeMessage hook installed on every message this file posts.
 * Releases the attached payload (msg->obj), if any, and then the message itself.
 */
static void TransProxyPipelineFreeMessage(SoftBusMessage *msg)
{
    TRANS_CHECK_AND_RETURN_LOGW(msg, TRANS_CTRL, "null msg");
    if (msg->obj == NULL) {
        SoftBusFree(msg);
        return;
    }
    SoftBusFree(msg->obj);
    msg->obj = NULL;
    SoftBusFree(msg);
}
122 
TransProxyReuseByChannelId(int32_t channelId)123 int32_t TransProxyReuseByChannelId(int32_t channelId)
124 {
125     TRANS_LOGD(TRANS_CTRL, "enter.");
126     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
127         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
128     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
129     if (target == NULL) {
130         TRANS_LOGE(TRANS_CTRL, "channel not exist. channelId=%{public}d", channelId);
131         SoftBusMutexUnlock(&g_manager.channels->lock);
132         return SOFTBUS_NOT_FIND;
133     }
134     target->ref++;
135     SoftBusMutexUnlock(&g_manager.channels->lock);
136     return SOFTBUS_OK;
137 }
138 
TransProxyPipelineGenRequestId(void)139 int32_t TransProxyPipelineGenRequestId(void)
140 {
141     static int32_t requestIdGenerator = 0;
142     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
143         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
144     int32_t retValue = ++requestIdGenerator;
145     SoftBusMutexUnlock(&g_manager.channels->lock);
146     return retValue;
147 }
148 
/*
 * Registers (or replaces) the listener for one pipeline message type.
 *
 * type     - must be MSG_TYPE_P2P_NEGO or MSG_TYPE_IP_PORT_EXCHANGE.
 * listener - must be non-NULL with both onDataReceived and onDisconnected set; copied by value.
 *
 * Returns SOFTBUS_OK, SOFTBUS_INVALID_PARAM, SOFTBUS_LOCK_ERR, or
 * SOFTBUS_TRANS_REGISTER_LISTENER_FAILED when none of the MSG_CNT slots is usable.
 */
int32_t TransProxyPipelineRegisterListener(TransProxyPipelineMsgType type, const ITransProxyPipelineListener *listener)
{
    TRANS_LOGD(TRANS_CTRL, "enter.");
    TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d", type);
    TRANS_CHECK_AND_RETURN_RET_LOGW(listener && listener->onDataReceived && listener->onDisconnected,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "listen is invalid");

    TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
        TRANS_CTRL, "lock failed");

    int32_t result = SOFTBUS_TRANS_REGISTER_LISTENER_FAILED;
    for (int32_t idx = 0; idx < MSG_CNT; idx++) {
        struct ListenerItem *slot = &g_manager.listeners[idx];
        const bool alreadyRegistered = (slot->type == type);
        if (!alreadyRegistered && slot->type != MSG_TYPE_INVALID) {
            // Slot belongs to another message type; keep looking.
            continue;
        }
        if (alreadyRegistered) {
            TRANS_LOGW(TRANS_CTRL, "repeat register listener, overwrite it. type=%{public}d", type);
        } else {
            slot->type = type;
        }
        slot->listener = *listener;
        result = SOFTBUS_OK;
        break;
    }
    if (result != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "register listener failed: no position. type=%{public}d", type);
    }
    SoftBusMutexUnlock(&g_manager.lock);
    return result;
}
177 
TransProxyPipelineOpenChannel(int32_t requestId,const char * networkId,const TransProxyPipelineChannelOption * option,const ITransProxyPipelineCallback * callback)178 int32_t TransProxyPipelineOpenChannel(int32_t requestId, const char *networkId,
179     const TransProxyPipelineChannelOption *option, const ITransProxyPipelineCallback *callback)
180 {
181     TRANS_LOGD(TRANS_CTRL, "enter.");
182     if (!IsValidString(networkId, ID_MAX_LEN)) {
183         return SOFTBUS_INVALID_PARAM;
184     }
185     TRANS_CHECK_AND_RETURN_RET_LOGE(option != NULL, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "option invalid");
186     TRANS_CHECK_AND_RETURN_RET_LOGE(networkId, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid network id");
187     TRANS_CHECK_AND_RETURN_RET_LOGE(callback && callback->onChannelOpened && callback->onChannelOpenFailed,
188         SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid callback");
189 
190     if (option->bleDirect) {
191         TRANS_CHECK_AND_RETURN_RET_LOGE(
192             ConnBleDirectIsEnablePacked(BLE_COC), SOFTBUS_FUNC_NOT_SUPPORT,
193                 TRANS_CTRL, "ble direct is not enable");
194     }
195     struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
196     TRANS_CHECK_AND_RETURN_RET_LOGE(
197         item != NULL, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc item failed, reqId=%{public}d", requestId);
198     item->requestId = requestId;
199     if (strcpy_s(item->networkId, NETWORK_ID_BUF_LEN, networkId) != EOK) {
200         TRANS_LOGE(TRANS_CTRL, "strcpy_s network id failed, reqId=%{public}d", requestId);
201         SoftBusFree(item);
202         return SOFTBUS_STRCPY_ERR;
203     }
204     item->option = *option;
205     item->callback = *callback;
206     item->channelId = INVALID_CHANNEL_ID;
207 
208     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
209     if (msg == NULL) {
210         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, reqId=%{public}d", requestId);
211         SoftBusFree(item);
212         return SOFTBUS_MALLOC_ERR;
213     }
214     msg->what = LOOPER_MSG_TYPE_OPEN_CHANNEL;
215     msg->arg1 = (uint64_t)requestId;
216     msg->handler = &g_manager.handler;
217     msg->FreeMessage = TransProxyPipelineFreeMessage;
218 
219     if (SoftBusMutexLock(&g_manager.channels->lock) != SOFTBUS_OK) {
220         TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d", requestId);
221         SoftBusFree(item);
222         SoftBusFree(msg);
223         return SOFTBUS_LOCK_ERR;
224     }
225     ListInit(&item->node);
226     ListAdd(&g_manager.channels->list, &item->node);
227     TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
228     g_manager.channels->cnt++;
229     SoftBusMutexUnlock(&g_manager.channels->lock);
230 
231     g_manager.looper->PostMessage(g_manager.looper, msg);
232     return SOFTBUS_OK;
233 }
234 
/*
 * Sends one pipeline message over an opened channel.
 *
 * The payload is framed as a 4-byte message type (converted with SoftBusHtoLl) followed by
 * dataLen bytes of data, and handed to TransSendNetworkingMessage with CONN_HIGH priority.
 *
 * channelId - channel to send on.
 * data      - payload, must be non-NULL.
 * dataLen   - payload length in bytes; must leave room for the 4-byte header within uint32_t.
 * type      - MSG_TYPE_P2P_NEGO or MSG_TYPE_IP_PORT_EXCHANGE.
 *
 * Returns SOFTBUS_OK, SOFTBUS_INVALID_PARAM, SOFTBUS_MALLOC_ERR, SOFTBUS_MEM_ERR, or the
 * error reported by TransSendNetworkingMessage. The temporary frame is always freed here.
 */
int32_t TransProxyPipelineSendMessage(
    int32_t channelId, const uint8_t *data, uint32_t dataLen, TransProxyPipelineMsgType type)
{
    TRANS_LOGD(TRANS_CTRL, "enter.");
    TRANS_CHECK_AND_RETURN_RET_LOGW(data, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "data is invalid");
    TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d ", type);
    // Reject lengths whose framed size would not fit in 32 bits, so the allocation size and
    // the length handed to the transport can never wrap or be truncated.
    TRANS_CHECK_AND_RETURN_RET_LOGW(dataLen <= UINT32_MAX - (uint32_t)sizeof(uint32_t),
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "dataLen is too large. dataLen=%{public}u", dataLen);

    const uint32_t frameLen = dataLen + (uint32_t)sizeof(uint32_t);
    char *sendData = (char *)SoftBusCalloc(frameLen);
    TRANS_CHECK_AND_RETURN_RET_LOGW(sendData, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc send data failed");

    // The header store is aligned because sendData comes straight from the allocator.
    *(uint32_t *)sendData = SoftBusHtoLl((uint32_t)type);
    if (memcpy_s(sendData + sizeof(uint32_t), dataLen, data, dataLen) != EOK) {
        TRANS_LOGE(TRANS_CTRL, "memcpy send data failed");
        SoftBusFree(sendData);
        return SOFTBUS_MEM_ERR;
    }

    int32_t ret = TransSendNetworkingMessage(channelId, sendData, frameLen, CONN_HIGH);
    if (ret != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "trans send data failed");
    }
    SoftBusFree(sendData);
    return ret;
}
260 
TransProxyPipelineGetChannelIdByNetworkId(const char * networkId)261 int32_t TransProxyPipelineGetChannelIdByNetworkId(const char *networkId)
262 {
263     TRANS_LOGD(TRANS_CTRL, "enter.");
264     if (!IsValidString(networkId, ID_MAX_LEN)) {
265         return SOFTBUS_INVALID_PARAM;
266     }
267     char uuid[UUID_BUF_LEN] = { 0 };
268     int32_t ret = LnnGetRemoteStrInfo(networkId, STRING_KEY_UUID, uuid, sizeof(uuid));
269     if (ret != SOFTBUS_OK) {
270         TRANS_LOGE(TRANS_CTRL, "get remote uuid by network id fail, ret=%{public}d", ret);
271         return INVALID_CHANNEL_ID;
272     }
273 
274     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
275         INVALID_CHANNEL_ID, TRANS_CTRL, "lock failed");
276     struct PipelineChannelItem *target = SearchChannelItemUnsafe(uuid, CompareByUuid);
277     if (target == NULL) {
278         TRANS_LOGE(TRANS_CTRL, "channel not found");
279         SoftBusMutexUnlock(&g_manager.channels->lock);
280         return INVALID_CHANNEL_ID;
281     }
282     int32_t channelId = target->channelId;
283     SoftBusMutexUnlock(&g_manager.channels->lock);
284     return channelId;
285 }
286 
TransProxyPipelineGetUuidByChannelId(int32_t channelId,char * uuid,uint32_t uuidLen)287 int32_t TransProxyPipelineGetUuidByChannelId(int32_t channelId, char *uuid, uint32_t uuidLen)
288 {
289     TRANS_LOGD(TRANS_CTRL, "enter.");
290     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
291         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
292     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
293     if (target == NULL) {
294         TRANS_LOGE(TRANS_CTRL, "channelId not exist. channelId=%{public}d", channelId);
295         SoftBusMutexUnlock(&g_manager.channels->lock);
296         return SOFTBUS_NOT_FIND;
297     }
298     if (strcpy_s(uuid, uuidLen, target->uuid) != EOK) {
299         SoftBusMutexUnlock(&g_manager.channels->lock);
300         return SOFTBUS_STRCPY_ERR;
301     }
302     SoftBusMutexUnlock(&g_manager.channels->lock);
303     return SOFTBUS_OK;
304 }
305 
TransProxyPipelineCloseChannel(int32_t channelId)306 int32_t TransProxyPipelineCloseChannel(int32_t channelId)
307 {
308     TRANS_LOGI(TRANS_CTRL, "enter.");
309     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
310         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
311 
312     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
313     if (target != NULL) {
314         target->ref--;
315         if (target->ref <= 0) {
316             ListDelete(&target->node);
317             g_manager.channels->cnt -= 1;
318             SoftBusFree(target);
319             SoftBusMutexUnlock(&g_manager.channels->lock);
320             TRANS_LOGW(TRANS_CTRL, "close channelId=%{public}d", channelId);
321             return TransCloseNetWorkingChannel(channelId);
322         }
323         TRANS_LOGI(TRANS_CTRL, "channelId=%{public}d, ref=%{public}d", channelId, target->ref);
324     }
325     SoftBusMutexUnlock(&g_manager.channels->lock);
326     return SOFTBUS_OK;
327 }
328 
TransProxyPipelineCloseChannelDelay(int32_t channelId)329 int32_t TransProxyPipelineCloseChannelDelay(int32_t channelId)
330 {
331 #define DELAY_CLOSE_CHANNEL_MS 3000
332     TRANS_LOGD(TRANS_CTRL, "enter.");
333     TRANS_CHECK_AND_RETURN_RET_LOGW(channelId != INVALID_CHANNEL_ID, SOFTBUS_INVALID_PARAM,
334         TRANS_CTRL, "invalid channelId=%{public}d", channelId);
335     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
336     if (msg == NULL) {
337         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
338         return SOFTBUS_MALLOC_ERR;
339     }
340     msg->what = LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL;
341     msg->arg1 = (uint64_t)channelId;
342     msg->handler = &g_manager.handler;
343     msg->FreeMessage = TransProxyPipelineFreeMessage;
344     g_manager.looper->PostMessageDelay(g_manager.looper, msg, DELAY_CLOSE_CHANNEL_MS);
345     return SOFTBUS_OK;
346 }
347 
InnerSaveChannel(int32_t channelId,const char * uuid)348 int32_t InnerSaveChannel(int32_t channelId, const char *uuid)
349 {
350     if (uuid == NULL) {
351         TRANS_LOGE(TRANS_CTRL, "invalid uuid");
352         return SOFTBUS_TRANS_INVALID_UUID;
353     }
354     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
355         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
356     struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
357     if (item == NULL) {
358         SoftBusMutexUnlock(&g_manager.channels->lock);
359         return SOFTBUS_MALLOC_ERR;
360     }
361     item->channelId = channelId;
362     if (strcpy_s(item->uuid, UUID_BUF_LEN, uuid) != EOK) {
363         SoftBusFree(item);
364         SoftBusMutexUnlock(&g_manager.channels->lock);
365         return SOFTBUS_STRCPY_ERR;
366     }
367     ListInit(&item->node);
368     ListAdd(&g_manager.channels->list, &item->node);
369     TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
370     g_manager.channels->cnt += 1;
371     SoftBusMutexUnlock(&g_manager.channels->lock);
372     return SOFTBUS_OK;
373 }
374 
TransProxyPipelineOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)375 static int TransProxyPipelineOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
376 {
377     TRANS_LOGD(TRANS_CTRL, "enter.");
378     if (uuid == NULL) {
379         TRANS_LOGE(TRANS_CTRL, "invalid uuid");
380         return SOFTBUS_TRANS_INVALID_UUID;
381     }
382     char *clone = (char *)SoftBusCalloc(UUID_BUF_LEN);
383     if (clone == NULL || strcpy_s(clone, UUID_BUF_LEN, uuid) != EOK) {
384         TRANS_LOGE(TRANS_CTRL, "copy uuid failed, channelId=%{public}d", channelId);
385         SoftBusFree(clone);
386         return SOFTBUS_MEM_ERR;
387     }
388     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
389     if (msg == NULL) {
390         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
391         SoftBusFree(clone);
392         return SOFTBUS_MALLOC_ERR;
393     }
394     msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPENED;
395     msg->arg1 = (uint64_t)channelId;
396     msg->arg2 = isServer;
397     msg->obj = clone;
398     msg->handler = &g_manager.handler;
399     msg->FreeMessage = TransProxyPipelineFreeMessage;
400     g_manager.looper->PostMessage(g_manager.looper, msg);
401     return SOFTBUS_OK;
402 }
403 #ifdef  __cplusplus
404 extern "C" {
405 #endif
InnerOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)406 static void InnerOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
407 {
408     TRANS_LOGD(TRANS_CTRL, "enter.");
409     if (isServer) {
410         if (InnerSaveChannel(channelId, uuid) != SOFTBUS_OK) {
411             TRANS_LOGE(TRANS_CTRL, "save server channel failed");
412             TransCloseNetWorkingChannel(channelId);
413         }
414         return;
415     }
416     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
417     if (ret != SOFTBUS_OK) {
418         TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
419         TransCloseNetWorkingChannel(channelId);
420         return;
421     }
422 
423     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
424     if (target == NULL) {
425         TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
426         SoftBusMutexUnlock(&g_manager.channels->lock);
427         TransCloseNetWorkingChannel(channelId);
428         return;
429     }
430     int32_t requestId = target->requestId;
431     ITransProxyPipelineCallback callback = {
432         .onChannelOpened = target->callback.onChannelOpened,
433         .onChannelOpenFailed = target->callback.onChannelOpenFailed,
434     };
435     if (strcpy_s(target->uuid, UUID_BUF_LEN, uuid) != EOK) {
436         TRANS_LOGE(TRANS_CTRL, "strcpy uuid failed, channelId=%{public}d", channelId);
437         ListDelete(&target->node);
438         SoftBusFree(target);
439         g_manager.channels->cnt -= 1;
440         ret = SOFTBUS_STRCPY_ERR;
441     }
442     SoftBusMutexUnlock(&g_manager.channels->lock);
443     if (ret != SOFTBUS_OK) {
444         TransCloseNetWorkingChannel(channelId);
445         callback.onChannelOpenFailed(requestId, ret);
446     } else {
447         callback.onChannelOpened(requestId, channelId);
448     }
449 }
450 #ifdef  __cplusplus
451 }
452 #endif
TransProxyPipelineOnChannelOpenFailed(int32_t channelId,const char * uuid)453 static void TransProxyPipelineOnChannelOpenFailed(int32_t channelId, const char *uuid)
454 {
455     (void)uuid;
456     TRANS_LOGD(TRANS_CTRL, "enter.");
457     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
458     if (msg == NULL) {
459         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
460         return;
461     }
462     msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED;
463     msg->arg1 = (uint64_t)channelId;
464     msg->handler = &g_manager.handler;
465     msg->FreeMessage = TransProxyPipelineFreeMessage;
466     g_manager.looper->PostMessage(g_manager.looper, msg);
467 }
468 
InnerOnChannelOpenFailed(int32_t channelId)469 static void InnerOnChannelOpenFailed(int32_t channelId)
470 {
471     TRANS_LOGD(TRANS_CTRL, "enter.");
472     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
473     if (ret != SOFTBUS_OK) {
474         TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
475         return;
476     }
477 
478     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
479     if (target == NULL) {
480         TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
481         SoftBusMutexUnlock(&g_manager.channels->lock);
482         return;
483     }
484     int32_t requestId = target->requestId;
485     ITransProxyPipelineCallback callback = {
486         .onChannelOpenFailed = target->callback.onChannelOpenFailed,
487     };
488     ListDelete(&target->node);
489     TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
490     SoftBusFree(target);
491     g_manager.channels->cnt -= 1;
492     SoftBusMutexUnlock(&g_manager.channels->lock);
493     callback.onChannelOpenFailed(requestId, SOFTBUS_TRANS_CHANNEL_OPEN_FAILED);
494     TRANS_LOGI(TRANS_CTRL, "exit");
495 }
496 
TransProxyPipelineOnChannelClosed(int32_t channelId)497 static void TransProxyPipelineOnChannelClosed(int32_t channelId)
498 {
499     TRANS_LOGD(TRANS_CTRL, "enter.");
500     struct PipelineChannelItem *target = NULL;
501     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
502     if (ret != SOFTBUS_OK) {
503         TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
504         goto exit;
505     }
506     target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
507     if (target != NULL) {
508         ListDelete(&target->node);
509         TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
510         SoftBusFree(target);
511         g_manager.channels->cnt -= 1;
512     }
513     SoftBusMutexUnlock(&g_manager.channels->lock);
514 exit:
515     for (int32_t i = 0; i < MSG_CNT; i++) {
516         if (g_manager.listeners[i].type != MSG_TYPE_INVALID && g_manager.listeners[i].listener.onDisconnected != NULL) {
517             g_manager.listeners[i].listener.onDisconnected(channelId);
518         }
519     }
520 }
521 
TransProxyPipelineOnMessageReceived(int32_t channelId,const char * data,uint32_t len)522 static void TransProxyPipelineOnMessageReceived(int32_t channelId, const char *data, uint32_t len)
523 {
524     TRANS_LOGD(TRANS_CTRL, "enter.");
525     TRANS_CHECK_AND_RETURN_LOGW(data, TRANS_CTRL, "data is invalid");
526     TRANS_CHECK_AND_RETURN_LOGW(len > sizeof(uint32_t), TRANS_CTRL, "len is too short. len=%{public}d", len);
527 
528     uint32_t msgType = SoftBusLtoHl(*(uint32_t *)data);
529     struct ListenerItem *target = NULL;
530     for (int32_t i = 0; i < MSG_CNT; i++) {
531         if ((uint32_t)(g_manager.listeners[i].type) == msgType) {
532             target = g_manager.listeners + i;
533             break;
534         }
535     }
536 
537     if (target == NULL || target->listener.onDataReceived == NULL) {
538         TRANS_LOGE(TRANS_CTRL, "not listener for msgType=%{public}u", msgType);
539         return;
540     }
541     target->listener.onDataReceived(channelId, data + sizeof(uint32_t), len - sizeof(uint32_t));
542 }
543 
/*
 * Second half of the open-channel flow, called by InnerOpenProxyChannel without the list lock.
 *
 * Opens the networking channel for SESSION_NAME, then re-looks-up the pending entry by
 * requestId under the list lock:
 *  - entry gone: the freshly opened channel (if any) is closed again;
 *  - open failed: the entry is removed and onChannelOpenFailed(requestId,
 *    SOFTBUS_TRANS_INVALID_CHANNEL_ID) is invoked after unlocking;
 *  - otherwise the entry records the channel id and starts with ref = 1.
 *
 * requestId - id of the pending request.
 * callback  - in/out; its onChannelOpenFailed is refreshed from the entry before use.
 * preferred - preferred link list passed to TransOpenNetWorkingChannel.
 * networkId - peer network id.
 * target    - ignored on entry; the entry is always searched again under the lock.
 */
static void OpenNetWorkingChannel(int32_t requestId, ITransProxyPipelineCallback *callback,
    LanePreferredLinkList *preferred, char *networkId, struct PipelineChannelItem *target)
{
    int32_t channelId = TransOpenNetWorkingChannel(SESSION_NAME, networkId, preferred);
    int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
    if (ret != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "fail to lock channels.");
        // Without the lock the entry cannot be updated, so do not leave the channel open.
        // NOTE(review): the pending entry stays in the list and the requester is not notified
        // on this path - confirm whether that is acceptable.
        if (channelId != INVALID_CHANNEL_ID) {
            TransCloseNetWorkingChannel(channelId);
        }
        return;
    }
    target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
    if (target == NULL) {
        TRANS_LOGE(TRANS_CTRL,
            "open proxy session failed, reqId=%{public}d, channelId=%{public}d", requestId, channelId);
        (void)SoftBusMutexUnlock(&g_manager.channels->lock);
        if (channelId != INVALID_CHANNEL_ID) {
            TransCloseNetWorkingChannel(channelId);
        }
        return;
    }
    callback->onChannelOpenFailed = target->callback.onChannelOpenFailed;

    if (channelId == INVALID_CHANNEL_ID) {
        TRANS_LOGE(TRANS_CTRL, "open proxy channel failed, reqId=%{public}d", requestId);
        ListDelete(&target->node);
        g_manager.channels->cnt -= 1;
        SoftBusFree(target);
        (void)SoftBusMutexUnlock(&g_manager.channels->lock);
        // Notify outside the lock.
        callback->onChannelOpenFailed(requestId, SOFTBUS_TRANS_INVALID_CHANNEL_ID);
        return;
    }
    target->channelId = channelId;
    target->ref = 1;
    (void)SoftBusMutexUnlock(&g_manager.channels->lock);
}
575 
InnerOpenProxyChannel(int32_t requestId)576 static void InnerOpenProxyChannel(int32_t requestId)
577 {
578     TRANS_LOGD(TRANS_CTRL, "enter.");
579     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
580     if (ret != SOFTBUS_OK) {
581         TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d, ret=%{public}d", requestId, ret);
582         return;
583     }
584     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
585     if (target == NULL) {
586         TRANS_LOGE(TRANS_CTRL, "channel not found. reqId=%{public}d", requestId);
587         (void)SoftBusMutexUnlock(&g_manager.channels->lock);
588         return;
589     }
590     ITransProxyPipelineCallback callback = {
591         .onChannelOpenFailed = target->callback.onChannelOpenFailed,
592     };
593     LanePreferredLinkList preferred = { 0 };
594     if (target->option.bleDirect) {
595         preferred.linkTypeNum = 1;
596         preferred.linkType[0] = LANE_COC_DIRECT;
597     }
598     char networkId[NETWORK_ID_BUF_LEN] = { 0 };
599     if (strcpy_s(networkId, sizeof(networkId), target->networkId) != EOK) {
600         TRANS_LOGE(TRANS_CTRL, "strcpy_s failed, reqId=%{public}d", requestId);
601         ListDelete(&target->node);
602         g_manager.channels->cnt -= 1;
603         SoftBusFree(target);
604         (void)SoftBusMutexUnlock(&g_manager.channels->lock);
605         callback.onChannelOpenFailed(requestId, SOFTBUS_STRCPY_ERR);
606         return;
607     }
608     target = NULL;
609     (void)SoftBusMutexUnlock(&g_manager.channels->lock);
610 
611     OpenNetWorkingChannel(requestId, &callback, &preferred, networkId, target);
612 }
613 #ifdef  __cplusplus
614 extern "C" {
615 #endif
/*
 * HandleMessage hook of the pipeline handler: dispatches a looper message by msg->what.
 * arg1 carries a requestId (open) or a channelId (all other types); for
 * LOOPER_MSG_TYPE_ON_CHANNEL_OPENED, obj is the uuid string and arg2 the isServer flag.
 */
static void TransProxyPipelineHandleMessage(SoftBusMessage *msg)
{
    TRANS_LOGD(TRANS_CTRL, "enter, messageType=%{public}d", msg->what);
    // arg1 was stored from an int32_t when the message was built.
    const int32_t id = (int32_t)msg->arg1;
    switch (msg->what) {
        case LOOPER_MSG_TYPE_ON_CHANNEL_OPENED:
            InnerOnChannelOpened(id, (char *)msg->obj, (unsigned char)msg->arg2);
            break;
        case LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED:
            InnerOnChannelOpenFailed(id);
            break;
        case LOOPER_MSG_TYPE_OPEN_CHANNEL:
            InnerOpenProxyChannel(id);
            break;
        case LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL:
            TransProxyPipelineCloseChannel(id);
            break;
        default:
            TRANS_LOGE(TRANS_CTRL, "unknown messageType=%{public}d", msg->what);
            break;
    }
}
637 
TransProxyPipelineInit(void)638 int32_t TransProxyPipelineInit(void)
639 {
640     TRANS_LOGD(TRANS_CTRL, "enter.");
641     SoftBusList *channels = NULL;
642     int32_t ret = 0;
643     INetworkingListener listener = {
644         .onChannelOpened = TransProxyPipelineOnChannelOpened,
645         .onChannelOpenFailed = TransProxyPipelineOnChannelOpenFailed,
646         .onChannelClosed = TransProxyPipelineOnChannelClosed,
647         .onMessageReceived = TransProxyPipelineOnMessageReceived,
648     };
649 
650     if (atomic_load_explicit(&(g_manager.inited), memory_order_acquire)) {
651         return SOFTBUS_OK;
652     };
653     channels = CreateSoftBusList();
654     if (channels == NULL) {
655         goto exit;
656     }
657     if (SoftBusMutexInit(&g_manager.lock, NULL) != SOFTBUS_OK) {
658         goto exit;
659     }
660     g_manager.channels = channels;
661 
662     ret = TransRegisterNetworkingChannelListener(SESSION_NAME, &listener);
663     if (ret != SOFTBUS_OK) {
664         goto exit;
665     }
666     g_manager.looper = GetLooper(LOOP_TYPE_DEFAULT);
667     if (g_manager.looper == NULL) {
668         TRANS_LOGE(TRANS_INIT, "fail to get looper.");
669         return SOFTBUS_LOOPER_ERR;
670     }
671     g_manager.handler.looper = g_manager.looper;
672     strcpy_s(g_manager.handler.name, strlen(PIPELINEHANDLER_NAME) + 1, PIPELINEHANDLER_NAME);
673     g_manager.handler.HandleMessage = TransProxyPipelineHandleMessage;
674     atomic_store_explicit(&(g_manager.inited), true, memory_order_release);
675     return SOFTBUS_OK;
676 exit:
677     if (channels != NULL) {
678         TRANS_LOGE(TRANS_INIT, "softbus list is not null.");
679         DestroySoftBusList(channels);
680     }
681     g_manager.channels = NULL;
682     SoftBusMutexDestroy(&g_manager.lock);
683     atomic_store_explicit(&(g_manager.inited), false, memory_order_release);
684 
685     return SOFTBUS_TRANS_INIT_FAILED;
686 }
687 #ifdef  __cplusplus
688 }
689 #endif