• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2005 Andy Wingo <wingo@pobox.com>
5  * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
6  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
7  *
8  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
9  * the network
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION:gstnetclientclock
28  * @title: GstNetClientClock
29  * @short_description: Special clock that synchronizes to a remote time
30  *                     provider.
31  * @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
32  *
33  * #GstNetClientClock implements a custom #GstClock that synchronizes its time
34  * to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
35  * implements a #GstClock that synchronizes its time to a remote NTPv4 server.
36  *
37  * A new clock is created with gst_net_client_clock_new() or
38  * gst_ntp_clock_new(), which takes the address and port of the remote time
39  * provider along with a name and an initial time.
40  *
41  * This clock will poll the time provider and will update its calibration
42  * parameters based on the local and remote observations.
43  *
44  * The "round-trip" property limits the maximum round trip packets can take.
45  *
46  * Various parameters of the clock can be configured with the parent #GstClock
47  * "timeout", "window-size" and "window-threshold" object properties.
48  *
49  * A #GstNetClientClock or #GstNtpClock is typically set on a #GstPipeline with
50  * gst_pipeline_use_clock().
51  *
52  * If you set a #GstBus on the clock via the "bus" object property, it will
53  * send @GST_MESSAGE_ELEMENT messages with an attached #GstStructure containing
54  * statistics about clock accuracy and network traffic.
55  */
56 
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
60 
61 #include "gstnettimepacket.h"
62 #include "gstntppacket.h"
63 #include "gstnetclientclock.h"
64 #include "gstnetutils.h"
65 
66 #include <gio/gio.h>
67 
68 #include <string.h>
69 
70 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
71 #define GST_CAT_DEFAULT (ncc_debug)
72 
/* Cache entry associating one internal clock with a list of client clocks.
 *
 * NOTE(review): nothing in this part of the file reads or writes this
 * struct; anything beyond the original per-field type hints is an
 * assumption to confirm against the code that uses it. */
typedef struct
{
  GstClock *clock;              /* GstNetClientInternalClock */

  GList *clocks;                /* GstNetClientClocks */

  /* presumably a pending clock ID used to drop this entry later -- TODO confirm */
  GstClockID remove_id;
} ClockCache;
81 
82 G_LOCK_DEFINE_STATIC (clocks_lock);
83 static GList *clocks = NULL;
84 
85 #define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \
86   (gst_net_client_internal_clock_get_type())
87 #define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \
88   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock))
89 #define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
90   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass))
91 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \
92   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
93 #define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \
94   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK))
95 
96 typedef struct _GstNetClientInternalClock GstNetClientInternalClock;
97 typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass;
98 
99 G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void);
100 
101 #define DEFAULT_ADDRESS         "127.0.0.1"
102 #define DEFAULT_PORT            5637
103 #define DEFAULT_TIMEOUT         GST_SECOND
104 #define DEFAULT_ROUNDTRIP_LIMIT GST_SECOND
105 /* Minimum timeout will be immediately (ie, as fast as one RTT), but no
106  * more often than 1/20th second (arbitrarily, to spread observations a little) */
107 #define DEFAULT_MINIMUM_UPDATE_INTERVAL (GST_SECOND / 20)
108 #define DEFAULT_BASE_TIME       0
109 #define DEFAULT_QOS_DSCP        -1
110 
111 /* Maximum number of clock updates we can skip before updating */
112 #define MAX_SKIPPED_UPDATES 5
113 
114 #define MEDIAN_PRE_FILTERING_WINDOW 9
115 
/* GObject property identifiers.
 *
 * The internal clock's class_init installs only PROP_ADDRESS, PROP_PORT and
 * PROP_IS_NTP; the remaining IDs are presumably used by the public clock
 * types defined later in the file -- TODO confirm. */
enum
{
  PROP_0,
  PROP_ADDRESS,
  PROP_PORT,
  PROP_ROUNDTRIP_LIMIT,
  PROP_MINIMUM_UPDATE_INTERVAL,
  PROP_BUS,
  PROP_BASE_TIME,
  PROP_INTERNAL_CLOCK,
  PROP_IS_NTP,
  PROP_QOS_DSCP
};
129 
/* Instance data of the internal clock: a GstSystemClock subclass that polls
 * a remote time provider from a worker thread and calibrates itself from
 * the replies. */
struct _GstNetClientInternalClock
{
  GstSystemClock clock;

  /* Worker thread; when non-NULL at finalize time the clock is stopped */
  GThread *thread;

  /* Socket the worker thread waits on and sends/receives packets with;
   * closed and released in finalize */
  GSocket *socket;
  /* Destination address passed to the packet send functions */
  GSocketAddress *servaddr;
  /* The worker thread loop runs until this cancellable is cancelled */
  GCancellable *cancel;
  /* NOTE(review): not referenced in the visible part of the file */
  gboolean made_cancel_fd;

  /* Absolute gst_util_get_timestamp() time at which the worker thread's
   * socket wait times out and the next request is sent */
  GstClockTime timeout_expiration;
  /* Observations with a larger round trip time are dropped (0 disables) */
  GstClockTime roundtrip_limit;
  /* Smoothed round trip time, GST_CLOCK_TIME_NONE until the first sample */
  GstClockTime rtt_avg;
  /* Lower bound for the polling interval */
  GstClockTime minimum_update_interval;
  /* Last poll interval received in an NTP reply, or GST_CLOCK_TIME_NONE */
  GstClockTime last_remote_poll_interval;
  /* Last remote time accepted while synched, used to detect it going back */
  GstClockTime remote_avg_old;
  /* Consecutive observations that did not update the calibration */
  guint skipped_updates;
  /* Most recent round trip times, oldest first, for median pre-filtering */
  GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
  /* Samples still needed before the median filter becomes active */
  gint last_rtts_missing;

  /* Values of the "address", "port" and "is-ntp" properties */
  gchar *address;
  gint port;
  gboolean is_ntp;
  /* DSCP value applied to the socket by the worker thread before sending */
  gint qos_dscp;

  /* Protected by OBJECT_LOCK */
  GList *busses;
};
159 
/* Class structure of the internal clock; adds nothing to the parent class. */
struct _GstNetClientInternalClockClass
{
  GstSystemClockClass parent_class;
};
164 
165 #define _do_init \
166   GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock");
167 
168 G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock,
169     gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init);
170 
171 static void gst_net_client_internal_clock_finalize (GObject * object);
172 static void gst_net_client_internal_clock_set_property (GObject * object,
173     guint prop_id, const GValue * value, GParamSpec * pspec);
174 static void gst_net_client_internal_clock_get_property (GObject * object,
175     guint prop_id, GValue * value, GParamSpec * pspec);
176 static void gst_net_client_internal_clock_constructed (GObject * object);
177 
178 static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock *
179     self);
180 static void gst_net_client_internal_clock_stop (GstNetClientInternalClock *
181     self);
182 
183 static void
gst_net_client_internal_clock_class_init(GstNetClientInternalClockClass * klass)184 gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass *
185     klass)
186 {
187   GObjectClass *gobject_class;
188 
189   gobject_class = G_OBJECT_CLASS (klass);
190 
191   gobject_class->finalize = gst_net_client_internal_clock_finalize;
192   gobject_class->get_property = gst_net_client_internal_clock_get_property;
193   gobject_class->set_property = gst_net_client_internal_clock_set_property;
194   gobject_class->constructed = gst_net_client_internal_clock_constructed;
195 
196   g_object_class_install_property (gobject_class, PROP_ADDRESS,
197       g_param_spec_string ("address", "address",
198           "The IP address of the machine providing a time server",
199           DEFAULT_ADDRESS,
200           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
201   g_object_class_install_property (gobject_class, PROP_PORT,
202       g_param_spec_int ("port", "port",
203           "The port on which the remote server is listening", 0, G_MAXUINT16,
204           DEFAULT_PORT,
205           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_IS_NTP,
207       g_param_spec_boolean ("is-ntp", "Is NTP",
208           "The clock is using the NTPv4 protocol", FALSE,
209           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
210 }
211 
212 static void
gst_net_client_internal_clock_init(GstNetClientInternalClock * self)213 gst_net_client_internal_clock_init (GstNetClientInternalClock * self)
214 {
215   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
216 
217   self->port = DEFAULT_PORT;
218   self->address = g_strdup (DEFAULT_ADDRESS);
219   self->is_ntp = FALSE;
220   self->qos_dscp = DEFAULT_QOS_DSCP;
221 
222   gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT);
223 
224   self->thread = NULL;
225 
226   self->servaddr = NULL;
227   self->rtt_avg = GST_CLOCK_TIME_NONE;
228   self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
229   self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
230   self->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
231   self->skipped_updates = 0;
232   self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
233   self->remote_avg_old = 0;
234 }
235 
236 static void
gst_net_client_internal_clock_finalize(GObject * object)237 gst_net_client_internal_clock_finalize (GObject * object)
238 {
239   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
240 
241   if (self->thread) {
242     gst_net_client_internal_clock_stop (self);
243   }
244 
245   g_free (self->address);
246   self->address = NULL;
247 
248   if (self->servaddr != NULL) {
249     g_object_unref (self->servaddr);
250     self->servaddr = NULL;
251   }
252 
253   if (self->socket != NULL) {
254     if (!g_socket_close (self->socket, NULL))
255       GST_ERROR_OBJECT (self, "Failed to close socket");
256     g_object_unref (self->socket);
257     self->socket = NULL;
258   }
259 
260   g_warn_if_fail (self->busses == NULL);
261 
262   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize
263       (object);
264 }
265 
266 static void
gst_net_client_internal_clock_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)267 gst_net_client_internal_clock_set_property (GObject * object, guint prop_id,
268     const GValue * value, GParamSpec * pspec)
269 {
270   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
271 
272   switch (prop_id) {
273     case PROP_ADDRESS:
274       GST_OBJECT_LOCK (self);
275       g_free (self->address);
276       self->address = g_value_dup_string (value);
277       if (self->address == NULL)
278         self->address = g_strdup (DEFAULT_ADDRESS);
279       GST_OBJECT_UNLOCK (self);
280       break;
281     case PROP_PORT:
282       GST_OBJECT_LOCK (self);
283       self->port = g_value_get_int (value);
284       GST_OBJECT_UNLOCK (self);
285       break;
286     case PROP_IS_NTP:
287       GST_OBJECT_LOCK (self);
288       self->is_ntp = g_value_get_boolean (value);
289       GST_OBJECT_UNLOCK (self);
290       break;
291     default:
292       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
293       break;
294   }
295 }
296 
297 static void
gst_net_client_internal_clock_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)298 gst_net_client_internal_clock_get_property (GObject * object, guint prop_id,
299     GValue * value, GParamSpec * pspec)
300 {
301   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
302 
303   switch (prop_id) {
304     case PROP_ADDRESS:
305       GST_OBJECT_LOCK (self);
306       g_value_set_string (value, self->address);
307       GST_OBJECT_UNLOCK (self);
308       break;
309     case PROP_PORT:
310       g_value_set_int (value, self->port);
311       break;
312     case PROP_IS_NTP:
313       g_value_set_boolean (value, self->is_ntp);
314       break;
315     default:
316       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
317       break;
318   }
319 }
320 
321 static void
gst_net_client_internal_clock_constructed(GObject * object)322 gst_net_client_internal_clock_constructed (GObject * object)
323 {
324   GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object);
325 
326   G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed
327       (object);
328 
329   if (!gst_net_client_internal_clock_start (self)) {
330     g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self));
331   }
332 
333   /* all systems go, cap'n */
334 }
335 
336 static gint
compare_clock_time(const GstClockTime * a,const GstClockTime * b)337 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
338 {
339   if (*a < *b)
340     return -1;
341   else if (*a > *b)
342     return 1;
343   return 0;
344 }
345 
346 static void
gst_net_client_internal_clock_observe_times(GstNetClientInternalClock * self,GstClockTime local_1,GstClockTime remote_1,GstClockTime remote_2,GstClockTime local_2)347 gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self,
348     GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2,
349     GstClockTime local_2)
350 {
351   GstClockTime current_timeout = 0;
352   GstClockTime local_avg, remote_avg;
353   gdouble r_squared;
354   GstClock *clock;
355   GstClockTime rtt, rtt_limit, min_update_interval;
356   /* Use for discont tracking */
357   GstClockTime time_before = 0;
358   GstClockTime min_guess = 0;
359   GstClockTimeDiff time_discont = 0;
360   gboolean synched, now_synched;
361   GstClockTime internal_time, external_time, rate_num, rate_den;
362   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
363       orig_rate_den;
364   GstClockTime max_discont;
365   GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
366   GstClockTime median;
367   gint i;
368 
369   GST_OBJECT_LOCK (self);
370   rtt_limit = self->roundtrip_limit;
371 
372   GST_LOG_OBJECT (self,
373       "local1 %" G_GUINT64_FORMAT " remote1 %" G_GUINT64_FORMAT " remote2 %"
374       G_GUINT64_FORMAT " local2 %" G_GUINT64_FORMAT, local_1, remote_1,
375       remote_2, local_2);
376 
377   /* If the server told us a poll interval and it's bigger than the
378    * one configured via the property, use the server's */
379   if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
380       self->last_remote_poll_interval > self->minimum_update_interval)
381     min_update_interval = self->last_remote_poll_interval;
382   else
383     min_update_interval = self->minimum_update_interval;
384   GST_OBJECT_UNLOCK (self);
385 
386   if (local_2 < local_1) {
387     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
388         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (local_1),
389         GST_TIME_ARGS (local_2));
390     goto bogus_observation;
391   }
392 
393   if (remote_2 < remote_1) {
394     GST_LOG_OBJECT (self,
395         "Dropping observation: remote receive time %" GST_TIME_FORMAT
396         " < send time %" GST_TIME_FORMAT, GST_TIME_ARGS (remote_1),
397         GST_TIME_ARGS (remote_2));
398     goto bogus_observation;
399   }
400 
401   /* The round trip time is (assuming symmetric path delays)
402    * delta = (local_2 - local_1) - (remote_2 - remote_1)
403    */
404 
405   rtt = GST_CLOCK_DIFF (local_1, local_2) - GST_CLOCK_DIFF (remote_1, remote_2);
406 
407   if ((rtt_limit > 0) && (rtt > rtt_limit)) {
408     GST_LOG_OBJECT (self,
409         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
410         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
411     goto bogus_observation;
412   }
413 
414   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
415     self->last_rtts[i - 1] = self->last_rtts[i];
416   self->last_rtts[i - 1] = rtt;
417 
418   if (self->last_rtts_missing) {
419     self->last_rtts_missing--;
420   } else {
421     memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts));
422     g_qsort_with_data (&last_rtts,
423         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
424         (GCompareDataFunc) compare_clock_time, NULL);
425 
426     median = last_rtts[MEDIAN_PRE_FILTERING_WINDOW / 2];
427 
428     /* FIXME: We might want to use something else here, like only allowing
429      * things in the interquartile range, or also filtering away delays that
430      * are too small compared to the median. This here worked well enough
431      * in tests so far.
432      */
433     if (rtt > 2 * median) {
434       GST_LOG_OBJECT (self,
435           "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * median %"
436           GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (median));
437       goto bogus_observation;
438     }
439   }
440 
441   /* Track an average round trip time, for a bit of smoothing */
442   /* Always update before discarding a sample, so genuine changes in
443    * the network get picked up, eventually */
444   if (self->rtt_avg == GST_CLOCK_TIME_NONE)
445     self->rtt_avg = rtt;
446   else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */
447     self->rtt_avg = (3 * self->rtt_avg + rtt) / 4;
448   else
449     self->rtt_avg = (15 * self->rtt_avg + rtt) / 16;
450 
451   if (rtt > 2 * self->rtt_avg) {
452     GST_LOG_OBJECT (self,
453         "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %"
454         GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg));
455     goto bogus_observation;
456   }
457 
458   /* The difference between the local and remote clock (again assuming
459    * symmetric path delays):
460    *
461    * local_1 + delta / 2 - remote_1 = theta
462    * or
463    * local_2 - delta / 2 - remote_2 = theta
464    *
465    * which gives after some simple algebraic transformations:
466    *
467    *         (remote_1 - local_1) + (remote_2 - local_2)
468    * theta = -------------------------------------------
469    *                              2
470    *
471    *
472    * Thus remote time at local_avg is equal to:
473    *
474    * local_avg + theta =
475    *
476    * local_1 + local_2   (remote_1 - local_1) + (remote_2 - local_2)
477    * ----------------- + -------------------------------------------
478    *         2                                2
479    *
480    * =
481    *
482    * remote_1 + remote_2
483    * ------------------- = remote_avg
484    *          2
485    *
486    * We use this for our clock estimation, i.e. local_avg at remote clock
487    * being the same as remote_avg.
488    */
489 
490   local_avg = (local_2 + local_1) / 2;
491   remote_avg = (remote_2 + remote_1) / 2;
492 
493   GST_LOG_OBJECT (self,
494       "remoteavg %" G_GUINT64_FORMAT " localavg %" G_GUINT64_FORMAT,
495       remote_avg, local_avg);
496 
497   clock = GST_CLOCK_CAST (self);
498 
499   /* Store what the clock produced as 'now' before this update */
500   gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time,
501       &orig_external_time, &orig_rate_num, &orig_rate_den);
502   internal_time = orig_internal_time;
503   external_time = orig_external_time;
504   rate_num = orig_rate_num;
505   rate_den = orig_rate_den;
506 
507   min_guess =
508       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1,
509       internal_time, external_time, rate_num, rate_den);
510   time_before =
511       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
512       internal_time, external_time, rate_num, rate_den);
513 
514   /* Maximum discontinuity, when we're synched with the master. Could make this a property,
515    * but this value seems to work fine */
516   max_discont = self->rtt_avg / 4;
517 
518   /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */
519   synched =
520       (GST_CLOCK_DIFF (remote_avg, min_guess) < (GstClockTimeDiff) (max_discont)
521       && GST_CLOCK_DIFF (time_before,
522           remote_avg) < (GstClockTimeDiff) (max_discont));
523 
524   /* Check if new remote_avg is less than before to detect if signal lost
525    * sync due to the remote clock has restarted. Then the new remote time will
526    * be less than the previous time which should not happen if increased in a
527    * monotonic way. Also, only perform this check on a synchronized clock to
528    * avoid startup issues.
529    */
530   if (synched) {
531     if (remote_avg < self->remote_avg_old) {
532       gst_clock_set_synced (GST_CLOCK (self), FALSE);
533     } else {
534       self->remote_avg_old = remote_avg;
535     }
536   }
537 
538   if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self),
539           local_avg, remote_avg, &r_squared, &internal_time, &external_time,
540           &rate_num, &rate_den)) {
541 
542     /* Now compare the difference (discont) in the clock
543      * after this observation */
544     time_discont = GST_CLOCK_DIFF (time_before,
545         gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2,
546             internal_time, external_time, rate_num, rate_den));
547 
548     /* If we were in sync with the remote clock, clamp the allowed
549      * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
550      * of remote time correctly windowed the actual remote time observation */
551     if (synched && ABS (time_discont) > max_discont) {
552       GstClockTimeDiff offset;
553       GST_DEBUG_OBJECT (clock,
554           "Too large a discont, clamping to 1/4 average RTT = %"
555           GST_TIME_FORMAT, GST_TIME_ARGS (max_discont));
556       if (time_discont > 0) {   /* Too large a forward step - add a -ve offset */
557         offset = max_discont - time_discont;
558         if (-offset > external_time)
559           external_time = 0;
560         else
561           external_time += offset;
562       } else {                  /* Too large a backward step - add a +ve offset */
563         offset = -(max_discont + time_discont);
564         external_time += offset;
565       }
566 
567       time_discont += offset;
568     }
569 
570     /* Check if the new clock params would have made our observation within range */
571     now_synched =
572         (GST_CLOCK_DIFF (remote_avg,
573             gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self),
574                 local_1, internal_time, external_time, rate_num,
575                 rate_den)) < (GstClockTimeDiff) (max_discont))
576         &&
577         (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration
578             (GST_CLOCK_CAST (self), local_2, internal_time, external_time,
579                 rate_num, rate_den),
580             remote_avg) < (GstClockTimeDiff) (max_discont));
581 
582     /* Only update the clock if we had synch or just gained it */
583     if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) {
584       gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time,
585           external_time, rate_num, rate_den);
586       /* ghetto formula - shorter timeout for bad correlations */
587       current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
588       current_timeout =
589           MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self)));
590       self->skipped_updates = 0;
591 
592       /* FIXME: When do we consider the clock absolutely not synced anymore? */
593       gst_clock_set_synced (GST_CLOCK (self), TRUE);
594     } else {
595       /* Restore original calibration vars for the report, we're not changing the clock */
596       internal_time = orig_internal_time;
597       external_time = orig_external_time;
598       rate_num = orig_rate_num;
599       rate_den = orig_rate_den;
600       time_discont = 0;
601       self->skipped_updates++;
602     }
603   }
604 
605   /* Limit the polling to at most one per minimum_update_interval */
606   if (rtt < min_update_interval)
607     current_timeout = MAX (min_update_interval - rtt, current_timeout);
608 
609   GST_OBJECT_LOCK (self);
610   if (self->busses) {
611     GstStructure *s;
612     GstMessage *msg;
613     GList *l;
614 
615     /* Output a stats message, whether we updated the clock or not */
616     s = gst_structure_new ("gst-netclock-statistics",
617         "synchronised", G_TYPE_BOOLEAN, synched,
618         "rtt", G_TYPE_UINT64, rtt,
619         "rtt-average", G_TYPE_UINT64, self->rtt_avg,
620         "local", G_TYPE_UINT64, local_avg,
621         "remote", G_TYPE_UINT64, remote_avg,
622         "discontinuity", G_TYPE_INT64, time_discont,
623         "remote-min-estimate", G_TYPE_UINT64, min_guess,
624         "remote-max-estimate", G_TYPE_UINT64, time_before,
625         "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote_avg,
626             min_guess), "remote-max-error", G_TYPE_INT64,
627         GST_CLOCK_DIFF (remote_avg, time_before), "request-send", G_TYPE_UINT64,
628         local_1, "request-receive", G_TYPE_UINT64, local_2, "r-squared",
629         G_TYPE_DOUBLE, r_squared, "timeout", G_TYPE_UINT64, current_timeout,
630         "internal-time", G_TYPE_UINT64, internal_time, "external-time",
631         G_TYPE_UINT64, external_time, "rate-num", G_TYPE_UINT64, rate_num,
632         "rate-den", G_TYPE_UINT64, rate_den, "rate", G_TYPE_DOUBLE,
633         (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64,
634         GST_CLOCK_DIFF (internal_time, external_time), NULL);
635     msg = gst_message_new_element (GST_OBJECT (self), s);
636 
637     for (l = self->busses; l; l = l->next)
638       gst_bus_post (l->data, gst_message_ref (msg));
639     gst_message_unref (msg);
640   }
641   GST_OBJECT_UNLOCK (self);
642 
643   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
644   self->timeout_expiration = gst_util_get_timestamp () + current_timeout;
645 
646   return;
647 
648 bogus_observation:
649   /* Schedule a new packet again soon */
650   self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
651   return;
652 }
653 
654 static gpointer
gst_net_client_internal_clock_thread(gpointer data)655 gst_net_client_internal_clock_thread (gpointer data)
656 {
657   GstNetClientInternalClock *self = data;
658   GSocket *socket = self->socket;
659   GError *err = NULL;
660   gint cur_qos_dscp = DEFAULT_QOS_DSCP;
661 
662   GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
663 
664   g_socket_set_blocking (socket, TRUE);
665   g_socket_set_timeout (socket, 0);
666 
667   while (!g_cancellable_is_cancelled (self->cancel)) {
668     GstClockTime expiration_time = self->timeout_expiration;
669     GstClockTime now = gst_util_get_timestamp ();
670     gint64 socket_timeout;
671 
672     if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
673       socket_timeout = 0;
674     } else {
675       socket_timeout = (expiration_time - now) / GST_USECOND;
676     }
677 
678     GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout);
679 
680     if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout,
681             self->cancel, &err)) {
682       /* cancelled, timeout or error */
683       if (err->code == G_IO_ERROR_CANCELLED) {
684         GST_INFO_OBJECT (self, "cancelled");
685         g_clear_error (&err);
686         break;
687       } else if (err->code == G_IO_ERROR_TIMED_OUT) {
688         gint new_qos_dscp;
689 
690         /* timed out, let's send another packet */
691         GST_DEBUG_OBJECT (self, "timed out");
692 
693         /* before next sending check if need to change QoS */
694         new_qos_dscp = self->qos_dscp;
695         if (cur_qos_dscp != new_qos_dscp &&
696             gst_net_utils_set_socket_tos (socket, new_qos_dscp)) {
697           GST_DEBUG_OBJECT (self, "changed QoS DSCP to: %d", new_qos_dscp);
698           cur_qos_dscp = new_qos_dscp;
699         }
700 
701         if (self->is_ntp) {
702           GstNtpPacket *packet;
703 
704           packet = gst_ntp_packet_new (NULL, NULL);
705 
706           packet->transmit_time =
707               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
708 
709           GST_DEBUG_OBJECT (self,
710               "sending packet, local time = %" GST_TIME_FORMAT,
711               GST_TIME_ARGS (packet->transmit_time));
712 
713           gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL);
714 
715           g_free (packet);
716         } else {
717           GstNetTimePacket *packet;
718 
719           packet = gst_net_time_packet_new (NULL);
720 
721           packet->local_time =
722               gst_clock_get_internal_time (GST_CLOCK_CAST (self));
723 
724           GST_DEBUG_OBJECT (self,
725               "sending packet, local time = %" GST_TIME_FORMAT,
726               GST_TIME_ARGS (packet->local_time));
727 
728           gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL);
729 
730           g_free (packet);
731         }
732 
733         /* reset timeout (but are expecting a response sooner anyway) */
734         self->timeout_expiration =
735             gst_util_get_timestamp () +
736             gst_clock_get_timeout (GST_CLOCK_CAST (self));
737       } else {
738         GST_DEBUG_OBJECT (self, "socket error: %s", err->message);
739         g_usleep (G_USEC_PER_SEC / 10); /* throttle */
740       }
741       g_clear_error (&err);
742     } else {
743       GstClockTime new_local;
744 
745       /* got packet */
746 
747       new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self));
748 
749       if (self->is_ntp) {
750         GstNtpPacket *packet;
751 
752         packet = gst_ntp_packet_receive (socket, NULL, &err);
753 
754         if (packet != NULL) {
755           GST_LOG_OBJECT (self, "got packet back");
756           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
757               GST_TIME_ARGS (packet->origin_time));
758           GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
759               GST_TIME_ARGS (packet->receive_time));
760           GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
761               GST_TIME_ARGS (packet->transmit_time));
762           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
763               GST_TIME_ARGS (new_local));
764           GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
765               GST_TIME_ARGS (packet->poll_interval));
766 
767           /* Remember the last poll interval we ever got from the server */
768           if (packet->poll_interval != GST_CLOCK_TIME_NONE)
769             self->last_remote_poll_interval = packet->poll_interval;
770 
771           /* observe_times will reset the timeout */
772           gst_net_client_internal_clock_observe_times (self,
773               packet->origin_time, packet->receive_time, packet->transmit_time,
774               new_local);
775 
776           g_free (packet);
777         } else if (err != NULL) {
778           if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
779               || g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
780             GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
781             g_clear_error (&err);
782             break;
783           } else if (g_error_matches (err, GST_NTP_ERROR,
784                   GST_NTP_ERROR_KOD_RATE)) {
785             GST_WARNING_OBJECT (self, "need to limit rate");
786 
787             /* If the server did not tell us a poll interval before, double
788              * our minimum poll interval. Otherwise we assume that the server
789              * already told us something sensible and that this error here
790              * was just a spurious error */
791             if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
792               self->minimum_update_interval *= 2;
793 
794             /* And wait a bit before we send the next packet instead of
795              * sending it immediately */
796             self->timeout_expiration =
797                 gst_util_get_timestamp () +
798                 gst_clock_get_timeout (GST_CLOCK_CAST (self));
799           } else {
800             GST_WARNING_OBJECT (self, "receive error: %s", err->message);
801           }
802           g_clear_error (&err);
803         }
804       } else {
805         GstNetTimePacket *packet;
806 
807         packet = gst_net_time_packet_receive (socket, NULL, &err);
808 
809         if (packet != NULL) {
810           GST_LOG_OBJECT (self, "got packet back");
811           GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
812               GST_TIME_ARGS (packet->local_time));
813           GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
814               GST_TIME_ARGS (packet->remote_time));
815           GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
816               GST_TIME_ARGS (new_local));
817 
818           /* observe_times will reset the timeout */
819           gst_net_client_internal_clock_observe_times (self, packet->local_time,
820               packet->remote_time, packet->remote_time, new_local);
821 
822           g_free (packet);
823         } else if (err != NULL) {
824           GST_WARNING_OBJECT (self, "receive error: %s", err->message);
825           g_clear_error (&err);
826         }
827       }
828     }
829   }
830   GST_INFO_OBJECT (self, "shutting down net client clock thread");
831   return NULL;
832 }
833 
834 static gboolean
gst_net_client_internal_clock_start(GstNetClientInternalClock * self)835 gst_net_client_internal_clock_start (GstNetClientInternalClock * self)
836 {
837   GSocketAddress *servaddr;
838   GSocketAddress *myaddr;
839   GSocketAddress *anyaddr;
840   GInetAddress *inetaddr;
841   GSocket *socket;
842   GError *error = NULL;
843   GSocketFamily family;
844   GPollFD dummy_pollfd;
845   GResolver *resolver = NULL;
846   GError *err = NULL;
847 
848   g_return_val_if_fail (self->address != NULL, FALSE);
849   g_return_val_if_fail (self->servaddr == NULL, FALSE);
850 
851   /* create target address */
852   inetaddr = g_inet_address_new_from_string (self->address);
853   if (inetaddr == NULL) {
854     GList *results;
855 
856     resolver = g_resolver_get_default ();
857 
858     results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err);
859     if (!results)
860       goto failed_to_resolve;
861 
862     inetaddr = G_INET_ADDRESS (g_object_ref (results->data));
863     g_resolver_free_addresses (results);
864     g_object_unref (resolver);
865   }
866 
867   family = g_inet_address_get_family (inetaddr);
868 
869   servaddr = g_inet_socket_address_new (inetaddr, self->port);
870   g_object_unref (inetaddr);
871 
872   g_assert (servaddr != NULL);
873 
874   GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
875       self->port);
876 
877   socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
878       G_SOCKET_PROTOCOL_UDP, &error);
879 
880   if (socket == NULL)
881     goto no_socket;
882 
883   GST_DEBUG_OBJECT (self, "binding socket");
884   inetaddr = g_inet_address_new_any (family);
885   anyaddr = g_inet_socket_address_new (inetaddr, 0);
886   g_socket_bind (socket, anyaddr, TRUE, &error);
887   g_object_unref (anyaddr);
888   g_object_unref (inetaddr);
889 
890   if (error != NULL)
891     goto bind_error;
892 
893   /* check address we're bound to, mostly for debugging purposes */
894   myaddr = g_socket_get_local_address (socket, &error);
895 
896   if (myaddr == NULL)
897     goto getsockname_error;
898 
899   GST_DEBUG_OBJECT (self, "socket opened on UDP port %d",
900       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
901 
902   g_object_unref (myaddr);
903 
904   self->cancel = g_cancellable_new ();
905   self->made_cancel_fd =
906       g_cancellable_make_pollfd (self->cancel, &dummy_pollfd);
907 
908   self->socket = socket;
909   self->servaddr = G_SOCKET_ADDRESS (servaddr);
910 
911   self->thread = g_thread_try_new ("GstNetClientInternalClock",
912       gst_net_client_internal_clock_thread, self, &error);
913 
914   if (error != NULL)
915     goto no_thread;
916 
917   return TRUE;
918 
919   /* ERRORS */
920 no_socket:
921   {
922     GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
923     g_error_free (error);
924     return FALSE;
925   }
926 bind_error:
927   {
928     GST_ERROR_OBJECT (self, "bind failed: %s", error->message);
929     g_error_free (error);
930     g_object_unref (socket);
931     return FALSE;
932   }
933 getsockname_error:
934   {
935     GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
936     g_error_free (error);
937     g_object_unref (socket);
938     return FALSE;
939   }
940 failed_to_resolve:
941   {
942     GST_ERROR_OBJECT (self, "resolving '%s' failed: %s",
943         self->address, err->message);
944     g_clear_error (&err);
945     g_object_unref (resolver);
946     return FALSE;
947   }
948 no_thread:
949   {
950     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
951     g_object_unref (self->servaddr);
952     self->servaddr = NULL;
953     g_object_unref (self->socket);
954     self->socket = NULL;
955     g_error_free (error);
956     return FALSE;
957   }
958 }
959 
960 static void
gst_net_client_internal_clock_stop(GstNetClientInternalClock * self)961 gst_net_client_internal_clock_stop (GstNetClientInternalClock * self)
962 {
963   if (self->thread == NULL)
964     return;
965 
966   GST_INFO_OBJECT (self, "stopping...");
967   g_cancellable_cancel (self->cancel);
968 
969   g_thread_join (self->thread);
970   self->thread = NULL;
971 
972   if (self->made_cancel_fd)
973     g_cancellable_release_fd (self->cancel);
974 
975   g_object_unref (self->cancel);
976   self->cancel = NULL;
977 
978   g_object_unref (self->servaddr);
979   self->servaddr = NULL;
980 
981   g_object_unref (self->socket);
982   self->socket = NULL;
983 
984   GST_INFO_OBJECT (self, "stopped");
985 }
986 
987 struct _GstNetClientClockPrivate
988 {
989   GstClock *internal_clock;
990 
991   GstClockTime roundtrip_limit;
992   GstClockTime minimum_update_interval;
993 
994   GstClockTime base_time, internal_base_time;
995 
996   gchar *address;
997   gint port;
998   gint qos_dscp;
999 
1000   GstBus *bus;
1001 
1002   gboolean is_ntp;
1003 
1004   gulong synced_id;
1005 };
1006 
1007 G_DEFINE_TYPE_WITH_PRIVATE (GstNetClientClock, gst_net_client_clock,
1008     GST_TYPE_SYSTEM_CLOCK);
1009 
1010 static void gst_net_client_clock_finalize (GObject * object);
1011 static void gst_net_client_clock_set_property (GObject * object, guint prop_id,
1012     const GValue * value, GParamSpec * pspec);
1013 static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
1014     GValue * value, GParamSpec * pspec);
1015 static void gst_net_client_clock_constructed (GObject * object);
1016 
1017 static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock);
1018 
1019 static void
gst_net_client_clock_class_init(GstNetClientClockClass * klass)1020 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
1021 {
1022   GObjectClass *gobject_class;
1023   GstClockClass *clock_class;
1024 
1025   gobject_class = G_OBJECT_CLASS (klass);
1026   clock_class = GST_CLOCK_CLASS (klass);
1027 
1028   gobject_class->finalize = gst_net_client_clock_finalize;
1029   gobject_class->get_property = gst_net_client_clock_get_property;
1030   gobject_class->set_property = gst_net_client_clock_set_property;
1031   gobject_class->constructed = gst_net_client_clock_constructed;
1032 
1033   g_object_class_install_property (gobject_class, PROP_ADDRESS,
1034       g_param_spec_string ("address", "address",
1035           "The IP address of the machine providing a time server",
1036           DEFAULT_ADDRESS,
1037           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1038   g_object_class_install_property (gobject_class, PROP_PORT,
1039       g_param_spec_int ("port", "port",
1040           "The port on which the remote server is listening", 0, G_MAXUINT16,
1041           DEFAULT_PORT,
1042           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
1043   g_object_class_install_property (gobject_class, PROP_BUS,
1044       g_param_spec_object ("bus", "bus",
1045           "A GstBus on which to send clock status information", GST_TYPE_BUS,
1046           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1047 
1048   /**
1049    * GstNetClientInternalClock::round-trip-limit:
1050    *
1051    * Maximum allowed round-trip for packets. If this property is set to a nonzero
1052    * value, all packets with a round-trip interval larger than this limit will be
1053    * ignored. This is useful for networks with severe and fluctuating transport
1054    * delays. Filtering out these packets increases stability of the synchronization.
1055    * On the other hand, the lower the limit, the higher the amount of filtered
1056    * packets. Empirical tests are typically necessary to estimate a good value
1057    * for the limit.
1058    * If the property is set to zero, the limit is disabled.
1059    *
1060    * Since: 1.4
1061    */
1062   g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT,
1063       g_param_spec_uint64 ("round-trip-limit", "round-trip limit",
1064           "Maximum tolerable round-trip interval for packets, in nanoseconds "
1065           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT,
1066           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1067 
1068   g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL,
1069       g_param_spec_uint64 ("minimum-update-interval", "minimum update interval",
1070           "Minimum polling interval for packets, in nanoseconds"
1071           "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL,
1072           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1073 
1074   g_object_class_install_property (gobject_class, PROP_BASE_TIME,
1075       g_param_spec_uint64 ("base-time", "Base Time",
1076           "Initial time that is reported before synchronization", 0,
1077           G_MAXUINT64, DEFAULT_BASE_TIME,
1078           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1079 
1080   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
1081       g_param_spec_object ("internal-clock", "Internal Clock",
1082           "Internal clock that directly slaved to the remote clock",
1083           GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1084 
1085   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
1086       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
1087           "Quality of Service, differentiated services code point (-1 default)",
1088           -1, 63, DEFAULT_QOS_DSCP,
1089           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1090 
1091   clock_class->get_internal_time = gst_net_client_clock_get_internal_time;
1092 }
1093 
1094 static void
gst_net_client_clock_init(GstNetClientClock * self)1095 gst_net_client_clock_init (GstNetClientClock * self)
1096 {
1097   GstNetClientClockPrivate *priv;
1098   GstClock *clock;
1099 
1100   self->priv = priv = gst_net_client_clock_get_instance_private (self);
1101 
1102   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
1103   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
1104 
1105   priv->port = DEFAULT_PORT;
1106   priv->address = g_strdup (DEFAULT_ADDRESS);
1107   priv->qos_dscp = DEFAULT_QOS_DSCP;
1108 
1109   priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
1110   priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
1111 
1112   clock = gst_system_clock_obtain ();
1113   priv->base_time = DEFAULT_BASE_TIME;
1114   priv->internal_base_time = gst_clock_get_time (clock);
1115   gst_object_unref (clock);
1116 }
1117 
1118 /* Must be called with clocks_lock */
1119 static void
update_clock_cache(ClockCache * cache)1120 update_clock_cache (ClockCache * cache)
1121 {
1122   GstClockTime roundtrip_limit = 0, minimum_update_interval = 0;
1123   GList *l, *busses = NULL;
1124   gint qos_dscp = DEFAULT_QOS_DSCP;
1125 
1126   GST_OBJECT_LOCK (cache->clock);
1127   g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses,
1128       (GDestroyNotify) gst_object_unref);
1129 
1130   for (l = cache->clocks; l; l = l->next) {
1131     GstNetClientClock *clock = l->data;
1132 
1133     if (clock->priv->bus)
1134       busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus));
1135 
1136     if (roundtrip_limit == 0)
1137       roundtrip_limit = clock->priv->roundtrip_limit;
1138     else
1139       roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit);
1140 
1141     if (minimum_update_interval == 0)
1142       minimum_update_interval = clock->priv->minimum_update_interval;
1143     else
1144       minimum_update_interval =
1145           MIN (minimum_update_interval, clock->priv->minimum_update_interval);
1146 
1147     qos_dscp = MAX (qos_dscp, clock->priv->qos_dscp);
1148   }
1149   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses;
1150   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit =
1151       roundtrip_limit;
1152   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval =
1153       minimum_update_interval;
1154   GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->qos_dscp = qos_dscp;
1155 
1156   GST_OBJECT_UNLOCK (cache->clock);
1157 }
1158 
1159 static gboolean
remove_clock_cache(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)1160 remove_clock_cache (GstClock * clock, GstClockTime time, GstClockID id,
1161     gpointer user_data)
1162 {
1163   ClockCache *cache = user_data;
1164 
1165   G_LOCK (clocks_lock);
1166   if (!cache->clocks) {
1167     gst_clock_id_unref (cache->remove_id);
1168     gst_object_unref (cache->clock);
1169     clocks = g_list_remove (clocks, cache);
1170     g_free (cache);
1171   }
1172   G_UNLOCK (clocks_lock);
1173 
1174   return TRUE;
1175 }
1176 
1177 static void
gst_net_client_clock_finalize(GObject * object)1178 gst_net_client_clock_finalize (GObject * object)
1179 {
1180   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1181   GList *l;
1182 
1183   if (self->priv->synced_id)
1184     g_signal_handler_disconnect (self->priv->internal_clock,
1185         self->priv->synced_id);
1186   self->priv->synced_id = 0;
1187 
1188   G_LOCK (clocks_lock);
1189   for (l = clocks; l; l = l->next) {
1190     ClockCache *cache = l->data;
1191 
1192     if (cache->clock == self->priv->internal_clock) {
1193       cache->clocks = g_list_remove (cache->clocks, self);
1194 
1195       if (cache->clocks) {
1196         update_clock_cache (cache);
1197       } else {
1198         GstClock *sysclock = gst_system_clock_obtain ();
1199         GstClockTime time = gst_clock_get_time (sysclock) + 60 * GST_SECOND;
1200 
1201         cache->remove_id = gst_clock_new_single_shot_id (sysclock, time);
1202         gst_clock_id_wait_async (cache->remove_id, remove_clock_cache, cache,
1203             NULL);
1204         gst_object_unref (sysclock);
1205       }
1206       break;
1207     }
1208   }
1209   G_UNLOCK (clocks_lock);
1210 
1211   g_free (self->priv->address);
1212   self->priv->address = NULL;
1213 
1214   if (self->priv->bus != NULL) {
1215     gst_object_unref (self->priv->bus);
1216     self->priv->bus = NULL;
1217   }
1218 
1219   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object);
1220 }
1221 
1222 static void
gst_net_client_clock_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1223 gst_net_client_clock_set_property (GObject * object, guint prop_id,
1224     const GValue * value, GParamSpec * pspec)
1225 {
1226   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1227   gboolean update = FALSE;
1228 
1229   switch (prop_id) {
1230     case PROP_ADDRESS:
1231       GST_OBJECT_LOCK (self);
1232       g_free (self->priv->address);
1233       self->priv->address = g_value_dup_string (value);
1234       if (self->priv->address == NULL)
1235         self->priv->address = g_strdup (DEFAULT_ADDRESS);
1236       GST_OBJECT_UNLOCK (self);
1237       break;
1238     case PROP_PORT:
1239       GST_OBJECT_LOCK (self);
1240       self->priv->port = g_value_get_int (value);
1241       GST_OBJECT_UNLOCK (self);
1242       break;
1243     case PROP_ROUNDTRIP_LIMIT:
1244       GST_OBJECT_LOCK (self);
1245       self->priv->roundtrip_limit = g_value_get_uint64 (value);
1246       GST_OBJECT_UNLOCK (self);
1247       update = TRUE;
1248       break;
1249     case PROP_MINIMUM_UPDATE_INTERVAL:
1250       GST_OBJECT_LOCK (self);
1251       self->priv->minimum_update_interval = g_value_get_uint64 (value);
1252       GST_OBJECT_UNLOCK (self);
1253       update = TRUE;
1254       break;
1255     case PROP_BUS:
1256       GST_OBJECT_LOCK (self);
1257       if (self->priv->bus)
1258         gst_object_unref (self->priv->bus);
1259       self->priv->bus = g_value_dup_object (value);
1260       GST_OBJECT_UNLOCK (self);
1261       update = TRUE;
1262       break;
1263     case PROP_BASE_TIME:{
1264       GstClock *clock;
1265 
1266       self->priv->base_time = g_value_get_uint64 (value);
1267       clock = gst_system_clock_obtain ();
1268       self->priv->internal_base_time = gst_clock_get_time (clock);
1269       gst_object_unref (clock);
1270       break;
1271     }
1272     case PROP_QOS_DSCP:
1273       GST_OBJECT_LOCK (self);
1274       self->priv->qos_dscp = g_value_get_int (value);
1275       GST_OBJECT_UNLOCK (self);
1276       update = TRUE;
1277       break;
1278     default:
1279       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1280       break;
1281   }
1282 
1283   if (update && self->priv->internal_clock) {
1284     GList *l;
1285 
1286     G_LOCK (clocks_lock);
1287     for (l = clocks; l; l = l->next) {
1288       ClockCache *cache = l->data;
1289 
1290       if (cache->clock == self->priv->internal_clock) {
1291         update_clock_cache (cache);
1292       }
1293     }
1294     G_UNLOCK (clocks_lock);
1295   }
1296 }
1297 
1298 static void
gst_net_client_clock_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1299 gst_net_client_clock_get_property (GObject * object, guint prop_id,
1300     GValue * value, GParamSpec * pspec)
1301 {
1302   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1303 
1304   switch (prop_id) {
1305     case PROP_ADDRESS:
1306       GST_OBJECT_LOCK (self);
1307       g_value_set_string (value, self->priv->address);
1308       GST_OBJECT_UNLOCK (self);
1309       break;
1310     case PROP_PORT:
1311       g_value_set_int (value, self->priv->port);
1312       break;
1313     case PROP_ROUNDTRIP_LIMIT:
1314       GST_OBJECT_LOCK (self);
1315       g_value_set_uint64 (value, self->priv->roundtrip_limit);
1316       GST_OBJECT_UNLOCK (self);
1317       break;
1318     case PROP_MINIMUM_UPDATE_INTERVAL:
1319       GST_OBJECT_LOCK (self);
1320       g_value_set_uint64 (value, self->priv->minimum_update_interval);
1321       GST_OBJECT_UNLOCK (self);
1322       break;
1323     case PROP_BUS:
1324       GST_OBJECT_LOCK (self);
1325       g_value_set_object (value, self->priv->bus);
1326       GST_OBJECT_UNLOCK (self);
1327       break;
1328     case PROP_BASE_TIME:
1329       g_value_set_uint64 (value, self->priv->base_time);
1330       break;
1331     case PROP_INTERNAL_CLOCK:
1332       g_value_set_object (value, self->priv->internal_clock);
1333       break;
1334     case PROP_QOS_DSCP:
1335       GST_OBJECT_LOCK (self);
1336       g_value_set_int (value, self->priv->qos_dscp);
1337       GST_OBJECT_UNLOCK (self);
1338       break;
1339     default:
1340       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1341       break;
1342   }
1343 }
1344 
1345 static void
gst_net_client_clock_synced_cb(GstClock * internal_clock,gboolean synced,GstClock * self)1346 gst_net_client_clock_synced_cb (GstClock * internal_clock, gboolean synced,
1347     GstClock * self)
1348 {
1349   gst_clock_set_synced (self, synced);
1350 }
1351 
1352 static void
gst_net_client_clock_constructed(GObject * object)1353 gst_net_client_clock_constructed (GObject * object)
1354 {
1355   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
1356   GstClock *internal_clock;
1357   GList *l;
1358   ClockCache *cache = NULL;
1359 
1360   G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object);
1361 
1362   G_LOCK (clocks_lock);
1363   for (l = clocks; l; l = l->next) {
1364     ClockCache *tmp = l->data;
1365     GstNetClientInternalClock *internal_clock =
1366         GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock);
1367 
1368     if (strcmp (internal_clock->address, self->priv->address) == 0 &&
1369         internal_clock->port == self->priv->port) {
1370       cache = tmp;
1371 
1372       if (cache->remove_id) {
1373         gst_clock_id_unschedule (cache->remove_id);
1374         cache->remove_id = NULL;
1375       }
1376       break;
1377     }
1378   }
1379 
1380   if (!cache) {
1381     cache = g_new0 (ClockCache, 1);
1382 
1383     cache->clock =
1384         g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address",
1385         self->priv->address, "port", self->priv->port, "is-ntp",
1386         self->priv->is_ntp, NULL);
1387     gst_object_ref_sink (cache->clock);
1388     clocks = g_list_prepend (clocks, cache);
1389 
1390     /* Not actually leaked but is cached for a while before being disposed,
1391      * see gst_net_client_clock_finalize, so pretend it is to not confuse
1392      * tests. */
1393     GST_OBJECT_FLAG_SET (cache->clock, GST_OBJECT_FLAG_MAY_BE_LEAKED);
1394   }
1395 
1396   cache->clocks = g_list_prepend (cache->clocks, self);
1397 
1398   GST_OBJECT_LOCK (cache->clock);
1399   if (gst_clock_is_synced (cache->clock))
1400     gst_clock_set_synced (GST_CLOCK (self), TRUE);
1401   self->priv->synced_id =
1402       g_signal_connect (cache->clock, "synced",
1403       G_CALLBACK (gst_net_client_clock_synced_cb), self);
1404   GST_OBJECT_UNLOCK (cache->clock);
1405 
1406   G_UNLOCK (clocks_lock);
1407 
1408   self->priv->internal_clock = internal_clock = cache->clock;
1409 
1410   /* all systems go, cap'n */
1411 }
1412 
1413 static GstClockTime
gst_net_client_clock_get_internal_time(GstClock * clock)1414 gst_net_client_clock_get_internal_time (GstClock * clock)
1415 {
1416   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock);
1417 
1418   if (!gst_clock_is_synced (self->priv->internal_clock)) {
1419     GstClockTime now = gst_clock_get_internal_time (self->priv->internal_clock);
1420     return gst_clock_adjust_with_calibration (self->priv->internal_clock, now,
1421         self->priv->internal_base_time, self->priv->base_time, 1, 1);
1422   }
1423 
1424   return gst_clock_get_time (self->priv->internal_clock);
1425 }
1426 
1427 /**
1428  * gst_net_client_clock_new:
1429  * @name: a name for the clock
1430  * @remote_address: the address or hostname of the remote clock provider
1431  * @remote_port: the port of the remote clock provider
1432  * @base_time: initial time of the clock
1433  *
1434  * Create a new #GstNetClientClock that will report the time
1435  * provided by the #GstNetTimeProvider on @remote_address and
1436  * @remote_port.
1437  *
1438  * Returns: (transfer full): a new #GstClock that receives a time from the remote
1439  * clock.
1440  */
1441 GstClock *
gst_net_client_clock_new(const gchar * name,const gchar * remote_address,gint remote_port,GstClockTime base_time)1442 gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
1443     gint remote_port, GstClockTime base_time)
1444 {
1445   GstClock *ret;
1446 
1447   g_return_val_if_fail (remote_address != NULL, NULL);
1448   g_return_val_if_fail (remote_port > 0, NULL);
1449   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1450   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1451 
1452   ret =
1453       g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "name", name, "address",
1454       remote_address, "port", remote_port, "base-time", base_time, NULL);
1455 
1456   /* Clear floating flag */
1457   gst_object_ref_sink (ret);
1458 
1459   return ret;
1460 }
1461 
1462 G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK);
1463 
1464 static void
gst_ntp_clock_class_init(GstNtpClockClass * klass)1465 gst_ntp_clock_class_init (GstNtpClockClass * klass)
1466 {
1467 }
1468 
1469 static void
gst_ntp_clock_init(GstNtpClock * self)1470 gst_ntp_clock_init (GstNtpClock * self)
1471 {
1472   GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
1473 }
1474 
1475 /**
1476  * gst_ntp_clock_new:
1477  * @name: a name for the clock
1478  * @remote_address: the address or hostname of the remote clock provider
1479  * @remote_port: the port of the remote clock provider
1480  * @base_time: initial time of the clock
1481  *
1482  * Create a new #GstNtpClock that will report the time provided by
1483  * the NTPv4 server on @remote_address and @remote_port.
1484  *
1485  * Returns: (transfer full): a new #GstClock that receives a time from the remote
1486  * clock.
1487  *
1488  * Since: 1.6
1489  */
1490 GstClock *
gst_ntp_clock_new(const gchar * name,const gchar * remote_address,gint remote_port,GstClockTime base_time)1491 gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
1492     gint remote_port, GstClockTime base_time)
1493 {
1494   GstClock *ret;
1495 
1496   g_return_val_if_fail (remote_address != NULL, NULL);
1497   g_return_val_if_fail (remote_port > 0, NULL);
1498   g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
1499   g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
1500 
1501   ret =
1502       g_object_new (GST_TYPE_NTP_CLOCK, "name", name, "address", remote_address,
1503       "port", remote_port, "base-time", base_time, NULL);
1504 
1505   gst_object_ref_sink (ret);
1506 
1507   return ret;
1508 }
1509