1 /*
2 * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "client.h"
17 #include <pthread.h>
18 #include <securec.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/un.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include "common.h"
25 #include "log.h"
26 #include "net.h"
27
28 #undef LOG_TAG
29 #define LOG_TAG "WifiRpcClient"
30
/* How long one wait on the server socket may last before re-checking the run flag, in ms. */
const int FD_CHECK_TIMEOUT = 1000;

/* Values stored in RpcClient::waitReply. */
const int CLIENT_STATE_IDLE = 0;       /* no reply is currently published */
const int CLIENT_STATE_DEAL_REPLY = 1; /* a reply has been published and not yet released */
const int CLIENT_STATE_EXIT = 2;       /* the reader thread has terminated */

/* Size of the scratch buffer that holds the reply-message prefix. */
#define TMP_BUFF_SIZE 16

/* Entry point of the reader thread. */
static void *RpcClientThreadDeal(void *arg);
39
/*
 * Fetch the next complete record from the client's context, pulling more
 * data from the socket as needed.
 *
 * The loop runs until a record is available or client->threadRunFlag is
 * cleared. A failed wait or a failed network read clears the run flag
 * itself, so the reader thread stops after such an error.
 *
 * Returns the record buffer (ownership passes to the caller, who releases
 * it with free()), or NULL when client is NULL, an error occurred, or the
 * thread has been asked to stop.
 */
static char *RpcClientReadMsg(RpcClient *client)
{
    if (client == NULL) {
        return NULL;
    }

    char *record = ContextGetReadRecord(client->context);
    while (record == NULL && client->threadRunFlag) {
        int status = WaitFdEvent(client->context->fd, READ_EVENT, FD_CHECK_TIMEOUT);
        if (status == 0) {
            /* nothing readable yet (presumably the wait timed out): re-check the run flag */
            continue;
        }
        if (status < 0) {
            LOGE("wait server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        if (ContextReadNet(client->context) < 0) {
            LOGE("read server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        record = ContextGetReadRecord(client->context);
    }

    if (client->threadRunFlag) {
        return record;
    }
    /* Stop was requested: discard whatever was fetched (free(NULL) is a no-op). */
    free(record);
    return NULL;
}
73
/*
 * Dispatch one record received from the server.
 *
 * A record that starts with 'N' followed by the context's split character
 * is the reply to a pending call: it is published in the context, waitReply
 * becomes CLIENT_STATE_DEAL_REPLY and condW is signalled. Ownership of buff
 * moves to context->oneProcess in that case (ReadClientEnd() frees it).
 *
 * Any other record is a callback: once no reply is in the DEAL_REPLY state
 * it is handed to OnTransact() and then freed here.
 *
 * client  client owning the context; buff is freed and nothing else happens
 *         when client or client->context is NULL.
 * buff    heap-allocated record; NULL is tolerated and ignored.
 */
static void RpcClientDealReadMsg(RpcClient *client, char *buff)
{
    /* buff must not reach strncmp()/strlen() as NULL; free(NULL) is a no-op. */
    if (client == NULL || client->context == NULL || buff == NULL) {
        free(buff);
        return;
    }

    char szTmp[TMP_BUFF_SIZE] = {0};
    if (snprintf_s(szTmp, sizeof(szTmp), sizeof(szTmp) - 1, "N%c", client->context->cSplit) < 0) {
        free(buff);
        return;
    }
    size_t prefixLen = strlen(szTmp);

    if (strncmp(buff, szTmp, prefixLen) == 0) { /* deal reply message */
        pthread_mutex_lock(&client->mutex);
        client->waitReply = CLIENT_STATE_DEAL_REPLY;
        client->context->oneProcess = buff;
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
        return;
    }

    /* deal callback message: wait until the previous reply has been released */
    pthread_mutex_lock(&client->mutex);
    while (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    pthread_mutex_unlock(&client->mutex);
    client->context->oneProcess = buff;
    client->context->nPos = prefixLen;
    client->context->nSize = strlen(buff);
    OnTransact(client->context);
    /*
     * The callback buffer is owned here, not by the context. Clear the
     * context's pointer before freeing so that a later free() of
     * context->oneProcess cannot act on released memory.
     */
    client->context->oneProcess = NULL;
    free(buff);
}
115
RpcClientThreadDeal(void * arg)116 static void *RpcClientThreadDeal(void *arg)
117 {
118 RpcClient *client = (RpcClient *)arg;
119 if (client == NULL) {
120 return NULL;
121 }
122
123 while (client->threadRunFlag) {
124 char *buff = RpcClientReadMsg(client);
125 if (buff == NULL) {
126 continue;
127 }
128 RpcClientDealReadMsg(client, buff);
129 }
130 pthread_mutex_lock(&client->mutex);
131 client->waitReply = CLIENT_STATE_EXIT;
132 pthread_cond_signal(&client->condW);
133 pthread_mutex_unlock(&client->mutex);
134 LOGI("Client read message thread exiting!");
135 return NULL;
136 }
137
CreateRpcClient(const char * path)138 RpcClient *CreateRpcClient(const char *path)
139 {
140 int fd = ConnectUnixServer(path);
141 if (fd < 0) {
142 LOGE("connect server failed.");
143 return NULL;
144 }
145 SetNonBlock(fd, 1);
146 RpcClient *client = (RpcClient *)calloc(1, sizeof(RpcClient));
147 if (client == NULL) {
148 close(fd);
149 return NULL;
150 }
151 client->context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
152 if (client->context == NULL) {
153 LOGE("create context failed.");
154 close(fd);
155 free(client);
156 client = NULL;
157 return NULL;
158 }
159 client->context->fd = fd;
160 client->threadRunFlag = 1;
161 client->threadId = 0;
162 client->waitReply = CLIENT_STATE_IDLE;
163 client->callLockFlag = 0;
164 pthread_mutex_init(&client->mutex, NULL);
165 pthread_cond_init(&client->condW, NULL);
166 pthread_mutex_init(&client->lockMutex, NULL);
167 pthread_cond_init(&client->lockCond, NULL);
168 int ret = pthread_create(&client->threadId, NULL, RpcClientThreadDeal, client);
169 if (ret) {
170 pthread_cond_destroy(&client->condW);
171 pthread_mutex_destroy(&client->mutex);
172 pthread_cond_destroy(&client->lockCond);
173 pthread_mutex_destroy(&client->lockMutex);
174 ReleaseContext(client->context);
175 close(fd);
176 free(client);
177 client = NULL;
178 return NULL;
179 }
180 pthread_setname_np(client->threadId, "RpcClientThread");
181 signal(SIGPIPE, SIG_IGN);
182 return client;
183 }
184
/*
 * Stop the reader thread and free everything owned by the client.
 *
 * When a reader thread was started (threadId != 0) the run flag is cleared
 * and the thread is joined before the synchronisation objects, the socket,
 * the context and the client structure itself are released.
 *
 * client  client to destroy; NULL is ignored. The pointer must not be used
 *         after this call.
 */
void ReleaseRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    if (client->threadId != 0) {
        client->threadRunFlag = 0;
        pthread_join(client->threadId, NULL);
    }

    pthread_cond_destroy(&client->condW);
    pthread_mutex_destroy(&client->mutex);
    pthread_cond_destroy(&client->lockCond);
    pthread_mutex_destroy(&client->lockMutex);

    /* close the socket before the context that stores its descriptor is released */
    close(client->context->fd);
    ReleaseContext(client->context);
    free(client);
}
203
/*
 * Send the request queued in the client's write buffer and block until the
 * reader thread publishes the reply.
 *
 * client  client to use; NULL yields -1.
 *
 * Returns 0 once waitReply has become CLIENT_STATE_DEAL_REPLY, -1 when the
 * reader thread has exited (before or during the call), or the negative
 * value returned by ContextWriteNet() when sending fails.
 */
int RemoteCall(RpcClient *client)
{
    if (client == NULL) {
        return -1;
    }

    /* waitReply is written by the reader thread under client->mutex, so read it under the same lock. */
    pthread_mutex_lock(&client->mutex);
    int exited = (client->waitReply == CLIENT_STATE_EXIT);
    pthread_mutex_unlock(&client->mutex);
    if (exited) {
        LOGE("remote call, but client exit.");
        return -1;
    }

    int ret = 0;
    Context *context = client->context;
    /* keep writing until the pending range [wBegin, wEnd) is empty or a write fails */
    while (context->wBegin != context->wEnd && ret >= 0) {
        ret = ContextWriteNet(context);
    }
    if (ret < 0) {
        LOGE("context write failed.");
        return ret;
    }

    ret = 0; /* reset ret value */
    pthread_mutex_lock(&client->mutex);
    while (client->waitReply != CLIENT_STATE_DEAL_REPLY && client->waitReply != CLIENT_STATE_EXIT) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    if (client->waitReply == CLIENT_STATE_EXIT) {
        LOGE("client exit.");
        ret = -1;
    }
    pthread_mutex_unlock(&client->mutex);
    return ret;
}
234
/*
 * Release the reply currently held by the client's context.
 *
 * Under client->mutex the buffer in context->oneProcess is freed and the
 * pointer cleared; a waitReply of CLIENT_STATE_DEAL_REPLY goes back to
 * CLIENT_STATE_IDLE (any other state is left alone), and condW is
 * signalled.
 *
 * client  client whose reply is finished; NULL is ignored.
 */
void ReadClientEnd(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    pthread_mutex_lock(&client->mutex);

    free(client->context->oneProcess);
    client->context->oneProcess = NULL;

    if (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
        client->waitReply = CLIENT_STATE_IDLE;
    }

    /* wake a thread that is waiting on condW for the state change */
    pthread_cond_signal(&client->condW);
    pthread_mutex_unlock(&client->mutex);
}
251
/*
 * Acquire the client's call lock.
 *
 * Blocks on lockCond while callLockFlag is set, then sets the flag. The
 * flag stays set until UnlockRpcClient() clears it.
 *
 * client  client to lock; NULL is ignored.
 */
void LockRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    pthread_mutex_lock(&client->lockMutex);
    while (client->callLockFlag) {
        /* re-test after every wake-up: the flag may have been taken again */
        pthread_cond_wait(&client->lockCond, &client->lockMutex);
    }
    client->callLockFlag = 1;
    pthread_mutex_unlock(&client->lockMutex);
}
266
/*
 * Release the client's call lock.
 *
 * Clears callLockFlag under lockMutex and signals lockCond so that one
 * thread blocked in LockRpcClient() can proceed.
 *
 * client  client to unlock; NULL is ignored.
 */
void UnlockRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    pthread_mutex_lock(&client->lockMutex);
    client->callLockFlag = 0;
    pthread_cond_signal(&client->lockCond);
    pthread_mutex_unlock(&client->lockMutex);
}
279