• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "tcp_client.h"
17 #include "common/media_log.h"
18 #include "common/sharing_log.h"
19 #include "network/socket/socket_utils.h"
20 #include "network/socket/tcp_socket.h"
21 #include "utils/utils.h"
22 namespace OHOS {
23 namespace Sharing {
~TcpClient()24 TcpClient::~TcpClient()
25 {
26     SHARING_LOGD("trace.");
27     Disconnect();
28 }
29 
TcpClient()30 TcpClient::TcpClient()
31 {
32     SHARING_LOGD("trace.");
33 }
34 
Connect(const std::string & peerIp,uint16_t peerPort,const std::string & localIp,uint16_t localPort)35 bool TcpClient::Connect(const std::string &peerIp, uint16_t peerPort, const std::string &localIp, uint16_t localPort)
36 {
37     SHARING_LOGD("peerIp:%{public}s, peerPort:%{public}d, thread_id: %{public}llu.", peerIp.c_str(), peerPort,
38                  GetThreadId());
39 
40     int32_t retCode = 0;
41     socket_ = std::make_unique<TcpSocket>();
42     if (socket_) {
43         if (socket_->Connect(peerIp, peerPort, retCode, true, true, localIp, localPort)) {
44             SHARING_LOGD("connect success.");
45             auto eventRunner = OHOS::AppExecFwk::EventRunner::Create(true);
46             eventHandler_ = std::make_shared<TcpClientEventHandler>();
47             eventHandler_->SetClient(shared_from_this());
48             eventHandler_->SetEventRunner(eventRunner);
49             eventRunner->Run();
50 
51             eventListener_ = std::make_shared<TcpClientEventListener>();
52             eventListener_->SetClient(shared_from_this());
53 
54             bool ret = eventListener_->AddFdListener(socket_->GetLocalFd(), eventListener_, eventHandler_);
55 
56             auto callback = callback_.lock();
57             if (callback) {
58                 callback->OnClientConnect(true);
59                 callback->OnClientWriteable(socket_->GetLocalFd());
60             }
61 
62             return ret;
63         } else {
64             std::unique_lock<std::shared_mutex> lk(mutex_);
65             if (eventListener_) {
66                 eventListener_->RemoveFdListener(socket_->GetLocalFd());
67             } else {
68                 SHARING_LOGE("eventListener is nullptr.");
69             }
70             SocketUtils::ShutDownSocket(socket_->GetLocalFd());
71             SocketUtils::CloseSocket(socket_->GetLocalFd());
72             socket_.reset();
73         }
74     }
75     SHARING_LOGE("[TcpClient] Connect failed!");
76     auto callback = callback_.lock();
77     if (callback) {
78         callback->OnClientConnect(false);
79     }
80 
81     return false;
82 }
83 
Disconnect()84 void TcpClient::Disconnect()
85 {
86     SHARING_LOGD("trace.");
87     std::unique_lock<std::shared_mutex> lk(mutex_);
88     if (socket_ != nullptr) {
89         eventListener_->RemoveFdListener(socket_->GetLocalFd());
90         SocketUtils::ShutDownSocket(socket_->GetLocalFd());
91         SocketUtils::CloseSocket(socket_->GetLocalFd());
92         eventListener_->OnShutdown(socket_->GetLocalFd());
93         socket_.reset();
94     }
95 }
96 
Send(const DataBuffer::Ptr & buf,int32_t nSize)97 bool TcpClient::Send(const DataBuffer::Ptr &buf, int32_t nSize)
98 {
99     SHARING_LOGD("trace.");
100     return Send(buf->Peek(), nSize);
101 }
102 
Send(const char * buf,int32_t nSize)103 bool TcpClient::Send(const char *buf, int32_t nSize)
104 {
105     SHARING_LOGD("trace.");
106     std::unique_lock<std::shared_mutex> lk(mutex_);
107     if (socket_ != nullptr) {
108         if (SocketUtils::SendSocket(socket_->GetLocalFd(), buf, nSize) != 0) {
109             return true;
110         } else {
111             lk.unlock();
112             SHARING_LOGE("send Failed, Disconnect!");
113             Disconnect();
114             return false;
115         }
116     } else {
117         return false;
118     }
119 }
120 
Send(const std::string & msg)121 bool TcpClient::Send(const std::string &msg)
122 {
123     SHARING_LOGD("trace.");
124     return Send(msg.c_str(), msg.size());
125 }
126 
GetSocketInfo()127 SocketInfo::Ptr TcpClient::GetSocketInfo()
128 {
129     SHARING_LOGD("trace.");
130     return socket_;
131 }
132 
OnClientReadable(int32_t fd)133 void TcpClient::OnClientReadable(int32_t fd)
134 {
135     MEDIA_LOGD("trace fd: %{public}d, thread_id: %{public}llu.", fd, GetThreadId());
136     int32_t error = 0;
137     int32_t retCode = 0;
138     do {
139         DataBuffer::Ptr buf = std::make_shared<DataBuffer>(DEAFULT_READ_BUFFER_SIZE);
140         retCode = SocketUtils::RecvSocket(fd, (char *)buf->Data(), DEAFULT_READ_BUFFER_SIZE, flags_, error);
141         MEDIA_LOGD("recvSocket len: %{public}d.", retCode);
142         if (retCode > 0) {
143             buf->UpdateSize(retCode);
144             auto callback = callback_.lock();
145             if (callback) {
146                 callback->OnClientReadData(fd, std::move(buf));
147             }
148         } else if (retCode < 0) {
149             if (error == ECONNREFUSED) {
150                 auto callback = callback_.lock();
151                 if (callback) {
152                     callback->OnClientConnect(false);
153                 }
154             }
155         } else {
156             SHARING_LOGE("recvSocket failed!");
157             Disconnect();
158         }
159     } while (retCode == (int32_t)DEAFULT_READ_BUFFER_SIZE);
160 }
161 
162 } // namespace Sharing
163 } // namespace OHOS