• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Permission is hereby granted, free of charge, to any person obtaining a copy
3  * of this software and associated documentation files (the "Software"), to deal
4  * in the Software without restriction, including without limitation the rights
5  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6  * copies of the Software, and to permit persons to whom the Software is
7  * furnished to do so, subject to the following conditions:
8  *
9  * The above copyright notice and this permission notice shall be included in
10  * all copies or substantial portions of the Software.
11  *
12  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18  * THE SOFTWARE.
19  */
20 
21 /**
22  * Thread message API test
23  */
24 
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/frame.h"
28 #include "libavutil/threadmessage.h"
29 #include "libavutil/thread.h" // not public
30 
31 struct sender_data {
32     int id;
33     pthread_t tid;
34     int workload;
35     AVThreadMessageQueue *queue;
36 };
37 
38 /* same as sender_data but shuffled for testing purpose */
39 struct receiver_data {
40     pthread_t tid;
41     int workload;
42     int id;
43     AVThreadMessageQueue *queue;
44 };
45 
46 struct message {
47     AVFrame *frame;
48     // we add some junk in the message to make sure the message size is >
49     // sizeof(void*)
50     int magic;
51 };
52 
53 #define MAGIC 0xdeadc0de
54 
free_frame(void * arg)55 static void free_frame(void *arg)
56 {
57     struct message *msg = arg;
58     av_assert0(msg->magic == MAGIC);
59     av_frame_free(&msg->frame);
60 }
61 
sender_thread(void * arg)62 static void *sender_thread(void *arg)
63 {
64     int i, ret = 0;
65     struct sender_data *wd = arg;
66 
67     av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68     for (i = 0; i < wd->workload; i++) {
69         if (rand() % wd->workload < wd->workload / 10) {
70             av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
71             av_thread_message_flush(wd->queue);
72         } else {
73             char *val;
74             AVDictionary *meta = NULL;
75             struct message msg = {
76                 .magic = MAGIC,
77                 .frame = av_frame_alloc(),
78             };
79 
80             if (!msg.frame) {
81                 ret = AVERROR(ENOMEM);
82                 break;
83             }
84 
85             /* we add some metadata to identify the frames */
86             val = av_asprintf("frame %d/%d from sender %d",
87                               i + 1, wd->workload, wd->id);
88             if (!val) {
89                 av_frame_free(&msg.frame);
90                 ret = AVERROR(ENOMEM);
91                 break;
92             }
93             ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
94             if (ret < 0) {
95                 av_frame_free(&msg.frame);
96                 break;
97             }
98             msg.frame->metadata = meta;
99 
100             /* allocate a real frame in order to simulate "real" work */
101             msg.frame->format = AV_PIX_FMT_RGBA;
102             msg.frame->width  = 320;
103             msg.frame->height = 240;
104             ret = av_frame_get_buffer(msg.frame, 0);
105             if (ret < 0) {
106                 av_frame_free(&msg.frame);
107                 break;
108             }
109 
110             /* push the frame in the common queue */
111             av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112                    wd->id, i + 1, wd->workload, msg.frame);
113             ret = av_thread_message_queue_send(wd->queue, &msg, 0);
114             if (ret < 0) {
115                 av_frame_free(&msg.frame);
116                 break;
117             }
118         }
119     }
120     av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121            wd->id, av_err2str(ret));
122     av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
123     return NULL;
124 }
125 
receiver_thread(void * arg)126 static void *receiver_thread(void *arg)
127 {
128     int i, ret = 0;
129     struct receiver_data *rd = arg;
130 
131     for (i = 0; i < rd->workload; i++) {
132         if (rand() % rd->workload < rd->workload / 10) {
133             av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
134                    "discarding %d message(s)\n", rd->id,
135                    av_thread_message_queue_nb_elems(rd->queue));
136             av_thread_message_flush(rd->queue);
137         } else {
138             struct message msg;
139             AVDictionary *meta;
140             AVDictionaryEntry *e;
141 
142             ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
143             if (ret < 0)
144                 break;
145             av_assert0(msg.magic == MAGIC);
146             meta = msg.frame->metadata;
147             e = av_dict_get(meta, "sig", NULL, 0);
148             av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
149             av_frame_free(&msg.frame);
150         }
151     }
152 
153     av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
154     av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
155 
156     return NULL;
157 }
158 
get_workload(int minv,int maxv)159 static int get_workload(int minv, int maxv)
160 {
161     return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
162 }
163 
main(int ac,char ** av)164 int main(int ac, char **av)
165 {
166     int i, ret = 0;
167     int max_queue_size;
168     int nb_senders, sender_min_load, sender_max_load;
169     int nb_receivers, receiver_min_load, receiver_max_load;
170     struct sender_data *senders;
171     struct receiver_data *receivers;
172     AVThreadMessageQueue *queue = NULL;
173 
174     if (ac != 8) {
175         av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
176                "<nb_senders> <sender_min_send> <sender_max_send> "
177                "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
178         return 1;
179     }
180 
181     max_queue_size    = atoi(av[1]);
182     nb_senders        = atoi(av[2]);
183     sender_min_load   = atoi(av[3]);
184     sender_max_load   = atoi(av[4]);
185     nb_receivers      = atoi(av[5]);
186     receiver_min_load = atoi(av[6]);
187     receiver_max_load = atoi(av[7]);
188 
189     if (max_queue_size <= 0 ||
190         nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
191         nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
192         av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
193         return 1;
194     }
195 
196     av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
197            "%d receivers receiving [%d-%d]\n", max_queue_size,
198            nb_senders, sender_min_load, sender_max_load,
199            nb_receivers, receiver_min_load, receiver_max_load);
200 
201     senders = av_mallocz_array(nb_senders, sizeof(*senders));
202     receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
203     if (!senders || !receivers) {
204         ret = AVERROR(ENOMEM);
205         goto end;
206     }
207 
208     ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
209     if (ret < 0)
210         goto end;
211 
212     av_thread_message_queue_set_free_func(queue, free_frame);
213 
214 #define SPAWN_THREADS(type) do {                                                \
215     for (i = 0; i < nb_##type##s; i++) {                                        \
216         struct type##_data *td = &type##s[i];                                   \
217                                                                                 \
218         td->id = i;                                                             \
219         td->queue = queue;                                                      \
220         td->workload = get_workload(type##_min_load, type##_max_load);          \
221                                                                                 \
222         ret = pthread_create(&td->tid, NULL, type##_thread, td);                \
223         if (ret) {                                                              \
224             const int err = AVERROR(ret);                                       \
225             av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type)    \
226                    " thread: %s\n", av_err2str(err));                           \
227             goto end;                                                           \
228         }                                                                       \
229     }                                                                           \
230 } while (0)
231 
232 #define WAIT_THREADS(type) do {                                                 \
233     for (i = 0; i < nb_##type##s; i++) {                                        \
234         struct type##_data *td = &type##s[i];                                   \
235                                                                                 \
236         ret = pthread_join(td->tid, NULL);                                      \
237         if (ret) {                                                              \
238             const int err = AVERROR(ret);                                       \
239             av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type)     \
240                    " thread: %s\n", av_err2str(err));                           \
241             goto end;                                                           \
242         }                                                                       \
243     }                                                                           \
244 } while (0)
245 
246     SPAWN_THREADS(receiver);
247     SPAWN_THREADS(sender);
248 
249     WAIT_THREADS(sender);
250     WAIT_THREADS(receiver);
251 
252 end:
253     av_thread_message_queue_free(&queue);
254     av_freep(&senders);
255     av_freep(&receivers);
256 
257     if (ret < 0 && ret != AVERROR_EOF) {
258         av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
259         return 1;
260     }
261     return 0;
262 }
263