• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2013 Stefano Sabatini
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 /**
22  * @file
23  * receive commands through libzeromq and broker them to filters
24  */
25 
26 #include <zmq.h>
27 #include "libavutil/avstring.h"
28 #include "libavutil/bprint.h"
29 #include "libavutil/opt.h"
30 #include "avfilter.h"
31 #include "internal.h"
32 #include "audio.h"
33 #include "video.h"
34 
35 typedef struct ZMQContext {
36     const AVClass *class;
37     void *zmq;
38     void *responder;
39     char *bind_address;
40     int command_count;
41 } ZMQContext;
42 
43 #define OFFSET(x) offsetof(ZMQContext, x)
44 #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
45 static const AVOption options[] = {
46     { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
47     { "b",            "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
48     { NULL }
49 };
50 
init(AVFilterContext * ctx)51 static av_cold int init(AVFilterContext *ctx)
52 {
53     ZMQContext *zmq = ctx->priv;
54 
55     zmq->zmq = zmq_ctx_new();
56     if (!zmq->zmq) {
57         av_log(ctx, AV_LOG_ERROR,
58                "Could not create ZMQ context: %s\n", zmq_strerror(errno));
59         return AVERROR_EXTERNAL;
60     }
61 
62     zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
63     if (!zmq->responder) {
64         av_log(ctx, AV_LOG_ERROR,
65                "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
66         return AVERROR_EXTERNAL;
67     }
68 
69     if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
70         av_log(ctx, AV_LOG_ERROR,
71                "Could not bind ZMQ socket to address '%s': %s\n",
72                zmq->bind_address, zmq_strerror(errno));
73         return AVERROR_EXTERNAL;
74     }
75 
76     zmq->command_count = -1;
77     return 0;
78 }
79 
uninit(AVFilterContext * ctx)80 static void av_cold uninit(AVFilterContext *ctx)
81 {
82     ZMQContext *zmq = ctx->priv;
83 
84     zmq_close(zmq->responder);
85     zmq_ctx_destroy(zmq->zmq);
86 }
87 
/**
 * The three fields of a received command. Each one is a separately
 * allocated string filled in by parse_command().
 */
typedef struct Command {
    char *target;
    char *command;
    char *arg;
} Command;

/* characters that delimit the fields of a command */
#define SPACES " \f\t\n\r"
93 
/**
 * Split a command string into its target, command and argument tokens.
 *
 * Tokens are extracted with av_get_token() using SPACES as terminators.
 * Target and command must be non-empty; the argument may be empty.
 *
 * @param cmd         receives the allocated tokens; fields already filled in
 *                    when an error is returned are left for the caller to free
 * @param command_str NUL-terminated command text
 * @param log_ctx     context handed to av_log()
 * @return 0 on success, AVERROR(EINVAL) if the target or the command is
 *         missing, AVERROR(ENOMEM) if a token could not be allocated
 */
static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
{
    /* av_get_token() advances its cursor; parse through a copy so that
     * command_str still names the whole command in the error messages */
    const char *cursor = command_str;

    cmd->target = av_get_token(&cursor, SPACES);
    if (!cmd->target)
        return AVERROR(ENOMEM);
    if (!cmd->target[0]) {
        av_log(log_ctx, AV_LOG_ERROR,
               "No target specified in command '%s'\n", command_str);
        return AVERROR(EINVAL);
    }

    cmd->command = av_get_token(&cursor, SPACES);
    if (!cmd->command)
        return AVERROR(ENOMEM);
    if (!cmd->command[0]) {
        av_log(log_ctx, AV_LOG_ERROR,
               "No command specified in command '%s'\n", command_str);
        return AVERROR(EINVAL);
    }

    /* a NULL argument would otherwise be handed on as if parsing succeeded */
    cmd->arg = av_get_token(&cursor, SPACES);
    if (!cmd->arg)
        return AVERROR(ENOMEM);

    return 0;
}
115 
/**
 * Try to receive one message from the responder socket without blocking.
 *
 * On success the payload is copied into a newly allocated buffer with a
 * terminating NUL byte appended.
 *
 * @param ctx      filter context owning the ZMQ socket
 * @param buf      receives the allocated, NUL-terminated copy of the message
 * @param buf_size receives the size of that buffer (message size plus one)
 * @return 0 on success, AVERROR(ENOMEM) if the buffer cannot be allocated,
 *         AVERROR_EXTERNAL if the message cannot be initialized or received
 *         (EAGAIN included, which is the only receive error not logged)
 */
static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
{
    ZMQContext *s = ctx->priv;
    zmq_msg_t incoming;
    int status = AVERROR_EXTERNAL;

    if (zmq_msg_init(&incoming) == -1) {
        av_log(ctx, AV_LOG_WARNING,
               "Could not initialize receive message: %s\n", zmq_strerror(errno));
        return AVERROR_EXTERNAL;
    }

    if (zmq_msg_recv(&incoming, s->responder, ZMQ_DONTWAIT) != -1) {
        *buf_size = zmq_msg_size(&incoming) + 1;
        *buf = av_malloc(*buf_size);
        if (*buf) {
            memcpy(*buf, zmq_msg_data(&incoming), *buf_size - 1);
            (*buf)[*buf_size - 1] = 0;
            status = 0;
        } else {
            status = AVERROR(ENOMEM);
        }
    } else if (errno != EAGAIN) {
        av_log(ctx, AV_LOG_WARNING,
               "Could not receive message: %s\n", zmq_strerror(errno));
    }

    zmq_msg_close(&incoming);
    return status;
}
149 
/**
 * Handle every command that can currently be received on the responder
 * socket, then pass the frame on to the output.
 *
 * Each received message is parsed, sent to the filter graph with
 * avfilter_graph_send_command(), and answered with a reply of the form
 * "<code> <error string>[\n<filter response>]". A command that fails to
 * parse is answered with its error code as well, without being sent to the
 * graph.
 *
 * @param inlink link the frame arrived on
 * @param ref    frame to forward
 * @return the result of ff_filter_frame() on the first output
 */
static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
{
    AVFilterContext *ctx = inlink->dst;
    ZMQContext *zmq = ctx->priv;

    while (1) {
        char cmd_buf[1024];
        /* start as NULL: both pointers are released at "end" on every path */
        char *recv_buf = NULL, *send_buf = NULL;
        int recv_buf_size;
        Command cmd = {0};
        int ret;

        /* receive command; stop as soon as nothing can be received */
        if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
            break;
        zmq->command_count++;

        /* the reply reads cmd_buf even when nothing has written to it */
        cmd_buf[0] = 0;

        /* parse command */
        ret = parse_command(&cmd, recv_buf, ctx);
        if (ret < 0) {
            av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
        } else {
            /* process command */
            av_log(ctx, AV_LOG_VERBOSE,
                   "Processing command #%d target:%s command:%s arg:%s\n",
                   zmq->command_count, cmd.target, cmd.command, cmd.arg);
            ret = avfilter_graph_send_command(inlink->graph,
                                              cmd.target, cmd.command, cmd.arg,
                                              cmd_buf, sizeof(cmd_buf),
                                              AVFILTER_CMD_FLAG_ONE);
        }

        /* A REP socket has to answer each request before it can receive the
         * next one, so parse failures are reported to the sender too. */
        send_buf = av_asprintf("%d %s%s%s",
                               -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
        if (!send_buf) {
            av_log(ctx, AV_LOG_ERROR, "Could not allocate reply for command #%d\n",
                   zmq->command_count);
            goto end;
        }
        av_log(ctx, AV_LOG_VERBOSE,
               "Sending command reply for command #%d:\n%s\n",
               zmq->command_count, send_buf);
        /* zmq_send() reports its failure through errno, not through ret */
        if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
            av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
                   zmq->command_count, zmq_strerror(errno));

    end:
        av_freep(&send_buf);
        av_freep(&recv_buf);
        av_freep(&cmd.target);
        av_freep(&cmd.command);
        av_freep(&cmd.arg);
    }

    return ff_filter_frame(ctx->outputs[0], ref);
}
205 
#if CONFIG_ZMQ_FILTER

/* the video filter uses the shared option table */
#define zmq_options options
AVFILTER_DEFINE_CLASS(zmq);

/* single video input; commands are polled whenever a frame arrives on it */
static const AVFilterPad zmq_inputs[] = {
    {
        .name         = "default",
        .type         = AVMEDIA_TYPE_VIDEO,
        .filter_frame = filter_frame,
    },
    { NULL }
};

/* single video output */
static const AVFilterPad zmq_outputs[] = {
    {
        .name = "default",
        .type = AVMEDIA_TYPE_VIDEO,
    },
    { NULL }
};

/* video variant of the ZMQ command filter */
AVFilter ff_vf_zmq = {
    .name        = "zmq",
    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
    .init        = init,
    .uninit      = uninit,
    .priv_size   = sizeof(ZMQContext),
    .inputs      = zmq_inputs,
    .outputs     = zmq_outputs,
    .priv_class  = &zmq_class,
};

#endif
240 
#if CONFIG_AZMQ_FILTER

/* the audio filter uses the shared option table */
#define azmq_options options
AVFILTER_DEFINE_CLASS(azmq);

/* single audio input; commands are polled whenever a frame arrives on it */
static const AVFilterPad azmq_inputs[] = {
    {
        .name         = "default",
        .type         = AVMEDIA_TYPE_AUDIO,
        .filter_frame = filter_frame,
    },
    { NULL }
};

/* single audio output */
static const AVFilterPad azmq_outputs[] = {
    {
        .name = "default",
        .type = AVMEDIA_TYPE_AUDIO,
    },
    { NULL }
};

/* audio variant of the ZMQ command filter */
AVFilter ff_af_azmq = {
    .name        = "azmq",
    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
    .init        = init,
    .uninit      = uninit,
    .priv_size   = sizeof(ZMQContext),
    .inputs      = azmq_inputs,
    .outputs     = azmq_outputs,
    .priv_class  = &azmq_class,
};

#endif
275