1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "le_task.h"
16
17 #include <errno.h>
18 #include <sys/eventfd.h>
19
20 #include "le_loop.h"
21
DoAsyncEvent_(const LoopHandle loopHandle,AsyncEventTask * asyncTask)22 static void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask)
23 {
24 LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters");
25 LE_Buffer *buffer = GetFirstBuffer(&asyncTask->stream);
26 while (buffer != NULL) {
27 uint64_t eventId = *(uint64_t*)(buffer->data);
28 if (asyncTask->processAsyncEvent) {
29 asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId,
30 (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize);
31 }
32 FreeBuffer(loopHandle, &asyncTask->stream, buffer);
33 buffer = GetFirstBuffer(&asyncTask->stream);
34 }
35 }
36 #ifdef STARTUP_INIT_TEST
LE_DoAsyncEvent(const LoopHandle loopHandle,const TaskHandle taskHandle)37 void LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle)
38 {
39 DoAsyncEvent_(loopHandle, (AsyncEventTask *)taskHandle);
40 }
41 #endif
HandleAsyncEvent_(const LoopHandle loopHandle,const TaskHandle taskHandle,uint32_t oper)42 static LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper)
43 {
44 LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper);
45 EventLoop *loop = (EventLoop *)loopHandle;
46 AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
47 if (LE_TEST_FLAGS(oper, Event_Read)) {
48 uint64_t eventId = 0;
49 int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
50 LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %lu", GetSocketFd(taskHandle), ret, eventId);
51 DoAsyncEvent_(loopHandle, asyncTask);
52 if (IsBufferEmpty(&asyncTask->stream)) {
53 loop->modEvent(loop, (const BaseTask *)taskHandle, Event_Read);
54 return LE_SUCCESS;
55 }
56 } else {
57 static uint64_t eventId = 0;
58 (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
59 loop->modEvent(loop, (const BaseTask *)taskHandle, Event_Read);
60 eventId++;
61 }
62 return LE_SUCCESS;
63 }
64
HandleAsyncTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)65 static void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
66 {
67 BaseTask *task = (BaseTask *)taskHandle;
68 CloseTask(loopHandle, task);
69 close(task->taskId.fd);
70 }
71
LE_CreateAsyncTask(const LoopHandle loopHandle,TaskHandle * taskHandle,LE_ProcessAsyncEvent processAsyncEvent)72 LE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle,
73 TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent)
74 {
75 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
76 LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent ");
77
78 int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC);
79 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd ");
80 LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL};
81 AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask));
82 LE_CHECK(task != NULL, close(fd);
83 return LE_NO_MEMORY, "Failed to create task");
84 task->stream.base.handleEvent = HandleAsyncEvent_;
85 task->stream.base.innerClose = HandleAsyncTaskClose_;
86
87 ListInit(&task->stream.buffHead);
88 LoopMutexInit(&task->stream.mutex);
89 task->processAsyncEvent = processAsyncEvent;
90 EventLoop *loop = (EventLoop *)loopHandle;
91 loop->addEvent(loop, (const BaseTask *)task, Event_Read);
92 *taskHandle = (TaskHandle)task;
93 return LE_SUCCESS;
94 }
95
LE_StartAsyncEvent(const LoopHandle loopHandle,const TaskHandle taskHandle,uint64_t eventId,const uint8_t * data,uint32_t buffLen)96 LE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle,
97 const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen)
98 {
99 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
100 BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId));
101 char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL);
102 int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId));
103 LE_CHECK(ret == 0, return -1, "Failed to copy data");
104 if (data != NULL || buffLen == 0) {
105 ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen);
106 LE_CHECK(ret == 0, return -1, "Failed to copy data");
107 buff[sizeof(eventId) + buffLen] = '\0';
108 }
109 return LE_Send(loopHandle, taskHandle, handle, buffLen);
110 }
111
LE_StopAsyncTask(LoopHandle loopHandle,TaskHandle taskHandle)112 void LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle)
113 {
114 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
115 LE_CloseTask(loopHandle, taskHandle);
116 }