• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /***
2   This file is part of PulseAudio.
3 
4   Copyright 2016 Arun Raghavan <mail@arunraghavan.net>
5 
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2.1 of the License,
9   or (at your option) any later version.
10 
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15 
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19 
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 
24 #include <pulse/timeval.h>
25 #include <pulsecore/fdsem.h>
26 #include <pulsecore/core-rtclock.h>
27 
28 #include "rtp.h"
29 
30 #include <gio/gio.h>
31 
32 #include <gst/gst.h>
33 #include <gst/app/gstappsrc.h>
34 #include <gst/app/gstappsink.h>
35 #include <gst/base/gstadapter.h>
36 #include <gst/rtp/gstrtpbuffer.h>
37 
38 #define MAKE_ELEMENT_NAMED(v, e, n)                     \
39     v = gst_element_factory_make(e, n);                 \
40     if (!v) {                                           \
41         pa_log("Could not create %s element", e);       \
42         goto fail;                                      \
43     }
44 
45 #define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL)
46 #define RTP_HEADER_SIZE    12
47 
48 struct pa_rtp_context {
49     pa_fdsem *fdsem;
50     pa_sample_spec ss;
51 
52     GstElement *pipeline;
53     GstElement *appsrc;
54     GstElement *appsink;
55     GstCaps *meta_reference;
56 
57     bool first_buffer;
58     uint32_t last_timestamp;
59 
60     uint8_t *send_buf;
61     size_t mtu;
62 };
63 
caps_from_sample_spec(const pa_sample_spec * ss)64 static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) {
65     if (ss->format != PA_SAMPLE_S16BE)
66         return NULL;
67 
68     return gst_caps_new_simple("audio/x-raw",
69             "format", G_TYPE_STRING, "S16BE",
70             "rate", G_TYPE_INT, (int) ss->rate,
71             "channels", G_TYPE_INT, (int) ss->channels,
72             "layout", G_TYPE_STRING, "interleaved",
73             NULL);
74 }
75 
init_send_pipeline(pa_rtp_context * c,int fd,uint8_t payload,size_t mtu,const pa_sample_spec * ss)76 static bool init_send_pipeline(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
77     GstElement *appsrc = NULL, *pay = NULL, *capsf = NULL, *rtpbin = NULL, *sink = NULL;
78     GstCaps *caps;
79     GSocket *socket;
80     GInetSocketAddress *addr;
81     GInetAddress *iaddr;
82     guint16 port;
83     gchar *addr_str;
84 
85     MAKE_ELEMENT(appsrc, "appsrc");
86     MAKE_ELEMENT(pay, "rtpL16pay");
87     MAKE_ELEMENT(capsf, "capsfilter");
88     MAKE_ELEMENT(rtpbin, "rtpbin");
89     MAKE_ELEMENT(sink, "udpsink");
90 
91     c->pipeline = gst_pipeline_new(NULL);
92 
93     gst_bin_add_many(GST_BIN(c->pipeline), appsrc, pay, capsf, rtpbin, sink, NULL);
94 
95     caps = caps_from_sample_spec(ss);
96     if (!caps) {
97         pa_log("Unsupported format to payload");
98         goto fail;
99     }
100 
101     socket = g_socket_new_from_fd(fd, NULL);
102     if (!socket) {
103         pa_log("Failed to create socket");
104         goto fail;
105     }
106 
107     addr = G_INET_SOCKET_ADDRESS(g_socket_get_remote_address(socket, NULL));
108     iaddr = g_inet_socket_address_get_address(addr);
109     addr_str = g_inet_address_to_string(iaddr);
110     port = g_inet_socket_address_get_port(addr);
111 
112     g_object_set(appsrc, "caps", caps, "is-live", TRUE, "blocksize", mtu, "format", 3 /* time */, NULL);
113     g_object_set(pay, "mtu", mtu, NULL);
114     g_object_set(sink, "socket", socket, "host", addr_str, "port", port,
115                  "enable-last-sample", FALSE, "sync", FALSE, "loop",
116                  g_socket_get_multicast_loopback(socket), "ttl",
117                  g_socket_get_ttl(socket), "ttl-mc",
118                  g_socket_get_multicast_ttl(socket), "auto-multicast", FALSE,
119                  NULL);
120 
121     g_free(addr_str);
122     g_object_unref(addr);
123     g_object_unref(socket);
124 
125     gst_caps_unref(caps);
126 
127     /* Force the payload type that we want */
128     caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) payload, NULL);
129     g_object_set(capsf, "caps", caps, NULL);
130     gst_caps_unref(caps);
131 
132     if (!gst_element_link(appsrc, pay) ||
133         !gst_element_link(pay, capsf) ||
134         !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") ||
135         !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) {
136 
137         pa_log("Could not set up send pipeline");
138         goto fail;
139     }
140 
141     if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
142         pa_log("Could not start pipeline");
143         goto fail;
144     }
145 
146     c->appsrc = gst_object_ref(appsrc);
147 
148     return true;
149 
150 fail:
151     if (c->pipeline) {
152         gst_object_unref(c->pipeline);
153     } else {
154         /* These weren't yet added to pipeline, so we still have a ref */
155         if (appsrc)
156             gst_object_unref(appsrc);
157         if (pay)
158             gst_object_unref(pay);
159         if (capsf)
160             gst_object_unref(capsf);
161         if (rtpbin)
162             gst_object_unref(rtpbin);
163         if (sink)
164             gst_object_unref(sink);
165     }
166 
167     return false;
168 }
169 
pa_rtp_context_new_send(int fd,uint8_t payload,size_t mtu,const pa_sample_spec * ss)170 pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
171     pa_rtp_context *c = NULL;
172     GError *error = NULL;
173 
174     pa_assert(fd >= 0);
175 
176     pa_log_info("Initialising GStreamer RTP backend for send");
177 
178     c = pa_xnew0(pa_rtp_context, 1);
179 
180     c->ss = *ss;
181     c->mtu = mtu - RTP_HEADER_SIZE;
182     c->send_buf = pa_xmalloc(c->mtu);
183 
184     if (!gst_init_check(NULL, NULL, &error)) {
185         pa_log_error("Could not initialise GStreamer: %s", error->message);
186         g_error_free(error);
187         goto fail;
188     }
189 
190     if (!init_send_pipeline(c, fd, payload, mtu, ss))
191         goto fail;
192 
193     return c;
194 
195 fail:
196     pa_rtp_context_free(c);
197     return NULL;
198 }
199 
200 /* Called from I/O thread context */
process_bus_messages(pa_rtp_context * c)201 static bool process_bus_messages(pa_rtp_context *c) {
202     GstBus *bus;
203     GstMessage *message;
204     bool ret = true;
205 
206     bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
207 
208     while (ret && (message = gst_bus_pop(bus))) {
209         if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
210             GError *error = NULL;
211 
212             ret = false;
213 
214             gst_message_parse_error(message, &error, NULL);
215             pa_log("Got an error: %s", error->message);
216 
217             g_error_free(error);
218         }
219 
220         gst_message_unref(message);
221     }
222 
223     gst_object_unref(bus);
224 
225     return ret;
226 }
227 
228 /* Called from I/O thread context */
pa_rtp_send(pa_rtp_context * c,pa_memblockq * q)229 int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
230     GstBuffer *buf;
231     size_t n = 0;
232 
233     pa_assert(c);
234     pa_assert(q);
235 
236     if (!process_bus_messages(c))
237         return -1;
238 
239     /*
240      * While we check here for atleast MTU worth of data being available in
241      * memblockq, we might not have exact equivalent to MTU. Hence, we walk
242      * over the memchunks in memblockq and accumulate MTU bytes next.
243      */
244     if (pa_memblockq_get_length(q) < c->mtu)
245         return 0;
246 
247     for (;;) {
248         pa_memchunk chunk;
249         int r;
250 
251         pa_memchunk_reset(&chunk);
252 
253         if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
254             /*
255              * Accumulate MTU bytes of data before sending. If the current
256              * chunk length + accumulated bytes exceeds MTU, we drop bytes
257              * considered for transfer in this iteration from memblockq.
258              *
259              * The remaining bytes will be available in the next iteration,
260              * as these will be tracked and maintained by memblockq.
261              */
262             size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
263 
264             pa_assert(chunk.memblock);
265 
266             memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k);
267             pa_memblock_release(chunk.memblock);
268             pa_memblock_unref(chunk.memblock);
269 
270             n += k;
271             pa_memblockq_drop(q, k);
272         }
273 
274         if (r < 0 || n >= c->mtu) {
275             GstClock *clock;
276             GstClockTime timestamp, clock_time;
277             GstMapInfo info;
278 
279             if (n > 0) {
280                 clock = gst_element_get_clock(c->pipeline);
281                 clock_time = gst_clock_get_time(clock);
282                 gst_object_unref(clock);
283 
284                 timestamp = gst_element_get_base_time(c->pipeline);
285                 if (timestamp > clock_time)
286                   timestamp -= clock_time;
287                 else
288                   timestamp = 0;
289 
290                 buf = gst_buffer_new_allocate(NULL, n, NULL);
291                 pa_assert(buf);
292 
293                 GST_BUFFER_PTS(buf) = timestamp;
294 
295                 pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE));
296 
297                 memcpy(info.data, c->send_buf, n);
298                 gst_buffer_unmap(buf, &info);
299 
300                 if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
301                     pa_log_error("Could not push buffer");
302                     return -1;
303                 }
304             }
305 
306             if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
307                 break;
308 
309             n = 0;
310         }
311     }
312 
313     return 0;
314 }
315 
rtp_caps_from_sample_spec(const pa_sample_spec * ss)316 static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) {
317     if (ss->format != PA_SAMPLE_S16BE)
318         return NULL;
319 
320     return gst_caps_new_simple("application/x-rtp",
321             "media", G_TYPE_STRING, "audio",
322             "encoding-name", G_TYPE_STRING, "L16",
323             "clock-rate", G_TYPE_INT, (int) ss->rate,
324             "payload", G_TYPE_INT, (int) pa_rtp_payload_from_sample_spec(ss),
325             "layout", G_TYPE_STRING, "interleaved",
326             NULL);
327 }
328 
on_pad_added(GstElement * element,GstPad * pad,gpointer userdata)329 static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
330     pa_rtp_context *c = (pa_rtp_context *) userdata;
331     GstElement *depay;
332     GstPad *sinkpad;
333     GstPadLinkReturn ret;
334 
335     depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
336     pa_assert(depay);
337 
338     sinkpad = gst_element_get_static_pad(depay, "sink");
339 
340     ret = gst_pad_link(pad, sinkpad);
341     if (ret != GST_PAD_LINK_OK) {
342         GstBus *bus;
343         GError *error;
344 
345         bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
346         error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
347         gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));
348 
349         /* Actually cause the I/O thread to wake up and process the error */
350         pa_fdsem_post(c->fdsem);
351 
352         g_error_free(error);
353         gst_object_unref(bus);
354     }
355 
356     gst_object_unref(sinkpad);
357     gst_object_unref(depay);
358 }
359 
udpsrc_buffer_probe(GstPad * pad,GstPadProbeInfo * info,gpointer userdata)360 static GstPadProbeReturn udpsrc_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer userdata) {
361     struct timeval tv;
362     pa_usec_t timestamp;
363     pa_rtp_context *c = (pa_rtp_context *) userdata;
364 
365     pa_assert(info->type & GST_PAD_PROBE_TYPE_BUFFER);
366 
367     pa_gettimeofday(&tv);
368     timestamp = pa_timeval_load(&tv);
369 
370     gst_buffer_add_reference_timestamp_meta(GST_BUFFER(info->data), c->meta_reference, timestamp * GST_USECOND,
371             GST_CLOCK_TIME_NONE);
372 
373     return GST_PAD_PROBE_OK;
374 }
375 
init_receive_pipeline(pa_rtp_context * c,int fd,const pa_sample_spec * ss)376 static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
377     GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
378     GstCaps *caps;
379     GstPad *pad;
380     GSocket *socket;
381     GError *error = NULL;
382 
383     MAKE_ELEMENT(udpsrc, "udpsrc");
384     MAKE_ELEMENT(rtpbin, "rtpbin");
385     MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
386     MAKE_ELEMENT(appsink, "appsink");
387 
388     c->pipeline = gst_pipeline_new(NULL);
389 
390     gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
391 
392     socket = g_socket_new_from_fd(fd, &error);
393     if (error) {
394         pa_log("Could not create socket: %s", error->message);
395         g_error_free(error);
396         goto fail;
397     }
398 
399     caps = rtp_caps_from_sample_spec(ss);
400     if (!caps) {
401         pa_log("Unsupported format to payload");
402         goto fail;
403     }
404 
405     g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
406     g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
407     g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
408 
409     gst_caps_unref(caps);
410     g_object_unref(socket);
411 
412     if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
413         !gst_element_link(depay, appsink)) {
414 
415         pa_log("Could not set up receive pipeline");
416         goto fail;
417     }
418 
419     g_signal_connect(G_OBJECT(rtpbin), "pad-added", G_CALLBACK(on_pad_added), c);
420 
421     /* This logic should go into udpsrc, and we should be populating the
422      * receive timestamp using SCM_TIMESTAMP, but until we have that ... */
423     c->meta_reference = gst_caps_new_empty_simple("timestamp/x-pulseaudio-wallclock");
424 
425     pad = gst_element_get_static_pad(udpsrc, "src");
426     gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, udpsrc_buffer_probe, c, NULL);
427     gst_object_unref(pad);
428 
429     if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
430         pa_log("Could not start pipeline");
431         goto fail;
432     }
433 
434     c->appsink = gst_object_ref(appsink);
435 
436     return true;
437 
438 fail:
439     if (c->pipeline) {
440         gst_object_unref(c->pipeline);
441     } else {
442         /* These weren't yet added to pipeline, so we still have a ref */
443         if (udpsrc)
444             gst_object_unref(udpsrc);
445         if (depay)
446             gst_object_unref(depay);
447         if (rtpbin)
448             gst_object_unref(rtpbin);
449         if (appsink)
450             gst_object_unref(appsink);
451     }
452 
453     return false;
454 }
455 
456 /* Called from the GStreamer streaming thread */
appsink_eos(GstAppSink * appsink,gpointer userdata)457 static void appsink_eos(GstAppSink *appsink, gpointer userdata) {
458     pa_rtp_context *c = (pa_rtp_context *) userdata;
459 
460     pa_fdsem_post(c->fdsem);
461 }
462 
463 /* Called from the GStreamer streaming thread */
appsink_new_sample(GstAppSink * appsink,gpointer userdata)464 static GstFlowReturn appsink_new_sample(GstAppSink *appsink, gpointer userdata) {
465     pa_rtp_context *c = (pa_rtp_context *) userdata;
466 
467     pa_fdsem_post(c->fdsem);
468 
469     return GST_FLOW_OK;
470 }
471 
pa_rtp_context_new_recv(int fd,uint8_t payload,const pa_sample_spec * ss)472 pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) {
473     pa_rtp_context *c = NULL;
474     GstAppSinkCallbacks callbacks = { 0, };
475     GError *error = NULL;
476 
477     pa_assert(fd >= 0);
478 
479     pa_log_info("Initialising GStreamer RTP backend for receive");
480 
481     c = pa_xnew0(pa_rtp_context, 1);
482 
483     c->fdsem = pa_fdsem_new();
484     c->ss = *ss;
485     c->send_buf = NULL;
486     c->first_buffer = true;
487 
488     if (!gst_init_check(NULL, NULL, &error)) {
489         pa_log_error("Could not initialise GStreamer: %s", error->message);
490         g_error_free(error);
491         goto fail;
492     }
493 
494     if (!init_receive_pipeline(c, fd, ss))
495         goto fail;
496 
497     callbacks.eos = appsink_eos;
498     callbacks.new_sample = appsink_new_sample;
499     gst_app_sink_set_callbacks(GST_APP_SINK(c->appsink), &callbacks, c, NULL);
500 
501     return c;
502 
503 fail:
504     pa_rtp_context_free(c);
505     return NULL;
506 }
507 
508 /* Called from I/O thread context */
pa_rtp_recv(pa_rtp_context * c,pa_memchunk * chunk,pa_mempool * pool,uint32_t * rtp_tstamp,struct timeval * tstamp)509 int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
510     GstSample *sample = NULL;
511     GstBufferList *buf_list;
512     GstAdapter *adapter;
513     GstBuffer *buf;
514     GstMapInfo info;
515     GstClockTime timestamp = GST_CLOCK_TIME_NONE;
516     uint8_t *data;
517     uint64_t data_len = 0;
518 
519     if (!process_bus_messages(c))
520         goto fail;
521 
522     adapter = gst_adapter_new();
523     pa_assert(adapter);
524 
525     while (true) {
526         sample = gst_app_sink_try_pull_sample(GST_APP_SINK(c->appsink), 0);
527         if (!sample)
528             break;
529 
530         buf = gst_sample_get_buffer(sample);
531 
532         /* Get the timestamp from the first buffer */
533         if (timestamp == GST_CLOCK_TIME_NONE) {
534             GstReferenceTimestampMeta *meta = gst_buffer_get_reference_timestamp_meta(buf, c->meta_reference);
535 
536             /* Use the meta if we were able to insert it and it came through,
537              * else try to fallback to the DTS, which is only available in
538              * GStreamer 1.16 and earlier. */
539             if (meta)
540                 timestamp = meta->timestamp;
541             else if (GST_BUFFER_DTS(buf) != GST_CLOCK_TIME_NONE)
542                 timestamp = GST_BUFFER_DTS(buf);
543             else
544                 timestamp = 0;
545         }
546 
547         if (GST_BUFFER_IS_DISCONT(buf))
548             pa_log_info("Discontinuity detected, possibly lost some packets");
549 
550         if (!gst_buffer_map(buf, &info, GST_MAP_READ)) {
551             pa_log_info("Failed to map buffer");
552             gst_sample_unref(sample);
553             goto fail;
554         }
555 
556         data_len += info.size;
557         /* We need the buffer to be valid longer than the sample, which will
558          * be valid only for the duration of this loop.
559          *
560          * To do this, increase the ref count. Ownership is transferred to the
561          * adapter in gst_adapter_push.
562          */
563         gst_buffer_ref(buf);
564         gst_adapter_push(adapter, buf);
565         gst_buffer_unmap(buf, &info);
566 
567         gst_sample_unref(sample);
568     }
569 
570     buf_list = gst_adapter_take_buffer_list(adapter, data_len);
571     pa_assert(buf_list);
572 
573     pa_assert(pa_mempool_block_size_max(pool) >= data_len);
574 
575     chunk->memblock = pa_memblock_new(pool, data_len);
576     chunk->index = 0;
577     chunk->length = data_len;
578 
579     data = (uint8_t *) pa_memblock_acquire_chunk(chunk);
580 
581     for (int i = 0; i < gst_buffer_list_length(buf_list); i++) {
582         buf = gst_buffer_list_get(buf_list, i);
583 
584         if (!gst_buffer_map(buf, &info, GST_MAP_READ)) {
585             gst_buffer_list_unref(buf_list);
586             goto fail;
587         }
588 
589         memcpy(data, info.data, info.size);
590         data += info.size;
591         gst_buffer_unmap(buf, &info);
592     }
593 
594     pa_memblock_release(chunk->memblock);
595 
596     /* When buffer-mode = none, the buffer PTS is the RTP timestamp, converted
597      * to time units (instead of clock-rate units as is in the header) and
598      * wraparound-corrected. */
599     *rtp_tstamp = gst_util_uint64_scale_int(GST_BUFFER_PTS(gst_buffer_list_get(buf_list, 0)), c->ss.rate, GST_SECOND) & 0xFFFFFFFFU;
600     if (timestamp != GST_CLOCK_TIME_NONE)
601         pa_timeval_rtstore(tstamp, timestamp / PA_NSEC_PER_USEC, false);
602 
603     if (c->first_buffer) {
604         c->first_buffer = false;
605         c->last_timestamp = *rtp_tstamp;
606     } else {
607         /* The RTP clock -> time domain -> RTP clock transformation above might
608          * add a ±1 rounding error, so let's get rid of that */
609         uint32_t expected = c->last_timestamp + (uint32_t) (data_len / pa_rtp_context_get_frame_size(c));
610         int delta = *rtp_tstamp - expected;
611 
612         if (delta == 1 || delta == -1)
613             *rtp_tstamp -= delta;
614 
615         c->last_timestamp = *rtp_tstamp;
616     }
617 
618     gst_buffer_list_unref(buf_list);
619     gst_object_unref(adapter);
620 
621     return 0;
622 
623 fail:
624     if (adapter)
625         gst_object_unref(adapter);
626 
627     if (chunk->memblock)
628         pa_memblock_unref(chunk->memblock);
629 
630     return -1;
631 }
632 
pa_rtp_context_free(pa_rtp_context * c)633 void pa_rtp_context_free(pa_rtp_context *c) {
634     pa_assert(c);
635 
636     if (c->meta_reference)
637         gst_caps_unref(c->meta_reference);
638 
639     if (c->appsrc) {
640         gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc));
641         gst_object_unref(c->appsrc);
642         pa_xfree(c->send_buf);
643     }
644 
645     if (c->appsink)
646         gst_object_unref(c->appsink);
647 
648     if (c->pipeline) {
649         gst_element_set_state(c->pipeline, GST_STATE_NULL);
650         gst_object_unref(c->pipeline);
651     }
652 
653     if (c->fdsem)
654         pa_fdsem_free(c->fdsem);
655 
656     pa_xfree(c);
657 }
658 
pa_rtp_context_get_rtpoll_item(pa_rtp_context * c,pa_rtpoll * rtpoll)659 pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
660     return pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_LATE, c->fdsem);
661 }
662 
pa_rtp_context_get_frame_size(pa_rtp_context * c)663 size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
664     return pa_frame_size(&c->ss);
665 }
666