• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
3  *
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstptpclock
22  * @title: GstPtpClock
23  * @short_description: Special clock that synchronizes to a remote time
24  *                     provider via PTP (IEEE1588:2008).
25  * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
26  *
27  * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
28  * mode, that allows a GStreamer pipeline to synchronize to a PTP network
29  * clock in some specific domain.
30  *
31  * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
32  * a helper process to do the actual communication via the PTP ports. This is
33  * required as PTP listens on ports < 1024 and thus requires special
34  * privileges. Once this helper process is started, the main process will
35  * synchronize to all PTP domains that are detected on the selected
36  * interfaces.
37  *
38  * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
39  * time from a master clock inside a specific PTP domain. This clock will only
40  * return valid timestamps once the timestamps in the PTP domain are known. To
41  * check this, you can use gst_clock_wait_for_sync(), the GstClock::synced
42  * signal and gst_clock_is_synced().
43  *
44  * To gather statistics about the PTP clock synchronization,
45  * gst_ptp_statistics_callback_add() can be used. This gives the application
46  * the possibility to collect all kinds of statistics from the clock
47  * synchronization.
48  *
49  * Since: 1.6
50  *
51  */
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55 
56 #include "gstptpclock.h"
57 
58 #include "gstptp_private.h"
59 
60 #ifdef HAVE_SYS_WAIT_H
61 #include <sys/wait.h>
62 #endif
63 #ifdef G_OS_WIN32
64 #define WIN32_LEAN_AND_MEAN
65 #include <windows.h>
66 #endif
67 #include <sys/types.h>
68 
69 #ifdef HAVE_UNISTD_H
70 #include <unistd.h>
71 #elif defined(G_OS_WIN32)
72 #include <io.h>
73 #endif
74 
75 #include <gst/base/base.h>
76 
77 GST_DEBUG_CATEGORY_STATIC (ptp_debug);
78 #define GST_CAT_DEFAULT (ptp_debug)
79 
80 /* IEEE 1588 7.7.3.1 */
81 #define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
82 
83 /* Use a running average for calculating the mean path delay instead
84  * of just using the last measurement. Enabling this helps in unreliable
85  * networks, like wifi, with often changing delays
86  *
87  * Undef for following IEEE1588-2008 by the letter
88  */
89 #define USE_RUNNING_AVERAGE_DELAY 1
90 
91 /* Filter out any measurements that are above a certain threshold compared to
92  * previous measurements. Enabling this helps filtering out outliers that
93  * happen fairly often in unreliable networks, like wifi.
94  *
95  * Undef for following IEEE1588-2008 by the letter
96  */
97 #define USE_MEASUREMENT_FILTERING 1
98 
99 /* Select the first clock from which we capture a SYNC message as the master
100  * clock of the domain until we are ready to run the best master clock
101  * algorithm. This allows faster syncing but might mean a change of the master
102  * clock in the beginning. As all clocks in a domain are supposed to use the
103  * same time, this shouldn't be much of a problem.
104  *
105  * Undef for following IEEE1588-2008 by the letter
106  */
107 #define USE_OPPORTUNISTIC_CLOCK_SELECTION 1
108 
109 /* Only consider SYNC messages for which we are allowed to send a DELAY_REQ
110  * afterwards. This allows better synchronization in networks with varying
111  * delays, as for every other SYNC message we would have to assume that it's
112  * the average of what we saw before. But that might be completely off
113  */
114 #define USE_ONLY_SYNC_WITH_DELAY 1
115 
116 /* Filter out delay measurements that are too far away from the median of the
117  * last delay measurements, currently those that are more than 2 times as big.
118  * This increases accuracy a lot on wifi.
119  */
120 #define USE_MEDIAN_PRE_FILTERING 1
121 #define MEDIAN_PRE_FILTERING_WINDOW 9
122 
123 /* How many updates should be skipped at maximum when using USE_MEASUREMENT_FILTERING */
124 #define MAX_SKIPPED_UPDATES 5
125 
/* Values of the 4 bit message type field of a PTP message header, i.e. the
 * low nibble of the first header byte as extracted by
 * parse_ptp_message_header(). */
typedef enum
{
  PTP_MESSAGE_TYPE_SYNC                  = 0x0,
  PTP_MESSAGE_TYPE_DELAY_REQ             = 0x1,
  PTP_MESSAGE_TYPE_PDELAY_REQ            = 0x2,
  PTP_MESSAGE_TYPE_PDELAY_RESP           = 0x3,
  PTP_MESSAGE_TYPE_FOLLOW_UP             = 0x8,
  PTP_MESSAGE_TYPE_DELAY_RESP            = 0x9,
  PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
  PTP_MESSAGE_TYPE_ANNOUNCE              = 0xB,
  PTP_MESSAGE_TYPE_SIGNALING             = 0xC,
  PTP_MESSAGE_TYPE_MANAGEMENT            = 0xD
} PtpMessageType;
139 
140 typedef struct
141 {
142   guint64 seconds_field;        /* 48 bits valid */
143   guint32 nanoseconds_field;
144 } PtpTimestamp;
145 
/* Converts a PtpTimestamp to nanoseconds. The argument is expanded twice, so
 * it must not have side effects. It is parenthesized so that expressions such
 * as `*ptr` or `array[i]` can be passed safely. */
#define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) \
    ((ptp).seconds_field * GST_SECOND + (ptp).nanoseconds_field)
/* Split a GstClockTime into the seconds / nanoseconds parts of a PTP
 * timestamp. The argument is parenthesized so the cast applies to the whole
 * expression and not only to its first operand. */
#define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) \
    (((GstClockTime) (gst)) / GST_SECOND)
#define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) \
    (((GstClockTime) (gst)) % GST_SECOND)
149 
150 typedef struct
151 {
152   guint64 clock_identity;
153   guint16 port_number;
154 } PtpClockIdentity;
155 
156 static gint
compare_clock_identity(const PtpClockIdentity * a,const PtpClockIdentity * b)157 compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
158 {
159   if (a->clock_identity < b->clock_identity)
160     return -1;
161   else if (a->clock_identity > b->clock_identity)
162     return 1;
163 
164   if (a->port_number < b->port_number)
165     return -1;
166   else if (a->port_number > b->port_number)
167     return 1;
168 
169   return 0;
170 }
171 
172 typedef struct
173 {
174   guint8 clock_class;
175   guint8 clock_accuracy;
176   guint16 offset_scaled_log_variance;
177 } PtpClockQuality;
178 
179 typedef struct
180 {
181   guint8 transport_specific;
182   PtpMessageType message_type;
183   /* guint8 reserved; */
184   guint8 version_ptp;
185   guint16 message_length;
186   guint8 domain_number;
187   /* guint8 reserved; */
188   guint16 flag_field;
189   gint64 correction_field;      /* 48.16 fixed point nanoseconds */
190   /* guint32 reserved; */
191   PtpClockIdentity source_port_identity;
192   guint16 sequence_id;
193   guint8 control_field;
194   gint8 log_message_interval;
195 
196   union
197   {
198     struct
199     {
200       PtpTimestamp origin_timestamp;
201       gint16 current_utc_offset;
202       /* guint8 reserved; */
203       guint8 grandmaster_priority_1;
204       PtpClockQuality grandmaster_clock_quality;
205       guint8 grandmaster_priority_2;
206       guint64 grandmaster_identity;
207       guint16 steps_removed;
208       guint8 time_source;
209     } announce;
210 
211     struct
212     {
213       PtpTimestamp origin_timestamp;
214     } sync;
215 
216     struct
217     {
218       PtpTimestamp precise_origin_timestamp;
219     } follow_up;
220 
221     struct
222     {
223       PtpTimestamp origin_timestamp;
224     } delay_req;
225 
226     struct
227     {
228       PtpTimestamp receive_timestamp;
229       PtpClockIdentity requesting_port_identity;
230     } delay_resp;
231 
232   } message_specific;
233 } PtpMessage;
234 
235 static GMutex ptp_lock;
236 static GCond ptp_cond;
237 static gboolean initted = FALSE;
238 #ifdef HAVE_PTP
239 static gboolean supported = TRUE;
240 #else
241 static gboolean supported = FALSE;
242 #endif
243 static GPid ptp_helper_pid;
244 static GThread *ptp_helper_thread;
245 static GMainContext *main_context;
246 static GMainLoop *main_loop;
247 static GIOChannel *stdin_channel, *stdout_channel;
248 static GRand *delay_req_rand;
249 static GstClock *observation_system_clock;
250 static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
251 
252 typedef struct
253 {
254   GstClockTime receive_time;
255 
256   PtpClockIdentity master_clock_identity;
257 
258   guint8 grandmaster_priority_1;
259   PtpClockQuality grandmaster_clock_quality;
260   guint8 grandmaster_priority_2;
261   guint64 grandmaster_identity;
262   guint16 steps_removed;
263   guint8 time_source;
264 
265   guint16 sequence_id;
266 } PtpAnnounceMessage;
267 
268 typedef struct
269 {
270   PtpClockIdentity master_clock_identity;
271 
272   GstClockTime announce_interval;       /* last interval we received */
273   GQueue announce_messages;
274 } PtpAnnounceSender;
275 
276 typedef struct
277 {
278   guint domain;
279   PtpClockIdentity master_clock_identity;
280 
281   guint16 sync_seqnum;
282   GstClockTime sync_recv_time_local;    /* t2 */
283   GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
284   GstClockTime follow_up_recv_time_local;
285 
286   GSource *timeout_source;
287   guint16 delay_req_seqnum;
288   GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
289   GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
290   GstClockTime delay_resp_recv_time_local;
291 
292   gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
293   gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
294 } PtpPendingSync;
295 
296 static void
ptp_pending_sync_free(PtpPendingSync * sync)297 ptp_pending_sync_free (PtpPendingSync * sync)
298 {
299   if (sync->timeout_source) {
300     g_source_destroy (sync->timeout_source);
301     g_source_unref (sync->timeout_source);
302   }
303   g_free (sync);
304 }
305 
306 typedef struct
307 {
308   guint domain;
309 
310   GstClockTime last_ptp_time;
311   GstClockTime last_local_time;
312   gint skipped_updates;
313 
314   /* Used for selecting the master/grandmaster */
315   GList *announce_senders;
316 
317   /* Last selected master clock */
318   gboolean have_master_clock;
319   PtpClockIdentity master_clock_identity;
320   guint64 grandmaster_identity;
321 
322   /* Last SYNC or FOLLOW_UP timestamp we received */
323   GstClockTime last_ptp_sync_time;
324   GstClockTime sync_interval;
325 
326   GstClockTime mean_path_delay;
327   GstClockTime last_delay_req, min_delay_req_interval;
328   guint16 last_delay_req_seqnum;
329 
330   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
331   gint last_path_delays_missing;
332 
333   GQueue pending_syncs;
334 
335   GstClock *domain_clock;
336 } PtpDomainData;
337 
338 static GList *domain_data;
339 static GMutex domain_clocks_lock;
340 static GList *domain_clocks;
341 
342 /* Protected by PTP lock */
343 static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
344 static GHookList domain_stats_hooks;
345 static gint domain_stats_n_hooks;
346 static gboolean domain_stats_hooks_initted = FALSE;
347 
348 /* Converts log2 seconds to GstClockTime */
349 static GstClockTime
log2_to_clock_time(gint l)350 log2_to_clock_time (gint l)
351 {
352   if (l < 0)
353     return GST_SECOND >> (-l);
354   else
355     return GST_SECOND << l;
356 }
357 
358 static void
dump_ptp_message(PtpMessage * msg)359 dump_ptp_message (PtpMessage * msg)
360 {
361   GST_TRACE ("PTP message:");
362   GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
363   GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
364   GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
365   GST_TRACE ("\tmessage_length: %u", msg->message_length);
366   GST_TRACE ("\tdomain_number: %u", msg->domain_number);
367   GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
368   GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
369       (msg->correction_field / 65536),
370       (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
371   GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
372       msg->source_port_identity.clock_identity,
373       msg->source_port_identity.port_number);
374   GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
375   GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
376   GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
377       GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
378 
379   switch (msg->message_type) {
380     case PTP_MESSAGE_TYPE_ANNOUNCE:
381       GST_TRACE ("\tANNOUNCE:");
382       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
383           msg->message_specific.announce.origin_timestamp.seconds_field,
384           msg->message_specific.announce.origin_timestamp.nanoseconds_field);
385       GST_TRACE ("\t\tcurrent_utc_offset: %d",
386           msg->message_specific.announce.current_utc_offset);
387       GST_TRACE ("\t\tgrandmaster_priority_1: %u",
388           msg->message_specific.announce.grandmaster_priority_1);
389       GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
390           msg->message_specific.announce.grandmaster_clock_quality.clock_class,
391           msg->message_specific.announce.
392           grandmaster_clock_quality.clock_accuracy,
393           msg->message_specific.announce.
394           grandmaster_clock_quality.offset_scaled_log_variance);
395       GST_TRACE ("\t\tgrandmaster_priority_2: %u",
396           msg->message_specific.announce.grandmaster_priority_2);
397       GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
398           msg->message_specific.announce.grandmaster_identity);
399       GST_TRACE ("\t\tsteps_removed: %u",
400           msg->message_specific.announce.steps_removed);
401       GST_TRACE ("\t\ttime_source: 0x%02x",
402           msg->message_specific.announce.time_source);
403       break;
404     case PTP_MESSAGE_TYPE_SYNC:
405       GST_TRACE ("\tSYNC:");
406       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
407           msg->message_specific.sync.origin_timestamp.seconds_field,
408           msg->message_specific.sync.origin_timestamp.nanoseconds_field);
409       break;
410     case PTP_MESSAGE_TYPE_FOLLOW_UP:
411       GST_TRACE ("\tFOLLOW_UP:");
412       GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
413           msg->message_specific.follow_up.
414           precise_origin_timestamp.seconds_field,
415           msg->message_specific.follow_up.
416           precise_origin_timestamp.nanoseconds_field);
417       break;
418     case PTP_MESSAGE_TYPE_DELAY_REQ:
419       GST_TRACE ("\tDELAY_REQ:");
420       GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
421           msg->message_specific.delay_req.origin_timestamp.seconds_field,
422           msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
423       break;
424     case PTP_MESSAGE_TYPE_DELAY_RESP:
425       GST_TRACE ("\tDELAY_RESP:");
426       GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
427           msg->message_specific.delay_resp.receive_timestamp.seconds_field,
428           msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
429       GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
430           "x %u",
431           msg->message_specific.delay_resp.
432           requesting_port_identity.clock_identity,
433           msg->message_specific.delay_resp.
434           requesting_port_identity.port_number);
435       break;
436     default:
437       break;
438   }
439   GST_TRACE (" ");
440 }
441 
442 /* IEEE 1588-2008 5.3.3 */
443 static gboolean
parse_ptp_timestamp(PtpTimestamp * timestamp,GstByteReader * reader)444 parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
445 {
446   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
447 
448   timestamp->seconds_field =
449       (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
450       gst_byte_reader_get_uint16_be_unchecked (reader);
451   timestamp->nanoseconds_field =
452       gst_byte_reader_get_uint32_be_unchecked (reader);
453 
454   if (timestamp->nanoseconds_field >= 1000000000)
455     return FALSE;
456 
457   return TRUE;
458 }
459 
460 /* IEEE 1588-2008 13.3 */
461 static gboolean
parse_ptp_message_header(PtpMessage * msg,GstByteReader * reader)462 parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
463 {
464   guint8 b;
465 
466   g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
467 
468   b = gst_byte_reader_get_uint8_unchecked (reader);
469   msg->transport_specific = b >> 4;
470   msg->message_type = b & 0x0f;
471 
472   b = gst_byte_reader_get_uint8_unchecked (reader);
473   msg->version_ptp = b & 0x0f;
474   if (msg->version_ptp != 2) {
475     GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
476     return FALSE;
477   }
478 
479   msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
480   if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
481     GST_WARNING ("Not enough data (%u < %u)",
482         gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
483     return FALSE;
484   }
485 
486   msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
487   gst_byte_reader_skip_unchecked (reader, 1);
488 
489   msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
490   msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
491   gst_byte_reader_skip_unchecked (reader, 4);
492 
493   msg->source_port_identity.clock_identity =
494       gst_byte_reader_get_uint64_be_unchecked (reader);
495   msg->source_port_identity.port_number =
496       gst_byte_reader_get_uint16_be_unchecked (reader);
497 
498   msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
499   msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
500   msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
501 
502   return TRUE;
503 }
504 
505 /* IEEE 1588-2008 13.5 */
506 static gboolean
parse_ptp_message_announce(PtpMessage * msg,GstByteReader * reader)507 parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
508 {
509   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
510 
511   if (gst_byte_reader_get_remaining (reader) < 20)
512     return FALSE;
513 
514   if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
515           reader))
516     return FALSE;
517 
518   msg->message_specific.announce.current_utc_offset =
519       gst_byte_reader_get_uint16_be_unchecked (reader);
520   gst_byte_reader_skip_unchecked (reader, 1);
521 
522   msg->message_specific.announce.grandmaster_priority_1 =
523       gst_byte_reader_get_uint8_unchecked (reader);
524   msg->message_specific.announce.grandmaster_clock_quality.clock_class =
525       gst_byte_reader_get_uint8_unchecked (reader);
526   msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
527       gst_byte_reader_get_uint8_unchecked (reader);
528   msg->message_specific.announce.
529       grandmaster_clock_quality.offset_scaled_log_variance =
530       gst_byte_reader_get_uint16_be_unchecked (reader);
531   msg->message_specific.announce.grandmaster_priority_2 =
532       gst_byte_reader_get_uint8_unchecked (reader);
533   msg->message_specific.announce.grandmaster_identity =
534       gst_byte_reader_get_uint64_be_unchecked (reader);
535   msg->message_specific.announce.steps_removed =
536       gst_byte_reader_get_uint16_be_unchecked (reader);
537   msg->message_specific.announce.time_source =
538       gst_byte_reader_get_uint8_unchecked (reader);
539 
540   return TRUE;
541 }
542 
543 /* IEEE 1588-2008 13.6 */
544 static gboolean
parse_ptp_message_sync(PtpMessage * msg,GstByteReader * reader)545 parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
546 {
547   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
548 
549   if (gst_byte_reader_get_remaining (reader) < 10)
550     return FALSE;
551 
552   if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
553           reader))
554     return FALSE;
555 
556   return TRUE;
557 }
558 
559 /* IEEE 1588-2008 13.6 */
560 static gboolean
parse_ptp_message_delay_req(PtpMessage * msg,GstByteReader * reader)561 parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
562 {
563   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
564 
565   if (gst_byte_reader_get_remaining (reader) < 10)
566     return FALSE;
567 
568   if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
569           reader))
570     return FALSE;
571 
572   return TRUE;
573 }
574 
575 /* IEEE 1588-2008 13.7 */
576 static gboolean
parse_ptp_message_follow_up(PtpMessage * msg,GstByteReader * reader)577 parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
578 {
579   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
580 
581   if (gst_byte_reader_get_remaining (reader) < 10)
582     return FALSE;
583 
584   if (!parse_ptp_timestamp (&msg->message_specific.
585           follow_up.precise_origin_timestamp, reader))
586     return FALSE;
587 
588   return TRUE;
589 }
590 
591 /* IEEE 1588-2008 13.8 */
592 static gboolean
parse_ptp_message_delay_resp(PtpMessage * msg,GstByteReader * reader)593 parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
594 {
595   g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
596       FALSE);
597 
598   if (gst_byte_reader_get_remaining (reader) < 20)
599     return FALSE;
600 
601   if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
602           reader))
603     return FALSE;
604 
605   msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
606       gst_byte_reader_get_uint64_be_unchecked (reader);
607   msg->message_specific.delay_resp.requesting_port_identity.port_number =
608       gst_byte_reader_get_uint16_be_unchecked (reader);
609 
610   return TRUE;
611 }
612 
613 static gboolean
parse_ptp_message(PtpMessage * msg,const guint8 * data,gsize size)614 parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
615 {
616   GstByteReader reader;
617   gboolean ret = FALSE;
618 
619   gst_byte_reader_init (&reader, data, size);
620 
621   if (!parse_ptp_message_header (msg, &reader)) {
622     GST_WARNING ("Failed to parse PTP message header");
623     return FALSE;
624   }
625 
626   switch (msg->message_type) {
627     case PTP_MESSAGE_TYPE_SYNC:
628       ret = parse_ptp_message_sync (msg, &reader);
629       break;
630     case PTP_MESSAGE_TYPE_FOLLOW_UP:
631       ret = parse_ptp_message_follow_up (msg, &reader);
632       break;
633     case PTP_MESSAGE_TYPE_DELAY_REQ:
634       ret = parse_ptp_message_delay_req (msg, &reader);
635       break;
636     case PTP_MESSAGE_TYPE_DELAY_RESP:
637       ret = parse_ptp_message_delay_resp (msg, &reader);
638       break;
639     case PTP_MESSAGE_TYPE_ANNOUNCE:
640       ret = parse_ptp_message_announce (msg, &reader);
641       break;
642     default:
643       /* ignore for now */
644       break;
645   }
646 
647   return ret;
648 }
649 
650 static gint
compare_announce_message(const PtpAnnounceMessage * a,const PtpAnnounceMessage * b)651 compare_announce_message (const PtpAnnounceMessage * a,
652     const PtpAnnounceMessage * b)
653 {
654   /* IEEE 1588 Figure 27 */
655   if (a->grandmaster_identity == b->grandmaster_identity) {
656     if (a->steps_removed + 1 < b->steps_removed)
657       return -1;
658     else if (a->steps_removed > b->steps_removed + 1)
659       return 1;
660 
661     /* Error cases are filtered out earlier */
662     if (a->steps_removed < b->steps_removed)
663       return -1;
664     else if (a->steps_removed > b->steps_removed)
665       return 1;
666 
667     /* Error cases are filtered out earlier */
668     if (a->master_clock_identity.clock_identity <
669         b->master_clock_identity.clock_identity)
670       return -1;
671     else if (a->master_clock_identity.clock_identity >
672         b->master_clock_identity.clock_identity)
673       return 1;
674 
675     /* Error cases are filtered out earlier */
676     if (a->master_clock_identity.port_number <
677         b->master_clock_identity.port_number)
678       return -1;
679     else if (a->master_clock_identity.port_number >
680         b->master_clock_identity.port_number)
681       return 1;
682     else
683       g_assert_not_reached ();
684 
685     return 0;
686   }
687 
688   if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
689     return -1;
690   else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
691     return 1;
692 
693   if (a->grandmaster_clock_quality.clock_class <
694       b->grandmaster_clock_quality.clock_class)
695     return -1;
696   else if (a->grandmaster_clock_quality.clock_class >
697       b->grandmaster_clock_quality.clock_class)
698     return 1;
699 
700   if (a->grandmaster_clock_quality.clock_accuracy <
701       b->grandmaster_clock_quality.clock_accuracy)
702     return -1;
703   else if (a->grandmaster_clock_quality.clock_accuracy >
704       b->grandmaster_clock_quality.clock_accuracy)
705     return 1;
706 
707   if (a->grandmaster_clock_quality.offset_scaled_log_variance <
708       b->grandmaster_clock_quality.offset_scaled_log_variance)
709     return -1;
710   else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
711       b->grandmaster_clock_quality.offset_scaled_log_variance)
712     return 1;
713 
714   if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
715     return -1;
716   else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
717     return 1;
718 
719   if (a->grandmaster_identity < b->grandmaster_identity)
720     return -1;
721   else if (a->grandmaster_identity > b->grandmaster_identity)
722     return 1;
723   else
724     g_assert_not_reached ();
725 
726   return 0;
727 }
728 
729 static void
select_best_master_clock(PtpDomainData * domain,GstClockTime now)730 select_best_master_clock (PtpDomainData * domain, GstClockTime now)
731 {
732   GList *qualified_messages = NULL;
733   GList *l, *m;
734   PtpAnnounceMessage *best = NULL;
735 
736   /* IEEE 1588 9.3.2.5 */
737   for (l = domain->announce_senders; l; l = l->next) {
738     PtpAnnounceSender *sender = l->data;
739     GstClockTime window = 4 * sender->announce_interval;
740     gint count = 0;
741 
742     for (m = sender->announce_messages.head; m; m = m->next) {
743       PtpAnnounceMessage *msg = m->data;
744 
745       if (now - msg->receive_time <= window)
746         count++;
747     }
748 
749     /* Only include the newest message of announce senders that had at least 2
750      * announce messages in the last 4 announce intervals. Which also means
751      * that we wait at least 4 announce intervals before we select a master
752      * clock. Until then we just report based on the newest SYNC we received
753      */
754     if (count >= 2) {
755       qualified_messages =
756           g_list_prepend (qualified_messages,
757           g_queue_peek_tail (&sender->announce_messages));
758     }
759   }
760 
761   if (!qualified_messages) {
762     GST_DEBUG
763         ("No qualified announce messages for domain %u, can't select a master clock",
764         domain->domain);
765     domain->have_master_clock = FALSE;
766     return;
767   }
768 
769   for (l = qualified_messages; l; l = l->next) {
770     PtpAnnounceMessage *msg = l->data;
771 
772     if (!best || compare_announce_message (msg, best) < 0)
773       best = msg;
774   }
775   g_clear_pointer (&qualified_messages, g_list_free);
776 
777   if (domain->have_master_clock
778       && compare_clock_identity (&domain->master_clock_identity,
779           &best->master_clock_identity) == 0) {
780     GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
781   } else {
782     GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
783         "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
784         domain->domain, best->master_clock_identity.clock_identity,
785         best->master_clock_identity.port_number, best->grandmaster_identity);
786 
787     domain->have_master_clock = TRUE;
788     domain->grandmaster_identity = best->grandmaster_identity;
789 
790     /* Opportunistic master clock selection likely gave us the same master
791      * clock before, no need to reset all statistics */
792     if (compare_clock_identity (&domain->master_clock_identity,
793             &best->master_clock_identity) != 0) {
794       memcpy (&domain->master_clock_identity, &best->master_clock_identity,
795           sizeof (PtpClockIdentity));
796       domain->mean_path_delay = 0;
797       domain->last_delay_req = 0;
798       domain->last_path_delays_missing = 9;
799       domain->min_delay_req_interval = 0;
800       domain->sync_interval = 0;
801       domain->last_ptp_sync_time = 0;
802       domain->skipped_updates = 0;
803       g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
804           NULL);
805       g_queue_clear (&domain->pending_syncs);
806     }
807 
808     if (g_atomic_int_get (&domain_stats_n_hooks)) {
809       GstStructure *stats =
810           gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
811           "domain", G_TYPE_UINT, domain->domain,
812           "master-clock-id", G_TYPE_UINT64,
813           domain->master_clock_identity.clock_identity,
814           "master-clock-port", G_TYPE_UINT,
815           domain->master_clock_identity.port_number,
816           "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
817           NULL);
818       emit_ptp_statistics (domain->domain, stats);
819       gst_structure_free (stats);
820     }
821   }
822 }
823 
824 static void
handle_announce_message(PtpMessage * msg,GstClockTime receive_time)825 handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
826 {
827   GList *l;
828   PtpDomainData *domain = NULL;
829   PtpAnnounceSender *sender = NULL;
830   PtpAnnounceMessage *announce;
831 
832   /* IEEE1588 9.3.2.2 e)
833    * Don't consider messages with the alternate master flag set
834    */
835   if ((msg->flag_field & 0x0100))
836     return;
837 
838   /* IEEE 1588 9.3.2.5 d)
839    * Don't consider announce messages with steps_removed>=255
840    */
841   if (msg->message_specific.announce.steps_removed >= 255)
842     return;
843 
844   for (l = domain_data; l; l = l->next) {
845     PtpDomainData *tmp = l->data;
846 
847     if (tmp->domain == msg->domain_number) {
848       domain = tmp;
849       break;
850     }
851   }
852 
853   if (!domain) {
854     gchar *clock_name;
855 
856     domain = g_new0 (PtpDomainData, 1);
857     domain->domain = msg->domain_number;
858     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
859     domain->domain_clock =
860         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
861     gst_object_ref_sink (domain->domain_clock);
862     g_free (clock_name);
863     g_queue_init (&domain->pending_syncs);
864     domain->last_path_delays_missing = 9;
865     domain_data = g_list_prepend (domain_data, domain);
866 
867     g_mutex_lock (&domain_clocks_lock);
868     domain_clocks = g_list_prepend (domain_clocks, domain);
869     g_mutex_unlock (&domain_clocks_lock);
870 
871     if (g_atomic_int_get (&domain_stats_n_hooks)) {
872       GstStructure *stats =
873           gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
874           G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
875           domain->domain_clock, NULL);
876       emit_ptp_statistics (domain->domain, stats);
877       gst_structure_free (stats);
878     }
879   }
880 
881   for (l = domain->announce_senders; l; l = l->next) {
882     PtpAnnounceSender *tmp = l->data;
883 
884     if (compare_clock_identity (&tmp->master_clock_identity,
885             &msg->source_port_identity) == 0) {
886       sender = tmp;
887       break;
888     }
889   }
890 
891   if (!sender) {
892     sender = g_new0 (PtpAnnounceSender, 1);
893 
894     memcpy (&sender->master_clock_identity, &msg->source_port_identity,
895         sizeof (PtpClockIdentity));
896     g_queue_init (&sender->announce_messages);
897     domain->announce_senders =
898         g_list_prepend (domain->announce_senders, sender);
899   }
900 
901   for (l = sender->announce_messages.head; l; l = l->next) {
902     PtpAnnounceMessage *tmp = l->data;
903 
904     /* IEEE 1588 9.3.2.5 c)
905      * Don't consider identical messages, i.e. duplicates
906      */
907     if (tmp->sequence_id == msg->sequence_id)
908       return;
909   }
910 
911   sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
912 
913   announce = g_new0 (PtpAnnounceMessage, 1);
914   announce->receive_time = receive_time;
915   announce->sequence_id = msg->sequence_id;
916   memcpy (&announce->master_clock_identity, &msg->source_port_identity,
917       sizeof (PtpClockIdentity));
918   announce->grandmaster_identity =
919       msg->message_specific.announce.grandmaster_identity;
920   announce->grandmaster_priority_1 =
921       msg->message_specific.announce.grandmaster_priority_1;
922   announce->grandmaster_clock_quality.clock_class =
923       msg->message_specific.announce.grandmaster_clock_quality.clock_class;
924   announce->grandmaster_clock_quality.clock_accuracy =
925       msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
926   announce->grandmaster_clock_quality.offset_scaled_log_variance =
927       msg->message_specific.announce.
928       grandmaster_clock_quality.offset_scaled_log_variance;
929   announce->grandmaster_priority_2 =
930       msg->message_specific.announce.grandmaster_priority_2;
931   announce->steps_removed = msg->message_specific.announce.steps_removed;
932   announce->time_source = msg->message_specific.announce.time_source;
933   g_queue_push_tail (&sender->announce_messages, announce);
934 
935   select_best_master_clock (domain, receive_time);
936 }
937 
938 static gboolean
send_delay_req_timeout(PtpPendingSync * sync)939 send_delay_req_timeout (PtpPendingSync * sync)
940 {
941   StdIOHeader header = { 0, };
942   guint8 delay_req[44];
943   GstByteWriter writer;
944   GIOStatus status;
945   gsize written;
946   GError *err = NULL;
947 
948   header.type = TYPE_EVENT;
949   header.size = 44;
950 
951   GST_TRACE ("Sending delay_req to domain %u", sync->domain);
952 
953   gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
954   gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
955   gst_byte_writer_put_uint8_unchecked (&writer, 2);
956   gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
957   gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
958   gst_byte_writer_put_uint8_unchecked (&writer, 0);
959   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
960   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
961   gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
962   gst_byte_writer_put_uint64_be_unchecked (&writer,
963       ptp_clock_id.clock_identity);
964   gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
965   gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
966   gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
967   gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
968   gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
969   gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
970 
971   status =
972       g_io_channel_write_chars (stdout_channel, (gchar *) & header,
973       sizeof (header), &written, &err);
974   if (status == G_IO_STATUS_ERROR) {
975     g_warning ("Failed to write to stdout: %s", err->message);
976     g_clear_error (&err);
977     return G_SOURCE_REMOVE;
978   } else if (status == G_IO_STATUS_EOF) {
979     g_message ("EOF on stdout");
980     g_main_loop_quit (main_loop);
981     return G_SOURCE_REMOVE;
982   } else if (status != G_IO_STATUS_NORMAL) {
983     g_warning ("Unexpected stdout write status: %d", status);
984     g_main_loop_quit (main_loop);
985     return G_SOURCE_REMOVE;
986   } else if (written != sizeof (header)) {
987     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
988     g_main_loop_quit (main_loop);
989     return G_SOURCE_REMOVE;
990   }
991 
992   sync->delay_req_send_time_local =
993       gst_clock_get_time (observation_system_clock);
994 
995   status =
996       g_io_channel_write_chars (stdout_channel,
997       (const gchar *) delay_req, 44, &written, &err);
998   if (status == G_IO_STATUS_ERROR) {
999     g_warning ("Failed to write to stdout: %s", err->message);
1000     g_clear_error (&err);
1001     g_main_loop_quit (main_loop);
1002     return G_SOURCE_REMOVE;
1003   } else if (status == G_IO_STATUS_EOF) {
1004     g_message ("EOF on stdout");
1005     g_main_loop_quit (main_loop);
1006     return G_SOURCE_REMOVE;
1007   } else if (status != G_IO_STATUS_NORMAL) {
1008     g_warning ("Unexpected stdout write status: %d", status);
1009     g_main_loop_quit (main_loop);
1010     return G_SOURCE_REMOVE;
1011   } else if (written != 44) {
1012     g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
1013     g_main_loop_quit (main_loop);
1014     return G_SOURCE_REMOVE;
1015   }
1016 
1017   return G_SOURCE_REMOVE;
1018 }
1019 
1020 static gboolean
send_delay_req(PtpDomainData * domain,PtpPendingSync * sync)1021 send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
1022 {
1023   GstClockTime now = gst_clock_get_time (observation_system_clock);
1024   guint timeout;
1025   GSource *timeout_source;
1026 
1027   if (domain->last_delay_req != 0
1028       && domain->last_delay_req + domain->min_delay_req_interval > now) {
1029     GST_TRACE ("Too soon to send new DELAY_REQ");
1030     return FALSE;
1031   }
1032 
1033   domain->last_delay_req = now;
1034   sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
1035 
1036   /* IEEE 1588 9.5.11.2 */
1037   if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
1038     timeout = 0;
1039   else
1040     timeout =
1041         g_rand_int_range (delay_req_rand, 0,
1042         (domain->min_delay_req_interval * 2) / GST_MSECOND);
1043 
1044   sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
1045   g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
1046   g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
1047       sync, NULL);
1048   g_source_attach (timeout_source, main_context);
1049 
1050   return TRUE;
1051 }
1052 
1053 /* Filtering of outliers for RTT and time calculations inspired
1054  * by the code from gstnetclientclock.c
1055  */
1056 static void
update_ptp_time(PtpDomainData * domain,PtpPendingSync * sync)1057 update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
1058 {
1059   GstClockTime internal_time, external_time, rate_num, rate_den;
1060   GstClockTime corrected_ptp_time, corrected_local_time;
1061   gdouble r_squared = 0.0;
1062   gboolean synced;
1063   GstClockTimeDiff discont = 0;
1064   GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE;
1065 #ifdef USE_MEASUREMENT_FILTERING
1066   GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
1067       orig_rate_den;
1068   GstClockTime new_estimated_ptp_time;
1069   GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
1070   gboolean now_synced;
1071 #endif
1072 #ifdef USE_ONLY_SYNC_WITH_DELAY
1073   GstClockTime mean_path_delay;
1074 #endif
1075 
1076   GST_TRACE ("Updating PTP time");
1077 
1078 #ifdef USE_ONLY_SYNC_WITH_DELAY
1079   if (sync->delay_req_send_time_local == GST_CLOCK_TIME_NONE) {
1080     GST_TRACE ("Not updating - no delay_req sent");
1081     return;
1082   }
1083 
1084   /* IEEE 1588 11.3 */
1085   mean_path_delay =
1086       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1087       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1088       (sync->correction_field_sync + sync->correction_field_delay +
1089           32768) / 65536) / 2;
1090 #endif
1091 
1092   /* IEEE 1588 11.2 */
1093   corrected_ptp_time =
1094       sync->sync_send_time_remote +
1095       (sync->correction_field_sync + 32768) / 65536;
1096 
1097 #ifdef USE_ONLY_SYNC_WITH_DELAY
1098   corrected_local_time = sync->sync_recv_time_local - mean_path_delay;
1099 #else
1100   corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
1101 #endif
1102 
1103 #ifdef USE_MEASUREMENT_FILTERING
1104   /* We check this here and when updating the mean path delay, because
1105    * we can get here without a delay response too. The tolerance on
1106    * accepting follow-up after a sync is high, because a PTP server
1107    * doesn't have to prioritise sending FOLLOW_UP - its purpose is
1108    * just to give us the accurate timestamp of the preceding SYNC.
1109    *
1110    * For that reason also allow at least 100ms delay in case of delays smaller
1111    * than 5ms. */
1112   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
1113       && sync->follow_up_recv_time_local >
1114       sync->sync_recv_time_local + MAX (100 * GST_MSECOND,
1115           20 * domain->mean_path_delay)) {
1116     GstClockTimeDiff delay =
1117         sync->follow_up_recv_time_local - sync->sync_recv_time_local;
1118     GST_WARNING ("Sync-follow-up delay for domain %u too big: %"
1119         GST_STIME_FORMAT " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")",
1120         domain->domain, GST_STIME_ARGS (delay),
1121         GST_TIME_ARGS (domain->mean_path_delay));
1122     synced = FALSE;
1123     gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1124         &internal_time, &external_time, &rate_num, &rate_den);
1125     goto out;
1126   }
1127 #endif
1128 
1129   /* Set an initial local-remote relation */
1130   if (domain->last_ptp_time == 0)
1131     gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
1132         corrected_ptp_time, 1, 1);
1133 
1134 #ifdef USE_MEASUREMENT_FILTERING
1135   /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
1136    * estimate with our present knowledge about the clock
1137    */
1138   /* Store what the clock produced as 'now' before this update */
1139   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1140       &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
1141   internal_time = orig_internal_time;
1142   external_time = orig_external_time;
1143   rate_num = orig_rate_num;
1144   rate_den = orig_rate_den;
1145 
1146   /* 3/4 RTT window around the estimation */
1147   max_discont = domain->mean_path_delay * 3 / 2;
1148 
1149   /* Check if the estimated sync time is inside our window */
1150   estimated_ptp_time_min = corrected_local_time - max_discont;
1151   estimated_ptp_time_min =
1152       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1153       estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
1154   estimated_ptp_time_max = corrected_local_time + max_discont;
1155   estimated_ptp_time_max =
1156       gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
1157       estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
1158 
1159   synced = (estimated_ptp_time_min < corrected_ptp_time
1160       && corrected_ptp_time < estimated_ptp_time_max);
1161 
1162   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1163       GST_TIME_FORMAT, domain->domain,
1164       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1165 
1166   GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1167       GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
1168       GST_TIME_ARGS (corrected_ptp_time),
1169       GST_TIME_ARGS (estimated_ptp_time_max));
1170 
1171   if (gst_clock_add_observation_unapplied (domain->domain_clock,
1172           corrected_local_time, corrected_ptp_time, &r_squared,
1173           &internal_time, &external_time, &rate_num, &rate_den)) {
1174     GST_DEBUG ("Regression gave r_squared: %f", r_squared);
1175 
1176     /* Old estimated PTP time based on receive time and path delay */
1177     estimated_ptp_time = corrected_local_time;
1178     estimated_ptp_time =
1179         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1180         (domain->domain_clock), estimated_ptp_time, orig_internal_time,
1181         orig_external_time, orig_rate_num, orig_rate_den);
1182 
1183     /* New estimated PTP time based on receive time and path delay */
1184     new_estimated_ptp_time = corrected_local_time;
1185     new_estimated_ptp_time =
1186         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1187         (domain->domain_clock), new_estimated_ptp_time, internal_time,
1188         external_time, rate_num, rate_den);
1189 
1190     discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
1191     if (synced && ABS (discont) > max_discont) {
1192       GstClockTimeDiff offset;
1193       GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
1194           ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
1195           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1196           GST_TIME_ARGS (max_discont));
1197       if (discont > 0) {        /* Too large a forward step - add a -ve offset */
1198         offset = max_discont - discont;
1199         if (-offset > external_time)
1200           external_time = 0;
1201         else
1202           external_time += offset;
1203       } else {                  /* Too large a backward step - add a +ve offset */
1204         offset = -(max_discont + discont);
1205         external_time += offset;
1206       }
1207 
1208       discont += offset;
1209     } else {
1210       GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
1211           (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
1212           GST_TIME_ARGS (max_discont));
1213     }
1214 
1215     /* Check if the estimated sync time is now (still) inside our window */
1216     estimated_ptp_time_min = corrected_local_time - max_discont;
1217     estimated_ptp_time_min =
1218         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1219         (domain->domain_clock), estimated_ptp_time_min, internal_time,
1220         external_time, rate_num, rate_den);
1221     estimated_ptp_time_max = corrected_local_time + max_discont;
1222     estimated_ptp_time_max =
1223         gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1224         (domain->domain_clock), estimated_ptp_time_max, internal_time,
1225         external_time, rate_num, rate_den);
1226 
1227     now_synced = (estimated_ptp_time_min < corrected_ptp_time
1228         && corrected_ptp_time < estimated_ptp_time_max);
1229 
1230     GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
1231         GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
1232         GST_TIME_ARGS (corrected_ptp_time),
1233         GST_TIME_ARGS (estimated_ptp_time_max));
1234 
1235     if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
1236       gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
1237           internal_time, external_time, rate_num, rate_den);
1238       domain->skipped_updates = 0;
1239 
1240       domain->last_ptp_time = corrected_ptp_time;
1241       domain->last_local_time = corrected_local_time;
1242     } else {
1243       domain->skipped_updates++;
1244     }
1245   } else {
1246     domain->last_ptp_time = corrected_ptp_time;
1247     domain->last_local_time = corrected_local_time;
1248   }
1249 
1250 #else
1251   GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
1252       GST_TIME_FORMAT, domain->domain,
1253       GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
1254 
1255   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1256       &internal_time, &external_time, &rate_num, &rate_den);
1257 
1258   estimated_ptp_time = corrected_local_time;
1259   estimated_ptp_time =
1260       gst_clock_adjust_with_calibration (GST_CLOCK_CAST
1261       (domain->domain_clock), estimated_ptp_time, internal_time,
1262       external_time, rate_num, rate_den);
1263 
1264   gst_clock_add_observation (domain->domain_clock,
1265       corrected_local_time, corrected_ptp_time, &r_squared);
1266 
1267   gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
1268       &internal_time, &external_time, &rate_num, &rate_den);
1269 
1270   synced = TRUE;
1271   domain->last_ptp_time = corrected_ptp_time;
1272   domain->last_local_time = corrected_local_time;
1273 #endif
1274 
1275 #ifdef USE_MEASUREMENT_FILTERING
1276 out:
1277 #endif
1278   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1279     GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
1280         "domain", G_TYPE_UINT, domain->domain,
1281         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1282         "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
1283         "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
1284         "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
1285         "discontinuity", G_TYPE_INT64, discont,
1286         "synced", G_TYPE_BOOLEAN, synced,
1287         "r-squared", G_TYPE_DOUBLE, r_squared,
1288         "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
1289         "external-time", GST_TYPE_CLOCK_TIME, external_time,
1290         "rate-num", G_TYPE_UINT64, rate_num,
1291         "rate-den", G_TYPE_UINT64, rate_den,
1292         "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
1293         NULL);
1294     emit_ptp_statistics (domain->domain, stats);
1295     gst_structure_free (stats);
1296   }
1297 
1298 }
1299 
1300 #ifdef USE_MEDIAN_PRE_FILTERING
1301 static gint
compare_clock_time(const GstClockTime * a,const GstClockTime * b)1302 compare_clock_time (const GstClockTime * a, const GstClockTime * b)
1303 {
1304   if (*a < *b)
1305     return -1;
1306   else if (*a > *b)
1307     return 1;
1308   return 0;
1309 }
1310 #endif
1311 
1312 static gboolean
update_mean_path_delay(PtpDomainData * domain,PtpPendingSync * sync)1313 update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
1314 {
1315 #ifdef USE_MEDIAN_PRE_FILTERING
1316   GstClockTime last_path_delays[MEDIAN_PRE_FILTERING_WINDOW];
1317   GstClockTime median;
1318   gint i;
1319 #endif
1320 
1321   GstClockTime mean_path_delay, delay_req_delay = 0;
1322   gboolean ret;
1323 
1324   /* IEEE 1588 11.3 */
1325   mean_path_delay =
1326       (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
1327       sync->sync_recv_time_local - sync->delay_req_send_time_local -
1328       (sync->correction_field_sync + sync->correction_field_delay +
1329           32768) / 65536) / 2;
1330 
1331 #ifdef USE_MEDIAN_PRE_FILTERING
1332   for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++)
1333     domain->last_path_delays[i - 1] = domain->last_path_delays[i];
1334   domain->last_path_delays[i - 1] = mean_path_delay;
1335 
1336   if (domain->last_path_delays_missing) {
1337     domain->last_path_delays_missing--;
1338   } else {
1339     memcpy (&last_path_delays, &domain->last_path_delays,
1340         sizeof (last_path_delays));
1341     g_qsort_with_data (&last_path_delays,
1342         MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime),
1343         (GCompareDataFunc) compare_clock_time, NULL);
1344 
1345     median = last_path_delays[MEDIAN_PRE_FILTERING_WINDOW / 2];
1346 
1347     /* FIXME: We might want to use something else here, like only allowing
1348      * things in the interquartile range, or also filtering away delays that
1349      * are too small compared to the median. This here worked well enough
1350      * in tests so far.
1351      */
1352     if (mean_path_delay > 2 * median) {
1353       GST_WARNING ("Path delay for domain %u too big compared to median: %"
1354           GST_TIME_FORMAT " > 2 * %" GST_TIME_FORMAT, domain->domain,
1355           GST_TIME_ARGS (mean_path_delay), GST_TIME_ARGS (median));
1356       ret = FALSE;
1357       goto out;
1358     }
1359   }
1360 #endif
1361 
1362 #ifdef USE_RUNNING_AVERAGE_DELAY
1363   /* Track an average round trip time, for a bit of smoothing */
1364   /* Always update before discarding a sample, so genuine changes in
1365    * the network get picked up, eventually */
1366   if (domain->mean_path_delay == 0)
1367     domain->mean_path_delay = mean_path_delay;
1368   else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
1369     domain->mean_path_delay =
1370         (3 * domain->mean_path_delay + mean_path_delay) / 4;
1371   else
1372     domain->mean_path_delay =
1373         (15 * domain->mean_path_delay + mean_path_delay) / 16;
1374 #else
1375   domain->mean_path_delay = mean_path_delay;
1376 #endif
1377 
1378 #ifdef USE_MEASUREMENT_FILTERING
1379   /* The tolerance on accepting follow-up after a sync is high, because
1380    * a PTP server doesn't have to prioritise sending FOLLOW_UP - its purpose is
1381    * just to give us the accurate timestamp of the preceding SYNC.
1382    *
1383    * For that reason also allow at least 100ms delay in case of delays smaller
1384    * than 5ms. */
1385   if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
1386       domain->mean_path_delay != 0
1387       && sync->follow_up_recv_time_local >
1388       sync->sync_recv_time_local + MAX (100 * GST_MSECOND,
1389           20 * domain->mean_path_delay)) {
1390     GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
1391         " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")", domain->domain,
1392         GST_TIME_ARGS (sync->follow_up_recv_time_local -
1393             sync->sync_recv_time_local),
1394         GST_TIME_ARGS (domain->mean_path_delay));
1395     ret = FALSE;
1396     goto out;
1397   }
1398 
1399   if (mean_path_delay > 2 * domain->mean_path_delay) {
1400     GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
1401         " > 2 * %" GST_TIME_FORMAT, domain->domain,
1402         GST_TIME_ARGS (mean_path_delay),
1403         GST_TIME_ARGS (domain->mean_path_delay));
1404     ret = FALSE;
1405     goto out;
1406   }
1407 #endif
1408 
1409   delay_req_delay =
1410       sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
1411 
1412 #ifdef USE_MEASUREMENT_FILTERING
1413   /* delay_req_delay is a RTT, so 2 times the path delay is what we'd
1414    * hope for, but some PTP systems don't prioritise sending DELAY_RESP,
1415    * but they must still have placed an accurate reception timestamp.
1416    * That means we should be quite tolerant about late DELAY_RESP, and
1417    * mostly rely on filtering out jumps in the mean-path-delay elsewhere.
1418    *
1419    * For that reason also allow at least 100ms delay in case of delays smaller
1420    * than 5ms. */
1421   if (delay_req_delay > MAX (100 * GST_MSECOND, 20 * domain->mean_path_delay)) {
1422     GST_WARNING ("Delay-request-response delay for domain %u too big: %"
1423         GST_TIME_FORMAT " > MAX(100ms, 20 * %" GST_TIME_FORMAT ")",
1424         domain->domain, GST_TIME_ARGS (delay_req_delay),
1425         GST_TIME_ARGS (domain->mean_path_delay));
1426     ret = FALSE;
1427     goto out;
1428   }
1429 #endif
1430 
1431   ret = TRUE;
1432 
1433   GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
1434       GST_TIME_FORMAT ")", domain->domain,
1435       GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
1436   GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
1437       domain->domain, GST_TIME_ARGS (delay_req_delay));
1438 
1439 #if defined(USE_MEASUREMENT_FILTERING) || defined(USE_MEDIAN_PRE_FILTERING)
1440 out:
1441 #endif
1442   if (g_atomic_int_get (&domain_stats_n_hooks)) {
1443     GstStructure *stats =
1444         gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
1445         "domain", G_TYPE_UINT, domain->domain,
1446         "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
1447         "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
1448         "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
1449     emit_ptp_statistics (domain->domain, stats);
1450     gst_structure_free (stats);
1451   }
1452 
1453   return ret;
1454 }
1455 
1456 static void
handle_sync_message(PtpMessage * msg,GstClockTime receive_time)1457 handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
1458 {
1459   GList *l;
1460   PtpDomainData *domain = NULL;
1461   PtpPendingSync *sync = NULL;
1462 
1463   /* Don't consider messages with the alternate master flag set */
1464   if ((msg->flag_field & 0x0100)) {
1465     GST_TRACE ("Ignoring sync message with alternate-master flag");
1466     return;
1467   }
1468 
1469   for (l = domain_data; l; l = l->next) {
1470     PtpDomainData *tmp = l->data;
1471 
1472     if (msg->domain_number == tmp->domain) {
1473       domain = tmp;
1474       break;
1475     }
1476   }
1477 
1478   if (!domain) {
1479     gchar *clock_name;
1480 
1481     domain = g_new0 (PtpDomainData, 1);
1482     domain->domain = msg->domain_number;
1483     clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
1484     domain->domain_clock =
1485         g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
1486     gst_object_ref_sink (domain->domain_clock);
1487     g_free (clock_name);
1488     g_queue_init (&domain->pending_syncs);
1489     domain->last_path_delays_missing = 9;
1490     domain_data = g_list_prepend (domain_data, domain);
1491 
1492     g_mutex_lock (&domain_clocks_lock);
1493     domain_clocks = g_list_prepend (domain_clocks, domain);
1494     g_mutex_unlock (&domain_clocks_lock);
1495   }
1496 
1497   /* If we have a master clock, ignore this message if it's not coming from there */
1498   if (domain->have_master_clock
1499       && compare_clock_identity (&domain->master_clock_identity,
1500           &msg->source_port_identity) != 0)
1501     return;
1502 
1503 #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
1504   /* Opportunistic selection of master clock */
1505   if (!domain->have_master_clock)
1506     memcpy (&domain->master_clock_identity, &msg->source_port_identity,
1507         sizeof (PtpClockIdentity));
1508 #else
1509   if (!domain->have_master_clock)
1510     return;
1511 #endif
1512 
1513   domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
1514 
1515   /* Check if duplicated */
1516   for (l = domain->pending_syncs.head; l; l = l->next) {
1517     PtpPendingSync *tmp = l->data;
1518 
1519     if (tmp->sync_seqnum == msg->sequence_id)
1520       return;
1521   }
1522 
1523   if (msg->message_specific.sync.origin_timestamp.seconds_field >
1524       GST_CLOCK_TIME_NONE / GST_SECOND) {
1525     GST_FIXME ("Unsupported sync message seconds field value: %"
1526         G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
1527         msg->message_specific.sync.origin_timestamp.seconds_field,
1528         GST_CLOCK_TIME_NONE / GST_SECOND);
1529     return;
1530   }
1531 
1532   sync = g_new0 (PtpPendingSync, 1);
1533   sync->domain = domain->domain;
1534   sync->sync_seqnum = msg->sequence_id;
1535   sync->sync_recv_time_local = receive_time;
1536   sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
1537   sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
1538   sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
1539   sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
1540   sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
1541 
1542   /* 0.5 correction factor for division later */
1543   sync->correction_field_sync = msg->correction_field;
1544 
1545   if ((msg->flag_field & 0x0200)) {
1546     /* Wait for FOLLOW_UP */
1547     GST_TRACE ("Waiting for FOLLOW_UP msg");
1548   } else {
1549     sync->sync_send_time_remote =
1550         PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1551         sync.origin_timestamp);
1552 
1553     if (domain->last_ptp_sync_time != 0
1554         && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1555       GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1556           GST_TIME_FORMAT, domain->domain,
1557           GST_TIME_ARGS (domain->last_ptp_sync_time),
1558           GST_TIME_ARGS (sync->sync_send_time_remote));
1559       ptp_pending_sync_free (sync);
1560       sync = NULL;
1561       return;
1562     }
1563     domain->last_ptp_sync_time = sync->sync_send_time_remote;
1564 
1565     if (send_delay_req (domain, sync)) {
1566       /* Sent delay request */
1567     } else {
1568       update_ptp_time (domain, sync);
1569       ptp_pending_sync_free (sync);
1570       sync = NULL;
1571     }
1572   }
1573 
1574   if (sync)
1575     g_queue_push_tail (&domain->pending_syncs, sync);
1576 }
1577 
1578 static void
handle_follow_up_message(PtpMessage * msg,GstClockTime receive_time)1579 handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
1580 {
1581   GList *l;
1582   PtpDomainData *domain = NULL;
1583   PtpPendingSync *sync = NULL;
1584 
1585   GST_TRACE ("Processing FOLLOW_UP message");
1586 
1587   /* Don't consider messages with the alternate master flag set */
1588   if ((msg->flag_field & 0x0100)) {
1589     GST_TRACE ("Ignoring FOLLOW_UP with alternate-master flag");
1590     return;
1591   }
1592 
1593   for (l = domain_data; l; l = l->next) {
1594     PtpDomainData *tmp = l->data;
1595 
1596     if (msg->domain_number == tmp->domain) {
1597       domain = tmp;
1598       break;
1599     }
1600   }
1601 
1602   if (!domain) {
1603     GST_TRACE ("No domain match for FOLLOW_UP msg");
1604     return;
1605   }
1606 
1607   /* If we have a master clock, ignore this message if it's not coming from there */
1608   if (domain->have_master_clock
1609       && compare_clock_identity (&domain->master_clock_identity,
1610           &msg->source_port_identity) != 0) {
1611     GST_TRACE ("FOLLOW_UP msg not from current clock master. Ignoring");
1612     return;
1613   }
1614 
1615   /* Check if we know about this one */
1616   for (l = domain->pending_syncs.head; l; l = l->next) {
1617     PtpPendingSync *tmp = l->data;
1618 
1619     if (tmp->sync_seqnum == msg->sequence_id) {
1620       sync = tmp;
1621       break;
1622     }
1623   }
1624 
1625   if (!sync) {
1626     GST_TRACE ("Ignoring FOLLOW_UP with no pending SYNC");
1627     return;
1628   }
1629 
1630   /* Got a FOLLOW_UP for this already */
1631   if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE) {
1632     GST_TRACE ("Got repeat FOLLOW_UP. Ignoring");
1633     return;
1634   }
1635 
1636   if (sync->sync_recv_time_local >= receive_time) {
1637     GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
1638         GST_TIME_FORMAT, domain->domain,
1639         GST_TIME_ARGS (sync->sync_recv_time_local),
1640         GST_TIME_ARGS (receive_time));
1641     g_queue_remove (&domain->pending_syncs, sync);
1642     ptp_pending_sync_free (sync);
1643     return;
1644   }
1645 
1646   sync->correction_field_sync += msg->correction_field;
1647   sync->sync_send_time_remote =
1648       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1649       follow_up.precise_origin_timestamp);
1650   sync->follow_up_recv_time_local = receive_time;
1651 
1652   if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
1653     GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
1654         GST_TIME_FORMAT, domain->domain,
1655         GST_TIME_ARGS (domain->last_ptp_sync_time),
1656         GST_TIME_ARGS (sync->sync_send_time_remote));
1657     g_queue_remove (&domain->pending_syncs, sync);
1658     ptp_pending_sync_free (sync);
1659     sync = NULL;
1660     return;
1661   }
1662   domain->last_ptp_sync_time = sync->sync_send_time_remote;
1663 
1664   if (send_delay_req (domain, sync)) {
1665     /* Sent delay request */
1666   } else {
1667     update_ptp_time (domain, sync);
1668     g_queue_remove (&domain->pending_syncs, sync);
1669     ptp_pending_sync_free (sync);
1670     sync = NULL;
1671   }
1672 }
1673 
1674 static void
handle_delay_resp_message(PtpMessage * msg,GstClockTime receive_time)1675 handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
1676 {
1677   GList *l;
1678   PtpDomainData *domain = NULL;
1679   PtpPendingSync *sync = NULL;
1680 
1681   /* Don't consider messages with the alternate master flag set */
1682   if ((msg->flag_field & 0x0100))
1683     return;
1684 
1685   for (l = domain_data; l; l = l->next) {
1686     PtpDomainData *tmp = l->data;
1687 
1688     if (msg->domain_number == tmp->domain) {
1689       domain = tmp;
1690       break;
1691     }
1692   }
1693 
1694   if (!domain)
1695     return;
1696 
1697   /* If we have a master clock, ignore this message if it's not coming from there */
1698   if (domain->have_master_clock
1699       && compare_clock_identity (&domain->master_clock_identity,
1700           &msg->source_port_identity) != 0)
1701     return;
1702 
1703   /* Not for us */
1704   if (msg->message_specific.delay_resp.
1705       requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
1706       || msg->message_specific.delay_resp.
1707       requesting_port_identity.port_number != ptp_clock_id.port_number)
1708     return;
1709 
1710   domain->min_delay_req_interval =
1711       log2_to_clock_time (msg->log_message_interval);
1712 
1713   /* Check if we know about this one */
1714   for (l = domain->pending_syncs.head; l; l = l->next) {
1715     PtpPendingSync *tmp = l->data;
1716 
1717     if (tmp->delay_req_seqnum == msg->sequence_id) {
1718       sync = tmp;
1719       break;
1720     }
1721   }
1722 
1723   if (!sync)
1724     return;
1725 
1726   /* Got a DELAY_RESP for this already */
1727   if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
1728     return;
1729 
1730   if (sync->delay_req_send_time_local > receive_time) {
1731     GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
1732         GST_TIME_FORMAT, domain->domain,
1733         GST_TIME_ARGS (sync->delay_req_send_time_local),
1734         GST_TIME_ARGS (receive_time));
1735     g_queue_remove (&domain->pending_syncs, sync);
1736     ptp_pending_sync_free (sync);
1737     return;
1738   }
1739 
1740   sync->correction_field_delay = msg->correction_field;
1741 
1742   sync->delay_req_recv_time_remote =
1743       PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
1744       delay_resp.receive_timestamp);
1745   sync->delay_resp_recv_time_local = receive_time;
1746 
1747   if (domain->mean_path_delay != 0
1748       && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
1749     GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
1750         GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
1751         GST_TIME_ARGS (sync->sync_send_time_remote),
1752         GST_TIME_ARGS (sync->delay_req_recv_time_remote));
1753     g_queue_remove (&domain->pending_syncs, sync);
1754     ptp_pending_sync_free (sync);
1755     return;
1756   }
1757 
1758   if (update_mean_path_delay (domain, sync))
1759     update_ptp_time (domain, sync);
1760   g_queue_remove (&domain->pending_syncs, sync);
1761   ptp_pending_sync_free (sync);
1762 }
1763 
1764 static void
handle_ptp_message(PtpMessage * msg,GstClockTime receive_time)1765 handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
1766 {
1767   /* Ignore our own messages */
1768   if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
1769       msg->source_port_identity.port_number == ptp_clock_id.port_number) {
1770     GST_TRACE ("Ignoring our own message");
1771     return;
1772   }
1773 
1774   GST_TRACE ("Message type %d receive_time %" GST_TIME_FORMAT,
1775       msg->message_type, GST_TIME_ARGS (receive_time));
1776   switch (msg->message_type) {
1777     case PTP_MESSAGE_TYPE_ANNOUNCE:
1778       handle_announce_message (msg, receive_time);
1779       break;
1780     case PTP_MESSAGE_TYPE_SYNC:
1781       handle_sync_message (msg, receive_time);
1782       break;
1783     case PTP_MESSAGE_TYPE_FOLLOW_UP:
1784       handle_follow_up_message (msg, receive_time);
1785       break;
1786     case PTP_MESSAGE_TYPE_DELAY_RESP:
1787       handle_delay_resp_message (msg, receive_time);
1788       break;
1789     default:
1790       break;
1791   }
1792 }
1793 
1794 static gboolean
have_stdin_data_cb(GIOChannel * channel,GIOCondition condition,gpointer user_data)1795 have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
1796     gpointer user_data)
1797 {
1798   GIOStatus status;
1799   StdIOHeader header;
1800   gchar buffer[8192];
1801   GError *err = NULL;
1802   gsize read;
1803 
1804   if ((condition & G_IO_STATUS_EOF)) {
1805     GST_ERROR ("Got EOF on stdin");
1806     g_main_loop_quit (main_loop);
1807     return G_SOURCE_REMOVE;
1808   }
1809 
1810   status =
1811       g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
1812       &read, &err);
1813   if (status == G_IO_STATUS_ERROR) {
1814     GST_ERROR ("Failed to read from stdin: %s", err->message);
1815     g_clear_error (&err);
1816     g_main_loop_quit (main_loop);
1817     return G_SOURCE_REMOVE;
1818   } else if (status == G_IO_STATUS_EOF) {
1819     GST_ERROR ("Got EOF on stdin");
1820     g_main_loop_quit (main_loop);
1821     return G_SOURCE_REMOVE;
1822   } else if (status != G_IO_STATUS_NORMAL) {
1823     GST_ERROR ("Unexpected stdin read status: %d", status);
1824     g_main_loop_quit (main_loop);
1825     return G_SOURCE_REMOVE;
1826   } else if (read != sizeof (header)) {
1827     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1828     g_main_loop_quit (main_loop);
1829     return G_SOURCE_REMOVE;
1830   } else if (header.size > 8192) {
1831     GST_ERROR ("Unexpected size: %u", header.size);
1832     g_main_loop_quit (main_loop);
1833     return G_SOURCE_REMOVE;
1834   }
1835 
1836   status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
1837   if (status == G_IO_STATUS_ERROR) {
1838     GST_ERROR ("Failed to read from stdin: %s", err->message);
1839     g_clear_error (&err);
1840     g_main_loop_quit (main_loop);
1841     return G_SOURCE_REMOVE;
1842   } else if (status == G_IO_STATUS_EOF) {
1843     GST_ERROR ("EOF on stdin");
1844     g_main_loop_quit (main_loop);
1845     return G_SOURCE_REMOVE;
1846   } else if (status != G_IO_STATUS_NORMAL) {
1847     GST_ERROR ("Unexpected stdin read status: %d", status);
1848     g_main_loop_quit (main_loop);
1849     return G_SOURCE_REMOVE;
1850   } else if (read != header.size) {
1851     GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
1852     g_main_loop_quit (main_loop);
1853     return G_SOURCE_REMOVE;
1854   }
1855 
1856   switch (header.type) {
1857     case TYPE_EVENT:
1858     case TYPE_GENERAL:{
1859       GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
1860       PtpMessage msg;
1861 
1862       if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
1863         dump_ptp_message (&msg);
1864         handle_ptp_message (&msg, receive_time);
1865       }
1866       break;
1867     }
1868     default:
1869     case TYPE_CLOCK_ID:{
1870       if (header.size != 8) {
1871         GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
1872         g_main_loop_quit (main_loop);
1873         return G_SOURCE_REMOVE;
1874       }
1875       g_mutex_lock (&ptp_lock);
1876       ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
1877       ptp_clock_id.port_number = getpid ();
1878       GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
1879           ptp_clock_id.clock_identity, ptp_clock_id.port_number);
1880       g_cond_signal (&ptp_cond);
1881       g_mutex_unlock (&ptp_lock);
1882       break;
1883     }
1884   }
1885 
1886   return G_SOURCE_CONTINUE;
1887 }
1888 
1889 /* Cleanup all announce messages and announce message senders
1890  * that are timed out by now, and clean up all pending syncs
1891  * that are missing their FOLLOW_UP or DELAY_RESP */
1892 static gboolean
cleanup_cb(gpointer data)1893 cleanup_cb (gpointer data)
1894 {
1895   GstClockTime now = gst_clock_get_time (observation_system_clock);
1896   GList *l, *m, *n;
1897 
1898   for (l = domain_data; l; l = l->next) {
1899     PtpDomainData *domain = l->data;
1900 
1901     for (n = domain->announce_senders; n;) {
1902       PtpAnnounceSender *sender = n->data;
1903       gboolean timed_out = TRUE;
1904 
1905       /* Keep only 5 messages per sender around */
1906       while (g_queue_get_length (&sender->announce_messages) > 5) {
1907         PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
1908         g_free (msg);
1909       }
1910 
1911       for (m = sender->announce_messages.head; m; m = m->next) {
1912         PtpAnnounceMessage *msg = m->data;
1913 
1914         if (msg->receive_time +
1915             sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
1916           timed_out = FALSE;
1917           break;
1918         }
1919       }
1920 
1921       if (timed_out) {
1922         GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
1923             sender->master_clock_identity.clock_identity,
1924             sender->master_clock_identity.port_number);
1925         g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
1926         g_queue_clear (&sender->announce_messages);
1927       }
1928 
1929       if (g_queue_get_length (&sender->announce_messages) == 0) {
1930         GList *tmp = n->next;
1931 
1932         if (compare_clock_identity (&sender->master_clock_identity,
1933                 &domain->master_clock_identity) == 0)
1934           GST_WARNING ("currently selected master clock timed out");
1935         g_free (sender);
1936         domain->announce_senders =
1937             g_list_delete_link (domain->announce_senders, n);
1938         n = tmp;
1939       } else {
1940         n = n->next;
1941       }
1942     }
1943     select_best_master_clock (domain, now);
1944 
1945     /* Clean up any pending syncs */
1946     for (n = domain->pending_syncs.head; n;) {
1947       PtpPendingSync *sync = n->data;
1948       gboolean timed_out = FALSE;
1949 
1950       /* Time out pending syncs after 4 sync intervals or 10 seconds,
1951        * and pending delay reqs after 4 delay req intervals or 10 seconds
1952        */
1953       if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
1954           ((domain->min_delay_req_interval != 0
1955                   && sync->delay_req_send_time_local +
1956                   4 * domain->min_delay_req_interval < now)
1957               || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
1958         timed_out = TRUE;
1959       } else if ((domain->sync_interval != 0
1960               && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
1961           || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
1962         timed_out = TRUE;
1963       }
1964 
1965       if (timed_out) {
1966         GList *tmp = n->next;
1967         ptp_pending_sync_free (sync);
1968         g_queue_delete_link (&domain->pending_syncs, n);
1969         n = tmp;
1970       } else {
1971         n = n->next;
1972       }
1973     }
1974   }
1975 
1976   return G_SOURCE_CONTINUE;
1977 }
1978 
1979 static gpointer
ptp_helper_main(gpointer data)1980 ptp_helper_main (gpointer data)
1981 {
1982   GSource *cleanup_source;
1983 
1984   GST_DEBUG ("Starting PTP helper loop");
1985 
1986   /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
1987   cleanup_source = g_timeout_source_new_seconds (5);
1988   g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
1989   g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
1990   g_source_attach (cleanup_source, main_context);
1991   g_source_unref (cleanup_source);
1992 
1993   g_main_loop_run (main_loop);
1994   GST_DEBUG ("Stopped PTP helper loop");
1995 
1996   g_mutex_lock (&ptp_lock);
1997   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
1998   ptp_clock_id.port_number = 0;
1999   initted = FALSE;
2000   g_cond_signal (&ptp_cond);
2001   g_mutex_unlock (&ptp_lock);
2002 
2003   return NULL;
2004 }
2005 
2006 /**
2007  * gst_ptp_is_supported:
2008  *
2009  * Check if PTP clocks are generally supported on this system, and if previous
2010  * initializations did not fail.
2011  *
2012  * Returns: %TRUE if PTP clocks are generally supported on this system, and
2013  * previous initializations did not fail.
2014  *
2015  * Since: 1.6
2016  */
2017 gboolean
gst_ptp_is_supported(void)2018 gst_ptp_is_supported (void)
2019 {
2020   return supported;
2021 }
2022 
2023 /**
2024  * gst_ptp_is_initialized:
2025  *
2026  * Check if the GStreamer PTP clock subsystem is initialized.
2027  *
2028  * Returns: %TRUE if the GStreamer PTP clock subsystem is initialized.
2029  *
2030  * Since: 1.6
2031  */
2032 gboolean
gst_ptp_is_initialized(void)2033 gst_ptp_is_initialized (void)
2034 {
2035   return initted;
2036 }
2037 
2038 /**
2039  * gst_ptp_init:
2040  * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
2041  * @interfaces: (transfer none) (array zero-terminated=1) (allow-none): network interfaces to run the clock on
2042  *
2043  * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
2044  * slave-only mode for all domains on the given @interfaces with the
2045  * given @clock_id.
2046  *
2047  * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
2048  * generated from the MAC address of the first network interface.
2049  *
2050  * This function is automatically called by gst_ptp_clock_new() with default
2051  * parameters if it wasn't called before.
2052  *
2053  * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
2054  *
2055  * Since: 1.6
2056  */
2057 gboolean
gst_ptp_init(guint64 clock_id,gchar ** interfaces)2058 gst_ptp_init (guint64 clock_id, gchar ** interfaces)
2059 {
2060   gboolean ret;
2061   const gchar *env;
2062   gchar **argv = NULL;
2063   gint argc, argc_c;
2064   gint fd_r, fd_w;
2065   GError *err = NULL;
2066   GSource *stdin_source;
2067 
2068   GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
2069 
2070   g_mutex_lock (&ptp_lock);
2071   if (!supported) {
2072     GST_ERROR ("PTP not supported");
2073     ret = FALSE;
2074     goto done;
2075   }
2076 
2077   if (initted) {
2078     GST_DEBUG ("PTP already initialized");
2079     ret = TRUE;
2080     goto done;
2081   }
2082 
2083   if (ptp_helper_pid) {
2084     GST_DEBUG ("PTP currently initializing");
2085     goto wait;
2086   }
2087 
2088   if (!domain_stats_hooks_initted) {
2089     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2090     domain_stats_hooks_initted = TRUE;
2091   }
2092 
2093   argc = 1;
2094   if (clock_id != GST_PTP_CLOCK_ID_NONE)
2095     argc += 2;
2096   if (interfaces != NULL)
2097     argc += 2 * g_strv_length (interfaces);
2098 
2099   argv = g_new0 (gchar *, argc + 2);
2100   argc_c = 0;
2101 
2102   env = g_getenv ("GST_PTP_HELPER_1_0");
2103   if (env == NULL)
2104     env = g_getenv ("GST_PTP_HELPER");
2105   if (env != NULL && *env != '\0') {
2106     GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
2107     argv[argc_c++] = g_strdup (env);
2108   } else {
2109     argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
2110   }
2111 
2112   if (clock_id != GST_PTP_CLOCK_ID_NONE) {
2113     argv[argc_c++] = g_strdup ("-c");
2114     argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
2115   }
2116 
2117   if (interfaces != NULL) {
2118     gchar **ptr = interfaces;
2119 
2120     while (*ptr) {
2121       argv[argc_c++] = g_strdup ("-i");
2122       argv[argc_c++] = g_strdup (*ptr);
2123       ptr++;
2124     }
2125   }
2126 
2127   main_context = g_main_context_new ();
2128   main_loop = g_main_loop_new (main_context, FALSE);
2129 
2130   ptp_helper_thread =
2131       g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
2132   if (!ptp_helper_thread) {
2133     GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
2134     g_clear_error (&err);
2135     ret = FALSE;
2136     goto done;
2137   }
2138 
2139   if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
2140           &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
2141     GST_ERROR ("Failed to start ptp helper process: %s", err->message);
2142     g_clear_error (&err);
2143     ret = FALSE;
2144     supported = FALSE;
2145     goto done;
2146   }
2147 
2148   stdin_channel = g_io_channel_unix_new (fd_r);
2149   g_io_channel_set_encoding (stdin_channel, NULL, NULL);
2150   g_io_channel_set_buffered (stdin_channel, FALSE);
2151   g_io_channel_set_close_on_unref (stdin_channel, TRUE);
2152   stdin_source =
2153       g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
2154   g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
2155   g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
2156       NULL);
2157   g_source_attach (stdin_source, main_context);
2158   g_source_unref (stdin_source);
2159 
2160   /* Create stdout channel */
2161   stdout_channel = g_io_channel_unix_new (fd_w);
2162   g_io_channel_set_encoding (stdout_channel, NULL, NULL);
2163   g_io_channel_set_close_on_unref (stdout_channel, TRUE);
2164   g_io_channel_set_buffered (stdout_channel, FALSE);
2165 
2166   delay_req_rand = g_rand_new ();
2167   observation_system_clock =
2168       g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
2169       NULL);
2170   gst_object_ref_sink (observation_system_clock);
2171 
2172   initted = TRUE;
2173 
2174 wait:
2175   GST_DEBUG ("Waiting for PTP to be initialized");
2176 
2177   while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
2178     g_cond_wait (&ptp_cond, &ptp_lock);
2179 
2180   ret = initted;
2181   if (ret) {
2182     GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
2183         ptp_clock_id.clock_identity, ptp_clock_id.port_number);
2184   } else {
2185     GST_ERROR ("Failed to initialize");
2186     supported = FALSE;
2187   }
2188 
2189 done:
2190   g_strfreev (argv);
2191 
2192   if (!ret) {
2193     if (ptp_helper_pid) {
2194 #ifndef G_OS_WIN32
2195       kill (ptp_helper_pid, SIGKILL);
2196       waitpid (ptp_helper_pid, NULL, 0);
2197 #else
2198       TerminateProcess (ptp_helper_pid, 1);
2199       WaitForSingleObject (ptp_helper_pid, INFINITE);
2200 #endif
2201       g_spawn_close_pid (ptp_helper_pid);
2202     }
2203     ptp_helper_pid = 0;
2204 
2205     if (stdin_channel)
2206       g_io_channel_unref (stdin_channel);
2207     stdin_channel = NULL;
2208     if (stdout_channel)
2209       g_io_channel_unref (stdout_channel);
2210     stdout_channel = NULL;
2211 
2212     if (main_loop && ptp_helper_thread) {
2213       g_main_loop_quit (main_loop);
2214       g_thread_join (ptp_helper_thread);
2215     }
2216     ptp_helper_thread = NULL;
2217     if (main_loop)
2218       g_main_loop_unref (main_loop);
2219     main_loop = NULL;
2220     if (main_context)
2221       g_main_context_unref (main_context);
2222     main_context = NULL;
2223 
2224     if (delay_req_rand)
2225       g_rand_free (delay_req_rand);
2226     delay_req_rand = NULL;
2227 
2228     if (observation_system_clock)
2229       gst_object_unref (observation_system_clock);
2230     observation_system_clock = NULL;
2231   }
2232 
2233   g_mutex_unlock (&ptp_lock);
2234 
2235   return ret;
2236 }
2237 
2238 /**
2239  * gst_ptp_deinit:
2240  *
2241  * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
2242  * are any remaining GstPtpClock instances, they won't be further synchronized
2243  * to the PTP network clock.
2244  *
2245  * Since: 1.6
2246  */
2247 void
gst_ptp_deinit(void)2248 gst_ptp_deinit (void)
2249 {
2250   GList *l, *m;
2251 
2252   g_mutex_lock (&ptp_lock);
2253 
2254   if (ptp_helper_pid) {
2255 #ifndef G_OS_WIN32
2256     kill (ptp_helper_pid, SIGKILL);
2257     waitpid (ptp_helper_pid, NULL, 0);
2258 #else
2259     TerminateProcess (ptp_helper_pid, 1);
2260     WaitForSingleObject (ptp_helper_pid, INFINITE);
2261 #endif
2262     g_spawn_close_pid (ptp_helper_pid);
2263   }
2264   ptp_helper_pid = 0;
2265 
2266   if (stdin_channel)
2267     g_io_channel_unref (stdin_channel);
2268   stdin_channel = NULL;
2269   if (stdout_channel)
2270     g_io_channel_unref (stdout_channel);
2271   stdout_channel = NULL;
2272 
2273   if (main_loop && ptp_helper_thread) {
2274     GThread *tmp = ptp_helper_thread;
2275     ptp_helper_thread = NULL;
2276     g_mutex_unlock (&ptp_lock);
2277     g_main_loop_quit (main_loop);
2278     g_thread_join (tmp);
2279     g_mutex_lock (&ptp_lock);
2280   }
2281   if (main_loop)
2282     g_main_loop_unref (main_loop);
2283   main_loop = NULL;
2284   if (main_context)
2285     g_main_context_unref (main_context);
2286   main_context = NULL;
2287 
2288   if (delay_req_rand)
2289     g_rand_free (delay_req_rand);
2290   delay_req_rand = NULL;
2291   if (observation_system_clock)
2292     gst_object_unref (observation_system_clock);
2293   observation_system_clock = NULL;
2294 
2295   for (l = domain_data; l; l = l->next) {
2296     PtpDomainData *domain = l->data;
2297 
2298     for (m = domain->announce_senders; m; m = m->next) {
2299       PtpAnnounceSender *sender = m->data;
2300 
2301       g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
2302       g_queue_clear (&sender->announce_messages);
2303       g_free (sender);
2304     }
2305     g_list_free (domain->announce_senders);
2306 
2307     g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
2308         NULL);
2309     g_queue_clear (&domain->pending_syncs);
2310     gst_object_unref (domain->domain_clock);
2311     g_free (domain);
2312   }
2313   g_list_free (domain_data);
2314   domain_data = NULL;
2315   g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
2316   g_list_free (domain_clocks);
2317   domain_clocks = NULL;
2318 
2319   ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
2320   ptp_clock_id.port_number = 0;
2321 
2322   initted = FALSE;
2323 
2324   g_mutex_unlock (&ptp_lock);
2325 }
2326 
2327 #define DEFAULT_DOMAIN 0
2328 
2329 enum
2330 {
2331   PROP_0,
2332   PROP_DOMAIN,
2333   PROP_INTERNAL_CLOCK,
2334   PROP_MASTER_CLOCK_ID,
2335   PROP_GRANDMASTER_CLOCK_ID
2336 };
2337 
2338 struct _GstPtpClockPrivate
2339 {
2340   guint domain;
2341   GstClock *domain_clock;
2342   gulong domain_stats_id;
2343 };
2344 
2345 #define gst_ptp_clock_parent_class parent_class
2346 G_DEFINE_TYPE_WITH_PRIVATE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
2347 
2348 static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
2349     const GValue * value, GParamSpec * pspec);
2350 static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
2351     GValue * value, GParamSpec * pspec);
2352 static void gst_ptp_clock_finalize (GObject * object);
2353 
2354 static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
2355 
2356 static void
gst_ptp_clock_class_init(GstPtpClockClass * klass)2357 gst_ptp_clock_class_init (GstPtpClockClass * klass)
2358 {
2359   GObjectClass *gobject_class;
2360   GstClockClass *clock_class;
2361 
2362   gobject_class = G_OBJECT_CLASS (klass);
2363   clock_class = GST_CLOCK_CLASS (klass);
2364 
2365   gobject_class->finalize = gst_ptp_clock_finalize;
2366   gobject_class->get_property = gst_ptp_clock_get_property;
2367   gobject_class->set_property = gst_ptp_clock_set_property;
2368 
2369   g_object_class_install_property (gobject_class, PROP_DOMAIN,
2370       g_param_spec_uint ("domain", "Domain",
2371           "The PTP domain", 0, G_MAXUINT8,
2372           DEFAULT_DOMAIN,
2373           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
2374 
2375   g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
2376       g_param_spec_object ("internal-clock", "Internal Clock",
2377           "Internal clock", GST_TYPE_CLOCK,
2378           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2379 
2380   g_object_class_install_property (gobject_class, PROP_MASTER_CLOCK_ID,
2381       g_param_spec_uint64 ("master-clock-id", "Master Clock ID",
2382           "Master Clock ID", 0, G_MAXUINT64, 0,
2383           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2384 
2385   g_object_class_install_property (gobject_class, PROP_GRANDMASTER_CLOCK_ID,
2386       g_param_spec_uint64 ("grandmaster-clock-id", "Grand Master Clock ID",
2387           "Grand Master Clock ID", 0, G_MAXUINT64, 0,
2388           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
2389 
2390   clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
2391 }
2392 
2393 static void
gst_ptp_clock_init(GstPtpClock * self)2394 gst_ptp_clock_init (GstPtpClock * self)
2395 {
2396   GstPtpClockPrivate *priv;
2397 
2398   self->priv = priv = gst_ptp_clock_get_instance_private (self);
2399 
2400   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
2401   GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
2402 
2403   priv->domain = DEFAULT_DOMAIN;
2404 }
2405 
2406 static gboolean
gst_ptp_clock_ensure_domain_clock(GstPtpClock * self)2407 gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
2408 {
2409   gboolean got_clock = TRUE;
2410 
2411   if (G_UNLIKELY (!self->priv->domain_clock)) {
2412     g_mutex_lock (&domain_clocks_lock);
2413     if (!self->priv->domain_clock) {
2414       GList *l;
2415 
2416       got_clock = FALSE;
2417 
2418       for (l = domain_clocks; l; l = l->next) {
2419         PtpDomainData *clock_data = l->data;
2420 
2421         if (clock_data->domain == self->priv->domain &&
2422             clock_data->have_master_clock && clock_data->last_ptp_time != 0) {
2423           GST_DEBUG ("Switching domain clock on domain %d", clock_data->domain);
2424           self->priv->domain_clock = clock_data->domain_clock;
2425           got_clock = TRUE;
2426           break;
2427         }
2428       }
2429     }
2430     g_mutex_unlock (&domain_clocks_lock);
2431     if (got_clock) {
2432       g_object_notify (G_OBJECT (self), "internal-clock");
2433       gst_clock_set_synced (GST_CLOCK (self), TRUE);
2434     }
2435   }
2436 
2437   return got_clock;
2438 }
2439 
2440 static gboolean
gst_ptp_clock_stats_callback(guint8 domain,const GstStructure * stats,gpointer user_data)2441 gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
2442     gpointer user_data)
2443 {
2444   GstPtpClock *self = user_data;
2445 
2446   if (domain != self->priv->domain
2447       || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
2448     return TRUE;
2449 
2450   /* Let's set our internal clock */
2451   if (!gst_ptp_clock_ensure_domain_clock (self))
2452     return TRUE;
2453 
2454   self->priv->domain_stats_id = 0;
2455 
2456   return FALSE;
2457 }
2458 
2459 static void
gst_ptp_clock_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)2460 gst_ptp_clock_set_property (GObject * object, guint prop_id,
2461     const GValue * value, GParamSpec * pspec)
2462 {
2463   GstPtpClock *self = GST_PTP_CLOCK (object);
2464 
2465   switch (prop_id) {
2466     case PROP_DOMAIN:
2467       self->priv->domain = g_value_get_uint (value);
2468       gst_ptp_clock_ensure_domain_clock (self);
2469       if (!self->priv->domain_clock)
2470         self->priv->domain_stats_id =
2471             gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
2472             NULL);
2473       break;
2474     default:
2475       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2476       break;
2477   }
2478 }
2479 
2480 static void
gst_ptp_clock_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)2481 gst_ptp_clock_get_property (GObject * object, guint prop_id,
2482     GValue * value, GParamSpec * pspec)
2483 {
2484   GstPtpClock *self = GST_PTP_CLOCK (object);
2485 
2486   switch (prop_id) {
2487     case PROP_DOMAIN:
2488       g_value_set_uint (value, self->priv->domain);
2489       break;
2490     case PROP_INTERNAL_CLOCK:
2491       gst_ptp_clock_ensure_domain_clock (self);
2492       g_value_set_object (value, self->priv->domain_clock);
2493       break;
2494     case PROP_MASTER_CLOCK_ID:
2495     case PROP_GRANDMASTER_CLOCK_ID:{
2496       GList *l;
2497 
2498       g_mutex_lock (&domain_clocks_lock);
2499       g_value_set_uint64 (value, 0);
2500 
2501       for (l = domain_clocks; l; l = l->next) {
2502         PtpDomainData *clock_data = l->data;
2503 
2504         if (clock_data->domain == self->priv->domain) {
2505           if (prop_id == PROP_MASTER_CLOCK_ID)
2506             g_value_set_uint64 (value,
2507                 clock_data->master_clock_identity.clock_identity);
2508           else
2509             g_value_set_uint64 (value, clock_data->grandmaster_identity);
2510           break;
2511         }
2512       }
2513       g_mutex_unlock (&domain_clocks_lock);
2514       break;
2515     }
2516     default:
2517       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2518       break;
2519   }
2520 }
2521 
2522 static void
gst_ptp_clock_finalize(GObject * object)2523 gst_ptp_clock_finalize (GObject * object)
2524 {
2525   GstPtpClock *self = GST_PTP_CLOCK (object);
2526 
2527   if (self->priv->domain_stats_id)
2528     gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
2529 
2530   G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
2531 }
2532 
2533 static GstClockTime
gst_ptp_clock_get_internal_time(GstClock * clock)2534 gst_ptp_clock_get_internal_time (GstClock * clock)
2535 {
2536   GstPtpClock *self = GST_PTP_CLOCK (clock);
2537 
2538   gst_ptp_clock_ensure_domain_clock (self);
2539 
2540   if (!self->priv->domain_clock) {
2541     GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
2542         self->priv->domain);
2543     return GST_CLOCK_TIME_NONE;
2544   }
2545 
2546   return gst_clock_get_time (self->priv->domain_clock);
2547 }
2548 
2549 /**
2550  * gst_ptp_clock_new:
2551  * @name: Name of the clock
2552  * @domain: PTP domain
2553  *
2554  * Creates a new PTP clock instance that exports the PTP time of the master
2555  * clock in @domain. This clock can be slaved to other clocks as needed.
2556  *
2557  * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
2558  * default parameters.
2559  *
2560  * This clock only returns valid timestamps after it received the first
2561  * times from the PTP master clock on the network. Once this happens the
2562  * GstPtpClock::internal-clock property will become non-NULL. You can
2563  * check this with gst_clock_wait_for_sync(), the GstClock::synced signal and
2564  * gst_clock_is_synced().
2565  *
2566  * Returns: (transfer full): A new #GstClock
2567  *
2568  * Since: 1.6
2569  */
2570 GstClock *
gst_ptp_clock_new(const gchar * name,guint domain)2571 gst_ptp_clock_new (const gchar * name, guint domain)
2572 {
2573   GstClock *clock;
2574 
2575   g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
2576 
2577   if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
2578     GST_ERROR ("Failed to initialize PTP");
2579     return NULL;
2580   }
2581 
2582   clock = g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
2583       NULL);
2584 
2585   /* Clear floating flag */
2586   gst_object_ref_sink (clock);
2587 
2588   return clock;
2589 }
2590 
2591 typedef struct
2592 {
2593   guint8 domain;
2594   const GstStructure *stats;
2595 } DomainStatsMarshalData;
2596 
2597 static void
domain_stats_marshaller(GHook * hook,DomainStatsMarshalData * data)2598 domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
2599 {
2600   GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
2601 
2602   if (!callback (data->domain, data->stats, hook->data))
2603     g_hook_destroy (&domain_stats_hooks, hook->hook_id);
2604 }
2605 
2606 static void
emit_ptp_statistics(guint8 domain,const GstStructure * stats)2607 emit_ptp_statistics (guint8 domain, const GstStructure * stats)
2608 {
2609   DomainStatsMarshalData data = { domain, stats };
2610 
2611   g_mutex_lock (&ptp_lock);
2612   g_hook_list_marshal (&domain_stats_hooks, TRUE,
2613       (GHookMarshaller) domain_stats_marshaller, &data);
2614   g_mutex_unlock (&ptp_lock);
2615 }
2616 
2617 /**
2618  * gst_ptp_statistics_callback_add:
2619  * @callback: GstPtpStatisticsCallback to call
2620  * @user_data: Data to pass to the callback
2621  * @destroy_data: GDestroyNotify to destroy the data
2622  *
2623  * Installs a new statistics callback for gathering PTP statistics. See
2624  * GstPtpStatisticsCallback for a list of statistics that are provided.
2625  *
2626  * Returns: Id for the callback that can be passed to
2627  * gst_ptp_statistics_callback_remove()
2628  *
2629  * Since: 1.6
2630  */
2631 gulong
gst_ptp_statistics_callback_add(GstPtpStatisticsCallback callback,gpointer user_data,GDestroyNotify destroy_data)2632 gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
2633     gpointer user_data, GDestroyNotify destroy_data)
2634 {
2635   GHook *hook;
2636 
2637   g_mutex_lock (&ptp_lock);
2638 
2639   if (!domain_stats_hooks_initted) {
2640     g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
2641     domain_stats_hooks_initted = TRUE;
2642   }
2643 
2644   hook = g_hook_alloc (&domain_stats_hooks);
2645   hook->func = callback;
2646   hook->data = user_data;
2647   hook->destroy = destroy_data;
2648   g_hook_prepend (&domain_stats_hooks, hook);
2649   g_atomic_int_add (&domain_stats_n_hooks, 1);
2650 
2651   g_mutex_unlock (&ptp_lock);
2652 
2653   return hook->hook_id;
2654 }
2655 
2656 /**
2657  * gst_ptp_statistics_callback_remove:
2658  * @id: Callback id to remove
2659  *
2660  * Removes a PTP statistics callback that was previously added with
2661  * gst_ptp_statistics_callback_add().
2662  *
2663  * Since: 1.6
2664  */
2665 void
gst_ptp_statistics_callback_remove(gulong id)2666 gst_ptp_statistics_callback_remove (gulong id)
2667 {
2668   g_mutex_lock (&ptp_lock);
2669   if (g_hook_destroy (&domain_stats_hooks, id))
2670     g_atomic_int_add (&domain_stats_n_hooks, -1);
2671   g_mutex_unlock (&ptp_lock);
2672 }
2673