1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "le_task.h"
16 #include <time.h>
17 #include <sys/eventfd.h>
18
19 #include "le_loop.h"
20
21 #define MILLION_MICROSECOND 1000000
22 #define THOUSAND_MILLISECOND 1000
23
DoAsyncEvent_(const LoopHandle loopHandle,AsyncEventTask * asyncTask)24 static void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask)
25 {
26 LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters");
27 #ifdef LOOP_DEBUG
28 struct timespec startTime = {0};
29 struct timespec endTime = {0};
30 long long diff;
31 clock_gettime(CLOCK_MONOTONIC, &(startTime));
32 #endif
33 StreamTask *task = &asyncTask->stream;
34 ListNode *node = task->buffHead.next;
35 if (node != &task->buffHead) {
36 LE_Buffer *buffer = ListEntry(node, LE_Buffer, node);
37 uint64_t eventId = *(uint64_t*)(buffer->data);
38 if (asyncTask->processAsyncEvent) {
39 asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId,
40 (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize);
41 }
42 OH_ListRemove(&buffer->node);
43 free(buffer);
44 #ifdef LOOP_DEBUG
45 clock_gettime(CLOCK_MONOTONIC, &(endTime));
46 diff = (long long)((endTime.tv_sec - startTime.tv_sec) * MILLION_MICROSECOND);
47 if (endTime.tv_nsec > startTime.tv_nsec) {
48 diff += (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms
49 } else {
50 diff -= (endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND; // 1000 ms
51 }
52 LE_LOGI("DoAsyncEvent_ diff %ld", diff);
53 #endif
54 }
55 }
56
57 #ifdef STARTUP_INIT_TEST
LE_DoAsyncEvent(const LoopHandle loopHandle,const TaskHandle taskHandle)58 void LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle)
59 {
60 AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
61 while (!IsBufferEmpty(&asyncTask->stream)) {
62 DoAsyncEvent_(loopHandle, (AsyncEventTask *)taskHandle);
63 }
64 }
65 #endif
HandleAsyncEvent_(const LoopHandle loopHandle,const TaskHandle taskHandle,uint32_t oper)66 static LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper)
67 {
68 LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper);
69 EventLoop *loop = (EventLoop *)loopHandle;
70 AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
71 if (LE_TEST_FLAGS(oper, Event_Read)) {
72 uint64_t eventId = 0;
73 int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
74 LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %llu", GetSocketFd(taskHandle), ret, eventId);
75 DoAsyncEvent_(loopHandle, asyncTask);
76 if (!IsBufferEmpty(&asyncTask->stream)) {
77 loop->modEvent(loop, (const BaseTask *)taskHandle, Event_Write);
78 return LE_SUCCESS;
79 }
80 } else {
81 static uint64_t eventId = 0;
82 (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
83 loop->modEvent(loop, (const BaseTask *)taskHandle, Event_Read);
84 eventId++;
85 }
86 return LE_SUCCESS;
87 }
88
HandleAsyncTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)89 static void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
90 {
91 BaseTask *task = (BaseTask *)taskHandle;
92 CloseTask(loopHandle, task);
93 DelTask((EventLoop *)loopHandle, task);
94 close(task->taskId.fd);
95 }
96
LE_CreateAsyncTask(const LoopHandle loopHandle,TaskHandle * taskHandle,LE_ProcessAsyncEvent processAsyncEvent)97 LE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle,
98 TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent)
99 {
100 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
101 LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent ");
102
103 int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC);
104 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd ");
105 LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL};
106 AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask));
107 LE_CHECK(task != NULL, close(fd);
108 return LE_NO_MEMORY, "Failed to create task");
109 task->stream.base.handleEvent = HandleAsyncEvent_;
110 task->stream.base.innerClose = HandleAsyncTaskClose_;
111
112 OH_ListInit(&task->stream.buffHead);
113 LoopMutexInit(&task->stream.mutex);
114 task->processAsyncEvent = processAsyncEvent;
115 EventLoop *loop = (EventLoop *)loopHandle;
116 loop->addEvent(loop, (const BaseTask *)task, Event_Read);
117 *taskHandle = (TaskHandle)task;
118 return LE_SUCCESS;
119 }
120
LE_StartAsyncEvent(const LoopHandle loopHandle,const TaskHandle taskHandle,uint64_t eventId,const uint8_t * data,uint32_t buffLen)121 LE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle,
122 const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen)
123 {
124 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
125 BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId));
126 char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL);
127 int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId));
128 LE_CHECK(ret == 0, return -1, "Failed to copy data");
129 if (data != NULL || buffLen == 0) {
130 ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen);
131 LE_CHECK(ret == 0, return -1, "Failed to copy data");
132 buff[sizeof(eventId) + buffLen] = '\0';
133 }
134 return LE_Send(loopHandle, taskHandle, handle, buffLen);
135 }
136
LE_StopAsyncTask(LoopHandle loopHandle,TaskHandle taskHandle)137 void LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle)
138 {
139 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
140 LE_CloseTask(loopHandle, taskHandle);
141 }