• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "softbus_distributed_data_manager.h"
17 
18 #include <thread>
19 #include <chrono>
20 #include "avsession_log.h"
21 #include "avsession_errors.h"
22 #include "migrate_avsession_constant.h"
23 #include "softbus_session_utils.h"
24 
25 
26 namespace OHOS::AVSession {
SoftbusDistributedDataManager()27 SoftbusDistributedDataManager::SoftbusDistributedDataManager() {}
28 
~SoftbusDistributedDataManager()29 SoftbusDistributedDataManager::~SoftbusDistributedDataManager() {}
30 
31 // LCOV_EXCL_START
Init()32 void SoftbusDistributedDataManager::Init()
33 {
34     std::weak_ptr<SoftbusDistributedDataManager> managerWeak(shared_from_this());
35     std::lock_guard lockGuard(softbusDistributedDataLock_);
36     ssListener_ = std::make_shared<SSListener>(managerWeak);
37     SoftbusSessionManager::GetInstance().AddSessionListener(ssListener_);
38 }
39 
SessionOpened(int32_t socket,PeerSocketInfo info)40 void SoftbusDistributedDataManager::SessionOpened(int32_t socket, PeerSocketInfo info)
41 {
42     std::string sessionName = info.name;
43     peerSocketInfo.name = info.name;
44     peerSocketInfo.networkId = info.networkId;
45     peerSocketInfo.pkgName = info.pkgName;
46     peerSocketInfo.dataType = info.dataType;
47     if (sessionName != CONFIG_SOFTBUS_SESSION_TAG) {
48         SLOGE("onSessionOpened: the group id is not match the media session group. sessionName is %{public}s",
49             sessionName.c_str());
50         return;
51     }
52     if (isServer_) {
53         OnSessionServerOpened();
54     }
55 }
56 
SessionClosed(int32_t socket)57 void SoftbusDistributedDataManager::SessionClosed(int32_t socket)
58 {
59     if (isServer_ && peerSocketInfo.name != CONFIG_SOFTBUS_SESSION_TAG) {
60         SLOGE("onSessionClosed: the group id is not match the media session group.");
61         return;
62     }
63     if (isServer_) {
64         OnSessionServerClosed(socket);
65     } else {
66         OnSessionProxyClosed(socket);
67     }
68 }
69 
MessageReceived(int32_t socket,const std::string & data)70 void SoftbusDistributedDataManager::MessageReceived(int32_t socket, const std::string &data)
71 {
72     if (peerSocketInfo.name != CONFIG_SOFTBUS_SESSION_TAG) {
73         SLOGE("onMessageReceived: the group id is not match the media session group. sessionName is %{public}s",
74             peerSocketInfo.name);
75         return;
76     }
77     if (isServer_) {
78         OnMessageHandleReceived(socket, data);
79     }
80 }
81 // LCOV_EXCL_STOP
82 
BytesReceived(int32_t socket,const std::string & data)83 void SoftbusDistributedDataManager::BytesReceived(int32_t socket, const std::string &data)
84 {
85     if (isServer_) {
86         if (peerSocketInfo.name != CONFIG_SOFTBUS_SESSION_TAG) {
87             SLOGE("onBytesReceived: the group id is not match the media session group. sessionName is %{public}s",
88                 peerSocketInfo.name);
89             return;
90         }
91         OnBytesServerReceived(socket, data);
92     } else {
93         OnBytesProxyReceived(socket, data);
94     }
95 }
96 
InitSessionServer(const std::string & pkg)97 void SoftbusDistributedDataManager::InitSessionServer(const std::string &pkg)
98 {
99     SLOGI("init session server...");
100     isServer_ = true;
101     std::lock_guard lockGuard(softbusDistributedDataLock_);
102     int32_t socket = SoftbusSessionManager::GetInstance().Socket(pkg);
103     mServerSocketMap_.insert({pkg, socket});
104 }
105 
CreateServer(const std::shared_ptr<SoftbusSessionServer> & server)106 void SoftbusDistributedDataManager::CreateServer(const std::shared_ptr<SoftbusSessionServer> &server)
107 {
108     if (server == nullptr) {
109         SLOGE("createServer fail for server is null.");
110         return;
111     }
112     int characteristic = server->GetCharacteristic();
113     std::lock_guard lockGuard(softbusDistributedDataLock_);
114     serverMap_.insert({ characteristic, server });
115 }
116 
CreateProxy(const std::shared_ptr<SoftbusSessionProxy> & proxy,const std::string & peerNetworkId,const std::string & packageName)117 bool SoftbusDistributedDataManager::CreateProxy(const std::shared_ptr<SoftbusSessionProxy> &proxy,
118     const std::string &peerNetworkId, const std::string &packageName)
119 {
120     SLOGI("create session proxy...");
121     if (proxy == nullptr || peerNetworkId.c_str() == nullptr || peerNetworkId.empty() ||
122         packageName.c_str() == nullptr || packageName.empty()) {
123         SLOGW("createProxy fail for params invalid.");
124         return false;
125     }
126     isServer_ = false;
127     std::lock_guard lockGuard(softbusDistributedDataLock_);
128     if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
129         int32_t socketId = mProxySocketMap_[peerNetworkId];
130         if (socketId > 0) {
131             std::string softbusNetworkId;
132             SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socketId, softbusNetworkId);
133             if (peerNetworkId == softbusNetworkId) {
134                 proxy->ConnectServer(socketId);
135                 return true;
136             }
137             mProxySocketMap_.erase(peerNetworkId);
138         }
139     }
140     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
141     int32_t characteristic = proxy->GetCharacteristic();
142     SLOGI("createProxy for device %{public}s characteristic:%{public}d",
143         anonymizeNetworkId.c_str(), characteristic);
144     std::map<int32_t, std::shared_ptr<SoftbusSessionProxy>> proxyMap;
145     if (mDeviceToProxyMap_.find(peerNetworkId) == mDeviceToProxyMap_.end()) {
146         proxyMap.insert({characteristic, proxy});
147         mDeviceToProxyMap_.insert({peerNetworkId, proxyMap});
148     } else {
149         proxyMap = mDeviceToProxyMap_[peerNetworkId];
150         proxyMap[characteristic] = proxy;
151     }
152     int32_t socketId = ConnectRemoteDevice(peerNetworkId, packageName, softbusLinkMaxRetryTimes);
153     if (socketId <= 0) {
154         proxyMap.erase(characteristic);
155         if (proxyMap.empty()) {
156             mDeviceToProxyMap_.erase(peerNetworkId);
157         }
158         SLOGI("createProxy failed for no remote device connected");
159         return false;
160     } else {
161         OnSessionProxyOpened(socketId);
162         return true;
163     }
164 }
165 
ReleaseProxy(const std::shared_ptr<SoftbusSessionProxy> & proxy,const std::string & peerNetworkId)166 bool SoftbusDistributedDataManager::ReleaseProxy(const std::shared_ptr<SoftbusSessionProxy> &proxy,
167     const std::string &peerNetworkId)
168 {
169     if (proxy == nullptr) {
170         SLOGE("ReleaseProxy fail for proxy is null.");
171         return true;
172     }
173     if (peerNetworkId.c_str() == nullptr || peerNetworkId.empty()) {
174         SLOGE("ReleaseProxy fail for networkid is invalid.");
175         return false;
176     }
177     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
178     SLOGI("ReleaseProxy for device %{public}s", anonymizeNetworkId.c_str());
179     std::lock_guard lockGuard(softbusDistributedDataLock_);
180     if (mDeviceToProxyMap_.find(peerNetworkId) == mDeviceToProxyMap_.end()) {
181         SLOGE("ReleaseProxy fail fo proxyMap is null");
182         return true;
183     }
184 
185     auto proxyMap = mDeviceToProxyMap_[peerNetworkId];
186     int32_t characteristic = proxy->GetCharacteristic();
187     if (proxyMap.find(characteristic) != proxyMap.end()) {
188         auto proxy = proxyMap[characteristic];
189         if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
190             int32_t socket = mProxySocketMap_[peerNetworkId];
191             proxy->DisconnectServer(socket);
192         }
193         proxyMap.erase(characteristic);
194     }
195     if (proxyMap.empty()) {
196         if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
197             int32_t socket = mProxySocketMap_[peerNetworkId];
198             OnSessionProxyClosed(socket);
199             SoftbusSessionManager::GetInstance().Shutdown(socket);
200         }
201         mDeviceToProxyMap_.erase(peerNetworkId);
202         return true;
203     }
204     return false;
205 }
206 
DestroySessionServer(const std::string & pkg)207 void SoftbusDistributedDataManager::DestroySessionServer(const std::string &pkg)
208 {
209     SLOGI("destroy session server...");
210     std::lock_guard lockGuard(softbusDistributedDataLock_);
211     if  (isServer_) {
212         for (auto it = serverMap_.begin(); it != serverMap_.end();) {
213             it->second->DisconnectAllProxy();
214             it = serverMap_.erase(it);
215         }
216         int32_t mSocket = mServerSocketMap_[pkg];
217         mServerSocketMap_.erase(pkg);
218         SoftbusSessionManager::GetInstance().Shutdown(mSocket);
219     } else {
220         for (auto proxyMap = mDeviceToProxyMap_.begin(); proxyMap != mDeviceToProxyMap_.end(); proxyMap++) {
221             for (auto it = proxyMap->second.begin(); it != proxyMap->second.end(); it++) {
222                 ReleaseProxy(it->second, proxyMap->first);
223             }
224         }
225     }
226 }
227 
ReleaseServer(const std::shared_ptr<SoftbusSessionServer> & server)228 void SoftbusDistributedDataManager::ReleaseServer(const std::shared_ptr<SoftbusSessionServer> &server)
229 {
230     CHECK_AND_RETURN_LOG(server != nullptr, "server is nullptr");
231     int characteristic = server->GetCharacteristic();
232     std::lock_guard lockGuard(softbusDistributedDataLock_);
233     auto iter = serverMap_.find(characteristic);
234     if (iter != serverMap_.end() && iter->second == server) {
235         server->DisconnectAllProxy();
236         serverMap_.erase(characteristic);
237     }
238 }
239 
240 // LCOV_EXCL_START
OnSessionServerOpened()241 void SoftbusDistributedDataManager::OnSessionServerOpened()
242 {
243     SLOGI("OnSessionServerOpened: the peer device id is %{public}s.",
244         SoftbusSessionUtils::AnonymizeDeviceId(peerSocketInfo.networkId).c_str());
245 }
246 
OnSessionServerClosed(int32_t socket)247 void SoftbusDistributedDataManager::OnSessionServerClosed(int32_t socket)
248 {
249     SLOGI("OnSessionServerClosed: the peer device id is %{public}s.",
250         SoftbusSessionUtils::AnonymizeDeviceId(peerSocketInfo.networkId).c_str());
251     std::lock_guard lockGuard(softbusDistributedDataLock_);
252     for (auto it = serverMap_.begin(); it != serverMap_.end(); it++) {
253         it->second->DisconnectProxy(socket);
254     }
255 }
256 
OnMessageHandleReceived(int32_t socket,const std::string & data)257 void SoftbusDistributedDataManager::OnMessageHandleReceived(int32_t socket, const std::string &data)
258 {
259     std::string deviceId = peerSocketInfo.networkId;
260     std::string anonymizeDeviceId = SoftbusSessionUtils::AnonymizeDeviceId(deviceId);
261     SLOGI("onMessageHandleReceived: %{public}s", anonymizeDeviceId.c_str());
262     if (data.length() > 1 && data[0] == MESSAGE_CODE_CONNECT_SERVER) {
263         std::lock_guard lockGuard(softbusDistributedDataLock_);
264         auto iter = serverMap_.find(data[1]);
265         if (iter == serverMap_.end()) {
266             SLOGE("onMessageHandleReceived: server is invalid deviceId %{public}s", anonymizeDeviceId.c_str());
267             return;
268         }
269         iter->second->ConnectProxy(socket);
270     }
271 }
272 
OnBytesServerReceived(int32_t socket,const std::string & data)273 void SoftbusDistributedDataManager::OnBytesServerReceived(int32_t socket, const std::string &data)
274 {
275     std::string deviceId = peerSocketInfo.networkId;
276     std::string anonymizeDeviceId = SoftbusSessionUtils::AnonymizeDeviceId(deviceId);
277     SLOGI("onBytesServerReceived: %{public}s", anonymizeDeviceId.c_str());
278     if (data.length() > 0) {
279         std::lock_guard lockGuard(softbusDistributedDataLock_);
280         auto iter = serverMap_.find(data[0]);
281         if (iter == serverMap_.end()) {
282             SLOGE("onBytesServerReceived: server is invalid deviceId %{public}s", anonymizeDeviceId.c_str());
283             return;
284         }
285         if (data.length() > 1 && data[1] == MESSAGE_CODE_CONNECT_SERVER) {
286             iter->second->ConnectProxy(socket);
287             return;
288         }
289         iter->second->OnBytesReceived(deviceId, data);
290     }
291 }
292 
ConnectRemoteDevice(const std::string & peerNetworkId,const std::string & packageName,int32_t retryCount)293 int32_t SoftbusDistributedDataManager::ConnectRemoteDevice(const std::string &peerNetworkId,
294     const std::string &packageName, int32_t retryCount)
295 {
296     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
297     SLOGI("ConnectRemoteDevice remote device %{public}s, retryCount: %{public}d",
298         anonymizeNetworkId.c_str(), retryCount);
299     if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
300         SLOGI("%{public}s is connected, no need to connect.", anonymizeNetworkId.c_str());
301         return mProxySocketMap_[peerNetworkId];
302     }
303     int32_t socket = SoftbusSessionManager::GetInstance().Bind(peerNetworkId, packageName);
304     if (socket <= 0 && retryCount > 0) {
305         std::this_thread::sleep_for(std::chrono::milliseconds(retryIntervalTime));
306         socket = ConnectRemoteDevice(peerNetworkId, packageName, retryCount - 1);
307     }
308     return socket;
309 }
310 
OnSessionProxyOpened(int32_t socket)311 void SoftbusDistributedDataManager::OnSessionProxyOpened(int32_t socket)
312 {
313     std::string softbusNetworkId;
314     int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, softbusNetworkId);
315     if (result < 0) {
316         SLOGE("OnSessionProxyOpened: get softbus peer network id failed.");
317         return;
318     }
319     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(softbusNetworkId);
320     if (mProxySocketMap_.find(softbusNetworkId) == mProxySocketMap_.end()) {
321         mProxySocketMap_.insert({softbusNetworkId, socket});
322     } else {
323         SLOGI("OnSessionProxyOpened: session exit, no need to add for %{public}s", anonymizeNetworkId.c_str());
324     }
325     if (mDeviceToProxyMap_.find(softbusNetworkId) == mDeviceToProxyMap_.end()) {
326         return;
327     }
328     std::map<int32_t, std::shared_ptr<SoftbusSessionProxy>> proxyMap = mDeviceToProxyMap_[softbusNetworkId];
329     for (auto it = proxyMap.begin(); it != proxyMap.end(); it++) {
330         it->second->ConnectServer(socket);
331     }
332 }
333 
OnSessionProxyClosed(int32_t socket)334 void SoftbusDistributedDataManager::OnSessionProxyClosed(int32_t socket)
335 {
336     std::string networkId;
337     int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, networkId);
338     if (result < 0) {
339         SLOGE("OnSessionProxyClosed: get softbus peer network id failed.");
340         return;
341     }
342     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(networkId);
343     SLOGI("OnSessionProxyClosed: the peer network id is %{public}s.", anonymizeNetworkId.c_str());
344     std::lock_guard lockGuard(softbusDistributedDataLock_);
345     mDeviceToProxyMap_.erase(networkId);
346     if (mDeviceToProxyMap_.find(networkId) != mDeviceToProxyMap_.end()) {
347         SLOGW("OnSessionProxyClosed: found no socket for device %{public}s", anonymizeNetworkId.c_str());
348         return;
349     }
350     auto proxyMap = mDeviceToProxyMap_[networkId];
351     if (proxyMap.empty()) {
352         return;
353     }
354     for (auto it = proxyMap.begin(); it != proxyMap.end(); it++) {
355         it->second->DisconnectServer(socket);
356     }
357 }
358 
OnBytesProxyReceived(int32_t socket,const std::string & data)359 void SoftbusDistributedDataManager::OnBytesProxyReceived(int32_t socket, const std::string &data)
360 {
361     if (data.length() <= 0) {
362         SLOGE("OnBytesProxyReceived invalid data.");
363         return;
364     }
365     std::string networkId;
366     int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, networkId);
367     if (result < 0) {
368         SLOGE("OnBytesProxyReceived: get no softbus peer network");
369         return;
370     }
371     std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(networkId);
372     SLOGI("OnBytesProxyReceived: %{public}s.", anonymizeNetworkId.c_str());
373     std::lock_guard lockGuard(softbusDistributedDataLock_);
374     if (mDeviceToProxyMap_.find(networkId) == mDeviceToProxyMap_.end()) {
375         SLOGW("OnBytesProxyReceived no proxy for the device");
376         return;
377     }
378     auto proxyMap = mDeviceToProxyMap_[networkId];
379     auto iter = proxyMap.find(data[0]);
380     if (iter == proxyMap.end()) {
381         SLOGE("OnBytesProxyReceived: found no match characteristic");
382         return;
383     }
384     iter->second->OnBytesReceived(networkId, data);
385 }
386 // LCOV_EXCL_STOP
387 } // namespace OHOS::AVSession