• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "client.h"
17 #include <pthread.h>
18 #include <securec.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/un.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include "common.h"
25 #include "log.h"
26 #include "net.h"
27 
28 #undef LOG_TAG
29 #define LOG_TAG "WifiRpcClient"
30 
31 const int FD_CHECK_TIMEOUT = 1000; /* poll wait time, units: ms */
32 const int CLIENT_STATE_IDLE = 0; /* initial state; restored by ReadClientEnd() after a reply has been consumed */
33 const int CLIENT_STATE_DEAL_REPLY = 1; /* set by the reader thread once a reply is published in context->oneProcess */
34 const int CLIENT_STATE_EXIT = 2; /* set by the reader thread just before it returns; RemoteCall() fails in this state */
35 
36 #define TMP_BUFF_SIZE 16 /* size of the stack buffer that holds the reply prefix ("N" + split character) */
37 
38 static void *RpcClientThreadDeal(void *arg); /* reader thread entry; passed to pthread_create() in CreateRpcClient() */
39 
/*
 * Fetch the next complete record from the client's context, waiting on the
 * socket until one is available or the thread is asked to stop.
 *
 * Returns a heap buffer owned by the caller, or NULL when client is NULL,
 * when waiting/reading fails (threadRunFlag is then cleared), or when
 * threadRunFlag has been cleared while waiting.
 */
static char *RpcClientReadMsg(RpcClient *client)
{
    if (client == NULL) {
        return NULL;
    }

    char *record = ContextGetReadRecord(client->context);
    while (record == NULL && client->threadRunFlag) {
        int event = WaitFdEvent(client->context->fd, READ_EVENT, FD_CHECK_TIMEOUT);
        if (event == 0) {
            /* nothing readable within FD_CHECK_TIMEOUT; re-check the run flag */
            continue;
        }
        if (event < 0) {
            LOGE("wait server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        if (ContextReadNet(client->context) < 0) {
            LOGE("read server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        record = ContextGetReadRecord(client->context);
    }

    if (client->threadRunFlag) {
        return record;
    }
    /* Stop was requested: discard whatever was fetched (free(NULL) is a no-op). */
    free(record);
    return NULL;
}
73 
/*
 * Dispatch one message received from the server. Takes ownership of buff.
 *
 * A message that starts with "N" followed by the context's split character is
 * a reply: it is published through context->oneProcess, waitReply becomes
 * CLIENT_STATE_DEAL_REPLY and condW is signalled. The buffer is NOT freed
 * here; ReadClientEnd() frees context->oneProcess.
 *
 * Any other message is handled as a callback: this function first waits until
 * waitReply is no longer CLIENT_STATE_DEAL_REPLY, then passes the message to
 * OnTransact() and frees it.
 */
static void RpcClientDealReadMsg(RpcClient *client, char *buff)
{
    if (client == NULL || buff == NULL) {
        free(buff); /* free(NULL) is a no-op */
        return;
    }

    char szTmp[TMP_BUFF_SIZE] = {0};
    if (snprintf_s(szTmp, sizeof(szTmp), sizeof(szTmp) - 1, "N%c", client->context->cSplit) < 0) {
        free(buff);
        return;
    }

    size_t prefixLen = strlen(szTmp);
    if (strncmp(buff, szTmp, prefixLen) == 0) { /* deal reply message */
        pthread_mutex_lock(&client->mutex);
        client->waitReply = CLIENT_STATE_DEAL_REPLY;
        client->context->oneProcess = buff;
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
    } else { /* deal callback message */
        /* Do not touch the context while a published reply is still in use. */
        pthread_mutex_lock(&client->mutex);
        while (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            pthread_cond_wait(&client->condW, &client->mutex);
        }
        pthread_mutex_unlock(&client->mutex);
        client->context->oneProcess = buff;
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        OnTransact(client->context);
        /*
         * Drop the context's reference before freeing: ReadClientEnd() frees
         * context->oneProcess unconditionally, so a stale pointer left here
         * would be freed a second time.
         */
        client->context->oneProcess = NULL;
        free(buff);
    }
}
115 
RpcClientThreadDeal(void * arg)116 static void *RpcClientThreadDeal(void *arg)
117 {
118     RpcClient *client = (RpcClient *)arg;
119     if (client == NULL) {
120         return NULL;
121     }
122 
123     while (client->threadRunFlag) {
124         char *buff = RpcClientReadMsg(client);
125         if (buff == NULL) {
126             continue;
127         }
128         RpcClientDealReadMsg(client, buff);
129     }
130     pthread_mutex_lock(&client->mutex);
131     client->waitReply = CLIENT_STATE_EXIT;
132     pthread_cond_signal(&client->condW);
133     pthread_mutex_unlock(&client->mutex);
134     LOGI("Client read message thread exiting!");
135     return NULL;
136 }
137 
CreateRpcClient(const char * path)138 RpcClient *CreateRpcClient(const char *path)
139 {
140     int fd = ConnectUnixServer(path);
141     if (fd < 0) {
142         LOGE("connect server failed.");
143         return NULL;
144     }
145     SetNonBlock(fd, 1);
146     RpcClient *client = (RpcClient *)calloc(1, sizeof(RpcClient));
147     if (client == NULL) {
148         close(fd);
149         return NULL;
150     }
151     client->context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
152     if (client->context == NULL) {
153         LOGE("create context failed.");
154         close(fd);
155         free(client);
156         client = NULL;
157         return NULL;
158     }
159     client->context->fd = fd;
160     client->threadRunFlag = 1;
161     client->threadId = 0;
162     client->waitReply = CLIENT_STATE_IDLE;
163     client->callLockFlag = 0;
164     pthread_mutex_init(&client->mutex, NULL);
165     pthread_cond_init(&client->condW, NULL);
166     pthread_mutex_init(&client->lockMutex, NULL);
167     pthread_cond_init(&client->lockCond, NULL);
168     int ret = pthread_create(&client->threadId, NULL, RpcClientThreadDeal, client);
169     if (ret) {
170         pthread_cond_destroy(&client->condW);
171         pthread_mutex_destroy(&client->mutex);
172         pthread_cond_destroy(&client->lockCond);
173         pthread_mutex_destroy(&client->lockMutex);
174         ReleaseContext(client->context);
175         close(fd);
176         free(client);
177         client = NULL;
178         return NULL;
179     }
180     pthread_setname_np(client->threadId, "RpcClientThread");
181     signal(SIGPIPE, SIG_IGN);
182     return client;
183 }
184 
/*
 * Tear down a client created by CreateRpcClient(): clear threadRunFlag and
 * join the reader thread (when one was started), destroy the mutexes and
 * condition variables, close the socket, release the context and free the
 * client. A NULL client is ignored.
 */
void ReleaseRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    if (client->threadId != 0) {
        /* the reader loop polls this flag; join waits for it to notice */
        client->threadRunFlag = 0;
        pthread_join(client->threadId, NULL);
    }

    pthread_cond_destroy(&client->condW);
    pthread_mutex_destroy(&client->mutex);
    pthread_cond_destroy(&client->lockCond);
    pthread_mutex_destroy(&client->lockMutex);

    close(client->context->fd);
    ReleaseContext(client->context);
    free(client);
}
203 
/*
 * Send the request already serialised in the client's context and block until
 * the reader thread publishes the reply or exits.
 *
 * Returns 0 once waitReply is CLIENT_STATE_DEAL_REPLY, -1 when client is NULL
 * or the reader thread has exited, or the negative value returned by
 * ContextWriteNet() when writing fails.
 */
int RemoteCall(RpcClient *client)
{
    if (client == NULL) {
        return -1;
    }

    /*
     * waitReply is written by the reader thread while it holds client->mutex,
     * so it must be read under the same mutex to avoid a data race.
     */
    pthread_mutex_lock(&client->mutex);
    int alreadyExited = (client->waitReply == CLIENT_STATE_EXIT);
    pthread_mutex_unlock(&client->mutex);
    if (alreadyExited) {
        LOGE("remote call, but client exit.");
        return -1;
    }

    int ret = 0;
    Context *context = client->context;
    /* flush the whole pending write range; stop on the first write error */
    while (context->wBegin != context->wEnd && ret >= 0) {
        ret = ContextWriteNet(context);
    }
    if (ret < 0) {
        LOGE("context write failed.");
        return ret;
    }

    ret = 0; /* reset ret value */
    pthread_mutex_lock(&client->mutex);
    while (client->waitReply != CLIENT_STATE_DEAL_REPLY && client->waitReply != CLIENT_STATE_EXIT) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    if (client->waitReply == CLIENT_STATE_EXIT) {
        LOGE("client exit.");
        ret = -1;
    }
    pthread_mutex_unlock(&client->mutex);
    return ret;
}
234 
/*
 * Finish reading a reply: under client->mutex, free and clear
 * context->oneProcess, move waitReply from CLIENT_STATE_DEAL_REPLY back to
 * CLIENT_STATE_IDLE (any other state is left alone), and signal condW.
 * A NULL client is ignored.
 */
void ReadClientEnd(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->mutex);

        Context *ctx = client->context;
        free(ctx->oneProcess);
        ctx->oneProcess = NULL;

        if (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            client->waitReply = CLIENT_STATE_IDLE;
        }

        /* the reader thread may be waiting on condW before handling a callback */
        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
    }
}
251 
/*
 * Acquire the client's call lock: block on lockCond until callLockFlag is 0,
 * then set it to 1. Released by UnlockRpcClient(). A NULL client is ignored.
 */
void LockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_t *guard = &client->lockMutex;

        pthread_mutex_lock(guard);
        /* loop re-checks the flag after every wake-up */
        while (client->callLockFlag != 0) {
            pthread_cond_wait(&client->lockCond, guard);
        }
        client->callLockFlag = 1;
        pthread_mutex_unlock(guard);
    }
}
266 
/*
 * Release the client's call lock: reset callLockFlag to 0 and signal lockCond
 * so that one thread blocked in LockRpcClient() can proceed. A NULL client is
 * ignored.
 */
void UnlockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_t *guard = &client->lockMutex;

        pthread_mutex_lock(guard);
        client->callLockFlag = 0;
        pthread_cond_signal(&client->lockCond);
        pthread_mutex_unlock(guard);
    }
}
279