1 /*
2 * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "distributed_input_transport_base.h"
17
18 #include <algorithm>
19 #include <cstring>
20
21 #include "distributed_hardware_fwk_kit.h"
22 #include "ipc_skeleton.h"
23 #include "iservice_registry.h"
24 #include "system_ability_definition.h"
25
26 #include "constants_dinput.h"
27 #include "dinput_context.h"
28 #include "dinput_errcode.h"
29 #include "dinput_hitrace.h"
30 #include "dinput_log.h"
31 #include "dinput_softbus_define.h"
32 #include "dinput_utils_tool.h"
33 #include "distributed_input_inject.h"
34 #include "hidumper.h"
35
36 #ifndef COMPILE_TEST_MODE
37 #include "session.h"
38 #else
39 #include "session_mock.h"
40 #endif
41
42 #ifndef COMPILE_TEST_MODE
43 #include "softbus_bus_center.h"
44 #else
45 #include "softbus_bus_center_mock.h"
46 #endif
47
48 #include "softbus_common.h"
49
50 namespace OHOS {
51 namespace DistributedHardware {
52 namespace DistributedInput {
53 const int32_t DINPUT_LINK_TYPE_MAX = 4;
54 static SessionAttribute g_sessionAttr = {
55 .dataType = SessionType::TYPE_BYTES,
56 .linkTypeNum = DINPUT_LINK_TYPE_MAX,
57 .linkType = {
58 LINK_TYPE_WIFI_P2P,
59 LINK_TYPE_WIFI_WLAN_2G,
60 LINK_TYPE_WIFI_WLAN_5G,
61 LINK_TYPE_BR
62 }
63 };
64
GetInstance()65 DistributedInputTransportBase &DistributedInputTransportBase::GetInstance()
66 {
67 static DistributedInputTransportBase instance;
68 return instance;
69 }
70
~DistributedInputTransportBase()71 DistributedInputTransportBase::~DistributedInputTransportBase()
72 {
73 DHLOGI("Release Transport Session");
74 Release();
75 }
76
SessionOpened(int32_t sessionId,int32_t result)77 static int32_t SessionOpened(int32_t sessionId, int32_t result)
78 {
79 return DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionOpened(sessionId, result);
80 }
81
SessionClosed(int32_t sessionId)82 static void SessionClosed(int32_t sessionId)
83 {
84 DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionClosed(sessionId);
85 }
86
BytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)87 static void BytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
88 {
89 DistributedInput::DistributedInputTransportBase::GetInstance().OnBytesReceived(sessionId, data, dataLen);
90 }
91
MessageReceived(int32_t sessionId,const void * data,uint32_t dataLen)92 static void MessageReceived(int32_t sessionId, const void *data, uint32_t dataLen)
93 {
94 (void)sessionId;
95 (void)data;
96 (void)dataLen;
97 DHLOGI("sessionId: %d, dataLen:%d", sessionId, dataLen);
98 }
99
StreamReceived(int32_t sessionId,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)100 static void StreamReceived(int32_t sessionId, const StreamData *data, const StreamData *ext,
101 const StreamFrameInfo *param)
102 {
103 (void)sessionId;
104 (void)data;
105 (void)ext;
106 (void)param;
107 DHLOGI("sessionId: %d", sessionId);
108 }
109
Init()110 int32_t DistributedInputTransportBase::Init()
111 {
112 DHLOGI("Init Transport Base Session");
113 ISessionListener iSessionListener = {
114 .OnSessionOpened = SessionOpened,
115 .OnSessionClosed = SessionClosed,
116 .OnBytesReceived = BytesReceived,
117 .OnMessageReceived = MessageReceived,
118 .OnStreamReceived = StreamReceived
119 };
120
121 auto localNode = std::make_unique<NodeBasicInfo>();
122 int32_t retCode = GetLocalNodeDeviceInfo(DINPUT_PKG_NAME.c_str(), localNode.get());
123 if (retCode != DH_SUCCESS) {
124 DHLOGE("Init Could not get local device id.");
125 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
126 }
127 std::string networkId = localNode->networkId;
128 DHLOGI("Init device local networkId is %s", GetAnonyString(networkId).c_str());
129
130 std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
131 if (isSessSerCreateFlag_.load()) {
132 DHLOGI("SessionServer already create success.");
133 return DH_SUCCESS;
134 }
135 localSessionName_ = SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
136 int32_t ret = CreateSessionServer(DINPUT_PKG_NAME.c_str(), localSessionName_.c_str(), &iSessionListener);
137 if (ret != DH_SUCCESS) {
138 DHLOGE("Init CreateSessionServer failed, error code %d.", ret);
139 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
140 }
141 isSessSerCreateFlag_.store(true);
142 return DH_SUCCESS;
143 }
144
Release()145 void DistributedInputTransportBase::Release()
146 {
147 std::unique_lock<std::mutex> sessionLock(operationMutex_);
148 auto iter = remoteDevSessionMap_.begin();
149 for (; iter != remoteDevSessionMap_.end(); ++iter) {
150 CloseSession(iter->second);
151 }
152
153 {
154 std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
155 if (!isSessSerCreateFlag_.load()) {
156 DHLOGI("SessionServer already remove success.");
157 } else {
158 (void)RemoveSessionServer(DINPUT_PKG_NAME.c_str(), localSessionName_.c_str());
159 isSessSerCreateFlag_.store(false);
160 }
161 }
162 remoteDevSessionMap_.clear();
163 channelStatusMap_.clear();
164 }
165
CheckDeviceSessionState(const std::string & remoteDevId)166 int32_t DistributedInputTransportBase::CheckDeviceSessionState(const std::string &remoteDevId)
167 {
168 std::unique_lock<std::mutex> sessionLock(operationMutex_);
169 if (remoteDevSessionMap_.find(remoteDevId) == remoteDevSessionMap_.end()) {
170 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_DEVICE_SESSION_STATE;
171 }
172 DHLOGI("CheckDeviceSessionState has opened %s", GetAnonyString(remoteDevId).c_str());
173 return DH_SUCCESS;
174 }
175
GetDevIdBySessionId(int32_t sessionId)176 std::string DistributedInputTransportBase::GetDevIdBySessionId(int32_t sessionId)
177 {
178 std::unique_lock<std::mutex> sessionLock(operationMutex_);
179 for (auto iter = remoteDevSessionMap_.begin(); iter != remoteDevSessionMap_.end(); ++iter) {
180 if (iter->second == sessionId) {
181 return iter->first;
182 }
183 }
184 return "";
185 }
186
StartSession(const std::string & remoteDevId)187 int32_t DistributedInputTransportBase::StartSession(const std::string &remoteDevId)
188 {
189 int32_t ret = CheckDeviceSessionState(remoteDevId);
190 if (ret == DH_SUCCESS) {
191 DHLOGE("Softbus session has already opened, deviceId: %s", GetAnonyString(remoteDevId).c_str());
192 return DH_SUCCESS;
193 }
194
195 std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
196 DHLOGI("OpenInputSoftbus peerSessionName:%s", peerSessionName.c_str());
197
198 StartAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
199 int32_t sessionId = OpenSession(localSessionName_.c_str(), peerSessionName.c_str(), remoteDevId.c_str(),
200 GROUP_ID.c_str(), &g_sessionAttr);
201 if (sessionId < 0) {
202 DHLOGE("OpenSession fail, remoteDevId: %s, sessionId: %d", GetAnonyString(remoteDevId).c_str(), sessionId);
203 FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
204 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
205 }
206
207 HiDumper::GetInstance().CreateSessionInfo(remoteDevId, sessionId, localSessionName_, peerSessionName,
208 SessionStatus::OPENING);
209
210 DHLOGI("Wait for channel session opened.");
211 {
212 std::unique_lock<std::mutex> waitLock(operationMutex_);
213 auto status = openSessionWaitCond_.wait_for(waitLock, std::chrono::seconds(SESSION_WAIT_TIMEOUT_SECOND),
214 [this, remoteDevId] () { return channelStatusMap_[remoteDevId]; });
215 if (!status) {
216 DHLOGE("OpenSession timeout, remoteDevId: %s, sessionId: %d",
217 GetAnonyString(remoteDevId).c_str(), sessionId);
218 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_TIMEOUT;
219 }
220 }
221
222 DHLOGI("OpenSession success, remoteDevId:%s, sessionId: %d", GetAnonyString(remoteDevId).c_str(), sessionId);
223 sessionId_ = sessionId;
224
225 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
226 if (dhFwkKit != nullptr) {
227 DHLOGD("Enable low Latency!");
228 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
229 }
230
231 HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::OPENED);
232 return DH_SUCCESS;
233 }
234
GetCurrentSessionId()235 int32_t DistributedInputTransportBase::GetCurrentSessionId()
236 {
237 return sessionId_;
238 }
239
StopSession(const std::string & remoteDevId)240 void DistributedInputTransportBase::StopSession(const std::string &remoteDevId)
241 {
242 std::unique_lock<std::mutex> sessionLock(operationMutex_);
243
244 if (remoteDevSessionMap_.count(remoteDevId) == 0) {
245 DHLOGE("remoteDevSessionMap not find remoteDevId: %s", GetAnonyString(remoteDevId).c_str());
246 return;
247 }
248 int32_t sessionId = remoteDevSessionMap_[remoteDevId];
249
250 DHLOGI("RemoteDevId: %s, sessionId: %d", GetAnonyString(remoteDevId).c_str(), sessionId);
251 HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSING);
252 CloseSession(sessionId);
253 remoteDevSessionMap_.erase(remoteDevId);
254 channelStatusMap_.erase(remoteDevId);
255
256 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
257 if (dhFwkKit != nullptr) {
258 DHLOGD("Disable low Latency!");
259 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
260 }
261
262 HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSED);
263 HiDumper::GetInstance().DeleteSessionInfo(remoteDevId);
264 }
265
RegisterSrcHandleSessionCallback(std::shared_ptr<DInputTransbaseSourceCallback> callback)266 void DistributedInputTransportBase::RegisterSrcHandleSessionCallback(
267 std::shared_ptr<DInputTransbaseSourceCallback> callback)
268 {
269 DHLOGI("RegisterTransbaseSourceRespCallback");
270 srcCallback_ = callback;
271 }
272
RegisterSinkHandleSessionCallback(std::shared_ptr<DInputTransbaseSinkCallback> callback)273 void DistributedInputTransportBase::RegisterSinkHandleSessionCallback(
274 std::shared_ptr<DInputTransbaseSinkCallback> callback)
275 {
276 DHLOGI("RegisterTransbaseSinkRespCallback");
277 sinkCallback_ = callback;
278 }
279
CountSession(const std::string & remoteDevId)280 int32_t DistributedInputTransportBase::CountSession(const std::string &remoteDevId)
281 {
282 return remoteDevSessionMap_.count(remoteDevId);
283 }
284
EraseSessionId(const std::string & remoteDevId)285 void DistributedInputTransportBase::EraseSessionId(const std::string &remoteDevId)
286 {
287 remoteDevSessionMap_.erase(remoteDevId);
288 }
289
OnSessionOpened(int32_t sessionId,int32_t result)290 int32_t DistributedInputTransportBase::OnSessionOpened(int32_t sessionId, int32_t result)
291 {
292 FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
293 if (result != DH_SUCCESS) {
294 std::string deviceId = GetDevIdBySessionId(sessionId);
295 DHLOGE("session open failed, sessionId: %d, result:%d, deviceId:%s", sessionId, result,
296 GetAnonyString(deviceId).c_str());
297 std::unique_lock<std::mutex> sessionLock(operationMutex_);
298 if (CountSession(deviceId) > 0) {
299 EraseSessionId(deviceId);
300 }
301 return DH_SUCCESS;
302 }
303
304 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
305 if (dhFwkKit != nullptr) {
306 DHLOGD("Enable low Latency!");
307 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
308 }
309
310 char mySessionName[SESSION_NAME_SIZE_MAX] = {0};
311 char peerSessionName[SESSION_NAME_SIZE_MAX] = {0};
312 char peerDevId[DEVICE_ID_SIZE_MAX] = {0};
313 int32_t ret = GetMySessionName(sessionId, mySessionName, sizeof(mySessionName));
314 if (ret != DH_SUCCESS) {
315 DHLOGE("get my session name failed, session id is %d", sessionId);
316 }
317 ret = GetPeerSessionName(sessionId, peerSessionName, sizeof(peerSessionName));
318 if (ret != DH_SUCCESS) {
319 DHLOGE("get peer session name failed, session id is %d", sessionId);
320 }
321 ret = GetPeerDeviceId(sessionId, peerDevId, sizeof(peerDevId));
322 if (ret != DH_SUCCESS) {
323 DHLOGE("get peer device id failed, session id is %d", sessionId);
324 }
325
326 {
327 std::unique_lock<std::mutex> sessionLock(operationMutex_);
328 remoteDevSessionMap_[peerDevId] = sessionId;
329 }
330
331 int32_t sessionSide = GetSessionSide(sessionId);
332 DHLOGI("session open succeed, sessionId: %d, sessionSide:%d(1 is "
333 "client side), deviceId:%s", sessionId, sessionSide, GetAnonyString(peerDevId).c_str());
334
335 DHLOGI("mySessionName:%s, peerSessionName:%s, peerDevId:%s",
336 mySessionName, peerSessionName, GetAnonyString(peerDevId).c_str());
337 {
338 std::lock_guard<std::mutex> notifyLock(operationMutex_);
339 channelStatusMap_[peerDevId] = true;
340 openSessionWaitCond_.notify_all();
341 }
342
343 return DH_SUCCESS;
344 }
345
OnSessionClosed(int32_t sessionId)346 void DistributedInputTransportBase::OnSessionClosed(int32_t sessionId)
347 {
348 std::string deviceId = GetDevIdBySessionId(sessionId);
349 DHLOGI("OnSessionClosed, sessionId: %d, deviceId:%s", sessionId, GetAnonyString(deviceId).c_str());
350
351 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
352 if (dhFwkKit != nullptr) {
353 DHLOGD("Disable low Latency!");
354 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
355 }
356
357 {
358 std::unique_lock<std::mutex> sessionLock(operationMutex_);
359 if (CountSession(deviceId) > 0) {
360 EraseSessionId(deviceId);
361 }
362 channelStatusMap_.erase(deviceId);
363
364 if (sinkCallback_ == nullptr) {
365 DHLOGE("sinkCallback is nullptr.");
366 return;
367 }
368 sinkCallback_->NotifySessionClosed(sessionId);
369
370 if (srcCallback_ == nullptr) {
371 DHLOGE("srcCallback is nullptr.");
372 return;
373 }
374 srcCallback_->NotifySessionClosed();
375 }
376 }
377
CheckRecivedData(const std::string & message)378 bool DistributedInputTransportBase::CheckRecivedData(const std::string &message)
379 {
380 nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
381 if (recMsg.is_discarded()) {
382 DHLOGE("OnBytesReceived jsonStr error.");
383 return false;
384 }
385
386 if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
387 DHLOGE("The key is invalid.");
388 return false;
389 }
390
391 return true;
392 }
393
OnBytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)394 void DistributedInputTransportBase::OnBytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
395 {
396 DHLOGI("OnBytesReceived, sessionId: %d, dataLen:%d", sessionId, dataLen);
397 if (sessionId < 0 || data == nullptr || dataLen <= 0) {
398 DHLOGE("OnBytesReceived param check failed");
399 return;
400 }
401
402 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
403 if (buf == nullptr) {
404 DHLOGE("OnBytesReceived: malloc memory failed");
405 return;
406 }
407
408 if (memcpy_s(buf, dataLen + 1, reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
409 DHLOGE("OnBytesReceived: memcpy memory failed");
410 free(buf);
411 return;
412 }
413
414 std::string message(buf, buf + dataLen);
415 DHLOGI("OnBytesReceived message:%s.", SetAnonyId(message).c_str());
416 HandleSession(sessionId, message);
417
418 free(buf);
419 DHLOGI("OnBytesReceived completed");
420 return;
421 }
422
HandleSession(int32_t sessionId,const std::string & message)423 void DistributedInputTransportBase::HandleSession(int32_t sessionId, const std::string &message)
424 {
425 if (CheckRecivedData(message) != true) {
426 return;
427 }
428 nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
429 if (recMsg.is_discarded()) {
430 DHLOGE("recMsg parse failed!");
431 return;
432 }
433 if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
434 DHLOGE("softbus cmd key is invalid");
435 return;
436 }
437 uint32_t cmdType = recMsg[DINPUT_SOFTBUS_KEY_CMD_TYPE];
438 DHLOGI("HandleSession cmdType %u.", cmdType);
439 if (cmdType < TRANS_MSG_SRC_SINK_SPLIT) {
440 if (srcCallback_ == nullptr) {
441 DHLOGE("srcCallback is nullptr.");
442 return;
443 }
444 DHLOGI("HandleSession to source.");
445 srcCallback_->HandleSessionData(sessionId, message);
446 return;
447 }
448 if (cmdType > TRANS_MSG_SRC_SINK_SPLIT) {
449 if (sinkCallback_ == nullptr) {
450 DHLOGE("sinkCallback is nullptr.");
451 return;
452 }
453 DHLOGI("HandleSession to sink.");
454 sinkCallback_->HandleSessionData(sessionId, message);
455 }
456 }
457
SendMsg(int32_t sessionId,std::string & message)458 int32_t DistributedInputTransportBase::SendMsg(int32_t sessionId, std::string &message)
459 {
460 DHLOGD("start SendMsg");
461 if (message.size() > MSG_MAX_SIZE) {
462 DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE");
463 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
464 }
465 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((MSG_MAX_SIZE), sizeof(uint8_t)));
466 if (buf == nullptr) {
467 DHLOGE("SendMsg: malloc memory failed");
468 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
469 }
470 int32_t outLen = 0;
471 if (memcpy_s(buf, MSG_MAX_SIZE, reinterpret_cast<const uint8_t *>(message.c_str()), message.size()) != EOK) {
472 DHLOGE("SendMsg: memcpy memory failed");
473 free(buf);
474 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
475 }
476 outLen = static_cast<int32_t>(message.size());
477 int32_t ret = SendBytes(sessionId, buf, outLen);
478 free(buf);
479 return ret;
480 }
481
GetSessionIdByDevId(const std::string & srcId)482 int32_t DistributedInputTransportBase::GetSessionIdByDevId(const std::string &srcId)
483 {
484 std::map<std::string, int32_t>::iterator it = remoteDevSessionMap_.find(srcId);
485 if (it != remoteDevSessionMap_.end()) {
486 return it->second;
487 }
488 DHLOGE("get session id failed, srcId = %s", GetAnonyString(srcId).c_str());
489 return ERR_DH_INPUT_SERVER_SINK_TRANSPORT_GET_SESSIONID_FAIL;
490 }
491 } // namespace DistributedInput
492 } // namespace DistributedHardware
493 } // namespace OHOS