• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2013 Stefano Sabatini
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 /**
22  * @file
23  * receive commands through libzeromq and broker them to filters
24  */
25 
#include "config_components.h"

#include <limits.h>
#include <zmq.h>
#include "libavutil/avstring.h"
#include "libavutil/bprint.h"
#include "libavutil/opt.h"
#include "avfilter.h"
#include "internal.h"
#include "audio.h"
#include "video.h"
36 
/**
 * Private context shared by the zmq and azmq filters.
 */
typedef struct ZMQContext {
    const AVClass *class;   /* AVClass pointer; kept as the first member */
    void *zmq;              /* handle returned by zmq_ctx_new() in init() */
    void *responder;        /* ZMQ_REP socket created in init(), bound to bind_address */
    char *bind_address;     /* endpoint passed to zmq_bind(); set through the "bind_address"/"b" option */
    int command_count;      /* number of the last received command; init() starts it at -1 */
} ZMQContext;
44 
45 #define OFFSET(x) offsetof(ZMQContext, x)
46 #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
47 static const AVOption options[] = {
48     { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
49     { "b",            "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
50     { NULL }
51 };
52 
init(AVFilterContext * ctx)53 static av_cold int init(AVFilterContext *ctx)
54 {
55     ZMQContext *zmq = ctx->priv;
56 
57     zmq->zmq = zmq_ctx_new();
58     if (!zmq->zmq) {
59         av_log(ctx, AV_LOG_ERROR,
60                "Could not create ZMQ context: %s\n", zmq_strerror(errno));
61         return AVERROR_EXTERNAL;
62     }
63 
64     zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
65     if (!zmq->responder) {
66         av_log(ctx, AV_LOG_ERROR,
67                "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
68         return AVERROR_EXTERNAL;
69     }
70 
71     if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
72         av_log(ctx, AV_LOG_ERROR,
73                "Could not bind ZMQ socket to address '%s': %s\n",
74                zmq->bind_address, zmq_strerror(errno));
75         return AVERROR_EXTERNAL;
76     }
77 
78     zmq->command_count = -1;
79     return 0;
80 }
81 
uninit(AVFilterContext * ctx)82 static void av_cold uninit(AVFilterContext *ctx)
83 {
84     ZMQContext *zmq = ctx->priv;
85 
86     zmq_close(zmq->responder);
87     zmq_ctx_destroy(zmq->zmq);
88 }
89 
/**
 * One parsed command: the three whitespace-separated tokens of a request.
 * Each field is filled by av_get_token() in parse_command() and released
 * with av_freep() in filter_frame().
 */
typedef struct Command {
    char *target, *command, *arg;
} Command;

/* Characters that terminate a token when splitting a command string. */
#define SPACES " \f\t\n\r"
95 
/**
 * Split a command string into its target, command and argument tokens.
 *
 * On failure, tokens that were already extracted stay in cmd; the caller is
 * expected to free them.
 *
 * @param cmd         receives the newly allocated tokens
 * @param command_str NUL-terminated request of the form "TARGET COMMAND [ARG]"
 * @param log_ctx     context used for av_log()
 * @return 0 on success, AVERROR(EINVAL) if the target or the command is
 *         missing, AVERROR(ENOMEM) if a token could not be allocated
 */
static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
{
    /* av_get_token() moves the pointer it is given, so parse through a
     * separate cursor and keep command_str intact for the diagnostics. */
    const char *cursor = command_str;

    cmd->target = av_get_token(&cursor, SPACES);
    if (!cmd->target)
        return AVERROR(ENOMEM);
    if (!cmd->target[0]) {
        av_log(log_ctx, AV_LOG_ERROR,
               "No target specified in command '%s'\n", command_str);
        return AVERROR(EINVAL);
    }

    cmd->command = av_get_token(&cursor, SPACES);
    if (!cmd->command)
        return AVERROR(ENOMEM);
    if (!cmd->command[0]) {
        av_log(log_ctx, AV_LOG_ERROR,
               "No command specified in command '%s'\n", command_str);
        return AVERROR(EINVAL);
    }

    /* The argument is optional and may be an empty string, but a NULL
     * result is an allocation failure and must not be reported as success. */
    cmd->arg = av_get_token(&cursor, SPACES);
    if (!cmd->arg)
        return AVERROR(ENOMEM);

    return 0;
}
117 
/**
 * Fetch one pending request from the reply socket without blocking and
 * return it as a newly allocated, NUL-terminated string.
 *
 * @param ctx      filter context whose private data is a ZMQContext
 * @param buf      receives an av_malloc()ed copy of the message payload with
 *                 a terminating NUL appended; the caller frees it. Set to
 *                 NULL on failure.
 * @param buf_size receives the size of *buf in bytes, including the
 *                 terminating NUL. Set to 0 on failure.
 * @return 0 on success; AVERROR_EXTERNAL if libzmq fails or no message is
 *         pending; AVERROR(EINVAL) if the message does not fit in an int;
 *         AVERROR(ENOMEM) if the copy cannot be allocated
 */
static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
{
    ZMQContext *zmq = ctx->priv;
    zmq_msg_t msg;
    size_t msg_size;
    int ret = 0;

    /* Give the outputs a defined value on every failure path. */
    *buf      = NULL;
    *buf_size = 0;

    if (zmq_msg_init(&msg) == -1) {
        av_log(ctx, AV_LOG_WARNING,
               "Could not initialize receive message: %s\n", zmq_strerror(errno));
        return AVERROR_EXTERNAL;
    }

    if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
        /* EAGAIN only means that nothing is queued; stay silent for it. */
        if (errno != EAGAIN)
            av_log(ctx, AV_LOG_WARNING,
                   "Could not receive message: %s\n", zmq_strerror(errno));
        ret = AVERROR_EXTERNAL;
        goto end;
    }

    /* The size is reported through an int, so reject payloads whose size
     * plus the terminating NUL would not fit. */
    msg_size = zmq_msg_size(&msg);
    if (msg_size >= INT_MAX) {
        av_log(ctx, AV_LOG_WARNING, "Received message is too large\n");
        ret = AVERROR(EINVAL);
        goto end;
    }

    *buf = av_malloc(msg_size + 1);
    if (!*buf) {
        ret = AVERROR(ENOMEM);
        goto end;
    }
    memcpy(*buf, zmq_msg_data(&msg), msg_size);
    (*buf)[msg_size] = 0;
    *buf_size = (int)msg_size + 1;

end:
    zmq_msg_close(&msg);
    return ret;
}
151 
/**
 * Drain all pending ZMQ requests, forward each one to the filter graph and
 * answer it, then pass the frame through unchanged.
 *
 * The reply has the form "CODE MESSAGE[\nOUTPUT]", where CODE is the negated
 * return code, MESSAGE its textual description and OUTPUT whatever the
 * target filter wrote into the response buffer. A request that cannot be
 * parsed is answered with the parse error instead of being dropped: a
 * ZMQ_REP socket has to send a reply before it can receive the next request.
 *
 * @param inlink input link the frame arrived on
 * @param ref    frame to forward to the first output
 * @return the result of ff_filter_frame() on the first output
 */
static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
{
    AVFilterContext *ctx = inlink->dst;
    ZMQContext *zmq = ctx->priv;

    while (1) {
        char cmd_buf[1024];
        char *recv_buf = NULL, *send_buf = NULL;
        int recv_buf_size;
        Command cmd = {0};
        int ret;

        /* receive command; any failure, including "nothing pending", ends the polling */
        if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
            break;
        zmq->command_count++;

        /* The reply is built from cmd_buf even when no filter was invoked. */
        cmd_buf[0] = 0;

        /* parse command */
        ret = parse_command(&cmd, recv_buf, ctx);
        if (ret < 0) {
            av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
        } else {
            /* process command */
            av_log(ctx, AV_LOG_VERBOSE,
                   "Processing command #%d target:%s command:%s arg:%s\n",
                   zmq->command_count, cmd.target, cmd.command, cmd.arg);
            ret = avfilter_graph_send_command(inlink->graph,
                                              cmd.target, cmd.command, cmd.arg,
                                              cmd_buf, sizeof(cmd_buf),
                                              AVFILTER_CMD_FLAG_ONE);
        }

        /* send reply (for parse failures too, see above) */
        send_buf = av_asprintf("%d %s%s%s",
                               -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
        if (!send_buf) {
            av_log(ctx, AV_LOG_ERROR,
                   "Could not allocate reply for command #%d\n",
                   zmq->command_count);
        } else {
            av_log(ctx, AV_LOG_VERBOSE,
                   "Sending command reply for command #%d:\n%s\n",
                   zmq->command_count, send_buf);
            /* zmq_send() reports its failure reason through errno, not
             * through the command's return code. */
            if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
                av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
                       zmq->command_count, zmq_strerror(errno));
        }

        av_freep(&send_buf);
        av_freep(&recv_buf);
        av_freep(&cmd.target);
        av_freep(&cmd.command);
        av_freep(&cmd.arg);
    }

    return ff_filter_frame(ctx->outputs[0], ref);
}
207 
/* Declares zmq_class, the AVClass referenced by both the zmq and the azmq
 * filter definitions below; "(a)zmq" is its display name. */
AVFILTER_DEFINE_CLASS_EXT(zmq, "(a)zmq", options);
209 
#if CONFIG_ZMQ_FILTER

/* Single video input; commands are polled whenever a frame arrives on it. */
static const AVFilterPad zmq_inputs[] = {
    {
        .filter_frame = filter_frame,
        .type         = AVMEDIA_TYPE_VIDEO,
        .name         = "default",
    },
};

/* Single video output; frames are forwarded to it by filter_frame(). */
static const AVFilterPad zmq_outputs[] = {
    {
        .type = AVMEDIA_TYPE_VIDEO,
        .name = "default",
    },
};

/* Video variant of the ZMQ command broker. */
const AVFilter ff_vf_zmq = {
    .name        = "zmq",
    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
    .priv_class  = &zmq_class,
    .priv_size   = sizeof(ZMQContext),
    .init        = init,
    .uninit      = uninit,
    FILTER_INPUTS(zmq_inputs),
    FILTER_OUTPUTS(zmq_outputs),
};

#endif /* CONFIG_ZMQ_FILTER */
239 
#if CONFIG_AZMQ_FILTER

/* Single audio input; commands are polled whenever a frame arrives on it. */
static const AVFilterPad azmq_inputs[] = {
    {
        .filter_frame = filter_frame,
        .type         = AVMEDIA_TYPE_AUDIO,
        .name         = "default",
    },
};

/* Single audio output; frames are forwarded to it by filter_frame(). */
static const AVFilterPad azmq_outputs[] = {
    {
        .type = AVMEDIA_TYPE_AUDIO,
        .name = "default",
    },
};

/* Audio variant of the ZMQ command broker. */
const AVFilter ff_af_azmq = {
    .name        = "azmq",
    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
    .priv_class  = &zmq_class,
    .priv_size   = sizeof(ZMQContext),
    .init        = init,
    .uninit      = uninit,
    FILTER_INPUTS(azmq_inputs),
    FILTER_OUTPUTS(azmq_outputs),
};

#endif /* CONFIG_AZMQ_FILTER */
269