• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "utils_work_queue.h"
17 
18 #include <stddef.h>
19 #include <pthread.h>
20 #include <sys/prctl.h>
21 
22 #include "securec.h"
23 
24 #include "utils_list.h"
25 #include "utils_mem.h"
26 
/*
 * Lifecycle states stored in WorkQueue.state.
 * Kept as an enum so both values are grouped and visible to a debugger.
 */
enum {
    RUN = 0, /* the worker thread keeps taking queued work */
    DIE = 1  /* the worker thread must leave its loop and drop pending work */
};
29 
30 typedef struct WorkQueue {
31     ListHead head;
32     pthread_mutex_t mutex;
33     pthread_cond_t cond;
34     volatile int32_t state;
35     uint32_t capacity;
36     uint32_t size;
37     pthread_t pthreadId;
38     const char *name;
39 } WorkQueue;
40 
41 typedef struct {
42     ListNode linkNode;
43     WorkProcess process; // callback func
44     uint32_t dataLen;
45     uint8_t *dataBuff; // user data ptr
46 } Worker;
47 
WorkQueueThread(void * data)48 static void *WorkQueueThread(void *data)
49 {
50     WorkQueue *queue = (WorkQueue *)data;
51     Worker *worker = NULL;
52 
53     prctl(PR_SET_NAME, queue->name, 0, 0, 0);
54 
55     (void)pthread_mutex_lock(&queue->mutex);
56     while (queue->state == RUN) {
57         while ((IsEmptyList(&queue->head)) && (queue->state == RUN)) {
58             pthread_cond_wait(&queue->cond, &queue->mutex);
59         }
60         // need to check again
61         if (queue->state != RUN) {
62             break;
63         }
64 
65         worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
66         RemoveListNode(&worker->linkNode);
67         queue->size--;
68 
69         pthread_mutex_unlock(&queue->mutex);
70         worker->process(worker->dataBuff, worker->dataLen);
71         FREE(worker);
72         (void)pthread_mutex_lock(&queue->mutex);
73     }
74 
75     // now the queue is stopped, just remove the nodes.
76     while (!IsEmptyList(&queue->head)) {
77         worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
78         RemoveListNode(&worker->linkNode);
79         queue->size--;
80         FREE(worker);
81     }
82 
83     pthread_mutex_unlock(&queue->mutex);
84     return NULL;
85 }
86 
CreateWorkQueue(uint32_t capacity,const char * name)87 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
88 {
89     WorkQueue *queue = MALLOC(sizeof(WorkQueue));
90     if (queue == NULL) {
91         return NULL;
92     }
93     (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
94 
95     InitListHead(&(queue->head));
96     queue->state = RUN;
97     queue->capacity = capacity;
98     queue->size = 0;
99     queue->name = name;
100 
101     int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
102     if (iRet != 0) {
103         FREE(queue);
104         return NULL;
105     }
106 
107     iRet = pthread_cond_init(&queue->cond, NULL);
108     if (iRet != 0) {
109         (void)pthread_mutex_destroy(&(queue->mutex));
110         FREE(queue);
111         return NULL;
112     }
113 
114     iRet = pthread_create(&queue->pthreadId, NULL, WorkQueueThread, queue);
115     if (iRet != 0) {
116         (void)pthread_cond_destroy(&(queue->cond));
117         (void)pthread_mutex_destroy(&(queue->mutex));
118         FREE(queue);
119         return NULL;
120     }
121 
122     return queue;
123 }
124 
/*
 * Stops the queue's thread and releases the queue.
 *
 * Sets the state to DIE, wakes the thread, waits for it to exit, then destroys
 * the condition variable and the mutex and frees the queue. Entries still
 * pending at that point are freed by the exiting thread without their
 * callbacks being run.
 *
 * queue: queue returned by CreateWorkQueue(); must not be used after a
 *        WORK_QUEUE_OK return.
 *
 * Returns:
 *   WORK_QUEUE_NULL_PTR         queue is NULL.
 *   WORK_QUEUE_THREAD_COND_ERR  waking the thread failed; the queue is not freed.
 *   WORK_QUEUE_THREAD_JOIN_ERR  joining the thread failed; the queue is not freed.
 *   WORK_QUEUE_OK               the thread has exited and the queue is released.
 */
uint32_t DestroyWorkQueue(WorkQueue *queue)
{
    if (queue == NULL) {
        return WORK_QUEUE_NULL_PTR;
    }

    (void)pthread_mutex_lock(&queue->mutex);
    queue->state = DIE;
    int32_t iRet = pthread_cond_broadcast(&queue->cond);
    (void)pthread_mutex_unlock(&queue->mutex);
    if (iRet != 0) {
        return WORK_QUEUE_THREAD_COND_ERR;
    }

    iRet = pthread_join(queue->pthreadId, NULL);
    if (iRet != 0) {
        // the thread may still be using the queue, so nothing is released here
        return WORK_QUEUE_THREAD_JOIN_ERR;
    }

    // the thread is gone: release the synchronisation objects before the memory that holds them
    (void)pthread_cond_destroy(&queue->cond);
    (void)pthread_mutex_destroy(&queue->mutex);
    FREE(queue);
    return WORK_QUEUE_OK;
}
148 
QueueWork(WorkQueue * queue,WorkProcess process,uint8_t * data,uint32_t length)149 uint32_t QueueWork(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
150 {
151     if ((queue == NULL) || (process == NULL)) {
152         return WORK_QUEUE_NULL_PTR;
153     }
154     if (queue->state != RUN) {
155         return WORK_QUEUE_STATE_ERR;
156     }
157     if (queue->size >= queue->capacity) {
158         return WORK_QUEUE_FULL;
159     }
160 
161     Worker *worker = MALLOC(sizeof(Worker));
162     if (worker == NULL) {
163         return WORK_QUEUE_MALLOC_ERR;
164     }
165     (void)memset_s(worker, sizeof(Worker), 0, sizeof(Worker));
166 
167     InitListHead(&worker->linkNode);
168     worker->dataLen = length;
169     worker->dataBuff = data;
170     worker->process = process;
171 
172     (void)pthread_mutex_lock(&queue->mutex);
173     AddListNodeBefore(&worker->linkNode, &queue->head);
174     queue->size++;
175 
176     (void)pthread_mutex_unlock(&queue->mutex);
177     (void)pthread_cond_broadcast(&queue->cond);
178     return WORK_QUEUE_OK;
179 }
180