1 /*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "softbus_proxychannel_pipeline.h"
17
18 #include <securec.h>
19 #include <stdatomic.h>
20
21 #include "bus_center_manager.h"
22 #include "common_list.h"
23 #include "conn_log.h"
24 #include "g_enhance_conn_func.h"
25 #include "g_enhance_conn_func_pack.h"
26 #include "lnn_lane_interface.h"
27 #include "message_handler.h"
28 #include "softbus_adapter_mem.h"
29 #include "softbus_adapter_socket.h"
30 #include "softbus_error_code.h"
31 #include "softbus_transmission_interface.h"
32 #include "softbus_utils.h"
33 #include "softbus_init_common.h"
34 #include "trans_log.h"
35
/* Session name passed when registering the networking-channel listener and when opening a channel. */
#define SESSION_NAME "ohos.dsoftbus.inner.p2pchannel"
/* Text copied into g_manager.handler.name by TransProxyPipelineInit. */
#define PIPELINEHANDLER_NAME "ProxyChannelPipelineHandler"
/* Number of g_manager.listeners slots scanned by the register/dispatch loops in this file. */
#define MSG_CNT 2
39
/* Values stored in SoftBusMessage.what for messages dispatched by TransProxyPipelineHandleMessage. */
enum PipelineLooperMsgType {
    /* arg1 = requestId; handled by InnerOpenProxyChannel. */
    LOOPER_MSG_TYPE_OPEN_CHANNEL,
    /* arg1 = channelId; handled by TransProxyPipelineCloseChannel ("DELEY" spelling is part of the identifier). */
    LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL,

    /* arg1 = channelId, arg2 = isServer, obj = heap copy of the uuid; handled by InnerOnChannelOpened. */
    LOOPER_MSG_TYPE_ON_CHANNEL_OPENED,
    /* arg1 = channelId; handled by InnerOnChannelOpenFailed. */
    LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED,
};
47
/* One listener slot: the message type it serves and the callbacks registered for that type. */
struct ListenerItem {
    /* MSG_TYPE_INVALID is treated as "slot is free" by the register loop. */
    TransProxyPipelineMsgType type;
    /* Copied by value from the caller in TransProxyPipelineRegisterListener. */
    ITransProxyPipelineListener listener;
};
52
/* Entry of g_manager.channels: either a pending open request or an opened channel. */
struct PipelineChannelItem {
    /* Link into g_manager.channels->list. */
    ListNode node;

    // for open channel request context
    int32_t requestId;
    char networkId[NETWORK_ID_BUF_LEN];
    TransProxyPipelineChannelOption option;
    /* Left zeroed for items created by InnerSaveChannel. */
    ITransProxyPipelineCallback callback;

    // for channel opened context
    /* INVALID_CHANNEL_ID until OpenNetWorkingChannel stores the opened id. */
    int32_t channelId;
    /* Incremented by TransProxyReuseByChannelId, decremented by TransProxyPipelineCloseChannel. */
    int32_t ref;
    /* Empty until the channel-opened path copies the peer uuid in. */
    char uuid[UUID_BUF_LEN];
};
67
/* File-wide state of the proxy-channel pipeline. */
struct PipelineManager {
    /* Set to true at the end of a successful TransProxyPipelineInit. */
    _Atomic bool inited;
    /* Taken by TransProxyPipelineRegisterListener while it edits `listeners`. */
    SoftBusMutex lock;
    /* Listener slots; the loops in this file scan the first MSG_CNT entries. */
    struct ListenerItem listeners[MSG_TYPE_CNT];
    /* List of struct PipelineChannelItem; accessed under channels->lock. */
    SoftBusList *channels;

    /* Looper that receives the messages posted by this file. */
    SoftBusLooper *looper;
    /* Handler attached to every posted message; dispatches to TransProxyPipelineHandleMessage. */
    SoftBusHandler handler;
};
77
/*
 * Single pipeline manager instance. Members not named here (lock, channels) are
 * zero-initialized; they are filled in by TransProxyPipelineInit.
 */
static struct PipelineManager g_manager = {
    .inited = false,
    .listeners = {},
    .looper = NULL,
    .handler = {},
};
84
85 typedef bool (*Comparable)(const struct PipelineChannelItem *item, const void *param);
SearchChannelItemUnsafe(const void * param,Comparable func)86 static struct PipelineChannelItem *SearchChannelItemUnsafe(const void *param, Comparable func)
87 {
88 struct PipelineChannelItem *target = NULL;
89 struct PipelineChannelItem *it = NULL;
90 LIST_FOR_EACH_ENTRY(it, &g_manager.channels->list, struct PipelineChannelItem, node) {
91 if (func(it, param)) {
92 target = it;
93 }
94 }
95 return target;
96 }
97
CompareByRequestId(const struct PipelineChannelItem * item,const void * param)98 static bool CompareByRequestId(const struct PipelineChannelItem *item, const void *param)
99 {
100 return item->requestId == *(int32_t *)param;
101 }
102
CompareByChannelId(const struct PipelineChannelItem * item,const void * param)103 static bool CompareByChannelId(const struct PipelineChannelItem *item, const void *param)
104 {
105 return item->channelId == *(int32_t *)param;
106 }
107
CompareByUuid(const struct PipelineChannelItem * item,const void * param)108 static bool CompareByUuid(const struct PipelineChannelItem *item, const void *param)
109 {
110 return strlen(item->uuid) != 0 && strcmp(item->uuid, (const char *)param) == 0;
111 }
112
/*
 * FreeMessage hook installed on every message posted by this file.
 * Releases the optional payload in msg->obj and then the message itself.
 */
static void TransProxyPipelineFreeMessage(SoftBusMessage *msg)
{
    TRANS_CHECK_AND_RETURN_LOGW(msg != NULL, TRANS_CTRL, "null msg");
    if (msg->obj == NULL) {
        SoftBusFree(msg);
        return;
    }
    SoftBusFree(msg->obj);
    msg->obj = NULL;
    SoftBusFree(msg);
}
122
TransProxyReuseByChannelId(int32_t channelId)123 int32_t TransProxyReuseByChannelId(int32_t channelId)
124 {
125 TRANS_LOGD(TRANS_CTRL, "enter.");
126 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
127 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
128 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
129 if (target == NULL) {
130 TRANS_LOGE(TRANS_CTRL, "channel not exist. channelId=%{public}d", channelId);
131 SoftBusMutexUnlock(&g_manager.channels->lock);
132 return SOFTBUS_NOT_FIND;
133 }
134 target->ref++;
135 SoftBusMutexUnlock(&g_manager.channels->lock);
136 return SOFTBUS_OK;
137 }
138
TransProxyPipelineGenRequestId(void)139 int32_t TransProxyPipelineGenRequestId(void)
140 {
141 static int32_t requestIdGenerator = 0;
142 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
143 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
144 int32_t retValue = ++requestIdGenerator;
145 SoftBusMutexUnlock(&g_manager.channels->lock);
146 return retValue;
147 }
148
/*
 * Registers (or replaces) the listener for one pipeline message type.
 *
 * Only MSG_TYPE_P2P_NEGO and MSG_TYPE_IP_PORT_EXCHANGE are accepted, and both
 * listener callbacks must be non-NULL. The first slot that already holds `type`
 * is overwritten; otherwise the first free slot (MSG_TYPE_INVALID) is claimed.
 *
 * Returns SOFTBUS_OK, SOFTBUS_INVALID_PARAM, SOFTBUS_LOCK_ERR, or
 * SOFTBUS_TRANS_REGISTER_LISTENER_FAILED when no slot is available.
 */
int32_t TransProxyPipelineRegisterListener(TransProxyPipelineMsgType type, const ITransProxyPipelineListener *listener)
{
    TRANS_LOGD(TRANS_CTRL, "enter.");
    TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d", type);
    TRANS_CHECK_AND_RETURN_RET_LOGW(listener && listener->onDataReceived && listener->onDisconnected,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "listen is invalid");

    TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
        TRANS_CTRL, "lock failed");
    for (int32_t idx = 0; idx < MSG_CNT; idx++) {
        struct ListenerItem *slot = &g_manager.listeners[idx];
        bool alreadyRegistered = (slot->type == type);
        if (!alreadyRegistered && slot->type != MSG_TYPE_INVALID) {
            continue; /* slot belongs to another message type */
        }
        if (alreadyRegistered) {
            TRANS_LOGW(TRANS_CTRL, "repeat register listener, overwrite it. type=%{public}d", type);
        } else {
            slot->type = type;
        }
        slot->listener = *listener;
        SoftBusMutexUnlock(&g_manager.lock);
        return SOFTBUS_OK;
    }
    TRANS_LOGE(TRANS_CTRL, "register listener failed: no position. type=%{public}d", type);
    SoftBusMutexUnlock(&g_manager.lock);
    return SOFTBUS_TRANS_REGISTER_LISTENER_FAILED;
}
177
TransProxyPipelineOpenChannel(int32_t requestId,const char * networkId,const TransProxyPipelineChannelOption * option,const ITransProxyPipelineCallback * callback)178 int32_t TransProxyPipelineOpenChannel(int32_t requestId, const char *networkId,
179 const TransProxyPipelineChannelOption *option, const ITransProxyPipelineCallback *callback)
180 {
181 TRANS_LOGD(TRANS_CTRL, "enter.");
182 if (!IsValidString(networkId, ID_MAX_LEN)) {
183 return SOFTBUS_INVALID_PARAM;
184 }
185 TRANS_CHECK_AND_RETURN_RET_LOGE(option != NULL, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "option invalid");
186 TRANS_CHECK_AND_RETURN_RET_LOGE(networkId, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid network id");
187 TRANS_CHECK_AND_RETURN_RET_LOGE(callback && callback->onChannelOpened && callback->onChannelOpenFailed,
188 SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid callback");
189
190 if (option->bleDirect) {
191 TRANS_CHECK_AND_RETURN_RET_LOGE(
192 ConnBleDirectIsEnablePacked(BLE_COC), SOFTBUS_FUNC_NOT_SUPPORT,
193 TRANS_CTRL, "ble direct is not enable");
194 }
195 struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
196 TRANS_CHECK_AND_RETURN_RET_LOGE(
197 item != NULL, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc item failed, reqId=%{public}d", requestId);
198 item->requestId = requestId;
199 if (strcpy_s(item->networkId, NETWORK_ID_BUF_LEN, networkId) != EOK) {
200 TRANS_LOGE(TRANS_CTRL, "strcpy_s network id failed, reqId=%{public}d", requestId);
201 SoftBusFree(item);
202 return SOFTBUS_STRCPY_ERR;
203 }
204 item->option = *option;
205 item->callback = *callback;
206 item->channelId = INVALID_CHANNEL_ID;
207
208 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
209 if (msg == NULL) {
210 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, reqId=%{public}d", requestId);
211 SoftBusFree(item);
212 return SOFTBUS_MALLOC_ERR;
213 }
214 msg->what = LOOPER_MSG_TYPE_OPEN_CHANNEL;
215 msg->arg1 = (uint64_t)requestId;
216 msg->handler = &g_manager.handler;
217 msg->FreeMessage = TransProxyPipelineFreeMessage;
218
219 if (SoftBusMutexLock(&g_manager.channels->lock) != SOFTBUS_OK) {
220 TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d", requestId);
221 SoftBusFree(item);
222 SoftBusFree(msg);
223 return SOFTBUS_LOCK_ERR;
224 }
225 ListInit(&item->node);
226 ListAdd(&g_manager.channels->list, &item->node);
227 TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
228 g_manager.channels->cnt++;
229 SoftBusMutexUnlock(&g_manager.channels->lock);
230
231 g_manager.looper->PostMessage(g_manager.looper, msg);
232 return SOFTBUS_OK;
233 }
234
/*
 * Sends `data` over `channelId`, prefixed with a 4-byte message-type header.
 *
 * The frame is: SoftBusHtoLl((uint32_t)type) followed by `dataLen` payload
 * bytes. It is built in a temporary buffer that is always released before
 * returning.
 *
 * Returns SOFTBUS_OK on success; SOFTBUS_INVALID_PARAM for NULL data, an
 * unsupported type, or a dataLen so large that adding the header would wrap a
 * uint32_t; SOFTBUS_MALLOC_ERR / SOFTBUS_MEM_ERR on buffer failures; otherwise
 * the error returned by TransSendNetworkingMessage.
 */
int32_t TransProxyPipelineSendMessage(
    int32_t channelId, const uint8_t *data, uint32_t dataLen, TransProxyPipelineMsgType type)
{
    TRANS_LOGD(TRANS_CTRL, "enter.");
    TRANS_CHECK_AND_RETURN_RET_LOGW(data, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "data is invalid");
    TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d ", type);
    /* Guard the header addition: a wrapped length would under-allocate the frame. */
    TRANS_CHECK_AND_RETURN_RET_LOGW(dataLen <= UINT32_MAX - (uint32_t)sizeof(uint32_t),
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "dataLen is too large. dataLen=%{public}u", dataLen);

    const uint32_t frameLen = dataLen + (uint32_t)sizeof(uint32_t);
    char *sendData = (char *)SoftBusCalloc(frameLen);
    TRANS_CHECK_AND_RETURN_RET_LOGW(sendData, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc send data failed");
    *(uint32_t *)sendData = SoftBusHtoLl((uint32_t)type);
    if (memcpy_s(sendData + sizeof(uint32_t), dataLen, data, dataLen) != EOK) {
        TRANS_LOGE(TRANS_CTRL, "memcpy send data failed");
        SoftBusFree(sendData);
        return SOFTBUS_MEM_ERR;
    }
    int32_t ret = TransSendNetworkingMessage(channelId, sendData, frameLen, CONN_HIGH);
    if (ret != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "trans send data failed");
    }
    SoftBusFree(sendData);
    return ret;
}
260
TransProxyPipelineGetChannelIdByNetworkId(const char * networkId)261 int32_t TransProxyPipelineGetChannelIdByNetworkId(const char *networkId)
262 {
263 TRANS_LOGD(TRANS_CTRL, "enter.");
264 if (!IsValidString(networkId, ID_MAX_LEN)) {
265 return SOFTBUS_INVALID_PARAM;
266 }
267 char uuid[UUID_BUF_LEN] = { 0 };
268 int32_t ret = LnnGetRemoteStrInfo(networkId, STRING_KEY_UUID, uuid, sizeof(uuid));
269 if (ret != SOFTBUS_OK) {
270 TRANS_LOGE(TRANS_CTRL, "get remote uuid by network id fail, ret=%{public}d", ret);
271 return INVALID_CHANNEL_ID;
272 }
273
274 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
275 INVALID_CHANNEL_ID, TRANS_CTRL, "lock failed");
276 struct PipelineChannelItem *target = SearchChannelItemUnsafe(uuid, CompareByUuid);
277 if (target == NULL) {
278 TRANS_LOGE(TRANS_CTRL, "channel not found");
279 SoftBusMutexUnlock(&g_manager.channels->lock);
280 return INVALID_CHANNEL_ID;
281 }
282 int32_t channelId = target->channelId;
283 SoftBusMutexUnlock(&g_manager.channels->lock);
284 return channelId;
285 }
286
TransProxyPipelineGetUuidByChannelId(int32_t channelId,char * uuid,uint32_t uuidLen)287 int32_t TransProxyPipelineGetUuidByChannelId(int32_t channelId, char *uuid, uint32_t uuidLen)
288 {
289 TRANS_LOGD(TRANS_CTRL, "enter.");
290 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
291 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
292 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
293 if (target == NULL) {
294 TRANS_LOGE(TRANS_CTRL, "channelId not exist. channelId=%{public}d", channelId);
295 SoftBusMutexUnlock(&g_manager.channels->lock);
296 return SOFTBUS_NOT_FIND;
297 }
298 if (strcpy_s(uuid, uuidLen, target->uuid) != EOK) {
299 SoftBusMutexUnlock(&g_manager.channels->lock);
300 return SOFTBUS_STRCPY_ERR;
301 }
302 SoftBusMutexUnlock(&g_manager.channels->lock);
303 return SOFTBUS_OK;
304 }
305
TransProxyPipelineCloseChannel(int32_t channelId)306 int32_t TransProxyPipelineCloseChannel(int32_t channelId)
307 {
308 TRANS_LOGI(TRANS_CTRL, "enter.");
309 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
310 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
311
312 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
313 if (target != NULL) {
314 target->ref--;
315 if (target->ref <= 0) {
316 ListDelete(&target->node);
317 g_manager.channels->cnt -= 1;
318 SoftBusFree(target);
319 SoftBusMutexUnlock(&g_manager.channels->lock);
320 TRANS_LOGW(TRANS_CTRL, "close channelId=%{public}d", channelId);
321 return TransCloseNetWorkingChannel(channelId);
322 }
323 TRANS_LOGI(TRANS_CTRL, "channelId=%{public}d, ref=%{public}d", channelId, target->ref);
324 }
325 SoftBusMutexUnlock(&g_manager.channels->lock);
326 return SOFTBUS_OK;
327 }
328
TransProxyPipelineCloseChannelDelay(int32_t channelId)329 int32_t TransProxyPipelineCloseChannelDelay(int32_t channelId)
330 {
331 #define DELAY_CLOSE_CHANNEL_MS 3000
332 TRANS_LOGD(TRANS_CTRL, "enter.");
333 TRANS_CHECK_AND_RETURN_RET_LOGW(channelId != INVALID_CHANNEL_ID, SOFTBUS_INVALID_PARAM,
334 TRANS_CTRL, "invalid channelId=%{public}d", channelId);
335 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
336 if (msg == NULL) {
337 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
338 return SOFTBUS_MALLOC_ERR;
339 }
340 msg->what = LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL;
341 msg->arg1 = (uint64_t)channelId;
342 msg->handler = &g_manager.handler;
343 msg->FreeMessage = TransProxyPipelineFreeMessage;
344 g_manager.looper->PostMessageDelay(g_manager.looper, msg, DELAY_CLOSE_CHANNEL_MS);
345 return SOFTBUS_OK;
346 }
347
InnerSaveChannel(int32_t channelId,const char * uuid)348 int32_t InnerSaveChannel(int32_t channelId, const char *uuid)
349 {
350 if (uuid == NULL) {
351 TRANS_LOGE(TRANS_CTRL, "invalid uuid");
352 return SOFTBUS_TRANS_INVALID_UUID;
353 }
354 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
355 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
356 struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
357 if (item == NULL) {
358 SoftBusMutexUnlock(&g_manager.channels->lock);
359 return SOFTBUS_MALLOC_ERR;
360 }
361 item->channelId = channelId;
362 if (strcpy_s(item->uuid, UUID_BUF_LEN, uuid) != EOK) {
363 SoftBusFree(item);
364 SoftBusMutexUnlock(&g_manager.channels->lock);
365 return SOFTBUS_STRCPY_ERR;
366 }
367 ListInit(&item->node);
368 ListAdd(&g_manager.channels->list, &item->node);
369 TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
370 g_manager.channels->cnt += 1;
371 SoftBusMutexUnlock(&g_manager.channels->lock);
372 return SOFTBUS_OK;
373 }
374
TransProxyPipelineOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)375 static int TransProxyPipelineOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
376 {
377 TRANS_LOGD(TRANS_CTRL, "enter.");
378 if (uuid == NULL) {
379 TRANS_LOGE(TRANS_CTRL, "invalid uuid");
380 return SOFTBUS_TRANS_INVALID_UUID;
381 }
382 char *clone = (char *)SoftBusCalloc(UUID_BUF_LEN);
383 if (clone == NULL || strcpy_s(clone, UUID_BUF_LEN, uuid) != EOK) {
384 TRANS_LOGE(TRANS_CTRL, "copy uuid failed, channelId=%{public}d", channelId);
385 SoftBusFree(clone);
386 return SOFTBUS_MEM_ERR;
387 }
388 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
389 if (msg == NULL) {
390 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
391 SoftBusFree(clone);
392 return SOFTBUS_MALLOC_ERR;
393 }
394 msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPENED;
395 msg->arg1 = (uint64_t)channelId;
396 msg->arg2 = isServer;
397 msg->obj = clone;
398 msg->handler = &g_manager.handler;
399 msg->FreeMessage = TransProxyPipelineFreeMessage;
400 g_manager.looper->PostMessage(g_manager.looper, msg);
401 return SOFTBUS_OK;
402 }
403 #ifdef __cplusplus
404 extern "C" {
405 #endif
InnerOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)406 static void InnerOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
407 {
408 TRANS_LOGD(TRANS_CTRL, "enter.");
409 if (isServer) {
410 if (InnerSaveChannel(channelId, uuid) != SOFTBUS_OK) {
411 TRANS_LOGE(TRANS_CTRL, "save server channel failed");
412 TransCloseNetWorkingChannel(channelId);
413 }
414 return;
415 }
416 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
417 if (ret != SOFTBUS_OK) {
418 TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
419 TransCloseNetWorkingChannel(channelId);
420 return;
421 }
422
423 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
424 if (target == NULL) {
425 TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
426 SoftBusMutexUnlock(&g_manager.channels->lock);
427 TransCloseNetWorkingChannel(channelId);
428 return;
429 }
430 int32_t requestId = target->requestId;
431 ITransProxyPipelineCallback callback = {
432 .onChannelOpened = target->callback.onChannelOpened,
433 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
434 };
435 if (strcpy_s(target->uuid, UUID_BUF_LEN, uuid) != EOK) {
436 TRANS_LOGE(TRANS_CTRL, "strcpy uuid failed, channelId=%{public}d", channelId);
437 ListDelete(&target->node);
438 SoftBusFree(target);
439 g_manager.channels->cnt -= 1;
440 ret = SOFTBUS_STRCPY_ERR;
441 }
442 SoftBusMutexUnlock(&g_manager.channels->lock);
443 if (ret != SOFTBUS_OK) {
444 TransCloseNetWorkingChannel(channelId);
445 callback.onChannelOpenFailed(requestId, ret);
446 } else {
447 callback.onChannelOpened(requestId, channelId);
448 }
449 }
450 #ifdef __cplusplus
451 }
452 #endif
TransProxyPipelineOnChannelOpenFailed(int32_t channelId,const char * uuid)453 static void TransProxyPipelineOnChannelOpenFailed(int32_t channelId, const char *uuid)
454 {
455 (void)uuid;
456 TRANS_LOGD(TRANS_CTRL, "enter.");
457 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
458 if (msg == NULL) {
459 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
460 return;
461 }
462 msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED;
463 msg->arg1 = (uint64_t)channelId;
464 msg->handler = &g_manager.handler;
465 msg->FreeMessage = TransProxyPipelineFreeMessage;
466 g_manager.looper->PostMessage(g_manager.looper, msg);
467 }
468
InnerOnChannelOpenFailed(int32_t channelId)469 static void InnerOnChannelOpenFailed(int32_t channelId)
470 {
471 TRANS_LOGD(TRANS_CTRL, "enter.");
472 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
473 if (ret != SOFTBUS_OK) {
474 TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
475 return;
476 }
477
478 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
479 if (target == NULL) {
480 TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
481 SoftBusMutexUnlock(&g_manager.channels->lock);
482 return;
483 }
484 int32_t requestId = target->requestId;
485 ITransProxyPipelineCallback callback = {
486 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
487 };
488 ListDelete(&target->node);
489 TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
490 SoftBusFree(target);
491 g_manager.channels->cnt -= 1;
492 SoftBusMutexUnlock(&g_manager.channels->lock);
493 callback.onChannelOpenFailed(requestId, SOFTBUS_TRANS_CHANNEL_OPEN_FAILED);
494 TRANS_LOGI(TRANS_CTRL, "exit");
495 }
496
TransProxyPipelineOnChannelClosed(int32_t channelId)497 static void TransProxyPipelineOnChannelClosed(int32_t channelId)
498 {
499 TRANS_LOGD(TRANS_CTRL, "enter.");
500 struct PipelineChannelItem *target = NULL;
501 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
502 if (ret != SOFTBUS_OK) {
503 TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
504 goto exit;
505 }
506 target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
507 if (target != NULL) {
508 ListDelete(&target->node);
509 TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
510 SoftBusFree(target);
511 g_manager.channels->cnt -= 1;
512 }
513 SoftBusMutexUnlock(&g_manager.channels->lock);
514 exit:
515 for (int32_t i = 0; i < MSG_CNT; i++) {
516 if (g_manager.listeners[i].type != MSG_TYPE_INVALID && g_manager.listeners[i].listener.onDisconnected != NULL) {
517 g_manager.listeners[i].listener.onDisconnected(channelId);
518 }
519 }
520 }
521
TransProxyPipelineOnMessageReceived(int32_t channelId,const char * data,uint32_t len)522 static void TransProxyPipelineOnMessageReceived(int32_t channelId, const char *data, uint32_t len)
523 {
524 TRANS_LOGD(TRANS_CTRL, "enter.");
525 TRANS_CHECK_AND_RETURN_LOGW(data, TRANS_CTRL, "data is invalid");
526 TRANS_CHECK_AND_RETURN_LOGW(len > sizeof(uint32_t), TRANS_CTRL, "len is too short. len=%{public}d", len);
527
528 uint32_t msgType = SoftBusLtoHl(*(uint32_t *)data);
529 struct ListenerItem *target = NULL;
530 for (int32_t i = 0; i < MSG_CNT; i++) {
531 if ((uint32_t)(g_manager.listeners[i].type) == msgType) {
532 target = g_manager.listeners + i;
533 break;
534 }
535 }
536
537 if (target == NULL || target->listener.onDataReceived == NULL) {
538 TRANS_LOGE(TRANS_CTRL, "not listener for msgType=%{public}u", msgType);
539 return;
540 }
541 target->listener.onDataReceived(channelId, data + sizeof(uint32_t), len - sizeof(uint32_t));
542 }
543
/*
 * Opens the networking channel for a pending request and records the result.
 *
 * TransOpenNetWorkingChannel is called without holding the list lock; the
 * pending item is then looked up again by `requestId`:
 *  - lock failure: the just-opened channel (if any) is closed and, when a
 *    failure callback is available in `callback`, the requester is told with
 *    SOFTBUS_LOCK_ERR, so the channel does not leak and the requester is not
 *    left waiting;
 *  - item gone: the just-opened channel (if any) is closed;
 *  - open failed: the item is removed and onChannelOpenFailed is called with
 *    SOFTBUS_TRANS_INVALID_CHANNEL_ID after unlocking;
 *  - success: the item receives the channel id and ref = 1.
 *
 * `callback` is scratch storage owned by the caller; its onChannelOpenFailed is
 * refreshed from the item. The incoming value of `target` is ignored.
 */
static void OpenNetWorkingChannel(int32_t requestId, ITransProxyPipelineCallback *callback,
    LanePreferredLinkList *preferred, char *networkId, struct PipelineChannelItem *target)
{
    int32_t channelId = TransOpenNetWorkingChannel(SESSION_NAME, networkId, preferred);
    int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
    if (ret != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "fail to lock channels.");
        if (channelId != INVALID_CHANNEL_ID) {
            TransCloseNetWorkingChannel(channelId);
        }
        if (callback != NULL && callback->onChannelOpenFailed != NULL) {
            callback->onChannelOpenFailed(requestId, SOFTBUS_LOCK_ERR);
        }
        return;
    }
    target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
    if (target == NULL) {
        TRANS_LOGE(TRANS_CTRL,
            "open proxy session failed, reqId=%{public}d, channelId=%{public}d", requestId, channelId);
        (void)SoftBusMutexUnlock(&g_manager.channels->lock);
        if (channelId != INVALID_CHANNEL_ID) {
            TransCloseNetWorkingChannel(channelId);
        }
        return;
    }
    callback->onChannelOpenFailed = target->callback.onChannelOpenFailed;

    if (channelId == INVALID_CHANNEL_ID) {
        TRANS_LOGE(TRANS_CTRL, "open proxy channel failed, reqId=%{public}d", requestId);
        ListDelete(&target->node);
        g_manager.channels->cnt -= 1;
        SoftBusFree(target);
        (void)SoftBusMutexUnlock(&g_manager.channels->lock);
        callback->onChannelOpenFailed(requestId, SOFTBUS_TRANS_INVALID_CHANNEL_ID);
        return;
    }
    target->channelId = channelId;
    target->ref = 1;
    (void)SoftBusMutexUnlock(&g_manager.channels->lock);
}
575
InnerOpenProxyChannel(int32_t requestId)576 static void InnerOpenProxyChannel(int32_t requestId)
577 {
578 TRANS_LOGD(TRANS_CTRL, "enter.");
579 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
580 if (ret != SOFTBUS_OK) {
581 TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d, ret=%{public}d", requestId, ret);
582 return;
583 }
584 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
585 if (target == NULL) {
586 TRANS_LOGE(TRANS_CTRL, "channel not found. reqId=%{public}d", requestId);
587 (void)SoftBusMutexUnlock(&g_manager.channels->lock);
588 return;
589 }
590 ITransProxyPipelineCallback callback = {
591 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
592 };
593 LanePreferredLinkList preferred = { 0 };
594 if (target->option.bleDirect) {
595 preferred.linkTypeNum = 1;
596 preferred.linkType[0] = LANE_COC_DIRECT;
597 }
598 char networkId[NETWORK_ID_BUF_LEN] = { 0 };
599 if (strcpy_s(networkId, sizeof(networkId), target->networkId) != EOK) {
600 TRANS_LOGE(TRANS_CTRL, "strcpy_s failed, reqId=%{public}d", requestId);
601 ListDelete(&target->node);
602 g_manager.channels->cnt -= 1;
603 SoftBusFree(target);
604 (void)SoftBusMutexUnlock(&g_manager.channels->lock);
605 callback.onChannelOpenFailed(requestId, SOFTBUS_STRCPY_ERR);
606 return;
607 }
608 target = NULL;
609 (void)SoftBusMutexUnlock(&g_manager.channels->lock);
610
611 OpenNetWorkingChannel(requestId, &callback, &preferred, networkId, target);
612 }
613 #ifdef __cplusplus
614 extern "C" {
615 #endif
/*
 * HandleMessage entry of g_manager.handler: routes a looper message to the
 * matching Inner* routine based on msg->what. msg->arg1 carries the request id
 * (open) or channel id (all other types); unknown types are only logged.
 */
static void TransProxyPipelineHandleMessage(SoftBusMessage *msg)
{
    TRANS_LOGD(TRANS_CTRL, "enter, messageType=%{public}d", msg->what);
    const int32_t id = (int32_t)msg->arg1;
    switch (msg->what) {
        case LOOPER_MSG_TYPE_ON_CHANNEL_OPENED:
            InnerOnChannelOpened(id, (char *)msg->obj, (unsigned char)msg->arg2);
            break;
        case LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED:
            InnerOnChannelOpenFailed(id);
            break;
        case LOOPER_MSG_TYPE_OPEN_CHANNEL:
            InnerOpenProxyChannel(id);
            break;
        case LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL:
            TransProxyPipelineCloseChannel(id);
            break;
        default:
            TRANS_LOGE(TRANS_CTRL, "unknown messageType=%{public}d", msg->what);
            break;
    }
}
637
TransProxyPipelineInit(void)638 int32_t TransProxyPipelineInit(void)
639 {
640 TRANS_LOGD(TRANS_CTRL, "enter.");
641 SoftBusList *channels = NULL;
642 int32_t ret = 0;
643 INetworkingListener listener = {
644 .onChannelOpened = TransProxyPipelineOnChannelOpened,
645 .onChannelOpenFailed = TransProxyPipelineOnChannelOpenFailed,
646 .onChannelClosed = TransProxyPipelineOnChannelClosed,
647 .onMessageReceived = TransProxyPipelineOnMessageReceived,
648 };
649
650 if (atomic_load_explicit(&(g_manager.inited), memory_order_acquire)) {
651 return SOFTBUS_OK;
652 };
653 channels = CreateSoftBusList();
654 if (channels == NULL) {
655 goto exit;
656 }
657 if (SoftBusMutexInit(&g_manager.lock, NULL) != SOFTBUS_OK) {
658 goto exit;
659 }
660 g_manager.channels = channels;
661
662 ret = TransRegisterNetworkingChannelListener(SESSION_NAME, &listener);
663 if (ret != SOFTBUS_OK) {
664 goto exit;
665 }
666 g_manager.looper = GetLooper(LOOP_TYPE_DEFAULT);
667 if (g_manager.looper == NULL) {
668 TRANS_LOGE(TRANS_INIT, "fail to get looper.");
669 return SOFTBUS_LOOPER_ERR;
670 }
671 g_manager.handler.looper = g_manager.looper;
672 strcpy_s(g_manager.handler.name, strlen(PIPELINEHANDLER_NAME) + 1, PIPELINEHANDLER_NAME);
673 g_manager.handler.HandleMessage = TransProxyPipelineHandleMessage;
674 atomic_store_explicit(&(g_manager.inited), true, memory_order_release);
675 return SOFTBUS_OK;
676 exit:
677 if (channels != NULL) {
678 TRANS_LOGE(TRANS_INIT, "softbus list is not null.");
679 DestroySoftBusList(channels);
680 }
681 g_manager.channels = NULL;
682 SoftBusMutexDestroy(&g_manager.lock);
683 atomic_store_explicit(&(g_manager.inited), false, memory_order_release);
684
685 return SOFTBUS_TRANS_INIT_FAILED;
686 }
687 #ifdef __cplusplus
688 }
689 #endif