1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "udp_server.h"
17 #include <algorithm>
18 #include <arpa/inet.h>
19 #include <iostream>
20 #include <securec.h>
21 #include <unistd.h>
22 #include "common/media_log.h"
23 #include "network/session/udp_session.h"
24 #include "network/socket/socket_utils.h"
25 #include "network/socket/udp_socket.h"
26 #include "utils/utils.h"
27 namespace OHOS {
28 namespace Sharing {
Start(uint16_t port,const std::string & host,bool enableReuse,uint32_t backlog)29 bool UdpServer::Start(uint16_t port, const std::string &host, bool enableReuse, uint32_t backlog)
30 {
31 SHARING_LOGD("server ip:%{public}s, Port:%{public}d, thread_id: %{public}llu.", GetAnonyString(host).c_str(), port,
32 GetThreadId());
33 std::unique_lock<std::shared_mutex> lk(mutex_);
34 socket_ = std::make_unique<UdpSocket>();
35 if (socket_) {
36 if (socket_->Bind(port, host, enableReuse)) {
37 SHARING_LOGD("start success, fd: %{public}d.", socket_->GetLocalFd());
38
39 auto eventRunner = OHOS::AppExecFwk::EventRunner::Create(true);
40 eventHandler_ = std::make_shared<UdpServerEventHandler>();
41 eventHandler_->SetServer(shared_from_this());
42 eventHandler_->SetEventRunner(eventRunner);
43 eventRunner->Run();
44
45 eventListener_ = std::make_shared<UdpServerEventListener>();
46 eventListener_->SetServer(shared_from_this());
47
48 return eventListener_->AddFdListener(socket_->GetLocalFd(), eventListener_, eventHandler_);
49 }
50 }
51
52 SHARING_LOGE("start failed!");
53 return false;
54 }
55
~UdpServer()56 UdpServer::~UdpServer()
57 {
58 SHARING_LOGD("trace.");
59 Stop();
60 }
61
UdpServer()62 UdpServer::UdpServer()
63 {
64 SHARING_LOGD("trace.");
65 }
66
Stop()67 void UdpServer::Stop()
68 {
69 SHARING_LOGD("stop.");
70 std::unique_lock<std::shared_mutex> lk(mutex_);
71
72 for (auto kv : sessionMap_) {
73 if (kv.second) {
74 kv.second->Shutdown();
75 kv.second.reset();
76 }
77 SocketUtils::CloseSocket(kv.first);
78 }
79
80 if (socket_ != nullptr) {
81 if (eventListener_) {
82 eventListener_->RemoveFdListener(socket_->GetLocalFd());
83 }
84 SocketUtils::ShutDownSocket(socket_->GetLocalFd());
85 SocketUtils::CloseSocket(socket_->GetLocalFd());
86 socket_.reset();
87 }
88 }
89
GetSocketInfo()90 SocketInfo::Ptr UdpServer::GetSocketInfo()
91 {
92 SHARING_LOGD("trace.");
93 return socket_;
94 }
95
CloseClientSocket(int32_t fd)96 void UdpServer::CloseClientSocket(int32_t fd)
97 {
98 SHARING_LOGD("fd: %{public}d.", fd);
99 std::unique_lock<std::shared_mutex> lk(mutex_);
100 if (fd > 0) {
101 auto itemItr = sessionMap_.find(fd);
102 if (itemItr != sessionMap_.end()) {
103 if (itemItr->second) {
104 itemItr->second->Shutdown();
105 itemItr->second.reset();
106 }
107 SocketUtils::CloseSocket(fd);
108 sessionMap_.erase(itemItr);
109 SHARING_LOGD("erase fd: %{public}d.", fd);
110 }
111 }
112 }
113
OnServerReadable(int32_t fd)114 void UdpServer::OnServerReadable(int32_t fd)
115 {
116 MEDIA_LOGD("fd: %{public}d, thread_id: %{public}llu tid:%{public}d", fd, GetThreadId(), gettid());
117
118 std::shared_lock<std::shared_mutex> lk(mutex_);
119 if (socket_ == nullptr) {
120 SHARING_LOGE("onReadable socket null!");
121 return;
122 }
123
124 if (fd != socket_->GetLocalFd()) {
125 SHARING_LOGE("onReadable receive msg!");
126 return;
127 }
128
129 auto callback = callback_.lock();
130 if (callback == nullptr) {
131 SHARING_LOGE("callback null!");
132 return;
133 }
134
135 int32_t retry = 0;
136 int32_t retCode = 0;
137 bool firstRead = true;
138 bool reading = true;
139 while (reading) {
140 DataBuffer::Ptr buf = std::make_shared<DataBuffer>(DEAFULT_READ_BUFFER_SIZE);
141 struct sockaddr_in clientAddr;
142 socklen_t len = sizeof(struct sockaddr_in);
143 retCode = ::recvfrom(fd, buf->Data(), DEAFULT_READ_BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &len);
144 MEDIA_LOGD("recvSocket len: %{public}d,address: %{public}s,port: %{public}d,socklen: %{public}d.", retCode,
145 inet_ntoa(clientAddr.sin_addr), clientAddr.sin_port, len);
146
147 if (retCode < 0) {
148 if (errno != EAGAIN) {
149 MEDIA_LOGD("on read data error %{public}d : %{public}s!", errno, strerror(errno));
150 callback->OnServerException(fd);
151 break;
152 }
153
154 if (firstRead && retry < 5) { // 5: retry 5 times
155 SHARING_LOGE("first read error %{public}d : %{public}s retry: %{public}d", errno, strerror(errno),
156 retry);
157 usleep(1000 * 5); // 1000 * 5: sleep 1000 * 5 millionseconds
158 retry++;
159 continue;
160 }
161 break;
162 }
163
164 if (retCode > 0) {
165 firstRead = false;
166 buf->UpdateSize(retCode);
167 BaseNetworkSession::Ptr session = FindOrCreateSession(clientAddr);
168 if (session) {
169 callback->OnServerReadData(fd, std::move(buf), session);
170 }
171 } else {
172 SHARING_LOGE("onReadable error: %{public}s!", strerror(errno));
173 break;
174 }
175
176 if (retCode == 0) {
177 SHARING_LOGE("onReadable error: %{public}s!", strerror(errno));
178 reading = false;
179 }
180 }
181
182 MEDIA_LOGD("fd: %{public}d, thread_id: %{public}llu tid:%{public}d exit.", fd, GetThreadId(), gettid());
183 }
184
FindOrCreateSession(const struct sockaddr_in & addr)185 std::shared_ptr<BaseNetworkSession> UdpServer::FindOrCreateSession(const struct sockaddr_in &addr)
186 {
187 MEDIA_LOGD("trace.");
188
189 auto it = std::find_if(addrToFdMap_.begin(), addrToFdMap_.end(),
190 [&addr](std::pair<std::shared_ptr<struct sockaddr_in>, int32_t> value) {
191 return value.first->sin_addr.s_addr == addr.sin_addr.s_addr && value.first->sin_port == addr.sin_port;
192 });
193 if (it != addrToFdMap_.end()) {
194 return sessionMap_[it->second];
195 } else if (socket_ != nullptr) {
196 MEDIA_LOGD("not find, create session!");
197 int32_t peerFd = 0;
198 SocketUtils::CreateSocket(SOCK_DGRAM, peerFd);
199 if (peerFd == 0 || !BindAndConnectClinetFd(peerFd, addr)) {
200 SHARING_LOGE("create socket failed!");
201 return nullptr;
202 }
203
204 SocketInfo::Ptr socketInfo =
205 std::make_shared<SocketInfo>(socket_->GetLocalIp(), inet_ntoa(addr.sin_addr), socket_->GetLocalFd(), peerFd,
206 socket_->GetLocalPort(), addr.sin_port);
207 if (socketInfo == nullptr) {
208 SHARING_LOGE("create socket info failed!");
209 return nullptr;
210 }
211 auto ret = memcpy_s(&socketInfo->udpClientAddr_, sizeof(struct sockaddr_in), &addr, sizeof(struct sockaddr_in));
212 if (ret != EOK) {
213 MEDIA_LOGE("mem copy data failed.");
214 return nullptr;
215 }
216 socketInfo->SetSocketType(SOCKET_TYPE_UDP);
217
218 BaseNetworkSession::Ptr session = std::make_shared<UdpSession>(std::move(socketInfo));
219 if (session) {
220 auto peerAddr = std::make_shared<struct sockaddr_in>();
221 auto ret = memcpy_s(peerAddr.get(), sizeof(struct sockaddr_in), &addr, sizeof(struct sockaddr_in));
222 if (ret != EOK) {
223 MEDIA_LOGE("mem copy data failed.");
224 return nullptr;
225 }
226 addrToFdMap_.insert(make_pair(peerAddr, peerFd));
227 sessionMap_.insert(make_pair(peerFd, std::move(session)));
228 auto callback = callback_.lock();
229 if (callback) {
230 callback->OnAccept(sessionMap_[peerFd]);
231 }
232
233 return sessionMap_[peerFd];
234 }
235 }
236
237 return nullptr;
238 }
239
BindAndConnectClinetFd(int32_t fd,const struct sockaddr_in & addr)240 bool UdpServer::BindAndConnectClinetFd(int32_t fd, const struct sockaddr_in &addr)
241 {
242 SHARING_LOGD("trace.");
243
244 int32_t ret = 0;
245 SocketUtils::SetNonBlocking(fd);
246 SocketUtils::SetReusePort(fd, true);
247 SocketUtils::SetReuseAddr(fd, true);
248 SocketUtils::SetSendBuf(fd);
249 SocketUtils::SetRecvBuf(fd);
250
251 if (!SocketUtils::BindSocket(fd, "", socket_->GetLocalPort())) {
252 SocketUtils::ShutDownSocket(fd);
253 SHARING_LOGE("bind BindSocket Failed!");
254 return false;
255 }
256
257 SocketUtils::ConnectSocket(fd, true, inet_ntoa(addr.sin_addr), addr.sin_port, ret);
258 if (ret < 0) {
259 SHARING_LOGE("connectSocket error: %{public}s!", strerror(errno));
260 SocketUtils::CloseSocket(fd);
261 return false;
262 }
263
264 return true;
265 }
266
267 } // namespace Sharing
268 } // namespace OHOS
269