1 /*
2 * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "distributed_input_transport_base.h"
17
18 #include <algorithm>
19 #include <cstring>
20
21 #include "distributed_hardware_fwk_kit.h"
22 #include "ipc_skeleton.h"
23 #include "iservice_registry.h"
24 #include "system_ability_definition.h"
25
26 #include "constants_dinput.h"
27 #include "dinput_context.h"
28 #include "dinput_errcode.h"
29 #include "dinput_hitrace.h"
30 #include "dinput_log.h"
31 #include "dinput_softbus_define.h"
32 #include "dinput_utils_tool.h"
33 #include "hidumper.h"
34
35 #include "softbus_common.h"
36 #include "softbus_permission_check.h"
37
38 namespace OHOS {
39 namespace DistributedHardware {
40 namespace DistributedInput {
41 namespace {
42 const int32_t SESSION_STATUS_OPENED = 0;
43 const int32_t SESSION_STATUS_CLOSED = 1;
44 static QosTV g_qosInfo[] = {
45 { .qos = QOS_TYPE_MIN_BW, .value = 80 * 1024 * 1024},
46 { .qos = QOS_TYPE_MAX_LATENCY, .value = 8000 },
47 { .qos = QOS_TYPE_MIN_LATENCY, .value = 2000 }
48 };
49 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(g_qosInfo[0]));
50 }
51 IMPLEMENT_SINGLE_INSTANCE(DistributedInputTransportBase);
~DistributedInputTransportBase()52 DistributedInputTransportBase::~DistributedInputTransportBase()
53 {
54 DHLOGI("Release Transport Session");
55 Release();
56 }
57
OnBind(int32_t socket,PeerSocketInfo info)58 void OnBind(int32_t socket, PeerSocketInfo info)
59 {
60 DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionOpened(socket, info);
61 }
62
OnShutdown(int32_t socket,ShutdownReason reason)63 void OnShutdown(int32_t socket, ShutdownReason reason)
64 {
65 DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionClosed(socket, reason);
66 }
67
OnBytes(int32_t socket,const void * data,uint32_t dataLen)68 void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
69 {
70 DistributedInput::DistributedInputTransportBase::GetInstance().OnBytesReceived(socket, data, dataLen);
71 }
72
OnMessage(int32_t socket,const void * data,uint32_t dataLen)73 void OnMessage(int32_t socket, const void *data, uint32_t dataLen)
74 {
75 (void)socket;
76 (void)data;
77 (void)dataLen;
78 DHLOGI("socket: %{public}d, dataLen:%{public}d", socket, dataLen);
79 }
80
OnStream(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)81 void OnStream(int32_t socket, const StreamData *data, const StreamData *ext,
82 const StreamFrameInfo *param)
83 {
84 (void)socket;
85 (void)data;
86 (void)ext;
87 (void)param;
88 DHLOGI("socket: %{public}d", socket);
89 }
90
OnFile(int32_t socket,FileEvent * event)91 void OnFile(int32_t socket, FileEvent *event)
92 {
93 (void)event;
94 DHLOGI("socket: %{public}d", socket);
95 }
96
OnQos(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)97 void OnQos(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
98 {
99 DHLOGI("OnQos, socket: %{public}d, QoSEvent: %{public}d, qosCount: %{public}u", socket, (int32_t)eventId, qosCount);
100 for (uint32_t idx = 0; idx < qosCount; idx++) {
101 DHLOGI("QosTV: type: %{public}d, value: %{public}d", (int32_t)qos[idx].qos, qos[idx].value);
102 }
103 }
104
OnNegotiate2(int32_t socket,PeerSocketInfo info,SocketAccessInfo * peerInfo,SocketAccessInfo * localInfo)105 static bool OnNegotiate2(int32_t socket, PeerSocketInfo info, SocketAccessInfo *peerInfo, SocketAccessInfo *localInfo)
106 {
107 return DistributedInputTransportBase::GetInstance().OnNegotiate2(socket, info, peerInfo, localInfo);
108 }
109
110 ISocketListener iSocketListener = {
111 .OnBind = OnBind,
112 .OnShutdown = OnShutdown,
113 .OnBytes = OnBytes,
114 .OnMessage = OnMessage,
115 .OnStream = OnStream,
116 .OnFile = OnFile,
117 .OnQos = OnQos,
118 .OnNegotiate2 = OnNegotiate2
119 };
120
Init()121 int32_t DistributedInputTransportBase::Init()
122 {
123 DHLOGI("Init Transport Base Session");
124 std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
125 if (isSessSerCreateFlag_.load()) {
126 DHLOGI("SessionServer already create success.");
127 return DH_SUCCESS;
128 }
129 int32_t socket = CreateServerSocket();
130 if (socket < DH_SUCCESS) {
131 DHLOGE("CreateServerSocket failed, ret: %{public}d", socket);
132 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
133 }
134
135 int32_t ret = Listen(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
136 if (ret != DH_SUCCESS) {
137 DHLOGE("Socket Listen failed, error code %{public}d.", ret);
138 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
139 }
140 isSessSerCreateFlag_.store(true);
141 localServerSocket_ = socket;
142 DHLOGI("Finish Init DSoftBus Server Socket, socket: %{public}d", socket);
143 return DH_SUCCESS;
144 }
145
CreateServerSocket()146 int32_t DistributedInputTransportBase::CreateServerSocket()
147 {
148 DHLOGI("CreateServerSocket start");
149 auto localNode = std::make_unique<NodeBasicInfo>();
150 int32_t retCode = GetLocalNodeDeviceInfo(DINPUT_PKG_NAME.c_str(), localNode.get());
151 if (retCode != DH_SUCCESS) {
152 DHLOGE("Init Could not get local device id.");
153 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
154 }
155 std::string networkId = localNode->networkId;
156 localSessionName_ = SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
157 DHLOGI("CreateServerSocket local networkId is %{public}s, local socketName: %{public}s",
158 GetAnonyString(networkId).c_str(), localSessionName_.c_str());
159 SocketInfo info = {
160 .name = const_cast<char*>(localSessionName_.c_str()),
161 .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
162 .dataType = DATA_TYPE_BYTES
163 };
164 int32_t socket = Socket(info);
165 DHLOGI("CreateServerSocket Finish, socket: %{public}d", socket);
166 return socket;
167 }
168
Release()169 void DistributedInputTransportBase::Release()
170 {
171 std::unique_lock<std::mutex> sessionLock(operationMutex_);
172 auto iter = remoteDevSessionMap_.begin();
173 for (; iter != remoteDevSessionMap_.end(); ++iter) {
174 DHLOGI("Shutdown client socket: %{public}d to remote dev: %{public}s", iter->second,
175 GetAnonyString(iter->first).c_str());
176 Shutdown(iter->second);
177 }
178
179 {
180 std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
181 if (!isSessSerCreateFlag_.load()) {
182 DHLOGI("DSoftBus Server Socket already remove success.");
183 } else {
184 DHLOGI("Shutdown DSoftBus Server Socket, socket: %{public}d", localServerSocket_.load());
185 Shutdown(localServerSocket_.load());
186 localServerSocket_ = -1;
187 isSessSerCreateFlag_.store(false);
188 }
189 }
190 remoteDevSessionMap_.clear();
191 channelStatusMap_.clear();
192 }
193
CheckDeviceSessionState(const std::string & remoteDevId)194 int32_t DistributedInputTransportBase::CheckDeviceSessionState(const std::string &remoteDevId)
195 {
196 std::unique_lock<std::mutex> sessionLock(operationMutex_);
197 if (remoteDevSessionMap_.find(remoteDevId) == remoteDevSessionMap_.end()) {
198 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_DEVICE_SESSION_STATE;
199 }
200 DHLOGI("CheckDeviceSessionState has opened, remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
201 return DH_SUCCESS;
202 }
203
GetDevIdBySessionId(int32_t sessionId)204 std::string DistributedInputTransportBase::GetDevIdBySessionId(int32_t sessionId)
205 {
206 std::unique_lock<std::mutex> sessionLock(operationMutex_);
207 for (auto iter = remoteDevSessionMap_.begin(); iter != remoteDevSessionMap_.end(); ++iter) {
208 if (iter->second == sessionId) {
209 return iter->first;
210 }
211 }
212 return "";
213 }
214
CreateClientSocket(const std::string & remoteDevId)215 int32_t DistributedInputTransportBase::CreateClientSocket(const std::string &remoteDevId)
216 {
217 DHLOGI("CreateClientSocket start, peerNetworkId: %{public}s", GetAnonyString(remoteDevId).c_str());
218 std::string localSesionName = localSessionName_ + "_" + std::to_string(GetCurrentTimeUs());
219 std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
220 SocketInfo info = {
221 .name = const_cast<char*>(localSesionName.c_str()),
222 .peerName = const_cast<char*>(peerSessionName.c_str()),
223 .peerNetworkId = const_cast<char*>(remoteDevId.c_str()),
224 .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
225 .dataType = DATA_TYPE_BYTES
226 };
227 int32_t socket = Socket(info);
228 DHLOGI("Bind Socket server, socket: %{public}d, localSessionName: %{public}s, peerSessionName: %{public}s",
229 socket, localSesionName.c_str(), peerSessionName.c_str());
230 return socket;
231 }
232
StartSession(const std::string & remoteDevId)233 int32_t DistributedInputTransportBase::StartSession(const std::string &remoteDevId)
234 {
235 int32_t ret = CheckDeviceSessionState(remoteDevId);
236 if (ret == DH_SUCCESS) {
237 DHLOGE("Softbus session has already opened, deviceId: %{public}s", GetAnonyString(remoteDevId).c_str());
238 return DH_SUCCESS;
239 }
240 if (!SoftBusPermissionCheck::CheckSrcPermission(remoteDevId)) {
241 DHLOGE("Permission denied");
242 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_PERMISSION_DENIED;
243 }
244
245 int socket = CreateClientSocket(remoteDevId);
246 if (socket < DH_SUCCESS) {
247 DHLOGE("StartSession failed, ret: %{public}d", socket);
248 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
249 }
250 if (!SoftBusPermissionCheck::SetAccessInfoToSocket(socket)) {
251 DHLOGW("Fill and set accessInfo failed");
252 Shutdown(socket);
253 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_CONTEXT;
254 }
255 StartAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
256 ret = Bind(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
257 if (ret < DH_SUCCESS) {
258 DHLOGE("OpenSession fail, remoteDevId: %{public}s, socket: %{public}d", GetAnonyString(remoteDevId).c_str(),
259 socket);
260 FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
261 Shutdown(socket);
262 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
263 }
264
265 std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
266 HiDumper::GetInstance().CreateSessionInfo(remoteDevId, socket, localSessionName_, peerSessionName,
267 SessionStatus::OPENED);
268 DHLOGI("OpenSession success, remoteDevId:%{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(),
269 socket);
270 sessionId_ = socket;
271
272 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
273 if (dhFwkKit != nullptr) {
274 DHLOGD("Enable low Latency!");
275 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
276 }
277
278 PeerSocketInfo peerSocketInfo = {
279 .name = const_cast<char*>(peerSessionName.c_str()),
280 .networkId = const_cast<char*>(remoteDevId.c_str()),
281 .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
282 .dataType = DATA_TYPE_BYTES
283 };
284 OnSessionOpened(socket, peerSocketInfo);
285 return DH_SUCCESS;
286 }
287
GetCurrentSessionId()288 int32_t DistributedInputTransportBase::GetCurrentSessionId()
289 {
290 return sessionId_;
291 }
292
StopAllSession()293 void DistributedInputTransportBase::StopAllSession()
294 {
295 std::map<std::string, int32_t> remoteDevSessions;
296 {
297 std::unique_lock<std::mutex> sessionLock(operationMutex_);
298 std::for_each(remoteDevSessionMap_.begin(), remoteDevSessionMap_.end(),
299 [&remoteDevSessions] (const std::pair<std::string, int32_t> &pair) {
300 remoteDevSessions[pair.first] = pair.second;
301 });
302 }
303
304 std::for_each(remoteDevSessions.begin(), remoteDevSessions.end(),
305 [this](const std::pair<std::string, int32_t> &pair) {
306 StopSession(pair.first);
307 });
308 }
309
StopSession(const std::string & remoteDevId)310 void DistributedInputTransportBase::StopSession(const std::string &remoteDevId)
311 {
312 std::unique_lock<std::mutex> sessionLock(operationMutex_);
313 if (remoteDevSessionMap_.count(remoteDevId) == 0) {
314 DHLOGE("remoteDevSessionMap not find remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
315 return;
316 }
317 int32_t sessionId = remoteDevSessionMap_[remoteDevId];
318
319 DHLOGI("RemoteDevId: %{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(), sessionId);
320 HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSING);
321 Shutdown(sessionId);
322 remoteDevSessionMap_.erase(remoteDevId);
323 channelStatusMap_.erase(remoteDevId);
324
325 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
326 if (dhFwkKit != nullptr) {
327 DHLOGD("Disable low Latency!");
328 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
329 }
330
331 HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSED);
332 HiDumper::GetInstance().DeleteSessionInfo(remoteDevId);
333 }
334
RegisterSrcHandleSessionCallback(std::shared_ptr<DInputTransbaseSourceCallback> callback)335 void DistributedInputTransportBase::RegisterSrcHandleSessionCallback(
336 std::shared_ptr<DInputTransbaseSourceCallback> callback)
337 {
338 DHLOGI("RegisterSrcHandleSessionCallback");
339 srcCallback_ = callback;
340 }
341
RegisterSinkHandleSessionCallback(std::shared_ptr<DInputTransbaseSinkCallback> callback)342 void DistributedInputTransportBase::RegisterSinkHandleSessionCallback(
343 std::shared_ptr<DInputTransbaseSinkCallback> callback)
344 {
345 DHLOGI("RegisterSinkHandleSessionCallback");
346 sinkCallback_ = callback;
347 }
348
RegisterSourceManagerCallback(std::shared_ptr<DInputSourceManagerCallback> callback)349 void DistributedInputTransportBase::RegisterSourceManagerCallback(
350 std::shared_ptr<DInputSourceManagerCallback> callback)
351 {
352 DHLOGI("RegisterSourceManagerCallback");
353 srcMgrCallback_ = callback;
354 }
355
RegisterSinkManagerCallback(std::shared_ptr<DInputSinkManagerCallback> callback)356 void DistributedInputTransportBase::RegisterSinkManagerCallback(
357 std::shared_ptr<DInputSinkManagerCallback> callback)
358 {
359 DHLOGI("RegisterSinkManagerCallback");
360 sinkMgrCallback_ = callback;
361 }
362
RegisterSessionStateCb(sptr<ISessionStateCallback> callback)363 void DistributedInputTransportBase::RegisterSessionStateCb(sptr<ISessionStateCallback> callback)
364 {
365 DHLOGI("RegisterSessionStateCb");
366 SessionStateCallback_ = callback;
367 }
368
UnregisterSessionStateCb()369 void DistributedInputTransportBase::UnregisterSessionStateCb()
370 {
371 DHLOGI("UnregisterSessionStateCb");
372 SessionStateCallback_ = nullptr;
373 }
374
RunSessionStateCallback(const std::string & remoteDevId,const uint32_t sessionState)375 void DistributedInputTransportBase::RunSessionStateCallback(const std::string &remoteDevId,
376 const uint32_t sessionState)
377 {
378 DHLOGI("RunSessionStateCallback start.");
379 if (SessionStateCallback_ != nullptr) {
380 SessionStateCallback_->OnResult(remoteDevId, sessionState);
381 return;
382 }
383 DHLOGE("RunSessionStateCallback SessionStateCallback_ is null.");
384 }
385
CountSession(const std::string & remoteDevId)386 int32_t DistributedInputTransportBase::CountSession(const std::string &remoteDevId)
387 {
388 return remoteDevSessionMap_.count(remoteDevId);
389 }
390
EraseSessionId(const std::string & remoteDevId)391 void DistributedInputTransportBase::EraseSessionId(const std::string &remoteDevId)
392 {
393 remoteDevSessionMap_.erase(remoteDevId);
394 }
395
OnSessionOpened(int32_t sessionId,const PeerSocketInfo & info)396 int32_t DistributedInputTransportBase::OnSessionOpened(int32_t sessionId, const PeerSocketInfo &info)
397 {
398 std::string peerDevId;
399 peerDevId.assign(info.networkId);
400 DHLOGI("OnSessionOpened, socket: %{public}d, peerSocketName: %{public}s, peerNetworkId: %{public}s, "
401 "peerPkgName: %{public}s", sessionId, info.name, GetAnonyString(peerDevId).c_str(), info.pkgName);
402 FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
403
404 {
405 std::unique_lock<std::mutex> sessionLock(operationMutex_);
406 remoteDevSessionMap_[peerDevId] = sessionId;
407 channelStatusMap_[peerDevId] = true;
408 }
409 RunSessionStateCallback(peerDevId, SESSION_STATUS_OPENED);
410 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
411 if (dhFwkKit != nullptr) {
412 DHLOGD("Enable low Latency!");
413 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
414 }
415 DHLOGI("OnSessionOpened finish");
416 return DH_SUCCESS;
417 }
418
OnSessionClosed(int32_t sessionId,ShutdownReason reason)419 void DistributedInputTransportBase::OnSessionClosed(int32_t sessionId, ShutdownReason reason)
420 {
421 DHLOGI("OnSessionClosed, socket: %{public}d, reason: %{public}d", sessionId, (int32_t)reason);
422 std::string deviceId = GetDevIdBySessionId(sessionId);
423 DHLOGI("OnSessionClosed notify session closed, sessionId: %{public}d, peer deviceId:%{public}s",
424 sessionId, GetAnonyString(deviceId).c_str());
425 RunSessionStateCallback(deviceId, SESSION_STATUS_CLOSED);
426
427 {
428 std::unique_lock<std::mutex> sessionLock(operationMutex_);
429 if (CountSession(deviceId) > 0) {
430 EraseSessionId(deviceId);
431 }
432 channelStatusMap_.erase(deviceId);
433
434 if (sinkCallback_ == nullptr) {
435 DHLOGE("sinkCallback is nullptr.");
436 return;
437 }
438 sinkCallback_->NotifySessionClosed(sessionId);
439
440 if (srcCallback_ == nullptr) {
441 DHLOGE("srcCallback is nullptr.");
442 return;
443 }
444 srcCallback_->NotifySessionClosed();
445
446 if (srcMgrCallback_ == nullptr) {
447 DHLOGE("srcMgrCallback is nullptr.");
448 return;
449 }
450 srcMgrCallback_->ResetSrcMgrResStatus();
451
452 if (sinkMgrCallback_ == nullptr) {
453 DHLOGE("sinkMgrCallback is nullptr.");
454 return;
455 }
456 sinkMgrCallback_->ResetSinkMgrResStatus();
457 }
458
459 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
460 if (dhFwkKit != nullptr) {
461 DHLOGD("Disable low Latency!");
462 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
463 }
464 DHLOGI("OnSessionClosed finish");
465 }
466
CheckRecivedData(const std::string & message)467 bool DistributedInputTransportBase::CheckRecivedData(const std::string &message)
468 {
469 nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
470 if (recMsg.is_discarded()) {
471 DHLOGE("OnBytesReceived jsonStr error.");
472 return false;
473 }
474
475 if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
476 DHLOGE("The key is invalid.");
477 return false;
478 }
479
480 return true;
481 }
482
OnBytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)483 void DistributedInputTransportBase::OnBytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
484 {
485 if (sessionId < 0 || data == nullptr || dataLen == 0 || dataLen > MSG_MAX_SIZE) {
486 DHLOGE("OnBytesReceived param check failed");
487 return;
488 }
489
490 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
491 if (buf == nullptr) {
492 DHLOGE("OnBytesReceived: malloc memory failed");
493 return;
494 }
495
496 if (memcpy_s(buf, dataLen + 1, reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
497 DHLOGE("OnBytesReceived: memcpy memory failed");
498 free(buf);
499 return;
500 }
501
502 std::string message(buf, buf + dataLen);
503 HandleSession(sessionId, message);
504
505 free(buf);
506 return;
507 }
508
HandleSession(int32_t sessionId,const std::string & message)509 void DistributedInputTransportBase::HandleSession(int32_t sessionId, const std::string &message)
510 {
511 if (CheckRecivedData(message) != true) {
512 return;
513 }
514 nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
515 if (recMsg.is_discarded()) {
516 DHLOGE("recMsg parse failed!");
517 return;
518 }
519 if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
520 DHLOGE("softbus cmd key is invalid");
521 return;
522 }
523 uint32_t cmdType = recMsg[DINPUT_SOFTBUS_KEY_CMD_TYPE];
524 if (cmdType < TRANS_MSG_SRC_SINK_SPLIT) {
525 if (srcCallback_ == nullptr) {
526 DHLOGE("srcCallback is nullptr.");
527 return;
528 }
529 srcCallback_->HandleSessionData(sessionId, message);
530 return;
531 }
532 if (cmdType > TRANS_MSG_SRC_SINK_SPLIT) {
533 if (sinkCallback_ == nullptr) {
534 DHLOGE("sinkCallback is nullptr.");
535 return;
536 }
537 sinkCallback_->HandleSessionData(sessionId, message);
538 }
539 }
540
SendMsg(int32_t sessionId,std::string & message)541 int32_t DistributedInputTransportBase::SendMsg(int32_t sessionId, std::string &message)
542 {
543 if (message.size() > MSG_MAX_SIZE) {
544 DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE, msg size: %{public}zu", message.size());
545 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
546 }
547 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((MSG_MAX_SIZE), sizeof(uint8_t)));
548 if (buf == nullptr) {
549 DHLOGE("SendMsg: malloc memory failed");
550 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
551 }
552 int32_t outLen = 0;
553 if (memcpy_s(buf, MSG_MAX_SIZE, reinterpret_cast<const uint8_t *>(message.c_str()), message.size()) != EOK) {
554 DHLOGE("SendMsg: memcpy memory failed");
555 free(buf);
556 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
557 }
558 outLen = static_cast<int32_t>(message.size());
559 int32_t ret = SendBytes(sessionId, buf, outLen);
560 free(buf);
561 return ret;
562 }
563
GetSessionIdByDevId(const std::string & srcId)564 int32_t DistributedInputTransportBase::GetSessionIdByDevId(const std::string &srcId)
565 {
566 std::unique_lock<std::mutex> sessionLock(operationMutex_);
567 std::map<std::string, int32_t>::iterator it = remoteDevSessionMap_.find(srcId);
568 if (it != remoteDevSessionMap_.end()) {
569 return it->second;
570 }
571 DHLOGE("get session id failed, srcId = %{public}s", GetAnonyString(srcId).c_str());
572 return ERR_DH_INPUT_SERVER_SINK_TRANSPORT_GET_SESSIONID_FAIL;
573 }
574
OnNegotiate2(int32_t socket,PeerSocketInfo info,SocketAccessInfo * peerInfo,SocketAccessInfo * localInfo)575 bool DistributedInputTransportBase::OnNegotiate2(int32_t socket, PeerSocketInfo info, SocketAccessInfo *peerInfo,
576 SocketAccessInfo *localInfo)
577 {
578 if (peerInfo == nullptr) {
579 DHLOGE("peerInfo is nullptr. sink must be old version");
580 return true;
581 }
582
583 AccountInfo callerAccountInfo;
584 std::string networkId = info.networkId;
585 if (!SoftBusPermissionCheck::TransCallerInfo(peerInfo, callerAccountInfo, networkId)) {
586 DHLOGE("extraAccessInfo is nullptr.");
587 return false;
588 }
589 if (!SoftBusPermissionCheck::FillLocalInfo(localInfo)) {
590 DHLOGE("FillLocalInfo failed.");
591 return false;
592 }
593 return SoftBusPermissionCheck::CheckSinkPermission(callerAccountInfo);
594 }
595 } // namespace DistributedInput
596 } // namespace DistributedHardware
597 } // namespace OHOS