1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2005 Wim Taymans <wim@fluendo.com>
4 * 2005 Andy Wingo <wingo@pobox.com>
5 * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6 * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
7 *
8 * gstnetclientclock.h: clock that synchronizes itself to a time provider over
9 * the network
10 *
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Library General Public
13 * License as published by the Free Software Foundation; either
14 * version 2 of the License, or (at your option) any later version.
15 *
16 * This library is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * Library General Public License for more details.
20 *
21 * You should have received a copy of the GNU Library General Public
22 * License along with this library; if not, write to the
23 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24 * Boston, MA 02110-1301, USA.
25 */
26 /**
27 * SECTION:gstnetclientclock
28 * @title: GstNetClientClock
29 * @short_description: Special clock that synchronizes to a remote time
30 * provider.
31 * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
32 *
33 * #GstNetClientClock implements a custom #GstClock that synchronizes its time
34 * to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
35 * implements a #GstClock that synchronizes its time to a remote NTPv4 server.
36 *
37 * A new clock is created with gst_net_client_clock_new() or
38 * gst_ntp_clock_new(), which takes the address and port of the remote time
39 * provider along with a name and an initial time.
40 *
41 * This clock will poll the time provider and will update its calibration
42 * parameters based on the local and remote observations.
43 *
44 * The "round-trip" property limits the maximum round trip packets can take.
45 *
46 * Various parameters of the clock can be configured with the parent #GstClock
47 * "timeout", "window-size" and "window-threshold" object properties.
48 *
49 * A #GstNetClientClock and #GstNtpClock is typically set on a #GstPipeline with
50 * gst_pipeline_use_clock().
51 *
52 * If you set a #GstBus on the clock via the "bus" object property, it will
53 * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
54 * statistics about clock accuracy and network traffic.
55 */
56
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
60
61 #include "gstnettimepacket.h"
62 #include "gstntppacket.h"
63 #include "gstnetclientclock.h"
64 #include "gstnetutils.h"
65
66 #include <gio/gio.h>
67
68 #include <string.h>
69
70 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
71 #define GST_CAT_DEFAULT (ncc_debug)
72
73 typedef struct
74 {
75 GstClock *clock; /* GstNetClientInternalClock */
76
77 GList *clocks; /* GstNetClientClocks */
78
79 GstClockID remove_id;
80 } ClockCache;
81
82 G_LOCK_DEFINE_STATIC (clocks_lock);
83 static GList *clocks = NULL;
84
85 #define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \
86 (gst_net_client_internal_clock_get_type())
87 #define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \
88 (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock))
89 #define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
90 (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass))
91 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \
92 (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
93 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
94 (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
95
96 typedef struct _GstNetClientInternalClock GstNetClientInternalClock;
97 typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass;
98
99 G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void);
100
101 #define DEFAULT_ADDRESS "127.0.0.1"
102 #define DEFAULT_PORT 5637
103 #define DEFAULT_TIMEOUT GST_SECOND
104 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
105 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
106 * more often than 1/20th second (arbitrarily, to spread observations a little) */
107 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
108 #define DEFAULT_BASE_TIME 0
109 #define DEFAULT_QOS_DSCP -1
110
111 /* Maximum number of clock updates we can skip before updating */
112 #define MAX_SKIPPED_UPDATES 5
113
114 #define MEDIAN_PRE_FILTERING_WINDOW 9
115
116 enum
117 {
118 PROP_0,
119 PROP_ADDRESS,
120 PROP_PORT,
121 PROP_ROUNDTRIP_LIMIT,
122 PROP_MINIMUM_UPDATE_INTERVAL,
123 PROP_BUS,
124 PROP_BASE_TIME,
125 PROP_INTERNAL_CLOCK,
126 PROP_IS_NTP,
127 PROP_QOS_DSCP
128 };
129
130 struct _GstNetClientInternalClock
131 {
132 GstSystemClock clock;
133
134 GThread *thread;
135
136 GSocket *socket;
137 GSocketAddress *servaddr;
138 GCancellable *cancel;
139 gboolean made_cancel_fd;
140
141 GstClockTime timeout_expiration;
142 GstClockTime roundtrip_limit;
143 GstClockTime rtt_avg;
144 GstClockTime minimum_update_interval;
145 GstClockTime last_remote_poll_interval;
146 GstClockTime remote_avg_old;
147 guint skipped_updates;
148 GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
149 gint last_rtts_missing;
150
151 gchar *address;
152 gint port;
153 gboolean is_ntp;
154 gint qos_dscp;
155
156 /* Protected by OBJECT_LOCK */
157 GList *busses;
158 };
159
160 struct _GstNetClientInternalClockClass
161 {
162 GstSystemClockClass parent_class;
163 };
164
165 #define _do_init \
166 GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
167
168 G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock,
169 gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init);
170
171 static void gst_net_client_internal_clock_finalize (GObject * object);
172 static void gst_net_client_internal_clock_set_property (GObject * object,
173 guint prop_id, const GValue * value, GParamSpec * pspec);
174 static void gst_net_client_internal_clock_get_property (GObject * object,
175 guint prop_id, GValue * value, GParamSpec * pspec);
176 static void gst_net_client_internal_clock_constructed (GObject * object);
177
178 static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock *
179 self);
180 static void gst_net_client_internal_clock_stop (GstNetClientInternalClock *
181 self);
182
183 static void
gst_net_client_internal_clock_class_init(GstNetClientInternalClockClass * klass)184 gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass *
185 klass)
186 {
187 GObjectClass *gobject_class;
188
189 gobject_class = G_OBJECT_CLASS (klass);
190
191 gobject_class->finalize = gst_net_client_internal_clock_finalize;
192 gobject_class->get_property = gst_net_client_internal_clock_get_property;
193 gobject_class->set_property = gst_net_client_internal_clock_set_property;
194 gobject_class->constructed = gst_net_client_internal_clock_constructed;
195
196 g_object_class_install_property (gobject_class, PROP_ADDRESS,
197 g_param_spec_string ("address", "address",
198 "The IP address of the machine providing a time server",
199 DEFAULT_ADDRESS,
200 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
201 g_object_class_install_property (gobject_class, PROP_PORT,
202 g_param_spec_int ("port", "port",
203 "The port on which the remote server is listening", 0, G_MAXUINT16,
204 DEFAULT_PORT,
205 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
206 g_object_class_install_property (gobject_class, PROP_IS_NTP,
207 g_param_spec_boolean ("is-ntp", "Is NTP",
208 "The clock is using the NTPv4 protocol", FALSE,
209 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
210 }
211
212 static void
gst_net_client_internal_clock_init(GstNetClientInternalClock * self)213 gst_net_client_internal_clock_init (GstNetClientInternalClock * self)
214 {
215 GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
216
217 self->port = DEFAULT_PORT;
218 self->address = g_strdup (DEFAULT_ADDRESS);
219 self->is_ntp = FALSE;
220 self->qos_dscp = DEFAULT_QOS_DSCP;
221
222 gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT);
223
224 self->thread = NULL;
225
226 self->servaddr = NULL;
227 self->rtt_avg = GST_CLOCK_TIME_NONE;
228 self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
229 self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
230 self->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
231 self->skipped_updates = 0;
232 self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
233 self->remote_avg_old = 0;
234 }
235
236 static void
gst_net_client_internal_clock_finalize(GObject * object)237 gst_net_client_internal_clock_finalize (GObject * object)
238 {
239 GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
240
241 if (self->thread) {
242 gst_net_client_internal_clock_stop (self);
243 }
244
245 g_free (self->address);
246 self->address = NULL;
247
248 if (self->servaddr != NULL) {
249 g_object_unref (self->servaddr);
250 self->servaddr = NULL;
251 }
252
253 if (self->socket != NULL) {
254 if (!g_socket_close (self->socket, NULL))
255 GST_ERROR_OBJECT (self, "Failed to close socket");
256 g_object_unref (self->socket);
257 self->socket = NULL;
258 }
259
260 g_warn_if_fail (self->busses == NULL);
261
262 G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize
263 (object);
264 }
265
266 static void
gst_net_client_internal_clock_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)267 gst_net_client_internal_clock_set_property (GObject * object, guint prop_id,
268 const GValue * value, GParamSpec * pspec)
269 {
270 GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
271
272 switch (prop_id) {
273 case PROP_ADDRESS:
274 GST_OBJECT_LOCK (self);
275 g_free (self->address);
276 self->address = g_value_dup_string (value);
277 if (self->address == NULL)
278 self->address = g_strdup (DEFAULT_ADDRESS);
279 GST_OBJECT_UNLOCK (self);
280 break;
281 case PROP_PORT:
282 GST_OBJECT_LOCK (self);
283 self->port = g_value_get_int (value);
284 GST_OBJECT_UNLOCK (self);
285 break;
286 case PROP_IS_NTP:
287 GST_OBJECT_LOCK (self);
288 self->is_ntp = g_value_get_boolean (value);
289 GST_OBJECT_UNLOCK (self);
290 break;
291 default:
292 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
293 break;
294 }
295 }
296
297 static void
gst_net_client_internal_clock_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)298 gst_net_client_internal_clock_get_property (GObject * object, guint prop_id,
299 GValue * value, GParamSpec * pspec)
300 {
301 GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
302
303 switch (prop_id) {
304 case PROP_ADDRESS:
305 GST_OBJECT_LOCK (self);
306 g_value_set_string (value, self->address);
307 GST_OBJECT_UNLOCK (self);
308 break;
309 case PROP_PORT:
310 g_value_set_int (value, self->port);
311 break;
312 case PROP_IS_NTP:
313 g_value_set_boolean (value, self->is_ntp);
314 break;
315 default:
316 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
317 break;
318 }
319 }
320
321 static void
gst_net_client_internal_clock_constructed(GObject * object)322 gst_net_client_internal_clock_constructed (GObject * object)
323 {
324 GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
325
326 G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed
327 (object);
328
329 if (!gst_net_client_internal_clock_start (self)) {
330 g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self));
331 }
332
333 /* all systems go, cap'n */
334 }
335
336 static gint
compare_clock_time(const GstClockTime * a,const GstClockTime * b)337 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
338 {
339 if (*a < *b)
340 return -1;
341 else if (*a > *b)
342 return 1;
343 return 0;
344 }
345
346 static void
gst_net_client_internal_clock_observe_times(GstNetClientInternalClock * self,GstClockTime local_1,GstClockTime remote_1,GstClockTime remote_2,GstClockTime local_2)347 gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self,
348 GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2,
349 GstClockTime local_2)
350 {
351 GstClockTime current_timeout = 0;
352 GstClockTime local_avg, remote_avg;
353 gdouble r_squared;
354 GstClock *clock;
355 GstClockTime rtt, rtt_limit, min_update_interval;
356 /* Use for discont tracking */
357 GstClockTime time_before = 0;
358 GstClockTime min_guess = 0;
359 GstClockTimeDiff time_discont = 0;
360 gboolean synched, now_synched;
361 GstClockTime internal_time, external_time, rate_num, rate_den;
362 GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
363 orig_rate_den;
364 GstClockTime max_discont;
365 GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
366 GstClockTime median;
367 gint i;
368
369 GST_OBJECT_LOCK (self);
370 rtt_limit = self->roundtrip_limit;
371
372 GST_LOG_OBJECT (self,
373 "local1 %" G_GUINT64_FORMAT " remote1 %" G_GUINT64_FORMAT " remote2 %"
374 G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT, local_1, remote_1,
375 remote_2, local_2);
376
377 /* If the server told us a poll interval and it's bigger than the
378 * one configured via the property, use the server's */
379 if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
380 self->last_remote_poll_interval > self->minimum_update_interval)
381 min_update_interval = self->last_remote_poll_interval;
382 else
383 min_update_interval = self->minimum_update_interval;
384 GST_OBJECT_UNLOCK (self);
385
386 if (local_2 < local_1) {
387 GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
388 " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
389 GST_TIME_ARGS (local_2));
390 goto bogus_observation;
391 }
392
393 if (remote_2 < remote_1) {
394 GST_LOG_OBJECT (self,
395 "Dropping observation: remote receive time %" GST_TIME_FORMAT
396 " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (remote_1),
397 GST_TIME_ARGS (remote_2));
398 goto bogus_observation;
399 }
400
401 /* The round trip time is (assuming symmetric path delays)
402 * delta = (local_2 - local_1) - (remote_2 - remote_1)
403 */
404
405 rtt = GST_CLOCK_DIFF (local_1, local_2) - GST_CLOCK_DIFF (remote_1, remote_2);
406
407 if ((rtt_limit > 0) && (rtt > rtt_limit)) {
408 GST_LOG_OBJECT (self,
409 "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
410 GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
411 goto bogus_observation;
412 }
413
414 for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
415 self->last_rtts[i - 1] = self->last_rtts[i];
416 self->last_rtts[i - 1] = rtt;
417
418 if (self->last_rtts_missing) {
419 self->last_rtts_missing--;
420 } else {
421 memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts));
422 g_qsort_with_data (&last_rtts,
423 MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
424 (GCompareDataFunc) compare_clock_time, NULL);
425
426 median = last_rtts[MEDIAN_PRE_FILTERING_WINDOW / 2];
427
428 /* FIXME: We might want to use something else here, like only allowing
429 * things in the interquartile range, or also filtering away delays that
430 * are too small compared to the median. This here worked well enough
431 * in tests so far.
432 */
433 if (rtt > 2 * median) {
434 GST_LOG_OBJECT (self,
435 "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * median %"
436 GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (median));
437 goto bogus_observation;
438 }
439 }
440
441 /* Track an average round trip time, for a bit of smoothing */
442 /* Always update before discarding a sample, so genuine changes in
443 * the network get picked up, eventually */
444 if (self->rtt_avg == GST_CLOCK_TIME_NONE)
445 self->rtt_avg = rtt;
446 else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */
447 self->rtt_avg = (3 * self->rtt_avg + rtt) / 4;
448 else
449 self->rtt_avg = (15 * self->rtt_avg + rtt) / 16;
450
451 if (rtt > 2 * self->rtt_avg) {
452 GST_LOG_OBJECT (self,
453 "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
454 GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg));
455 goto bogus_observation;
456 }
457
458 /* The difference between the local and remote clock (again assuming
459 * symmetric path delays):
460 *
461 * local_1 + delta / 2 - remote_1 = theta
462 * or
463 * local_2 - delta / 2 - remote_2 = theta
464 *
465 * which gives after some simple algebraic transformations:
466 *
467 * (remote_1 - local_1) + (remote_2 - local_2)
468 * theta = -------------------------------------------
469 * 2
470 *
471 *
472 * Thus remote time at local_avg is equal to:
473 *
474 * local_avg + theta =
475 *
476 * local_1 + local_2 (remote_1 - local_1) + (remote_2 - local_2)
477 * ----------------- + -------------------------------------------
478 * 2 2
479 *
480 * =
481 *
482 * remote_1 + remote_2
483 * ------------------- = remote_avg
484 * 2
485 *
486 * We use this for our clock estimation, i.e. local_avg at remote clock
487 * being the same as remote_avg.
488 */
489
490 local_avg = (local_2 + local_1) / 2;
491 remote_avg = (remote_2 + remote_1) / 2;
492
493 GST_LOG_OBJECT (self,
494 "remoteavg %" G_GUINT64_FORMAT " localavg %" G_GUINT64_FORMAT,
495 remote_avg, local_avg);
496
497 clock = GST_CLOCK_CAST (self);
498
499 /* Store what the clock produced as 'now' before this update */
500 gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time,
501 &orig_external_time, &orig_rate_num, &orig_rate_den);
502 internal_time = orig_internal_time;
503 external_time = orig_external_time;
504 rate_num = orig_rate_num;
505 rate_den = orig_rate_den;
506
507 min_guess =
508 gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1,
509 internal_time, external_time, rate_num, rate_den);
510 time_before =
511 gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
512 internal_time, external_time, rate_num, rate_den);
513
514 /* Maximum discontinuity, when we're synched with the master. Could make this a property,
515 * but this value seems to work fine */
516 max_discont = self->rtt_avg / 4;
517
518 /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */
519 synched =
520 (GST_CLOCK_DIFF (remote_avg, min_guess) < (GstClockTimeDiff) (max_discont)
521 && GST_CLOCK_DIFF (time_before,
522 remote_avg) < (GstClockTimeDiff) (max_discont));
523
524 /* Check if new remote_avg is less than before to detect if signal lost
525 * sync due to the remote clock has restarted. Then the new remote time will
526 * be less than the previous time which should not happen if increased in a
527 * monotonic way. Also, only perform this check on a synchronized clock to
528 * avoid startup issues.
529 */
530 if (synched) {
531 if (remote_avg < self->remote_avg_old) {
532 gst_clock_set_synced (GST_CLOCK (self), FALSE);
533 } else {
534 self->remote_avg_old = remote_avg;
535 }
536 }
537
538 if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self),
539 local_avg, remote_avg, &r_squared, &internal_time, &external_time,
540 &rate_num, &rate_den)) {
541
542 /* Now compare the difference (discont) in the clock
543 * after this observation */
544 time_discont = GST_CLOCK_DIFF (time_before,
545 gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
546 internal_time, external_time, rate_num, rate_den));
547
548 /* If we were in sync with the remote clock, clamp the allowed
549 * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
550 * of remote time correctly windowed the actual remote time observation */
551 if (synched && ABS (time_discont) > max_discont) {
552 GstClockTimeDiff offset;
553 GST_DEBUG_OBJECT (clock,
554 "Too large a discont, clamping to 1/4 average RTT = %"
555 GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
556 if (time_discont > 0) { /* Too large a forward step - add a -ve offset */
557 offset = max_discont - time_discont;
558 if (-offset > external_time)
559 external_time = 0;
560 else
561 external_time += offset;
562 } else { /* Too large a backward step - add a +ve offset */
563 offset = -(max_discont + time_discont);
564 external_time += offset;
565 }
566
567 time_discont += offset;
568 }
569
570 /* Check if the new clock params would have made our observation within range */
571 now_synched =
572 (GST_CLOCK_DIFF (remote_avg,
573 gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self),
574 local_1, internal_time, external_time, rate_num,
575 rate_den)) < (GstClockTimeDiff) (max_discont))
576 &&
577 (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration
578 (GST_CLOCK_CAST (self), local_2, internal_time, external_time,
579 rate_num, rate_den),
580 remote_avg) < (GstClockTimeDiff) (max_discont));
581
582 /* Only update the clock if we had synch or just gained it */
583 if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) {
584 gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time,
585 external_time, rate_num, rate_den);
586 /* ghetto formula - shorter timeout for bad correlations */
587 current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
588 current_timeout =
589 MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self)));
590 self->skipped_updates = 0;
591
592 /* FIXME: When do we consider the clock absolutely not synced anymore? */
593 gst_clock_set_synced (GST_CLOCK (self), TRUE);
594 } else {
595 /* Restore original calibration vars for the report, we're not changing the clock */
596 internal_time = orig_internal_time;
597 external_time = orig_external_time;
598 rate_num = orig_rate_num;
599 rate_den = orig_rate_den;
600 time_discont = 0;
601 self->skipped_updates++;
602 }
603 }
604
605 /* Limit the polling to at most one per minimum_update_interval */
606 if (rtt < min_update_interval)
607 current_timeout = MAX (min_update_interval - rtt, current_timeout);
608
609 GST_OBJECT_LOCK (self);
610 if (self->busses) {
611 GstStructure *s;
612 GstMessage *msg;
613 GList *l;
614
615 /* Output a stats message, whether we updated the clock or not */
616 s = gst_structure_new ("gst-netclock-statistics",
617 "synchronised", G_TYPE_BOOLEAN, synched,
618 "rtt", G_TYPE_UINT64, rtt,
619 "rtt-average", G_TYPE_UINT64, self->rtt_avg,
620 "local", G_TYPE_UINT64, local_avg,
621 "remote", G_TYPE_UINT64, remote_avg,
622 "discontinuity", G_TYPE_INT64, time_discont,
623 "remote-min-estimate", G_TYPE_UINT64, min_guess,
624 "remote-max-estimate", G_TYPE_UINT64, time_before,
625 "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote_avg,
626 min_guess), "remote-max-error", G_TYPE_INT64,
627 GST_CLOCK_DIFF (remote_avg, time_before), "request-send", G_TYPE_UINT64,
628 local_1, "request-receive", G_TYPE_UINT64, local_2, "r-squared",
629 G_TYPE_DOUBLE, r_squared, "timeout", G_TYPE_UINT64, current_timeout,
630 "internal-time", G_TYPE_UINT64, internal_time, "external-time",
631 G_TYPE_UINT64, external_time, "rate-num", G_TYPE_UINT64, rate_num,
632 "rate-den", G_TYPE_UINT64, rate_den, "rate", G_TYPE_DOUBLE,
633 (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64,
634 GST_CLOCK_DIFF (internal_time, external_time), NULL);
635 msg = gst_message_new_element (GST_OBJECT (self), s);
636
637 for (l = self->busses; l; l = l->next)
638 gst_bus_post (l->data, gst_message_ref (msg));
639 gst_message_unref (msg);
640 }
641 GST_OBJECT_UNLOCK (self);
642
643 GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
644 self->timeout_expiration = gst_util_get_timestamp () + current_timeout;
645
646 return;
647
648 bogus_observation:
649 /* Schedule a new packet again soon */
650 self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
651 return;
652 }
653
654 static gpointer
gst_net_client_internal_clock_thread(gpointer data)655 gst_net_client_internal_clock_thread (gpointer data)
656 {
657 GstNetClientInternalClock *self = data;
658 GSocket *socket = self->socket;
659 GError *err = NULL;
660 gint cur_qos_dscp = DEFAULT_QOS_DSCP;
661
662 GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
663
664 g_socket_set_blocking (socket, TRUE);
665 g_socket_set_timeout (socket, 0);
666
667 while (!g_cancellable_is_cancelled (self->cancel)) {
668 GstClockTime expiration_time = self->timeout_expiration;
669 GstClockTime now = gst_util_get_timestamp ();
670 gint64 socket_timeout;
671
672 if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
673 socket_timeout = 0;
674 } else {
675 socket_timeout = (expiration_time - now) / GST_USECOND;
676 }
677
678 GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
679
680 if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
681 self->cancel, &err)) {
682 /* cancelled, timeout or error */
683 if (err->code == G_IO_ERROR_CANCELLED) {
684 GST_INFO_OBJECT (self, "cancelled");
685 g_clear_error (&err);
686 break;
687 } else if (err->code == G_IO_ERROR_TIMED_OUT) {
688 gint new_qos_dscp;
689
690 /* timed out, let's send another packet */
691 GST_DEBUG_OBJECT (self, "timed out");
692
693 /* before next sending check if need to change QoS */
694 new_qos_dscp = self->qos_dscp;
695 if (cur_qos_dscp != new_qos_dscp &&
696 gst_net_utils_set_socket_tos (socket, new_qos_dscp)) {
697 GST_DEBUG_OBJECT (self, "changed QoS DSCP to: %d", new_qos_dscp);
698 cur_qos_dscp = new_qos_dscp;
699 }
700
701 if (self->is_ntp) {
702 GstNtpPacket *packet;
703
704 packet = gst_ntp_packet_new (NULL, NULL);
705
706 packet->transmit_time =
707 gst_clock_get_internal_time (GST_CLOCK_CAST (self));
708
709 GST_DEBUG_OBJECT (self,
710 "sending packet, local time = %" GST_TIME_FORMAT,
711 GST_TIME_ARGS (packet->transmit_time));
712
713 gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL);
714
715 g_free (packet);
716 } else {
717 GstNetTimePacket *packet;
718
719 packet = gst_net_time_packet_new (NULL);
720
721 packet->local_time =
722 gst_clock_get_internal_time (GST_CLOCK_CAST (self));
723
724 GST_DEBUG_OBJECT (self,
725 "sending packet, local time = %" GST_TIME_FORMAT,
726 GST_TIME_ARGS (packet->local_time));
727
728 gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL);
729
730 g_free (packet);
731 }
732
733 /* reset timeout (but are expecting a response sooner anyway) */
734 self->timeout_expiration =
735 gst_util_get_timestamp () +
736 gst_clock_get_timeout (GST_CLOCK_CAST (self));
737 } else {
738 GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
739 g_usleep (G_USEC_PER_SEC / 10); /* throttle */
740 }
741 g_clear_error (&err);
742 } else {
743 GstClockTime new_local;
744
745 /* got packet */
746
747 new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self));
748
749 if (self->is_ntp) {
750 GstNtpPacket *packet;
751
752 packet = gst_ntp_packet_receive (socket, NULL, &err);
753
754 if (packet != NULL) {
755 GST_LOG_OBJECT (self, "got packet back");
756 GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
757 GST_TIME_ARGS (packet->origin_time));
758 GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
759 GST_TIME_ARGS (packet->receive_time));
760 GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
761 GST_TIME_ARGS (packet->transmit_time));
762 GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
763 GST_TIME_ARGS (new_local));
764 GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
765 GST_TIME_ARGS (packet->poll_interval));
766
767 /* Remember the last poll interval we ever got from the server */
768 if (packet->poll_interval != GST_CLOCK_TIME_NONE)
769 self->last_remote_poll_interval = packet->poll_interval;
770
771 /* observe_times will reset the timeout */
772 gst_net_client_internal_clock_observe_times (self,
773 packet->origin_time, packet->receive_time, packet->transmit_time,
774 new_local);
775
776 g_free (packet);
777 } else if (err != NULL) {
778 if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
779 || g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
780 GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
781 g_clear_error (&err);
782 break;
783 } else if (g_error_matches (err, GST_NTP_ERROR,
784 GST_NTP_ERROR_KOD_RATE)) {
785 GST_WARNING_OBJECT (self, "need to limit rate");
786
787 /* If the server did not tell us a poll interval before, double
788 * our minimum poll interval. Otherwise we assume that the server
789 * already told us something sensible and that this error here
790 * was just a spurious error */
791 if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
792 self->minimum_update_interval *= 2;
793
794 /* And wait a bit before we send the next packet instead of
795 * sending it immediately */
796 self->timeout_expiration =
797 gst_util_get_timestamp () +
798 gst_clock_get_timeout (GST_CLOCK_CAST (self));
799 } else {
800 GST_WARNING_OBJECT (self, "receive error: %s", err->message);
801 }
802 g_clear_error (&err);
803 }
804 } else {
805 GstNetTimePacket *packet;
806
807 packet = gst_net_time_packet_receive (socket, NULL, &err);
808
809 if (packet != NULL) {
810 GST_LOG_OBJECT (self, "got packet back");
811 GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
812 GST_TIME_ARGS (packet->local_time));
813 GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
814 GST_TIME_ARGS (packet->remote_time));
815 GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
816 GST_TIME_ARGS (new_local));
817
818 /* observe_times will reset the timeout */
819 gst_net_client_internal_clock_observe_times (self, packet->local_time,
820 packet->remote_time, packet->remote_time, new_local);
821
822 g_free (packet);
823 } else if (err != NULL) {
824 GST_WARNING_OBJECT (self, "receive error: %s", err->message);
825 g_clear_error (&err);
826 }
827 }
828 }
829 }
830 GST_INFO_OBJECT (self, "shutting down net client clock thread");
831 return NULL;
832 }
833
834 static gboolean
gst_net_client_internal_clock_start(GstNetClientInternalClock * self)835 gst_net_client_internal_clock_start (GstNetClientInternalClock * self)
836 {
837 GSocketAddress *servaddr;
838 GSocketAddress *myaddr;
839 GSocketAddress *anyaddr;
840 GInetAddress *inetaddr;
841 GSocket *socket;
842 GError *error = NULL;
843 GSocketFamily family;
844 GPollFD dummy_pollfd;
845 GResolver *resolver = NULL;
846 GError *err = NULL;
847
848 g_return_val_if_fail (self->address != NULL, FALSE);
849 g_return_val_if_fail (self->servaddr == NULL, FALSE);
850
851 /* create target address */
852 inetaddr = g_inet_address_new_from_string (self->address);
853 if (inetaddr == NULL) {
854 GList *results;
855
856 resolver = g_resolver_get_default ();
857
858 results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err);
859 if (!results)
860 goto failed_to_resolve;
861
862 inetaddr = G_INET_ADDRESS (g_object_ref (results->data));
863 g_resolver_free_addresses (results);
864 g_object_unref (resolver);
865 }
866
867 family = g_inet_address_get_family (inetaddr);
868
869 servaddr = g_inet_socket_address_new (inetaddr, self->port);
870 g_object_unref (inetaddr);
871
872 g_assert (servaddr != NULL);
873
874 GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
875 self->port);
876
877 socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
878 G_SOCKET_PROTOCOL_UDP, &error);
879
880 if (socket == NULL)
881 goto no_socket;
882
883 GST_DEBUG_OBJECT (self, "binding socket");
884 inetaddr = g_inet_address_new_any (family);
885 anyaddr = g_inet_socket_address_new (inetaddr, 0);
886 g_socket_bind (socket, anyaddr, TRUE, &error);
887 g_object_unref (anyaddr);
888 g_object_unref (inetaddr);
889
890 if (error != NULL)
891 goto bind_error;
892
893 /* check address we're bound to, mostly for debugging purposes */
894 myaddr = g_socket_get_local_address (socket, &error);
895
896 if (myaddr == NULL)
897 goto getsockname_error;
898
899 GST_DEBUG_OBJECT (self, "socket opened on UDP port %d",
900 g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
901
902 g_object_unref (myaddr);
903
904 self->cancel = g_cancellable_new ();
905 self->made_cancel_fd =
906 g_cancellable_make_pollfd (self->cancel, &dummy_pollfd);
907
908 self->socket = socket;
909 self->servaddr = G_SOCKET_ADDRESS (servaddr);
910
911 self->thread = g_thread_try_new ("GstNetClientInternalClock",
912 gst_net_client_internal_clock_thread, self, &error);
913
914 if (error != NULL)
915 goto no_thread;
916
917 return TRUE;
918
919 /* ERRORS */
920 no_socket:
921 {
922 GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
923 g_error_free (error);
924 return FALSE;
925 }
926 bind_error:
927 {
928 GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
929 g_error_free (error);
930 g_object_unref (socket);
931 return FALSE;
932 }
933 getsockname_error:
934 {
935 GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
936 g_error_free (error);
937 g_object_unref (socket);
938 return FALSE;
939 }
940 failed_to_resolve:
941 {
942 GST_ERROR_OBJECT (self, "resolving '%s' failed: %s",
943 self->address, err->message);
944 g_clear_error (&err);
945 g_object_unref (resolver);
946 return FALSE;
947 }
948 no_thread:
949 {
950 GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
951 g_object_unref (self->servaddr);
952 self->servaddr = NULL;
953 g_object_unref (self->socket);
954 self->socket = NULL;
955 g_error_free (error);
956 return FALSE;
957 }
958 }
959
960 static void
gst_net_client_internal_clock_stop(GstNetClientInternalClock * self)961 gst_net_client_internal_clock_stop (GstNetClientInternalClock * self)
962 {
963 if (self->thread == NULL)
964 return;
965
966 GST_INFO_OBJECT (self, "stopping...");
967 g_cancellable_cancel (self->cancel);
968
969 g_thread_join (self->thread);
970 self->thread = NULL;
971
972 if (self->made_cancel_fd)
973 g_cancellable_release_fd (self->cancel);
974
975 g_object_unref (self->cancel);
976 self->cancel = NULL;
977
978 g_object_unref (self->servaddr);
979 self->servaddr = NULL;
980
981 g_object_unref (self->socket);
982 self->socket = NULL;
983
984 GST_INFO_OBJECT (self, "stopped");
985 }
986
987 struct _GstNetClientClockPrivate
988 {
989 GstClock *internal_clock;
990
991 GstClockTime roundtrip_limit;
992 GstClockTime minimum_update_interval;
993
994 GstClockTime base_time, internal_base_time;
995
996 gchar *address;
997 gint port;
998 gint qos_dscp;
999
1000 GstBus *bus;
1001
1002 gboolean is_ntp;
1003
1004 gulong synced_id;
1005 };
1006
1007 G_DEFINE_TYPE_WITH_PRIVATE (GstNetClientClock, gst_net_client_clock,
1008 GST_TYPE_SYSTEM_CLOCK);
1009
1010 static void gst_net_client_clock_finalize (GObject * object);
1011 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
1012 const GValue * value, GParamSpec * pspec);
1013 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
1014 GValue * value, GParamSpec * pspec);
1015 static void gst_net_client_clock_constructed (GObject * object);
1016
1017 static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock);
1018
1019 static void
gst_net_client_clock_class_init(GstNetClientClockClass * klass)1020 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
1021 {
1022 GObjectClass *gobject_class;
1023 GstClockClass *clock_class;
1024
1025 gobject_class = G_OBJECT_CLASS (klass);
1026 clock_class = GST_CLOCK_CLASS (klass);
1027
1028 gobject_class->finalize = gst_net_client_clock_finalize;
1029 gobject_class->get_property = gst_net_client_clock_get_property;
1030 gobject_class->set_property = gst_net_client_clock_set_property;
1031 gobject_class->constructed = gst_net_client_clock_constructed;
1032
1033 g_object_class_install_property (gobject_class, PROP_ADDRESS,
1034 g_param_spec_string ("address", "address",
1035 "The IP address of the machine providing a time server",
1036 DEFAULT_ADDRESS,
1037 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1038 g_object_class_install_property (gobject_class, PROP_PORT,
1039 g_param_spec_int ("port", "port",
1040 "The port on which the remote server is listening", 0, G_MAXUINT16,
1041 DEFAULT_PORT,
1042 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1043 g_object_class_install_property (gobject_class, PROP_BUS,
1044 g_param_spec_object ("bus", "bus",
1045 "A GstBus on which to send clock status information", GST_TYPE_BUS,
1046 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1047
1048 /**
1049 * GstNetClientInternalClock::round-trip-limit:
1050 *
1051 * Maximum allowed round-trip for packets. If this property is set to a nonzero
1052 * value, all packets with a round-trip interval larger than this limit will be
1053 * ignored. This is useful for networks with severe and fluctuating transport
1054 * delays. Filtering out these packets increases stability of the synchronization.
1055 * On the other hand, the lower the limit, the higher the amount of filtered
1056 * packets. Empirical tests are typically necessary to estimate a good value
1057 * for the limit.
1058 * If the property is set to zero, the limit is disabled.
1059 *
1060 * Since: 1.4
1061 */
1062 g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
1063 g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
1064 "Maximum tolerable round-trip interval for packets, in nanoseconds "
1065 "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
1066 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1067
1068 g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
1069 g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
1070 "Minimum polling interval for packets, in nanoseconds"
1071 "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
1072 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1073
1074 g_object_class_install_property (gobject_class, PROP_BASE_TIME,
1075 g_param_spec_uint64 ("base-time", "Base Time",
1076 "Initial time that is reported before synchronization", 0,
1077 G_MAXUINT64, DEFAULT_BASE_TIME,
1078 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1079
1080 g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
1081 g_param_spec_object ("internal-clock", "Internal Clock",
1082 "Internal clock that directly slaved to the remote clock",
1083 GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1084
1085 g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
1086 g_param_spec_int ("qos-dscp", "QoS diff srv code point",
1087 "Quality of Service, differentiated services code point (-1 default)",
1088 -1, 63, DEFAULT_QOS_DSCP,
1089 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1090
1091 clock_class->get_internal_time = gst_net_client_clock_get_internal_time;
1092 }
1093
1094 static void
gst_net_client_clock_init(GstNetClientClock * self)1095 gst_net_client_clock_init (GstNetClientClock * self)
1096 {
1097 GstNetClientClockPrivate *priv;
1098 GstClock *clock;
1099
1100 self->priv = priv = gst_net_client_clock_get_instance_private (self);
1101
1102 GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
1103 GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
1104
1105 priv->port = DEFAULT_PORT;
1106 priv->address = g_strdup (DEFAULT_ADDRESS);
1107 priv->qos_dscp = DEFAULT_QOS_DSCP;
1108
1109 priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
1110 priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
1111
1112 clock = gst_system_clock_obtain ();
1113 priv->base_time = DEFAULT_BASE_TIME;
1114 priv->internal_base_time = gst_clock_get_time (clock);
1115 gst_object_unref (clock);
1116 }
1117
1118 /* Must be called with clocks_lock */
1119 static void
update_clock_cache(ClockCache * cache)1120 update_clock_cache (ClockCache * cache)
1121 {
1122 GstClockTime roundtrip_limit = 0, minimum_update_interval = 0;
1123 GList *l, *busses = NULL;
1124 gint qos_dscp = DEFAULT_QOS_DSCP;
1125
1126 GST_OBJECT_LOCK (cache->clock);
1127 g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses,
1128 (GDestroyNotify) gst_object_unref);
1129
1130 for (l = cache->clocks; l; l = l->next) {
1131 GstNetClientClock *clock = l->data;
1132
1133 if (clock->priv->bus)
1134 busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus));
1135
1136 if (roundtrip_limit == 0)
1137 roundtrip_limit = clock->priv->roundtrip_limit;
1138 else
1139 roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit);
1140
1141 if (minimum_update_interval == 0)
1142 minimum_update_interval = clock->priv->minimum_update_interval;
1143 else
1144 minimum_update_interval =
1145 MIN (minimum_update_interval, clock->priv->minimum_update_interval);
1146
1147 qos_dscp = MAX (qos_dscp, clock->priv->qos_dscp);
1148 }
1149 GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses;
1150 GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit =
1151 roundtrip_limit;
1152 GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval =
1153 minimum_update_interval;
1154 GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->qos_dscp = qos_dscp;
1155
1156 GST_OBJECT_UNLOCK (cache->clock);
1157 }
1158
1159 static gboolean
remove_clock_cache(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)1160 remove_clock_cache (GstClock * clock, GstClockTime time, GstClockID id,
1161 gpointer user_data)
1162 {
1163 ClockCache *cache = user_data;
1164
1165 G_LOCK (clocks_lock);
1166 if (!cache->clocks) {
1167 gst_clock_id_unref (cache->remove_id);
1168 gst_object_unref (cache->clock);
1169 clocks = g_list_remove (clocks, cache);
1170 g_free (cache);
1171 }
1172 G_UNLOCK (clocks_lock);
1173
1174 return TRUE;
1175 }
1176
1177 static void
gst_net_client_clock_finalize(GObject * object)1178 gst_net_client_clock_finalize (GObject * object)
1179 {
1180 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1181 GList *l;
1182
1183 if (self->priv->synced_id)
1184 g_signal_handler_disconnect (self->priv->internal_clock,
1185 self->priv->synced_id);
1186 self->priv->synced_id = 0;
1187
1188 G_LOCK (clocks_lock);
1189 for (l = clocks; l; l = l->next) {
1190 ClockCache *cache = l->data;
1191
1192 if (cache->clock == self->priv->internal_clock) {
1193 cache->clocks = g_list_remove (cache->clocks, self);
1194
1195 if (cache->clocks) {
1196 update_clock_cache (cache);
1197 } else {
1198 GstClock *sysclock = gst_system_clock_obtain ();
1199 GstClockTime time = gst_clock_get_time (sysclock) + 60 * GST_SECOND;
1200
1201 cache->remove_id = gst_clock_new_single_shot_id (sysclock, time);
1202 gst_clock_id_wait_async (cache->remove_id, remove_clock_cache, cache,
1203 NULL);
1204 gst_object_unref (sysclock);
1205 }
1206 break;
1207 }
1208 }
1209 G_UNLOCK (clocks_lock);
1210
1211 g_free (self->priv->address);
1212 self->priv->address = NULL;
1213
1214 if (self->priv->bus != NULL) {
1215 gst_object_unref (self->priv->bus);
1216 self->priv->bus = NULL;
1217 }
1218
1219 G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object);
1220 }
1221
1222 static void
gst_net_client_clock_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1223 gst_net_client_clock_set_property (GObject * object, guint prop_id,
1224 const GValue * value, GParamSpec * pspec)
1225 {
1226 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1227 gboolean update = FALSE;
1228
1229 switch (prop_id) {
1230 case PROP_ADDRESS:
1231 GST_OBJECT_LOCK (self);
1232 g_free (self->priv->address);
1233 self->priv->address = g_value_dup_string (value);
1234 if (self->priv->address == NULL)
1235 self->priv->address = g_strdup (DEFAULT_ADDRESS);
1236 GST_OBJECT_UNLOCK (self);
1237 break;
1238 case PROP_PORT:
1239 GST_OBJECT_LOCK (self);
1240 self->priv->port = g_value_get_int (value);
1241 GST_OBJECT_UNLOCK (self);
1242 break;
1243 case PROP_ROUNDTRIP_LIMIT:
1244 GST_OBJECT_LOCK (self);
1245 self->priv->roundtrip_limit = g_value_get_uint64 (value);
1246 GST_OBJECT_UNLOCK (self);
1247 update = TRUE;
1248 break;
1249 case PROP_MINIMUM_UPDATE_INTERVAL:
1250 GST_OBJECT_LOCK (self);
1251 self->priv->minimum_update_interval = g_value_get_uint64 (value);
1252 GST_OBJECT_UNLOCK (self);
1253 update = TRUE;
1254 break;
1255 case PROP_BUS:
1256 GST_OBJECT_LOCK (self);
1257 if (self->priv->bus)
1258 gst_object_unref (self->priv->bus);
1259 self->priv->bus = g_value_dup_object (value);
1260 GST_OBJECT_UNLOCK (self);
1261 update = TRUE;
1262 break;
1263 case PROP_BASE_TIME:{
1264 GstClock *clock;
1265
1266 self->priv->base_time = g_value_get_uint64 (value);
1267 clock = gst_system_clock_obtain ();
1268 self->priv->internal_base_time = gst_clock_get_time (clock);
1269 gst_object_unref (clock);
1270 break;
1271 }
1272 case PROP_QOS_DSCP:
1273 GST_OBJECT_LOCK (self);
1274 self->priv->qos_dscp = g_value_get_int (value);
1275 GST_OBJECT_UNLOCK (self);
1276 update = TRUE;
1277 break;
1278 default:
1279 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1280 break;
1281 }
1282
1283 if (update && self->priv->internal_clock) {
1284 GList *l;
1285
1286 G_LOCK (clocks_lock);
1287 for (l = clocks; l; l = l->next) {
1288 ClockCache *cache = l->data;
1289
1290 if (cache->clock == self->priv->internal_clock) {
1291 update_clock_cache (cache);
1292 }
1293 }
1294 G_UNLOCK (clocks_lock);
1295 }
1296 }
1297
1298 static void
gst_net_client_clock_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1299 gst_net_client_clock_get_property (GObject * object, guint prop_id,
1300 GValue * value, GParamSpec * pspec)
1301 {
1302 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1303
1304 switch (prop_id) {
1305 case PROP_ADDRESS:
1306 GST_OBJECT_LOCK (self);
1307 g_value_set_string (value, self->priv->address);
1308 GST_OBJECT_UNLOCK (self);
1309 break;
1310 case PROP_PORT:
1311 g_value_set_int (value, self->priv->port);
1312 break;
1313 case PROP_ROUNDTRIP_LIMIT:
1314 GST_OBJECT_LOCK (self);
1315 g_value_set_uint64 (value, self->priv->roundtrip_limit);
1316 GST_OBJECT_UNLOCK (self);
1317 break;
1318 case PROP_MINIMUM_UPDATE_INTERVAL:
1319 GST_OBJECT_LOCK (self);
1320 g_value_set_uint64 (value, self->priv->minimum_update_interval);
1321 GST_OBJECT_UNLOCK (self);
1322 break;
1323 case PROP_BUS:
1324 GST_OBJECT_LOCK (self);
1325 g_value_set_object (value, self->priv->bus);
1326 GST_OBJECT_UNLOCK (self);
1327 break;
1328 case PROP_BASE_TIME:
1329 g_value_set_uint64 (value, self->priv->base_time);
1330 break;
1331 case PROP_INTERNAL_CLOCK:
1332 g_value_set_object (value, self->priv->internal_clock);
1333 break;
1334 case PROP_QOS_DSCP:
1335 GST_OBJECT_LOCK (self);
1336 g_value_set_int (value, self->priv->qos_dscp);
1337 GST_OBJECT_UNLOCK (self);
1338 break;
1339 default:
1340 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1341 break;
1342 }
1343 }
1344
1345 static void
gst_net_client_clock_synced_cb(GstClock * internal_clock,gboolean synced,GstClock * self)1346 gst_net_client_clock_synced_cb (GstClock * internal_clock, gboolean synced,
1347 GstClock * self)
1348 {
1349 gst_clock_set_synced (self, synced);
1350 }
1351
1352 static void
gst_net_client_clock_constructed(GObject * object)1353 gst_net_client_clock_constructed (GObject * object)
1354 {
1355 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1356 GstClock *internal_clock;
1357 GList *l;
1358 ClockCache *cache = NULL;
1359
1360 G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object);
1361
1362 G_LOCK (clocks_lock);
1363 for (l = clocks; l; l = l->next) {
1364 ClockCache *tmp = l->data;
1365 GstNetClientInternalClock *internal_clock =
1366 GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock);
1367
1368 if (strcmp (internal_clock->address, self->priv->address) == 0 &&
1369 internal_clock->port == self->priv->port) {
1370 cache = tmp;
1371
1372 if (cache->remove_id) {
1373 gst_clock_id_unschedule (cache->remove_id);
1374 cache->remove_id = NULL;
1375 }
1376 break;
1377 }
1378 }
1379
1380 if (!cache) {
1381 cache = g_new0 (ClockCache, 1);
1382
1383 cache->clock =
1384 g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address",
1385 self->priv->address, "port", self->priv->port, "is-ntp",
1386 self->priv->is_ntp, NULL);
1387 gst_object_ref_sink (cache->clock);
1388 clocks = g_list_prepend (clocks, cache);
1389
1390 /* Not actually leaked but is cached for a while before being disposed,
1391 * see gst_net_client_clock_finalize, so pretend it is to not confuse
1392 * tests. */
1393 GST_OBJECT_FLAG_SET (cache->clock, GST_OBJECT_FLAG_MAY_BE_LEAKED);
1394 }
1395
1396 cache->clocks = g_list_prepend (cache->clocks, self);
1397
1398 GST_OBJECT_LOCK (cache->clock);
1399 if (gst_clock_is_synced (cache->clock))
1400 gst_clock_set_synced (GST_CLOCK (self), TRUE);
1401 self->priv->synced_id =
1402 g_signal_connect (cache->clock, "synced",
1403 G_CALLBACK (gst_net_client_clock_synced_cb), self);
1404 GST_OBJECT_UNLOCK (cache->clock);
1405
1406 G_UNLOCK (clocks_lock);
1407
1408 self->priv->internal_clock = internal_clock = cache->clock;
1409
1410 /* all systems go, cap'n */
1411 }
1412
1413 static GstClockTime
gst_net_client_clock_get_internal_time(GstClock * clock)1414 gst_net_client_clock_get_internal_time (GstClock * clock)
1415 {
1416 GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock);
1417
1418 if (!gst_clock_is_synced (self->priv->internal_clock)) {
1419 GstClockTime now = gst_clock_get_internal_time (self->priv->internal_clock);
1420 return gst_clock_adjust_with_calibration (self->priv->internal_clock, now,
1421 self->priv->internal_base_time, self->priv->base_time, 1, 1);
1422 }
1423
1424 return gst_clock_get_time (self->priv->internal_clock);
1425 }
1426
1427 /**
1428 * gst_net_client_clock_new:
1429 * @name: a name for the clock
1430 * @remote_address: the address or hostname of the remote clock provider
1431 * @remote_port: the port of the remote clock provider
1432 * @base_time: initial time of the clock
1433 *
1434 * Create a new #GstNetClientClock that will report the time
1435 * provided by the #GstNetTimeProvider on @remote_address and
1436 * @remote_port.
1437 *
1438 * Returns: (transfer full): a new #GstClock that receives a time from the remote
1439 * clock.
1440 */
1441 GstClock *
gst_net_client_clock_new(const gchar * name,const gchar * remote_address,gint remote_port,GstClockTime base_time)1442 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
1443 gint remote_port, GstClockTime base_time)
1444 {
1445 GstClock *ret;
1446
1447 g_return_val_if_fail (remote_address != NULL, NULL);
1448 g_return_val_if_fail (remote_port > 0, NULL);
1449 g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1450 g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1451
1452 ret =
1453 g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "name", name, "address",
1454 remote_address, "port", remote_port, "base-time", base_time, NULL);
1455
1456 /* Clear floating flag */
1457 gst_object_ref_sink (ret);
1458
1459 return ret;
1460 }
1461
1462 G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK);
1463
1464 static void
gst_ntp_clock_class_init(GstNtpClockClass * klass)1465 gst_ntp_clock_class_init (GstNtpClockClass * klass)
1466 {
1467 }
1468
1469 static void
gst_ntp_clock_init(GstNtpClock * self)1470 gst_ntp_clock_init (GstNtpClock * self)
1471 {
1472 GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
1473 }
1474
1475 /**
1476 * gst_ntp_clock_new:
1477 * @name: a name for the clock
1478 * @remote_address: the address or hostname of the remote clock provider
1479 * @remote_port: the port of the remote clock provider
1480 * @base_time: initial time of the clock
1481 *
1482 * Create a new #GstNtpClock that will report the time provided by
1483 * the NTPv4 server on @remote_address and @remote_port.
1484 *
1485 * Returns: (transfer full): a new #GstClock that receives a time from the remote
1486 * clock.
1487 *
1488 * Since: 1.6
1489 */
1490 GstClock *
gst_ntp_clock_new(const gchar * name,const gchar * remote_address,gint remote_port,GstClockTime base_time)1491 gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
1492 gint remote_port, GstClockTime base_time)
1493 {
1494 GstClock *ret;
1495
1496 g_return_val_if_fail (remote_address != NULL, NULL);
1497 g_return_val_if_fail (remote_port > 0, NULL);
1498 g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1499 g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1500
1501 ret =
1502 g_object_new (GST_TYPE_NTP_CLOCK, "name", name, "address", remote_address,
1503 "port", remote_port, "base-time", base_time, NULL);
1504
1505 gst_object_ref_sink (ret);
1506
1507 return ret;
1508 }
1509