• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsched_transport_softbus_adapter.h"
17 
18 #include "distributed_sched_utils.h"
19 #include "dsched_all_connect_manager.h"
20 #include "dsched_collab_manager.h"
21 #include "dsched_continue_manager.h"
22 #include "dtbschedmgr_device_info_storage.h"
23 #include "dtbschedmgr_log.h"
24 #include "distributed_sched_service.h"
25 #include "mission/wifi_state_adapter.h"
26 #include "softbus_bus_center.h"
27 #include "softbus_common.h"
28 #include "softbus_error_code.h"
29 #ifdef DMSFWK_INTERACTIVE_ADAPTER
30 #include "softbus_adapter/softbus_adapter.h"
31 #endif
32 #include "token_setproc.h"
33 
34 namespace OHOS {
35 namespace DistributedSchedule {
36 namespace {
37 const std::string TAG = "DSchedTransportSoftbusAdapter";
38 constexpr int32_t INVALID_SESSION_ID = -1;
39 constexpr int32_t BIND_RETRY_INTERVAL = 500;
40 constexpr int32_t MAX_BIND_RETRY_TIME = 4000;
41 constexpr int32_t MAX_RETRY_TIMES = 8;
42 constexpr int32_t MS_TO_US = 1000;
43 #ifndef DMS_CHECK_BLUETOOTH
44 constexpr int32_t SOFTBUS_QOS_CASE_HIGH = 0;
45 constexpr int32_t SOFTBUS_QOS_CASE_MIDDLE_HIGH = 1;
46 constexpr int32_t SOFTBUS_QOS_CASE_MIDDLE_LOW = 2;
47 constexpr int32_t SOFTBUS_QOS_CASE_LOW = 3;
48 #endif
49 }
50 
51 IMPLEMENT_SINGLE_INSTANCE(DSchedTransportSoftbusAdapter);
52 
53 static QosTV g_qosInfo[] = {
54     { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_QOS_TYPE_MIN_BW },
55     { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_QOS_TYPE_MAX_LATENCY },
56     { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_QOS_TYPE_MIN_LATENCY }
57 };
58 
59 QosTV g_watch_collab_qosInfo[] = {
60     { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW },
61     { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_QOS_TYPE_MAX_LATENCY },
62     { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_QOS_TYPE_MIN_LATENCY }
63 };
64 
65 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(QosTV));
66 
OnBind(int32_t socket,PeerSocketInfo info)67 static void OnBind(int32_t socket, PeerSocketInfo info)
68 {
69     std::string peerDeviceId(info.networkId);
70     DSchedTransportSoftbusAdapter::GetInstance().OnBind(socket, peerDeviceId);
71 }
72 
OnShutdown(int32_t socket,ShutdownReason reason)73 static void OnShutdown(int32_t socket, ShutdownReason reason)
74 {
75     DSchedTransportSoftbusAdapter::GetInstance().OnShutdown(socket, false);
76 }
77 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)78 static void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
79 {
80     DSchedTransportSoftbusAdapter::GetInstance().OnBytes(socket, data, dataLen);
81 }
82 
83 ISocketListener iSocketListener = {
84     .OnBind = OnBind,
85     .OnShutdown = OnShutdown,
86     .OnBytes = OnBytes
87 };
88 
DSchedTransportSoftbusAdapter()89 DSchedTransportSoftbusAdapter::DSchedTransportSoftbusAdapter()
90 {
91 }
92 
~DSchedTransportSoftbusAdapter()93 DSchedTransportSoftbusAdapter::~DSchedTransportSoftbusAdapter()
94 {
95 }
96 
InitChannel()97 int32_t DSchedTransportSoftbusAdapter::InitChannel()
98 {
99     HILOGI("start init channel");
100     int32_t ret = ERR_OK;
101 #ifdef DMSFWK_ALL_CONNECT_MGR
102     ret = DSchedAllConnectManager::GetInstance().InitAllConnectManager();
103     if (ret != ERR_OK) {
104         HILOGE("Init all connect manager fail, ret: %{public}d.", ret);
105         isAllConnectExist_ = false;
106     }
107 #endif
108 
109     serverSocket_ = CreateServerSocket();
110     if (serverSocket_ <= 0) {
111         HILOGE("create socket failed, ret: %{public}d", serverSocket_);
112         return serverSocket_;
113     }
114 
115     ret = Listen(serverSocket_, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
116     if (ret != ERR_OK) {
117         HILOGE("service listen failed, ret: %{public}d", ret);
118         return ret;
119     }
120     HILOGI("end");
121     return ERR_OK;
122 }
123 
CreateServerSocket()124 int32_t DSchedTransportSoftbusAdapter::CreateServerSocket()
125 {
126     HILOGI("start");
127     localSessionName_ = SOCKET_DMS_SESSION_NAME;
128     SocketInfo info = {
129         .name = const_cast<char*>(localSessionName_.c_str()),
130         .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
131         .dataType = DATA_TYPE_BYTES
132     };
133     int32_t socket = Socket(info);
134     HILOGI("finish, socket session id: %{public}d", socket);
135     return socket;
136 }
137 
ConnectDevice(const std::string & peerDeviceId,int32_t & sessionId,DSchedServiceType type)138 int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDeviceId,
139     int32_t &sessionId, DSchedServiceType type)
140 {
141     HILOGI("try to connect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
142     {
143         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
144         if (!sessions_.empty()) {
145             for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
146                 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
147                     HILOGI("peer device already connected");
148                     iter->second->OnConnect();
149                     sessionId = iter->first;
150 #ifdef DMSFWK_ALL_CONNECT_MGR
151                     NotifyConnectDecision(peerDeviceId, type);
152 #endif
153                     return ERR_OK;
154                 }
155             }
156         }
157     }
158     int32_t ret = ERR_OK;
159     if (IsNeedAllConnect(type)) {
160         HILOGI("waiting all connect decision");
161         ret = DecisionByAllConnect(peerDeviceId, type);
162         if (ret != ERR_OK) {
163             HILOGE("decision fail, ret: %{public}d", ret);
164             return ret;
165         }
166     }
167     ret = AddNewPeerSession(peerDeviceId, sessionId, type);
168     if (ret != ERR_OK || sessionId <= 0) {
169         HILOGE("Add new peer connect session fail, ret: %{public}d, sessionId: %{public}d.", ret, sessionId);
170     }
171     return ret;
172 }
173 
NotifyConnectDecision(const std::string & peerDeviceId,DSchedServiceType type)174 void DSchedTransportSoftbusAdapter::NotifyConnectDecision(const std::string &peerDeviceId, DSchedServiceType type)
175 {
176     if (!IsNeedAllConnect(type)) {
177         HILOGW("don't need notify all connect decision");
178         return;
179     }
180     if (type == SERVICE_TYPE_CONTINUE) {
181         DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
182     } else if (type == SERVICE_TYPE_COLLAB) {
183         DSchedCollabManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
184     }
185 }
186 
DecisionByAllConnect(const std::string & peerDeviceId,DSchedServiceType type)187 int32_t DSchedTransportSoftbusAdapter::DecisionByAllConnect(const std::string &peerDeviceId, DSchedServiceType type)
188 {
189 #ifdef DMSFWK_ALL_CONNECT_MGR
190     ServiceCollaborationManager_ResourceRequestInfoSets reqInfoSets;
191     DSchedAllConnectManager::GetInstance().GetResourceRequest(reqInfoSets);
192     int32_t ret = DSchedAllConnectManager::GetInstance().ApplyAdvanceResource(peerDeviceId, reqInfoSets);
193     if (ret != ERR_OK) {
194         HILOGE("Apply advance resource fail, ret: %{public}d.", ret);
195         NotifyConnectDecision(peerDeviceId, type);
196         return ret;
197     }
198     NotifyConnectDecision(peerDeviceId, type);
199     ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
200     if (ret != ERR_OK) {
201         HILOGE("Publish prepare state fail, ret %{public}d, peerDeviceId %{public}s.",
202             ret, GetAnonymStr(peerDeviceId).c_str());
203     }
204 #endif
205     return ERR_OK;
206 }
207 
IsNeedAllConnect(DSchedServiceType type)208 bool DSchedTransportSoftbusAdapter::IsNeedAllConnect(DSchedServiceType type)
209 {
210 #ifndef COLLAB_ALL_CONNECT_DECISIONS
211     if (type == SERVICE_TYPE_COLLAB || type == SERVICE_TYPE_INVALID) {
212         HILOGI("called, don't need all connect, type: collab or invalid.");
213         return false;
214     }
215 #endif
216     bool result = isAllConnectExist_ && WifiStateAdapter::GetInstance().IsWifiActive();
217     HILOGI("called, result: %{public}d", result);
218     return result;
219 }
220 
AddNewPeerSession(const std::string & peerDeviceId,int32_t & sessionId,DSchedServiceType type)221 int32_t DSchedTransportSoftbusAdapter::AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId,
222     DSchedServiceType type)
223 {
224     int32_t ret = ERR_OK;
225     sessionId = CreateClientSocket(peerDeviceId);
226     if (sessionId <= 0) {
227         HILOGE("create socket failed, sessionId: %{public}d.", sessionId);
228 #ifdef DMSFWK_ALL_CONNECT_MGR
229         ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
230         if (ret != ERR_OK) {
231             HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
232                 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
233         }
234 #endif
235         return REMOTE_DEVICE_BIND_ABILITY_ERR;
236     }
237 
238     ret = SetFirstCallerTokenID(callingTokenId_);
239     HILOGD("SetFirstCallerTokenID callingTokenId: %{public}s, ret: %{public}d",
240         GetAnonymStr(std::to_string(callingTokenId_)).c_str(), ret);
241     callingTokenId_ = 0;
242 
243 #ifdef DMSFWK_INTERACTIVE_ADAPTER
244     if (DistributedSchedService::GetInstance().CheckRemoteOsType(peerDeviceId)) {
245         HILOGE("peer device is not a OH");
246         return DMS_PERMISSION_DENIED;
247     }
248 #endif
249     do {
250         ret = ServiceBind(sessionId, type, peerDeviceId);
251         if (ret != ERR_OK) {
252             HILOGE("client bind failed, ret: %{public}d", ret);
253             break;
254         }
255         ret = CreateSessionRecord(sessionId, peerDeviceId, false, type);
256         if (ret != ERR_OK) {
257             HILOGE("Client create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
258                 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
259             break;
260         }
261     } while (false);
262 
263     if (ret != ERR_OK) {
264         ShutdownSession(peerDeviceId, sessionId);
265         sessionId = INVALID_SESSION_ID;
266     }
267     return ret;
268 }
269 
ServiceBind(int32_t & sessionId,DSchedServiceType type,const std::string & peerDeviceId)270 int32_t DSchedTransportSoftbusAdapter::ServiceBind(int32_t &sessionId, DSchedServiceType type,
271     const std::string &peerDeviceId)
272 {
273     HILOGI("begin");
274     int32_t ret = ERR_OK;
275     int retryCount = 0;
276     do {
277         // The DMS-CHECK_SLUETOOTH macro is used to distinguish the binding logic on different devices
278 #ifdef DMS_CHECK_BLUETOOTH
279         HILOGI("collab bind begin");
280         if (type == SERVICE_TYPE_COLLAB) {
281             ret = Bind(sessionId, g_watch_collab_qosInfo, g_QosTV_Param_Index, &iSocketListener);
282             HILOGI("end bind high qos");
283         }
284 #endif
285 #ifndef DMS_CHECK_BLUETOOTH
286         uint32_t validQosCase = SOFTBUS_MAX_VALID_QOS_CASE;
287         ret = QueryValidQos(peerDeviceId, validQosCase);
288         HILOGI("SoftBus query valid qos result: %{public}d", ret);
289         // case 0 : [160Mbps, Max); case 1 : (30Mbps, 160Mbps); case 2 : (384Kbps, 30Mbps]; case 3 : (0, 384Kbps];
290         switch (validQosCase) {
291             case SOFTBUS_QOS_CASE_HIGH:
292             case SOFTBUS_QOS_CASE_MIDDLE_HIGH:
293                 ret = Bind(sessionId, g_watch_collab_qosInfo, g_QosTV_Param_Index, &iSocketListener);
294                 break;
295             case SOFTBUS_QOS_CASE_MIDDLE_LOW:
296             case SOFTBUS_QOS_CASE_LOW:
297                 ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
298                 break;
299             default:
300                 ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
301         }
302         HILOGI("end bind stardard qos");
303 #endif
304         if (ret == ERR_OK) {
305             return ret;
306         }
307         HILOGE("bind failed, err=%{public}d", ret);
308         if (ret != SOFTBUS_TRANS_PEER_SESSION_NOT_CREATED) {
309             return ret;
310         }
311         if (retryCount * BIND_RETRY_INTERVAL >= MAX_BIND_RETRY_TIME) {
312             HILOGE("bind failed after max retry time %{public}d ms", MAX_BIND_RETRY_TIME);
313             return ret;
314         }
315         HILOGI("bind failed, retrying after %{public}d ms, retry %{public}d", BIND_RETRY_INTERVAL, retryCount + 1);
316         usleep(BIND_RETRY_INTERVAL * MS_TO_US);
317         retryCount++;
318     } while (retryCount < MAX_RETRY_TIMES);
319     return ret;
320 }
321 
QueryValidQos(const std::string & peerDeviceId,uint32_t & validQosCase)322 int32_t DSchedTransportSoftbusAdapter::QueryValidQos(const std::string &peerDeviceId, uint32_t &validQosCase)
323 {
324     int32_t ret = SOFTBUS_QUERY_VALID_QOS_ERR;
325 #ifdef SOFTBUS_QUERY_VALID_QOS
326    if (SoftbusAdapter::GetInstance().dmsAdapetr_.QueryValidQos == nullptr) {
327         return INVALID_PARAMETERS_ERR;
328     }
329     ret = SoftbusAdapter::GetInstance().dmsAdapetr_.QueryValidQos(peerDeviceId, validQosCase);
330     if (ret != ERR_OK) {
331         HILOGW("query Valid Qos failed, result: %{public}d", ret);
332     }
333 #endif
334     return ret;
335 }
336 
CreateClientSocket(const std::string & peerDeviceId)337 int32_t DSchedTransportSoftbusAdapter::CreateClientSocket(const std::string &peerDeviceId)
338 {
339     HILOGI("start");
340     SocketInfo info = {
341         .name = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
342         .peerName = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
343         .peerNetworkId = const_cast<char*>(peerDeviceId.c_str()),
344         .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
345         .dataType = DATA_TYPE_BYTES
346     };
347     int32_t sessionId = Socket(info);
348     HILOGI("finish, socket session id: %{public}d", sessionId);
349     return sessionId;
350 }
351 
CreateSessionRecord(int32_t sessionId,const std::string & peerDeviceId,bool isServer,DSchedServiceType type)352 int32_t DSchedTransportSoftbusAdapter::CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId,
353     bool isServer, DSchedServiceType type)
354 {
355     std::string localDeviceId;
356     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
357         HILOGE("GetLocalDeviceId failed");
358         ShutdownSession(peerDeviceId, sessionId);
359         return GET_LOCAL_DEVICE_ERR;
360     }
361     {
362         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
363         std::string sessionName = SOCKET_DMS_SESSION_NAME;
364         SessionInfo info = { sessionId, localDeviceId, peerDeviceId, sessionName, isServer };
365         auto session = std::make_shared<DSchedSoftbusSession>(info);
366         sessions_[sessionId] = session;
367     }
368 
369 #ifdef DMSFWK_ALL_CONNECT_MGR
370     if (IsNeedAllConnect(type)) {
371         int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_CONNECTED);
372         if (ret != ERR_OK) {
373             HILOGE("Publish connected state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
374                 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
375         }
376     }
377 #endif
378     return ERR_OK;
379 }
380 
DisconnectDevice(const std::string & peerDeviceId)381 void DSchedTransportSoftbusAdapter::DisconnectDevice(const std::string &peerDeviceId)
382 {
383     HILOGI("try to disconnect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
384     int32_t sessionId = 0;
385     std::lock_guard<std::mutex> sessionLock(sessionMutex_);
386     for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
387         if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
388             sessionId = iter->first;
389             break;
390         }
391     }
392     if (sessionId != 0 && sessions_[sessionId] != nullptr && sessions_[sessionId]->OnDisconnect()) {
393         HILOGI("peer %{public}s shutdown, socket sessionId: %{public}d.",
394             GetAnonymStr(sessions_[sessionId]->GetPeerDeviceId()).c_str(), sessionId);
395         ShutdownSession(peerDeviceId, sessionId);
396         sessions_.erase(sessionId);
397         NotifyListenersSessionShutdown(sessionId, true);
398     }
399     HILOGI("finish, socket session id: %{public}d", sessionId);
400     return;
401 }
402 
ShutdownSession(const std::string & peerDeviceId,int32_t sessionId)403 void DSchedTransportSoftbusAdapter::ShutdownSession(const std::string &peerDeviceId, int32_t sessionId)
404 {
405     Shutdown(sessionId);
406 #ifdef DMSFWK_ALL_CONNECT_MGR
407     int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
408     if (ret != ERR_OK) {
409         HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
410             ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
411     }
412 #endif
413 }
414 
GetSessionIdByDeviceId(const std::string & peerDeviceId,int32_t & sessionId)415 bool DSchedTransportSoftbusAdapter::GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId)
416 {
417     std::lock_guard<std::mutex> sessionLock(sessionMutex_);
418     for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
419         if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
420             sessionId = iter->first;
421             return true;
422         }
423     }
424     return false;
425 }
426 
OnBind(int32_t sessionId,const std::string & peerDeviceId)427 void DSchedTransportSoftbusAdapter::OnBind(int32_t sessionId, const std::string &peerDeviceId)
428 {
429 #ifdef DMSFWK_INTERACTIVE_ADAPTER
430     if (DistributedSchedService::GetInstance().CheckRemoteOsType(peerDeviceId)) {
431         HILOGE("peer device is not a OH");
432         return;
433     }
434 #endif
435     int32_t ret = CreateSessionRecord(sessionId, peerDeviceId, true, SERVICE_TYPE_INVALID);
436     if (ret != ERR_OK) {
437         HILOGE("Service create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
438             ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
439     }
440 }
441 
GetPeerDeviceIdBySocket(const int32_t sessionId)442 std::string DSchedTransportSoftbusAdapter::GetPeerDeviceIdBySocket(const int32_t sessionId)
443 {
444     std::lock_guard<std::mutex> sessionLock(sessionMutex_);
445     for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
446         if (iter->first == sessionId) {
447             return iter->second->GetPeerDeviceId();
448         }
449     }
450     return "";
451 }
452 
OnShutdown(int32_t sessionId,bool isSelfcalled)453 void DSchedTransportSoftbusAdapter::OnShutdown(int32_t sessionId, bool isSelfcalled)
454 {
455     {
456         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
457         if (sessions_.empty() || sessions_.count(sessionId) == 0 || sessions_[sessionId] == nullptr) {
458             HILOGE("error, invalid sessionId %{public}d", sessionId);
459             return;
460         }
461         std::string peerDeviceId = sessions_[sessionId]->GetPeerDeviceId();
462         HILOGI("peerDeviceId: %{public}s shutdown, socket sessionId: %{public}d.",
463             GetAnonymStr(peerDeviceId).c_str(), sessionId);
464         ShutdownSession(peerDeviceId, sessionId);
465         sessions_.erase(sessionId);
466     }
467     NotifyListenersSessionShutdown(sessionId, isSelfcalled);
468 }
469 
NotifyListenersSessionShutdown(int32_t sessionId,bool isSelfcalled)470 void DSchedTransportSoftbusAdapter::NotifyListenersSessionShutdown(int32_t sessionId, bool isSelfcalled)
471 {
472     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
473     if (listeners_.empty()) {
474         HILOGE("no listener has registered");
475         return;
476     }
477     for (auto iterItem = listeners_.begin(); iterItem != listeners_.end(); iterItem++) {
478         std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
479         for (auto iter : objs) {
480             iter->OnShutdown(sessionId, isSelfcalled);
481         }
482     }
483     return;
484 }
485 
ReleaseChannel()486 int32_t DSchedTransportSoftbusAdapter::ReleaseChannel()
487 {
488     HILOGI("start");
489     {
490         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
491         for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
492             std::string peerDeviceId = (iter->second != nullptr) ? iter->second->GetPeerDeviceId() : "";
493             HILOGI("shutdown client: %{public}s, socket sessionId: %{public}d.",
494                 GetAnonymStr(peerDeviceId).c_str(), iter->first);
495             ShutdownSession(peerDeviceId, iter->first);
496         }
497         sessions_.clear();
498     }
499     HILOGI("shutdown server, socket session id: %{public}d", serverSocket_);
500     Shutdown(serverSocket_);
501     serverSocket_ = 0;
502 
503 #ifdef DMSFWK_ALL_CONNECT_MGR
504     int32_t ret = DSchedAllConnectManager::GetInstance().UninitAllConnectManager();
505     if (ret != ERR_OK) {
506         HILOGE("Uninit all connect manager fail, ret: %{public}d.", ret);
507     }
508 #endif
509     return ERR_OK;
510 }
511 
SendData(int32_t sessionId,int32_t dataType,std::shared_ptr<DSchedDataBuffer> dataBuffer)512 int32_t DSchedTransportSoftbusAdapter::SendData(int32_t sessionId, int32_t dataType,
513     std::shared_ptr<DSchedDataBuffer> dataBuffer)
514 {
515     std::lock_guard<std::mutex> sessionLock(sessionMutex_);
516     if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
517         HILOGE("error, invalid session id %{public}d", sessionId);
518         return INVALID_SESSION_ID;
519     }
520     return sessions_[sessionId]->SendData(dataBuffer, dataType);
521 }
522 
SendBytesBySoftbus(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)523 int32_t DSchedTransportSoftbusAdapter::SendBytesBySoftbus(int32_t sessionId,
524     std::shared_ptr<DSchedDataBuffer> dataBuffer)
525 {
526     if (dataBuffer != nullptr) {
527         return SendBytes(sessionId, dataBuffer->Data(), dataBuffer->Size());
528     } else {
529         HILOGE("dataBuffer is nullptr");
530         return INVALID_PARAMETERS_ERR;
531     }
532 }
533 
OnBytes(int32_t sessionId,const void * data,uint32_t dataLen)534 void DSchedTransportSoftbusAdapter::OnBytes(int32_t sessionId, const void *data, uint32_t dataLen)
535 {
536     if (dataLen == 0 || dataLen > DSCHED_MAX_RECV_DATA_LEN || data == nullptr) {
537         HILOGE("error, dataLen: %{public}d, session id: %{public}d", dataLen, sessionId);
538         return;
539     }
540     HILOGD("start, sessionId: %{public}d", sessionId);
541     {
542         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
543         if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
544             HILOGE("invalid session id %{public}d", sessionId);
545             return;
546         }
547         std::shared_ptr<DSchedDataBuffer> buffer = std::make_shared<DSchedDataBuffer>(dataLen);
548         int32_t ret = memcpy_s(buffer->Data(), buffer->Capacity(), data, dataLen);
549         if (ret != ERR_OK) {
550             HILOGE("memcpy_s failed ret: %{public}d", ret);
551             return;
552         }
553         sessions_[sessionId]->OnBytesReceived(buffer);
554     }
555     HILOGD("end, session id: %{public}d", sessionId);
556     return;
557 }
558 
OnDataReady(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer,uint32_t dataType)559 void DSchedTransportSoftbusAdapter::OnDataReady(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer,
560     uint32_t dataType)
561 {
562     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
563     if (listeners_.empty()) {
564         HILOGE("no listener has registered");
565         return;
566     }
567     auto iterItem = listeners_.find(dataType);
568     if (iterItem == listeners_.end()) {
569         HILOGE("get iterItem failed from listeners_, type %{public}d, sessionId: %{public}d", dataType, sessionId);
570         return;
571     }
572     std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
573     for (auto iter : objs) {
574         iter->OnDataRecv(sessionId, dataBuffer);
575     }
576     return;
577 }
578 
RegisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)579 void DSchedTransportSoftbusAdapter::RegisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
580 {
581     HILOGI("start, service type: %{public}d", serviceType);
582     if (listener == nullptr) {
583         HILOGE("listener is null, type: %{public}d", serviceType);
584         return;
585     }
586     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
587     if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
588         HILOGD("service type %{public}d does not exist in the listeners, adding", serviceType);
589         std::vector<std::shared_ptr<IDataListener>> newListeners;
590         newListeners.emplace_back(listener);
591         listeners_[serviceType] = newListeners;
592         HILOGI("listener register success");
593         return;
594     }
595     auto iterItem = listeners_.find(serviceType);
596     for (auto iter : iterItem->second) {
597         if (iter == listener) {
598             HILOGI("listener already registed");
599             return;
600         }
601     }
602     iterItem->second.emplace_back(listener);
603     HILOGI("listener register success");
604     return;
605 }
606 
UnregisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)607 void DSchedTransportSoftbusAdapter::UnregisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
608 {
609     HILOGI("start, service type: %{public}d", serviceType);
610     if (listener == nullptr) {
611         HILOGE("listener is null, type: %{public}d", serviceType);
612         return;
613     }
614     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
615     if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
616         HILOGD("service type %{public}d does not exist in the listeners, ignore", serviceType);
617         return;
618     }
619     auto typeListeners = listeners_.find(serviceType);
620     for (size_t i = 0; i < typeListeners->second.size(); i++) {
621         if (typeListeners->second[i] == listener) {
622             typeListeners->second.erase(typeListeners->second.begin() + i);
623             if (typeListeners->second.empty()) {
624                 listeners_.erase(typeListeners);
625             }
626             break;
627         }
628     }
629     HILOGI("listener unregister success");
630     return;
631 }
632 
SetCallingTokenId(int32_t callingTokenId)633 void DSchedTransportSoftbusAdapter::SetCallingTokenId(int32_t callingTokenId)
634 {
635     callingTokenId_ = callingTokenId;
636 }
637 }  // namespace DistributedSchedule
638 }  // namespace OHOS
639