• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer RTMP Library
2  * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
3  *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
18  * Boston, MA 02110-1335, USA.
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include "rtmpchunkstream.h"
26 #include "rtmputils.h"
27 
28 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_stream_debug_category);
29 #define GST_CAT_DEFAULT gst_rtmp_chunk_stream_debug_category
30 
31 static void
init_debug(void)32 init_debug (void)
33 {
34   static gsize done = 0;
35   if (g_once_init_enter (&done)) {
36     GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_stream_debug_category,
37         "rtmpchunkstream", 0, "debug category for rtmp chunk streams");
38     g_once_init_leave (&done, 1);
39   }
40 }
41 
42 enum
43 {
44   CHUNK_BYTE_TWOBYTE = 0,
45   CHUNK_BYTE_THREEBYTE = 1,
46   CHUNK_BYTE_MASK = 0x3f,
47   CHUNK_STREAM_MIN_TWOBYTE = 0x40,
48   CHUNK_STREAM_MIN_THREEBYTE = 0x140,
49   CHUNK_STREAM_MAX_THREEBYTE = 0x1003f,
50 };
51 
52 typedef enum
53 {
54   CHUNK_TYPE_0 = 0,
55   CHUNK_TYPE_1 = 1,
56   CHUNK_TYPE_2 = 2,
57   CHUNK_TYPE_3 = 3,
58 } ChunkType;
59 
60 static const gsize chunk_header_sizes[4] = { 11, 7, 3, 0 };
61 
62 struct _GstRtmpChunkStream
63 {
64   GstBuffer *buffer;
65   GstRtmpMeta *meta;
66   GstMapInfo map;               /* Only used for parsing */
67   guint32 id;
68   guint32 offset;
69   guint64 bytes;
70 };
71 
72 struct _GstRtmpChunkStreams
73 {
74   GArray *array;
75 };
76 
77 static inline gboolean
chunk_stream_is_open(GstRtmpChunkStream * cstream)78 chunk_stream_is_open (GstRtmpChunkStream * cstream)
79 {
80   return cstream->map.data != NULL;
81 }
82 
83 static void
chunk_stream_take_buffer(GstRtmpChunkStream * cstream,GstBuffer * buffer)84 chunk_stream_take_buffer (GstRtmpChunkStream * cstream, GstBuffer * buffer)
85 {
86   GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
87   g_assert (meta);
88   g_assert (cstream->buffer == NULL);
89   cstream->buffer = buffer;
90   cstream->meta = meta;
91 }
92 
93 static void
chunk_stream_clear(GstRtmpChunkStream * cstream)94 chunk_stream_clear (GstRtmpChunkStream * cstream)
95 {
96   if (chunk_stream_is_open (cstream)) {
97     gst_buffer_unmap (cstream->buffer, &cstream->map);
98     cstream->map.data = NULL;
99   }
100 
101   gst_buffer_replace (&cstream->buffer, NULL);
102   cstream->meta = NULL;
103   cstream->offset = 0;
104 }
105 
106 static guint32
chunk_stream_next_size(GstRtmpChunkStream * cstream,guint32 chunk_size)107 chunk_stream_next_size (GstRtmpChunkStream * cstream, guint32 chunk_size)
108 {
109   guint32 size, offset;
110 
111   size = cstream->meta->size;
112   offset = cstream->offset;
113 
114   g_return_val_if_fail (chunk_size, 0);
115   g_return_val_if_fail (offset <= size, 0);
116   return MIN (size - offset, chunk_size);
117 }
118 
119 static inline gboolean
needs_ext_ts(GstRtmpMeta * meta)120 needs_ext_ts (GstRtmpMeta * meta)
121 {
122   return meta->ts_delta >= 0xffffff;
123 }
124 
125 
126 static guint32
dts_to_abs_ts(GstBuffer * buffer)127 dts_to_abs_ts (GstBuffer * buffer)
128 {
129   GstClockTime dts = GST_BUFFER_DTS (buffer);
130   guint32 ret = 0;
131 
132   if (GST_CLOCK_TIME_IS_VALID (dts)) {
133     ret = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
134   }
135 
136   GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " into abs TS %"
137       G_GUINT32_FORMAT " ms", GST_TIME_ARGS (dts), ret);
138   return ret;
139 }
140 
141 static gboolean
dts_diff_to_delta_ts(GstBuffer * old_buffer,GstBuffer * buffer,guint32 * out_ts)142 dts_diff_to_delta_ts (GstBuffer * old_buffer, GstBuffer * buffer,
143     guint32 * out_ts)
144 {
145   GstClockTime dts = GST_BUFFER_DTS (buffer),
146       old_dts = GST_BUFFER_DTS (old_buffer);
147   guint32 abs_ts, old_abs_ts, delta_32 = 0;
148 
149   if (!GST_CLOCK_TIME_IS_VALID (dts) || !GST_CLOCK_TIME_IS_VALID (old_dts)) {
150     GST_LOG ("Timestamps not valid; using delta TS 0");
151     goto out;
152   }
153 
154   if (ABS (GST_CLOCK_DIFF (old_dts, dts)) > GST_MSECOND * G_MAXINT32) {
155     GST_WARNING ("Timestamp delta too large: %" GST_TIME_FORMAT " -> %"
156         GST_TIME_FORMAT, GST_TIME_ARGS (old_dts), GST_TIME_ARGS (dts));
157     return FALSE;
158   }
159 
160   abs_ts = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
161   old_abs_ts = gst_util_uint64_scale_round (old_dts, 1, GST_MSECOND);
162 
163   /* underflow wraps around */
164   delta_32 = abs_ts - old_abs_ts;
165 
166   GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT
167       " ms) -> %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT " ms) into delta TS %"
168       G_GUINT32_FORMAT " ms", GST_TIME_ARGS (old_dts), old_abs_ts,
169       GST_TIME_ARGS (dts), abs_ts, delta_32);
170 
171 out:
172   *out_ts = delta_32;
173   return TRUE;
174 }
175 
176 static ChunkType
select_chunk_type(GstRtmpChunkStream * cstream,GstBuffer * buffer)177 select_chunk_type (GstRtmpChunkStream * cstream, GstBuffer * buffer)
178 {
179   GstBuffer *old_buffer = cstream->buffer;
180   GstRtmpMeta *meta, *old_meta;
181 
182   g_return_val_if_fail (buffer, -1);
183 
184   meta = gst_buffer_get_rtmp_meta (buffer);
185 
186   g_return_val_if_fail (meta, -1);
187   g_return_val_if_fail (gst_rtmp_message_type_is_valid (meta->type), -1);
188 
189   meta->size = gst_buffer_get_size (buffer);
190   meta->cstream = cstream->id;
191 
192   g_return_val_if_fail (meta->size <= GST_RTMP_MAXIMUM_MESSAGE_SIZE, -1);
193 
194   if (!old_buffer) {
195     GST_TRACE ("Picking header 0: no previous header");
196     meta->ts_delta = dts_to_abs_ts (buffer);
197     return CHUNK_TYPE_0;
198   }
199 
200   old_meta = gst_buffer_get_rtmp_meta (old_buffer);
201   g_return_val_if_fail (old_meta, -1);
202 
203   if (old_meta->mstream != meta->mstream) {
204     GST_TRACE ("Picking header 0: stream mismatch; "
205         "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
206         old_meta->mstream, meta->mstream);
207     meta->ts_delta = dts_to_abs_ts (buffer);
208     return CHUNK_TYPE_0;
209   }
210 
211   if (!dts_diff_to_delta_ts (old_buffer, buffer, &meta->ts_delta)) {
212     GST_TRACE ("Picking header 0: timestamp delta overflow");
213     meta->ts_delta = dts_to_abs_ts (buffer);
214     return CHUNK_TYPE_0;
215   }
216 
217   /* now at least type 1 */
218 
219   if (old_meta->type != meta->type) {
220     GST_TRACE ("Picking header 1: type mismatch; want %d got %d",
221         old_meta->type, meta->type);
222     return CHUNK_TYPE_1;
223   }
224 
225   if (old_meta->size != meta->size) {
226     GST_TRACE ("Picking header 1: size mismatch; "
227         "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
228         old_meta->size, meta->size);
229     return CHUNK_TYPE_1;
230   }
231 
232   /* now at least type 2 */
233 
234   if (old_meta->ts_delta != meta->ts_delta) {
235     GST_TRACE ("Picking header 2: timestamp delta mismatch; "
236         "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
237         old_meta->ts_delta, meta->ts_delta);
238     return CHUNK_TYPE_2;
239   }
240 
241   /* now at least type 3 */
242 
243   GST_TRACE ("Picking header 3");
244   return CHUNK_TYPE_3;
245 }
246 
247 static GstBuffer *
serialize_next(GstRtmpChunkStream * cstream,guint32 chunk_size,ChunkType type)248 serialize_next (GstRtmpChunkStream * cstream, guint32 chunk_size,
249     ChunkType type)
250 {
251   GstRtmpMeta *meta = cstream->meta;
252   guint8 small_stream_id;
253   gsize header_size = chunk_header_sizes[type], offset;
254   gboolean ext_ts;
255   GstBuffer *ret;
256   GstMapInfo map;
257 
258   GST_TRACE ("Serializing a chunk of type %d, offset %" G_GUINT32_FORMAT,
259       type, cstream->offset);
260 
261   if (cstream->id < CHUNK_STREAM_MIN_TWOBYTE) {
262     small_stream_id = cstream->id;
263     header_size += 1;
264   } else if (cstream->id < CHUNK_STREAM_MIN_THREEBYTE) {
265     small_stream_id = CHUNK_BYTE_TWOBYTE;
266     header_size += 2;
267   } else {
268     small_stream_id = CHUNK_BYTE_THREEBYTE;
269     header_size += 3;
270   }
271 
272   ext_ts = needs_ext_ts (meta);
273   if (ext_ts) {
274     header_size += 4;
275   }
276 
277   GST_TRACE ("Allocating buffer, header size %" G_GSIZE_FORMAT, header_size);
278 
279   ret = gst_buffer_new_allocate (NULL, header_size, NULL);
280   if (!ret) {
281     GST_ERROR ("Failed to allocate chunk buffer");
282     return NULL;
283   }
284 
285   if (!gst_buffer_map (ret, &map, GST_MAP_WRITE)) {
286     GST_ERROR ("Failed to map %" GST_PTR_FORMAT, ret);
287     gst_buffer_unref (ret);
288     return NULL;
289   }
290 
291   /* Chunk Basic Header */
292   GST_WRITE_UINT8 (map.data, (type << 6) | small_stream_id);
293   offset = 1;
294 
295   switch (small_stream_id) {
296     case CHUNK_BYTE_TWOBYTE:
297       GST_WRITE_UINT8 (map.data + 1, cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
298       offset += 1;
299       break;
300 
301     case CHUNK_BYTE_THREEBYTE:
302       GST_WRITE_UINT16_LE (map.data + 1,
303           cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
304       offset += 2;
305       break;
306   }
307 
308   switch (type) {
309     case CHUNK_TYPE_0:
310       /* SRSLY:  "Message stream ID is stored in little-endian format." */
311       GST_WRITE_UINT32_LE (map.data + offset + 7, meta->mstream);
312       /* no break */
313     case CHUNK_TYPE_1:
314       GST_WRITE_UINT24_BE (map.data + offset + 3, meta->size);
315       GST_WRITE_UINT8 (map.data + offset + 6, meta->type);
316       /* no break */
317     case CHUNK_TYPE_2:
318       GST_WRITE_UINT24_BE (map.data + offset,
319           ext_ts ? 0xffffff : meta->ts_delta);
320       /* no break */
321     case CHUNK_TYPE_3:
322       offset += chunk_header_sizes[type];
323 
324       if (ext_ts) {
325         GST_WRITE_UINT32_BE (map.data + offset, meta->ts_delta);
326         offset += 4;
327       }
328   }
329 
330   g_assert (offset == header_size);
331   GST_MEMDUMP (">>> chunk header", map.data, offset);
332 
333   gst_buffer_unmap (ret, &map);
334 
335   GST_BUFFER_OFFSET (ret) = GST_BUFFER_OFFSET_IS_VALID (cstream->buffer) ?
336       GST_BUFFER_OFFSET (cstream->buffer) + cstream->offset : cstream->bytes;
337   GST_BUFFER_OFFSET_END (ret) = GST_BUFFER_OFFSET (ret);
338 
339   if (meta->size > 0) {
340     guint32 payload_size = chunk_stream_next_size (cstream, chunk_size);
341 
342     GST_TRACE ("Appending %" G_GUINT32_FORMAT " bytes of payload",
343         payload_size);
344 
345     gst_buffer_copy_into (ret, cstream->buffer, GST_BUFFER_COPY_MEMORY,
346         cstream->offset, payload_size);
347 
348     GST_BUFFER_OFFSET_END (ret) += payload_size;
349     cstream->offset += payload_size;
350     cstream->bytes += payload_size;
351   } else {
352     GST_TRACE ("Chunk has no payload");
353   }
354 
355   gst_rtmp_buffer_dump (ret, ">>> chunk");
356 
357   return ret;
358 }
359 
360 void
gst_rtmp_chunk_stream_clear(GstRtmpChunkStream * cstream)361 gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream)
362 {
363   g_return_if_fail (cstream);
364   GST_LOG ("Clearing chunk stream %" G_GUINT32_FORMAT, cstream->id);
365   chunk_stream_clear (cstream);
366 }
367 
368 guint32
gst_rtmp_chunk_stream_parse_id(const guint8 * data,gsize size)369 gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size)
370 {
371   guint32 ret;
372 
373   if (size < 1) {
374     GST_TRACE ("Not enough bytes to read ID");
375     return 0;
376   }
377 
378   ret = GST_READ_UINT8 (data) & CHUNK_BYTE_MASK;
379 
380   switch (ret) {
381     case CHUNK_BYTE_TWOBYTE:
382       if (size < 2) {
383         GST_TRACE ("Not enough bytes to read two-byte ID");
384         return 0;
385       }
386 
387       ret = GST_READ_UINT8 (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
388       break;
389 
390     case CHUNK_BYTE_THREEBYTE:
391       if (size < 3) {
392         GST_TRACE ("Not enough bytes to read three-byte ID");
393         return 0;
394       }
395 
396       ret = GST_READ_UINT16_LE (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
397       break;
398   }
399 
400   GST_TRACE ("Parsed chunk stream ID %" G_GUINT32_FORMAT, ret);
401   return ret;
402 }
403 
404 guint32
gst_rtmp_chunk_stream_parse_header(GstRtmpChunkStream * cstream,const guint8 * data,gsize size)405 gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream,
406     const guint8 * data, gsize size)
407 {
408   GstBuffer *buffer;
409   GstRtmpMeta *meta;
410   const guint8 *message_header;
411   guint32 header_size;
412   ChunkType type;
413   gboolean has_abs_timestamp = FALSE;
414 
415   g_return_val_if_fail (cstream, 0);
416   g_return_val_if_fail (cstream->id == gst_rtmp_chunk_stream_parse_id (data,
417           size), 0);
418 
419   type = GST_READ_UINT8 (data) >> 6;
420   GST_TRACE ("Parsing chunk stream %" G_GUINT32_FORMAT " header type %d",
421       cstream->id, type);
422 
423   switch (GST_READ_UINT8 (data) & CHUNK_BYTE_MASK) {
424     case CHUNK_BYTE_TWOBYTE:
425       header_size = 2;
426       break;
427     case CHUNK_BYTE_THREEBYTE:
428       header_size = 3;
429       break;
430     default:
431       header_size = 1;
432       break;
433   }
434 
435   message_header = data + header_size;
436   header_size += chunk_header_sizes[type];
437 
438   if (cstream->buffer) {
439     buffer = cstream->buffer;
440     meta = cstream->meta;
441     g_assert (meta->cstream == cstream->id);
442   } else {
443     buffer = gst_buffer_new ();
444     GST_BUFFER_DTS (buffer) = 0;
445     GST_BUFFER_OFFSET (buffer) = cstream->bytes;
446     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
447 
448     meta = gst_buffer_add_rtmp_meta (buffer);
449     meta->cstream = cstream->id;
450 
451     chunk_stream_take_buffer (cstream, buffer);
452     GST_DEBUG ("Starting parse with new %" GST_PTR_FORMAT, buffer);
453   }
454 
455   if (size < header_size) {
456     GST_TRACE ("not enough bytes to read header");
457     return header_size;
458   }
459 
460   switch (type) {
461     case CHUNK_TYPE_0:
462       has_abs_timestamp = TRUE;
463       /* SRSLY:  "Message stream ID is stored in little-endian format." */
464       meta->mstream = GST_READ_UINT32_LE (message_header + 7);
465       /* no break */
466     case CHUNK_TYPE_1:
467       meta->type = GST_READ_UINT8 (message_header + 6);
468       meta->size = GST_READ_UINT24_BE (message_header + 3);
469       /* no break */
470     case CHUNK_TYPE_2:
471       meta->ts_delta = GST_READ_UINT24_BE (message_header);
472       /* no break */
473     case CHUNK_TYPE_3:
474       if (needs_ext_ts (meta)) {
475         guint32 timestamp;
476 
477         if (size < header_size + 4) {
478           GST_TRACE ("not enough bytes to read extended timestamp");
479           return header_size + 4;
480         }
481 
482         GST_TRACE ("Reading extended timestamp");
483         timestamp = GST_READ_UINT32_BE (data + header_size);
484 
485         if (type == 3 && meta->ts_delta != timestamp) {
486           GST_WARNING ("Type 3 extended timestamp does not match expected"
487               " timestamp (want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT
488               "); assuming it's not present", meta->ts_delta, timestamp);
489         } else {
490           meta->ts_delta = timestamp;
491           header_size += 4;
492         }
493       }
494   }
495 
496   GST_MEMDUMP ("<<< chunk header", data, header_size);
497 
498   if (!chunk_stream_is_open (cstream)) {
499     GstClockTime dts = GST_BUFFER_DTS (buffer);
500     guint32 delta_32, abs_32;
501     gint64 delta_64;
502 
503     if (has_abs_timestamp) {
504       abs_32 = meta->ts_delta;
505       delta_32 = abs_32 - dts / GST_MSECOND;
506     } else {
507       delta_32 = meta->ts_delta;
508       abs_32 = delta_32 + dts / GST_MSECOND;
509     }
510 
511     GST_TRACE ("Timestamp delta is %" G_GUINT32_FORMAT " (absolute %"
512         G_GUINT32_FORMAT ")", delta_32, abs_32);
513 
514     /* emulate signed overflow */
515     delta_64 = delta_32;
516     if (delta_64 > G_MAXINT32) {
517       delta_64 -= G_MAXUINT32;
518       delta_64 -= 1;
519     }
520 
521     delta_64 *= GST_MSECOND;
522 
523     if (G_LIKELY (delta_64 >= 0)) {
524       /* Normal advancement */
525     } else if (G_LIKELY ((guint64) (-delta_64) <= dts)) {
526       /* In-bounds regression */
527       GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT,
528           GST_STIME_ARGS (delta_64));
529     } else {
530       /* Out-of-bounds regression */
531       GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT ", offsetting",
532           GST_STIME_ARGS (delta_64));
533       delta_64 = delta_32 * GST_MSECOND;
534     }
535 
536     GST_BUFFER_DTS (buffer) += delta_64;
537 
538     GST_TRACE ("Adjusted buffer DTS (%" GST_TIME_FORMAT ") by %"
539         GST_STIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (dts),
540         GST_STIME_ARGS (delta_64), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
541   } else {
542     GST_TRACE ("Message payload already started; not touching timestamp");
543   }
544 
545   return header_size;
546 }
547 
548 guint32
gst_rtmp_chunk_stream_parse_payload(GstRtmpChunkStream * cstream,guint32 chunk_size,guint8 ** data)549 gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream,
550     guint32 chunk_size, guint8 ** data)
551 {
552   GstMemory *mem;
553 
554   g_return_val_if_fail (cstream, 0);
555   g_return_val_if_fail (cstream->buffer, 0);
556 
557   if (!chunk_stream_is_open (cstream)) {
558     guint32 size = cstream->meta->size;
559 
560     GST_TRACE ("Allocating buffer, payload size %" G_GUINT32_FORMAT, size);
561 
562     mem = gst_allocator_alloc (NULL, size, 0);
563     if (!mem) {
564       GST_ERROR ("Failed to allocate buffer for payload size %"
565           G_GUINT32_FORMAT, size);
566       return 0;
567     }
568 
569     gst_buffer_append_memory (cstream->buffer, mem);
570     gst_buffer_map (cstream->buffer, &cstream->map, GST_MAP_WRITE);
571   }
572 
573   g_return_val_if_fail (cstream->map.size == cstream->meta->size, 0);
574 
575   if (data) {
576     *data = cstream->map.data + cstream->offset;
577   }
578 
579   return chunk_stream_next_size (cstream, chunk_size);
580 }
581 
582 guint32
gst_rtmp_chunk_stream_wrote_payload(GstRtmpChunkStream * cstream,guint32 chunk_size)583 gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream,
584     guint32 chunk_size)
585 {
586   guint32 size;
587 
588   g_return_val_if_fail (cstream, FALSE);
589   g_return_val_if_fail (chunk_stream_is_open (cstream), FALSE);
590 
591   size = chunk_stream_next_size (cstream, chunk_size);
592   cstream->offset += size;
593   cstream->bytes += size;
594 
595   return chunk_stream_next_size (cstream, chunk_size);
596 }
597 
598 GstBuffer *
gst_rtmp_chunk_stream_parse_finish(GstRtmpChunkStream * cstream)599 gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream)
600 {
601   GstBuffer *buffer, *empty;
602 
603   g_return_val_if_fail (cstream, NULL);
604   g_return_val_if_fail (cstream->buffer, NULL);
605 
606   buffer = gst_buffer_ref (cstream->buffer);
607   GST_BUFFER_OFFSET_END (buffer) = cstream->bytes;
608 
609   gst_rtmp_buffer_dump (buffer, "<<< message");
610 
611   chunk_stream_clear (cstream);
612 
613   empty = gst_buffer_new ();
614 
615   if (!gst_buffer_copy_into (empty, buffer, GST_BUFFER_COPY_META, 0, 0)) {
616     GST_ERROR ("copy_into failed");
617     return NULL;
618   }
619 
620   GST_BUFFER_DTS (empty) = GST_BUFFER_DTS (buffer);
621   GST_BUFFER_OFFSET (empty) = GST_BUFFER_OFFSET_END (buffer);
622 
623   chunk_stream_take_buffer (cstream, empty);
624 
625   return buffer;
626 }
627 
628 GstBuffer *
gst_rtmp_chunk_stream_serialize_start(GstRtmpChunkStream * cstream,GstBuffer * buffer,guint32 chunk_size)629 gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
630     GstBuffer * buffer, guint32 chunk_size)
631 {
632   ChunkType type;
633 
634   g_return_val_if_fail (cstream, NULL);
635   g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL);
636 
637   type = select_chunk_type (cstream, buffer);
638   g_return_val_if_fail (type >= 0, NULL);
639 
640   GST_TRACE ("Starting serialization of message %" GST_PTR_FORMAT
641       " into stream %" G_GUINT32_FORMAT, buffer, cstream->id);
642 
643   gst_rtmp_buffer_dump (buffer, ">>> message");
644 
645   chunk_stream_clear (cstream);
646   chunk_stream_take_buffer (cstream, gst_buffer_ref (buffer));
647 
648   return serialize_next (cstream, chunk_size, type);
649 }
650 
651 GstBuffer *
gst_rtmp_chunk_stream_serialize_next(GstRtmpChunkStream * cstream,guint32 chunk_size)652 gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream,
653     guint32 chunk_size)
654 {
655   g_return_val_if_fail (cstream, NULL);
656   g_return_val_if_fail (cstream->buffer, NULL);
657 
658   if (chunk_stream_next_size (cstream, chunk_size) == 0) {
659     GST_TRACE ("Message serialization finished");
660     return NULL;
661   }
662 
663   GST_TRACE ("Continuing serialization of message %" GST_PTR_FORMAT
664       " into stream %" G_GUINT32_FORMAT, cstream->buffer, cstream->id);
665 
666   return serialize_next (cstream, chunk_size, CHUNK_TYPE_3);
667 }
668 
669 GstBuffer *
gst_rtmp_chunk_stream_serialize_all(GstRtmpChunkStream * cstream,GstBuffer * buffer,guint32 chunk_size)670 gst_rtmp_chunk_stream_serialize_all (GstRtmpChunkStream * cstream,
671     GstBuffer * buffer, guint32 chunk_size)
672 {
673   GstBuffer *outbuf, *nextbuf;
674 
675   outbuf = gst_rtmp_chunk_stream_serialize_start (cstream, buffer, chunk_size);
676   nextbuf = gst_rtmp_chunk_stream_serialize_next (cstream, chunk_size);
677 
678   while (nextbuf) {
679     outbuf = gst_buffer_append (outbuf, nextbuf);
680     nextbuf = gst_rtmp_chunk_stream_serialize_next (cstream, chunk_size);
681   }
682 
683   return outbuf;
684 }
685 
686 GstRtmpChunkStreams *
gst_rtmp_chunk_streams_new(void)687 gst_rtmp_chunk_streams_new (void)
688 {
689   GstRtmpChunkStreams *cstreams;
690 
691   init_debug ();
692 
693   cstreams = g_slice_new (GstRtmpChunkStreams);
694   cstreams->array = g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkStream));
695   g_array_set_clear_func (cstreams->array,
696       (GDestroyNotify) gst_rtmp_chunk_stream_clear);
697   return cstreams;
698 }
699 
700 void
gst_rtmp_chunk_streams_free(gpointer ptr)701 gst_rtmp_chunk_streams_free (gpointer ptr)
702 {
703   GstRtmpChunkStreams *cstreams = ptr;
704   g_clear_pointer (&cstreams->array, g_array_unref);
705   g_slice_free (GstRtmpChunkStreams, cstreams);
706 }
707 
708 GstRtmpChunkStream *
gst_rtmp_chunk_streams_get(GstRtmpChunkStreams * cstreams,guint32 id)709 gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id)
710 {
711   GArray *array;
712   GstRtmpChunkStream *entry;
713   guint i;
714 
715   g_return_val_if_fail (cstreams, NULL);
716   g_return_val_if_fail (id > CHUNK_BYTE_THREEBYTE, NULL);
717   g_return_val_if_fail (id <= CHUNK_STREAM_MAX_THREEBYTE, NULL);
718 
719   array = cstreams->array;
720 
721   for (i = 0; i < array->len; i++) {
722     entry = &g_array_index (array, GstRtmpChunkStream, i);
723     if (entry->id == id) {
724       GST_TRACE ("Obtaining chunk stream %" G_GUINT32_FORMAT, id);
725       return entry;
726     }
727   }
728 
729   GST_DEBUG ("Allocating chunk stream %" G_GUINT32_FORMAT, id);
730 
731   g_array_set_size (array, i + 1);
732   entry = &g_array_index (array, GstRtmpChunkStream, i);
733   entry->id = id;
734   return entry;
735 }
736