1 /*
2 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsched_transport_softbus_adapter.h"
17
18 #include "distributed_sched_utils.h"
19 #include "dsched_all_connect_manager.h"
20 #include "dsched_collab_manager.h"
21 #include "dsched_continue_manager.h"
22 #include "dtbschedmgr_device_info_storage.h"
23 #include "dtbschedmgr_log.h"
24 #include "distributed_sched_service.h"
25 #include "mission/wifi_state_adapter.h"
26 #include "softbus_bus_center.h"
27 #include "softbus_common.h"
28 #include "softbus_error_code.h"
29 #ifdef DMSFWK_INTERACTIVE_ADAPTER
30 #include "softbus_adapter/softbus_adapter.h"
31 #endif
32 #include "token_setproc.h"
33
34 namespace OHOS {
35 namespace DistributedSchedule {
36 namespace {
37 const std::string TAG = "DSchedTransportSoftbusAdapter";
38 constexpr int32_t INVALID_SESSION_ID = -1;
39 constexpr int32_t BIND_RETRY_INTERVAL = 500;
40 constexpr int32_t MAX_BIND_RETRY_TIME = 4000;
41 constexpr int32_t MAX_RETRY_TIMES = 8;
42 constexpr int32_t MS_TO_US = 1000;
43 #ifndef DMS_CHECK_BLUETOOTH
44 constexpr int32_t SOFTBUS_QOS_CASE_HIGH = 0;
45 constexpr int32_t SOFTBUS_QOS_CASE_MIDDLE_HIGH = 1;
46 constexpr int32_t SOFTBUS_QOS_CASE_MIDDLE_LOW = 2;
47 constexpr int32_t SOFTBUS_QOS_CASE_LOW = 3;
48 #endif
49 }
50
51 IMPLEMENT_SINGLE_INSTANCE(DSchedTransportSoftbusAdapter);
52
53 static QosTV g_qosInfo[] = {
54 { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_QOS_TYPE_MIN_BW },
55 { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_QOS_TYPE_MAX_LATENCY },
56 { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_QOS_TYPE_MIN_LATENCY }
57 };
58
59 QosTV g_watch_collab_qosInfo[] = {
60 { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW },
61 { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_QOS_TYPE_MAX_LATENCY },
62 { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_QOS_TYPE_MIN_LATENCY }
63 };
64
65 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(QosTV));
66
OnBind(int32_t socket,PeerSocketInfo info)67 static void OnBind(int32_t socket, PeerSocketInfo info)
68 {
69 std::string peerDeviceId(info.networkId);
70 DSchedTransportSoftbusAdapter::GetInstance().OnBind(socket, peerDeviceId);
71 }
72
OnShutdown(int32_t socket,ShutdownReason reason)73 static void OnShutdown(int32_t socket, ShutdownReason reason)
74 {
75 DSchedTransportSoftbusAdapter::GetInstance().OnShutdown(socket, false);
76 }
77
OnBytes(int32_t socket,const void * data,uint32_t dataLen)78 static void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
79 {
80 DSchedTransportSoftbusAdapter::GetInstance().OnBytes(socket, data, dataLen);
81 }
82
83 ISocketListener iSocketListener = {
84 .OnBind = OnBind,
85 .OnShutdown = OnShutdown,
86 .OnBytes = OnBytes
87 };
88
DSchedTransportSoftbusAdapter()89 DSchedTransportSoftbusAdapter::DSchedTransportSoftbusAdapter()
90 {
91 }
92
~DSchedTransportSoftbusAdapter()93 DSchedTransportSoftbusAdapter::~DSchedTransportSoftbusAdapter()
94 {
95 }
96
InitChannel()97 int32_t DSchedTransportSoftbusAdapter::InitChannel()
98 {
99 HILOGI("start init channel");
100 int32_t ret = ERR_OK;
101 #ifdef DMSFWK_ALL_CONNECT_MGR
102 ret = DSchedAllConnectManager::GetInstance().InitAllConnectManager();
103 if (ret != ERR_OK) {
104 HILOGE("Init all connect manager fail, ret: %{public}d.", ret);
105 isAllConnectExist_ = false;
106 }
107 #endif
108
109 serverSocket_ = CreateServerSocket();
110 if (serverSocket_ <= 0) {
111 HILOGE("create socket failed, ret: %{public}d", serverSocket_);
112 return serverSocket_;
113 }
114
115 ret = Listen(serverSocket_, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
116 if (ret != ERR_OK) {
117 HILOGE("service listen failed, ret: %{public}d", ret);
118 return ret;
119 }
120 HILOGI("end");
121 return ERR_OK;
122 }
123
CreateServerSocket()124 int32_t DSchedTransportSoftbusAdapter::CreateServerSocket()
125 {
126 HILOGI("start");
127 localSessionName_ = SOCKET_DMS_SESSION_NAME;
128 SocketInfo info = {
129 .name = const_cast<char*>(localSessionName_.c_str()),
130 .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
131 .dataType = DATA_TYPE_BYTES
132 };
133 int32_t socket = Socket(info);
134 HILOGI("finish, socket session id: %{public}d", socket);
135 return socket;
136 }
137
ConnectDevice(const std::string & peerDeviceId,int32_t & sessionId,DSchedServiceType type)138 int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDeviceId,
139 int32_t &sessionId, DSchedServiceType type)
140 {
141 HILOGI("try to connect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
142 {
143 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
144 if (!sessions_.empty()) {
145 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
146 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
147 HILOGI("peer device already connected");
148 iter->second->OnConnect();
149 sessionId = iter->first;
150 #ifdef DMSFWK_ALL_CONNECT_MGR
151 NotifyConnectDecision(peerDeviceId, type);
152 #endif
153 return ERR_OK;
154 }
155 }
156 }
157 }
158 int32_t ret = ERR_OK;
159 if (IsNeedAllConnect(type)) {
160 HILOGI("waiting all connect decision");
161 ret = DecisionByAllConnect(peerDeviceId, type);
162 if (ret != ERR_OK) {
163 HILOGE("decision fail, ret: %{public}d", ret);
164 return ret;
165 }
166 }
167 ret = AddNewPeerSession(peerDeviceId, sessionId, type);
168 if (ret != ERR_OK || sessionId <= 0) {
169 HILOGE("Add new peer connect session fail, ret: %{public}d, sessionId: %{public}d.", ret, sessionId);
170 }
171 return ret;
172 }
173
NotifyConnectDecision(const std::string & peerDeviceId,DSchedServiceType type)174 void DSchedTransportSoftbusAdapter::NotifyConnectDecision(const std::string &peerDeviceId, DSchedServiceType type)
175 {
176 if (!IsNeedAllConnect(type)) {
177 HILOGW("don't need notify all connect decision");
178 return;
179 }
180 if (type == SERVICE_TYPE_CONTINUE) {
181 DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
182 } else if (type == SERVICE_TYPE_COLLAB) {
183 DSchedCollabManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
184 }
185 }
186
DecisionByAllConnect(const std::string & peerDeviceId,DSchedServiceType type)187 int32_t DSchedTransportSoftbusAdapter::DecisionByAllConnect(const std::string &peerDeviceId, DSchedServiceType type)
188 {
189 #ifdef DMSFWK_ALL_CONNECT_MGR
190 ServiceCollaborationManager_ResourceRequestInfoSets reqInfoSets;
191 DSchedAllConnectManager::GetInstance().GetResourceRequest(reqInfoSets);
192 int32_t ret = DSchedAllConnectManager::GetInstance().ApplyAdvanceResource(peerDeviceId, reqInfoSets);
193 if (ret != ERR_OK) {
194 HILOGE("Apply advance resource fail, ret: %{public}d.", ret);
195 NotifyConnectDecision(peerDeviceId, type);
196 return ret;
197 }
198 NotifyConnectDecision(peerDeviceId, type);
199 ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
200 if (ret != ERR_OK) {
201 HILOGE("Publish prepare state fail, ret %{public}d, peerDeviceId %{public}s.",
202 ret, GetAnonymStr(peerDeviceId).c_str());
203 }
204 #endif
205 return ERR_OK;
206 }
207
IsNeedAllConnect(DSchedServiceType type)208 bool DSchedTransportSoftbusAdapter::IsNeedAllConnect(DSchedServiceType type)
209 {
210 #ifndef COLLAB_ALL_CONNECT_DECISIONS
211 if (type == SERVICE_TYPE_COLLAB || type == SERVICE_TYPE_INVALID) {
212 HILOGI("called, don't need all connect, type: collab or invalid.");
213 return false;
214 }
215 #endif
216 bool result = isAllConnectExist_ && WifiStateAdapter::GetInstance().IsWifiActive();
217 HILOGI("called, result: %{public}d", result);
218 return result;
219 }
220
AddNewPeerSession(const std::string & peerDeviceId,int32_t & sessionId,DSchedServiceType type)221 int32_t DSchedTransportSoftbusAdapter::AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId,
222 DSchedServiceType type)
223 {
224 int32_t ret = ERR_OK;
225 sessionId = CreateClientSocket(peerDeviceId);
226 if (sessionId <= 0) {
227 HILOGE("create socket failed, sessionId: %{public}d.", sessionId);
228 #ifdef DMSFWK_ALL_CONNECT_MGR
229 ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
230 if (ret != ERR_OK) {
231 HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
232 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
233 }
234 #endif
235 return REMOTE_DEVICE_BIND_ABILITY_ERR;
236 }
237
238 ret = SetFirstCallerTokenID(callingTokenId_);
239 HILOGD("SetFirstCallerTokenID callingTokenId: %{public}s, ret: %{public}d",
240 GetAnonymStr(std::to_string(callingTokenId_)).c_str(), ret);
241 callingTokenId_ = 0;
242
243 #ifdef DMSFWK_INTERACTIVE_ADAPTER
244 if (DistributedSchedService::GetInstance().CheckRemoteOsType(peerDeviceId)) {
245 HILOGE("peer device is not a OH");
246 return DMS_PERMISSION_DENIED;
247 }
248 #endif
249 do {
250 ret = ServiceBind(sessionId, type, peerDeviceId);
251 if (ret != ERR_OK) {
252 HILOGE("client bind failed, ret: %{public}d", ret);
253 break;
254 }
255 ret = CreateSessionRecord(sessionId, peerDeviceId, false, type);
256 if (ret != ERR_OK) {
257 HILOGE("Client create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
258 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
259 break;
260 }
261 } while (false);
262
263 if (ret != ERR_OK) {
264 ShutdownSession(peerDeviceId, sessionId);
265 sessionId = INVALID_SESSION_ID;
266 }
267 return ret;
268 }
269
ServiceBind(int32_t & sessionId,DSchedServiceType type,const std::string & peerDeviceId)270 int32_t DSchedTransportSoftbusAdapter::ServiceBind(int32_t &sessionId, DSchedServiceType type,
271 const std::string &peerDeviceId)
272 {
273 HILOGI("begin");
274 int32_t ret = ERR_OK;
275 int retryCount = 0;
276 do {
277 // The DMS-CHECK_SLUETOOTH macro is used to distinguish the binding logic on different devices
278 #ifdef DMS_CHECK_BLUETOOTH
279 HILOGI("collab bind begin");
280 if (type == SERVICE_TYPE_COLLAB) {
281 ret = Bind(sessionId, g_watch_collab_qosInfo, g_QosTV_Param_Index, &iSocketListener);
282 HILOGI("end bind high qos");
283 }
284 #endif
285 #ifndef DMS_CHECK_BLUETOOTH
286 uint32_t validQosCase = SOFTBUS_MAX_VALID_QOS_CASE;
287 ret = QueryValidQos(peerDeviceId, validQosCase);
288 HILOGI("SoftBus query valid qos result: %{public}d", ret);
289 // case 0 : [160Mbps, Max); case 1 : (30Mbps, 160Mbps); case 2 : (384Kbps, 30Mbps]; case 3 : (0, 384Kbps];
290 switch (validQosCase) {
291 case SOFTBUS_QOS_CASE_HIGH:
292 case SOFTBUS_QOS_CASE_MIDDLE_HIGH:
293 ret = Bind(sessionId, g_watch_collab_qosInfo, g_QosTV_Param_Index, &iSocketListener);
294 break;
295 case SOFTBUS_QOS_CASE_MIDDLE_LOW:
296 case SOFTBUS_QOS_CASE_LOW:
297 ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
298 break;
299 default:
300 ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
301 }
302 HILOGI("end bind stardard qos");
303 #endif
304 if (ret == ERR_OK) {
305 return ret;
306 }
307 HILOGE("bind failed, err=%{public}d", ret);
308 if (ret != SOFTBUS_TRANS_PEER_SESSION_NOT_CREATED) {
309 return ret;
310 }
311 if (retryCount * BIND_RETRY_INTERVAL >= MAX_BIND_RETRY_TIME) {
312 HILOGE("bind failed after max retry time %{public}d ms", MAX_BIND_RETRY_TIME);
313 return ret;
314 }
315 HILOGI("bind failed, retrying after %{public}d ms, retry %{public}d", BIND_RETRY_INTERVAL, retryCount + 1);
316 usleep(BIND_RETRY_INTERVAL * MS_TO_US);
317 retryCount++;
318 } while (retryCount < MAX_RETRY_TIMES);
319 return ret;
320 }
321
QueryValidQos(const std::string & peerDeviceId,uint32_t & validQosCase)322 int32_t DSchedTransportSoftbusAdapter::QueryValidQos(const std::string &peerDeviceId, uint32_t &validQosCase)
323 {
324 int32_t ret = SOFTBUS_QUERY_VALID_QOS_ERR;
325 #ifdef SOFTBUS_QUERY_VALID_QOS
326 if (SoftbusAdapter::GetInstance().dmsAdapetr_.QueryValidQos == nullptr) {
327 return INVALID_PARAMETERS_ERR;
328 }
329 ret = SoftbusAdapter::GetInstance().dmsAdapetr_.QueryValidQos(peerDeviceId, validQosCase);
330 if (ret != ERR_OK) {
331 HILOGW("query Valid Qos failed, result: %{public}d", ret);
332 }
333 #endif
334 return ret;
335 }
336
CreateClientSocket(const std::string & peerDeviceId)337 int32_t DSchedTransportSoftbusAdapter::CreateClientSocket(const std::string &peerDeviceId)
338 {
339 HILOGI("start");
340 SocketInfo info = {
341 .name = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
342 .peerName = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
343 .peerNetworkId = const_cast<char*>(peerDeviceId.c_str()),
344 .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
345 .dataType = DATA_TYPE_BYTES
346 };
347 int32_t sessionId = Socket(info);
348 HILOGI("finish, socket session id: %{public}d", sessionId);
349 return sessionId;
350 }
351
CreateSessionRecord(int32_t sessionId,const std::string & peerDeviceId,bool isServer,DSchedServiceType type)352 int32_t DSchedTransportSoftbusAdapter::CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId,
353 bool isServer, DSchedServiceType type)
354 {
355 std::string localDeviceId;
356 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
357 HILOGE("GetLocalDeviceId failed");
358 ShutdownSession(peerDeviceId, sessionId);
359 return GET_LOCAL_DEVICE_ERR;
360 }
361 {
362 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
363 std::string sessionName = SOCKET_DMS_SESSION_NAME;
364 SessionInfo info = { sessionId, localDeviceId, peerDeviceId, sessionName, isServer };
365 auto session = std::make_shared<DSchedSoftbusSession>(info);
366 sessions_[sessionId] = session;
367 }
368
369 #ifdef DMSFWK_ALL_CONNECT_MGR
370 if (IsNeedAllConnect(type)) {
371 int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_CONNECTED);
372 if (ret != ERR_OK) {
373 HILOGE("Publish connected state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
374 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
375 }
376 }
377 #endif
378 return ERR_OK;
379 }
380
DisconnectDevice(const std::string & peerDeviceId)381 void DSchedTransportSoftbusAdapter::DisconnectDevice(const std::string &peerDeviceId)
382 {
383 HILOGI("try to disconnect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
384 int32_t sessionId = 0;
385 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
386 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
387 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
388 sessionId = iter->first;
389 break;
390 }
391 }
392 if (sessionId != 0 && sessions_[sessionId] != nullptr && sessions_[sessionId]->OnDisconnect()) {
393 HILOGI("peer %{public}s shutdown, socket sessionId: %{public}d.",
394 GetAnonymStr(sessions_[sessionId]->GetPeerDeviceId()).c_str(), sessionId);
395 ShutdownSession(peerDeviceId, sessionId);
396 sessions_.erase(sessionId);
397 NotifyListenersSessionShutdown(sessionId, true);
398 }
399 HILOGI("finish, socket session id: %{public}d", sessionId);
400 return;
401 }
402
ShutdownSession(const std::string & peerDeviceId,int32_t sessionId)403 void DSchedTransportSoftbusAdapter::ShutdownSession(const std::string &peerDeviceId, int32_t sessionId)
404 {
405 Shutdown(sessionId);
406 #ifdef DMSFWK_ALL_CONNECT_MGR
407 int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
408 if (ret != ERR_OK) {
409 HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
410 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
411 }
412 #endif
413 }
414
GetSessionIdByDeviceId(const std::string & peerDeviceId,int32_t & sessionId)415 bool DSchedTransportSoftbusAdapter::GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId)
416 {
417 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
418 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
419 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
420 sessionId = iter->first;
421 return true;
422 }
423 }
424 return false;
425 }
426
OnBind(int32_t sessionId,const std::string & peerDeviceId)427 void DSchedTransportSoftbusAdapter::OnBind(int32_t sessionId, const std::string &peerDeviceId)
428 {
429 #ifdef DMSFWK_INTERACTIVE_ADAPTER
430 if (DistributedSchedService::GetInstance().CheckRemoteOsType(peerDeviceId)) {
431 HILOGE("peer device is not a OH");
432 return;
433 }
434 #endif
435 int32_t ret = CreateSessionRecord(sessionId, peerDeviceId, true, SERVICE_TYPE_INVALID);
436 if (ret != ERR_OK) {
437 HILOGE("Service create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
438 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
439 }
440 }
441
GetPeerDeviceIdBySocket(const int32_t sessionId)442 std::string DSchedTransportSoftbusAdapter::GetPeerDeviceIdBySocket(const int32_t sessionId)
443 {
444 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
445 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
446 if (iter->first == sessionId) {
447 return iter->second->GetPeerDeviceId();
448 }
449 }
450 return "";
451 }
452
OnShutdown(int32_t sessionId,bool isSelfcalled)453 void DSchedTransportSoftbusAdapter::OnShutdown(int32_t sessionId, bool isSelfcalled)
454 {
455 {
456 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
457 if (sessions_.empty() || sessions_.count(sessionId) == 0 || sessions_[sessionId] == nullptr) {
458 HILOGE("error, invalid sessionId %{public}d", sessionId);
459 return;
460 }
461 std::string peerDeviceId = sessions_[sessionId]->GetPeerDeviceId();
462 HILOGI("peerDeviceId: %{public}s shutdown, socket sessionId: %{public}d.",
463 GetAnonymStr(peerDeviceId).c_str(), sessionId);
464 ShutdownSession(peerDeviceId, sessionId);
465 sessions_.erase(sessionId);
466 }
467 NotifyListenersSessionShutdown(sessionId, isSelfcalled);
468 }
469
NotifyListenersSessionShutdown(int32_t sessionId,bool isSelfcalled)470 void DSchedTransportSoftbusAdapter::NotifyListenersSessionShutdown(int32_t sessionId, bool isSelfcalled)
471 {
472 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
473 if (listeners_.empty()) {
474 HILOGE("no listener has registered");
475 return;
476 }
477 for (auto iterItem = listeners_.begin(); iterItem != listeners_.end(); iterItem++) {
478 std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
479 for (auto iter : objs) {
480 iter->OnShutdown(sessionId, isSelfcalled);
481 }
482 }
483 return;
484 }
485
ReleaseChannel()486 int32_t DSchedTransportSoftbusAdapter::ReleaseChannel()
487 {
488 HILOGI("start");
489 {
490 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
491 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
492 std::string peerDeviceId = (iter->second != nullptr) ? iter->second->GetPeerDeviceId() : "";
493 HILOGI("shutdown client: %{public}s, socket sessionId: %{public}d.",
494 GetAnonymStr(peerDeviceId).c_str(), iter->first);
495 ShutdownSession(peerDeviceId, iter->first);
496 }
497 sessions_.clear();
498 }
499 HILOGI("shutdown server, socket session id: %{public}d", serverSocket_);
500 Shutdown(serverSocket_);
501 serverSocket_ = 0;
502
503 #ifdef DMSFWK_ALL_CONNECT_MGR
504 int32_t ret = DSchedAllConnectManager::GetInstance().UninitAllConnectManager();
505 if (ret != ERR_OK) {
506 HILOGE("Uninit all connect manager fail, ret: %{public}d.", ret);
507 }
508 #endif
509 return ERR_OK;
510 }
511
SendData(int32_t sessionId,int32_t dataType,std::shared_ptr<DSchedDataBuffer> dataBuffer)512 int32_t DSchedTransportSoftbusAdapter::SendData(int32_t sessionId, int32_t dataType,
513 std::shared_ptr<DSchedDataBuffer> dataBuffer)
514 {
515 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
516 if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
517 HILOGE("error, invalid session id %{public}d", sessionId);
518 return INVALID_SESSION_ID;
519 }
520 return sessions_[sessionId]->SendData(dataBuffer, dataType);
521 }
522
SendBytesBySoftbus(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)523 int32_t DSchedTransportSoftbusAdapter::SendBytesBySoftbus(int32_t sessionId,
524 std::shared_ptr<DSchedDataBuffer> dataBuffer)
525 {
526 if (dataBuffer != nullptr) {
527 return SendBytes(sessionId, dataBuffer->Data(), dataBuffer->Size());
528 } else {
529 HILOGE("dataBuffer is nullptr");
530 return INVALID_PARAMETERS_ERR;
531 }
532 }
533
OnBytes(int32_t sessionId,const void * data,uint32_t dataLen)534 void DSchedTransportSoftbusAdapter::OnBytes(int32_t sessionId, const void *data, uint32_t dataLen)
535 {
536 if (dataLen == 0 || dataLen > DSCHED_MAX_RECV_DATA_LEN || data == nullptr) {
537 HILOGE("error, dataLen: %{public}d, session id: %{public}d", dataLen, sessionId);
538 return;
539 }
540 HILOGD("start, sessionId: %{public}d", sessionId);
541 {
542 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
543 if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
544 HILOGE("invalid session id %{public}d", sessionId);
545 return;
546 }
547 std::shared_ptr<DSchedDataBuffer> buffer = std::make_shared<DSchedDataBuffer>(dataLen);
548 int32_t ret = memcpy_s(buffer->Data(), buffer->Capacity(), data, dataLen);
549 if (ret != ERR_OK) {
550 HILOGE("memcpy_s failed ret: %{public}d", ret);
551 return;
552 }
553 sessions_[sessionId]->OnBytesReceived(buffer);
554 }
555 HILOGD("end, session id: %{public}d", sessionId);
556 return;
557 }
558
OnDataReady(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer,uint32_t dataType)559 void DSchedTransportSoftbusAdapter::OnDataReady(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer,
560 uint32_t dataType)
561 {
562 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
563 if (listeners_.empty()) {
564 HILOGE("no listener has registered");
565 return;
566 }
567 auto iterItem = listeners_.find(dataType);
568 if (iterItem == listeners_.end()) {
569 HILOGE("get iterItem failed from listeners_, type %{public}d, sessionId: %{public}d", dataType, sessionId);
570 return;
571 }
572 std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
573 for (auto iter : objs) {
574 iter->OnDataRecv(sessionId, dataBuffer);
575 }
576 return;
577 }
578
RegisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)579 void DSchedTransportSoftbusAdapter::RegisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
580 {
581 HILOGI("start, service type: %{public}d", serviceType);
582 if (listener == nullptr) {
583 HILOGE("listener is null, type: %{public}d", serviceType);
584 return;
585 }
586 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
587 if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
588 HILOGD("service type %{public}d does not exist in the listeners, adding", serviceType);
589 std::vector<std::shared_ptr<IDataListener>> newListeners;
590 newListeners.emplace_back(listener);
591 listeners_[serviceType] = newListeners;
592 HILOGI("listener register success");
593 return;
594 }
595 auto iterItem = listeners_.find(serviceType);
596 for (auto iter : iterItem->second) {
597 if (iter == listener) {
598 HILOGI("listener already registed");
599 return;
600 }
601 }
602 iterItem->second.emplace_back(listener);
603 HILOGI("listener register success");
604 return;
605 }
606
UnregisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)607 void DSchedTransportSoftbusAdapter::UnregisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
608 {
609 HILOGI("start, service type: %{public}d", serviceType);
610 if (listener == nullptr) {
611 HILOGE("listener is null, type: %{public}d", serviceType);
612 return;
613 }
614 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
615 if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
616 HILOGD("service type %{public}d does not exist in the listeners, ignore", serviceType);
617 return;
618 }
619 auto typeListeners = listeners_.find(serviceType);
620 for (size_t i = 0; i < typeListeners->second.size(); i++) {
621 if (typeListeners->second[i] == listener) {
622 typeListeners->second.erase(typeListeners->second.begin() + i);
623 if (typeListeners->second.empty()) {
624 listeners_.erase(typeListeners);
625 }
626 break;
627 }
628 }
629 HILOGI("listener unregister success");
630 return;
631 }
632
SetCallingTokenId(int32_t callingTokenId)633 void DSchedTransportSoftbusAdapter::SetCallingTokenId(int32_t callingTokenId)
634 {
635 callingTokenId_ = callingTokenId;
636 }
637 } // namespace DistributedSchedule
638 } // namespace OHOS
639