/*
 * Copyright (C) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "client.h"
16 #include <signal.h>
17 #include <sys/time.h>
18 #include "log.h"
19
/* Logging tag for this translation unit (presumably consumed by the LOGE/LOGI macros). */
#undef LOG_TAG
#define LOG_TAG "WifiRpcClient"

/*
 * Values stored in RpcClient::waitReply.
 *   IDLE       - initial value; restored by ReadClientEnd() after a reply was consumed.
 *   DEAL_REPLY - the reader thread has published a reply that has not been consumed yet.
 *   EXIT       - the reader thread has terminated.
 */
const int CLIENT_STATE_IDLE = 0;
const int CLIENT_STATE_DEAL_REPLY = 1;
const int CLIENT_STATE_EXIT = 2;

/* Timeout handed to WaitFdEvent() while waiting for readable data; poll wait time, units: ms. */
const int FD_CHECK_TIMEOUT = 1000;

/* Size of the scratch buffer that holds the reply prefix in RpcClientDealReadMsg(). */
const int TMP_BUFF_SIZE = 16;

/* Entry point of the reader thread; defined further down in this file. */
static void *RpcClientThreadDeal(void *arg);
30
/*
 * Fetch the next record from the client's context, pulling more data from
 * the socket until a record is available or the reader is asked to stop.
 *
 * Returns the record obtained from ContextGetReadRecord() (the code in this
 * file releases such records with free()), or NULL when:
 *   - client is NULL;
 *   - WaitFdEvent() or ContextReadNet() reports a negative result, in which
 *     case threadRunFlag is cleared as well;
 *   - threadRunFlag is found cleared once the loop ends (a record fetched in
 *     the meantime is freed).
 */
static char *RpcClientReadMsg(RpcClient *client)
{
    if (client == NULL) {
        return NULL;
    }

    char *record = ContextGetReadRecord(client->context);
    while (record == NULL && client->threadRunFlag) {
        int ready = WaitFdEvent(client->context->fd, READ_EVENT, FD_CHECK_TIMEOUT);
        if (ready < 0) {
            LOGE("wait server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        if (ready == 0) {
            /* Nothing reported for this wait; go round again so the run flag is re-checked. */
            continue;
        }
        if (ContextReadNet(client->context) < 0) {
            LOGE("read server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        record = ContextGetReadRecord(client->context);
    }

    if (client->threadRunFlag) {
        return record;
    }
    /* Shutdown was requested: drop whatever was fetched (free(NULL) is a no-op). */
    free(record);
    return NULL;
}
63
/*
 * Dispatch one record received from the server.
 *
 * A record that starts with 'N' followed by the context's split character is
 * handled as the reply to an outstanding call: it is published through
 * client->context (oneProcess / nPos / nSize), waitReply is set to
 * CLIENT_STATE_DEAL_REPLY and condW is signalled. In that case the buffer
 * stays attached to the context until ReadClientEnd() frees it.
 *
 * Any other record is handled as a callback: the function waits until no
 * reply is pending, passes the record to OnTransact() and then frees it.
 *
 * This function takes ownership of buff. On every path that does not attach
 * it to the context, the buffer is freed before returning.
 */
static void RpcClientDealReadMsg(RpcClient *client, char *buff)
{
    if (client == NULL || buff == NULL) {
        free(buff); /* nothing can be dispatched; do not leak the record */
        return;
    }

    char szTmp[TMP_BUFF_SIZE] = {0};
    if (snprintf_s(szTmp, sizeof(szTmp), sizeof(szTmp) - 1, "N%c", client->context->cSplit) < 0) {
        free(buff); /* the record cannot be classified; release it instead of leaking it */
        return;
    }

    size_t prefixLen = strlen(szTmp);
    if (strncmp(buff, szTmp, prefixLen) == 0) { /* deal reply message */
        pthread_mutex_lock(&client->mutex);
        client->waitReply = CLIENT_STATE_DEAL_REPLY;
        client->context->oneProcess = buff; /* ownership moves to the context */
        client->context->nPos = prefixLen;  /* parsing starts right after the prefix */
        client->context->nSize = strlen(buff);
        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
    } else { /* deal callback message */
        /* The context is shared with the reply path, so wait until the pending reply was consumed. */
        pthread_mutex_lock(&client->mutex);
        while (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            pthread_cond_wait(&client->condW, &client->mutex);
        }
        pthread_mutex_unlock(&client->mutex);
        client->context->oneProcess = buff;
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        OnTransact(client->context);
        free(buff);
        /* Do not leave a dangling pointer behind: ReadClientEnd() frees oneProcess. */
        client->context->oneProcess = NULL;
    }
}
96
RpcClientThreadDeal(void * arg)97 static void *RpcClientThreadDeal(void *arg)
98 {
99 RpcClient *client = (RpcClient *)arg;
100 if (client == NULL) {
101 return NULL;
102 }
103
104 while (client->threadRunFlag) {
105 char *buff = RpcClientReadMsg(client);
106 if (buff == NULL) {
107 continue;
108 }
109 RpcClientDealReadMsg(client, buff);
110 }
111 pthread_mutex_lock(&client->mutex);
112 client->waitReply = CLIENT_STATE_EXIT;
113 pthread_cond_signal(&client->condW);
114 pthread_mutex_unlock(&client->mutex);
115 LOGI("Client read message thread exiting!");
116 return NULL;
117 }
118
CreateRpcClient(const char * path)119 RpcClient *CreateRpcClient(const char *path)
120 {
121 int fd = ConnectUnixServer(path);
122 if (fd < 0) {
123 return NULL;
124 }
125 SetNonBlock(fd, 1);
126 RpcClient *client = (RpcClient *)calloc(1, sizeof(RpcClient));
127 if (client == NULL) {
128 close(fd);
129 return NULL;
130 }
131 client->context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
132 if (client->context == NULL) {
133 close(fd);
134 free(client);
135 return NULL;
136 }
137 client->context->fd = fd;
138 client->threadRunFlag = 1;
139 client->threadId = 0;
140 client->waitReply = CLIENT_STATE_IDLE;
141 client->callLockFlag = 0;
142 pthread_mutex_init(&client->mutex, NULL);
143 pthread_cond_init(&client->condW, NULL);
144 pthread_mutex_init(&client->lockMutex, NULL);
145 pthread_cond_init(&client->lockCond, NULL);
146 int ret = pthread_create(&client->threadId, NULL, RpcClientThreadDeal, client);
147 if (ret) {
148 pthread_cond_destroy(&client->condW);
149 pthread_mutex_destroy(&client->mutex);
150 pthread_cond_destroy(&client->lockCond);
151 pthread_mutex_destroy(&client->lockMutex);
152 ReleaseContext(client->context);
153 close(fd);
154 free(client);
155 return NULL;
156 }
157 signal(SIGPIPE, SIG_IGN);
158 return client;
159 }
160
/*
 * Stop the reader thread and destroy a client created by CreateRpcClient().
 *
 * If a thread id was recorded, threadRunFlag is cleared and the thread is
 * joined first. Afterwards the synchronisation objects are destroyed, the
 * socket is closed, and the context and the client itself are released.
 * A NULL client is ignored.
 */
void ReleaseRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    if (client->threadId != 0) {
        client->threadRunFlag = 0; /* ask the reader loop to finish */
        pthread_join(client->threadId, NULL);
    }

    pthread_cond_destroy(&client->condW);
    pthread_mutex_destroy(&client->mutex);
    pthread_cond_destroy(&client->lockCond);
    pthread_mutex_destroy(&client->lockMutex);

    close(client->context->fd);
    ReleaseContext(client->context);
    free(client);
}
178
/*
 * Send the request currently held in the client's context and block until
 * the reader thread publishes the reply.
 *
 * ContextWriteNet() is called repeatedly until the context's wBegin equals
 * wEnd or it reports an error. The caller is then parked on condW until
 * waitReply becomes CLIENT_STATE_DEAL_REPLY or CLIENT_STATE_EXIT.
 *
 * Returns 0 once a reply is available, -1 if client is NULL or the reader
 * thread has exited, or the negative value returned by ContextWriteNet().
 */
int RemoteCall(RpcClient *client)
{
    if (client == NULL) {
        return -1;
    }

    /*
     * waitReply is written by the reader thread under client->mutex, so it is
     * read under the same mutex here to avoid a data race.
     */
    pthread_mutex_lock(&client->mutex);
    int readerExited = (client->waitReply == CLIENT_STATE_EXIT);
    pthread_mutex_unlock(&client->mutex);
    if (readerExited) {
        return -1;
    }

    int ret = 0;
    Context *context = client->context;
    while (context->wBegin != context->wEnd && ret >= 0) {
        ret = ContextWriteNet(context);
    }
    if (ret < 0) {
        return ret;
    }

    ret = 0; /* a positive write result must not leak into the return value */
    pthread_mutex_lock(&client->mutex);
    while (client->waitReply != CLIENT_STATE_DEAL_REPLY && client->waitReply != CLIENT_STATE_EXIT) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    if (client->waitReply == CLIENT_STATE_EXIT) {
        ret = -1;
    }
    pthread_mutex_unlock(&client->mutex);
    return ret;
}
206
/*
 * Finish processing of the current reply.
 *
 * Under the client mutex: frees the buffer attached to the context
 * (oneProcess) and clears the pointer, moves waitReply from
 * CLIENT_STATE_DEAL_REPLY back to CLIENT_STATE_IDLE (other states are left
 * alone), and signals condW. A NULL client is ignored.
 */
void ReadClientEnd(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->mutex);

        Context *ctx = client->context;
        free(ctx->oneProcess);
        ctx->oneProcess = NULL;

        if (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            client->waitReply = CLIENT_STATE_IDLE;
        }

        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
    }
}
223
/*
 * Acquire the client's call lock.
 *
 * Blocks on lockCond until callLockFlag is 0, then sets it to 1. The flag is
 * released again by UnlockRpcClient(). A NULL client is ignored.
 */
void LockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->lockMutex);
        while (client->callLockFlag != 0) {
            pthread_cond_wait(&client->lockCond, &client->lockMutex);
        }
        client->callLockFlag = 1;
        pthread_mutex_unlock(&client->lockMutex);
    }
}
238
/*
 * Release the client's call lock.
 *
 * Resets callLockFlag to 0 under lockMutex and signals lockCond, the
 * condition LockRpcClient() waits on. A NULL client is ignored.
 */
void UnlockRpcClient(RpcClient *client)
{
    if (client != NULL) {
        pthread_mutex_lock(&client->lockMutex);
        client->callLockFlag = 0;
        pthread_cond_signal(&client->lockCond);
        pthread_mutex_unlock(&client->lockMutex);
    }
}
251