1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "utils_work_queue.h"
17
18 #include <stddef.h>
19 #include <pthread.h>
20 #include <sys/prctl.h>
21
22 #include "securec.h"
23
24 #include "utils_list.h"
25 #include "utils_mem.h"
26
27 #define RUN 0
28 #define DIE 1
29
30 typedef struct WorkQueue {
31 ListHead head;
32 pthread_mutex_t mutex;
33 pthread_cond_t cond;
34 volatile int32_t state;
35 uint32_t capacity;
36 uint32_t size;
37 pthread_t pthreadId;
38 const char *name;
39 } WorkQueue;
40
41 typedef struct {
42 ListNode linkNode;
43 WorkProcess process; // callback func
44 uint32_t dataLen;
45 uint8_t *dataBuff; // user data ptr
46 } Worker;
47
WorkQueueThread(void * data)48 static void *WorkQueueThread(void *data)
49 {
50 WorkQueue *queue = (WorkQueue *)data;
51 Worker *worker = NULL;
52
53 prctl(PR_SET_NAME, queue->name, 0, 0, 0);
54
55 (void)pthread_mutex_lock(&queue->mutex);
56 while (queue->state == RUN) {
57 while ((IsEmptyList(&queue->head)) && (queue->state == RUN)) {
58 pthread_cond_wait(&queue->cond, &queue->mutex);
59 }
60 // need to check again
61 if (queue->state != RUN) {
62 break;
63 }
64
65 worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
66 RemoveListNode(&worker->linkNode);
67 queue->size--;
68
69 pthread_mutex_unlock(&queue->mutex);
70 worker->process(worker->dataBuff, worker->dataLen);
71 FREE(worker);
72 (void)pthread_mutex_lock(&queue->mutex);
73 }
74
75 // now the queue is stopped, just remove the nodes.
76 while (!IsEmptyList(&queue->head)) {
77 worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
78 RemoveListNode(&worker->linkNode);
79 queue->size--;
80 FREE(worker);
81 }
82
83 pthread_mutex_unlock(&queue->mutex);
84 return NULL;
85 }
86
CreateWorkQueue(uint32_t capacity,const char * name)87 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
88 {
89 WorkQueue *queue = MALLOC(sizeof(WorkQueue));
90 if (queue == NULL) {
91 return NULL;
92 }
93 (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
94
95 InitListHead(&(queue->head));
96 queue->state = RUN;
97 queue->capacity = capacity;
98 queue->size = 0;
99 queue->name = name;
100
101 int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
102 if (iRet != 0) {
103 FREE(queue);
104 return NULL;
105 }
106
107 iRet = pthread_cond_init(&queue->cond, NULL);
108 if (iRet != 0) {
109 (void)pthread_mutex_destroy(&(queue->mutex));
110 FREE(queue);
111 return NULL;
112 }
113
114 iRet = pthread_create(&queue->pthreadId, NULL, WorkQueueThread, queue);
115 if (iRet != 0) {
116 (void)pthread_cond_destroy(&(queue->cond));
117 (void)pthread_mutex_destroy(&(queue->mutex));
118 FREE(queue);
119 return NULL;
120 }
121
122 return queue;
123 }
124
DestroyWorkQueue(WorkQueue * queue)125 uint32_t DestroyWorkQueue(WorkQueue *queue)
126 {
127 if (queue == NULL) {
128 return WORK_QUEUE_NULL_PTR;
129 }
130
131 (void)pthread_mutex_lock(&queue->mutex);
132 queue->state = DIE;
133 int32_t iRet = pthread_cond_broadcast(&queue->cond);
134 if (iRet != 0) {
135 (void)pthread_mutex_unlock(&queue->mutex);
136 return WORK_QUEUE_THREAD_COND_ERR;
137 }
138 (void)pthread_mutex_unlock(&queue->mutex);
139
140 iRet = pthread_join(queue->pthreadId, NULL);
141 if (iRet != 0) {
142 return WORK_QUEUE_THREAD_JOIN_ERR;
143 }
144
145 FREE(queue);
146 return WORK_QUEUE_OK;
147 }
148
QueueWork(WorkQueue * queue,WorkProcess process,uint8_t * data,uint32_t length)149 uint32_t QueueWork(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
150 {
151 if ((queue == NULL) || (process == NULL)) {
152 return WORK_QUEUE_NULL_PTR;
153 }
154 if (queue->state != RUN) {
155 return WORK_QUEUE_STATE_ERR;
156 }
157 if (queue->size >= queue->capacity) {
158 return WORK_QUEUE_FULL;
159 }
160
161 Worker *worker = MALLOC(sizeof(Worker));
162 if (worker == NULL) {
163 return WORK_QUEUE_MALLOC_ERR;
164 }
165 (void)memset_s(worker, sizeof(Worker), 0, sizeof(Worker));
166
167 InitListHead(&worker->linkNode);
168 worker->dataLen = length;
169 worker->dataBuff = data;
170 worker->process = process;
171
172 (void)pthread_mutex_lock(&queue->mutex);
173 AddListNodeBefore(&worker->linkNode, &queue->head);
174 queue->size++;
175
176 (void)pthread_mutex_unlock(&queue->mutex);
177 (void)pthread_cond_broadcast(&queue->cond);
178 return WORK_QUEUE_OK;
179 }
180