• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "le_loop.h"
17 
18 #include <errno.h>
19 #include <sys/socket.h>
20 #include "securec.h"
21 
22 #include "le_socket.h"
23 #include "le_task.h"
24 
HandleSendMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_SendMessageComplete complete)25 static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26     const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27 {
28     EventLoop *loop = (EventLoop *)loopHandle;
29     StreamTask *stream = (StreamTask *)taskHandle;
30     LE_Buffer *buffer = GetFirstBuffer(stream);
31     while (buffer) {
32         int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33         if (strstr(((const char *)buffer->data + 12), "bootevent.boot.completed") != NULL) {
34             LE_LOGI("begin to send boot.completed to param_watcher, fd:%d, size:%u",
35                 GetSocketFd(taskHandle), buffer->dataSize);
36         }
37         if (ret < 0 || (size_t)ret < buffer->dataSize) {
38             LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
39                     buffer->dataSize, ret, errno);
40         }
41         LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
42         buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
43         if (complete != NULL) {
44             complete(taskHandle, buffer);
45         }
46         FreeBuffer(loopHandle, stream, buffer);
47         buffer = GetFirstBuffer(stream);
48     }
49     if (IsBufferEmpty(stream)) {
50         LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
51         loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
52         return LE_SUCCESS;
53     }
54     return LE_SUCCESS;
55 }
56 
HandleRecvMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_RecvMessage recvMessage,const LE_HandleRecvMsg handleRecvMsg)57 static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
58     const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
59 {
60     LE_STATUS status = LE_SUCCESS;
61     LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
62     LE_CHECK(buffer != NULL, return LE_NO_MEMORY, "Failed to create buffer");
63     int readLen = 0;
64     while (1) {
65         if (handleRecvMsg != NULL) {
66             readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
67         } else {
68             readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
69         }
70         LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
71         if (readLen < 0) {
72             if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
73                 continue;
74             }
75             status = LE_DIS_CONNECTED;
76             break;
77         } else if (readLen == 0) {
78             // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
79             status = LE_DIS_CONNECTED;
80             break;
81         } else {
82             break;
83         }
84     }
85     if (status != LE_SUCCESS) {
86         FreeBuffer(loopHandle, NULL, buffer);
87         return status;
88     }
89     if (recvMessage) {
90         recvMessage(taskHandle, buffer->data, readLen);
91     }
92     FreeBuffer(loopHandle, NULL, buffer);
93     return status;
94 }
95 
HandleStreamEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)96 static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
97 {
98     StreamConnectTask *stream = (StreamConnectTask *)handle;
99     LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
100 
101     LE_STATUS status = LE_SUCCESS;
102     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
103         status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
104     }
105     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
106         status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
107     }
108     if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
109         if (stream->disConnectComplete) {
110             stream->disConnectComplete(handle);
111         }
112         LE_CloseStreamTask(loopHandle, handle);
113     }
114     return status;
115 }
116 
HandleClientEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)117 static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
118 {
119     StreamClientTask *client = (StreamClientTask *)handle;
120     LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
121 
122     LE_STATUS status = LE_SUCCESS;
123     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
124         LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
125         client->connected = 1;
126         status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
127     }
128     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
129         status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
130     }
131     if (status == LE_DIS_CONNECTED) {
132         if (client->disConnectComplete) {
133             client->disConnectComplete(handle);
134         }
135         client->connected = 0;
136         LE_CloseStreamTask(loopHandle, handle);
137     }
138     return status;
139 }
140 
HandleStreamTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)141 static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
142 {
143     BaseTask *task = (BaseTask *)taskHandle;
144     DelTask((EventLoop *)loopHandle, task);
145     CloseTask(loopHandle, task);
146     if (task->taskId.fd > 0) {
147         close(task->taskId.fd);
148     }
149 }
150 
DumpStreamServerTaskInfo_(const TaskHandle task)151 static void DumpStreamServerTaskInfo_(const TaskHandle task)
152 {
153     INIT_CHECK(task != NULL, return);
154     BaseTask *baseTask = (BaseTask *)task;
155     StreamServerTask *serverTask = (StreamServerTask *)baseTask;
156     printf("\tfd: %d \n", serverTask->base.taskId.fd);
157     printf("\t  TaskType: %s \n", "ServerTask");
158     if (strlen(serverTask->server) > 0) {
159         printf("\t  Server socket:%s \n", serverTask->server);
160     } else {
161         printf("\t  Server socket:%s \n", "NULL");
162     }
163 }
164 
DumpStreamConnectTaskInfo_(const TaskHandle task)165 static void DumpStreamConnectTaskInfo_(const TaskHandle task)
166 {
167     INIT_CHECK(task != NULL, return);
168     BaseTask *baseTask = (BaseTask *)task;
169     StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
170     TaskHandle taskHandle = (TaskHandle)connectTask;
171     printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
172     printf("\t  TaskType: %s \n", "ConnectTask");
173     printf("\t  ServiceInfo: \n");
174     struct ucred cred = {-1, -1, -1};
175     socklen_t credSize  = sizeof(struct ucred);
176     if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
177         printf("\t    Service Pid: %d \n", cred.pid);
178         printf("\t    Service Uid: %u \n", cred.uid);
179         printf("\t    Service Gid: %u \n", cred.gid);
180     } else {
181         printf("\t    Service Pid: %s \n", "NULL");
182         printf("\t    Service Uid: %s \n", "NULL");
183         printf("\t    Service Gid: %s \n", "NULL");
184     }
185 }
186 
HandleServerEvent_(const LoopHandle loopHandle,const TaskHandle serverTask,uint32_t oper)187 static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
188 {
189     LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
190     if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
191         return LE_FAILURE;
192     }
193     StreamServerTask *server = (StreamServerTask *)serverTask;
194     LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
195 
196     int ret = server->incommingConnect(loopHandle, serverTask);
197     if (ret != LE_SUCCESS) {
198         LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
199     }
200     EventLoop *loop = (EventLoop *)loopHandle;
201     loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
202     return LE_SUCCESS;
203 }
204 
LE_CreateStreamServer(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamServerInfo * info)205 LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
206     TaskHandle *taskHandle, const LE_StreamServerInfo *info)
207 {
208     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
209     LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
210     LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
211         "Invalid parameters incommingConnect %s", info->server);
212 
213     int fd = info->socketId;
214     int ret = 0;
215     if (info->socketId <= 0) {
216         fd = CreateSocket(info->baseInfo.flags, info->server);
217         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
218     } else {
219         ret = listenSocket(fd, info->baseInfo.flags, info->server);
220         LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
221     }
222 
223     EventLoop *loop = (EventLoop *)loopHandle;
224     StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
225         sizeof(StreamServerTask) + strlen(info->server) + 1);
226     LE_CHECK(task != NULL, close(fd);
227         return LE_NO_MEMORY, "Failed to create task");
228     task->base.handleEvent = HandleServerEvent_;
229     task->base.innerClose = HandleStreamTaskClose_;
230     task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
231     task->incommingConnect = info->incommingConnect;
232     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
233     ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
234     LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
235     *taskHandle = (TaskHandle)task;
236     return LE_SUCCESS;
237 }
238 
LE_CreateStreamClient(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamInfo * info)239 LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
240     TaskHandle *taskHandle, const LE_StreamInfo *info)
241 {
242     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
243     LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
244 
245     int fd = CreateSocket(info->baseInfo.flags, info->server);
246     LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
247 
248     StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
249     LE_CHECK(task != NULL, close(fd);
250         return LE_NO_MEMORY, "Failed to create task");
251     task->stream.base.handleEvent = HandleClientEvent_;
252     task->stream.base.innerClose = HandleStreamTaskClose_;
253     OH_ListInit(&task->stream.buffHead);
254     LoopMutexInit(&task->stream.mutex);
255 
256     task->connectComplete = info->connectComplete;
257     task->sendMessageComplete = info->sendMessageComplete;
258     task->recvMessage = info->recvMessage;
259     task->disConnectComplete = info->disConnectComplete;
260     task->handleRecvMsg = info->handleRecvMsg;
261     EventLoop *loop = (EventLoop *)loopHandle;
262     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
263     *taskHandle = (TaskHandle)task;
264     return LE_SUCCESS;
265 }
266 
LE_AcceptStreamClient(const LoopHandle loopHandle,const TaskHandle server,TaskHandle * taskHandle,const LE_StreamInfo * info)267 LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
268     TaskHandle *taskHandle, const LE_StreamInfo *info)
269 {
270     LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
271     LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
272     LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
273     int fd = -1;
274     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
275         fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
276         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
277     }
278     StreamConnectTask *task = (StreamConnectTask *)CreateTask(
279         loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
280     LE_CHECK(task != NULL, close(fd);
281         return LE_NO_MEMORY, "Failed to create task");
282     task->stream.base.handleEvent = HandleStreamEvent_;
283     task->stream.base.innerClose = HandleStreamTaskClose_;
284     task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
285     task->disConnectComplete = info->disConnectComplete;
286     task->sendMessageComplete = info->sendMessageComplete;
287     task->recvMessage = info->recvMessage;
288     task->serverTask = (StreamServerTask *)server;
289     task->handleRecvMsg = info->handleRecvMsg;
290     OH_ListInit(&task->stream.buffHead);
291     LoopMutexInit(&task->stream.mutex);
292     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
293         EventLoop *loop = (EventLoop *)loopHandle;
294         loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
295     }
296     *taskHandle = (TaskHandle)task;
297     return 0;
298 }
299 
LE_CloseStreamTask(const LoopHandle loopHandle,const TaskHandle taskHandle)300 void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
301 {
302     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
303     LE_CloseTask(loopHandle, taskHandle);
304 }
305 
LE_GetSocketFd(const TaskHandle taskHandle)306 int LE_GetSocketFd(const TaskHandle taskHandle)
307 {
308     LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
309     return GetSocketFd(taskHandle);
310 }
311