• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "stream_server.h"
17 
18 #include <cinttypes>
19 #include <list>
20 
21 #include <sys/socket.h>
22 
23 #include "devicestatus_service.h"
24 #include "fi_log.h"
25 
26 namespace OHOS {
27 namespace Msdp {
28 namespace DeviceStatus {
29 namespace {
30 constexpr OHOS::HiviewDFX::HiLogLabel LABEL { LOG_CORE, MSDP_DOMAIN_ID, "StreamServer" };
31 } // namespace
32 
~StreamServer()33 StreamServer::~StreamServer()
34 {
35     CALL_DEBUG_ENTER;
36     UdsStop();
37 }
38 
UdsStop()39 void StreamServer::UdsStop()
40 {
41     if (epollFd_ != -1) {
42         if (close(epollFd_) < 0) {
43             FI_HILOGE("Close epoll fd failed, error:%{public}s, epollFd_:%{public}d", strerror(errno), epollFd_);
44         }
45         epollFd_ = -1;
46     }
47 
48     for (const auto &item : sessionsMap_) {
49         item.second->Close();
50     }
51     sessionsMap_.clear();
52 }
53 
GetClientFd(int32_t pid) const54 int32_t StreamServer::GetClientFd(int32_t pid) const
55 {
56     auto it = idxPidMap_.find(pid);
57     if (it == idxPidMap_.end()) {
58         return INVALID_FD;
59     }
60     return it->second;
61 }
62 
GetClientPid(int32_t fd) const63 int32_t StreamServer::GetClientPid(int32_t fd) const
64 {
65     auto it = sessionsMap_.find(fd);
66     if (it == sessionsMap_.end()) {
67         return INVALID_PID;
68     }
69     return it->second->GetPid();
70 }
71 
SendMsg(int32_t fd,NetPacket & pkt)72 bool StreamServer::SendMsg(int32_t fd, NetPacket& pkt)
73 {
74     if (fd < 0) {
75         FI_HILOGE("The fd is less than 0");
76         return false;
77     }
78     auto ses = GetSession(fd);
79     if (ses == nullptr) {
80         FI_HILOGE("The fd:%{public}d not found, The message was discarded, errCode:%{public}d",
81             fd, SESSION_NOT_FOUND);
82         return false;
83     }
84     return ses->SendMsg(pkt);
85 }
86 
Multicast(const std::vector<int32_t> & fdList,NetPacket & pkt)87 void StreamServer::Multicast(const std::vector<int32_t>& fdList, NetPacket& pkt)
88 {
89     for (const auto &item : fdList) {
90         SendMsg(item, pkt);
91     }
92 }
93 
AddSocketPairInfo(const std::string & programName,int32_t moduleType,int32_t uid,int32_t pid,int32_t & serverFd,int32_t & toReturnClientFd,int32_t & tokenType)94 int32_t StreamServer::AddSocketPairInfo(const std::string& programName, int32_t moduleType, int32_t uid, int32_t pid,
95     int32_t& serverFd, int32_t& toReturnClientFd, int32_t& tokenType)
96 {
97     CALL_DEBUG_ENTER;
98     int32_t sockFds[2] = { -1 };
99 
100     if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockFds) != 0) {
101         FI_HILOGE("Call socketpair failed, errno:%{public}d", errno);
102         return RET_ERR;
103     }
104     serverFd = sockFds[0];
105     toReturnClientFd = sockFds[1];
106     if (serverFd < 0 || toReturnClientFd < 0) {
107         FI_HILOGE("Call fcntl failed, errno:%{public}d", errno);
108         return RET_ERR;
109     }
110     static constexpr size_t bufferSize = 32 * 1024;
111     static constexpr size_t nativeBufferSize = 64 * 1024;
112     SessionPtr sess = nullptr;
113     if (setsockopt(serverFd, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)) != 0) {
114         FI_HILOGE("setsockopt serverFd failed, errno:%{public}d", errno);
115         goto CLOSE_SOCK;
116     }
117     if (setsockopt(serverFd, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)) != 0) {
118         FI_HILOGE("setsockopt serverFd failed, errno:%{public}d", errno);
119         goto CLOSE_SOCK;
120     }
121     if (tokenType == TokenType::TOKEN_NATIVE) {
122         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_SNDBUF, &nativeBufferSize, sizeof(nativeBufferSize)) != 0) {
123             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
124             goto CLOSE_SOCK;
125         }
126         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_RCVBUF, &nativeBufferSize, sizeof(nativeBufferSize)) != 0) {
127             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
128             goto CLOSE_SOCK;
129         }
130     } else {
131         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)) != 0) {
132             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
133             goto CLOSE_SOCK;
134         }
135         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)) != 0) {
136             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
137             goto CLOSE_SOCK;
138         }
139     }
140     sess = std::make_shared<StreamSession>(programName, moduleType, serverFd, uid, pid);
141     sess->SetTokenType(tokenType);
142     if (!AddSession(sess)) {
143         FI_HILOGE("AddSession fail errCode:%{public}d", ADD_SESSION_FAIL);
144         goto CLOSE_SOCK;
145     }
146     if (AddEpoll(EPOLL_EVENT_SOCKET, serverFd) != RET_OK) {
147         FI_HILOGE("epoll_ctl EPOLL_CTL_ADD failed, errCode:%{public}d", EPOLL_MODIFY_FAIL);
148         goto CLOSE_SOCK;
149     }
150     OnConnected(sess);
151     return RET_OK;
152 
153 CLOSE_SOCK:
154     if (close(serverFd) < 0) {
155         FI_HILOGE("Close server fd failed, error:%{public}s, serverFd:%{public}d", strerror(errno), serverFd);
156     }
157     serverFd = -1;
158     if (close(toReturnClientFd) < 0) {
159         FI_HILOGE("Close fd failed, error:%{public}s, toReturnClientFd:%{public}d", strerror(errno), toReturnClientFd);
160     }
161     toReturnClientFd = -1;
162     return RET_ERR;
163 }
164 
SetRecvFun(MsgServerFunCallback fun)165 void StreamServer::SetRecvFun(MsgServerFunCallback fun)
166 {
167     recvFun_ = fun;
168 }
169 
ReleaseSession(int32_t fd,epoll_event & ev)170 void StreamServer::ReleaseSession(int32_t fd, epoll_event& ev)
171 {
172     auto secPtr = GetSession(fd);
173     if (secPtr != nullptr) {
174         OnDisconnected(secPtr);
175         DelSession(fd);
176     }
177     if (ev.data.ptr) {
178         free(ev.data.ptr);
179         ev.data.ptr = nullptr;
180     }
181     if (auto it = circleBufMap_.find(fd); it != circleBufMap_.end()) {
182         circleBufMap_.erase(it);
183     }
184     auto DeviceStatusService = DeviceStatus::DelayedSpSingleton<DeviceStatus::DeviceStatusService>::GetInstance();
185     DeviceStatusService->DelEpoll(EPOLL_EVENT_SOCKET, fd);
186     if (close(fd) < 0) {
187         FI_HILOGE("Close fd failed, error:%{public}s, fd:%{public}d", strerror(errno), fd);
188     }
189 }
190 
OnPacket(int32_t fd,NetPacket & pkt)191 void StreamServer::OnPacket(int32_t fd, NetPacket& pkt)
192 {
193     auto sess = GetSession(fd);
194     CHKPV(sess);
195     recvFun_(sess, pkt);
196 }
197 
OnEpollRecv(int32_t fd,epoll_event & ev)198 void StreamServer::OnEpollRecv(int32_t fd, epoll_event& ev)
199 {
200     if (fd < 0) {
201         FI_HILOGE("Invalid fd:%{public}d", fd);
202         return;
203     }
204     auto& buf = circleBufMap_[fd];
205     char szBuf[MAX_PACKET_BUF_SIZE] = {};
206     for (int32_t i = 0; i < MAX_RECV_LIMIT; i++) {
207         ssize_t size = recv(fd, szBuf, MAX_PACKET_BUF_SIZE, MSG_DONTWAIT | MSG_NOSIGNAL);
208         if (size > 0) {
209             if (!buf.Write(szBuf, size)) {
210                 FI_HILOGW("Write data failed, size:%{public}zd", size);
211             }
212             OnReadPackets(buf, std::bind(&StreamServer::OnPacket, this, fd, std::placeholders::_1));
213         } else if (size < 0) {
214             if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
215                 FI_HILOGD("Continue for errno EAGAIN|EINTR|EWOULDBLOCK size:%{public}zd errno:%{public}d",
216                     size, errno);
217                 continue;
218             }
219             FI_HILOGE("Recv return %{public}zd, errno:%{public}d", size, errno);
220             break;
221         } else {
222             FI_HILOGE("The client side disconnect with the server, size:0, errno:%{public}d", errno);
223             ReleaseSession(fd, ev);
224             break;
225         }
226         if (static_cast<size_t>(size) < MAX_PACKET_BUF_SIZE) {
227             break;
228         }
229     }
230 }
231 
OnEpollEvent(epoll_event & ev)232 void StreamServer::OnEpollEvent(epoll_event& ev)
233 {
234     CHKPV(ev.data.ptr);
235     int32_t fd = *static_cast<int32_t*>(ev.data.ptr);
236     if (fd < 0) {
237         FI_HILOGE("The fd less than 0, errCode:%{public}d", PARAM_INPUT_INVALID);
238         return;
239     }
240     if ((ev.events & EPOLLERR) || (ev.events & EPOLLHUP)) {
241         FI_HILOGI("EPOLLERR or EPOLLHUP, fd:%{public}d, ev.events:0x%{public}x", fd, ev.events);
242         ReleaseSession(fd, ev);
243     } else if (ev.events & EPOLLIN) {
244         OnEpollRecv(fd, ev);
245     }
246 }
247 
DumpSession(const std::string & title)248 void StreamServer::DumpSession(const std::string &title)
249 {
250     FI_HILOGD("in %s:%s", __func__, title.c_str());
251     int32_t i = 0;
252     for (auto &[key, value] : sessionsMap_) {
253         CHKPV(value);
254         FI_HILOGD("%d, %s", i, value->GetDescript().c_str());
255         i++;
256     }
257 }
258 
GetSession(int32_t fd) const259 SessionPtr StreamServer::GetSession(int32_t fd) const
260 {
261     auto it = sessionsMap_.find(fd);
262     if (it == sessionsMap_.end()) {
263         FI_HILOGE("Session not found, fd:%{public}d", fd);
264         return nullptr;
265     }
266     CHKPP(it->second);
267     return it->second->GetSharedPtr();
268 }
269 
GetSessionByPid(int32_t pid) const270 SessionPtr StreamServer::GetSessionByPid(int32_t pid) const
271 {
272     int32_t fd = GetClientFd(pid);
273     if (fd <= 0) {
274         FI_HILOGE("Session not found, pid:%{public}d", pid);
275         return nullptr;
276     }
277     return GetSession(fd);
278 }
279 
AddSession(SessionPtr ses)280 bool StreamServer::AddSession(SessionPtr ses)
281 {
282     CHKPF(ses);
283     FI_HILOGI("pid:%{public}d, fd:%{public}d", ses->GetPid(), ses->GetFd());
284     int32_t fd = ses->GetFd();
285     if (fd < 0) {
286         FI_HILOGE("The fd is less than 0");
287         return false;
288     }
289     int32_t pid = ses->GetPid();
290     if (pid <= 0) {
291         FI_HILOGE("Get process failed");
292         return false;
293     }
294     if (sessionsMap_.size() > MAX_SESSON_ALARM) {
295         FI_HILOGE("Too many clients, Warning Value:%{public}zu, Current Value:%{public}zu",
296             MAX_SESSON_ALARM, sessionsMap_.size());
297         return false;
298     }
299     DumpSession("AddSession");
300     idxPidMap_[pid] = fd;
301     sessionsMap_[fd] = ses;
302     FI_HILOGI("Add session end");
303     return true;
304 }
305 
DelSession(int32_t fd)306 void StreamServer::DelSession(int32_t fd)
307 {
308     CALL_DEBUG_ENTER;
309     FI_HILOGI("fd:%{public}d", fd);
310     if (fd < 0) {
311         FI_HILOGE("The fd less than 0, errCode:%{public}d", PARAM_INPUT_INVALID);
312         return;
313     }
314     int32_t pid = GetClientPid(fd);
315     if (pid > 0) {
316         idxPidMap_.erase(pid);
317     }
318     auto it = sessionsMap_.find(fd);
319     if (it != sessionsMap_.end()) {
320         NotifySessionDeleted(it->second);
321         sessionsMap_.erase(it);
322     }
323     DumpSession("DelSession");
324 }
325 
AddSessionDeletedCallback(int32_t pid,std::function<void (SessionPtr)> callback)326 void StreamServer::AddSessionDeletedCallback(int32_t pid, std::function<void(SessionPtr)> callback)
327 {
328     CALL_DEBUG_ENTER;
329     auto it = callbacks_.find(pid);
330     if (it != callbacks_.end()) {
331         FI_HILOGW("Deleted session already exists");
332         return;
333     }
334     callbacks_[pid] = callback;
335 }
336 
NotifySessionDeleted(SessionPtr ses)337 void StreamServer::NotifySessionDeleted(SessionPtr ses)
338 {
339     CALL_DEBUG_ENTER;
340     auto it = callbacks_.find(ses->GetPid());
341     if (it != callbacks_.end()) {
342         it->second(ses);
343         callbacks_.erase(it);
344     }
345 }
346 } // namespace DeviceStatus
347 } // namespace Msdp
348 } // namespace OHOS
349