• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Simple watchdog class."""
15
16import threading
17from typing import Any, Callable, Optional
18
19
class Watchdog:
    """Simple class that times out unless reset.

    This class could be used, for example, to track a device's connection state
    for devices that send a periodic heartbeat packet.

    The watchdog is driven by a daemon threading.Timer that is re-created each
    time it is (re)armed, so the expiration callbacks run on the timer's
    thread rather than on the thread that calls start() or reset().

    NOTE: No locking is performed around the `expired` flag or the timer
    handle; callers that reset the watchdog from several threads should
    provide their own synchronization.
    """

    def __init__(
        self,
        on_reset: Callable[[], Any],
        on_expiration: Callable[[], Any],
        while_expired: Callable[[], Any] = lambda: None,
        timeout_s: float = 1,
        expired_timeout_s: Optional[float] = None,
    ):
        """Creates a watchdog; start() must be called to start it.

        Args:
          on_reset: Function called when the watchdog is reset after having
              expired.
          on_expiration: Function called when the timeout expires.
          while_expired: Function called repeatedly while the watchdog is
              expired.
          timeout_s: If reset() is not called for timeout_s, the watchdog
              expires and calls the on_expiration callback.
          expired_timeout_s: While expired, the watchdog calls the
              while_expired callback every expired_timeout_s. Defaults to
              10 * timeout_s when None.
        """
        self._on_reset = on_reset
        self._on_expiration = on_expiration
        self._while_expired = while_expired

        self.timeout_s = timeout_s

        if expired_timeout_s is None:
            self.expired_timeout_s = self.timeout_s * 10
        else:
            self.expired_timeout_s = expired_timeout_s

        self.expired: bool = False

        # Placeholder timer that is never started; it only exists so that
        # start() can unconditionally cancel() the previous timer.
        self._watchdog = threading.Timer(0, self._timeout_expired)

    def start(self) -> None:
        """Starts the watchdog; must be called for the watchdog to work.

        Cancels any pending timer and arms a new one. The interval is
        expired_timeout_s if the watchdog is currently expired, and timeout_s
        otherwise.
        """
        self._watchdog.cancel()
        self._watchdog = threading.Timer(
            self.expired_timeout_s if self.expired else self.timeout_s,
            self._timeout_expired,
        )
        # Daemon thread: a pending watchdog must not keep the process alive.
        self._watchdog.daemon = True
        self._watchdog.start()

    def reset(self) -> bool:
        """Resets the timeout; calls the on_reset callback if expired.

        The timer is always re-armed with timeout_s, including when the
        watchdog is recovering from the expired state. Without that, the
        timer armed with expired_timeout_s would stay pending and the next
        expiration would fire at an arbitrary point of that longer interval
        instead of timeout_s after this call.

        Returns True if was expired.
        """
        if self.expired:
            # Clear the flag first so that start() picks timeout_s rather
            # than expired_timeout_s for the new timer.
            self.expired = False
            self.start()
            self._on_reset()
            return True

        self.start()
        return False

    def _timeout_expired(self) -> None:
        """Timer callback: reports the expiration and re-arms the timer.

        On the first expiration on_expiration is called and the watchdog is
        marked expired; on every later firing while still expired,
        while_expired is called instead.
        """
        if self.expired:
            self._while_expired()
        else:
            self.expired = True
            self._on_expiration()

        # Re-arm; start() uses expired_timeout_s now that `expired` is set.
        self.start()
93