• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Run an interruptable, cancellable function after debouncing run requests"""
15
16import enum
17import logging
18import threading
19from abc import ABC, abstractmethod
20
21_LOG = logging.getLogger('pw_watch')
22
23
24class DebouncedFunction(ABC):
25    """Function to be run by Debouncer"""
26    @abstractmethod
27    def run(self) -> None:
28        """Run the function"""
29
30    @abstractmethod
31    def cancel(self) -> bool:
32        """Cancel an in-progress run of the function.
33        Must be called from different thread than run().
34        Returns true if run was successfully cancelled, false otherwise"""
35
36    @abstractmethod
37    def on_complete(self, cancelled: bool = False) -> None:
38        """Called after run() finishes. If true, cancelled indicates
39        cancel() was invoked during the last run()"""
40
41    # Note: The debounce uses threads. Since there is no way to guarantee which
42    # thread recieves a KeyboardInterrupt, it is necessary catch this event
43    # in all debouncer threads and forward it to the user.
44    @abstractmethod
45    def on_keyboard_interrupt(self):
46        """Called when keyboard interrupt is delivered to a debouncer thread"""
47
48
49class State(enum.Enum):
50    IDLE = 1  # ------- Transistions to: DEBOUNCING
51    DEBOUNCING = 2  # - Transistions to: RUNNING
52    RUNNING = 3  # ---- Transistions to: INTERRUPTED or COOLDOWN
53    INTERRUPTED = 4  #- Transistions to: RERUN
54    COOLDOWN = 5  #---- Transistions to: IDLE
55    RERUN = 6  #------- Transistions to: IDLE (but triggers a press)
56
57
58class Debouncer:
59    """Run an interruptable, cancellable function with debouncing"""
60    def __init__(self, function: DebouncedFunction) -> None:
61        super().__init__()
62        self.function = function
63
64        self.state = State.IDLE
65
66        self.debounce_seconds = 1
67        self.debounce_timer = None
68
69        self.cooldown_seconds = 1
70        self.cooldown_timer = None
71
72        self.rerun_event_description = ''
73
74        self.lock = threading.Lock()
75
76    def press(self, event_description: str = '') -> None:
77        """Try to run the function for the class. If the function is recently
78        started, this may push out the deadline for actually starting. If the
79        function is already running, will interrupt the function"""
80        with self.lock:
81            self._press_unlocked(event_description)
82
83    def _press_unlocked(self, event_description: str) -> None:
84        _LOG.debug('Press - state = %s', str(self.state))
85        if self.state == State.IDLE:
86            if event_description:
87                _LOG.info('%s', event_description)
88            self._start_debounce_timer()
89            self._transition(State.DEBOUNCING)
90
91        elif self.state == State.DEBOUNCING:
92            self._start_debounce_timer()
93
94        elif self.state == State.RUNNING:
95            # When the function is already running but we get an incoming
96            # event, go into the INTERRUPTED state to signal that we should
97            # re-try running afterwards.
98
99            # Push an empty line to flush ongoing I/O in subprocess.
100            _LOG.error('')
101
102            # Surround the error message with newlines to make it stand out.
103            _LOG.error('')
104            _LOG.error('Event while running: %s', event_description)
105            _LOG.error('')
106
107            self.function.cancel()
108            self._transition(State.INTERRUPTED)
109            self.rerun_event_description = event_description
110
111        elif self.state == State.INTERRUPTED:
112            # Function is running but was already interrupted. Do nothing.
113            _LOG.debug('Ignoring press - interrupted')
114
115        elif self.state == State.COOLDOWN:
116            # Function just finished and we are cooling down; so trigger rerun.
117            _LOG.debug('Got event in cooldown; scheduling rerun')
118            self._transition(State.RERUN)
119            self.rerun_event_description = event_description
120
121    def _transition(self, new_state: State) -> None:
122        _LOG.debug('State: %s -> %s', self.state, new_state)
123        self.state = new_state
124
125    def _start_debounce_timer(self):
126        assert self.lock.locked()
127        if self.state == State.DEBOUNCING:
128            self.debounce_timer.cancel()
129        self.debounce_timer = threading.Timer(self.debounce_seconds,
130                                              self._run_function)
131        self.debounce_timer.start()
132
133    # Called from debounce_timer thread.
134    def _run_function(self):
135        try:
136            with self.lock:
137                assert self.state == State.DEBOUNCING
138                self.debounce_timer = None
139                self._transition(State.RUNNING)
140
141            # Must run the function without the lock held so further press()
142            # calls don't deadlock.
143            _LOG.debug('Running debounced function')
144            self.function.run()
145
146            _LOG.debug('Finished running debounced function')
147            with self.lock:
148                if self.state == State.RUNNING:
149                    self.function.on_complete(cancelled=False)
150                    self._transition(State.COOLDOWN)
151                elif self.state == State.INTERRUPTED:
152                    self.function.on_complete(cancelled=True)
153                    self._transition(State.RERUN)
154                self._start_cooldown_timer()
155        # Ctrl-C on Unix generates KeyboardInterrupt
156        # Ctrl-Z on Windows generates EOFError
157        except (KeyboardInterrupt, EOFError):
158            self.function.on_keyboard_interrupt()
159
160    def _start_cooldown_timer(self):
161        assert self.lock.locked()
162        self.cooldown_timer = threading.Timer(self.cooldown_seconds,
163                                              self._exit_cooldown)
164        self.cooldown_timer.start()
165
166    # Called from cooldown_timer thread.
167    def _exit_cooldown(self):
168        try:
169            with self.lock:
170                self.cooldown_timer = None
171                rerun = (self.state == State.RERUN)
172                self._transition(State.IDLE)
173
174                # If we were in the RERUN state, then re-trigger the event.
175                if rerun:
176                    self._press_unlocked('Rerunning: ' +
177                                         self.rerun_event_description)
178
179        # Ctrl-C on Unix generates KeyboardInterrupt
180        # Ctrl-Z on Windows generates EOFError
181        except (KeyboardInterrupt, EOFError):
182            self.function.on_keyboard_interrupt()
183