1 /* GStreamer
2 * Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
3 *
4 * gstgssink.cpp:
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21 /**
22 * SECTION:element-gssink
23 * @title: gssink
24 * @see_also: #GstGsSrc
25 *
26 * Write incoming data to a series of sequentially-named remote files on a
27 * Google Cloud Storage bucket.
28 *
29 * The object-name property should contain a string with a \%d placeholder
30 * that will be substituted with the index for each filename.
31 *
 * If the #GstGsSink:post-messages property is %TRUE, it posts an element
 * message named `GstGsSink` after each object is written (after every buffer
 * with next-file=buffer, at end-of-stream with next-file=none).
34 *
35 * The message's structure contains these fields:
36 *
37 * * #gchararray `filename`: the filename where the buffer was written.
38 * * #gchararray `date`: the date of the current buffer, NULL if no start date
39 * is provided.
40 * * #gint `index`: index of the buffer.
41 * * #GstClockTime `timestamp`: the timestamp of the buffer.
42 * * #GstClockTime `stream-time`: the stream time of the buffer.
43 * * #GstClockTime `running-time`: the running_time of the buffer.
44 * * #GstClockTime `duration`: the duration of the buffer.
45 * * #guint64 `offset`: the offset of the buffer that triggered the message.
46 * * #guint64 `offset-end`: the offset-end of the buffer that triggered the
47 * message.
48 *
49 * ## Example launch line
50 * ```
51 * gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
52 * object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
53 * next-file=buffer post-messages=true
54 * ```
55 * ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
56 * names are frame00000.png, frame00001.png, ..., frame00014.png
57 * ```
58 * gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
59 * pngenc ! gssink start-date="2020-04-16T08:55:03Z"
60 * object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
61 * next-file=buffer post-messages=true
62 * ```
 * ### Upload 6 png images into gs://mybucket/mypath/myframes/ where the file
64 * names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
65 * im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
66 * im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
67 * ```
68 * gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
69 * object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
70 * ```
 * ### Upload any stream as a single file into Google Cloud Storage. Similar to
72 * filesink in this case. The file is then accessible from:
73 * gs://mybucket/mypath/myvideos/video.mp4
74 *
75 * See also: #GstGsSrc
76 * Since: 1.20
77 */
78
79 #ifdef HAVE_CONFIG_H
80 #include "config.h"
81 #endif
82
83 #include "gstgscommon.h"
84 #include "gstgssink.h"
85
86 #include <algorithm>
87
/* Template for the element's only pad: an always-present sink pad named
 * "sink" that accepts any caps. */
static GstStaticPadTemplate sinktemplate =
    GST_STATIC_PAD_TEMPLATE("sink",
                            GST_PAD_SINK,
                            GST_PAD_ALWAYS,
                            GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
95 #define GST_CAT_DEFAULT gst_gs_sink_debug
96
97 #define DEFAULT_INDEX 0
98 #define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
99 #define DEFAULT_OBJECT_NAME "%s_%05d"
100 #define DEFAULT_POST_MESSAGES FALSE
101
102 namespace gcs = google::cloud::storage;
103
/* GObject property identifiers used by set_property/get_property. PROP_0 is
 * the first enumerator (value 0) and is never installed as a property. */
enum {
  PROP_0,
  PROP_BUCKET_NAME,
  PROP_OBJECT_NAME,
  PROP_INDEX,
  PROP_POST_MESSAGES,
  PROP_NEXT_FILE,
  PROP_SERVICE_ACCOUNT_EMAIL,
  PROP_START_DATE,
  PROP_SERVICE_ACCOUNT_CREDENTIALS,
  PROP_METADATA,
};
116
117 class GSWriteStream;
118
/* Instance data of the gssink element. */
struct _GstGsSink {
  GstBaseSink parent;

  /* Storage client; created in start() and released in stop(). */
  std::unique_ptr<google::cloud::storage::Client> gcs_client;
  /* Write stream kept open across buffers when next-file=none; opened on the
   * first buffer and released on EOS or stop(). */
  std::unique_ptr<GSWriteStream> gcs_stream;
  /* Owned copies of the corresponding string properties. */
  gchar* service_account_email;
  gchar* service_account_credentials;
  gchar* bucket_name;
  /* Object name, or printf-style pattern when next-file=buffer. */
  gchar* object_name;
  /* start-date property as given, and its parsed form (NULL if unset or if
   * parsing failed). */
  gchar* start_date_str;
  GDateTime* start_date;
  /* Index substituted into the object name; incremented per written buffer
   * when next-file=buffer. */
  gint index;
  gboolean post_messages;
  GstGsSinkNext next_file;
  /* Not owned: "" after start(), then the caps structure name from set_caps. */
  const gchar* content_type;
  /* Number of '%' characters found in object_name. */
  size_t nb_percent_format;
  /* TRUE when the first '%' in object_name starts a "%s". */
  gboolean percent_s_is_first;
  /* Owned copy of the metadata property, or NULL. */
  GstStructure* metadata;
};
138
139 static void gst_gs_sink_finalize(GObject* object);
140
141 static void gst_gs_sink_set_property(GObject* object,
142 guint prop_id,
143 const GValue* value,
144 GParamSpec* pspec);
145 static void gst_gs_sink_get_property(GObject* object,
146 guint prop_id,
147 GValue* value,
148 GParamSpec* pspec);
149
150 static gboolean gst_gs_sink_start(GstBaseSink* bsink);
151 static gboolean gst_gs_sink_stop(GstBaseSink* sink);
152 static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
153 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
154 GstBufferList* buffer_list);
155 static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
156 static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
157
158 #define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
gst_gs_sink_next_get_type(void)159 static GType gst_gs_sink_next_get_type(void) {
160 static GType gs_sink_next_type = 0;
161 static const GEnumValue next_types[] = {
162 {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
163 {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
164 {0, NULL, NULL}};
165
166 if (!gs_sink_next_type) {
167 gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
168 }
169
170 return gs_sink_next_type;
171 }
172
/* Alias the parent-class pointer generated by G_DEFINE_TYPE to the shorter
 * `parent_class` name that finalize() and event() use to chain up. */
#define gst_gs_sink_parent_class parent_class
G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
GST_ELEMENT_REGISTER_DEFINE(gssink, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK)
176
177 class GSWriteStream {
178 public:
GSWriteStream(google::cloud::storage::Client & client,const char * bucket_name,const char * object_name,gcs::ObjectMetadata metadata)179 GSWriteStream(google::cloud::storage::Client& client,
180 const char* bucket_name,
181 const char* object_name,
182 gcs::ObjectMetadata metadata)
183 : gcs_stream_(client.WriteObject(bucket_name,
184 object_name,
185 gcs::WithObjectMetadata(metadata))) {}
~GSWriteStream()186 ~GSWriteStream() { gcs_stream_.Close(); }
187
stream()188 gcs::ObjectWriteStream& stream() { return gcs_stream_; }
189
190 private:
191 gcs::ObjectWriteStream gcs_stream_;
192 };
193
gst_gs_sink_class_init(GstGsSinkClass * klass)194 static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
195 GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
196 GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
197 GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);
198
199 gobject_class->set_property = gst_gs_sink_set_property;
200 gobject_class->get_property = gst_gs_sink_get_property;
201
202 /**
203 * GstGsSink:bucket-name:
204 *
205 * Name of the Google Cloud Storage bucket.
206 *
207 * Since: 1.20
208 */
209 g_object_class_install_property(
210 gobject_class, PROP_BUCKET_NAME,
211 g_param_spec_string(
212 "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
213 NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
214
215 /**
216 * GstGsSink:object-name:
217 *
218 * Full path name of the remote file.
219 *
220 * Since: 1.20
221 */
222 g_object_class_install_property(
223 gobject_class, PROP_OBJECT_NAME,
224 g_param_spec_string(
225 "object-name", "Object Name", "Full path name of the remote file",
226 DEFAULT_OBJECT_NAME,
227 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
228
229 /**
230 * GstGsSink:index:
231 *
232 * Index to use with location property to create file names.
233 *
234 * Since: 1.20
235 */
236 g_object_class_install_property(
237 gobject_class, PROP_INDEX,
238 g_param_spec_int(
239 "index", "Index",
240 "Index to use with location property to create file names. The "
241 "index is incremented by one for each buffer written.",
242 0, G_MAXINT, DEFAULT_INDEX,
243 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
244
245 /**
246 * GstGsSink:post-messages:
247 *
248 * Post a message on the GstBus for each file.
249 *
250 * Since: 1.20
251 */
252 g_object_class_install_property(
253 gobject_class, PROP_POST_MESSAGES,
254 g_param_spec_boolean(
255 "post-messages", "Post Messages",
256 "Post a message for each file with information of the buffer",
257 DEFAULT_POST_MESSAGES,
258 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
259 /**
260 * GstGsSink:next-file:
261 *
262 * A #GstGsSinkNext that specifies when to start a new file.
263 *
264 * Since: 1.20
265 */
266 g_object_class_install_property(
267 gobject_class, PROP_NEXT_FILE,
268 g_param_spec_enum(
269 "next-file", "Next File", "When to start a new file",
270 GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
271 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
272
273 /**
274 * GstGsSink:service-account-email:
275 *
276 * Service Account Email to use for credentials.
277 *
278 * Since: 1.20
279 */
280 g_object_class_install_property(
281 gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
282 g_param_spec_string(
283 "service-account-email", "Service Account Email",
284 "Service Account Email to use for credentials", NULL,
285 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
286 GST_PARAM_MUTABLE_READY)));
287
288 /**
289 * GstGsSink:service-account-credentials:
290 *
291 * Service Account Credentials as a JSON string to use for credentials.
292 *
293 * Since: 1.20
294 */
295 g_object_class_install_property(
296 gobject_class, PROP_SERVICE_ACCOUNT_CREDENTIALS,
297 g_param_spec_string(
298 "service-account-credentials", "Service Account Credentials",
299 "Service Account Credentials as a JSON string to use for credentials",
300 NULL,
301 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
302 GST_PARAM_MUTABLE_READY)));
303
304 /**
305 * GstGsSink:start-date:
306 *
307 * Start date in iso8601 format.
308 *
309 * Since: 1.20
310 */
311 g_object_class_install_property(
312 gobject_class, PROP_START_DATE,
313 g_param_spec_string(
314 "start-date", "Start Date", "Start date in iso8601 format", NULL,
315 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
316 GST_PARAM_MUTABLE_READY)));
317
318 /**
319 * GstGsSink:metadata:
320 *
321 * A map of metadata to store with the object; field values need to be
322 * convertible to strings.
323 *
324 * Since: 1.20
325 */
326 g_object_class_install_property(
327 gobject_class, PROP_METADATA,
328 g_param_spec_boxed(
329 "metadata", "Metadata",
330 "A map of metadata to store with the object; field values need to be "
331 "convertible to strings.",
332 GST_TYPE_STRUCTURE,
333 (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
334 GST_PARAM_MUTABLE_READY)));
335
336 gobject_class->finalize = gst_gs_sink_finalize;
337
338 gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
339 gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
340 gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
341 gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
342 gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
343 gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);
344
345 GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");
346
347 gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
348 gst_element_class_set_static_metadata(
349 gstelement_class, "Google Cloud Storage Sink", "Sink/File",
350 "Write buffers to a sequentially named set of files on Google Cloud "
351 "Storage",
352 "Julien Isorce <jisorce@oblong.com>");
353 }
354
gst_gs_sink_init(GstGsSink * sink)355 static void gst_gs_sink_init(GstGsSink* sink) {
356 sink->gcs_client = nullptr;
357 sink->gcs_stream = nullptr;
358 sink->index = DEFAULT_INDEX;
359 sink->post_messages = DEFAULT_POST_MESSAGES;
360 sink->service_account_email = NULL;
361 sink->service_account_credentials = NULL;
362 sink->bucket_name = NULL;
363 sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
364 sink->start_date_str = NULL;
365 sink->start_date = NULL;
366 sink->next_file = DEFAULT_NEXT_FILE;
367 sink->content_type = NULL;
368 sink->nb_percent_format = 0;
369 sink->percent_s_is_first = FALSE;
370
371 gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
372 }
373
gst_gs_sink_finalize(GObject * object)374 static void gst_gs_sink_finalize(GObject* object) {
375 GstGsSink* sink = GST_GS_SINK(object);
376
377 sink->gcs_client = nullptr;
378 sink->gcs_stream = nullptr;
379 g_free(sink->service_account_email);
380 sink->service_account_email = NULL;
381 g_free(sink->service_account_credentials);
382 sink->service_account_credentials = NULL;
383 g_free(sink->bucket_name);
384 sink->bucket_name = NULL;
385 g_free(sink->object_name);
386 sink->object_name = NULL;
387 g_free(sink->start_date_str);
388 sink->start_date_str = NULL;
389 if (sink->start_date) {
390 g_date_time_unref(sink->start_date);
391 sink->start_date = NULL;
392 }
393 sink->content_type = NULL;
394 g_clear_pointer(&sink->metadata, gst_structure_free);
395
396 G_OBJECT_CLASS(parent_class)->finalize(object);
397 }
398
gst_gs_sink_set_object_name(GstGsSink * sink,const gchar * object_name)399 static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
400 const gchar* object_name) {
401 g_free(sink->object_name);
402 sink->object_name = NULL;
403 sink->nb_percent_format = 0;
404 sink->percent_s_is_first = FALSE;
405
406 if (!object_name) {
407 GST_ERROR_OBJECT(sink, "Object name is null");
408 return FALSE;
409 }
410
411 const std::string name(object_name);
412 sink->nb_percent_format = std::count(name.begin(), name.end(), '%');
413 if (sink->nb_percent_format > 2) {
414 GST_ERROR_OBJECT(sink, "Object name has too many formats");
415 return FALSE;
416 }
417
418 const size_t delimiter_percent_s = name.find("%s");
419 if (delimiter_percent_s == std::string::npos) {
420 if (sink->nb_percent_format == 2) {
421 GST_ERROR_OBJECT(sink, "Object name must have just one number format");
422 return FALSE;
423 }
424 sink->object_name = g_strdup(object_name);
425 return TRUE;
426 }
427
428 const size_t delimiter_percent = name.find_first_of('%');
429 if (delimiter_percent_s == delimiter_percent) {
430 sink->percent_s_is_first = TRUE;
431
432 if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) {
433 GST_ERROR_OBJECT(sink, "Object name expect max one string format");
434 return FALSE;
435 }
436 }
437
438 sink->object_name = g_strdup(object_name);
439
440 return TRUE;
441 }
442
gst_gs_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)443 static void gst_gs_sink_set_property(GObject* object,
444 guint prop_id,
445 const GValue* value,
446 GParamSpec* pspec) {
447 GstGsSink* sink = GST_GS_SINK(object);
448
449 switch (prop_id) {
450 case PROP_BUCKET_NAME:
451 g_free(sink->bucket_name);
452 sink->bucket_name = g_strdup(g_value_get_string(value));
453 break;
454 case PROP_OBJECT_NAME:
455 gst_gs_sink_set_object_name(sink, g_value_get_string(value));
456 break;
457 case PROP_INDEX:
458 sink->index = g_value_get_int(value);
459 break;
460 case PROP_POST_MESSAGES:
461 sink->post_messages = g_value_get_boolean(value);
462 break;
463 case PROP_NEXT_FILE:
464 sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
465 break;
466 case PROP_SERVICE_ACCOUNT_EMAIL:
467 g_free(sink->service_account_email);
468 sink->service_account_email = g_strdup(g_value_get_string(value));
469 break;
470 case PROP_SERVICE_ACCOUNT_CREDENTIALS:
471 g_free(sink->service_account_credentials);
472 sink->service_account_credentials = g_strdup(g_value_get_string(value));
473 break;
474 case PROP_START_DATE:
475 g_free(sink->start_date_str);
476 if (sink->start_date)
477 g_date_time_unref(sink->start_date);
478 sink->start_date_str = g_strdup(g_value_get_string(value));
479 sink->start_date =
480 g_date_time_new_from_iso8601(sink->start_date_str, NULL);
481 if (!sink->start_date) {
482 GST_ERROR_OBJECT(sink, "Failed to parse start date %s",
483 sink->start_date_str);
484 g_free(sink->start_date_str);
485 sink->start_date_str = NULL;
486 }
487 break;
488 case PROP_METADATA:
489 g_clear_pointer(&sink->metadata, gst_structure_free);
490 sink->metadata = (GstStructure*)g_value_dup_boxed(value);
491 break;
492 default:
493 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
494 break;
495 }
496 }
497
gst_gs_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)498 static void gst_gs_sink_get_property(GObject* object,
499 guint prop_id,
500 GValue* value,
501 GParamSpec* pspec) {
502 GstGsSink* sink = GST_GS_SINK(object);
503
504 switch (prop_id) {
505 case PROP_BUCKET_NAME:
506 g_value_set_string(value, sink->bucket_name);
507 break;
508 case PROP_OBJECT_NAME:
509 g_value_set_string(value, sink->object_name);
510 break;
511 case PROP_INDEX:
512 g_value_set_int(value, sink->index);
513 break;
514 case PROP_POST_MESSAGES:
515 g_value_set_boolean(value, sink->post_messages);
516 break;
517 case PROP_NEXT_FILE:
518 g_value_set_enum(value, sink->next_file);
519 break;
520 case PROP_SERVICE_ACCOUNT_EMAIL:
521 g_value_set_string(value, sink->service_account_email);
522 break;
523 case PROP_SERVICE_ACCOUNT_CREDENTIALS:
524 g_value_set_string(value, sink->service_account_credentials);
525 break;
526 case PROP_START_DATE:
527 g_value_set_string(value, sink->start_date_str);
528 break;
529 case PROP_METADATA:
530 g_value_set_boxed(value, sink->metadata);
531 break;
532 default:
533 G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
534 break;
535 }
536 }
537
gst_gs_sink_start(GstBaseSink * bsink)538 static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
539 GstGsSink* sink = GST_GS_SINK(bsink);
540 GError* err = NULL;
541
542 if (!sink->bucket_name) {
543 GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
544 GST_ERROR_SYSTEM);
545 return FALSE;
546 }
547
548 if (!sink->object_name) {
549 GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
550 GST_ERROR_SYSTEM);
551 return FALSE;
552 }
553
554 sink->content_type = "";
555
556 sink->gcs_client = gst_gs_create_client(
557 sink->service_account_email, sink->service_account_credentials, &err);
558 if (err) {
559 GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ,
560 ("Could not create client (%s)", err->message),
561 GST_ERROR_SYSTEM);
562 g_clear_error(&err);
563 return FALSE;
564 }
565
566 GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
567 sink->bucket_name, sink->object_name);
568
569 return TRUE;
570 }
571
gst_gs_sink_stop(GstBaseSink * bsink)572 static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
573 GstGsSink* sink = GST_GS_SINK(bsink);
574
575 sink->gcs_client = nullptr;
576 sink->gcs_stream = nullptr;
577 sink->content_type = NULL;
578
579 return TRUE;
580 }
581
gst_gs_sink_post_message_full(GstGsSink * sink,GstClockTime timestamp,GstClockTime duration,GstClockTime offset,GstClockTime offset_end,GstClockTime running_time,GstClockTime stream_time,const char * filename,const gchar * date)582 static void gst_gs_sink_post_message_full(GstGsSink* sink,
583 GstClockTime timestamp,
584 GstClockTime duration,
585 GstClockTime offset,
586 GstClockTime offset_end,
587 GstClockTime running_time,
588 GstClockTime stream_time,
589 const char* filename,
590 const gchar* date) {
591 GstStructure* s;
592
593 if (!sink->post_messages)
594 return;
595
596 s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename,
597 "date", G_TYPE_STRING, date, "index", G_TYPE_INT,
598 sink->index, "timestamp", G_TYPE_UINT64, timestamp,
599 "stream-time", G_TYPE_UINT64, stream_time,
600 "running-time", G_TYPE_UINT64, running_time, "duration",
601 G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64,
602 offset, "offset-end", G_TYPE_UINT64, offset_end, NULL);
603
604 gst_element_post_message(GST_ELEMENT_CAST(sink),
605 gst_message_new_element(GST_OBJECT_CAST(sink), s));
606 }
607
gst_gs_sink_post_message_from_time(GstGsSink * sink,GstClockTime timestamp,GstClockTime duration,const char * filename)608 static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
609 GstClockTime timestamp,
610 GstClockTime duration,
611 const char* filename) {
612 GstClockTime running_time, stream_time;
613 guint64 offset, offset_end;
614 GstSegment* segment;
615 GstFormat format;
616
617 if (!sink->post_messages)
618 return;
619
620 segment = &GST_BASE_SINK(sink)->segment;
621 format = segment->format;
622
623 offset = -1;
624 offset_end = -1;
625
626 running_time = gst_segment_to_running_time(segment, format, timestamp);
627 stream_time = gst_segment_to_stream_time(segment, format, timestamp);
628
629 gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
630 running_time, stream_time, filename, NULL);
631 }
632
gst_gs_sink_post_message(GstGsSink * sink,GstBuffer * buffer,const char * filename,const char * date)633 static void gst_gs_sink_post_message(GstGsSink* sink,
634 GstBuffer* buffer,
635 const char* filename,
636 const char* date) {
637 GstClockTime duration, timestamp;
638 GstClockTime running_time, stream_time;
639 guint64 offset, offset_end;
640 GstSegment* segment;
641 GstFormat format;
642
643 if (!sink->post_messages)
644 return;
645
646 segment = &GST_BASE_SINK(sink)->segment;
647 format = segment->format;
648
649 timestamp = GST_BUFFER_PTS(buffer);
650 duration = GST_BUFFER_DURATION(buffer);
651 offset = GST_BUFFER_OFFSET(buffer);
652 offset_end = GST_BUFFER_OFFSET_END(buffer);
653
654 running_time = gst_segment_to_running_time(segment, format, timestamp);
655 stream_time = gst_segment_to_stream_time(segment, format, timestamp);
656
657 gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
658 running_time, stream_time, filename, date);
659 }
660
/* User data handed to add_metadata_foreach() while iterating over the
 * metadata structure. */
struct AddMetadataIter {
  /* Sink used as the logging object. */
  GstGsSink* sink;
  /* Object metadata that receives the key/value pairs. */
  gcs::ObjectMetadata* metadata;
};
665
add_metadata_foreach(GQuark field_id,const GValue * value,gpointer user_data)666 static gboolean add_metadata_foreach(GQuark field_id,
667 const GValue* value,
668 gpointer user_data) {
669 struct AddMetadataIter* it = (struct AddMetadataIter*)user_data;
670 GValue svalue = G_VALUE_INIT;
671
672 g_value_init(&svalue, G_TYPE_STRING);
673
674 if (g_value_transform(value, &svalue)) {
675 const gchar* key = g_quark_to_string(field_id);
676 const gchar* value = g_value_get_string(&svalue);
677
678 GST_LOG_OBJECT(it->sink, "metadata '%s' -> '%s'", key, value);
679 it->metadata->upsert_metadata(key, value);
680 } else {
681 GST_WARNING_OBJECT(it->sink, "Failed to convert metadata '%s' to string",
682 g_quark_to_string(field_id));
683 }
684
685 g_value_unset(&svalue);
686 return TRUE;
687 }
688
gst_gs_sink_write_buffer(GstGsSink * sink,GstBuffer * buffer)689 static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
690 GstBuffer* buffer) {
691 GstMapInfo map = {0};
692 gchar* object_name = NULL;
693 gchar* buffer_date = NULL;
694
695 if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
696 return GST_FLOW_ERROR;
697
698 gcs::ObjectMetadata metadata =
699 gcs::ObjectMetadata().set_content_type(sink->content_type);
700
701 if (sink->metadata) {
702 struct AddMetadataIter it = {sink, &metadata};
703
704 gst_structure_foreach(sink->metadata, add_metadata_foreach, &it);
705 }
706
707 switch (sink->next_file) {
708 case GST_GS_SINK_NEXT_BUFFER: {
709 // Get buffer date if needed.
710 if (sink->start_date) {
711 if (sink->nb_percent_format != 2) {
712 GST_ERROR_OBJECT(sink, "Object name expects date and index");
713 gst_buffer_unmap(buffer, &map);
714 return GST_FLOW_ERROR;
715 }
716
717 if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
718 GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
719 gst_buffer_unmap(buffer, &map);
720 return GST_FLOW_ERROR;
721 }
722
723 if (sink->percent_s_is_first) {
724 object_name =
725 g_strdup_printf(sink->object_name, buffer_date, sink->index);
726 } else {
727 object_name =
728 g_strdup_printf(sink->object_name, sink->index, buffer_date);
729 }
730 } else {
731 if (sink->nb_percent_format != 1) {
732 GST_ERROR_OBJECT(sink, "Object name expects only an index");
733 gst_buffer_unmap(buffer, &map);
734 return GST_FLOW_ERROR;
735 }
736
737 object_name = g_strdup_printf(sink->object_name, sink->index);
738 }
739
740 GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
741
742 gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
743 sink->bucket_name, object_name, gcs::WithObjectMetadata(metadata));
744
745 gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
746 if (gcs_stream.fail()) {
747 GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
748 }
749 gcs_stream.Close();
750
751 google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
752 sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
753 if (!object_metadata) {
754 GST_ERROR_OBJECT(
755 sink, "Could not get object metadata for object %s (%s)",
756 object_name, object_metadata.status().message().c_str());
757 gst_buffer_unmap(buffer, &map);
758 g_free(object_name);
759 g_free(buffer_date);
760 return GST_FLOW_ERROR;
761 }
762
763 GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
764 object_name, object_metadata->size());
765
766 gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
767 g_free(object_name);
768 g_free(buffer_date);
769 ++sink->index;
770 break;
771 }
772 case GST_GS_SINK_NEXT_NONE: {
773 if (!sink->gcs_stream) {
774 GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
775 sink->gcs_stream = std::make_unique<GSWriteStream>(
776 *sink->gcs_client.get(), sink->bucket_name, sink->object_name,
777 metadata);
778
779 if (!sink->gcs_stream->stream().IsOpen()) {
780 GST_ELEMENT_ERROR(
781 sink, RESOURCE, OPEN_READ,
782 ("Could not create write stream (%s)",
783 sink->gcs_stream->stream().last_status().message().c_str()),
784 GST_ERROR_SYSTEM);
785 gst_buffer_unmap(buffer, &map);
786 return GST_FLOW_OK;
787 }
788 }
789
790 GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
791
792 gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
793 stream.write(reinterpret_cast<const char*>(map.data), map.size);
794 if (stream.fail()) {
795 GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
796 }
797 break;
798 }
799 default:
800 g_assert_not_reached();
801 }
802
803 gst_buffer_unmap(buffer, &map);
804 return GST_FLOW_OK;
805 }
806
gst_gs_sink_render(GstBaseSink * bsink,GstBuffer * buffer)807 static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
808 GstGsSink* sink = GST_GS_SINK(bsink);
809 GstFlowReturn flow = GST_FLOW_OK;
810
811 flow = gst_gs_sink_write_buffer(sink, buffer);
812 return flow;
813 }
814
buffer_list_copy_data(GstBuffer ** buf,guint idx,gpointer data)815 static gboolean buffer_list_copy_data(GstBuffer** buf,
816 guint idx,
817 gpointer data) {
818 GstBuffer* dest = GST_BUFFER_CAST(data);
819 guint num, i;
820
821 if (idx == 0)
822 gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1);
823
824 num = gst_buffer_n_memory(*buf);
825 for (i = 0; i < num; ++i) {
826 GstMemory* mem;
827
828 mem = gst_buffer_get_memory(*buf, i);
829 gst_buffer_append_memory(dest, mem);
830 }
831
832 return TRUE;
833 }
834
835 /* Our assumption for now is that the buffers in a buffer list should always
836 * end up in the same file. If someone wants different behaviour, they'll just
837 * have to add a property for that. */
gst_gs_sink_render_list(GstBaseSink * sink,GstBufferList * list)838 static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
839 GstBufferList* list) {
840 GstBuffer* buf;
841 guint size;
842
843 size = gst_buffer_list_calculate_size(list);
844 GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
845
846 /* copy all buffers in the list into one single buffer, so we can use
847 * the normal render function (FIXME: optimise to avoid the memcpy) */
848 buf = gst_buffer_new();
849 gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
850 g_assert(gst_buffer_get_size(buf) == size);
851
852 gst_gs_sink_render(sink, buf);
853 gst_buffer_unref(buf);
854
855 return GST_FLOW_OK;
856 }
857
gst_gs_sink_set_caps(GstBaseSink * bsink,GstCaps * caps)858 static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
859 GstGsSink* sink = GST_GS_SINK(bsink);
860 GstStructure* s = gst_caps_get_structure(caps, 0);
861
862 sink->content_type = gst_structure_get_name(s);
863
864 GST_INFO_OBJECT(sink, "Content type: %s", sink->content_type);
865
866 return TRUE;
867 }
868
gst_gs_sink_event(GstBaseSink * bsink,GstEvent * event)869 static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
870 GstGsSink* sink = GST_GS_SINK(bsink);
871
872 switch (GST_EVENT_TYPE(event)) {
873 case GST_EVENT_EOS:
874 if (sink->gcs_stream) {
875 sink->gcs_stream = nullptr;
876 gst_gs_sink_post_message_from_time(
877 sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name);
878 }
879 break;
880 default:
881 break;
882 }
883
884 return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
885 }
886