1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "stream_server.h"
17
18 #include <cinttypes>
19 #include <list>
20
21 #include <sys/socket.h>
22
23 #include "devicestatus_service.h"
24 #include "fi_log.h"
25
26 namespace OHOS {
27 namespace Msdp {
28 namespace DeviceStatus {
29 namespace {
30 constexpr OHOS::HiviewDFX::HiLogLabel LABEL { LOG_CORE, MSDP_DOMAIN_ID, "StreamServer" };
31 } // namespace
32
~StreamServer()33 StreamServer::~StreamServer()
34 {
35 CALL_DEBUG_ENTER;
36 UdsStop();
37 }
38
UdsStop()39 void StreamServer::UdsStop()
40 {
41 if (epollFd_ != -1) {
42 if (close(epollFd_) < 0) {
43 FI_HILOGE("Close epoll fd failed, error:%{public}s, epollFd_:%{public}d", strerror(errno), epollFd_);
44 }
45 epollFd_ = -1;
46 }
47
48 for (const auto &item : sessionsMap_) {
49 item.second->Close();
50 }
51 sessionsMap_.clear();
52 }
53
GetClientFd(int32_t pid) const54 int32_t StreamServer::GetClientFd(int32_t pid) const
55 {
56 auto it = idxPidMap_.find(pid);
57 if (it == idxPidMap_.end()) {
58 return INVALID_FD;
59 }
60 return it->second;
61 }
62
GetClientPid(int32_t fd) const63 int32_t StreamServer::GetClientPid(int32_t fd) const
64 {
65 auto it = sessionsMap_.find(fd);
66 if (it == sessionsMap_.end()) {
67 return INVALID_PID;
68 }
69 return it->second->GetPid();
70 }
71
SendMsg(int32_t fd,NetPacket & pkt)72 bool StreamServer::SendMsg(int32_t fd, NetPacket& pkt)
73 {
74 if (fd < 0) {
75 FI_HILOGE("The fd is less than 0");
76 return false;
77 }
78 auto ses = GetSession(fd);
79 if (ses == nullptr) {
80 FI_HILOGE("The fd:%{public}d not found, The message was discarded, errCode:%{public}d",
81 fd, SESSION_NOT_FOUND);
82 return false;
83 }
84 return ses->SendMsg(pkt);
85 }
86
Multicast(const std::vector<int32_t> & fdList,NetPacket & pkt)87 void StreamServer::Multicast(const std::vector<int32_t>& fdList, NetPacket& pkt)
88 {
89 for (const auto &item : fdList) {
90 SendMsg(item, pkt);
91 }
92 }
93
AddSocketPairInfo(const std::string & programName,int32_t moduleType,int32_t uid,int32_t pid,int32_t & serverFd,int32_t & toReturnClientFd,int32_t & tokenType)94 int32_t StreamServer::AddSocketPairInfo(const std::string& programName, int32_t moduleType, int32_t uid, int32_t pid,
95 int32_t& serverFd, int32_t& toReturnClientFd, int32_t& tokenType)
96 {
97 CALL_DEBUG_ENTER;
98 int32_t sockFds[2] = { -1 };
99
100 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockFds) != 0) {
101 FI_HILOGE("Call socketpair failed, errno:%{public}d", errno);
102 return RET_ERR;
103 }
104 serverFd = sockFds[0];
105 toReturnClientFd = sockFds[1];
106 if (serverFd < 0 || toReturnClientFd < 0) {
107 FI_HILOGE("Call fcntl failed, errno:%{public}d", errno);
108 return RET_ERR;
109 }
110 static constexpr size_t bufferSize = 32 * 1024;
111 static constexpr size_t nativeBufferSize = 64 * 1024;
112 SessionPtr sess = nullptr;
113 if (setsockopt(serverFd, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)) != 0) {
114 FI_HILOGE("setsockopt serverFd failed, errno:%{public}d", errno);
115 goto CLOSE_SOCK;
116 }
117 if (setsockopt(serverFd, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)) != 0) {
118 FI_HILOGE("setsockopt serverFd failed, errno:%{public}d", errno);
119 goto CLOSE_SOCK;
120 }
121 if (tokenType == TokenType::TOKEN_NATIVE) {
122 if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_SNDBUF, &nativeBufferSize, sizeof(nativeBufferSize)) != 0) {
123 FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
124 goto CLOSE_SOCK;
125 }
126 if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_RCVBUF, &nativeBufferSize, sizeof(nativeBufferSize)) != 0) {
127 FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
128 goto CLOSE_SOCK;
129 }
130 } else {
131 if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)) != 0) {
132 FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
133 goto CLOSE_SOCK;
134 }
135 if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)) != 0) {
136 FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
137 goto CLOSE_SOCK;
138 }
139 }
140 sess = std::make_shared<StreamSession>(programName, moduleType, serverFd, uid, pid);
141 sess->SetTokenType(tokenType);
142 if (!AddSession(sess)) {
143 FI_HILOGE("AddSession fail errCode:%{public}d", ADD_SESSION_FAIL);
144 goto CLOSE_SOCK;
145 }
146 if (AddEpoll(EPOLL_EVENT_SOCKET, serverFd) != RET_OK) {
147 FI_HILOGE("epoll_ctl EPOLL_CTL_ADD failed, errCode:%{public}d", EPOLL_MODIFY_FAIL);
148 goto CLOSE_SOCK;
149 }
150 OnConnected(sess);
151 return RET_OK;
152
153 CLOSE_SOCK:
154 if (close(serverFd) < 0) {
155 FI_HILOGE("Close server fd failed, error:%{public}s, serverFd:%{public}d", strerror(errno), serverFd);
156 }
157 serverFd = -1;
158 if (close(toReturnClientFd) < 0) {
159 FI_HILOGE("Close fd failed, error:%{public}s, toReturnClientFd:%{public}d", strerror(errno), toReturnClientFd);
160 }
161 toReturnClientFd = -1;
162 return RET_ERR;
163 }
164
SetRecvFun(MsgServerFunCallback fun)165 void StreamServer::SetRecvFun(MsgServerFunCallback fun)
166 {
167 recvFun_ = fun;
168 }
169
ReleaseSession(int32_t fd,epoll_event & ev)170 void StreamServer::ReleaseSession(int32_t fd, epoll_event& ev)
171 {
172 auto secPtr = GetSession(fd);
173 if (secPtr != nullptr) {
174 OnDisconnected(secPtr);
175 DelSession(fd);
176 }
177 if (ev.data.ptr) {
178 free(ev.data.ptr);
179 ev.data.ptr = nullptr;
180 }
181 if (auto it = circleBufMap_.find(fd); it != circleBufMap_.end()) {
182 circleBufMap_.erase(it);
183 }
184 auto DeviceStatusService = DeviceStatus::DelayedSpSingleton<DeviceStatus::DeviceStatusService>::GetInstance();
185 DeviceStatusService->DelEpoll(EPOLL_EVENT_SOCKET, fd);
186 if (close(fd) < 0) {
187 FI_HILOGE("Close fd failed, error:%{public}s, fd:%{public}d", strerror(errno), fd);
188 }
189 }
190
OnPacket(int32_t fd,NetPacket & pkt)191 void StreamServer::OnPacket(int32_t fd, NetPacket& pkt)
192 {
193 auto sess = GetSession(fd);
194 CHKPV(sess);
195 recvFun_(sess, pkt);
196 }
197
OnEpollRecv(int32_t fd,epoll_event & ev)198 void StreamServer::OnEpollRecv(int32_t fd, epoll_event& ev)
199 {
200 if (fd < 0) {
201 FI_HILOGE("Invalid fd:%{public}d", fd);
202 return;
203 }
204 auto& buf = circleBufMap_[fd];
205 char szBuf[MAX_PACKET_BUF_SIZE] = {};
206 for (int32_t i = 0; i < MAX_RECV_LIMIT; i++) {
207 ssize_t size = recv(fd, szBuf, MAX_PACKET_BUF_SIZE, MSG_DONTWAIT | MSG_NOSIGNAL);
208 if (size > 0) {
209 if (!buf.Write(szBuf, size)) {
210 FI_HILOGW("Write data failed, size:%{public}zd", size);
211 }
212 OnReadPackets(buf, std::bind(&StreamServer::OnPacket, this, fd, std::placeholders::_1));
213 } else if (size < 0) {
214 if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
215 FI_HILOGD("Continue for errno EAGAIN|EINTR|EWOULDBLOCK size:%{public}zd errno:%{public}d",
216 size, errno);
217 continue;
218 }
219 FI_HILOGE("Recv return %{public}zd, errno:%{public}d", size, errno);
220 break;
221 } else {
222 FI_HILOGE("The client side disconnect with the server, size:0, errno:%{public}d", errno);
223 ReleaseSession(fd, ev);
224 break;
225 }
226 if (static_cast<size_t>(size) < MAX_PACKET_BUF_SIZE) {
227 break;
228 }
229 }
230 }
231
OnEpollEvent(epoll_event & ev)232 void StreamServer::OnEpollEvent(epoll_event& ev)
233 {
234 CHKPV(ev.data.ptr);
235 int32_t fd = *static_cast<int32_t*>(ev.data.ptr);
236 if (fd < 0) {
237 FI_HILOGE("The fd less than 0, errCode:%{public}d", PARAM_INPUT_INVALID);
238 return;
239 }
240 if ((ev.events & EPOLLERR) || (ev.events & EPOLLHUP)) {
241 FI_HILOGI("EPOLLERR or EPOLLHUP, fd:%{public}d, ev.events:0x%{public}x", fd, ev.events);
242 ReleaseSession(fd, ev);
243 } else if (ev.events & EPOLLIN) {
244 OnEpollRecv(fd, ev);
245 }
246 }
247
DumpSession(const std::string & title)248 void StreamServer::DumpSession(const std::string &title)
249 {
250 FI_HILOGD("in %s:%s", __func__, title.c_str());
251 int32_t i = 0;
252 for (auto &[key, value] : sessionsMap_) {
253 CHKPV(value);
254 FI_HILOGD("%d, %s", i, value->GetDescript().c_str());
255 i++;
256 }
257 }
258
GetSession(int32_t fd) const259 SessionPtr StreamServer::GetSession(int32_t fd) const
260 {
261 auto it = sessionsMap_.find(fd);
262 if (it == sessionsMap_.end()) {
263 FI_HILOGE("Session not found, fd:%{public}d", fd);
264 return nullptr;
265 }
266 CHKPP(it->second);
267 return it->second->GetSharedPtr();
268 }
269
GetSessionByPid(int32_t pid) const270 SessionPtr StreamServer::GetSessionByPid(int32_t pid) const
271 {
272 int32_t fd = GetClientFd(pid);
273 if (fd <= 0) {
274 FI_HILOGE("Session not found, pid:%{public}d", pid);
275 return nullptr;
276 }
277 return GetSession(fd);
278 }
279
AddSession(SessionPtr ses)280 bool StreamServer::AddSession(SessionPtr ses)
281 {
282 CHKPF(ses);
283 FI_HILOGI("pid:%{public}d, fd:%{public}d", ses->GetPid(), ses->GetFd());
284 int32_t fd = ses->GetFd();
285 if (fd < 0) {
286 FI_HILOGE("The fd is less than 0");
287 return false;
288 }
289 int32_t pid = ses->GetPid();
290 if (pid <= 0) {
291 FI_HILOGE("Get process failed");
292 return false;
293 }
294 if (sessionsMap_.size() > MAX_SESSON_ALARM) {
295 FI_HILOGE("Too many clients, Warning Value:%{public}zu, Current Value:%{public}zu",
296 MAX_SESSON_ALARM, sessionsMap_.size());
297 return false;
298 }
299 DumpSession("AddSession");
300 idxPidMap_[pid] = fd;
301 sessionsMap_[fd] = ses;
302 FI_HILOGI("Add session end");
303 return true;
304 }
305
DelSession(int32_t fd)306 void StreamServer::DelSession(int32_t fd)
307 {
308 CALL_DEBUG_ENTER;
309 FI_HILOGI("fd:%{public}d", fd);
310 if (fd < 0) {
311 FI_HILOGE("The fd less than 0, errCode:%{public}d", PARAM_INPUT_INVALID);
312 return;
313 }
314 int32_t pid = GetClientPid(fd);
315 if (pid > 0) {
316 idxPidMap_.erase(pid);
317 }
318 auto it = sessionsMap_.find(fd);
319 if (it != sessionsMap_.end()) {
320 NotifySessionDeleted(it->second);
321 sessionsMap_.erase(it);
322 }
323 DumpSession("DelSession");
324 }
325
AddSessionDeletedCallback(int32_t pid,std::function<void (SessionPtr)> callback)326 void StreamServer::AddSessionDeletedCallback(int32_t pid, std::function<void(SessionPtr)> callback)
327 {
328 CALL_DEBUG_ENTER;
329 auto it = callbacks_.find(pid);
330 if (it != callbacks_.end()) {
331 FI_HILOGW("Deleted session already exists");
332 return;
333 }
334 callbacks_[pid] = callback;
335 }
336
NotifySessionDeleted(SessionPtr ses)337 void StreamServer::NotifySessionDeleted(SessionPtr ses)
338 {
339 CALL_DEBUG_ENTER;
340 auto it = callbacks_.find(ses->GetPid());
341 if (it != callbacks_.end()) {
342 it->second(ses);
343 callbacks_.erase(it);
344 }
345 }
346 } // namespace DeviceStatus
347 } // namespace Msdp
348 } // namespace OHOS
349