1 /*
2 * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "client.h"
17 #include <pthread.h>
18 #include <securec.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/un.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include "common.h"
25 #include "log.h"
26 #include "net.h"
27
/* Tag used by the logging macros for every message emitted from this file. */
#undef LOG_TAG
#define LOG_TAG "WifiRpcClient"

/* Timeout, in milliseconds, handed to WaitFdEvent() for each wait on the server socket. */
const int FD_CHECK_TIMEOUT = 1000;

/*
 * Values stored in RpcClient::waitReply.
 *   IDLE       - initial value; restored by ReadClientEnd() after a reply.
 *   DEAL_REPLY - the reader thread has published a reply record.
 *   EXIT       - the reader thread has left its loop.
 */
const int CLIENT_STATE_IDLE = 0;
const int CLIENT_STATE_DEAL_REPLY = 1;
const int CLIENT_STATE_EXIT = 2;

/* Size of the scratch buffer that holds the reply prefix ("N" + split character). */
#define TMP_BUFF_SIZE 16

/* Reader thread entry point; defined further down in this file. */
static void *RpcClientThreadDeal(void *arg);
39
/*
 * Fetch the next complete record sent by the server.
 *
 * Waits on the connection in FD_CHECK_TIMEOUT slices until
 * ContextGetReadRecord() yields a record or client->threadRunFlag is
 * cleared. A negative result from WaitFdEvent() or ContextReadNet() is
 * logged and clears threadRunFlag, which stops the reader loop.
 *
 * Returns the record (the callers in this file free() it), or NULL when
 * client is NULL, an I/O error occurred, or the run flag was cleared.
 */
static char *RpcClientReadMsg(RpcClient *client)
{
    if (client == NULL) {
        return NULL;
    }

    char *record = ContextGetReadRecord(client->context);
    while (record == NULL && client->threadRunFlag) {
        int ready = WaitFdEvent(client->context->fd, READ_EVENT, FD_CHECK_TIMEOUT);
        if (ready < 0) {
            LOGE("wait server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        if (ready > 0) {
            if (ContextReadNet(client->context) < 0) {
                LOGE("read server reply message failed!");
                client->threadRunFlag = 0;
                return NULL;
            }
            record = ContextGetReadRecord(client->context);
        }
        /* ready == 0 (presumably a timeout): loop again so the run flag is re-checked. */
    }

    if (client->threadRunFlag) {
        return record;
    }

    /* The run flag was cleared: drop whatever was fetched (free(NULL) is a no-op). */
    free(record);
    return NULL;
}
73
/*
 * Dispatch one record received from the server. Takes ownership of buff.
 *
 * A record that starts with "N" followed by the context's split character
 * is a reply: it is published through context->oneProcess, waitReply is
 * set to CLIENT_STATE_DEAL_REPLY and condW is signalled. The buffer is
 * then released by ReadClientEnd().
 *
 * Any other record is a callback: once no reply is pending it is exposed
 * through context->oneProcess, passed to OnTransact() and freed here.
 *
 * client - RPC client; when NULL the record is simply freed.
 * buff   - heap-allocated, NUL-terminated record; NULL is ignored.
 */
static void RpcClientDealReadMsg(RpcClient *client, char *buff)
{
    /* strncmp()/strlen() below must never see a NULL record. */
    if (client == NULL || buff == NULL) {
        free(buff);
        return;
    }

    char prefix[TMP_BUFF_SIZE] = {0};
    if (snprintf_s(prefix, sizeof(prefix), sizeof(prefix) - 1, "N%c", client->context->cSplit) < 0) {
        free(buff);
        return;
    }
    size_t prefixLen = strlen(prefix);
    Context *context = client->context;

    if (strncmp(buff, prefix, prefixLen) == 0) { /* deal reply message */
        pthread_mutex_lock(&client->mutex);
        client->waitReply = CLIENT_STATE_DEAL_REPLY;
        context->oneProcess = buff;
        context->nPos = prefixLen;
        context->nSize = strlen(buff);
        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
        return;
    }

    /* deal callback message: wait until the pending reply (if any) has been consumed */
    pthread_mutex_lock(&client->mutex);
    while (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    pthread_mutex_unlock(&client->mutex);

    context->oneProcess = buff;
    context->nPos = prefixLen;
    context->nSize = strlen(buff);
    OnTransact(context);

    /*
     * Detach the record from the context before releasing it. Leaving
     * oneProcess pointing at freed memory would make a later
     * ReadClientEnd() free the same block a second time.
     */
    context->oneProcess = NULL;
    free(buff);
}
115
RpcClientThreadDeal(void * arg)116 static void *RpcClientThreadDeal(void *arg)
117 {
118 RpcClient *client = (RpcClient *)arg;
119 if (client == NULL) {
120 return NULL;
121 }
122
123 while (client->threadRunFlag) {
124 char *buff = RpcClientReadMsg(client);
125 if (buff == NULL) {
126 continue;
127 }
128 RpcClientDealReadMsg(client, buff);
129 }
130 pthread_mutex_lock(&client->mutex);
131 client->waitReply = CLIENT_STATE_EXIT;
132 pthread_cond_signal(&client->condW);
133 pthread_mutex_unlock(&client->mutex);
134 LOGI("Client read message thread exiting!");
135 return NULL;
136 }
137
CreateRpcClient(const char * path)138 RpcClient *CreateRpcClient(const char *path)
139 {
140 int fd = ConnectUnixServer(path);
141 if (fd < 0) {
142 return NULL;
143 }
144 SetNonBlock(fd, 1);
145 RpcClient *client = (RpcClient *)calloc(1, sizeof(RpcClient));
146 if (client == NULL) {
147 close(fd);
148 return NULL;
149 }
150 client->context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
151 if (client->context == NULL) {
152 close(fd);
153 free(client);
154 client = NULL;
155 return NULL;
156 }
157 client->context->fd = fd;
158 client->threadRunFlag = 1;
159 client->threadId = 0;
160 client->waitReply = CLIENT_STATE_IDLE;
161 client->callLockFlag = 0;
162 pthread_mutex_init(&client->mutex, NULL);
163 pthread_cond_init(&client->condW, NULL);
164 pthread_mutex_init(&client->lockMutex, NULL);
165 pthread_cond_init(&client->lockCond, NULL);
166 int ret = pthread_create(&client->threadId, NULL, RpcClientThreadDeal, client);
167 if (ret) {
168 pthread_cond_destroy(&client->condW);
169 pthread_mutex_destroy(&client->mutex);
170 pthread_cond_destroy(&client->lockCond);
171 pthread_mutex_destroy(&client->lockMutex);
172 ReleaseContext(client->context);
173 close(fd);
174 free(client);
175 client = NULL;
176 return NULL;
177 }
178 pthread_setname_np(client->threadId, "RpcClientThread");
179 signal(SIGPIPE, SIG_IGN);
180 return client;
181 }
182
/*
 * Tear down a client created by CreateRpcClient(). A NULL client is ignored.
 *
 * If a reader thread was started (threadId != 0), threadRunFlag is
 * cleared and the thread is joined. The condition variables and mutexes
 * are then destroyed, the socket is closed, and the context and the
 * client itself are released. The pointer must not be used afterwards.
 */
void ReleaseRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    if (client->threadId != 0) {
        /* The reader loop checks this flag on every pass. */
        client->threadRunFlag = 0;
        pthread_join(client->threadId, NULL);
    }

    pthread_cond_destroy(&client->condW);
    pthread_mutex_destroy(&client->mutex);
    pthread_cond_destroy(&client->lockCond);
    pthread_mutex_destroy(&client->lockMutex);

    close(client->context->fd);
    ReleaseContext(client->context);
    free(client);
}
201
/*
 * Send the request queued in the client's context and block until the
 * reader thread publishes the reply.
 *
 * Returns 0 once waitReply is CLIENT_STATE_DEAL_REPLY (the reply is then
 * reachable through client->context and is released by ReadClientEnd()).
 * Returns -1 when client is NULL or the reader thread has exited, or the
 * negative value reported by ContextWriteNet() when sending fails.
 */
int RemoteCall(RpcClient *client)
{
    if (client == NULL) {
        return -1;
    }

    /*
     * waitReply is written by the reader thread while holding
     * client->mutex, so it is read under the same lock here instead of
     * racing with that thread.
     */
    pthread_mutex_lock(&client->mutex);
    int readerExited = (client->waitReply == CLIENT_STATE_EXIT);
    pthread_mutex_unlock(&client->mutex);
    if (readerExited) {
        return -1;
    }

    /* Flush the write buffer: keep writing until wBegin catches up with wEnd. */
    int ret = 0;
    Context *context = client->context;
    while (context->wBegin != context->wEnd && ret >= 0) {
        ret = ContextWriteNet(context);
    }
    if (ret < 0) {
        return ret;
    }

    ret = 0; /* reset ret value */
    pthread_mutex_lock(&client->mutex);
    while (client->waitReply != CLIENT_STATE_DEAL_REPLY && client->waitReply != CLIENT_STATE_EXIT) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    if (client->waitReply == CLIENT_STATE_EXIT) {
        /* The reader thread stopped before a reply arrived. */
        ret = -1;
    }
    pthread_mutex_unlock(&client->mutex);
    return ret;
}
229
/*
 * Finish consuming a reply. A NULL client is ignored.
 *
 * Under client->mutex: frees context->oneProcess and clears the pointer,
 * moves waitReply from CLIENT_STATE_DEAL_REPLY back to CLIENT_STATE_IDLE
 * (any other state is left untouched), and signals condW so that the
 * reader thread, if it is waiting to deliver a callback, can continue.
 */
void ReadClientEnd(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->mutex);

        Context *context = client->context;
        free(context->oneProcess);
        context->oneProcess = NULL;

        if (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            client->waitReply = CLIENT_STATE_IDLE;
        }

        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
    }
}
246
/*
 * Acquire the client's call lock. A NULL client is ignored.
 *
 * Blocks on lockCond until callLockFlag is 0, then sets it to 1. The flag
 * stays set after this function returns and is cleared again by
 * UnlockRpcClient().
 */
void LockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->lockMutex);
        for (;;) {
            if (client->callLockFlag == 0) {
                break;
            }
            pthread_cond_wait(&client->lockCond, &client->lockMutex);
        }
        client->callLockFlag = 1;
        pthread_mutex_unlock(&client->lockMutex);
    }
}
261
/*
 * Release the call lock taken by LockRpcClient(). A NULL client is ignored.
 *
 * Clears callLockFlag under lockMutex and signals lockCond so that a
 * thread blocked in LockRpcClient() can proceed.
 */
void UnlockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->lockMutex);
        client->callLockFlag = 0;
        pthread_cond_signal(&client->lockCond);
        pthread_mutex_unlock(&client->lockMutex);
    }
}
274