• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "distributed_input_transport_base.h"
17 
18 #include <algorithm>
19 #include <cstring>
20 
21 #include "distributed_hardware_fwk_kit.h"
22 #include "ipc_skeleton.h"
23 #include "iservice_registry.h"
24 #include "system_ability_definition.h"
25 
26 #include "constants_dinput.h"
27 #include "dinput_context.h"
28 #include "dinput_errcode.h"
29 #include "dinput_hitrace.h"
30 #include "dinput_log.h"
31 #include "dinput_softbus_define.h"
32 #include "dinput_utils_tool.h"
33 #include "distributed_input_inject.h"
34 #include "hidumper.h"
35 
36 #ifndef COMPILE_TEST_MODE
37 #include "session.h"
38 #else
39 #include "session_mock.h"
40 #endif
41 
42 #ifndef COMPILE_TEST_MODE
43 #include "softbus_bus_center.h"
44 #else
45 #include "softbus_bus_center_mock.h"
46 #endif
47 
48 #include "softbus_common.h"
49 
50 namespace OHOS {
51 namespace DistributedHardware {
52 namespace DistributedInput {
53 const int32_t DINPUT_LINK_TYPE_MAX = 4;
54 static SessionAttribute g_sessionAttr = {
55     .dataType = SessionType::TYPE_BYTES,
56     .linkTypeNum = DINPUT_LINK_TYPE_MAX,
57     .linkType = {
58         LINK_TYPE_WIFI_P2P,
59         LINK_TYPE_WIFI_WLAN_2G,
60         LINK_TYPE_WIFI_WLAN_5G,
61         LINK_TYPE_BR
62     }
63 };
64 
GetInstance()65 DistributedInputTransportBase &DistributedInputTransportBase::GetInstance()
66 {
67     static DistributedInputTransportBase instance;
68     return instance;
69 }
70 
~DistributedInputTransportBase()71 DistributedInputTransportBase::~DistributedInputTransportBase()
72 {
73     DHLOGI("Release Transport Session");
74     Release();
75 }
76 
SessionOpened(int32_t sessionId,int32_t result)77 static int32_t SessionOpened(int32_t sessionId, int32_t result)
78 {
79     return DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionOpened(sessionId, result);
80 }
81 
SessionClosed(int32_t sessionId)82 static void SessionClosed(int32_t sessionId)
83 {
84     DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionClosed(sessionId);
85 }
86 
BytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)87 static void BytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
88 {
89     DistributedInput::DistributedInputTransportBase::GetInstance().OnBytesReceived(sessionId, data, dataLen);
90 }
91 
MessageReceived(int32_t sessionId,const void * data,uint32_t dataLen)92 static void MessageReceived(int32_t sessionId, const void *data, uint32_t dataLen)
93 {
94     (void)sessionId;
95     (void)data;
96     (void)dataLen;
97     DHLOGI("sessionId: %d, dataLen:%d", sessionId, dataLen);
98 }
99 
StreamReceived(int32_t sessionId,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)100 static void StreamReceived(int32_t sessionId, const StreamData *data, const StreamData *ext,
101     const StreamFrameInfo *param)
102 {
103     (void)sessionId;
104     (void)data;
105     (void)ext;
106     (void)param;
107     DHLOGI("sessionId: %d", sessionId);
108 }
109 
Init()110 int32_t DistributedInputTransportBase::Init()
111 {
112     DHLOGI("Init Transport Base Session");
113     ISessionListener iSessionListener = {
114         .OnSessionOpened = SessionOpened,
115         .OnSessionClosed = SessionClosed,
116         .OnBytesReceived = BytesReceived,
117         .OnMessageReceived = MessageReceived,
118         .OnStreamReceived = StreamReceived
119     };
120 
121     auto localNode = std::make_unique<NodeBasicInfo>();
122     int32_t retCode = GetLocalNodeDeviceInfo(DINPUT_PKG_NAME.c_str(), localNode.get());
123     if (retCode != DH_SUCCESS) {
124         DHLOGE("Init Could not get local device id.");
125         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
126     }
127     std::string networkId = localNode->networkId;
128     DHLOGI("Init device local networkId is %s", GetAnonyString(networkId).c_str());
129 
130     std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
131     if (isSessSerCreateFlag_.load()) {
132         DHLOGI("SessionServer already create success.");
133         return DH_SUCCESS;
134     }
135     localSessionName_ = SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
136     int32_t ret = CreateSessionServer(DINPUT_PKG_NAME.c_str(), localSessionName_.c_str(), &iSessionListener);
137     if (ret != DH_SUCCESS) {
138         DHLOGE("Init CreateSessionServer failed, error code %d.", ret);
139         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
140     }
141     isSessSerCreateFlag_.store(true);
142     return DH_SUCCESS;
143 }
144 
Release()145 void DistributedInputTransportBase::Release()
146 {
147     std::unique_lock<std::mutex> sessionLock(operationMutex_);
148     auto iter = remoteDevSessionMap_.begin();
149     for (; iter != remoteDevSessionMap_.end(); ++iter) {
150         CloseSession(iter->second);
151     }
152 
153     {
154         std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
155         if (!isSessSerCreateFlag_.load()) {
156             DHLOGI("SessionServer already remove success.");
157         } else {
158             (void)RemoveSessionServer(DINPUT_PKG_NAME.c_str(), localSessionName_.c_str());
159             isSessSerCreateFlag_.store(false);
160         }
161     }
162     remoteDevSessionMap_.clear();
163     channelStatusMap_.clear();
164 }
165 
CheckDeviceSessionState(const std::string & remoteDevId)166 int32_t DistributedInputTransportBase::CheckDeviceSessionState(const std::string &remoteDevId)
167 {
168     std::unique_lock<std::mutex> sessionLock(operationMutex_);
169     if (remoteDevSessionMap_.find(remoteDevId) == remoteDevSessionMap_.end()) {
170         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_DEVICE_SESSION_STATE;
171     }
172     DHLOGI("CheckDeviceSessionState has opened %s", GetAnonyString(remoteDevId).c_str());
173     return DH_SUCCESS;
174 }
175 
GetDevIdBySessionId(int32_t sessionId)176 std::string DistributedInputTransportBase::GetDevIdBySessionId(int32_t sessionId)
177 {
178     std::unique_lock<std::mutex> sessionLock(operationMutex_);
179     for (auto iter = remoteDevSessionMap_.begin(); iter != remoteDevSessionMap_.end(); ++iter) {
180         if (iter->second == sessionId) {
181             return iter->first;
182         }
183     }
184     return "";
185 }
186 
StartSession(const std::string & remoteDevId)187 int32_t DistributedInputTransportBase::StartSession(const std::string &remoteDevId)
188 {
189     int32_t ret = CheckDeviceSessionState(remoteDevId);
190     if (ret == DH_SUCCESS) {
191         DHLOGE("Softbus session has already opened, deviceId: %s", GetAnonyString(remoteDevId).c_str());
192         return DH_SUCCESS;
193     }
194 
195     std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
196     DHLOGI("OpenInputSoftbus peerSessionName:%s", peerSessionName.c_str());
197 
198     StartAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
199     int32_t sessionId = OpenSession(localSessionName_.c_str(), peerSessionName.c_str(), remoteDevId.c_str(),
200         GROUP_ID.c_str(), &g_sessionAttr);
201     if (sessionId < 0) {
202         DHLOGE("OpenSession fail, remoteDevId: %s, sessionId: %d", GetAnonyString(remoteDevId).c_str(), sessionId);
203         FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
204         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
205     }
206 
207     HiDumper::GetInstance().CreateSessionInfo(remoteDevId, sessionId, localSessionName_, peerSessionName,
208         SessionStatus::OPENING);
209 
210     DHLOGI("Wait for channel session opened.");
211     {
212         std::unique_lock<std::mutex> waitLock(operationMutex_);
213         auto status = openSessionWaitCond_.wait_for(waitLock, std::chrono::seconds(SESSION_WAIT_TIMEOUT_SECOND),
214             [this, remoteDevId] () { return channelStatusMap_[remoteDevId]; });
215         if (!status) {
216             DHLOGE("OpenSession timeout, remoteDevId: %s, sessionId: %d",
217                 GetAnonyString(remoteDevId).c_str(), sessionId);
218             return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_TIMEOUT;
219         }
220     }
221 
222     DHLOGI("OpenSession success, remoteDevId:%s, sessionId: %d", GetAnonyString(remoteDevId).c_str(), sessionId);
223     sessionId_ = sessionId;
224 
225     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
226     if (dhFwkKit != nullptr) {
227         DHLOGD("Enable low Latency!");
228         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
229     }
230 
231     HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::OPENED);
232     return DH_SUCCESS;
233 }
234 
GetCurrentSessionId()235 int32_t DistributedInputTransportBase::GetCurrentSessionId()
236 {
237     return sessionId_;
238 }
239 
StopSession(const std::string & remoteDevId)240 void DistributedInputTransportBase::StopSession(const std::string &remoteDevId)
241 {
242     std::unique_lock<std::mutex> sessionLock(operationMutex_);
243 
244     if (remoteDevSessionMap_.count(remoteDevId) == 0) {
245         DHLOGE("remoteDevSessionMap not find remoteDevId: %s", GetAnonyString(remoteDevId).c_str());
246         return;
247     }
248     int32_t sessionId = remoteDevSessionMap_[remoteDevId];
249 
250     DHLOGI("RemoteDevId: %s, sessionId: %d", GetAnonyString(remoteDevId).c_str(), sessionId);
251     HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSING);
252     CloseSession(sessionId);
253     remoteDevSessionMap_.erase(remoteDevId);
254     channelStatusMap_.erase(remoteDevId);
255 
256     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
257     if (dhFwkKit != nullptr) {
258         DHLOGD("Disable low Latency!");
259         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
260     }
261 
262     HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSED);
263     HiDumper::GetInstance().DeleteSessionInfo(remoteDevId);
264 }
265 
RegisterSrcHandleSessionCallback(std::shared_ptr<DInputTransbaseSourceCallback> callback)266 void DistributedInputTransportBase::RegisterSrcHandleSessionCallback(
267     std::shared_ptr<DInputTransbaseSourceCallback> callback)
268 {
269     DHLOGI("RegisterTransbaseSourceRespCallback");
270     srcCallback_ = callback;
271 }
272 
RegisterSinkHandleSessionCallback(std::shared_ptr<DInputTransbaseSinkCallback> callback)273 void DistributedInputTransportBase::RegisterSinkHandleSessionCallback(
274     std::shared_ptr<DInputTransbaseSinkCallback> callback)
275 {
276     DHLOGI("RegisterTransbaseSinkRespCallback");
277     sinkCallback_ = callback;
278 }
279 
CountSession(const std::string & remoteDevId)280 int32_t DistributedInputTransportBase::CountSession(const std::string &remoteDevId)
281 {
282     return remoteDevSessionMap_.count(remoteDevId);
283 }
284 
EraseSessionId(const std::string & remoteDevId)285 void DistributedInputTransportBase::EraseSessionId(const std::string &remoteDevId)
286 {
287     remoteDevSessionMap_.erase(remoteDevId);
288 }
289 
OnSessionOpened(int32_t sessionId,int32_t result)290 int32_t DistributedInputTransportBase::OnSessionOpened(int32_t sessionId, int32_t result)
291 {
292     FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
293     if (result != DH_SUCCESS) {
294         std::string deviceId = GetDevIdBySessionId(sessionId);
295         DHLOGE("session open failed, sessionId: %d, result:%d, deviceId:%s", sessionId, result,
296             GetAnonyString(deviceId).c_str());
297         std::unique_lock<std::mutex> sessionLock(operationMutex_);
298         if (CountSession(deviceId) > 0) {
299             EraseSessionId(deviceId);
300         }
301         return DH_SUCCESS;
302     }
303 
304     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
305     if (dhFwkKit != nullptr) {
306         DHLOGD("Enable low Latency!");
307         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
308     }
309 
310     char mySessionName[SESSION_NAME_SIZE_MAX] = {0};
311     char peerSessionName[SESSION_NAME_SIZE_MAX] = {0};
312     char peerDevId[DEVICE_ID_SIZE_MAX] = {0};
313     int32_t ret = GetMySessionName(sessionId, mySessionName, sizeof(mySessionName));
314     if (ret != DH_SUCCESS) {
315         DHLOGE("get my session name failed, session id is %d", sessionId);
316     }
317     ret = GetPeerSessionName(sessionId, peerSessionName, sizeof(peerSessionName));
318     if (ret != DH_SUCCESS) {
319         DHLOGE("get peer session name failed, session id is %d", sessionId);
320     }
321     ret = GetPeerDeviceId(sessionId, peerDevId, sizeof(peerDevId));
322     if (ret != DH_SUCCESS) {
323         DHLOGE("get peer device id failed, session id is %d", sessionId);
324     }
325 
326     {
327         std::unique_lock<std::mutex> sessionLock(operationMutex_);
328         remoteDevSessionMap_[peerDevId] = sessionId;
329     }
330 
331     int32_t sessionSide = GetSessionSide(sessionId);
332     DHLOGI("session open succeed, sessionId: %d, sessionSide:%d(1 is "
333         "client side), deviceId:%s", sessionId, sessionSide, GetAnonyString(peerDevId).c_str());
334 
335     DHLOGI("mySessionName:%s, peerSessionName:%s, peerDevId:%s",
336         mySessionName, peerSessionName, GetAnonyString(peerDevId).c_str());
337     {
338         std::lock_guard<std::mutex> notifyLock(operationMutex_);
339         channelStatusMap_[peerDevId] = true;
340         openSessionWaitCond_.notify_all();
341     }
342 
343     return DH_SUCCESS;
344 }
345 
OnSessionClosed(int32_t sessionId)346 void DistributedInputTransportBase::OnSessionClosed(int32_t sessionId)
347 {
348     std::string deviceId = GetDevIdBySessionId(sessionId);
349     DHLOGI("OnSessionClosed, sessionId: %d, deviceId:%s", sessionId, GetAnonyString(deviceId).c_str());
350 
351     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
352     if (dhFwkKit != nullptr) {
353         DHLOGD("Disable low Latency!");
354         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
355     }
356 
357     {
358         std::unique_lock<std::mutex> sessionLock(operationMutex_);
359         if (CountSession(deviceId) > 0) {
360             EraseSessionId(deviceId);
361         }
362         channelStatusMap_.erase(deviceId);
363 
364         if (sinkCallback_ == nullptr) {
365             DHLOGE("sinkCallback is nullptr.");
366             return;
367         }
368         sinkCallback_->NotifySessionClosed(sessionId);
369 
370         if (srcCallback_ == nullptr) {
371             DHLOGE("srcCallback is nullptr.");
372             return;
373         }
374         srcCallback_->NotifySessionClosed();
375     }
376 }
377 
CheckRecivedData(const std::string & message)378 bool DistributedInputTransportBase::CheckRecivedData(const std::string &message)
379 {
380     nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
381     if (recMsg.is_discarded()) {
382         DHLOGE("OnBytesReceived jsonStr error.");
383         return false;
384     }
385 
386     if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
387         DHLOGE("The key is invalid.");
388         return false;
389     }
390 
391     return true;
392 }
393 
OnBytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)394 void DistributedInputTransportBase::OnBytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
395 {
396     DHLOGI("OnBytesReceived, sessionId: %d, dataLen:%d", sessionId, dataLen);
397     if (sessionId < 0 || data == nullptr || dataLen <= 0) {
398         DHLOGE("OnBytesReceived param check failed");
399         return;
400     }
401 
402     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
403     if (buf == nullptr) {
404         DHLOGE("OnBytesReceived: malloc memory failed");
405         return;
406     }
407 
408     if (memcpy_s(buf, dataLen + 1,  reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
409         DHLOGE("OnBytesReceived: memcpy memory failed");
410         free(buf);
411         return;
412     }
413 
414     std::string message(buf, buf + dataLen);
415     DHLOGI("OnBytesReceived message:%s.", SetAnonyId(message).c_str());
416     HandleSession(sessionId, message);
417 
418     free(buf);
419     DHLOGI("OnBytesReceived completed");
420     return;
421 }
422 
HandleSession(int32_t sessionId,const std::string & message)423 void DistributedInputTransportBase::HandleSession(int32_t sessionId, const std::string &message)
424 {
425     if (CheckRecivedData(message) != true) {
426         return;
427     }
428     nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
429     if (recMsg.is_discarded()) {
430         DHLOGE("recMsg parse failed!");
431         return;
432     }
433     if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
434         DHLOGE("softbus cmd key is invalid");
435         return;
436     }
437     uint32_t cmdType = recMsg[DINPUT_SOFTBUS_KEY_CMD_TYPE];
438     DHLOGI("HandleSession cmdType %u.", cmdType);
439     if (cmdType < TRANS_MSG_SRC_SINK_SPLIT) {
440         if (srcCallback_ == nullptr) {
441             DHLOGE("srcCallback is nullptr.");
442             return;
443         }
444         DHLOGI("HandleSession to source.");
445         srcCallback_->HandleSessionData(sessionId, message);
446         return;
447     }
448     if (cmdType > TRANS_MSG_SRC_SINK_SPLIT) {
449         if (sinkCallback_ == nullptr) {
450             DHLOGE("sinkCallback is nullptr.");
451             return;
452         }
453         DHLOGI("HandleSession to sink.");
454         sinkCallback_->HandleSessionData(sessionId, message);
455     }
456 }
457 
SendMsg(int32_t sessionId,std::string & message)458 int32_t DistributedInputTransportBase::SendMsg(int32_t sessionId, std::string &message)
459 {
460     DHLOGD("start SendMsg");
461     if (message.size() > MSG_MAX_SIZE) {
462         DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE");
463         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
464     }
465     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((MSG_MAX_SIZE), sizeof(uint8_t)));
466     if (buf == nullptr) {
467         DHLOGE("SendMsg: malloc memory failed");
468         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
469     }
470     int32_t outLen = 0;
471     if (memcpy_s(buf, MSG_MAX_SIZE, reinterpret_cast<const uint8_t *>(message.c_str()), message.size()) != EOK) {
472         DHLOGE("SendMsg: memcpy memory failed");
473         free(buf);
474         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
475     }
476     outLen = static_cast<int32_t>(message.size());
477     int32_t ret = SendBytes(sessionId, buf, outLen);
478     free(buf);
479     return ret;
480 }
481 
GetSessionIdByDevId(const std::string & srcId)482 int32_t DistributedInputTransportBase::GetSessionIdByDevId(const std::string &srcId)
483 {
484     std::map<std::string, int32_t>::iterator it = remoteDevSessionMap_.find(srcId);
485     if (it != remoteDevSessionMap_.end()) {
486         return it->second;
487     }
488     DHLOGE("get session id failed, srcId = %s", GetAnonyString(srcId).c_str());
489     return ERR_DH_INPUT_SERVER_SINK_TRANSPORT_GET_SESSIONID_FAIL;
490 }
491 } // namespace DistributedInput
492 } // namespace DistributedHardware
493 } // namespace OHOS