1 /*
2 * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "server.h"
17 #include <fcntl.h>
18 #include <pthread.h>
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/epoll.h>
22 #include <sys/socket.h>
23 #include <sys/un.h>
24 #include "common.h"
25 #include "log.h"
26 #include "net.h"
27
28 #undef LOG_TAG
29 #define LOG_TAG "WifiRpcServer"
30
31 const int DEFAULT_LISTEN_QUEUE_SIZE = 10;
32 const int MAX_SUPPORT_CLIENT_FD_SIZE = 256; /* support max clients online */
33 const int DEFAULT_HASHTABLE_SLOTS = 7;
34 const int SERIAL_DATA_HEAD_SIZE = 2; /* RPC message head size: N| / C| just 2 */
35
36 static int BeforeLoop(RpcServer *server);
37 static int RemoveCallback(RpcServer *server, const Context *context);
38
OnAccept(RpcServer * server,unsigned int mask)39 static int OnAccept(RpcServer *server, unsigned int mask)
40 {
41 if (server == NULL) {
42 return -1;
43 }
44
45 if ((mask & READ_EVENT) == 0) {
46 return 0;
47 }
48 int fd = accept(server->listenFd, NULL, NULL);
49 if (fd < 0) {
50 return -1;
51 }
52 SetNonBlock(fd, 1);
53 fcntl(fd, F_SETFD, FD_CLOEXEC);
54 Context *context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
55 if (context != NULL) {
56 context->fd = fd;
57 InsertHashTable(server->clients, context);
58 AddFdEvent(server->loop, fd, READ_EVENT | WRIT_EVENT);
59 } else {
60 close(fd);
61 LOGE("Init Client context failed!");
62 return -1;
63 }
64 return 0;
65 }
66
CreateRpcServer(const char * path)67 RpcServer *CreateRpcServer(const char *path)
68 {
69 if (path == NULL) {
70 return NULL;
71 }
72 RpcServer *server = (RpcServer *)calloc(1, sizeof(RpcServer));
73 if (server == NULL) {
74 return NULL;
75 }
76 int flag = 1;
77 do {
78 int ret = CreateUnixServer(path, DEFAULT_LISTEN_QUEUE_SIZE);
79 if (ret < 0) {
80 break;
81 }
82 server->listenFd = ret;
83 server->isHandlingMsg = false;
84 server->loop = CreateEventLoop(MAX_SUPPORT_CLIENT_FD_SIZE);
85 if (server->loop == NULL) {
86 break;
87 }
88 server->clients = InitHashTable(DEFAULT_HASHTABLE_SLOTS);
89 if (server->clients == NULL) {
90 break;
91 }
92 if (AddFdEvent(server->loop, server->listenFd, READ_EVENT) < 0) {
93 break;
94 }
95 pthread_mutex_init(&server->mutex, NULL);
96 flag = 0;
97 } while (0);
98 if (flag) {
99 ReleaseRpcServer(server);
100 return NULL;
101 }
102 return server;
103 }
104
DealReadMessage(RpcServer * server,Context * client)105 static int DealReadMessage(RpcServer *server, Context *client)
106 {
107 if ((server == NULL) || (client == NULL)) {
108 return 0;
109 }
110 char *buf = ContextGetReadRecord(client);
111 if (buf == NULL) {
112 return 0;
113 }
114 client->oneProcess = buf;
115 client->nPos = SERIAL_DATA_HEAD_SIZE; /* N| */
116 client->nSize = strlen(buf);
117 OnTransact(server, client);
118 free(buf);
119 buf = NULL;
120 AddFdEvent(server->loop, client->fd, WRIT_EVENT);
121 return 1;
122 }
123
CheckEventMask(const struct epoll_event * e)124 static unsigned int CheckEventMask(const struct epoll_event *e)
125 {
126 if (e == NULL) {
127 return 0;
128 }
129 unsigned int mask = NONE_EVENT;
130 if ((e->events & EPOLLERR) || (e->events & EPOLLHUP)) {
131 mask |= READ_EVENT | WRIT_EVENT | EXCP_EVENT;
132 } else {
133 if (e->events & EPOLLIN) {
134 mask |= READ_EVENT;
135 }
136 if (e->events & EPOLLOUT) {
137 mask |= WRIT_EVENT;
138 }
139 }
140 return mask;
141 }
142
DealFdReadEvent(RpcServer * server,Context * client,unsigned int mask)143 static void DealFdReadEvent(RpcServer *server, Context *client, unsigned int mask)
144 {
145 if ((server == NULL) || (client == NULL)) {
146 return;
147 }
148 DealReadMessage(server, client);
149 int ret = ContextReadNet(client);
150 if ((ret == SOCK_ERR) || ((ret == SOCK_CLOSE) && (mask & EXCP_EVENT))) {
151 DelFdEvent(server->loop, client->fd, READ_EVENT | WRIT_EVENT);
152 } else if (ret == SOCK_CLOSE) {
153 DelFdEvent(server->loop, client->fd, READ_EVENT);
154 } else if (ret > 0) {
155 int haveMsg;
156 do {
157 haveMsg = DealReadMessage(server, client);
158 } while (haveMsg);
159 }
160 return;
161 }
162
DealFdWriteEvent(RpcServer * server,Context * client)163 static void DealFdWriteEvent(RpcServer *server, Context *client)
164 {
165 if ((server == NULL) || (client == NULL)) {
166 return;
167 }
168
169 if (client->wBegin != client->wEnd) {
170 int tmp = ContextWriteNet(client);
171 if (tmp < 0) {
172 DelFdEvent(server->loop, client->fd, READ_EVENT | WRIT_EVENT);
173 }
174 } else {
175 DelFdEvent(server->loop, client->fd, WRIT_EVENT);
176 }
177 return;
178 }
179
DealFdEvents(RpcServer * server,int fd,unsigned int mask)180 static void DealFdEvents(RpcServer *server, int fd, unsigned int mask)
181 {
182 if (server == NULL) {
183 return;
184 }
185 Context *client = FindContext(server->clients, fd);
186 if (client == NULL) {
187 LOGD("not find %{public}d clients!", fd);
188 return;
189 }
190 if (mask & READ_EVENT) {
191 DealFdReadEvent(server, client, mask);
192 }
193 if (mask & WRIT_EVENT) {
194 DealFdWriteEvent(server, client);
195 }
196 if (server->loop->fdMasks[fd].mask == NONE_EVENT) {
197 close(fd);
198 DeleteHashTable(server->clients, client);
199 RemoveCallback(server, client);
200 ReleaseContext(client);
201 }
202 return;
203 }
204
RunRpcLoop(RpcServer * server)205 int RunRpcLoop(RpcServer *server)
206 {
207 if (server == NULL) {
208 return -1;
209 }
210
211 EventLoop *loop = server->loop;
212 while (!loop->stop) {
213 BeforeLoop(server);
214 server->isHandlingMsg = false;
215 int retval = epoll_wait(loop->epfd, loop->epEvents, loop->setSize, -1);
216 server->isHandlingMsg = true;
217 for (int i = 0; i < retval; ++i) {
218 struct epoll_event *e = loop->epEvents + i;
219 int fd = e->data.fd;
220 unsigned int mask = CheckEventMask(e);
221 if (fd == server->listenFd) {
222 OnAccept(server, mask);
223 } else {
224 DealFdEvents(server, fd, mask);
225 }
226 }
227 }
228 return 0;
229 }
230
ReleaseRpcServer(RpcServer * server)231 void ReleaseRpcServer(RpcServer *server)
232 {
233 if (server != NULL) {
234 if (server->clients != NULL) {
235 DestroyHashTable(server->clients);
236 }
237 if (server->loop != NULL) {
238 DestroyEventLoop(server->loop);
239 }
240 if (server->listenFd > 0) {
241 close(server->listenFd);
242 }
243 pthread_mutex_destroy(&server->mutex);
244 free(server);
245 server = NULL;
246 }
247 }
248
BeforeLoop(RpcServer * server)249 static int BeforeLoop(RpcServer *server)
250 {
251 if (server == NULL) {
252 return -1;
253 }
254 pthread_mutex_lock(&server->mutex);
255 for (int i = 0; i < server->nEvents; ++i) {
256 int event = server->events[i];
257 int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
258 int pos = event % num;
259 struct Node *p = server->eventNode[pos].head;
260 while (p != NULL) {
261 Context *context = p->context;
262 OnCallbackTransact(server, event, context);
263 AddFdEvent(server->loop, context->fd, WRIT_EVENT);
264 p = p->next;
265 }
266 EndCallbackTransact(server, event);
267 }
268 server->nEvents = 0;
269 pthread_mutex_unlock(&server->mutex);
270 return 0;
271 }
272
EmitEvent(RpcServer * server,int event)273 int EmitEvent(RpcServer *server, int event)
274 {
275 if (server == NULL) {
276 return -1;
277 }
278 int num = sizeof(server->events) / sizeof(server->events[0]);
279 pthread_mutex_lock(&server->mutex);
280 if (server->nEvents >= num) {
281 pthread_mutex_unlock(&server->mutex);
282 return -1;
283 }
284 server->events[server->nEvents] = event;
285 ++server->nEvents;
286 pthread_mutex_unlock(&server->mutex);
287 /* Triger write to socket */
288 if (server->isHandlingMsg == false) {
289 BeforeLoop(server);
290 }
291 return 0;
292 }
293
RegisterCallback(RpcServer * server,int event,Context * context)294 int RegisterCallback(RpcServer *server, int event, Context *context)
295 {
296 if ((server == NULL) || (context == NULL)) {
297 return -1;
298 }
299
300 int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
301 int pos = event % num;
302 server->eventNode[pos].event = event;
303 struct Node *p = server->eventNode[pos].head;
304 while (p != NULL && p->context->fd != context->fd) {
305 p = p->next;
306 }
307 if (p == NULL) {
308 p = (struct Node *)calloc(1, sizeof(struct Node));
309 if (p != NULL) {
310 p->next = server->eventNode[pos].head;
311 p->context = context;
312 server->eventNode[pos].head = p;
313 }
314 }
315 return 0;
316 }
317
UnRegisterCallback(RpcServer * server,int event,const Context * context)318 int UnRegisterCallback(RpcServer *server, int event, const Context *context)
319 {
320 if ((server == NULL) || (context == NULL)) {
321 return -1;
322 }
323
324 int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
325 int pos = event % num;
326 server->eventNode[pos].event = event;
327 struct Node *p = server->eventNode[pos].head;
328 struct Node *q = p;
329 while (p != NULL && p->context->fd != context->fd) {
330 q = p;
331 p = p->next;
332 }
333 if (p != NULL) {
334 if (p == server->eventNode[pos].head) {
335 server->eventNode[pos].head = p->next;
336 } else {
337 q->next = p->next;
338 }
339 free(p);
340 p = NULL;
341 }
342 return 0;
343 }
344
RemoveCallback(RpcServer * server,const Context * context)345 static int RemoveCallback(RpcServer *server, const Context *context)
346 {
347 if ((server == NULL) || (context == NULL)) {
348 return -1;
349 }
350
351 int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
352 for (int i = 0; i < num; ++i) {
353 struct Node *p = server->eventNode[i].head;
354 if (p == NULL) {
355 continue;
356 }
357 struct Node *q = p;
358 while (p != NULL && p->context->fd != context->fd) {
359 q = p;
360 p = p->next;
361 }
362 if (p != NULL) {
363 if (p == server->eventNode[i].head) {
364 server->eventNode[i].head = p->next;
365 } else {
366 q->next = p->next;
367 }
368 free(p);
369 p = NULL;
370 }
371 }
372 return 0;
373 }
374