1 /* GStreamer
2 *
3 * Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
4 * Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include "gstgiobasesink.h"
27 #include "gstgioelements.h"
28
29 GST_DEBUG_CATEGORY_STATIC (gst_gio_base_sink_debug);
30 #define GST_CAT_DEFAULT gst_gio_base_sink_debug
31
32 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
33 GST_PAD_SINK,
34 GST_PAD_ALWAYS,
35 GST_STATIC_CAPS_ANY);
36
37 #define gst_gio_base_sink_parent_class parent_class
38 G_DEFINE_TYPE (GstGioBaseSink, gst_gio_base_sink, GST_TYPE_BASE_SINK);
39
40 static void gst_gio_base_sink_finalize (GObject * object);
41 static gboolean gst_gio_base_sink_start (GstBaseSink * base_sink);
42 static gboolean gst_gio_base_sink_stop (GstBaseSink * base_sink);
43 static gboolean gst_gio_base_sink_unlock (GstBaseSink * base_sink);
44 static gboolean gst_gio_base_sink_unlock_stop (GstBaseSink * base_sink);
45 static gboolean gst_gio_base_sink_event (GstBaseSink * base_sink,
46 GstEvent * event);
47 static GstFlowReturn gst_gio_base_sink_render (GstBaseSink * base_sink,
48 GstBuffer * buffer);
49 static gboolean gst_gio_base_sink_query (GstBaseSink * bsink, GstQuery * query);
50
51 static void
gst_gio_base_sink_class_init(GstGioBaseSinkClass * klass)52 gst_gio_base_sink_class_init (GstGioBaseSinkClass * klass)
53 {
54 GObjectClass *gobject_class = (GObjectClass *) klass;
55 GstElementClass *gstelement_class = (GstElementClass *) klass;
56 GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
57
58 GST_DEBUG_CATEGORY_INIT (gst_gio_base_sink_debug, "gio_base_sink", 0,
59 "GIO base sink");
60
61 gobject_class->finalize = gst_gio_base_sink_finalize;
62
63 gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
64
65 gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_gio_base_sink_start);
66 gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_gio_base_sink_stop);
67 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_base_sink_unlock);
68 gstbasesink_class->unlock_stop =
69 GST_DEBUG_FUNCPTR (gst_gio_base_sink_unlock_stop);
70 gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_gio_base_sink_query);
71 gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_gio_base_sink_event);
72 gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_gio_base_sink_render);
73
74 gst_type_mark_as_plugin_api (GST_TYPE_GIO_BASE_SINK, 0);
75 }
76
77 static void
gst_gio_base_sink_init(GstGioBaseSink * sink)78 gst_gio_base_sink_init (GstGioBaseSink * sink)
79 {
80 gst_base_sink_set_sync (GST_BASE_SINK (sink), FALSE);
81
82 sink->cancel = g_cancellable_new ();
83 }
84
85 static void
gst_gio_base_sink_finalize(GObject * object)86 gst_gio_base_sink_finalize (GObject * object)
87 {
88 GstGioBaseSink *sink = GST_GIO_BASE_SINK (object);
89
90 if (sink->cancel) {
91 g_object_unref (sink->cancel);
92 sink->cancel = NULL;
93 }
94
95 if (sink->stream) {
96 g_object_unref (sink->stream);
97 sink->stream = NULL;
98 }
99
100 GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
101 }
102
103 static gboolean
gst_gio_base_sink_start(GstBaseSink * base_sink)104 gst_gio_base_sink_start (GstBaseSink * base_sink)
105 {
106 GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink);
107 GstGioBaseSinkClass *gbsink_class = GST_GIO_BASE_SINK_GET_CLASS (sink);
108
109 sink->position = 0;
110
111 /* FIXME: This will likely block */
112 sink->stream = gbsink_class->get_stream (sink);
113 if (G_UNLIKELY (!G_IS_OUTPUT_STREAM (sink->stream))) {
114 GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
115 ("No output stream provided by subclass"));
116 return FALSE;
117 } else if (G_UNLIKELY (g_output_stream_is_closed (sink->stream))) {
118 GST_ELEMENT_ERROR (sink, LIBRARY, FAILED, (NULL),
119 ("Output stream is already closed"));
120 return FALSE;
121 }
122
123 GST_DEBUG_OBJECT (sink, "started sink");
124
125 return TRUE;
126 }
127
128 static gboolean
gst_gio_base_sink_stop(GstBaseSink * base_sink)129 gst_gio_base_sink_stop (GstBaseSink * base_sink)
130 {
131 GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink);
132 GstGioBaseSinkClass *klass = GST_GIO_BASE_SINK_GET_CLASS (sink);
133 gboolean success;
134 GError *err = NULL;
135
136 if (klass->close_on_stop && G_IS_OUTPUT_STREAM (sink->stream)) {
137 GST_DEBUG_OBJECT (sink, "closing stream");
138
139 /* FIXME: can block but unfortunately we can't use async operations
140 * here because they require a running main loop */
141 success = g_output_stream_close (sink->stream, sink->cancel, &err);
142
143 if (!success && !gst_gio_error (sink, "g_output_stream_close", &err, NULL)) {
144 GST_ELEMENT_WARNING (sink, RESOURCE, CLOSE, (NULL),
145 ("gio_output_stream_close failed: %s", err->message));
146 g_clear_error (&err);
147 } else if (!success) {
148 GST_ELEMENT_WARNING (sink, RESOURCE, CLOSE, (NULL),
149 ("g_output_stream_close failed"));
150 } else {
151 GST_DEBUG_OBJECT (sink, "g_outut_stream_close succeeded");
152 }
153
154 g_object_unref (sink->stream);
155 sink->stream = NULL;
156 } else {
157 success = g_output_stream_flush (sink->stream, sink->cancel, &err);
158
159 if (!success && !gst_gio_error (sink, "g_output_stream_flush", &err, NULL)) {
160 GST_ELEMENT_WARNING (sink, RESOURCE, CLOSE, (NULL),
161 ("gio_output_stream_flush failed: %s", err->message));
162 g_clear_error (&err);
163 } else if (!success) {
164 GST_ELEMENT_WARNING (sink, RESOURCE, CLOSE, (NULL),
165 ("g_output_stream_flush failed"));
166 } else {
167 GST_DEBUG_OBJECT (sink, "g_outut_stream_flush succeeded");
168 }
169
170 g_object_unref (sink->stream);
171 sink->stream = NULL;
172 }
173
174 return TRUE;
175 }
176
177 static gboolean
gst_gio_base_sink_unlock(GstBaseSink * base_sink)178 gst_gio_base_sink_unlock (GstBaseSink * base_sink)
179 {
180 GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink);
181
182 GST_LOG_OBJECT (sink, "triggering cancellation");
183
184 g_cancellable_cancel (sink->cancel);
185
186 return TRUE;
187 }
188
189 static gboolean
gst_gio_base_sink_unlock_stop(GstBaseSink * base_sink)190 gst_gio_base_sink_unlock_stop (GstBaseSink * base_sink)
191 {
192 GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink);
193
194 GST_LOG_OBJECT (sink, "resetting cancellable");
195
196 g_object_unref (sink->cancel);
197 sink->cancel = g_cancellable_new ();
198
199 return TRUE;
200 }
201
202 static gboolean
gst_gio_base_sink_event(GstBaseSink * base_sink,GstEvent * event)203 gst_gio_base_sink_event (GstBaseSink * base_sink, GstEvent * event)
204 {
205 GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink);
206 GstFlowReturn ret = GST_FLOW_OK;
207
208 if (sink->stream == NULL)
209 return TRUE;
210
211 switch (GST_EVENT_TYPE (event)) {
212 case GST_EVENT_SEGMENT:
213 if (G_IS_OUTPUT_STREAM (sink->stream)) {
214 const GstSegment *segment;
215
216 gst_event_parse_segment (event, &segment);
217
218 if (segment->format != GST_FORMAT_BYTES) {
219 GST_WARNING_OBJECT (sink, "ignored SEGMENT event in %s format",
220 gst_format_get_name (segment->format));
221 break;
222 }
223
224 if (GST_GIO_STREAM_IS_SEEKABLE (sink->stream)) {
225 ret = gst_gio_seek (sink, G_SEEKABLE (sink->stream), segment->start,
226 sink->cancel);
227 if (ret == GST_FLOW_OK)
228 sink->position = segment->start;
229 } else {
230 ret = GST_FLOW_NOT_SUPPORTED;
231 }
232 }
233 break;
234
235 case GST_EVENT_EOS:
236 case GST_EVENT_FLUSH_START:
237 if (G_IS_OUTPUT_STREAM (sink->stream)) {
238 gboolean success;
239 GError *err = NULL;
240
241 success = g_output_stream_flush (sink->stream, sink->cancel, &err);
242
243 if (!success && !gst_gio_error (sink, "g_output_stream_flush", &err,
244 &ret)) {
245 GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
246 ("flush failed: %s", err->message));
247 g_clear_error (&err);
248 }
249 }
250 break;
251
252 default:
253 break;
254 }
255 if (ret == GST_FLOW_OK)
256 return GST_BASE_SINK_CLASS (parent_class)->event (base_sink, event);
257 else {
258 gst_event_unref (event);
259 return FALSE;
260 }
261 }
262
263 static GstFlowReturn
gst_gio_base_sink_render(GstBaseSink * base_sink,GstBuffer * buffer)264 gst_gio_base_sink_render (GstBaseSink * base_sink, GstBuffer * buffer)
265 {
266 GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink);
267 gsize written;
268 GstMapInfo map;
269 gboolean success;
270 GError *err = NULL;
271
272 g_return_val_if_fail (G_IS_OUTPUT_STREAM (sink->stream), GST_FLOW_ERROR);
273
274 gst_buffer_map (buffer, &map, GST_MAP_READ);
275
276 GST_LOG_OBJECT (sink,
277 "writing %" G_GSIZE_FORMAT " bytes to offset %" G_GUINT64_FORMAT,
278 map.size, sink->position);
279
280 success =
281 g_output_stream_write_all (sink->stream, map.data, map.size, &written,
282 sink->cancel, &err);
283 gst_buffer_unmap (buffer, &map);
284
285 if (success) {
286 sink->position += written;
287 return GST_FLOW_OK;
288
289 } else {
290 GstFlowReturn ret;
291
292 if (!gst_gio_error (sink, "g_output_stream_write_all", &err, &ret)) {
293 if (GST_GIO_ERROR_MATCHES (err, NO_SPACE)) {
294 GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL),
295 ("Could not write to stream: %s", err->message));
296 } else {
297 GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
298 ("Could not write to stream: %s", err->message));
299 }
300 g_clear_error (&err);
301 }
302
303 return ret;
304 }
305 }
306
307 static gboolean
gst_gio_base_sink_query(GstBaseSink * bsink,GstQuery * query)308 gst_gio_base_sink_query (GstBaseSink * bsink, GstQuery * query)
309 {
310 GstGioBaseSink *sink = GST_GIO_BASE_SINK (bsink);
311 GstFormat format;
312
313 switch (GST_QUERY_TYPE (query)) {
314 case GST_QUERY_POSITION:
315 gst_query_parse_position (query, &format, NULL);
316 switch (format) {
317 case GST_FORMAT_BYTES:
318 case GST_FORMAT_DEFAULT:
319 gst_query_set_position (query, format, sink->position);
320 return TRUE;
321 default:
322 return FALSE;
323 }
324 case GST_QUERY_FORMATS:
325 gst_query_set_formats (query, 2, GST_FORMAT_DEFAULT, GST_FORMAT_BYTES);
326 return TRUE;
327 case GST_QUERY_URI:
328 if (GST_IS_URI_HANDLER (sink)) {
329 gchar *uri;
330
331 uri = gst_uri_handler_get_uri (GST_URI_HANDLER (sink));
332 gst_query_set_uri (query, uri);
333 g_free (uri);
334 return TRUE;
335 }
336 return FALSE;
337 case GST_QUERY_SEEKING:
338 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
339 if (format == GST_FORMAT_BYTES || format == GST_FORMAT_DEFAULT) {
340 gst_query_set_seeking (query, format,
341 GST_GIO_STREAM_IS_SEEKABLE (sink->stream), 0, -1);
342 } else {
343 gst_query_set_seeking (query, format, FALSE, 0, -1);
344 }
345 return TRUE;
346 default:
347 return GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
348 }
349 }
350