• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "softbus_distributed_data_manager.h"
17 
18 #include <thread>
19 #include <chrono>
20 #include "avsession_log.h"
21 #include "avsession_errors.h"
22 #include "migrate_avsession_constant.h"
23 #include "softbus_session_utils.h"
24 
25 
26 namespace OHOS::AVSession {
SoftbusDistributedDataManager()27 SoftbusDistributedDataManager::SoftbusDistributedDataManager() {}
28 
~SoftbusDistributedDataManager()29 SoftbusDistributedDataManager::~SoftbusDistributedDataManager() {}
30 
31 // LCOV_EXCL_START
Init()32 void SoftbusDistributedDataManager::Init()
33 {
34     std::weak_ptr<SoftbusDistributedDataManager> managerWeak(shared_from_this());
35     std::lock_guard lockGuard(softbusDistributedDataLock_);
36     ssListener_ = std::make_shared<SSListener>(managerWeak);
37     SoftbusSessionManager::GetInstance().AddSessionListener(ssListener_);
38 }
39 
SessionOpened(int32_t socket,PeerSocketInfo info)40 void SoftbusDistributedDataManager::SessionOpened(int32_t socket, PeerSocketInfo info)
41 {
42     std::string sessionName = info.name;
43     if (sessionName != CONFIG_SOFTBUS_SESSION_TAG) {
44         SLOGE("onSessionOpened: the group id is not match the media session group. sessionName is %{public}s",
45             sessionName.c_str());
46         return;
47     }
48     SLOGI("socket:%{public}d set sessionName to:%{public}s onSessionOpened", socket, info.name);
49     socketNameCache_.assign(info.name);
50     peerSocketInfo.name = info.name;
51     peerSocketInfo.networkId = info.networkId;
52     peerSocketInfo.pkgName = info.pkgName;
53     peerSocketInfo.dataType = info.dataType;
54     if (isServer_) {
55         OnSessionServerOpened();
56     }
57 }
58 
SessionClosed(int32_t socket)59 void SoftbusDistributedDataManager::SessionClosed(int32_t socket)
60 {
61     if (isServer_ && socketNameCache_ != CONFIG_SOFTBUS_SESSION_TAG) {
62         SLOGE("onSessionClosed: the group id is not match the media session group.");
63         return;
64     }
65     if (isServer_) {
66         OnSessionServerClosed(socket);
67     } else {
68         OnSessionProxyClosed(socket);
69     }
70 }
71 
MessageReceived(int32_t socket,const std::string & data)72 void SoftbusDistributedDataManager::MessageReceived(int32_t socket, const std::string &data)
73 {
74     if (socketNameCache_ != CONFIG_SOFTBUS_SESSION_TAG) {
75         SLOGE("onMessageReceived: the group id is not match the media session group. sessionName is %{public}s",
76             socketNameCache_.c_str());
77         return;
78     }
79     if (isServer_) {
80         OnMessageHandleReceived(socket, data);
81     }
82 }
83 // LCOV_EXCL_STOP
84 
BytesReceived(int32_t socket,const std::string & data)85 void SoftbusDistributedDataManager::BytesReceived(int32_t socket, const std::string &data)
86 {
87     if (isServer_) {
88         if (socketNameCache_ != CONFIG_SOFTBUS_SESSION_TAG) {
89             SLOGE("onBytesReceived: the group id is not match the media session group. sessionName is %{public}s",
90                 socketNameCache_.c_str());
91             return;
92         }
93         OnBytesServerReceived(socket, data);
94     } else {
95         OnBytesProxyReceived(socket, data);
96     }
97 }
98 
InitSessionServer(const std::string & pkg)99 void SoftbusDistributedDataManager::InitSessionServer(const std::string &pkg)
100 {
101     SLOGI("init session server...");
102     isServer_ = true;
103     std::lock_guard lockGuard(softbusDistributedDataLock_);
104     int32_t socket = SoftbusSessionManager::GetInstance().Socket(pkg);
105     mServerSocketMap_.insert({pkg, socket});
106 }
107 
CreateServer(const std::shared_ptr<SoftbusSessionServer> & server)108 void SoftbusDistributedDataManager::CreateServer(const std::shared_ptr<SoftbusSessionServer> &server)
109 {
110     if (server == nullptr) {
111         SLOGE("createServer fail for server is null.");
112         return;
113     }
114     int characteristic = server->GetCharacteristic();
115     std::lock_guard lockGuard(softbusDistributedDataLock_);
116     serverMap_.insert({ characteristic, server });
117 }
118 
CreateProxy(const std::shared_ptr<SoftbusSessionProxy> & proxy,const std::string & peerNetworkId,const std::string & packageName)119 bool SoftbusDistributedDataManager::CreateProxy(const std::shared_ptr<SoftbusSessionProxy> &proxy,
120     const std::string &peerNetworkId, const std::string &packageName)
121 {
122     SLOGI("create session proxy...");
123     if (proxy == nullptr || peerNetworkId.c_str() == nullptr || peerNetworkId.empty() ||
124         packageName.c_str() == nullptr || packageName.empty()) {
125         SLOGW("createProxy fail for params invalid.");
126         return false;
127     }
128     isServer_ = false;
129     std::lock_guard lockGuard(softbusDistributedDataLock_);
130     if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
131         int32_t socketId = mProxySocketMap_[peerNetworkId];
132         if (socketId > 0) {
133             std::string softbusNetworkId;
134             SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socketId, softbusNetworkId);
135             if (peerNetworkId == softbusNetworkId) {
136                 proxy->ConnectServer(socketId);
137                 return true;
138             }
139             mProxySocketMap_.erase(peerNetworkId);
140         }
141     }
142     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
143     int32_t characteristic = proxy->GetCharacteristic();
144     SLOGI("createProxy for device %{public}s characteristic:%{public}d",
145         anonymizeNetworkId.c_str(), characteristic);
146     std::map<int32_t, std::shared_ptr<SoftbusSessionProxy>> proxyMap;
147     if (mDeviceToProxyMap_.find(peerNetworkId) == mDeviceToProxyMap_.end()) {
148         proxyMap.insert({characteristic, proxy});
149         mDeviceToProxyMap_.insert({peerNetworkId, proxyMap});
150     } else {
151         proxyMap = mDeviceToProxyMap_[peerNetworkId];
152         proxyMap[characteristic] = proxy;
153     }
154     int32_t socketId = ConnectRemoteDevice(peerNetworkId, packageName, softbusLinkMaxRetryTimes);
155     if (socketId <= 0) {
156         proxyMap.erase(characteristic);
157         if (proxyMap.empty()) {
158             mDeviceToProxyMap_.erase(peerNetworkId);
159         }
160         SLOGI("createProxy failed for no remote device connected");
161         return false;
162     } else {
163         OnSessionProxyOpened(socketId);
164         return true;
165     }
166 }
167 
ReleaseProxy(const std::shared_ptr<SoftbusSessionProxy> & proxy,const std::string & peerNetworkId)168 bool SoftbusDistributedDataManager::ReleaseProxy(const std::shared_ptr<SoftbusSessionProxy> &proxy,
169     const std::string &peerNetworkId)
170 {
171     if (proxy == nullptr) {
172         SLOGE("ReleaseProxy fail for proxy is null.");
173         return true;
174     }
175     if (peerNetworkId.c_str() == nullptr || peerNetworkId.empty()) {
176         SLOGE("ReleaseProxy fail for networkid is invalid.");
177         return false;
178     }
179     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
180     SLOGI("ReleaseProxy for device %{public}s", anonymizeNetworkId.c_str());
181     std::lock_guard lockGuard(softbusDistributedDataLock_);
182     if (mDeviceToProxyMap_.find(peerNetworkId) == mDeviceToProxyMap_.end()) {
183         SLOGE("ReleaseProxy fail fo proxyMap is null");
184         return true;
185     }
186 
187     auto proxyMap = mDeviceToProxyMap_[peerNetworkId];
188     int32_t characteristic = proxy->GetCharacteristic();
189     if (proxyMap.find(characteristic) != proxyMap.end()) {
190         auto proxy = proxyMap[characteristic];
191         if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
192             int32_t socket = mProxySocketMap_[peerNetworkId];
193             proxy->DisconnectServer(socket);
194         }
195         proxyMap.erase(characteristic);
196     }
197     if (proxyMap.empty()) {
198         if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
199             int32_t socket = mProxySocketMap_[peerNetworkId];
200             OnSessionProxyClosed(socket);
201             SoftbusSessionManager::GetInstance().Shutdown(socket);
202         }
203         mDeviceToProxyMap_.erase(peerNetworkId);
204         return true;
205     }
206     return false;
207 }
208 
DestroySessionServer(const std::string & pkg)209 void SoftbusDistributedDataManager::DestroySessionServer(const std::string &pkg)
210 {
211     SLOGI("destroy session server...");
212     std::lock_guard lockGuard(softbusDistributedDataLock_);
213     if  (isServer_) {
214         for (auto it = serverMap_.begin(); it != serverMap_.end();) {
215             it->second->DisconnectAllProxy();
216             it = serverMap_.erase(it);
217         }
218         int32_t mSocket = mServerSocketMap_[pkg];
219         mServerSocketMap_.erase(pkg);
220         SoftbusSessionManager::GetInstance().Shutdown(mSocket);
221     } else {
222         for (auto proxyMap = mDeviceToProxyMap_.begin(); proxyMap != mDeviceToProxyMap_.end(); proxyMap++) {
223             for (auto it = proxyMap->second.begin(); it != proxyMap->second.end(); it++) {
224                 ReleaseProxy(it->second, proxyMap->first);
225             }
226         }
227     }
228 }
229 
ReleaseServer(const std::shared_ptr<SoftbusSessionServer> & server)230 void SoftbusDistributedDataManager::ReleaseServer(const std::shared_ptr<SoftbusSessionServer> &server)
231 {
232     CHECK_AND_RETURN_LOG(server != nullptr, "server is nullptr");
233     int characteristic = server->GetCharacteristic();
234     std::lock_guard lockGuard(softbusDistributedDataLock_);
235     auto iter = serverMap_.find(characteristic);
236     if (iter != serverMap_.end() && iter->second == server) {
237         server->DisconnectAllProxy();
238         serverMap_.erase(characteristic);
239     }
240 }
241 
242 // LCOV_EXCL_START
OnSessionServerOpened()243 void SoftbusDistributedDataManager::OnSessionServerOpened()
244 {
245     SLOGI("OnSessionServerOpened: the peer device id is %{public}s.",
246         SoftbusSessionUtils::AnonymizeDeviceId(peerSocketInfo.networkId).c_str());
247 }
248 
OnSessionServerClosed(int32_t socket)249 void SoftbusDistributedDataManager::OnSessionServerClosed(int32_t socket)
250 {
251     SLOGI("OnSessionServerClosed: the peer device id is %{public}s.",
252         SoftbusSessionUtils::AnonymizeDeviceId(peerSocketInfo.networkId).c_str());
253     std::lock_guard lockGuard(softbusDistributedDataLock_);
254     for (auto it = serverMap_.begin(); it != serverMap_.end(); it++) {
255         it->second->DisconnectProxy(socket);
256     }
257 }
258 
OnMessageHandleReceived(int32_t socket,const std::string & data)259 void SoftbusDistributedDataManager::OnMessageHandleReceived(int32_t socket, const std::string &data)
260 {
261     std::string deviceId = peerSocketInfo.networkId;
262     std::string anonymizeDeviceId = SoftbusSessionUtils::AnonymizeDeviceId(deviceId);
263     SLOGI("onMessageHandleReceived: %{public}s", anonymizeDeviceId.c_str());
264     if (data.length() > 1 && data[0] == MESSAGE_CODE_CONNECT_SERVER) {
265         std::lock_guard lockGuard(softbusDistributedDataLock_);
266         auto iter = serverMap_.find(data[1]);
267         if (iter == serverMap_.end()) {
268             SLOGE("onMessageHandleReceived: server is invalid deviceId %{public}s", anonymizeDeviceId.c_str());
269             return;
270         }
271         iter->second->ConnectProxy(socket);
272     }
273 }
274 
OnBytesServerReceived(int32_t socket,const std::string & data)275 void SoftbusDistributedDataManager::OnBytesServerReceived(int32_t socket, const std::string &data)
276 {
277     std::string deviceId = peerSocketInfo.networkId;
278     std::string anonymizeDeviceId = SoftbusSessionUtils::AnonymizeDeviceId(deviceId);
279     SLOGI("onBytesServerReceived: %{public}s", anonymizeDeviceId.c_str());
280     if (data.length() > 0) {
281         std::lock_guard lockGuard(softbusDistributedDataLock_);
282         auto iter = serverMap_.find(data[0]);
283         if (iter == serverMap_.end()) {
284             SLOGE("onBytesServerReceived: server is invalid deviceId %{public}s", anonymizeDeviceId.c_str());
285             return;
286         }
287         if (data.length() > 1 && data[1] == MESSAGE_CODE_CONNECT_SERVER) {
288             iter->second->ConnectProxy(socket);
289             return;
290         }
291         iter->second->OnBytesReceived(deviceId, data);
292     }
293 }
294 
ConnectRemoteDevice(const std::string & peerNetworkId,const std::string & packageName,int32_t retryCount)295 int32_t SoftbusDistributedDataManager::ConnectRemoteDevice(const std::string &peerNetworkId,
296     const std::string &packageName, int32_t retryCount)
297 {
298     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
299     SLOGI("ConnectRemoteDevice remote device %{public}s, retryCount: %{public}d",
300         anonymizeNetworkId.c_str(), retryCount);
301     if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
302         SLOGI("%{public}s is connected, no need to connect.", anonymizeNetworkId.c_str());
303         return mProxySocketMap_[peerNetworkId];
304     }
305     int32_t socket = SoftbusSessionManager::GetInstance().Bind(peerNetworkId, packageName);
306     if (socket <= 0 && retryCount > 0) {
307         std::this_thread::sleep_for(std::chrono::milliseconds(retryIntervalTime));
308         socket = ConnectRemoteDevice(peerNetworkId, packageName, retryCount - 1);
309     }
310     return socket;
311 }
312 
OnSessionProxyOpened(int32_t socket)313 void SoftbusDistributedDataManager::OnSessionProxyOpened(int32_t socket)
314 {
315     std::string softbusNetworkId;
316     int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, softbusNetworkId);
317     if (result < 0) {
318         SLOGE("OnSessionProxyOpened: get softbus peer network id failed.");
319         return;
320     }
321     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(softbusNetworkId);
322     if (mProxySocketMap_.find(softbusNetworkId) == mProxySocketMap_.end()) {
323         mProxySocketMap_.insert({softbusNetworkId, socket});
324     } else {
325         SLOGI("OnSessionProxyOpened: session exit, no need to add for %{public}s", anonymizeNetworkId.c_str());
326     }
327     if (mDeviceToProxyMap_.find(softbusNetworkId) == mDeviceToProxyMap_.end()) {
328         return;
329     }
330     std::map<int32_t, std::shared_ptr<SoftbusSessionProxy>> proxyMap = mDeviceToProxyMap_[softbusNetworkId];
331     for (auto it = proxyMap.begin(); it != proxyMap.end(); it++) {
332         it->second->ConnectServer(socket);
333     }
334 }
335 
OnSessionProxyClosed(int32_t socket)336 void SoftbusDistributedDataManager::OnSessionProxyClosed(int32_t socket)
337 {
338     std::string networkId;
339     int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, networkId);
340     if (result < 0) {
341         SLOGE("OnSessionProxyClosed: get softbus peer network id failed.");
342         return;
343     }
344     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(networkId);
345     SLOGI("OnSessionProxyClosed: the peer network id is %{public}s.", anonymizeNetworkId.c_str());
346     std::lock_guard lockGuard(softbusDistributedDataLock_);
347     if (mDeviceToProxyMap_.find(networkId) == mDeviceToProxyMap_.end()) {
348         SLOGW("no found socket for device %{public}s.", anonymizeNetworkId.c_str());
349         return;
350     }
351     auto proxyMap = mDeviceToProxyMap_[networkId];
352     if (proxyMap.empty()) {
353         SLOGE("get proxyMap empty for device %{public}s.", anonymizeNetworkId.c_str());
354         return;
355     }
356     for (auto it = proxyMap.begin(); it != proxyMap.end(); it++) {
357         it->second->DisconnectServer(socket);
358     }
359     mDeviceToProxyMap_.erase(networkId);
360 }
361 
OnBytesProxyReceived(int32_t socket,const std::string & data)362 void SoftbusDistributedDataManager::OnBytesProxyReceived(int32_t socket, const std::string &data)
363 {
364     if (data.length() <= 0) {
365         SLOGE("OnBytesProxyReceived invalid data.");
366         return;
367     }
368     std::string networkId;
369     int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, networkId);
370     if (result < 0) {
371         SLOGE("OnBytesProxyReceived: get no softbus peer network");
372         return;
373     }
374     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(networkId);
375     SLOGI("OnBytesProxyReceived: %{public}s.", anonymizeNetworkId.c_str());
376     std::lock_guard lockGuard(softbusDistributedDataLock_);
377     if (mDeviceToProxyMap_.find(networkId) == mDeviceToProxyMap_.end()) {
378         SLOGW("OnBytesProxyReceived no proxy for the device");
379         return;
380     }
381     auto proxyMap = mDeviceToProxyMap_[networkId];
382     auto iter = proxyMap.find(data[0]);
383     if (iter == proxyMap.end()) {
384         SLOGE("OnBytesProxyReceived: found no match characteristic");
385         return;
386     }
387     iter->second->OnBytesReceived(networkId, data);
388 }
389 // LCOV_EXCL_STOP
390 } // namespace OHOS::AVSession