• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "distributed_input_transport_base.h"
17 
18 #include <algorithm>
19 #include <cstring>
20 
21 #include "distributed_hardware_fwk_kit.h"
22 #include "ipc_skeleton.h"
23 #include "iservice_registry.h"
24 #include "system_ability_definition.h"
25 
26 #include "constants_dinput.h"
27 #include "dinput_context.h"
28 #include "dinput_errcode.h"
29 #include "dinput_hitrace.h"
30 #include "dinput_log.h"
31 #include "dinput_softbus_define.h"
32 #include "dinput_utils_tool.h"
33 #include "hidumper.h"
34 
35 #include "softbus_common.h"
36 #include "softbus_permission_check.h"
37 
38 namespace OHOS {
39 namespace DistributedHardware {
40 namespace DistributedInput {
41 namespace {
42 const int32_t SESSION_STATUS_OPENED = 0;
43 const int32_t SESSION_STATUS_CLOSED = 1;
44 static QosTV g_qosInfo[] = {
45     { .qos = QOS_TYPE_MIN_BW, .value = 80 * 1024 * 1024},
46     { .qos = QOS_TYPE_MAX_LATENCY, .value = 8000 },
47     { .qos = QOS_TYPE_MIN_LATENCY, .value = 2000 }
48 };
49 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(g_qosInfo[0]));
50 }
51 IMPLEMENT_SINGLE_INSTANCE(DistributedInputTransportBase);
~DistributedInputTransportBase()52 DistributedInputTransportBase::~DistributedInputTransportBase()
53 {
54     DHLOGI("Release Transport Session");
55     Release();
56 }
57 
OnBind(int32_t socket,PeerSocketInfo info)58 void OnBind(int32_t socket, PeerSocketInfo info)
59 {
60     DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionOpened(socket, info);
61 }
62 
OnShutdown(int32_t socket,ShutdownReason reason)63 void OnShutdown(int32_t socket, ShutdownReason reason)
64 {
65     DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionClosed(socket, reason);
66 }
67 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)68 void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
69 {
70     DistributedInput::DistributedInputTransportBase::GetInstance().OnBytesReceived(socket, data, dataLen);
71 }
72 
OnMessage(int32_t socket,const void * data,uint32_t dataLen)73 void OnMessage(int32_t socket, const void *data, uint32_t dataLen)
74 {
75     (void)socket;
76     (void)data;
77     (void)dataLen;
78     DHLOGI("socket: %{public}d, dataLen:%{public}d", socket, dataLen);
79 }
80 
OnStream(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)81 void OnStream(int32_t socket, const StreamData *data, const StreamData *ext,
82     const StreamFrameInfo *param)
83 {
84     (void)socket;
85     (void)data;
86     (void)ext;
87     (void)param;
88     DHLOGI("socket: %{public}d", socket);
89 }
90 
OnFile(int32_t socket,FileEvent * event)91 void OnFile(int32_t socket, FileEvent *event)
92 {
93     (void)event;
94     DHLOGI("socket: %{public}d", socket);
95 }
96 
OnQos(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)97 void OnQos(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
98 {
99     DHLOGI("OnQos, socket: %{public}d, QoSEvent: %{public}d, qosCount: %{public}u", socket, (int32_t)eventId, qosCount);
100     for (uint32_t idx = 0; idx < qosCount; idx++) {
101         DHLOGI("QosTV: type: %{public}d, value: %{public}d", (int32_t)qos[idx].qos, qos[idx].value);
102     }
103 }
104 
OnNegotiate2(int32_t socket,PeerSocketInfo info,SocketAccessInfo * peerInfo,SocketAccessInfo * localInfo)105 static bool OnNegotiate2(int32_t socket, PeerSocketInfo info, SocketAccessInfo *peerInfo, SocketAccessInfo *localInfo)
106 {
107     return DistributedInputTransportBase::GetInstance().OnNegotiate2(socket, info, peerInfo, localInfo);
108 }
109 
110 ISocketListener iSocketListener = {
111     .OnBind = OnBind,
112     .OnShutdown = OnShutdown,
113     .OnBytes = OnBytes,
114     .OnMessage = OnMessage,
115     .OnStream = OnStream,
116     .OnFile = OnFile,
117     .OnQos = OnQos,
118     .OnNegotiate2 = OnNegotiate2
119 };
120 
Init()121 int32_t DistributedInputTransportBase::Init()
122 {
123     DHLOGI("Init Transport Base Session");
124     std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
125     if (isSessSerCreateFlag_.load()) {
126         DHLOGI("SessionServer already create success.");
127         return DH_SUCCESS;
128     }
129     int32_t socket = CreateServerSocket();
130     if (socket < DH_SUCCESS) {
131         DHLOGE("CreateServerSocket failed, ret: %{public}d", socket);
132         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
133     }
134 
135     int32_t ret = Listen(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
136     if (ret != DH_SUCCESS) {
137         DHLOGE("Socket Listen failed, error code %{public}d.", ret);
138         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
139     }
140     isSessSerCreateFlag_.store(true);
141     localServerSocket_ = socket;
142     DHLOGI("Finish Init DSoftBus Server Socket, socket: %{public}d", socket);
143     return DH_SUCCESS;
144 }
145 
CreateServerSocket()146 int32_t DistributedInputTransportBase::CreateServerSocket()
147 {
148     DHLOGI("CreateServerSocket start");
149     auto localNode = std::make_unique<NodeBasicInfo>();
150     int32_t retCode = GetLocalNodeDeviceInfo(DINPUT_PKG_NAME.c_str(), localNode.get());
151     if (retCode != DH_SUCCESS) {
152         DHLOGE("Init Could not get local device id.");
153         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
154     }
155     std::string networkId = localNode->networkId;
156     localSessionName_ = SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
157     DHLOGI("CreateServerSocket local networkId is %{public}s, local socketName: %{public}s",
158         GetAnonyString(networkId).c_str(), localSessionName_.c_str());
159     SocketInfo info = {
160         .name = const_cast<char*>(localSessionName_.c_str()),
161         .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
162         .dataType = DATA_TYPE_BYTES
163     };
164     int32_t socket = Socket(info);
165     DHLOGI("CreateServerSocket Finish, socket: %{public}d", socket);
166     return socket;
167 }
168 
Release()169 void DistributedInputTransportBase::Release()
170 {
171     std::unique_lock<std::mutex> sessionLock(operationMutex_);
172     auto iter = remoteDevSessionMap_.begin();
173     for (; iter != remoteDevSessionMap_.end(); ++iter) {
174         DHLOGI("Shutdown client socket: %{public}d to remote dev: %{public}s", iter->second,
175             GetAnonyString(iter->first).c_str());
176         Shutdown(iter->second);
177     }
178 
179     {
180         std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
181         if (!isSessSerCreateFlag_.load()) {
182             DHLOGI("DSoftBus Server Socket already remove success.");
183         } else {
184             DHLOGI("Shutdown DSoftBus Server Socket, socket: %{public}d", localServerSocket_.load());
185             Shutdown(localServerSocket_.load());
186             localServerSocket_ = -1;
187             isSessSerCreateFlag_.store(false);
188         }
189     }
190     remoteDevSessionMap_.clear();
191     channelStatusMap_.clear();
192 }
193 
CheckDeviceSessionState(const std::string & remoteDevId)194 int32_t DistributedInputTransportBase::CheckDeviceSessionState(const std::string &remoteDevId)
195 {
196     std::unique_lock<std::mutex> sessionLock(operationMutex_);
197     if (remoteDevSessionMap_.find(remoteDevId) == remoteDevSessionMap_.end()) {
198         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_DEVICE_SESSION_STATE;
199     }
200     DHLOGI("CheckDeviceSessionState has opened, remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
201     return DH_SUCCESS;
202 }
203 
GetDevIdBySessionId(int32_t sessionId)204 std::string DistributedInputTransportBase::GetDevIdBySessionId(int32_t sessionId)
205 {
206     std::unique_lock<std::mutex> sessionLock(operationMutex_);
207     for (auto iter = remoteDevSessionMap_.begin(); iter != remoteDevSessionMap_.end(); ++iter) {
208         if (iter->second == sessionId) {
209             return iter->first;
210         }
211     }
212     return "";
213 }
214 
CreateClientSocket(const std::string & remoteDevId)215 int32_t DistributedInputTransportBase::CreateClientSocket(const std::string &remoteDevId)
216 {
217     DHLOGI("CreateClientSocket start, peerNetworkId: %{public}s", GetAnonyString(remoteDevId).c_str());
218     std::string localSesionName = localSessionName_ + "_" + std::to_string(GetCurrentTimeUs());
219     std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
220     SocketInfo info = {
221         .name = const_cast<char*>(localSesionName.c_str()),
222         .peerName = const_cast<char*>(peerSessionName.c_str()),
223         .peerNetworkId = const_cast<char*>(remoteDevId.c_str()),
224         .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
225         .dataType = DATA_TYPE_BYTES
226     };
227     int32_t socket = Socket(info);
228     DHLOGI("Bind Socket server, socket: %{public}d, localSessionName: %{public}s, peerSessionName: %{public}s",
229         socket, localSesionName.c_str(), peerSessionName.c_str());
230     return socket;
231 }
232 
StartSession(const std::string & remoteDevId)233 int32_t DistributedInputTransportBase::StartSession(const std::string &remoteDevId)
234 {
235     int32_t ret = CheckDeviceSessionState(remoteDevId);
236     if (ret == DH_SUCCESS) {
237         DHLOGE("Softbus session has already opened, deviceId: %{public}s", GetAnonyString(remoteDevId).c_str());
238         return DH_SUCCESS;
239     }
240     if (!SoftBusPermissionCheck::CheckSrcPermission(remoteDevId)) {
241         DHLOGE("Permission denied");
242         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_PERMISSION_DENIED;
243     }
244 
245     int socket = CreateClientSocket(remoteDevId);
246     if (socket < DH_SUCCESS) {
247         DHLOGE("StartSession failed, ret: %{public}d", socket);
248         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
249     }
250     if (!SoftBusPermissionCheck::SetAccessInfoToSocket(socket)) {
251         DHLOGW("Fill and set accessInfo failed");
252         Shutdown(socket);
253         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_CONTEXT;
254     }
255     StartAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
256     ret = Bind(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
257     if (ret < DH_SUCCESS) {
258         DHLOGE("OpenSession fail, remoteDevId: %{public}s, socket: %{public}d", GetAnonyString(remoteDevId).c_str(),
259             socket);
260         FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
261         Shutdown(socket);
262         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
263     }
264 
265     std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
266     HiDumper::GetInstance().CreateSessionInfo(remoteDevId, socket, localSessionName_, peerSessionName,
267         SessionStatus::OPENED);
268     DHLOGI("OpenSession success, remoteDevId:%{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(),
269         socket);
270     sessionId_ = socket;
271 
272     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
273     if (dhFwkKit != nullptr) {
274         DHLOGD("Enable low Latency!");
275         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
276     }
277 
278     PeerSocketInfo peerSocketInfo = {
279         .name = const_cast<char*>(peerSessionName.c_str()),
280         .networkId = const_cast<char*>(remoteDevId.c_str()),
281         .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
282         .dataType = DATA_TYPE_BYTES
283     };
284     OnSessionOpened(socket, peerSocketInfo);
285     return DH_SUCCESS;
286 }
287 
GetCurrentSessionId()288 int32_t DistributedInputTransportBase::GetCurrentSessionId()
289 {
290     return sessionId_;
291 }
292 
StopAllSession()293 void DistributedInputTransportBase::StopAllSession()
294 {
295     std::map<std::string, int32_t> remoteDevSessions;
296     {
297         std::unique_lock<std::mutex> sessionLock(operationMutex_);
298         std::for_each(remoteDevSessionMap_.begin(), remoteDevSessionMap_.end(),
299             [&remoteDevSessions] (const std::pair<std::string, int32_t> &pair) {
300             remoteDevSessions[pair.first] = pair.second;
301         });
302     }
303 
304     std::for_each(remoteDevSessions.begin(), remoteDevSessions.end(),
305         [this](const std::pair<std::string, int32_t> &pair) {
306         StopSession(pair.first);
307     });
308 }
309 
StopSession(const std::string & remoteDevId)310 void DistributedInputTransportBase::StopSession(const std::string &remoteDevId)
311 {
312     std::unique_lock<std::mutex> sessionLock(operationMutex_);
313     if (remoteDevSessionMap_.count(remoteDevId) == 0) {
314         DHLOGE("remoteDevSessionMap not find remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
315         return;
316     }
317     int32_t sessionId = remoteDevSessionMap_[remoteDevId];
318 
319     DHLOGI("RemoteDevId: %{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(), sessionId);
320     HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSING);
321     Shutdown(sessionId);
322     remoteDevSessionMap_.erase(remoteDevId);
323     channelStatusMap_.erase(remoteDevId);
324 
325     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
326     if (dhFwkKit != nullptr) {
327         DHLOGD("Disable low Latency!");
328         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
329     }
330 
331     HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSED);
332     HiDumper::GetInstance().DeleteSessionInfo(remoteDevId);
333 }
334 
RegisterSrcHandleSessionCallback(std::shared_ptr<DInputTransbaseSourceCallback> callback)335 void DistributedInputTransportBase::RegisterSrcHandleSessionCallback(
336     std::shared_ptr<DInputTransbaseSourceCallback> callback)
337 {
338     DHLOGI("RegisterSrcHandleSessionCallback");
339     srcCallback_ = callback;
340 }
341 
RegisterSinkHandleSessionCallback(std::shared_ptr<DInputTransbaseSinkCallback> callback)342 void DistributedInputTransportBase::RegisterSinkHandleSessionCallback(
343     std::shared_ptr<DInputTransbaseSinkCallback> callback)
344 {
345     DHLOGI("RegisterSinkHandleSessionCallback");
346     sinkCallback_ = callback;
347 }
348 
RegisterSourceManagerCallback(std::shared_ptr<DInputSourceManagerCallback> callback)349 void DistributedInputTransportBase::RegisterSourceManagerCallback(
350     std::shared_ptr<DInputSourceManagerCallback> callback)
351 {
352     DHLOGI("RegisterSourceManagerCallback");
353     srcMgrCallback_ = callback;
354 }
355 
RegisterSinkManagerCallback(std::shared_ptr<DInputSinkManagerCallback> callback)356 void DistributedInputTransportBase::RegisterSinkManagerCallback(
357     std::shared_ptr<DInputSinkManagerCallback> callback)
358 {
359     DHLOGI("RegisterSinkManagerCallback");
360     sinkMgrCallback_ = callback;
361 }
362 
RegisterSessionStateCb(sptr<ISessionStateCallback> callback)363 void DistributedInputTransportBase::RegisterSessionStateCb(sptr<ISessionStateCallback> callback)
364 {
365     DHLOGI("RegisterSessionStateCb");
366     SessionStateCallback_ = callback;
367 }
368 
UnregisterSessionStateCb()369 void DistributedInputTransportBase::UnregisterSessionStateCb()
370 {
371     DHLOGI("UnregisterSessionStateCb");
372     SessionStateCallback_ = nullptr;
373 }
374 
RunSessionStateCallback(const std::string & remoteDevId,const uint32_t sessionState)375 void DistributedInputTransportBase::RunSessionStateCallback(const std::string &remoteDevId,
376     const uint32_t sessionState)
377 {
378     DHLOGI("RunSessionStateCallback start.");
379     if (SessionStateCallback_ != nullptr) {
380         SessionStateCallback_->OnResult(remoteDevId, sessionState);
381         return;
382     }
383     DHLOGE("RunSessionStateCallback SessionStateCallback_ is null.");
384 }
385 
CountSession(const std::string & remoteDevId)386 int32_t DistributedInputTransportBase::CountSession(const std::string &remoteDevId)
387 {
388     return remoteDevSessionMap_.count(remoteDevId);
389 }
390 
EraseSessionId(const std::string & remoteDevId)391 void DistributedInputTransportBase::EraseSessionId(const std::string &remoteDevId)
392 {
393     remoteDevSessionMap_.erase(remoteDevId);
394 }
395 
OnSessionOpened(int32_t sessionId,const PeerSocketInfo & info)396 int32_t DistributedInputTransportBase::OnSessionOpened(int32_t sessionId, const PeerSocketInfo &info)
397 {
398     std::string peerDevId;
399     peerDevId.assign(info.networkId);
400     DHLOGI("OnSessionOpened, socket: %{public}d, peerSocketName: %{public}s, peerNetworkId: %{public}s, "
401         "peerPkgName: %{public}s", sessionId, info.name, GetAnonyString(peerDevId).c_str(), info.pkgName);
402     FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
403 
404     {
405         std::unique_lock<std::mutex> sessionLock(operationMutex_);
406         remoteDevSessionMap_[peerDevId] = sessionId;
407         channelStatusMap_[peerDevId] = true;
408     }
409     RunSessionStateCallback(peerDevId, SESSION_STATUS_OPENED);
410     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
411     if (dhFwkKit != nullptr) {
412         DHLOGD("Enable low Latency!");
413         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
414     }
415     DHLOGI("OnSessionOpened finish");
416     return DH_SUCCESS;
417 }
418 
OnSessionClosed(int32_t sessionId,ShutdownReason reason)419 void DistributedInputTransportBase::OnSessionClosed(int32_t sessionId, ShutdownReason reason)
420 {
421     DHLOGI("OnSessionClosed, socket: %{public}d, reason: %{public}d", sessionId, (int32_t)reason);
422     std::string deviceId = GetDevIdBySessionId(sessionId);
423     DHLOGI("OnSessionClosed notify session closed, sessionId: %{public}d, peer deviceId:%{public}s",
424         sessionId, GetAnonyString(deviceId).c_str());
425     RunSessionStateCallback(deviceId, SESSION_STATUS_CLOSED);
426 
427     {
428         std::unique_lock<std::mutex> sessionLock(operationMutex_);
429         if (CountSession(deviceId) > 0) {
430             EraseSessionId(deviceId);
431         }
432         channelStatusMap_.erase(deviceId);
433 
434         if (sinkCallback_ == nullptr) {
435             DHLOGE("sinkCallback is nullptr.");
436             return;
437         }
438         sinkCallback_->NotifySessionClosed(sessionId);
439 
440         if (srcCallback_ == nullptr) {
441             DHLOGE("srcCallback is nullptr.");
442             return;
443         }
444         srcCallback_->NotifySessionClosed();
445 
446         if (srcMgrCallback_ == nullptr) {
447             DHLOGE("srcMgrCallback is nullptr.");
448             return;
449         }
450         srcMgrCallback_->ResetSrcMgrResStatus();
451 
452         if (sinkMgrCallback_ == nullptr) {
453             DHLOGE("sinkMgrCallback is nullptr.");
454             return;
455         }
456         sinkMgrCallback_->ResetSinkMgrResStatus();
457     }
458 
459     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
460     if (dhFwkKit != nullptr) {
461         DHLOGD("Disable low Latency!");
462         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
463     }
464     DHLOGI("OnSessionClosed finish");
465 }
466 
CheckRecivedData(const std::string & message)467 bool DistributedInputTransportBase::CheckRecivedData(const std::string &message)
468 {
469     nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
470     if (recMsg.is_discarded()) {
471         DHLOGE("OnBytesReceived jsonStr error.");
472         return false;
473     }
474 
475     if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
476         DHLOGE("The key is invalid.");
477         return false;
478     }
479 
480     return true;
481 }
482 
OnBytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)483 void DistributedInputTransportBase::OnBytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
484 {
485     if (sessionId < 0 || data == nullptr || dataLen == 0 || dataLen > MSG_MAX_SIZE) {
486         DHLOGE("OnBytesReceived param check failed");
487         return;
488     }
489 
490     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
491     if (buf == nullptr) {
492         DHLOGE("OnBytesReceived: malloc memory failed");
493         return;
494     }
495 
496     if (memcpy_s(buf, dataLen + 1,  reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
497         DHLOGE("OnBytesReceived: memcpy memory failed");
498         free(buf);
499         return;
500     }
501 
502     std::string message(buf, buf + dataLen);
503     HandleSession(sessionId, message);
504 
505     free(buf);
506     return;
507 }
508 
HandleSession(int32_t sessionId,const std::string & message)509 void DistributedInputTransportBase::HandleSession(int32_t sessionId, const std::string &message)
510 {
511     if (CheckRecivedData(message) != true) {
512         return;
513     }
514     nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
515     if (recMsg.is_discarded()) {
516         DHLOGE("recMsg parse failed!");
517         return;
518     }
519     if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
520         DHLOGE("softbus cmd key is invalid");
521         return;
522     }
523     uint32_t cmdType = recMsg[DINPUT_SOFTBUS_KEY_CMD_TYPE];
524     if (cmdType < TRANS_MSG_SRC_SINK_SPLIT) {
525         if (srcCallback_ == nullptr) {
526             DHLOGE("srcCallback is nullptr.");
527             return;
528         }
529         srcCallback_->HandleSessionData(sessionId, message);
530         return;
531     }
532     if (cmdType > TRANS_MSG_SRC_SINK_SPLIT) {
533         if (sinkCallback_ == nullptr) {
534             DHLOGE("sinkCallback is nullptr.");
535             return;
536         }
537         sinkCallback_->HandleSessionData(sessionId, message);
538     }
539 }
540 
SendMsg(int32_t sessionId,std::string & message)541 int32_t DistributedInputTransportBase::SendMsg(int32_t sessionId, std::string &message)
542 {
543     if (message.size() > MSG_MAX_SIZE) {
544         DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE, msg size: %{public}zu", message.size());
545         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
546     }
547     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((MSG_MAX_SIZE), sizeof(uint8_t)));
548     if (buf == nullptr) {
549         DHLOGE("SendMsg: malloc memory failed");
550         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
551     }
552     int32_t outLen = 0;
553     if (memcpy_s(buf, MSG_MAX_SIZE, reinterpret_cast<const uint8_t *>(message.c_str()), message.size()) != EOK) {
554         DHLOGE("SendMsg: memcpy memory failed");
555         free(buf);
556         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
557     }
558     outLen = static_cast<int32_t>(message.size());
559     int32_t ret = SendBytes(sessionId, buf, outLen);
560     free(buf);
561     return ret;
562 }
563 
GetSessionIdByDevId(const std::string & srcId)564 int32_t DistributedInputTransportBase::GetSessionIdByDevId(const std::string &srcId)
565 {
566     std::unique_lock<std::mutex> sessionLock(operationMutex_);
567     std::map<std::string, int32_t>::iterator it = remoteDevSessionMap_.find(srcId);
568     if (it != remoteDevSessionMap_.end()) {
569         return it->second;
570     }
571     DHLOGE("get session id failed, srcId = %{public}s", GetAnonyString(srcId).c_str());
572     return ERR_DH_INPUT_SERVER_SINK_TRANSPORT_GET_SESSIONID_FAIL;
573 }
574 
OnNegotiate2(int32_t socket,PeerSocketInfo info,SocketAccessInfo * peerInfo,SocketAccessInfo * localInfo)575 bool DistributedInputTransportBase::OnNegotiate2(int32_t socket, PeerSocketInfo info, SocketAccessInfo *peerInfo,
576     SocketAccessInfo *localInfo)
577 {
578     if (peerInfo == nullptr) {
579         DHLOGE("peerInfo is nullptr. sink must be old version");
580         return true;
581     }
582 
583     AccountInfo callerAccountInfo;
584     std::string networkId = info.networkId;
585     if (!SoftBusPermissionCheck::TransCallerInfo(peerInfo, callerAccountInfo, networkId)) {
586         DHLOGE("extraAccessInfo is nullptr.");
587         return false;
588     }
589     if (!SoftBusPermissionCheck::FillLocalInfo(localInfo)) {
590         DHLOGE("FillLocalInfo failed.");
591         return false;
592     }
593     return SoftBusPermissionCheck::CheckSinkPermission(callerAccountInfo);
594 }
595 } // namespace DistributedInput
596 } // namespace DistributedHardware
597 } // namespace OHOS