1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "softbus_distributed_data_manager.h"
17
18 #include <thread>
19 #include <chrono>
20 #include "avsession_log.h"
21 #include "avsession_errors.h"
22 #include "migrate_avsession_constant.h"
23 #include "softbus_session_utils.h"
24
25
26 namespace OHOS::AVSession {
SoftbusDistributedDataManager()27 SoftbusDistributedDataManager::SoftbusDistributedDataManager() {}
28
~SoftbusDistributedDataManager()29 SoftbusDistributedDataManager::~SoftbusDistributedDataManager() {}
30
31 // LCOV_EXCL_START
Init()32 void SoftbusDistributedDataManager::Init()
33 {
34 std::weak_ptr<SoftbusDistributedDataManager> managerWeak(shared_from_this());
35 std::lock_guard lockGuard(softbusDistributedDataLock_);
36 ssListener_ = std::make_shared<SSListener>(managerWeak);
37 SoftbusSessionManager::GetInstance().AddSessionListener(ssListener_);
38 }
39
SessionOpened(int32_t socket,PeerSocketInfo info)40 void SoftbusDistributedDataManager::SessionOpened(int32_t socket, PeerSocketInfo info)
41 {
42 std::string sessionName = info.name;
43 peerSocketInfo.name = info.name;
44 peerSocketInfo.networkId = info.networkId;
45 peerSocketInfo.pkgName = info.pkgName;
46 peerSocketInfo.dataType = info.dataType;
47 if (sessionName != CONFIG_SOFTBUS_SESSION_TAG) {
48 SLOGE("onSessionOpened: the group id is not match the media session group. sessionName is %{public}s",
49 sessionName.c_str());
50 return;
51 }
52 if (isServer_) {
53 OnSessionServerOpened();
54 }
55 }
56
SessionClosed(int32_t socket)57 void SoftbusDistributedDataManager::SessionClosed(int32_t socket)
58 {
59 if (isServer_ && peerSocketInfo.name != CONFIG_SOFTBUS_SESSION_TAG) {
60 SLOGE("onSessionClosed: the group id is not match the media session group.");
61 return;
62 }
63 if (isServer_) {
64 OnSessionServerClosed(socket);
65 } else {
66 OnSessionProxyClosed(socket);
67 }
68 }
69
MessageReceived(int32_t socket,const std::string & data)70 void SoftbusDistributedDataManager::MessageReceived(int32_t socket, const std::string &data)
71 {
72 if (peerSocketInfo.name != CONFIG_SOFTBUS_SESSION_TAG) {
73 SLOGE("onMessageReceived: the group id is not match the media session group. sessionName is %{public}s",
74 peerSocketInfo.name);
75 return;
76 }
77 if (isServer_) {
78 OnMessageHandleReceived(socket, data);
79 }
80 }
81 // LCOV_EXCL_STOP
82
BytesReceived(int32_t socket,const std::string & data)83 void SoftbusDistributedDataManager::BytesReceived(int32_t socket, const std::string &data)
84 {
85 if (isServer_) {
86 if (peerSocketInfo.name != CONFIG_SOFTBUS_SESSION_TAG) {
87 SLOGE("onBytesReceived: the group id is not match the media session group. sessionName is %{public}s",
88 peerSocketInfo.name);
89 return;
90 }
91 OnBytesServerReceived(socket, data);
92 } else {
93 OnBytesProxyReceived(socket, data);
94 }
95 }
96
InitSessionServer(const std::string & pkg)97 void SoftbusDistributedDataManager::InitSessionServer(const std::string &pkg)
98 {
99 SLOGI("init session server...");
100 isServer_ = true;
101 std::lock_guard lockGuard(softbusDistributedDataLock_);
102 int32_t socket = SoftbusSessionManager::GetInstance().Socket(pkg);
103 mServerSocketMap_.insert({pkg, socket});
104 }
105
CreateServer(const std::shared_ptr<SoftbusSessionServer> & server)106 void SoftbusDistributedDataManager::CreateServer(const std::shared_ptr<SoftbusSessionServer> &server)
107 {
108 if (server == nullptr) {
109 SLOGE("createServer fail for server is null.");
110 return;
111 }
112 int characteristic = server->GetCharacteristic();
113 std::lock_guard lockGuard(softbusDistributedDataLock_);
114 serverMap_.insert({ characteristic, server });
115 }
116
CreateProxy(const std::shared_ptr<SoftbusSessionProxy> & proxy,const std::string & peerNetworkId,const std::string & packageName)117 bool SoftbusDistributedDataManager::CreateProxy(const std::shared_ptr<SoftbusSessionProxy> &proxy,
118 const std::string &peerNetworkId, const std::string &packageName)
119 {
120 SLOGI("create session proxy...");
121 if (proxy == nullptr || peerNetworkId.c_str() == nullptr || peerNetworkId.empty() ||
122 packageName.c_str() == nullptr || packageName.empty()) {
123 SLOGW("createProxy fail for params invalid.");
124 return false;
125 }
126 isServer_ = false;
127 std::lock_guard lockGuard(softbusDistributedDataLock_);
128 if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
129 int32_t socketId = mProxySocketMap_[peerNetworkId];
130 if (socketId > 0) {
131 std::string softbusNetworkId;
132 SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socketId, softbusNetworkId);
133 if (peerNetworkId == softbusNetworkId) {
134 proxy->ConnectServer(socketId);
135 return true;
136 }
137 mProxySocketMap_.erase(peerNetworkId);
138 }
139 }
140 std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
141 int32_t characteristic = proxy->GetCharacteristic();
142 SLOGI("createProxy for device %{public}s characteristic:%{public}d",
143 anonymizeNetworkId.c_str(), characteristic);
144 std::map<int32_t, std::shared_ptr<SoftbusSessionProxy>> proxyMap;
145 if (mDeviceToProxyMap_.find(peerNetworkId) == mDeviceToProxyMap_.end()) {
146 proxyMap.insert({characteristic, proxy});
147 mDeviceToProxyMap_.insert({peerNetworkId, proxyMap});
148 } else {
149 proxyMap = mDeviceToProxyMap_[peerNetworkId];
150 proxyMap[characteristic] = proxy;
151 }
152 int32_t socketId = ConnectRemoteDevice(peerNetworkId, packageName, softbusLinkMaxRetryTimes);
153 if (socketId <= 0) {
154 proxyMap.erase(characteristic);
155 if (proxyMap.empty()) {
156 mDeviceToProxyMap_.erase(peerNetworkId);
157 }
158 SLOGI("createProxy failed for no remote device connected");
159 return false;
160 } else {
161 OnSessionProxyOpened(socketId);
162 return true;
163 }
164 }
165
ReleaseProxy(const std::shared_ptr<SoftbusSessionProxy> & proxy,const std::string & peerNetworkId)166 bool SoftbusDistributedDataManager::ReleaseProxy(const std::shared_ptr<SoftbusSessionProxy> &proxy,
167 const std::string &peerNetworkId)
168 {
169 if (proxy == nullptr) {
170 SLOGE("ReleaseProxy fail for proxy is null.");
171 return true;
172 }
173 if (peerNetworkId.c_str() == nullptr || peerNetworkId.empty()) {
174 SLOGE("ReleaseProxy fail for networkid is invalid.");
175 return false;
176 }
177 std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
178 SLOGI("ReleaseProxy for device %{public}s", anonymizeNetworkId.c_str());
179 std::lock_guard lockGuard(softbusDistributedDataLock_);
180 if (mDeviceToProxyMap_.find(peerNetworkId) == mDeviceToProxyMap_.end()) {
181 SLOGE("ReleaseProxy fail fo proxyMap is null");
182 return true;
183 }
184
185 auto proxyMap = mDeviceToProxyMap_[peerNetworkId];
186 int32_t characteristic = proxy->GetCharacteristic();
187 if (proxyMap.find(characteristic) != proxyMap.end()) {
188 auto proxy = proxyMap[characteristic];
189 if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
190 int32_t socket = mProxySocketMap_[peerNetworkId];
191 proxy->DisconnectServer(socket);
192 }
193 proxyMap.erase(characteristic);
194 }
195 if (proxyMap.empty()) {
196 if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
197 int32_t socket = mProxySocketMap_[peerNetworkId];
198 OnSessionProxyClosed(socket);
199 SoftbusSessionManager::GetInstance().Shutdown(socket);
200 }
201 mDeviceToProxyMap_.erase(peerNetworkId);
202 return true;
203 }
204 return false;
205 }
206
DestroySessionServer(const std::string & pkg)207 void SoftbusDistributedDataManager::DestroySessionServer(const std::string &pkg)
208 {
209 SLOGI("destroy session server...");
210 std::lock_guard lockGuard(softbusDistributedDataLock_);
211 if (isServer_) {
212 for (auto it = serverMap_.begin(); it != serverMap_.end();) {
213 it->second->DisconnectAllProxy();
214 it = serverMap_.erase(it);
215 }
216 int32_t mSocket = mServerSocketMap_[pkg];
217 mServerSocketMap_.erase(pkg);
218 SoftbusSessionManager::GetInstance().Shutdown(mSocket);
219 } else {
220 for (auto proxyMap = mDeviceToProxyMap_.begin(); proxyMap != mDeviceToProxyMap_.end(); proxyMap++) {
221 for (auto it = proxyMap->second.begin(); it != proxyMap->second.end(); it++) {
222 ReleaseProxy(it->second, proxyMap->first);
223 }
224 }
225 }
226 }
227
ReleaseServer(const std::shared_ptr<SoftbusSessionServer> & server)228 void SoftbusDistributedDataManager::ReleaseServer(const std::shared_ptr<SoftbusSessionServer> &server)
229 {
230 CHECK_AND_RETURN_LOG(server != nullptr, "server is nullptr");
231 int characteristic = server->GetCharacteristic();
232 std::lock_guard lockGuard(softbusDistributedDataLock_);
233 auto iter = serverMap_.find(characteristic);
234 if (iter != serverMap_.end() && iter->second == server) {
235 server->DisconnectAllProxy();
236 serverMap_.erase(characteristic);
237 }
238 }
239
240 // LCOV_EXCL_START
OnSessionServerOpened()241 void SoftbusDistributedDataManager::OnSessionServerOpened()
242 {
243 SLOGI("OnSessionServerOpened: the peer device id is %{public}s.",
244 SoftbusSessionUtils::AnonymizeDeviceId(peerSocketInfo.networkId).c_str());
245 }
246
OnSessionServerClosed(int32_t socket)247 void SoftbusDistributedDataManager::OnSessionServerClosed(int32_t socket)
248 {
249 SLOGI("OnSessionServerClosed: the peer device id is %{public}s.",
250 SoftbusSessionUtils::AnonymizeDeviceId(peerSocketInfo.networkId).c_str());
251 std::lock_guard lockGuard(softbusDistributedDataLock_);
252 for (auto it = serverMap_.begin(); it != serverMap_.end(); it++) {
253 it->second->DisconnectProxy(socket);
254 }
255 }
256
OnMessageHandleReceived(int32_t socket,const std::string & data)257 void SoftbusDistributedDataManager::OnMessageHandleReceived(int32_t socket, const std::string &data)
258 {
259 std::string deviceId = peerSocketInfo.networkId;
260 std::string anonymizeDeviceId = SoftbusSessionUtils::AnonymizeDeviceId(deviceId);
261 SLOGI("onMessageHandleReceived: %{public}s", anonymizeDeviceId.c_str());
262 if (data.length() > 1 && data[0] == MESSAGE_CODE_CONNECT_SERVER) {
263 std::lock_guard lockGuard(softbusDistributedDataLock_);
264 auto iter = serverMap_.find(data[1]);
265 if (iter == serverMap_.end()) {
266 SLOGE("onMessageHandleReceived: server is invalid deviceId %{public}s", anonymizeDeviceId.c_str());
267 return;
268 }
269 iter->second->ConnectProxy(socket);
270 }
271 }
272
OnBytesServerReceived(int32_t socket,const std::string & data)273 void SoftbusDistributedDataManager::OnBytesServerReceived(int32_t socket, const std::string &data)
274 {
275 std::string deviceId = peerSocketInfo.networkId;
276 std::string anonymizeDeviceId = SoftbusSessionUtils::AnonymizeDeviceId(deviceId);
277 SLOGI("onBytesServerReceived: %{public}s", anonymizeDeviceId.c_str());
278 if (data.length() > 0) {
279 std::lock_guard lockGuard(softbusDistributedDataLock_);
280 auto iter = serverMap_.find(data[0]);
281 if (iter == serverMap_.end()) {
282 SLOGE("onBytesServerReceived: server is invalid deviceId %{public}s", anonymizeDeviceId.c_str());
283 return;
284 }
285 if (data.length() > 1 && data[1] == MESSAGE_CODE_CONNECT_SERVER) {
286 iter->second->ConnectProxy(socket);
287 return;
288 }
289 iter->second->OnBytesReceived(deviceId, data);
290 }
291 }
292
ConnectRemoteDevice(const std::string & peerNetworkId,const std::string & packageName,int32_t retryCount)293 int32_t SoftbusDistributedDataManager::ConnectRemoteDevice(const std::string &peerNetworkId,
294 const std::string &packageName, int32_t retryCount)
295 {
296 std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(peerNetworkId);
297 SLOGI("ConnectRemoteDevice remote device %{public}s, retryCount: %{public}d",
298 anonymizeNetworkId.c_str(), retryCount);
299 if (mProxySocketMap_.find(peerNetworkId) != mProxySocketMap_.end()) {
300 SLOGI("%{public}s is connected, no need to connect.", anonymizeNetworkId.c_str());
301 return mProxySocketMap_[peerNetworkId];
302 }
303 int32_t socket = SoftbusSessionManager::GetInstance().Bind(peerNetworkId, packageName);
304 if (socket <= 0 && retryCount > 0) {
305 std::this_thread::sleep_for(std::chrono::milliseconds(retryIntervalTime));
306 socket = ConnectRemoteDevice(peerNetworkId, packageName, retryCount - 1);
307 }
308 return socket;
309 }
310
OnSessionProxyOpened(int32_t socket)311 void SoftbusDistributedDataManager::OnSessionProxyOpened(int32_t socket)
312 {
313 std::string softbusNetworkId;
314 int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, softbusNetworkId);
315 if (result < 0) {
316 SLOGE("OnSessionProxyOpened: get softbus peer network id failed.");
317 return;
318 }
319 std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(softbusNetworkId);
320 if (mProxySocketMap_.find(softbusNetworkId) == mProxySocketMap_.end()) {
321 mProxySocketMap_.insert({softbusNetworkId, socket});
322 } else {
323 SLOGI("OnSessionProxyOpened: session exit, no need to add for %{public}s", anonymizeNetworkId.c_str());
324 }
325 if (mDeviceToProxyMap_.find(softbusNetworkId) == mDeviceToProxyMap_.end()) {
326 return;
327 }
328 std::map<int32_t, std::shared_ptr<SoftbusSessionProxy>> proxyMap = mDeviceToProxyMap_[softbusNetworkId];
329 for (auto it = proxyMap.begin(); it != proxyMap.end(); it++) {
330 it->second->ConnectServer(socket);
331 }
332 }
333
OnSessionProxyClosed(int32_t socket)334 void SoftbusDistributedDataManager::OnSessionProxyClosed(int32_t socket)
335 {
336 std::string networkId;
337 int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, networkId);
338 if (result < 0) {
339 SLOGE("OnSessionProxyClosed: get softbus peer network id failed.");
340 return;
341 }
342 std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(networkId);
343 SLOGI("OnSessionProxyClosed: the peer network id is %{public}s.", anonymizeNetworkId.c_str());
344 std::lock_guard lockGuard(softbusDistributedDataLock_);
345 mDeviceToProxyMap_.erase(networkId);
346 if (mDeviceToProxyMap_.find(networkId) != mDeviceToProxyMap_.end()) {
347 SLOGW("OnSessionProxyClosed: found no socket for device %{public}s", anonymizeNetworkId.c_str());
348 return;
349 }
350 auto proxyMap = mDeviceToProxyMap_[networkId];
351 if (proxyMap.empty()) {
352 return;
353 }
354 for (auto it = proxyMap.begin(); it != proxyMap.end(); it++) {
355 it->second->DisconnectServer(socket);
356 }
357 }
358
OnBytesProxyReceived(int32_t socket,const std::string & data)359 void SoftbusDistributedDataManager::OnBytesProxyReceived(int32_t socket, const std::string &data)
360 {
361 if (data.length() <= 0) {
362 SLOGE("OnBytesProxyReceived invalid data.");
363 return;
364 }
365 std::string networkId;
366 int32_t result = SoftbusSessionManager::GetInstance().ObtainPeerDeviceId(socket, networkId);
367 if (result < 0) {
368 SLOGE("OnBytesProxyReceived: get no softbus peer network");
369 return;
370 }
371 std::string anonymizeNetworkId = SoftbusSessionUtils::AnonymizeDeviceId(networkId);
372 SLOGI("OnBytesProxyReceived: %{public}s.", anonymizeNetworkId.c_str());
373 std::lock_guard lockGuard(softbusDistributedDataLock_);
374 if (mDeviceToProxyMap_.find(networkId) == mDeviceToProxyMap_.end()) {
375 SLOGW("OnBytesProxyReceived no proxy for the device");
376 return;
377 }
378 auto proxyMap = mDeviceToProxyMap_[networkId];
379 auto iter = proxyMap.find(data[0]);
380 if (iter == proxyMap.end()) {
381 SLOGE("OnBytesProxyReceived: found no match characteristic");
382 return;
383 }
384 iter->second->OnBytesReceived(networkId, data);
385 }
386 // LCOV_EXCL_STOP
387 } // namespace OHOS::AVSession