1 /* ASF RTP Payloader plugin for GStreamer
2 * Copyright (C) 2009 Thiago Santos <thiagoss@embedded.ufcg.edu.br>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 /* FIXME
21 * - this element doesn't follow (max/min) time properties,
22 * is it possible to do it with a container format?
23 */
24
25 #ifdef HAVE_CONFIG_H
26 # include "config.h"
27 #endif
28
29 #include <gst/rtp/gstrtpbuffer.h>
30 #include <string.h>
31
32 #include "gstrtpasfpay.h"
33
34 GST_DEBUG_CATEGORY_STATIC (rtpasfpay_debug);
35 #define GST_CAT_DEFAULT (rtpasfpay_debug)
36
37 static GstStaticPadTemplate gst_rtp_asf_pay_sink_template =
38 GST_STATIC_PAD_TEMPLATE ("sink",
39 GST_PAD_SINK,
40 GST_PAD_ALWAYS,
41 GST_STATIC_CAPS ("video/x-ms-asf, " "parsed = (boolean) true")
42 );
43
44 static GstStaticPadTemplate gst_rtp_asf_pay_src_template =
45 GST_STATIC_PAD_TEMPLATE ("src",
46 GST_PAD_SRC,
47 GST_PAD_ALWAYS,
48 GST_STATIC_CAPS ("application/x-rtp, "
49 "media = (string) {\"audio\", \"video\", \"application\"}, "
50 "clock-rate = (int) 1000, " "encoding-name = (string) \"X-ASF-PF\"")
51 );
52
53 static GstFlowReturn
54 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer);
55 static gboolean
56 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps);
57
58 #define gst_rtp_asf_pay_parent_class parent_class
59 G_DEFINE_TYPE (GstRtpAsfPay, gst_rtp_asf_pay, GST_TYPE_RTP_BASE_PAYLOAD);
60 GST_ELEMENT_REGISTER_DEFINE (rtpasfpay, "rtpasfpay",
61 GST_RANK_NONE, GST_TYPE_RTP_ASF_PAY);
62
63 static void
gst_rtp_asf_pay_init(GstRtpAsfPay * rtpasfpay)64 gst_rtp_asf_pay_init (GstRtpAsfPay * rtpasfpay)
65 {
66 rtpasfpay->first_ts = 0;
67 rtpasfpay->config = NULL;
68 rtpasfpay->packets_count = 0;
69 rtpasfpay->state = ASF_NOT_STARTED;
70 rtpasfpay->headers = NULL;
71 rtpasfpay->current = NULL;
72 }
73
74 static void
gst_rtp_asf_pay_finalize(GObject * object)75 gst_rtp_asf_pay_finalize (GObject * object)
76 {
77 GstRtpAsfPay *rtpasfpay;
78 rtpasfpay = GST_RTP_ASF_PAY (object);
79 g_free (rtpasfpay->config);
80 if (rtpasfpay->headers)
81 gst_buffer_unref (rtpasfpay->headers);
82 G_OBJECT_CLASS (parent_class)->finalize (object);
83 }
84
85 static void
gst_rtp_asf_pay_class_init(GstRtpAsfPayClass * klass)86 gst_rtp_asf_pay_class_init (GstRtpAsfPayClass * klass)
87 {
88 GObjectClass *gobject_class;
89 GstElementClass *gstelement_class;
90 GstRTPBasePayloadClass *gstbasertppayload_class;
91
92 gobject_class = (GObjectClass *) klass;
93 gstelement_class = (GstElementClass *) klass;
94 gstbasertppayload_class = (GstRTPBasePayloadClass *) klass;
95
96 gobject_class->finalize = gst_rtp_asf_pay_finalize;
97
98 gstbasertppayload_class->handle_buffer = gst_rtp_asf_pay_handle_buffer;
99 gstbasertppayload_class->set_caps = gst_rtp_asf_pay_set_caps;
100
101 gst_element_class_add_static_pad_template (gstelement_class,
102 &gst_rtp_asf_pay_sink_template);
103 gst_element_class_add_static_pad_template (gstelement_class,
104 &gst_rtp_asf_pay_src_template);
105 gst_element_class_set_static_metadata (gstelement_class, "RTP ASF payloader",
106 "Codec/Payloader/Network",
107 "Payload-encodes ASF into RTP packets (MS_RTSP)",
108 "Thiago Santos <thiagoss@embedded.ufcg.edu.br>");
109
110 GST_DEBUG_CATEGORY_INIT (rtpasfpay_debug, "rtpasfpay", 0,
111 "ASF RTP Payloader");
112 }
113
114 static gboolean
gst_rtp_asf_pay_set_caps(GstRTPBasePayload * rtppay,GstCaps * caps)115 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps)
116 {
117 /* FIXME change application for the actual content */
118 gst_rtp_base_payload_set_options (rtppay, "application", TRUE, "X-ASF-PF",
119 1000);
120 return TRUE;
121 }
122
123 static GstFlowReturn
gst_rtp_asf_pay_handle_packet(GstRtpAsfPay * rtpasfpay,GstBuffer * buffer)124 gst_rtp_asf_pay_handle_packet (GstRtpAsfPay * rtpasfpay, GstBuffer * buffer)
125 {
126 GstRTPBasePayload *rtppay;
127 GstAsfPacketInfo *packetinfo;
128 guint8 flags;
129 guint8 *data;
130 guint32 packet_util_size;
131 guint32 packet_offset;
132 guint32 size_left;
133 GstFlowReturn ret = GST_FLOW_OK;
134
135 rtppay = GST_RTP_BASE_PAYLOAD (rtpasfpay);
136 packetinfo = &rtpasfpay->packetinfo;
137
138 if (!gst_asf_parse_packet (buffer, packetinfo, TRUE,
139 rtpasfpay->asfinfo.packet_size)) {
140 GST_ERROR_OBJECT (rtpasfpay, "Error while parsing asf packet");
141 gst_buffer_unref (buffer);
142 return GST_FLOW_ERROR;
143 }
144
145 if (packetinfo->packet_size == 0)
146 packetinfo->packet_size = rtpasfpay->asfinfo.packet_size;
147
148 GST_LOG_OBJECT (rtpasfpay, "Packet size: %" G_GUINT32_FORMAT
149 ", padding: %" G_GUINT32_FORMAT, packetinfo->packet_size,
150 packetinfo->padding);
151
152 /* update padding field to 0 */
153 if (packetinfo->padding > 0) {
154 GstAsfPacketInfo info;
155 /* find padding field offset */
156 guint offset = packetinfo->err_cor_len + 2 +
157 gst_asf_get_var_size_field_len (packetinfo->packet_field_type) +
158 gst_asf_get_var_size_field_len (packetinfo->seq_field_type);
159 buffer = gst_buffer_make_writable (buffer);
160 switch (packetinfo->padd_field_type) {
161 case ASF_FIELD_TYPE_DWORD:
162 gst_buffer_memset (buffer, offset, 0, 4);
163 break;
164 case ASF_FIELD_TYPE_WORD:
165 gst_buffer_memset (buffer, offset, 0, 2);
166 break;
167 case ASF_FIELD_TYPE_BYTE:
168 gst_buffer_memset (buffer, offset, 0, 1);
169 break;
170 case ASF_FIELD_TYPE_NONE:
171 default:
172 break;
173 }
174 gst_asf_parse_packet (buffer, &info, FALSE, 0);
175 }
176
177 if (packetinfo->padding != 0)
178 packet_util_size = rtpasfpay->asfinfo.packet_size - packetinfo->padding;
179 else
180 packet_util_size = packetinfo->packet_size;
181 packet_offset = 0;
182 while (packet_util_size > 0) {
183 /* Even if we don't fill completely an output buffer we
184 * push it when we add an fragment. Because it seems that
185 * it is not possible to determine where a asf packet
186 * fragment ends inside a rtp packet payload.
187 * This flag tells us to push the packet.
188 */
189 gboolean force_push = FALSE;
190 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
191
192 /* we have no output buffer pending, create one */
193 if (rtpasfpay->current == NULL) {
194 GST_LOG_OBJECT (rtpasfpay, "Creating new output buffer");
195 rtpasfpay->current =
196 gst_rtp_buffer_new_allocate_len (GST_RTP_BASE_PAYLOAD_MTU (rtpasfpay),
197 0, 0);
198 rtpasfpay->cur_off = 0;
199 rtpasfpay->has_ts = FALSE;
200 rtpasfpay->marker = FALSE;
201 }
202 gst_rtp_buffer_map (rtpasfpay->current, GST_MAP_READWRITE, &rtp);
203 data = gst_rtp_buffer_get_payload (&rtp);
204 data += rtpasfpay->cur_off;
205 size_left = gst_rtp_buffer_get_payload_len (&rtp) - rtpasfpay->cur_off;
206
207 GST_DEBUG_OBJECT (rtpasfpay, "Input buffer bytes consumed: %"
208 G_GUINT32_FORMAT "/%" G_GSIZE_FORMAT, packet_offset,
209 gst_buffer_get_size (buffer));
210
211 GST_DEBUG_OBJECT (rtpasfpay, "Output rtpbuffer status");
212 GST_DEBUG_OBJECT (rtpasfpay, "Current offset: %" G_GUINT32_FORMAT,
213 rtpasfpay->cur_off);
214 GST_DEBUG_OBJECT (rtpasfpay, "Size left: %" G_GUINT32_FORMAT, size_left);
215 GST_DEBUG_OBJECT (rtpasfpay, "Has ts: %s",
216 rtpasfpay->has_ts ? "yes" : "no");
217 if (rtpasfpay->has_ts) {
218 GST_DEBUG_OBJECT (rtpasfpay, "Ts: %" G_GUINT32_FORMAT, rtpasfpay->ts);
219 }
220
221 flags = 0;
222 if (packetinfo->has_keyframe) {
223 flags = flags | 0x80;
224 }
225 flags = flags | 0x20; /* Relative timestamp is present */
226
227 if (!rtpasfpay->has_ts) {
228 /* this is the first asf packet, its send time is the
229 * rtp packet timestamp */
230 rtpasfpay->has_ts = TRUE;
231 rtpasfpay->ts = packetinfo->send_time;
232 }
233
234 if (size_left >= packet_util_size + 8) {
235 /* enough space for the rest of the packet */
236 if (packet_offset == 0) {
237 flags = flags | 0x40;
238 GST_WRITE_UINT24_BE (data + 1, packet_util_size);
239 } else {
240 GST_WRITE_UINT24_BE (data + 1, packet_offset);
241 force_push = TRUE;
242 }
243 data[0] = flags;
244 GST_WRITE_UINT32_BE (data + 4,
245 (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
246 gst_buffer_extract (buffer, packet_offset, data + 8, packet_util_size);
247
248 /* updating status variables */
249 rtpasfpay->cur_off += 8 + packet_util_size;
250 size_left -= packet_util_size + 8;
251 packet_offset += packet_util_size;
252 packet_util_size = 0;
253 rtpasfpay->marker = TRUE;
254 } else {
255 /* fragment packet */
256 data[0] = flags;
257 GST_WRITE_UINT24_BE (data + 1, packet_offset);
258 GST_WRITE_UINT32_BE (data + 4,
259 (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
260 gst_buffer_extract (buffer, packet_offset, data + 8, size_left - 8);
261
262 /* updating status variables */
263 rtpasfpay->cur_off += size_left;
264 packet_offset += size_left - 8;
265 packet_util_size -= size_left - 8;
266 size_left = 0;
267 force_push = TRUE;
268 }
269
270 /* there is not enough room for any more buffers */
271 if (force_push || size_left <= 8) {
272
273 gst_rtp_buffer_set_ssrc (&rtp, rtppay->current_ssrc);
274 gst_rtp_buffer_set_marker (&rtp, rtpasfpay->marker);
275 gst_rtp_buffer_set_payload_type (&rtp, GST_RTP_BASE_PAYLOAD_PT (rtppay));
276 gst_rtp_buffer_set_seq (&rtp, rtppay->seqnum + 1);
277 gst_rtp_buffer_set_timestamp (&rtp, packetinfo->send_time);
278 gst_rtp_buffer_unmap (&rtp);
279
280 /* trim remaining bytes not used */
281 if (size_left != 0) {
282 gst_buffer_set_size (rtpasfpay->current,
283 gst_buffer_get_size (rtpasfpay->current) - size_left);
284 }
285
286 GST_BUFFER_TIMESTAMP (rtpasfpay->current) = GST_BUFFER_TIMESTAMP (buffer);
287
288 rtppay->seqnum++;
289 rtppay->timestamp = packetinfo->send_time;
290
291 GST_DEBUG_OBJECT (rtpasfpay, "Pushing rtp buffer");
292 ret = gst_rtp_base_payload_push (rtppay, rtpasfpay->current);
293 rtpasfpay->current = NULL;
294 if (ret != GST_FLOW_OK) {
295 gst_buffer_unref (buffer);
296 return ret;
297 }
298 }
299 }
300 gst_buffer_unref (buffer);
301 return ret;
302 }
303
304 static GstFlowReturn
gst_rtp_asf_pay_parse_headers(GstRtpAsfPay * rtpasfpay)305 gst_rtp_asf_pay_parse_headers (GstRtpAsfPay * rtpasfpay)
306 {
307 gchar *maxps;
308 GstMapInfo map;
309
310 g_return_val_if_fail (rtpasfpay->headers, GST_FLOW_ERROR);
311
312 if (!gst_asf_parse_headers (rtpasfpay->headers, &rtpasfpay->asfinfo))
313 goto error;
314
315 GST_DEBUG_OBJECT (rtpasfpay, "Packets number: %" G_GUINT64_FORMAT,
316 rtpasfpay->asfinfo.packets_count);
317 GST_DEBUG_OBJECT (rtpasfpay, "Packets size: %" G_GUINT32_FORMAT,
318 rtpasfpay->asfinfo.packet_size);
319 GST_DEBUG_OBJECT (rtpasfpay, "Broadcast mode: %s",
320 rtpasfpay->asfinfo.broadcast ? "true" : "false");
321
322 /* get the config for caps */
323 g_free (rtpasfpay->config);
324 gst_buffer_map (rtpasfpay->headers, &map, GST_MAP_READ);
325 rtpasfpay->config = g_base64_encode (map.data, map.size);
326 gst_buffer_unmap (rtpasfpay->headers, &map);
327 GST_DEBUG_OBJECT (rtpasfpay, "Serialized headers to base64 string %s",
328 rtpasfpay->config);
329
330 g_assert (rtpasfpay->config != NULL);
331 GST_DEBUG_OBJECT (rtpasfpay, "Setting optional caps values: maxps=%"
332 G_GUINT32_FORMAT " and config=%s", rtpasfpay->asfinfo.packet_size,
333 rtpasfpay->config);
334 maxps =
335 g_strdup_printf ("%" G_GUINT32_FORMAT, rtpasfpay->asfinfo.packet_size);
336 gst_rtp_base_payload_set_outcaps (GST_RTP_BASE_PAYLOAD (rtpasfpay), "maxps",
337 G_TYPE_STRING, maxps, "config", G_TYPE_STRING, rtpasfpay->config, NULL);
338 g_free (maxps);
339
340 return GST_FLOW_OK;
341
342 error:
343 {
344 GST_ELEMENT_ERROR (rtpasfpay, STREAM, DECODE, (NULL),
345 ("Error parsing headers"));
346 return GST_FLOW_ERROR;
347 }
348 }
349
350 static GstFlowReturn
gst_rtp_asf_pay_handle_buffer(GstRTPBasePayload * rtppay,GstBuffer * buffer)351 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer)
352 {
353 GstRtpAsfPay *rtpasfpay = GST_RTP_ASF_PAY_CAST (rtppay);
354
355 if (G_UNLIKELY (rtpasfpay->state == ASF_END)) {
356 GST_LOG_OBJECT (rtpasfpay,
357 "Dropping buffer as we already pushed all packets");
358 gst_buffer_unref (buffer);
359 return GST_FLOW_EOS; /* we already finished our job */
360 }
361
362 /* receive headers
363 * we only accept if they are in a single buffer */
364 if (G_UNLIKELY (rtpasfpay->state == ASF_NOT_STARTED)) {
365 guint64 header_size;
366
367 if (gst_buffer_get_size (buffer) < 24) { /* guid+object size size */
368 GST_ERROR_OBJECT (rtpasfpay,
369 "Buffer too small, smaller than a Guid and object size");
370 gst_buffer_unref (buffer);
371 return GST_FLOW_ERROR;
372 }
373
374 header_size = gst_asf_match_and_peek_obj_size_buf (buffer,
375 &(guids[ASF_HEADER_OBJECT_INDEX]));
376 if (header_size > 0) {
377 GST_DEBUG_OBJECT (rtpasfpay, "ASF header guid received, size %"
378 G_GUINT64_FORMAT, header_size);
379
380 if (gst_buffer_get_size (buffer) < header_size) {
381 GST_ERROR_OBJECT (rtpasfpay, "Headers should be contained in a single"
382 " buffer");
383 gst_buffer_unref (buffer);
384 return GST_FLOW_ERROR;
385 } else {
386 rtpasfpay->state = ASF_DATA_OBJECT;
387
388 /* clear previous headers, if any */
389 if (rtpasfpay->headers) {
390 gst_buffer_unref (rtpasfpay->headers);
391 }
392
393 GST_DEBUG_OBJECT (rtpasfpay, "Storing headers");
394 if (gst_buffer_get_size (buffer) == header_size) {
395 rtpasfpay->headers = buffer;
396 return GST_FLOW_OK;
397 } else {
398 /* headers are a subbuffer of thie buffer */
399 GstBuffer *aux = gst_buffer_copy_region (buffer,
400 GST_BUFFER_COPY_ALL, header_size,
401 gst_buffer_get_size (buffer) - header_size);
402 rtpasfpay->headers = gst_buffer_copy_region (buffer,
403 GST_BUFFER_COPY_ALL, 0, header_size);
404 gst_buffer_replace (&buffer, aux);
405 }
406 }
407 } else {
408 GST_ERROR_OBJECT (rtpasfpay, "Missing ASF header start");
409 gst_buffer_unref (buffer);
410 return GST_FLOW_ERROR;
411 }
412 }
413
414 if (G_UNLIKELY (rtpasfpay->state == ASF_DATA_OBJECT)) {
415 GstMapInfo map;
416
417 if (gst_buffer_get_size (buffer) != ASF_DATA_OBJECT_SIZE) {
418 GST_ERROR_OBJECT (rtpasfpay, "Received buffer of different size of "
419 "the data object header");
420 gst_buffer_unref (buffer);
421 return GST_FLOW_ERROR;
422 }
423
424 gst_buffer_map (buffer, &map, GST_MAP_READ);
425 if (gst_asf_match_guid (map.data, &(guids[ASF_DATA_OBJECT_INDEX]))) {
426 gst_buffer_unmap (buffer, &map);
427 GST_DEBUG_OBJECT (rtpasfpay, "Received data object header");
428 rtpasfpay->headers = gst_buffer_append (rtpasfpay->headers, buffer);
429 rtpasfpay->state = ASF_PACKETS;
430
431 return gst_rtp_asf_pay_parse_headers (rtpasfpay);
432 } else {
433 gst_buffer_unmap (buffer, &map);
434 GST_ERROR_OBJECT (rtpasfpay, "Unexpected object received (was expecting "
435 "data object)");
436 gst_buffer_unref (buffer);
437 return GST_FLOW_ERROR;
438 }
439 }
440
441 if (G_LIKELY (rtpasfpay->state == ASF_PACKETS)) {
442 /* in broadcast mode we can't trust the packets count information
443 * from the headers
444 * We assume that if this is on broadcast mode it is a live stream
445 * and we are going to keep receiving packets indefinitely
446 */
447 if (rtpasfpay->asfinfo.broadcast ||
448 rtpasfpay->packets_count < rtpasfpay->asfinfo.packets_count) {
449 GST_DEBUG_OBJECT (rtpasfpay, "Received packet %"
450 G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
451 rtpasfpay->packets_count, rtpasfpay->asfinfo.packets_count);
452 rtpasfpay->packets_count++;
453 return gst_rtp_asf_pay_handle_packet (rtpasfpay, buffer);
454 } else {
455 GST_INFO_OBJECT (rtpasfpay, "Packets ended");
456 rtpasfpay->state = ASF_END;
457 gst_buffer_unref (buffer);
458 return GST_FLOW_EOS;
459 }
460 }
461
462 gst_buffer_unref (buffer);
463 return GST_FLOW_OK;
464 }
465