• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "client.h"
17 #include <pthread.h>
18 #include <securec.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/un.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include "common.h"
25 #include "log.h"
26 #include "net.h"
27 
28 #undef LOG_TAG
29 #define LOG_TAG "WifiRpcClient"
30 
31 const int FD_CHECK_TIMEOUT = 1000; /* poll wait time, units: ms */
32 const int CLIENT_STATE_IDLE = 0; /* waitReply value set at client creation and restored by ReadClientEnd() */
33 const int CLIENT_STATE_DEAL_REPLY = 1; /* waitReply value set by the reader thread once a reply record is stored in the context */
34 const int CLIENT_STATE_EXIT = 2; /* waitReply value set by the reader thread just before it returns */
35 
36 #define TMP_BUFF_SIZE 16 /* size of the stack buffer that holds the "N<split char>" reply prefix */
37 
38 static void *RpcClientThreadDeal(void *arg); /* reader thread entry point, started by CreateRpcClient() */
39 
/*
 * Fetch the next complete record from the server connection.
 *
 * Polls the context's fd in FD_CHECK_TIMEOUT slices so that a cleared
 * threadRunFlag is noticed between waits.
 *
 * Returns the record handed out by ContextGetReadRecord(); the caller is
 * responsible for free()ing it. Returns NULL when client is NULL, when
 * waiting on or reading from the fd fails (threadRunFlag is cleared in that
 * case), or when threadRunFlag is found cleared.
 */
static char *RpcClientReadMsg(RpcClient *client)
{
    if (client == NULL) {
        return NULL;
    }

    char *record = ContextGetReadRecord(client->context);
    while (record == NULL && client->threadRunFlag) {
        int status = WaitFdEvent(client->context->fd, READ_EVENT, FD_CHECK_TIMEOUT);
        if (status == 0) {
            continue; /* timed out: re-check the run flag, then wait again */
        }
        if (status < 0) {
            LOGE("wait server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        if (ContextReadNet(client->context) < 0) {
            LOGE("read server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        record = ContextGetReadRecord(client->context);
    }

    if (client->threadRunFlag) {
        return record;
    }
    /* The run flag was cleared while a record may be in hand: discard it. */
    free(record);
    return NULL;
}
73 
/*
 * Dispatch one record read from the server.
 *
 * A record that starts with "N" followed by the context's split character is
 * a reply: it is stored in context->oneProcess, waitReply becomes
 * CLIENT_STATE_DEAL_REPLY and condW is signalled. Ownership of buff moves to
 * the context in that case (ReadClientEnd() frees it).
 *
 * Any other record is treated as a callback: the function first waits until
 * no reply is pending (waitReply != CLIENT_STATE_DEAL_REPLY), then runs
 * OnTransact() on it and frees buff.
 *
 * client: RPC client the record belongs to; if it (or its context) is NULL
 *         the record is freed and dropped.
 * buff:   heap-allocated, NUL-terminated record; may be NULL, in which case
 *         nothing is done.
 */
static void RpcClientDealReadMsg(RpcClient *client, char *buff)
{
    if (buff == NULL) {
        /* Nothing to dispatch; strncmp()/strlen() below must not see NULL. */
        return;
    }
    if (client == NULL || client->context == NULL) {
        free(buff);
        return;
    }

    char szTmp[TMP_BUFF_SIZE] = {0};
    if (snprintf_s(szTmp, sizeof(szTmp), sizeof(szTmp) - 1, "N%c", client->context->cSplit) < 0) {
        free(buff);
        return;
    }
    size_t prefixLen = strlen(szTmp);

    if (strncmp(buff, szTmp, prefixLen) == 0) { /* deal reply message */
        pthread_mutex_lock(&client->mutex);
        client->waitReply = CLIENT_STATE_DEAL_REPLY;
        client->context->oneProcess = buff; /* freed later by ReadClientEnd() */
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
    } else { /* deal callback message */
        /* Do not overwrite oneProcess while a reply is still pending. */
        pthread_mutex_lock(&client->mutex);
        while (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            pthread_cond_wait(&client->condW, &client->mutex);
        }
        pthread_mutex_unlock(&client->mutex);
        client->context->oneProcess = buff;
        /* The read position is set to the reply-prefix length here as well. */
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        OnTransact(client->context);
        free(buff);
    }
}
115 
RpcClientThreadDeal(void * arg)116 static void *RpcClientThreadDeal(void *arg)
117 {
118     RpcClient *client = (RpcClient *)arg;
119     if (client == NULL) {
120         return NULL;
121     }
122 
123     while (client->threadRunFlag) {
124         char *buff = RpcClientReadMsg(client);
125         if (buff == NULL) {
126             continue;
127         }
128         RpcClientDealReadMsg(client, buff);
129     }
130     pthread_mutex_lock(&client->mutex);
131     client->waitReply = CLIENT_STATE_EXIT;
132     pthread_cond_signal(&client->condW);
133     pthread_mutex_unlock(&client->mutex);
134     LOGI("Client read message thread exiting!");
135     return NULL;
136 }
137 
CreateRpcClient(const char * path)138 RpcClient *CreateRpcClient(const char *path)
139 {
140     int fd = ConnectUnixServer(path);
141     if (fd < 0) {
142         return NULL;
143     }
144     SetNonBlock(fd, 1);
145     RpcClient *client = (RpcClient *)calloc(1, sizeof(RpcClient));
146     if (client == NULL) {
147         close(fd);
148         return NULL;
149     }
150     client->context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
151     if (client->context == NULL) {
152         close(fd);
153         free(client);
154         client = NULL;
155         return NULL;
156     }
157     client->context->fd = fd;
158     client->threadRunFlag = 1;
159     client->threadId = 0;
160     client->waitReply = CLIENT_STATE_IDLE;
161     client->callLockFlag = 0;
162     pthread_mutex_init(&client->mutex, NULL);
163     pthread_cond_init(&client->condW, NULL);
164     pthread_mutex_init(&client->lockMutex, NULL);
165     pthread_cond_init(&client->lockCond, NULL);
166     int ret = pthread_create(&client->threadId, NULL, RpcClientThreadDeal, client);
167     if (ret) {
168         pthread_cond_destroy(&client->condW);
169         pthread_mutex_destroy(&client->mutex);
170         pthread_cond_destroy(&client->lockCond);
171         pthread_mutex_destroy(&client->lockMutex);
172         ReleaseContext(client->context);
173         close(fd);
174         free(client);
175         client = NULL;
176         return NULL;
177     }
178     pthread_setname_np(client->threadId, "RpcClientThread");
179     signal(SIGPIPE, SIG_IGN);
180     return client;
181 }
182 
/*
 * Stop the reader thread (if one was started) and free everything owned by
 * the client: both mutex/condition pairs, the context's fd, the context and
 * the client structure itself. A NULL client is ignored.
 */
void ReleaseRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    if (client->threadId != 0) {
        /* Ask the reader loop to stop, then wait for the thread to finish. */
        client->threadRunFlag = 0;
        pthread_join(client->threadId, NULL);
    }

    pthread_cond_destroy(&client->condW);
    pthread_mutex_destroy(&client->mutex);
    pthread_cond_destroy(&client->lockCond);
    pthread_mutex_destroy(&client->lockMutex);

    close(client->context->fd);
    ReleaseContext(client->context);
    free(client);
}
201 
/*
 * Send the data queued in the client's context write buffer and block until
 * the reader thread reports a reply.
 *
 * Returns 0 once waitReply is CLIENT_STATE_DEAL_REPLY (the reply is then
 * available through the context until ReadClientEnd() is called), -1 if
 * client is NULL or the reader thread has exited, or the negative value
 * returned by ContextWriteNet() when sending fails.
 */
int RemoteCall(RpcClient *client)
{
    if (client == NULL) {
        return -1;
    }

    /*
     * waitReply is written by the reader thread under client->mutex, so the
     * early exit check takes the mutex too instead of reading it unguarded.
     */
    pthread_mutex_lock(&client->mutex);
    int readerExited = (client->waitReply == CLIENT_STATE_EXIT);
    pthread_mutex_unlock(&client->mutex);
    if (readerExited) {
        return -1;
    }

    int ret = 0;
    Context *context = client->context;
    /* Keep writing until the write buffer is drained or a write fails. */
    while (context->wBegin != context->wEnd && ret >= 0) {
        ret = ContextWriteNet(context);
    }
    if (ret < 0) {
        return ret;
    }

    ret = 0; /* reset ret value */
    pthread_mutex_lock(&client->mutex);
    while (client->waitReply != CLIENT_STATE_DEAL_REPLY && client->waitReply != CLIENT_STATE_EXIT) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    if (client->waitReply == CLIENT_STATE_EXIT) {
        ret = -1;
    }
    pthread_mutex_unlock(&client->mutex);
    return ret;
}
229 
/*
 * Finish consuming the current reply: frees context->oneProcess, moves
 * waitReply from CLIENT_STATE_DEAL_REPLY back to CLIENT_STATE_IDLE (other
 * states are left untouched) and signals condW. A NULL client is ignored.
 */
void ReadClientEnd(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->mutex);

        Context *ctx = client->context;
        free(ctx->oneProcess);
        ctx->oneProcess = NULL;

        if (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            client->waitReply = CLIENT_STATE_IDLE;
        }
        /* Wake a thread that was waiting for the pending reply to be consumed. */
        pthread_cond_signal(&client->condW);

        pthread_mutex_unlock(&client->mutex);
    }
}
246 
/*
 * Take the client's call lock: blocks on lockCond until callLockFlag is 0,
 * then sets it to 1. A NULL client is ignored.
 */
void LockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->lockMutex);
        /* Loop so the flag is re-checked after every wake-up. */
        while (client->callLockFlag != 0) {
            pthread_cond_wait(&client->lockCond, &client->lockMutex);
        }
        client->callLockFlag = 1;
        pthread_mutex_unlock(&client->lockMutex);
    }
}
261 
/*
 * Drop the client's call lock: clears callLockFlag and signals lockCond so a
 * thread blocked in LockRpcClient() can proceed. A NULL client is ignored.
 */
void UnlockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->lockMutex);
        client->callLockFlag = 0;
        pthread_cond_signal(&client->lockCond);
        pthread_mutex_unlock(&client->lockMutex);
    }
}
274