• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Simple watchdog class."""
15
16import threading
17from typing import Any, Callable
18
19
20class Watchdog:
21    """Simple class that times out unless reset.
22
23    This class could be used, for example, to track a device's connection state
24    for devices that send a periodic heartbeat packet.
25    """
26    def __init__(self,
27                 on_reset: Callable[[], Any],
28                 on_expiration: Callable[[], Any],
29                 while_expired: Callable[[], Any] = lambda: None,
30                 timeout_s: float = 1,
31                 expired_timeout_s: float = None):
32        """Creates a watchdog; start() must be called to start it.
33
34        Args:
35          on_reset: Function called when the watchdog is reset after having
36              expired.
37          on_expiration: Function called when the timeout expires.
38          while_expired: Function called repeatedly while the watchdog is
39              expired.
40          timeout_s: If reset() is not called for timeout_s, the watchdog
41              expires and calls the on_expiration callback.
42          expired_timeout_s: While expired, the watchdog calls the
43              while_expired callback every expired_timeout_s.
44        """
45        self._on_reset = on_reset
46        self._on_expiration = on_expiration
47        self._while_expired = while_expired
48
49        self.timeout_s = timeout_s
50
51        if expired_timeout_s is None:
52            self.expired_timeout_s = self.timeout_s * 10
53        else:
54            self.expired_timeout_s = expired_timeout_s
55
56        self.expired: bool = False
57        self._watchdog = threading.Timer(0, self._timeout_expired)
58
59    def start(self) -> None:
60        """Starts the watchdog; must be called for the watchdog to work."""
61        self._watchdog.cancel()
62        self._watchdog = threading.Timer(
63            self.expired_timeout_s if self.expired else self.timeout_s,
64            self._timeout_expired)
65        self._watchdog.daemon = True
66        self._watchdog.start()
67
68    def reset(self) -> None:
69        """Resets the timeout; calls the on_reset callback if expired."""
70        if self.expired:
71            self.expired = False
72            self._on_reset()
73
74        self.start()
75
76    def _timeout_expired(self) -> None:
77        if self.expired:
78            self._while_expired()
79        else:
80            self.expired = True
81            self._on_expiration()
82
83        self.start()
84