1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "tcp_server.h"
17 #include <arpa/inet.h>
18 #include "common/media_log.h"
19 #include "network/session/tcp_session.h"
20 #include "network/socket/socket_utils.h"
21 #include "network/socket/tcp_socket.h"
22 #include "utils/utils.h"
23 namespace OHOS {
24 namespace Sharing {
~TcpServer()25 TcpServer::~TcpServer()
26 {
27 SHARING_LOGD("trace.");
28 Stop();
29 }
30
TcpServer()31 TcpServer::TcpServer()
32 {
33 SHARING_LOGD("trace.");
34 }
35
Start(uint16_t port,const std::string & host,bool enableReuse,uint32_t backlog)36 bool TcpServer::Start(uint16_t port, const std::string &host, bool enableReuse, uint32_t backlog)
37 {
38 SHARING_LOGD("server ip:%{public}s, Port:%{public}d, thread_id: %{public}llu.", host.c_str(), port, GetThreadId());
39 socket_ = std::make_unique<TcpSocket>();
40 if (socket_) {
41 if (socket_->Bind(port, host, enableReuse, backlog)) {
42 SHARING_LOGD("start success fd: %{public}d.", socket_->GetLocalFd());
43 socketLocalFd_ = socket_->GetLocalFd();
44
45 auto eventRunner = OHOS::AppExecFwk::EventRunner::Create(true);
46 eventHandler_ = std::make_shared<TcpServerEventHandler>();
47 eventHandler_->SetServer(shared_from_this());
48 eventHandler_->SetEventRunner(eventRunner);
49 eventRunner->Run();
50
51 eventListener_ = std::make_shared<TcpServerEventListener>();
52 eventListener_->SetServer(shared_from_this());
53
54 return eventListener_->AddFdListener(socket_->GetLocalFd(), eventListener_, eventHandler_);
55 }
56 }
57
58 SHARING_LOGE("start failed!");
59 return false;
60 }
61
Stop()62 void TcpServer::Stop()
63 {
64 SHARING_LOGD("trace.");
65 std::unique_lock<std::shared_mutex> lk(mutex_);
66 if (socket_ != nullptr) {
67 if (eventListener_) {
68 eventListener_->RemoveFdListener(socket_->GetLocalFd());
69 }
70 SocketUtils::ShutDownSocket(socket_->GetLocalFd());
71 SocketUtils::CloseSocket(socket_->GetLocalFd());
72 for (auto it = sessions_.begin(); it != sessions_.end(); it++) {
73 SHARING_LOGD("closeClientSocket:erase fd: %{public}d,size: %{public}zu.", it->first, sessions_.size());
74 if (it->second) {
75 it->second->Shutdown();
76 it->second.reset();
77 }
78 }
79 sessions_.clear();
80 socket_.reset();
81 }
82 }
83
CloseClientSocket(int32_t fd)84 void TcpServer::CloseClientSocket(int32_t fd)
85 {
86 SHARING_LOGD("fd: %{public}d.", fd);
87 if (fd > 0) {
88 auto itemItr = sessions_.find(fd);
89 if (itemItr != sessions_.end()) {
90 if (itemItr->second) {
91 itemItr->second->Shutdown();
92 itemItr->second.reset();
93 }
94 sessions_.erase(itemItr);
95 SHARING_LOGD("erase fd: %{public}d.", fd);
96 }
97 }
98 }
99
GetSocketInfo()100 SocketInfo::Ptr TcpServer::GetSocketInfo()
101 {
102 SHARING_LOGD("trace.");
103 return socket_;
104 }
105
OnServerReadable(int32_t fd)106 void TcpServer::OnServerReadable(int32_t fd)
107 {
108 SHARING_LOGD("fd: %{public}d, socketLocalFd: %{public}d, thread_id: %{public}llu.", fd, socketLocalFd_,
109 GetThreadId());
110 std::unique_lock<std::shared_mutex> lk(mutex_);
111 struct sockaddr_in clientAddr;
112 socklen_t addrLen = sizeof(sockaddr_in);
113 if (fd == socketLocalFd_) {
114 int32_t clientFd = SocketUtils::AcceptSocket(fd, &clientAddr, &addrLen);
115 if (clientFd < 0) {
116 SHARING_LOGE("onReadable accept client error!");
117 return;
118 }
119 SocketUtils::SetNonBlocking(clientFd);
120 SocketUtils::SetNoDelay(clientFd, true);
121 SocketUtils::SetSendBuf(clientFd);
122 SocketUtils::SetRecvBuf(clientFd);
123 SocketUtils::SetCloseWait(clientFd);
124 SocketUtils::SetCloExec(clientFd, true);
125 SocketUtils::SetKeepAlive(clientFd);
126 SHARING_LOGD("onReadable accept client fd: %{public}d.", clientFd);
127 if (socket_) {
128 socket_->socketPeerFd_ = clientFd;
129
130 SocketInfo::Ptr socketInfo = std::make_shared<SocketInfo>(
131 socket_->GetLocalIp(), inet_ntoa(clientAddr.sin_addr), socket_->GetLocalFd(), clientFd,
132 socket_->GetLocalPort(), clientAddr.sin_port);
133 if (socketInfo) {
134 socketInfo->SetSocketType(SOCKET_TYPE_TCP);
135 BaseNetworkSession::Ptr session = std::make_shared<TcpSession>(std::move(socketInfo));
136 if (session) {
137 MEDIA_LOGE("[TcpServer] OnReadable new session start.");
138 sessions_.insert(make_pair(clientFd, std::move(session)));
139 auto callback = callback_.lock();
140 if (callback) {
141 callback->OnAccept(sessions_[clientFd]);
142 }
143 } else {
144 MEDIA_LOGE("onReadable create session failed!");
145 }
146 } else {
147 MEDIA_LOGE("onReadable create SocketInfo failed!");
148 }
149 }
150 } else {
151 MEDIA_LOGD("onReadable receive msg!");
152 }
153 }
154 } // namespace Sharing
155 } // namespace OHOS