• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* ASF RTP Payloader plugin for GStreamer
2  * Copyright (C) 2009 Thiago Santos <thiagoss@embedded.ufcg.edu.br>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /* FIXME
21  * - this element doesn't follow (max/min) time properties,
22  *   is it possible to do it with a container format?
23  */
24 
25 #ifdef HAVE_CONFIG_H
26 #  include "config.h"
27 #endif
28 
29 #include <gst/rtp/gstrtpbuffer.h>
30 #include <string.h>
31 
32 #include "gstrtpasfpay.h"
33 
34 GST_DEBUG_CATEGORY_STATIC (rtpasfpay_debug);
35 #define GST_CAT_DEFAULT (rtpasfpay_debug)
36 
37 static GstStaticPadTemplate gst_rtp_asf_pay_sink_template =
38 GST_STATIC_PAD_TEMPLATE ("sink",
39     GST_PAD_SINK,
40     GST_PAD_ALWAYS,
41     GST_STATIC_CAPS ("video/x-ms-asf, " "parsed = (boolean) true")
42     );
43 
44 static GstStaticPadTemplate gst_rtp_asf_pay_src_template =
45 GST_STATIC_PAD_TEMPLATE ("src",
46     GST_PAD_SRC,
47     GST_PAD_ALWAYS,
48     GST_STATIC_CAPS ("application/x-rtp, "
49         "media = (string) {\"audio\", \"video\", \"application\"}, "
50         "clock-rate = (int) 1000, " "encoding-name = (string) \"X-ASF-PF\"")
51     );
52 
53 static GstFlowReturn
54 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer);
55 static gboolean
56 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps);
57 
58 #define gst_rtp_asf_pay_parent_class parent_class
59 G_DEFINE_TYPE (GstRtpAsfPay, gst_rtp_asf_pay, GST_TYPE_RTP_BASE_PAYLOAD);
60 
61 static void
gst_rtp_asf_pay_init(GstRtpAsfPay * rtpasfpay)62 gst_rtp_asf_pay_init (GstRtpAsfPay * rtpasfpay)
63 {
64   rtpasfpay->first_ts = 0;
65   rtpasfpay->config = NULL;
66   rtpasfpay->packets_count = 0;
67   rtpasfpay->state = ASF_NOT_STARTED;
68   rtpasfpay->headers = NULL;
69   rtpasfpay->current = NULL;
70 }
71 
72 static void
gst_rtp_asf_pay_finalize(GObject * object)73 gst_rtp_asf_pay_finalize (GObject * object)
74 {
75   GstRtpAsfPay *rtpasfpay;
76   rtpasfpay = GST_RTP_ASF_PAY (object);
77   g_free (rtpasfpay->config);
78   if (rtpasfpay->headers)
79     gst_buffer_unref (rtpasfpay->headers);
80   G_OBJECT_CLASS (parent_class)->finalize (object);
81 }
82 
83 static void
gst_rtp_asf_pay_class_init(GstRtpAsfPayClass * klass)84 gst_rtp_asf_pay_class_init (GstRtpAsfPayClass * klass)
85 {
86   GObjectClass *gobject_class;
87   GstElementClass *gstelement_class;
88   GstRTPBasePayloadClass *gstbasertppayload_class;
89 
90   gobject_class = (GObjectClass *) klass;
91   gstelement_class = (GstElementClass *) klass;
92   gstbasertppayload_class = (GstRTPBasePayloadClass *) klass;
93 
94   gobject_class->finalize = gst_rtp_asf_pay_finalize;
95 
96   gstbasertppayload_class->handle_buffer = gst_rtp_asf_pay_handle_buffer;
97   gstbasertppayload_class->set_caps = gst_rtp_asf_pay_set_caps;
98 
99   gst_element_class_add_static_pad_template (gstelement_class,
100       &gst_rtp_asf_pay_sink_template);
101   gst_element_class_add_static_pad_template (gstelement_class,
102       &gst_rtp_asf_pay_src_template);
103   gst_element_class_set_static_metadata (gstelement_class, "RTP ASF payloader",
104       "Codec/Payloader/Network",
105       "Payload-encodes ASF into RTP packets (MS_RTSP)",
106       "Thiago Santos <thiagoss@embedded.ufcg.edu.br>");
107 
108   GST_DEBUG_CATEGORY_INIT (rtpasfpay_debug, "rtpasfpay", 0,
109       "ASF RTP Payloader");
110 }
111 
112 static gboolean
gst_rtp_asf_pay_set_caps(GstRTPBasePayload * rtppay,GstCaps * caps)113 gst_rtp_asf_pay_set_caps (GstRTPBasePayload * rtppay, GstCaps * caps)
114 {
115   /* FIXME change application for the actual content */
116   gst_rtp_base_payload_set_options (rtppay, "application", TRUE, "X-ASF-PF",
117       1000);
118   return TRUE;
119 }
120 
121 static GstFlowReturn
gst_rtp_asf_pay_handle_packet(GstRtpAsfPay * rtpasfpay,GstBuffer * buffer)122 gst_rtp_asf_pay_handle_packet (GstRtpAsfPay * rtpasfpay, GstBuffer * buffer)
123 {
124   GstRTPBasePayload *rtppay;
125   GstAsfPacketInfo *packetinfo;
126   guint8 flags;
127   guint8 *data;
128   guint32 packet_util_size;
129   guint32 packet_offset;
130   guint32 size_left;
131   GstFlowReturn ret = GST_FLOW_OK;
132 
133   rtppay = GST_RTP_BASE_PAYLOAD (rtpasfpay);
134   packetinfo = &rtpasfpay->packetinfo;
135 
136   if (!gst_asf_parse_packet (buffer, packetinfo, TRUE,
137           rtpasfpay->asfinfo.packet_size)) {
138     GST_ERROR_OBJECT (rtpasfpay, "Error while parsing asf packet");
139     gst_buffer_unref (buffer);
140     return GST_FLOW_ERROR;
141   }
142 
143   if (packetinfo->packet_size == 0)
144     packetinfo->packet_size = rtpasfpay->asfinfo.packet_size;
145 
146   GST_LOG_OBJECT (rtpasfpay, "Packet size: %" G_GUINT32_FORMAT
147       ", padding: %" G_GUINT32_FORMAT, packetinfo->packet_size,
148       packetinfo->padding);
149 
150   /* update padding field to 0 */
151   if (packetinfo->padding > 0) {
152     GstAsfPacketInfo info;
153     /* find padding field offset */
154     guint offset = packetinfo->err_cor_len + 2 +
155         gst_asf_get_var_size_field_len (packetinfo->packet_field_type) +
156         gst_asf_get_var_size_field_len (packetinfo->seq_field_type);
157     buffer = gst_buffer_make_writable (buffer);
158     switch (packetinfo->padd_field_type) {
159       case ASF_FIELD_TYPE_DWORD:
160         gst_buffer_memset (buffer, offset, 0, 4);
161         break;
162       case ASF_FIELD_TYPE_WORD:
163         gst_buffer_memset (buffer, offset, 0, 2);
164         break;
165       case ASF_FIELD_TYPE_BYTE:
166         gst_buffer_memset (buffer, offset, 0, 1);
167         break;
168       case ASF_FIELD_TYPE_NONE:
169       default:
170         break;
171     }
172     gst_asf_parse_packet (buffer, &info, FALSE, 0);
173   }
174 
175   if (packetinfo->padding != 0)
176     packet_util_size = rtpasfpay->asfinfo.packet_size - packetinfo->padding;
177   else
178     packet_util_size = packetinfo->packet_size;
179   packet_offset = 0;
180   while (packet_util_size > 0) {
181     /* Even if we don't fill completely an output buffer we
182      * push it when we add an fragment. Because it seems that
183      * it is not possible to determine where a asf packet
184      * fragment ends inside a rtp packet payload.
185      * This flag tells us to push the packet.
186      */
187     gboolean force_push = FALSE;
188     GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
189 
190     /* we have no output buffer pending, create one */
191     if (rtpasfpay->current == NULL) {
192       GST_LOG_OBJECT (rtpasfpay, "Creating new output buffer");
193       rtpasfpay->current =
194           gst_rtp_buffer_new_allocate_len (GST_RTP_BASE_PAYLOAD_MTU (rtpasfpay),
195           0, 0);
196       rtpasfpay->cur_off = 0;
197       rtpasfpay->has_ts = FALSE;
198       rtpasfpay->marker = FALSE;
199     }
200     gst_rtp_buffer_map (rtpasfpay->current, GST_MAP_READWRITE, &rtp);
201     data = gst_rtp_buffer_get_payload (&rtp);
202     data += rtpasfpay->cur_off;
203     size_left = gst_rtp_buffer_get_payload_len (&rtp) - rtpasfpay->cur_off;
204 
205     GST_DEBUG_OBJECT (rtpasfpay, "Input buffer bytes consumed: %"
206         G_GUINT32_FORMAT "/%" G_GSIZE_FORMAT, packet_offset,
207         gst_buffer_get_size (buffer));
208 
209     GST_DEBUG_OBJECT (rtpasfpay, "Output rtpbuffer status");
210     GST_DEBUG_OBJECT (rtpasfpay, "Current offset: %" G_GUINT32_FORMAT,
211         rtpasfpay->cur_off);
212     GST_DEBUG_OBJECT (rtpasfpay, "Size left: %" G_GUINT32_FORMAT, size_left);
213     GST_DEBUG_OBJECT (rtpasfpay, "Has ts: %s",
214         rtpasfpay->has_ts ? "yes" : "no");
215     if (rtpasfpay->has_ts) {
216       GST_DEBUG_OBJECT (rtpasfpay, "Ts: %" G_GUINT32_FORMAT, rtpasfpay->ts);
217     }
218 
219     flags = 0;
220     if (packetinfo->has_keyframe) {
221       flags = flags | 0x80;
222     }
223     flags = flags | 0x20;       /* Relative timestamp is present */
224 
225     if (!rtpasfpay->has_ts) {
226       /* this is the first asf packet, its send time is the
227        * rtp packet timestamp */
228       rtpasfpay->has_ts = TRUE;
229       rtpasfpay->ts = packetinfo->send_time;
230     }
231 
232     if (size_left >= packet_util_size + 8) {
233       /* enough space for the rest of the packet */
234       if (packet_offset == 0) {
235         flags = flags | 0x40;
236         GST_WRITE_UINT24_BE (data + 1, packet_util_size);
237       } else {
238         GST_WRITE_UINT24_BE (data + 1, packet_offset);
239         force_push = TRUE;
240       }
241       data[0] = flags;
242       GST_WRITE_UINT32_BE (data + 4,
243           (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
244       gst_buffer_extract (buffer, packet_offset, data + 8, packet_util_size);
245 
246       /* updating status variables */
247       rtpasfpay->cur_off += 8 + packet_util_size;
248       size_left -= packet_util_size + 8;
249       packet_offset += packet_util_size;
250       packet_util_size = 0;
251       rtpasfpay->marker = TRUE;
252     } else {
253       /* fragment packet */
254       data[0] = flags;
255       GST_WRITE_UINT24_BE (data + 1, packet_offset);
256       GST_WRITE_UINT32_BE (data + 4,
257           (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts);
258       gst_buffer_extract (buffer, packet_offset, data + 8, size_left - 8);
259 
260       /* updating status variables */
261       rtpasfpay->cur_off += size_left;
262       packet_offset += size_left - 8;
263       packet_util_size -= size_left - 8;
264       size_left = 0;
265       force_push = TRUE;
266     }
267 
268     /* there is not enough room for any more buffers */
269     if (force_push || size_left <= 8) {
270 
271       gst_rtp_buffer_set_ssrc (&rtp, rtppay->current_ssrc);
272       gst_rtp_buffer_set_marker (&rtp, rtpasfpay->marker);
273       gst_rtp_buffer_set_payload_type (&rtp, GST_RTP_BASE_PAYLOAD_PT (rtppay));
274       gst_rtp_buffer_set_seq (&rtp, rtppay->seqnum + 1);
275       gst_rtp_buffer_set_timestamp (&rtp, packetinfo->send_time);
276       gst_rtp_buffer_unmap (&rtp);
277 
278       /* trim remaining bytes not used */
279       if (size_left != 0) {
280         gst_buffer_set_size (rtpasfpay->current,
281             gst_buffer_get_size (rtpasfpay->current) - size_left);
282       }
283 
284       GST_BUFFER_TIMESTAMP (rtpasfpay->current) = GST_BUFFER_TIMESTAMP (buffer);
285 
286       rtppay->seqnum++;
287       rtppay->timestamp = packetinfo->send_time;
288 
289       GST_DEBUG_OBJECT (rtpasfpay, "Pushing rtp buffer");
290       ret = gst_rtp_base_payload_push (rtppay, rtpasfpay->current);
291       rtpasfpay->current = NULL;
292       if (ret != GST_FLOW_OK) {
293         gst_buffer_unref (buffer);
294         return ret;
295       }
296     }
297   }
298   gst_buffer_unref (buffer);
299   return ret;
300 }
301 
302 static GstFlowReturn
gst_rtp_asf_pay_parse_headers(GstRtpAsfPay * rtpasfpay)303 gst_rtp_asf_pay_parse_headers (GstRtpAsfPay * rtpasfpay)
304 {
305   gchar *maxps;
306   GstMapInfo map;
307 
308   g_return_val_if_fail (rtpasfpay->headers, GST_FLOW_ERROR);
309 
310   if (!gst_asf_parse_headers (rtpasfpay->headers, &rtpasfpay->asfinfo))
311     goto error;
312 
313   GST_DEBUG_OBJECT (rtpasfpay, "Packets number: %" G_GUINT64_FORMAT,
314       rtpasfpay->asfinfo.packets_count);
315   GST_DEBUG_OBJECT (rtpasfpay, "Packets size: %" G_GUINT32_FORMAT,
316       rtpasfpay->asfinfo.packet_size);
317   GST_DEBUG_OBJECT (rtpasfpay, "Broadcast mode: %s",
318       rtpasfpay->asfinfo.broadcast ? "true" : "false");
319 
320   /* get the config for caps */
321   g_free (rtpasfpay->config);
322   gst_buffer_map (rtpasfpay->headers, &map, GST_MAP_READ);
323   rtpasfpay->config = g_base64_encode (map.data, map.size);
324   gst_buffer_unmap (rtpasfpay->headers, &map);
325   GST_DEBUG_OBJECT (rtpasfpay, "Serialized headers to base64 string %s",
326       rtpasfpay->config);
327 
328   g_assert (rtpasfpay->config != NULL);
329   GST_DEBUG_OBJECT (rtpasfpay, "Setting optional caps values: maxps=%"
330       G_GUINT32_FORMAT " and config=%s", rtpasfpay->asfinfo.packet_size,
331       rtpasfpay->config);
332   maxps =
333       g_strdup_printf ("%" G_GUINT32_FORMAT, rtpasfpay->asfinfo.packet_size);
334   gst_rtp_base_payload_set_outcaps (GST_RTP_BASE_PAYLOAD (rtpasfpay), "maxps",
335       G_TYPE_STRING, maxps, "config", G_TYPE_STRING, rtpasfpay->config, NULL);
336   g_free (maxps);
337 
338   return GST_FLOW_OK;
339 
340 error:
341   {
342     GST_ELEMENT_ERROR (rtpasfpay, STREAM, DECODE, (NULL),
343         ("Error parsing headers"));
344     return GST_FLOW_ERROR;
345   }
346 }
347 
348 static GstFlowReturn
gst_rtp_asf_pay_handle_buffer(GstRTPBasePayload * rtppay,GstBuffer * buffer)349 gst_rtp_asf_pay_handle_buffer (GstRTPBasePayload * rtppay, GstBuffer * buffer)
350 {
351   GstRtpAsfPay *rtpasfpay = GST_RTP_ASF_PAY_CAST (rtppay);
352 
353   if (G_UNLIKELY (rtpasfpay->state == ASF_END)) {
354     GST_LOG_OBJECT (rtpasfpay,
355         "Dropping buffer as we already pushed all packets");
356     gst_buffer_unref (buffer);
357     return GST_FLOW_EOS;        /* we already finished our job */
358   }
359 
360   /* receive headers
361    * we only accept if they are in a single buffer */
362   if (G_UNLIKELY (rtpasfpay->state == ASF_NOT_STARTED)) {
363     guint64 header_size;
364 
365     if (gst_buffer_get_size (buffer) < 24) {    /* guid+object size size */
366       GST_ERROR_OBJECT (rtpasfpay,
367           "Buffer too small, smaller than a Guid and object size");
368       gst_buffer_unref (buffer);
369       return GST_FLOW_ERROR;
370     }
371 
372     header_size = gst_asf_match_and_peek_obj_size_buf (buffer,
373         &(guids[ASF_HEADER_OBJECT_INDEX]));
374     if (header_size > 0) {
375       GST_DEBUG_OBJECT (rtpasfpay, "ASF header guid received, size %"
376           G_GUINT64_FORMAT, header_size);
377 
378       if (gst_buffer_get_size (buffer) < header_size) {
379         GST_ERROR_OBJECT (rtpasfpay, "Headers should be contained in a single"
380             " buffer");
381         gst_buffer_unref (buffer);
382         return GST_FLOW_ERROR;
383       } else {
384         rtpasfpay->state = ASF_DATA_OBJECT;
385 
386         /* clear previous headers, if any */
387         if (rtpasfpay->headers) {
388           gst_buffer_unref (rtpasfpay->headers);
389         }
390 
391         GST_DEBUG_OBJECT (rtpasfpay, "Storing headers");
392         if (gst_buffer_get_size (buffer) == header_size) {
393           rtpasfpay->headers = buffer;
394           return GST_FLOW_OK;
395         } else {
396           /* headers are a subbuffer of thie buffer */
397           GstBuffer *aux = gst_buffer_copy_region (buffer,
398               GST_BUFFER_COPY_ALL, header_size,
399               gst_buffer_get_size (buffer) - header_size);
400           rtpasfpay->headers = gst_buffer_copy_region (buffer,
401               GST_BUFFER_COPY_ALL, 0, header_size);
402           gst_buffer_replace (&buffer, aux);
403         }
404       }
405     } else {
406       GST_ERROR_OBJECT (rtpasfpay, "Missing ASF header start");
407       gst_buffer_unref (buffer);
408       return GST_FLOW_ERROR;
409     }
410   }
411 
412   if (G_UNLIKELY (rtpasfpay->state == ASF_DATA_OBJECT)) {
413     GstMapInfo map;
414 
415     if (gst_buffer_get_size (buffer) != ASF_DATA_OBJECT_SIZE) {
416       GST_ERROR_OBJECT (rtpasfpay, "Received buffer of different size of "
417           "the data object header");
418       gst_buffer_unref (buffer);
419       return GST_FLOW_ERROR;
420     }
421 
422     gst_buffer_map (buffer, &map, GST_MAP_READ);
423     if (gst_asf_match_guid (map.data, &(guids[ASF_DATA_OBJECT_INDEX]))) {
424       gst_buffer_unmap (buffer, &map);
425       GST_DEBUG_OBJECT (rtpasfpay, "Received data object header");
426       rtpasfpay->headers = gst_buffer_append (rtpasfpay->headers, buffer);
427       rtpasfpay->state = ASF_PACKETS;
428 
429       return gst_rtp_asf_pay_parse_headers (rtpasfpay);
430     } else {
431       gst_buffer_unmap (buffer, &map);
432       GST_ERROR_OBJECT (rtpasfpay, "Unexpected object received (was expecting "
433           "data object)");
434       gst_buffer_unref (buffer);
435       return GST_FLOW_ERROR;
436     }
437   }
438 
439   if (G_LIKELY (rtpasfpay->state == ASF_PACKETS)) {
440     /* in broadcast mode we can't trust the packets count information
441      * from the headers
442      * We assume that if this is on broadcast mode it is a live stream
443      * and we are going to keep receiving packets indefinitely
444      */
445     if (rtpasfpay->asfinfo.broadcast ||
446         rtpasfpay->packets_count < rtpasfpay->asfinfo.packets_count) {
447       GST_DEBUG_OBJECT (rtpasfpay, "Received packet %"
448           G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
449           rtpasfpay->packets_count, rtpasfpay->asfinfo.packets_count);
450       rtpasfpay->packets_count++;
451       return gst_rtp_asf_pay_handle_packet (rtpasfpay, buffer);
452     } else {
453       GST_INFO_OBJECT (rtpasfpay, "Packets ended");
454       rtpasfpay->state = ASF_END;
455       gst_buffer_unref (buffer);
456       return GST_FLOW_EOS;
457     }
458   }
459 
460   gst_buffer_unref (buffer);
461   return GST_FLOW_OK;
462 }
463 
464 gboolean
gst_rtp_asf_pay_plugin_init(GstPlugin * plugin)465 gst_rtp_asf_pay_plugin_init (GstPlugin * plugin)
466 {
467   return gst_element_register (plugin, "rtpasfpay",
468       GST_RANK_NONE, GST_TYPE_RTP_ASF_PAY);
469 }
470