1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "tcp_client.h"
17 #include "common/media_log.h"
18 #include "common/sharing_log.h"
19 #include "network/socket/socket_utils.h"
20 #include "network/socket/tcp_socket.h"
21 #include "utils/utils.h"
22 namespace OHOS {
23 namespace Sharing {
~TcpClient()24 TcpClient::~TcpClient()
25 {
26 SHARING_LOGD("trace.");
27 Disconnect();
28 }
29
TcpClient()30 TcpClient::TcpClient()
31 {
32 SHARING_LOGD("trace.");
33 }
34
Connect(const std::string & peerIp,uint16_t peerPort,const std::string & localIp,uint16_t localPort)35 bool TcpClient::Connect(const std::string &peerIp, uint16_t peerPort, const std::string &localIp, uint16_t localPort)
36 {
37 SHARING_LOGD("peerIp:%{public}s, peerPort:%{public}d, thread_id: %{public}llu.", peerIp.c_str(), peerPort,
38 GetThreadId());
39
40 int32_t retCode = 0;
41 socket_ = std::make_unique<TcpSocket>();
42 if (socket_) {
43 if (socket_->Connect(peerIp, peerPort, retCode, true, true, localIp, localPort)) {
44 SHARING_LOGD("connect success.");
45 auto eventRunner = OHOS::AppExecFwk::EventRunner::Create(true);
46 eventHandler_ = std::make_shared<TcpClientEventHandler>();
47 eventHandler_->SetClient(shared_from_this());
48 eventHandler_->SetEventRunner(eventRunner);
49 eventRunner->Run();
50
51 eventListener_ = std::make_shared<TcpClientEventListener>();
52 eventListener_->SetClient(shared_from_this());
53
54 bool ret = eventListener_->AddFdListener(socket_->GetLocalFd(), eventListener_, eventHandler_);
55
56 auto callback = callback_.lock();
57 if (callback) {
58 callback->OnClientConnect(true);
59 callback->OnClientWriteable(socket_->GetLocalFd());
60 }
61
62 return ret;
63 } else {
64 std::unique_lock<std::shared_mutex> lk(mutex_);
65 if (eventListener_) {
66 eventListener_->RemoveFdListener(socket_->GetLocalFd());
67 } else {
68 SHARING_LOGE("eventListener is nullptr.");
69 }
70 SocketUtils::ShutDownSocket(socket_->GetLocalFd());
71 SocketUtils::CloseSocket(socket_->GetLocalFd());
72 socket_.reset();
73 }
74 }
75 SHARING_LOGE("[TcpClient] Connect failed!");
76 auto callback = callback_.lock();
77 if (callback) {
78 callback->OnClientConnect(false);
79 }
80
81 return false;
82 }
83
Disconnect()84 void TcpClient::Disconnect()
85 {
86 SHARING_LOGD("trace.");
87 std::unique_lock<std::shared_mutex> lk(mutex_);
88 if (socket_ != nullptr) {
89 eventListener_->RemoveFdListener(socket_->GetLocalFd());
90 SocketUtils::ShutDownSocket(socket_->GetLocalFd());
91 SocketUtils::CloseSocket(socket_->GetLocalFd());
92 eventListener_->OnShutdown(socket_->GetLocalFd());
93 socket_.reset();
94 }
95 }
96
Send(const DataBuffer::Ptr & buf,int32_t nSize)97 bool TcpClient::Send(const DataBuffer::Ptr &buf, int32_t nSize)
98 {
99 SHARING_LOGD("trace.");
100 return Send(buf->Peek(), nSize);
101 }
102
Send(const char * buf,int32_t nSize)103 bool TcpClient::Send(const char *buf, int32_t nSize)
104 {
105 SHARING_LOGD("trace.");
106 std::unique_lock<std::shared_mutex> lk(mutex_);
107 if (socket_ != nullptr) {
108 if (SocketUtils::SendSocket(socket_->GetLocalFd(), buf, nSize) != 0) {
109 return true;
110 } else {
111 lk.unlock();
112 SHARING_LOGE("send Failed, Disconnect!");
113 Disconnect();
114 return false;
115 }
116 } else {
117 return false;
118 }
119 }
120
Send(const std::string & msg)121 bool TcpClient::Send(const std::string &msg)
122 {
123 SHARING_LOGD("trace.");
124 return Send(msg.c_str(), msg.size());
125 }
126
GetSocketInfo()127 SocketInfo::Ptr TcpClient::GetSocketInfo()
128 {
129 SHARING_LOGD("trace.");
130 return socket_;
131 }
132
OnClientReadable(int32_t fd)133 void TcpClient::OnClientReadable(int32_t fd)
134 {
135 MEDIA_LOGD("trace fd: %{public}d, thread_id: %{public}llu.", fd, GetThreadId());
136 int32_t error = 0;
137 int32_t retCode = 0;
138 do {
139 DataBuffer::Ptr buf = std::make_shared<DataBuffer>(DEAFULT_READ_BUFFER_SIZE);
140 retCode = SocketUtils::RecvSocket(fd, (char *)buf->Data(), DEAFULT_READ_BUFFER_SIZE, flags_, error);
141 MEDIA_LOGD("recvSocket len: %{public}d.", retCode);
142 if (retCode > 0) {
143 buf->UpdateSize(retCode);
144 auto callback = callback_.lock();
145 if (callback) {
146 callback->OnClientReadData(fd, std::move(buf));
147 }
148 } else if (retCode < 0) {
149 if (error == ECONNREFUSED) {
150 auto callback = callback_.lock();
151 if (callback) {
152 callback->OnClientConnect(false);
153 }
154 }
155 } else {
156 SHARING_LOGE("recvSocket failed!");
157 Disconnect();
158 }
159 } while (retCode == (int32_t)DEAFULT_READ_BUFFER_SIZE);
160 }
161
162 } // namespace Sharing
163 } // namespace OHOS