• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test times."""
15
16import collections
17import logging
18import threading
19import time as _time
20
21import grpc
22import grpc_testing
23
24logging.basicConfig()
25_LOGGER = logging.getLogger(__name__)
26
27
28def _call(behaviors):
29    for behavior in behaviors:
30        try:
31            behavior()
32        except Exception:  # pylint: disable=broad-except
33            _LOGGER.exception('Exception calling behavior "%r"!', behavior)
34
35
36def _call_in_thread(behaviors):
37    calling = threading.Thread(target=_call, args=(behaviors,))
38    calling.start()
39    # NOTE(nathaniel): Because this function is called from "strict" Time
40    # implementations, it blocks until after all behaviors have terminated.
41    calling.join()
42
43
44class _State(object):
45    def __init__(self):
46        self.condition = threading.Condition()
47        self.times_to_behaviors = collections.defaultdict(list)
48
49
50class _Delta(
51    collections.namedtuple(
52        "_Delta",
53        (
54            "mature_behaviors",
55            "earliest_mature_time",
56            "earliest_immature_time",
57        ),
58    )
59):
60    pass
61
62
63def _process(state, now):
64    mature_behaviors = []
65    earliest_mature_time = None
66    while state.times_to_behaviors:
67        earliest_time = min(state.times_to_behaviors)
68        if earliest_time <= now:
69            if earliest_mature_time is None:
70                earliest_mature_time = earliest_time
71            earliest_mature_behaviors = state.times_to_behaviors.pop(
72                earliest_time
73            )
74            mature_behaviors.extend(earliest_mature_behaviors)
75        else:
76            earliest_immature_time = earliest_time
77            break
78    else:
79        earliest_immature_time = None
80    return _Delta(
81        mature_behaviors, earliest_mature_time, earliest_immature_time
82    )
83
84
85class _Future(grpc.Future):
86    def __init__(self, state, behavior, time):
87        self._state = state
88        self._behavior = behavior
89        self._time = time
90        self._cancelled = False
91
92    def cancel(self):
93        with self._state.condition:
94            if self._cancelled:
95                return True
96            else:
97                behaviors_at_time = self._state.times_to_behaviors.get(
98                    self._time
99                )
100                if behaviors_at_time is None:
101                    return False
102                else:
103                    behaviors_at_time.remove(self._behavior)
104                    if not behaviors_at_time:
105                        self._state.times_to_behaviors.pop(self._time)
106                        self._state.condition.notify_all()
107                    self._cancelled = True
108                    return True
109
110    def cancelled(self):
111        with self._state.condition:
112            return self._cancelled
113
114    def running(self):
115        raise NotImplementedError()
116
117    def done(self):
118        raise NotImplementedError()
119
120    def result(self, timeout=None):
121        raise NotImplementedError()
122
123    def exception(self, timeout=None):
124        raise NotImplementedError()
125
126    def traceback(self, timeout=None):
127        raise NotImplementedError()
128
129    def add_done_callback(self, fn):
130        raise NotImplementedError()
131
132
133class StrictRealTime(grpc_testing.Time):
134    def __init__(self):
135        self._state = _State()
136        self._active = False
137        self._calling = None
138
139    def _activity(self):
140        while True:
141            with self._state.condition:
142                while True:
143                    now = _time.time()
144                    delta = _process(self._state, now)
145                    self._state.condition.notify_all()
146                    if delta.mature_behaviors:
147                        self._calling = delta.earliest_mature_time
148                        break
149                    self._calling = None
150                    if delta.earliest_immature_time is None:
151                        self._active = False
152                        return
153                    else:
154                        timeout = max(0, delta.earliest_immature_time - now)
155                        self._state.condition.wait(timeout=timeout)
156            _call(delta.mature_behaviors)
157
158    def _ensure_called_through(self, time):
159        with self._state.condition:
160            while (
161                self._state.times_to_behaviors
162                and min(self._state.times_to_behaviors) < time
163            ) or (self._calling is not None and self._calling < time):
164                self._state.condition.wait()
165
166    def _call_at(self, behavior, time):
167        with self._state.condition:
168            self._state.times_to_behaviors[time].append(behavior)
169            if self._active:
170                self._state.condition.notify_all()
171            else:
172                activity = threading.Thread(target=self._activity)
173                activity.start()
174                self._active = True
175            return _Future(self._state, behavior, time)
176
177    def time(self):
178        return _time.time()
179
180    def call_in(self, behavior, delay):
181        return self._call_at(behavior, _time.time() + delay)
182
183    def call_at(self, behavior, time):
184        return self._call_at(behavior, time)
185
186    def sleep_for(self, duration):
187        time = _time.time() + duration
188        _time.sleep(duration)
189        self._ensure_called_through(time)
190
191    def sleep_until(self, time):
192        _time.sleep(max(0, time - _time.time()))
193        self._ensure_called_through(time)
194
195
196class StrictFakeTime(grpc_testing.Time):
197    def __init__(self, time):
198        self._state = _State()
199        self._time = time
200
201    def time(self):
202        return self._time
203
204    def call_in(self, behavior, delay):
205        if delay <= 0:
206            _call_in_thread((behavior,))
207        else:
208            with self._state.condition:
209                time = self._time + delay
210                self._state.times_to_behaviors[time].append(behavior)
211        return _Future(self._state, behavior, time)
212
213    def call_at(self, behavior, time):
214        with self._state.condition:
215            if time <= self._time:
216                _call_in_thread((behavior,))
217            else:
218                self._state.times_to_behaviors[time].append(behavior)
219        return _Future(self._state, behavior, time)
220
221    def sleep_for(self, duration):
222        if 0 < duration:
223            with self._state.condition:
224                self._time += duration
225                delta = _process(self._state, self._time)
226                _call_in_thread(delta.mature_behaviors)
227
228    def sleep_until(self, time):
229        with self._state.condition:
230            if self._time < time:
231                self._time = time
232                delta = _process(self._state, self._time)
233                _call_in_thread(delta.mature_behaviors)
234