1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "tcp_session.h"
17 #include "common/common_macro.h"
18 #include "common/media_log.h"
19 #include "network/socket/socket_utils.h"
20 #include "utils/utils.h"
21
22 namespace OHOS {
23 namespace Sharing {
TcpSession(SocketInfo::Ptr socket)24 TcpSession::TcpSession(SocketInfo::Ptr socket) : BaseNetworkSession(socket)
25 {
26 SHARING_LOGD("trace.");
27 }
28
~TcpSession()29 TcpSession::~TcpSession()
30 {
31 SHARING_LOGD("trace.");
32 Shutdown();
33 }
34
Start()35 bool TcpSession::Start()
36 {
37 SHARING_LOGD("trace.");
38 if (socket_) {
39 SHARING_LOGD("tcpSession AddFdListener.");
40 auto eventRunner = OHOS::AppExecFwk::EventRunner::Create(true);
41 eventHandler_ = std::make_shared<TcpSessionEventHandler>();
42 eventHandler_->SetSession(shared_from_this());
43 eventHandler_->SetEventRunner(eventRunner);
44 eventRunner->Run();
45
46 eventListener_ = std::make_shared<TcpSessionEventListener>();
47 eventListener_->SetSession(shared_from_this());
48
49 return eventListener_->AddFdListener(socket_->GetPeerFd(), eventListener_, eventHandler_);
50 }
51
52 return false;
53 }
54
Send(const DataBuffer::Ptr & buf,int32_t nSize)55 bool TcpSession::Send(const DataBuffer::Ptr &buf, int32_t nSize)
56 {
57 SHARING_LOGD("trace.");
58 RETURN_FALSE_IF_NULL(buf);
59 return TcpSession::Send(buf->Peek(), nSize);
60 }
61
Send(const char * buf,int32_t nSize)62 bool TcpSession::Send(const char *buf, int32_t nSize)
63 {
64 SHARING_LOGD("trace.");
65 RETURN_FALSE_IF_NULL(buf);
66 if (socket_) {
67 int32_t ret = SocketUtils::SendSocket(socket_->GetPeerFd(), buf, nSize);
68 if (ret > 0) {
69 return true;
70 } else {
71 SHARING_LOGE("send Failed, Shutdown!");
72 if (callback_ && socket_) {
73 callback_->OnSessionClose(socket_->GetPeerFd());
74 }
75
76 return false;
77 }
78 } else {
79 return false;
80 }
81 }
82
Shutdown()83 void TcpSession::Shutdown()
84 {
85 SHARING_LOGD("trace.");
86 std::lock_guard<std::mutex> lock(mutex_);
87 if (socket_) {
88 if (eventListener_) {
89 eventListener_->RemoveFdListener(socket_->GetPeerFd());
90 }
91 SocketUtils::ShutDownSocket(socket_->GetPeerFd());
92 SocketUtils::CloseSocket(socket_->GetPeerFd());
93 socket_->resetPeerFd();
94 socket_.reset();
95 }
96 }
97
OnSessionReadble(int32_t fd)98 void TcpSession::OnSessionReadble(int32_t fd)
99 {
100 MEDIA_LOGD("fd: %{public}d, thread_id: %{public}llu.", fd, GetThreadId());
101 std::lock_guard<std::mutex> lock(mutex_);
102 if (socket_ == nullptr) {
103 SHARING_LOGE("onReadable nullptr!");
104 return;
105 }
106 if (fd == socket_->GetPeerFd()) {
107 int32_t retCode = 0;
108 int32_t error = 0;
109 DataBuffer::Ptr buf = std::make_shared<DataBuffer>(DEFAULT_READ_BUFFER_SIZE);
110 retCode = SocketUtils::RecvSocket(fd, (char *)buf->Data(), DEFAULT_READ_BUFFER_SIZE, 0, error);
111 if (retCode > 0) {
112 buf->UpdateSize(retCode);
113 if (callback_) {
114 callback_->OnSessionReadData(fd, std::move(buf));
115 }
116 } else if (retCode == 0) {
117 if (callback_) {
118 callback_->OnSessionClose(fd);
119 }
120 MEDIA_LOGW("recvSocket Shutdown!");
121 } else {
122 MEDIA_LOGE("recvSocket error!");
123 }
124 } else {
125 MEDIA_LOGD("onReadable receive msg.");
126 }
127 }
128 } // namespace Sharing
129 } // namespace OHOS