1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "softbus_proxychannel_pipeline.h"
17
18 #include <securec.h>
19
20 #include "bus_center_manager.h"
21 #include "common_list.h"
22 #include "lnn_lane_interface.h"
23 #include "message_handler.h"
24 #include "softbus_adapter_mem.h"
25 #include "softbus_error_code.h"
26 #include "softbus_log.h"
27 #include "softbus_transmission_interface.h"
28 #include "softbus_utils.h"
29
30 #define SESSION_NAME "ohos.dsoftbus.inner.p2pchannel"
31
// Message kinds dispatched by TransProxyPipelineHandleMessage on the pipeline looper.
enum PipelineLooperMsgType {
    // posted by TransProxyPipelineOpenChannel; arg1 carries the request id
    LOOPER_MSG_TYPE_OPEN_CHANNEL,
    // posted with a delay by TransProxyPipelineCloseChannelDelay; arg1 carries the channel id
    LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL,

    // posted by TransProxyPipelineOnChannelOpened; arg1 = channel id, arg2 = isServer, obj = heap copy of the uuid
    LOOPER_MSG_TYPE_ON_CHANNEL_OPENED,
    // posted by TransProxyPipelineOnChannelOpenFailed; arg1 = channel id
    LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED,
};
39
// One slot of the listener table: the message type and the callbacks registered for it.
struct ListenerItem {
    // message type this slot serves; MSG_TYPE_INVALID is treated as "slot is free" by the register function
    TransProxyPipelineMsgType type;
    // callbacks copied by value from the caller at registration time
    ITransProxyPipelineListener listener;
};
44
// Entry of g_manager.channels: one pending open request and/or one opened channel.
struct PipelineChannelItem {
    // link into g_manager.channels->list
    ListNode node;

    // for open channel request context
    int32_t requestId;
    char networkId[NETWORK_ID_BUF_LEN];
    TransProxyPipelineChannelOption option;
    ITransProxyPipelineCallback callback;

    // for channel opened context
    // INVALID_CHANNEL_ID until TransOpenNetWorkingChannel has returned a channel for the request
    int32_t channelId;
    // empty until the channel-opened notification supplies the peer uuid
    char uuid[UUID_BUF_LEN];
};
58
// Module-wide state of the proxy channel pipeline.
struct PipelineManager {
    // set to true once TransProxyPipelineInit has completed successfully
    bool inited;
    // taken by TransProxyPipelineRegisterListener while it updates `listeners`
    SoftBusMutex lock;
    struct ListenerItem listeners[MSG_TYPE_CNT];
    // list of struct PipelineChannelItem; accessed under channels->lock
    SoftBusList *channels;

    // looper on which all LOOPER_MSG_TYPE_* messages are handled
    SoftBusLooper *looper;
    SoftBusHandler handler;
};
68
69 static struct PipelineManager g_manager = {
70 .inited = false,
71 .listeners = { 0 },
72 .looper = NULL,
73 .handler = { 0 },
74 };
75
// Predicate used by SearchChannelItemUnsafe: returns true when `item` matches the caller-supplied key `param`.
typedef bool (*Comparable)(const struct PipelineChannelItem *item, const void *param);
/*
 * Walks g_manager.channels and returns an entry accepted by `func(entry, param)`, or NULL if none matches.
 * The walk does not stop at the first hit, so when several entries match, the one visited last is returned.
 * "Unsafe": no lock is taken here; every caller in this file holds g_manager.channels->lock around the call.
 */
static struct PipelineChannelItem *SearchChannelItemUnsafe(const void *param, Comparable func)
{
    struct PipelineChannelItem *found = NULL;
    struct PipelineChannelItem *cursor = NULL;
    LIST_FOR_EACH_ENTRY(cursor, &g_manager.channels->list, struct PipelineChannelItem, node) {
        if (!func(cursor, param)) {
            continue;
        }
        found = cursor;
    }
    return found;
}
88
CompareByRequestId(const struct PipelineChannelItem * item,const void * param)89 static bool CompareByRequestId(const struct PipelineChannelItem *item, const void *param)
90 {
91 return item->requestId == *(int32_t *)param;
92 }
93
CompareByChannelId(const struct PipelineChannelItem * item,const void * param)94 static bool CompareByChannelId(const struct PipelineChannelItem *item, const void *param)
95 {
96 return item->channelId == *(int32_t *)param;
97 }
98
CompareByUuid(const struct PipelineChannelItem * item,const void * param)99 static bool CompareByUuid(const struct PipelineChannelItem *item, const void *param)
100 {
101 return strlen(item->uuid) != 0 && strcmp(item->uuid, (const char *)param) == 0;
102 }
103
// FreeMessage hook installed on every message this module posts: releases msg->obj (if any) and then msg itself.
static void TransProxyPipelineFreeMessage(SoftBusMessage *msg)
{
    CONN_CHECK_AND_RETURN_LOG(msg, "null msg");
    void *payload = msg->obj;
    if (payload != NULL) {
        // detach before freeing so the message never holds a dangling pointer
        msg->obj = NULL;
        SoftBusFree(payload);
    }
    SoftBusFree(msg);
}
113
/*
 * Returns the next request id from a function-local counter: 1 on the first call, then 2, 3, ...
 * The counter is a plain static int32_t with no locking.
 */
int32_t TransProxyPipelineGenRequestId(void)
{
    static int32_t lastIssuedId = 0;
    lastIssuedId += 1;
    return lastIssuedId;
}
119
/*
 * Registers (or replaces) the listener for one pipeline message type.
 *
 * type:     must be MSG_TYPE_P2P_NEGO or MSG_TYPE_IP_PORT_EXCHANGE.
 * listener: must be non-NULL with both onDataReceived and onDisconnected set; it is copied by value.
 *
 * Returns SOFTBUS_OK on success, SOFTBUS_INVALID_PARAM for bad arguments, SOFTBUS_LOCK_ERR when the
 * manager lock cannot be taken, and SOFTBUS_ERR when no slot already holds `type` and none is free.
 */
int32_t TransProxyPipelineRegisterListener(TransProxyPipelineMsgType type, const ITransProxyPipelineListener *listener)
{
    TLOGI("enter");
    TRAN_CHECK_AND_RETURN_RET_LOG(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE, SOFTBUS_INVALID_PARAM,
        "type: %d is invalid", type);
    TRAN_CHECK_AND_RETURN_RET_LOG(
        listener && listener->onDataReceived && listener->onDisconnected, SOFTBUS_INVALID_PARAM, "listen is invalid");

    TRAN_CHECK_AND_RETURN_RET_LOG(SoftBusMutexLock(&g_manager.lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR, "lock failed");
    int32_t result = SOFTBUS_ERR;
    for (int32_t idx = 0; idx < MSG_TYPE_CNT; idx++) {
        struct ListenerItem *slot = &g_manager.listeners[idx];
        if (slot->type == type) {
            // same type registered before: keep the slot, replace the callbacks
            TLOGW("type: %d repeat register listener, overwrite it", type);
            slot->listener = *listener;
            result = SOFTBUS_OK;
            break;
        }
        if (slot->type == MSG_TYPE_INVALID) {
            // first free slot: claim it for this type
            slot->type = type;
            slot->listener = *listener;
            result = SOFTBUS_OK;
            break;
        }
    }
    if (result != SOFTBUS_OK) {
        TLOGE("type: %d register listener failed: no position", type);
    }
    SoftBusMutexUnlock(&g_manager.lock);
    return result;
}
147
TransProxyPipelineOpenChannel(int32_t requestId,const char * networkId,const TransProxyPipelineChannelOption * option,const ITransProxyPipelineCallback * callback)148 int32_t TransProxyPipelineOpenChannel(int32_t requestId, const char *networkId,
149 const TransProxyPipelineChannelOption *option, const ITransProxyPipelineCallback *callback)
150 {
151 TLOGI("enter");
152 TRAN_CHECK_AND_RETURN_RET_LOG(networkId, SOFTBUS_INVALID_PARAM, "invalid network id");
153 TRAN_CHECK_AND_RETURN_RET_LOG(callback && callback->onChannelOpened && callback->onChannelOpenFailed,
154 SOFTBUS_INVALID_PARAM, "invalid callback");
155
156 if (option->bleDirect) {
157 if (!ConnBleDirectIsEnable(BLE_COC)) {
158 TLOGE("ble direct is not enable");
159 return SOFTBUS_FUNC_NOT_SUPPORT;
160 }
161 }
162 struct PipelineChannelItem *item = SoftBusCalloc(sizeof(struct PipelineChannelItem));
163 if (item == NULL) {
164 TLOGE("malloc item failed, request id: %d", requestId);
165 return SOFTBUS_MEM_ERR;
166 }
167 item->requestId = requestId;
168 if (strcpy_s(item->networkId, NETWORK_ID_BUF_LEN, networkId) != EOK) {
169 TLOGE("strcpy_s network id failed, request id: %d", requestId);
170 SoftBusFree(item);
171 return SOFTBUS_STRCPY_ERR;
172 }
173 item->option = *option;
174 item->callback = *callback;
175 item->channelId = INVALID_CHANNEL_ID;
176
177 struct SoftBusMessage *msg = SoftBusCalloc(sizeof(SoftBusMessage));
178 if (msg == NULL) {
179 TLOGE("malloc msg failed, request id: %d", requestId);
180 SoftBusFree(item);
181 return SOFTBUS_MEM_ERR;
182 }
183 msg->what = LOOPER_MSG_TYPE_OPEN_CHANNEL;
184 msg->arg1 = requestId;
185 msg->handler = &g_manager.handler;
186 msg->FreeMessage = TransProxyPipelineFreeMessage;
187
188 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
189 if (ret != SOFTBUS_OK) {
190 TLOGE("lock channels failed, request id: %d, error: %d", requestId, ret);
191 SoftBusFree(item);
192 SoftBusFree(msg);
193 return SOFTBUS_LOCK_ERR;
194 }
195 ListInit(&item->node);
196 ListAdd(&g_manager.channels->list, &item->node);
197 g_manager.channels->cnt += 1;
198 SoftBusMutexUnlock(&g_manager.channels->lock);
199
200 g_manager.looper->PostMessage(g_manager.looper, msg);
201 return SOFTBUS_OK;
202 }
203
/*
 * Sends `data` over `channelId`, prefixed with a 32-bit header holding `type`.
 *
 * channelId: channel to send on.
 * data:      payload bytes; must not be NULL.
 * dataLen:   payload length in bytes; dataLen plus the 4-byte header must fit in a uint32_t.
 * type:      must be MSG_TYPE_P2P_NEGO or MSG_TYPE_IP_PORT_EXCHANGE.
 *
 * Returns SOFTBUS_OK on success, SOFTBUS_INVALID_PARAM for bad arguments, SOFTBUS_MALLOC_ERR when the
 * frame buffer cannot be allocated, and SOFTBUS_ERR when copying or sending fails.
 */
int32_t TransProxyPipelineSendMessage(
    int32_t channelId, const uint8_t *data, uint32_t dataLen, TransProxyPipelineMsgType type)
{
    TLOGI("enter");
    TRAN_CHECK_AND_RETURN_RET_LOG(data, SOFTBUS_INVALID_PARAM, "data is invalid");
    TRAN_CHECK_AND_RETURN_RET_LOG(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE, SOFTBUS_INVALID_PARAM,
        "type: %d is invalid", type);
    // the framed length is handled as a 32-bit value below; refuse payloads that would make it wrap
    TRAN_CHECK_AND_RETURN_RET_LOG(dataLen <= UINT32_MAX - (uint32_t)sizeof(uint32_t), SOFTBUS_INVALID_PARAM,
        "data len: %u is too long", dataLen);

    uint32_t sendLen = dataLen + (uint32_t)sizeof(uint32_t);
    char *sendData = SoftBusCalloc(sendLen);
    TRAN_CHECK_AND_RETURN_RET_LOG(sendData, SOFTBUS_MALLOC_ERR, "malloc send data failed");
    // frame layout: [uint32_t type][payload]
    *(uint32_t *)sendData = (uint32_t)type;
    if (memcpy_s(sendData + sizeof(uint32_t), dataLen, data, dataLen) != EOK) {
        TLOGE("memcpy send data failed");
        SoftBusFree(sendData);
        return SOFTBUS_ERR;
    }
    int32_t ret = TransSendNetworkingMessage(channelId, sendData, sendLen, CONN_HIGH);
    // NOTE(review): assumes TransSendNetworkingMessage does not keep the buffer after it returns;
    // the failure path already freed it here, the success path previously leaked it.
    SoftBusFree(sendData);
    if (ret != SOFTBUS_OK) {
        TLOGE("trans send data failed");
        return SOFTBUS_ERR;
    }
    return SOFTBUS_OK;
}
227
TransProxyPipelineGetChannelIdByNetworkId(const char * networkId)228 int32_t TransProxyPipelineGetChannelIdByNetworkId(const char *networkId)
229 {
230 TLOGI("enter");
231 char uuid[UUID_BUF_LEN] = { 0 };
232 int32_t ret = LnnGetRemoteStrInfo(networkId, STRING_KEY_UUID, uuid, sizeof(uuid));
233 if (ret != SOFTBUS_OK) {
234 TLOGE("get remote uuid by network id fail, error: %d", ret);
235 return INVALID_CHANNEL_ID;
236 }
237
238 TRAN_CHECK_AND_RETURN_RET_LOG(
239 SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK, INVALID_CHANNEL_ID, "lock failed");
240 struct PipelineChannelItem *target = SearchChannelItemUnsafe(uuid, CompareByUuid);
241 if (target == NULL) {
242 TLOGE("channel not found");
243 SoftBusMutexUnlock(&g_manager.channels->lock);
244 return INVALID_CHANNEL_ID;
245 }
246 int32_t channelId = target->channelId;
247 SoftBusMutexUnlock(&g_manager.channels->lock);
248 return channelId;
249 }
250
TransProxyPipelineGetUuidByChannelId(int32_t channelId,char * uuid,uint32_t uuidLen)251 int32_t TransProxyPipelineGetUuidByChannelId(int32_t channelId, char *uuid, uint32_t uuidLen)
252 {
253 TLOGI("enter");
254 TRAN_CHECK_AND_RETURN_RET_LOG(
255 SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR, "lock failed");
256
257 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
258 if (target == NULL) {
259 TLOGW("channel id: %d not exist", channelId);
260 SoftBusMutexUnlock(&g_manager.channels->lock);
261 return SOFTBUS_NOT_FIND;
262 }
263 if (strcpy_s(uuid, uuidLen, target->uuid) != EOK) {
264 SoftBusMutexUnlock(&g_manager.channels->lock);
265 return SOFTBUS_STRCPY_ERR;
266 }
267 SoftBusMutexUnlock(&g_manager.channels->lock);
268 return SOFTBUS_OK;
269 }
270
TransProxyPipelineCloseChannel(int32_t channelId)271 int32_t TransProxyPipelineCloseChannel(int32_t channelId)
272 {
273 TLOGI("enter");
274 TRAN_CHECK_AND_RETURN_RET_LOG(
275 SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR, "lock failed");
276
277 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
278 if (target != NULL) {
279 ListDelete(&target->node);
280 g_manager.channels->cnt -= 1;
281 SoftBusFree(target);
282 }
283 SoftBusMutexUnlock(&g_manager.channels->lock);
284 TLOGW("close channel id: %d", channelId);
285 return TransCloseNetWorkingChannel(channelId);
286 }
287
TransProxyPipelineCloseChannelDelay(int32_t channelId)288 int32_t TransProxyPipelineCloseChannelDelay(int32_t channelId)
289 {
290 #define DELAY_CLOSE_CHANNEL_MS 3000
291 TLOGI("enter");
292 TRAN_CHECK_AND_RETURN_RET_LOG(
293 channelId != INVALID_CHANNEL_ID, SOFTBUS_INVALID_PARAM, "invalid channel id: %d", channelId);
294 struct SoftBusMessage *msg = SoftBusCalloc(sizeof(SoftBusMessage));
295 if (msg == NULL) {
296 TLOGE("malloc msg failed, channel id: %d", channelId);
297 return SOFTBUS_MEM_ERR;
298 }
299 msg->what = LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL;
300 msg->arg1 = channelId;
301 msg->handler = &g_manager.handler;
302 msg->FreeMessage = TransProxyPipelineFreeMessage;
303 g_manager.looper->PostMessageDelay(g_manager.looper, msg, DELAY_CLOSE_CHANNEL_MS);
304 return SOFTBUS_OK;
305 }
306
InnerSaveChannel(int32_t channelId,const char * uuid)307 int32_t InnerSaveChannel(int32_t channelId, const char *uuid)
308 {
309 TRAN_CHECK_AND_RETURN_RET_LOG(
310 SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR, "lock failed");
311 struct PipelineChannelItem *item = SoftBusCalloc(sizeof(struct PipelineChannelItem));
312 if (item == NULL) {
313 SoftBusMutexUnlock(&g_manager.channels->lock);
314 return SOFTBUS_LOCK_ERR;
315 }
316 item->channelId = channelId;
317 if (strcpy_s(item->uuid, UUID_BUF_LEN, uuid) != EOK) {
318 SoftBusFree(item);
319 SoftBusMutexUnlock(&g_manager.channels->lock);
320 return SOFTBUS_STRCPY_ERR;
321 }
322 ListInit(&item->node);
323 ListAdd(&g_manager.channels->list, &item->node);
324 g_manager.channels->cnt += 1;
325 SoftBusMutexUnlock(&g_manager.channels->lock);
326 return SOFTBUS_OK;
327 }
328
TransProxyPipelineOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)329 static int TransProxyPipelineOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
330 {
331 TLOGI("enter");
332 char *clone = SoftBusCalloc(UUID_BUF_LEN);
333 if (clone == NULL || strcpy_s(clone, UUID_BUF_LEN, uuid) != EOK) {
334 TLOGE("copy uuid failed, channel id: %d", channelId);
335 SoftBusFree(clone);
336 return SOFTBUS_MEM_ERR;
337 }
338 struct SoftBusMessage *msg = SoftBusCalloc(sizeof(SoftBusMessage));
339 if (msg == NULL) {
340 TLOGE("malloc msg failed, channel id: %d", channelId);
341 SoftBusFree(clone);
342 return SOFTBUS_MEM_ERR;
343 }
344 msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPENED;
345 msg->arg1 = channelId;
346 msg->arg2 = isServer;
347 msg->obj = clone;
348 msg->handler = &g_manager.handler;
349 msg->FreeMessage = TransProxyPipelineFreeMessage;
350 g_manager.looper->PostMessage(g_manager.looper, msg);
351 return SOFTBUS_OK;
352 }
353
InnerOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)354 static void InnerOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
355 {
356 TLOGI("enter");
357 if (isServer) {
358 if (InnerSaveChannel(channelId, uuid) != SOFTBUS_OK) {
359 TLOGE("save server channel failed");
360 TransCloseNetWorkingChannel(channelId);
361 }
362 return;
363 }
364 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
365 if (ret != SOFTBUS_OK) {
366 TLOGE("lock channels failed, channel id: %d, error: %d", channelId, ret);
367 TransCloseNetWorkingChannel(channelId);
368 return;
369 }
370
371 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
372 if (target == NULL) {
373 TLOGE("channel id: %d not found", channelId);
374 SoftBusMutexUnlock(&g_manager.channels->lock);
375 TransCloseNetWorkingChannel(channelId);
376 return;
377 }
378 int32_t requestId = target->requestId;
379 ITransProxyPipelineCallback callback = {
380 .onChannelOpened = target->callback.onChannelOpened,
381 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
382 };
383 if (strcpy_s(target->uuid, UUID_BUF_LEN, uuid) != EOK) {
384 TLOGE("strcpy uuid failed, channel id: %d", channelId);
385 ListDelete(&target->node);
386 SoftBusFree(target);
387 g_manager.channels->cnt -= 1;
388 ret = SOFTBUS_STRCPY_ERR;
389 }
390 SoftBusMutexUnlock(&g_manager.channels->lock);
391 if (ret != SOFTBUS_OK) {
392 TransCloseNetWorkingChannel(channelId);
393 callback.onChannelOpenFailed(requestId, ret);
394 } else {
395 callback.onChannelOpened(requestId, channelId);
396 }
397 }
398
TransProxyPipelineOnChannelOpenFailed(int32_t channelId,const char * uuid)399 static void TransProxyPipelineOnChannelOpenFailed(int32_t channelId, const char *uuid)
400 {
401 (void)uuid;
402 TLOGI("enter");
403 struct SoftBusMessage *msg = SoftBusCalloc(sizeof(SoftBusMessage));
404 if (msg == NULL) {
405 TLOGE("malloc msg failed, channel id: %d", channelId);
406 return;
407 }
408 msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED;
409 msg->arg1 = channelId;
410 msg->handler = &g_manager.handler;
411 msg->FreeMessage = TransProxyPipelineFreeMessage;
412 g_manager.looper->PostMessage(g_manager.looper, msg);
413 }
414
InnerOnChannelOpenFailed(int32_t channelId)415 static void InnerOnChannelOpenFailed(int32_t channelId)
416 {
417 TLOGI("enter");
418 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
419 if (ret != SOFTBUS_OK) {
420 TLOGE("lock channels failed, channel id: %d, error: %d", channelId, ret);
421 return;
422 }
423
424 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
425 if (target == NULL) {
426 TLOGE("channel id: %d not found", channelId);
427 SoftBusMutexUnlock(&g_manager.channels->lock);
428 return;
429 }
430 int32_t requestId = target->requestId;
431 ITransProxyPipelineCallback callback = {
432 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
433 };
434 ListDelete(&target->node);
435 SoftBusFree(target);
436 g_manager.channels->cnt -= 1;
437 SoftBusMutexUnlock(&g_manager.channels->lock);
438 callback.onChannelOpenFailed(requestId, SOFTBUS_ERR);
439 TLOGI("exit");
440 }
441
TransProxyPipelineOnChannelClosed(int32_t channelId)442 static void TransProxyPipelineOnChannelClosed(int32_t channelId)
443 {
444 TLOGI("enter");
445 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
446 if (ret != SOFTBUS_OK) {
447 TLOGE("lock channels failed, channel id: %d, error: %d", channelId, ret);
448 goto exit;
449 }
450 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
451 if (target != NULL) {
452 ListDelete(&target->node);
453 SoftBusFree(target);
454 g_manager.channels->cnt -= 1;
455 }
456 SoftBusMutexUnlock(&g_manager.channels->lock);
457 exit:
458 for (int32_t i = 0; i < MSG_TYPE_CNT; i++) {
459 if (g_manager.listeners[i].type != MSG_TYPE_INVALID && g_manager.listeners[i].listener.onDisconnected != NULL) {
460 g_manager.listeners[i].listener.onDisconnected(channelId);
461 }
462 }
463 }
464
TransProxyPipelineOnMessageReceived(int32_t channelId,const char * data,uint32_t len)465 static void TransProxyPipelineOnMessageReceived(int32_t channelId, const char *data, uint32_t len)
466 {
467 TLOGI("enter");
468 TRAN_CHECK_AND_RETURN_LOG(data, "data is invalid");
469 TRAN_CHECK_AND_RETURN_LOG(len > sizeof(uint32_t), "len: %d is too short", len);
470
471 uint32_t msgType = *(uint32_t *)data;
472 struct ListenerItem *target = NULL;
473 for (int32_t i = 0; i < MSG_TYPE_CNT; i++) {
474 if ((uint32_t)(g_manager.listeners[i].type) == msgType) {
475 target = g_manager.listeners + i;
476 break;
477 }
478 }
479
480 if (target == NULL || target->listener.onDataReceived == NULL) {
481 TLOGE("not listener for msg type: %u", msgType);
482 return;
483 }
484 target->listener.onDataReceived(channelId, data + sizeof(uint32_t), len - sizeof(uint32_t));
485 }
486
InnerOpenProxyChannel(int32_t requestId)487 static void InnerOpenProxyChannel(int32_t requestId)
488 {
489 TLOGI("enter");
490 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
491 if (ret != SOFTBUS_OK) {
492 TLOGE("lock channels failed, request id: %d, error: %d", requestId, ret);
493 return;
494 }
495 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
496 if (target == NULL) {
497 TLOGE("request id %d not found", requestId);
498 SoftBusMutexUnlock(&g_manager.channels->lock);
499 return;
500 }
501 ITransProxyPipelineCallback callback = {
502 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
503 };
504 LanePreferredLinkList preferred = { 0 };
505 if (target->option.bleDirect) {
506 preferred.linkTypeNum = 1;
507 preferred.linkType[0] = LANE_COC_DIRECT;
508 }
509 char networkId[NETWORK_ID_BUF_LEN] = { 0 };
510 if (strcpy_s(networkId, sizeof(networkId), target->networkId) != EOK) {
511 TLOGE("strcpy_s failed, request id: %d", requestId);
512 ListDelete(&target->node);
513 g_manager.channels->cnt -= 1;
514 SoftBusFree(target);
515 SoftBusMutexUnlock(&g_manager.channels->lock);
516 callback.onChannelOpenFailed(requestId, SOFTBUS_STRCPY_ERR);
517 return;
518 }
519 target = NULL;
520 SoftBusMutexUnlock(&g_manager.channels->lock);
521
522 int32_t channelId = TransOpenNetWorkingChannel(SESSION_NAME, networkId, &preferred);
523 ret = SoftBusMutexLock(&g_manager.channels->lock);
524 if (ret != SOFTBUS_OK) {
525 TLOGE("lock channels failed, channel id: %d, error: %d", channelId, ret);
526 return;
527 }
528 target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
529 if (target == NULL) {
530 TLOGE("open proxy session failed, request id: %d, channel id: %d", requestId, channelId);
531 SoftBusMutexUnlock(&g_manager.channels->lock);
532 if (channelId != INVALID_CHANNEL_ID) {
533 TransCloseNetWorkingChannel(channelId);
534 }
535 return;
536 }
537 callback.onChannelOpenFailed = target->callback.onChannelOpenFailed;
538 if (channelId == INVALID_CHANNEL_ID) {
539 TLOGE("open proxy channel failed, request id: %d", requestId);
540 ListDelete(&target->node);
541 g_manager.channels->cnt -= 1;
542 SoftBusFree(target);
543 SoftBusMutexUnlock(&g_manager.channels->lock);
544 callback.onChannelOpenFailed(requestId, SOFTBUS_ERR);
545 return;
546 }
547 target->channelId = channelId;
548 SoftBusMutexUnlock(&g_manager.channels->lock);
549 }
550
// HandleMessage hook of g_manager.handler: routes each looper message to its Inner* / close handler by msg->what.
static void TransProxyPipelineHandleMessage(SoftBusMessage *msg)
{
    TLOGI("enter, what: %d", msg->what);
    if (msg->what == LOOPER_MSG_TYPE_OPEN_CHANNEL) {
        InnerOpenProxyChannel(msg->arg1);
    } else if (msg->what == LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL) {
        TransProxyPipelineCloseChannel(msg->arg1);
    } else if (msg->what == LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED) {
        InnerOnChannelOpenFailed(msg->arg1);
    } else if (msg->what == LOOPER_MSG_TYPE_ON_CHANNEL_OPENED) {
        // obj holds the uuid copy made by TransProxyPipelineOnChannelOpened
        InnerOnChannelOpened(msg->arg1, msg->obj, msg->arg2);
    } else {
        TLOGE("unknown message type: %d", msg->what);
    }
}
572
TransProxyPipelineInit(void)573 int32_t TransProxyPipelineInit(void)
574 {
575 TLOGI("enter");
576 if (g_manager.inited) {
577 return SOFTBUS_OK;
578 };
579 SoftBusList *channels = CreateSoftBusList();
580 if (channels == NULL) {
581 goto exit;
582 }
583 if (SoftBusMutexInit(&g_manager.lock, NULL) != SOFTBUS_OK) {
584 goto exit;
585 }
586 g_manager.channels = channels;
587
588 INetworkingListener listener = {
589 .onChannelOpened = TransProxyPipelineOnChannelOpened,
590 .onChannelOpenFailed = TransProxyPipelineOnChannelOpenFailed,
591 .onChannelClosed = TransProxyPipelineOnChannelClosed,
592 .onMessageReceived = TransProxyPipelineOnMessageReceived,
593 };
594 int32_t ret = TransRegisterNetworkingChannelListener(SESSION_NAME, &listener);
595 if (ret != SOFTBUS_OK) {
596 goto exit;
597 }
598 g_manager.looper = CreateNewLooper("proxy_looper");
599 g_manager.handler.looper = g_manager.looper;
600 g_manager.handler.name = "ProxyChannelPipelineHandler";
601 g_manager.handler.HandleMessage = TransProxyPipelineHandleMessage;
602 g_manager.inited = true;
603 return SOFTBUS_OK;
604 exit:
605 if (channels != NULL) {
606 DestroySoftBusList(channels);
607 }
608 g_manager.channels = NULL;
609 SoftBusMutexDestroy(&g_manager.lock);
610 g_manager.inited = false;
611
612 return SOFTBUS_ERR;
613 }