• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "client.h"
16 #include <signal.h>
17 #include <sys/time.h>
18 #include "log.h"
19 
20 #undef LOG_TAG
21 #define LOG_TAG "WifiRpcClient"
22 
/* Timeout handed to WaitFdEvent() while waiting for readable data. */
const int FD_CHECK_TIMEOUT = 1000; /* poll wait time, units: ms */
/*
 * Values stored in RpcClient::waitReply.
 * IDLE:       initial state; restored by ReadClientEnd() once a reply has been consumed.
 * DEAL_REPLY: set by the reader thread when a reply record has been published.
 * EXIT:       set by the reader thread just before it terminates.
 */
const int CLIENT_STATE_IDLE = 0;
const int CLIENT_STATE_DEAL_REPLY = 1;
const int CLIENT_STATE_EXIT = 2;
/* Size of the scratch buffer that holds the reply-prefix string in RpcClientDealReadMsg(). */
const int TMP_BUFF_SIZE = 16;

/* Entry point of the reader thread started by CreateRpcClient(). */
static void *RpcClientThreadDeal(void *arg);
30 
/*
 * Fetch the next complete record received from the server.
 *
 * First asks the context for an already-buffered record; if none is
 * available, repeatedly waits for the socket to become readable, pulls
 * data into the context and retries, for as long as threadRunFlag is set.
 *
 * Returns the record, or NULL when client is NULL, when waiting/reading
 * fails (threadRunFlag is cleared in that case), or when threadRunFlag was
 * cleared while waiting. A record obtained after the flag was cleared is
 * released with free() here, so a non-NULL result is the caller's to release.
 */
static char *RpcClientReadMsg(RpcClient *client)
{
    if (client == NULL) {
        return NULL;
    }

    char *record = ContextGetReadRecord(client->context);
    while (record == NULL && client->threadRunFlag) {
        int ready = WaitFdEvent(client->context->fd, READ_EVENT, FD_CHECK_TIMEOUT);
        if (ready == 0) {
            /* Timed out: loop again so the run flag is re-evaluated. */
            continue;
        }
        if (ready < 0) {
            LOGE("wait server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        if (ContextReadNet(client->context) < 0) {
            LOGE("read server reply message failed!");
            client->threadRunFlag = 0;
            return NULL;
        }
        record = ContextGetReadRecord(client->context);
    }

    if (client->threadRunFlag) {
        return record;
    }
    /* Stop was requested: discard whatever was fetched (free(NULL) is a no-op). */
    free(record);
    return NULL;
}
63 
/*
 * Dispatch one record read from the server and take ownership of buff.
 *
 * A record starting with "N<cSplit>" is a reply: it is published through
 * client->context (oneProcess/nPos/nSize), waitReply becomes
 * CLIENT_STATE_DEAL_REPLY and the thread blocked in RemoteCall() is woken.
 * The buffer is then released later by ReadClientEnd().
 *
 * Any other record is treated as a callback: once no reply is pending it is
 * handed to OnTransact() and released here.
 *
 * buff is always consumed: on every early-exit path it is freed, so the
 * caller must not touch it after this call.
 */
static void RpcClientDealReadMsg(RpcClient *client, char *buff)
{
    if (client == NULL || buff == NULL) {
        free(buff); /* nothing can be dispatched; do not leak the record */
        return;
    }

    char szTmp[TMP_BUFF_SIZE] = {0};
    if (snprintf_s(szTmp, sizeof(szTmp), sizeof(szTmp) - 1, "N%c", client->context->cSplit) < 0) {
        free(buff); /* previously leaked on this path */
        return;
    }
    size_t prefixLen = strlen(szTmp);

    if (strncmp(buff, szTmp, prefixLen) == 0) { /* deal reply message */
        pthread_mutex_lock(&client->mutex);
        client->waitReply = CLIENT_STATE_DEAL_REPLY;
        client->context->oneProcess = buff;
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        pthread_cond_signal(&client->condW);
        pthread_mutex_unlock(&client->mutex);
    } else { /* deal callback message */
        /* Do not overwrite oneProcess while a reply is still being consumed. */
        pthread_mutex_lock(&client->mutex);
        while (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
            pthread_cond_wait(&client->condW, &client->mutex);
        }
        pthread_mutex_unlock(&client->mutex);
        client->context->oneProcess = buff;
        client->context->nPos = prefixLen;
        client->context->nSize = strlen(buff);
        OnTransact(client->context);
        /*
         * Clear the published pointer before releasing the buffer so that a
         * later free(context->oneProcess) (see ReadClientEnd) cannot act on
         * a dangling pointer.
         */
        client->context->oneProcess = NULL;
        free(buff);
    }
    return;
}
96 
RpcClientThreadDeal(void * arg)97 static void *RpcClientThreadDeal(void *arg)
98 {
99     RpcClient *client = (RpcClient *)arg;
100     if (client == NULL) {
101         return NULL;
102     }
103 
104     while (client->threadRunFlag) {
105         char *buff = RpcClientReadMsg(client);
106         if (buff == NULL) {
107             continue;
108         }
109         RpcClientDealReadMsg(client, buff);
110     }
111     pthread_mutex_lock(&client->mutex);
112     client->waitReply = CLIENT_STATE_EXIT;
113     pthread_cond_signal(&client->condW);
114     pthread_mutex_unlock(&client->mutex);
115     LOGI("Client read message thread exiting!");
116     return NULL;
117 }
118 
CreateRpcClient(const char * path)119 RpcClient *CreateRpcClient(const char *path)
120 {
121     int fd = ConnectUnixServer(path);
122     if (fd < 0) {
123         return NULL;
124     }
125     SetNonBlock(fd, 1);
126     RpcClient *client = (RpcClient *)calloc(1, sizeof(RpcClient));
127     if (client == NULL) {
128         close(fd);
129         return NULL;
130     }
131     client->context = CreateContext(CONTEXT_BUFFER_MIN_SIZE);
132     if (client->context == NULL) {
133         close(fd);
134         free(client);
135         return NULL;
136     }
137     client->context->fd = fd;
138     client->threadRunFlag = 1;
139     client->threadId = 0;
140     client->waitReply = CLIENT_STATE_IDLE;
141     client->callLockFlag = 0;
142     pthread_mutex_init(&client->mutex, NULL);
143     pthread_cond_init(&client->condW, NULL);
144     pthread_mutex_init(&client->lockMutex, NULL);
145     pthread_cond_init(&client->lockCond, NULL);
146     int ret = pthread_create(&client->threadId, NULL, RpcClientThreadDeal, client);
147     if (ret) {
148         pthread_cond_destroy(&client->condW);
149         pthread_mutex_destroy(&client->mutex);
150         pthread_cond_destroy(&client->lockCond);
151         pthread_mutex_destroy(&client->lockMutex);
152         ReleaseContext(client->context);
153         close(fd);
154         free(client);
155         return NULL;
156     }
157     signal(SIGPIPE, SIG_IGN);
158     return client;
159 }
160 
/*
 * Stop the reader thread (if one was started) and release everything owned
 * by client: synchronisation objects, the socket, the context and the client
 * structure itself. Passing NULL is a no-op.
 */
void ReleaseRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    if (client->threadId != 0) {
        /* Ask the reader loop to stop, then wait for it to finish. */
        client->threadRunFlag = 0;
        pthread_join(client->threadId, NULL);
    }

    pthread_cond_destroy(&client->condW);
    pthread_mutex_destroy(&client->mutex);
    pthread_cond_destroy(&client->lockCond);
    pthread_mutex_destroy(&client->lockMutex);

    /* Close the descriptor before the context that stores it goes away. */
    close(client->context->fd);
    ReleaseContext(client->context);
    free(client);
}
178 
/*
 * Send the request currently queued in the client's context and block until
 * the reader thread reports a reply.
 *
 * Returns 0 once a reply is available (waitReply == CLIENT_STATE_DEAL_REPLY),
 * the negative value from ContextWriteNet() if sending fails, or -1 when
 * client is NULL or the reader thread has exited (CLIENT_STATE_EXIT).
 */
int RemoteCall(RpcClient *client)
{
    if (client == NULL || client->waitReply == CLIENT_STATE_EXIT) {
        return -1;
    }

    /* Keep writing until the pending range [wBegin, wEnd) is empty. */
    Context *context = client->context;
    while (context->wBegin != context->wEnd) {
        int written = ContextWriteNet(context);
        if (written < 0) {
            return written;
        }
    }

    pthread_mutex_lock(&client->mutex);
    while (!(client->waitReply == CLIENT_STATE_DEAL_REPLY || client->waitReply == CLIENT_STATE_EXIT)) {
        pthread_cond_wait(&client->condW, &client->mutex);
    }
    int result = (client->waitReply == CLIENT_STATE_EXIT) ? -1 : 0;
    pthread_mutex_unlock(&client->mutex);

    return result;
}
206 
/*
 * Finish consuming the current reply.
 *
 * Under the client mutex: frees the record held in context->oneProcess,
 * clears that pointer, moves waitReply from CLIENT_STATE_DEAL_REPLY back to
 * CLIENT_STATE_IDLE (other states are left alone) and signals condW, which
 * the reader thread waits on before handling a callback record.
 * Passing NULL is a no-op.
 */
void ReadClientEnd(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    pthread_mutex_lock(&client->mutex);

    Context *context = client->context;
    free(context->oneProcess);
    context->oneProcess = NULL;

    if (client->waitReply == CLIENT_STATE_DEAL_REPLY) {
        client->waitReply = CLIENT_STATE_IDLE;
    }

    pthread_cond_signal(&client->condW);
    pthread_mutex_unlock(&client->mutex);
}
223 
/*
 * Acquire the client's call lock, blocking while another caller holds it.
 *
 * The lock is the callLockFlag field guarded by lockMutex/lockCond; it is
 * released with UnlockRpcClient(). Passing NULL is a no-op.
 */
void LockRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    pthread_mutex_lock(&client->lockMutex);
    /* Re-test after every wake-up: the flag may have been taken again. */
    while (client->callLockFlag) {
        pthread_cond_wait(&client->lockCond, &client->lockMutex);
    }
    client->callLockFlag = 1;
    pthread_mutex_unlock(&client->lockMutex);
}
238 
/*
 * Release the call lock taken by LockRpcClient() and wake one thread
 * waiting on lockCond. Passing NULL is a no-op.
 */
void UnlockRpcClient(RpcClient *client)
{
    if (client == NULL) {
        return;
    }

    pthread_mutex_lock(&client->lockMutex);
    client->callLockFlag = 0; /* mark the lock as free before signalling */
    pthread_cond_signal(&client->lockCond);
    pthread_mutex_unlock(&client->lockMutex);
}
251