• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "server.h"
17 #include <fcntl.h>
18 #include <pthread.h>
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/epoll.h>
22 #include <sys/socket.h>
23 #include <sys/un.h>
24 #include "common.h"
25 #include "log.h"
26 #include "net.h"
27 
28 #undef LOG_TAG
29 #define LOG_TAG "WifiRpcServer"
30 
31 const int DEFAULT_LISTEN_QUEUE_SIZE = 10;
32 const int MAX_SUPPORT_CLIENT_FD_SIZE = 256; /* support max clients online */
33 const int DEFAULT_HASHTABLE_SLOTS = 7;
34 const int SERIAL_DATA_HEAD_SIZE = 2; /* RPC message head size: N| / C| just 2 */
35 
36 static int BeforeLoop(RpcServer *server);
37 static int RemoveCallback(RpcServer *server, const Context *context);
38 
OnAccept(RpcServer * server,unsigned int mask)39 static int OnAccept(RpcServer *server, unsigned int mask)
40 {
41     if (server == NULL) {
42         return -1;
43     }
44 
45     if ((mask & READ_EVENT) == 0) {
46         return 0;
47     }
48     int fd = accept(server->listenFd, NULL, NULL);
49     if (fd < 0) {
50         return -1;
51     }
52     SetNonBlock(fd, 1);
53     fcntl(fd, F_SETFD, FD_CLOEXEC);
54     Context *context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
55     if (context != NULL) {
56         context->fd = fd;
57         InsertHashTable(server->clients, context);
58         AddFdEvent(server->loop, fd, READ_EVENT | WRIT_EVENT);
59     } else {
60         close(fd);
61         LOGE("Init Client context failed!");
62         return -1;
63     }
64     return 0;
65 }
66 
CreateRpcServer(const char * path)67 RpcServer *CreateRpcServer(const char *path)
68 {
69     if (path == NULL) {
70         return NULL;
71     }
72     RpcServer *server = (RpcServer *)calloc(1, sizeof(RpcServer));
73     if (server == NULL) {
74         return NULL;
75     }
76     int flag = 1;
77     do {
78         int ret = CreateUnixServer(path, DEFAULT_LISTEN_QUEUE_SIZE);
79         if (ret < 0) {
80             break;
81         }
82         server->listenFd = ret;
83         server->isHandlingMsg = false;
84         server->loop = CreateEventLoop(MAX_SUPPORT_CLIENT_FD_SIZE);
85         if (server->loop == NULL) {
86             break;
87         }
88         server->clients = InitHashTable(DEFAULT_HASHTABLE_SLOTS);
89         if (server->clients == NULL) {
90             break;
91         }
92         if (AddFdEvent(server->loop, server->listenFd, READ_EVENT) < 0) {
93             break;
94         }
95         pthread_mutex_init(&server->mutex, NULL);
96         flag = 0;
97     } while (0);
98     if (flag) {
99         ReleaseRpcServer(server);
100         return NULL;
101     }
102     return server;
103 }
104 
DealReadMessage(RpcServer * server,Context * client)105 static int DealReadMessage(RpcServer *server, Context *client)
106 {
107     if ((server == NULL) || (client == NULL)) {
108         return 0;
109     }
110     char *buf = ContextGetReadRecord(client);
111     if (buf == NULL) {
112         return 0;
113     }
114     client->oneProcess = buf;
115     client->nPos = SERIAL_DATA_HEAD_SIZE; /* N| */
116     client->nSize = strlen(buf);
117     OnTransact(server, client);
118     free(buf);
119     buf = NULL;
120     AddFdEvent(server->loop, client->fd, WRIT_EVENT);
121     return 1;
122 }
123 
CheckEventMask(const struct epoll_event * e)124 static unsigned int CheckEventMask(const struct epoll_event *e)
125 {
126     if (e == NULL) {
127         return 0;
128     }
129     unsigned int mask = NONE_EVENT;
130     if ((e->events & EPOLLERR) || (e->events & EPOLLHUP)) {
131         mask |= READ_EVENT | WRIT_EVENT | EXCP_EVENT;
132     } else {
133         if (e->events & EPOLLIN) {
134             mask |= READ_EVENT;
135         }
136         if (e->events & EPOLLOUT) {
137             mask |= WRIT_EVENT;
138         }
139     }
140     return mask;
141 }
142 
DealFdReadEvent(RpcServer * server,Context * client,unsigned int mask)143 static void DealFdReadEvent(RpcServer *server, Context *client, unsigned int mask)
144 {
145     if ((server == NULL) || (client == NULL)) {
146         return;
147     }
148     DealReadMessage(server, client);
149     int ret = ContextReadNet(client);
150     if ((ret == SOCK_ERR) || ((ret == SOCK_CLOSE) && (mask & EXCP_EVENT))) {
151         DelFdEvent(server->loop, client->fd, READ_EVENT | WRIT_EVENT);
152     } else if (ret == SOCK_CLOSE) {
153         DelFdEvent(server->loop, client->fd, READ_EVENT);
154     } else if (ret > 0) {
155         int haveMsg;
156         do {
157             haveMsg = DealReadMessage(server, client);
158         } while (haveMsg);
159     }
160     return;
161 }
162 
DealFdWriteEvent(RpcServer * server,Context * client)163 static void DealFdWriteEvent(RpcServer *server, Context *client)
164 {
165     if ((server == NULL) || (client == NULL)) {
166         return;
167     }
168 
169     if (client->wBegin != client->wEnd) {
170         int tmp = ContextWriteNet(client);
171         if (tmp < 0) {
172             DelFdEvent(server->loop, client->fd, READ_EVENT | WRIT_EVENT);
173         }
174     } else {
175         DelFdEvent(server->loop, client->fd, WRIT_EVENT);
176     }
177     return;
178 }
179 
DealFdEvents(RpcServer * server,int fd,unsigned int mask)180 static void DealFdEvents(RpcServer *server, int fd, unsigned int mask)
181 {
182     if (server == NULL) {
183         return;
184     }
185     Context *client = FindContext(server->clients, fd);
186     if (client == NULL) {
187         LOGD("not find %{public}d clients!", fd);
188         return;
189     }
190     if (mask & READ_EVENT) {
191         DealFdReadEvent(server, client, mask);
192     }
193     if (mask & WRIT_EVENT) {
194         DealFdWriteEvent(server, client);
195     }
196     if (server->loop->fdMasks[fd].mask == NONE_EVENT) {
197         close(fd);
198         DeleteHashTable(server->clients, client);
199         RemoveCallback(server, client);
200         ReleaseContext(client);
201     }
202     return;
203 }
204 
RunRpcLoop(RpcServer * server)205 int RunRpcLoop(RpcServer *server)
206 {
207     if (server == NULL) {
208         return -1;
209     }
210 
211     EventLoop *loop = server->loop;
212     while (!loop->stop) {
213         BeforeLoop(server);
214         server->isHandlingMsg = false;
215         int retval = epoll_wait(loop->epfd, loop->epEvents, loop->setSize, -1);
216         server->isHandlingMsg = true;
217         for (int i = 0; i < retval; ++i) {
218             struct epoll_event *e = loop->epEvents + i;
219             int fd = e->data.fd;
220             unsigned int mask = CheckEventMask(e);
221             if (fd == server->listenFd) {
222                 OnAccept(server, mask);
223             } else {
224                 DealFdEvents(server, fd, mask);
225             }
226         }
227     }
228     return 0;
229 }
230 
ReleaseRpcServer(RpcServer * server)231 void ReleaseRpcServer(RpcServer *server)
232 {
233     if (server != NULL) {
234         if (server->clients != NULL) {
235             DestroyHashTable(server->clients);
236         }
237         if (server->loop != NULL) {
238             DestroyEventLoop(server->loop);
239         }
240         if (server->listenFd > 0) {
241             close(server->listenFd);
242         }
243         pthread_mutex_destroy(&server->mutex);
244         free(server);
245         server = NULL;
246     }
247 }
248 
BeforeLoop(RpcServer * server)249 static int BeforeLoop(RpcServer *server)
250 {
251     if (server == NULL) {
252         return -1;
253     }
254     pthread_mutex_lock(&server->mutex);
255     for (int i = 0; i < server->nEvents; ++i) {
256         int event = server->events[i];
257         int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
258         int pos = event % num;
259         struct Node *p = server->eventNode[pos].head;
260         while (p != NULL) {
261             Context *context = p->context;
262             OnCallbackTransact(server, event, context);
263             AddFdEvent(server->loop, context->fd, WRIT_EVENT);
264             p = p->next;
265         }
266         EndCallbackTransact(server, event);
267     }
268     server->nEvents = 0;
269     pthread_mutex_unlock(&server->mutex);
270     return 0;
271 }
272 
EmitEvent(RpcServer * server,int event)273 int EmitEvent(RpcServer *server, int event)
274 {
275     if (server == NULL) {
276         return -1;
277     }
278     int num = sizeof(server->events) / sizeof(server->events[0]);
279     pthread_mutex_lock(&server->mutex);
280     if (server->nEvents >= num) {
281         pthread_mutex_unlock(&server->mutex);
282         return -1;
283     }
284     server->events[server->nEvents] = event;
285     ++server->nEvents;
286     pthread_mutex_unlock(&server->mutex);
287     /* Triger write to socket */
288     if (server->isHandlingMsg == false) {
289         BeforeLoop(server);
290     }
291     return 0;
292 }
293 
RegisterCallback(RpcServer * server,int event,Context * context)294 int RegisterCallback(RpcServer *server, int event, Context *context)
295 {
296     if ((server == NULL) || (context == NULL)) {
297         return -1;
298     }
299 
300     int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
301     int pos = event % num;
302     server->eventNode[pos].event = event;
303     struct Node *p = server->eventNode[pos].head;
304     while (p != NULL && p->context->fd != context->fd) {
305         p = p->next;
306     }
307     if (p == NULL) {
308         p = (struct Node *)calloc(1, sizeof(struct Node));
309         if (p != NULL) {
310             p->next = server->eventNode[pos].head;
311             p->context = context;
312             server->eventNode[pos].head = p;
313         }
314     }
315     return 0;
316 }
317 
UnRegisterCallback(RpcServer * server,int event,const Context * context)318 int UnRegisterCallback(RpcServer *server, int event, const Context *context)
319 {
320     if ((server == NULL) || (context == NULL)) {
321         return -1;
322     }
323 
324     int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
325     int pos = event % num;
326     server->eventNode[pos].event = event;
327     struct Node *p = server->eventNode[pos].head;
328     struct Node *q = p;
329     while (p != NULL && p->context->fd != context->fd) {
330         q = p;
331         p = p->next;
332     }
333     if (p != NULL) {
334         if (p == server->eventNode[pos].head) {
335             server->eventNode[pos].head = p->next;
336         } else {
337             q->next = p->next;
338         }
339         free(p);
340         p = NULL;
341     }
342     return 0;
343 }
344 
RemoveCallback(RpcServer * server,const Context * context)345 static int RemoveCallback(RpcServer *server, const Context *context)
346 {
347     if ((server == NULL) || (context == NULL)) {
348         return -1;
349     }
350 
351     int num = sizeof(server->eventNode) / sizeof(server->eventNode[0]);
352     for (int i = 0; i < num; ++i) {
353         struct Node *p = server->eventNode[i].head;
354         if (p == NULL) {
355             continue;
356         }
357         struct Node *q = p;
358         while (p != NULL && p->context->fd != context->fd) {
359             q = p;
360             p = p->next;
361         }
362         if (p != NULL) {
363             if (p == server->eventNode[i].head) {
364                 server->eventNode[i].head = p->next;
365             } else {
366                 q->next = p->next;
367             }
368             free(p);
369             p = NULL;
370         }
371     }
372     return 0;
373 }
374