1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Test times.""" 15 16import collections 17import logging 18import threading 19import time as _time 20 21import grpc 22import grpc_testing 23 24logging.basicConfig() 25_LOGGER = logging.getLogger(__name__) 26 27 28def _call(behaviors): 29 for behavior in behaviors: 30 try: 31 behavior() 32 except Exception: # pylint: disable=broad-except 33 _LOGGER.exception('Exception calling behavior "%r"!', behavior) 34 35 36def _call_in_thread(behaviors): 37 calling = threading.Thread(target=_call, args=(behaviors,)) 38 calling.start() 39 # NOTE(nathaniel): Because this function is called from "strict" Time 40 # implementations, it blocks until after all behaviors have terminated. 41 calling.join() 42 43 44class _State(object): 45 def __init__(self): 46 self.condition = threading.Condition() 47 self.times_to_behaviors = collections.defaultdict(list) 48 49 50class _Delta( 51 collections.namedtuple( 52 "_Delta", 53 ( 54 "mature_behaviors", 55 "earliest_mature_time", 56 "earliest_immature_time", 57 ), 58 ) 59): 60 pass 61 62 63def _process(state, now): 64 mature_behaviors = [] 65 earliest_mature_time = None 66 while state.times_to_behaviors: 67 earliest_time = min(state.times_to_behaviors) 68 if earliest_time <= now: 69 if earliest_mature_time is None: 70 earliest_mature_time = earliest_time 71 earliest_mature_behaviors = state.times_to_behaviors.pop( 72 earliest_time 73 ) 74 mature_behaviors.extend(earliest_mature_behaviors) 75 else: 76 earliest_immature_time = earliest_time 77 break 78 else: 79 earliest_immature_time = None 80 return _Delta( 81 mature_behaviors, earliest_mature_time, earliest_immature_time 82 ) 83 84 85class _Future(grpc.Future): 86 def __init__(self, state, behavior, time): 87 self._state = state 88 self._behavior = behavior 89 self._time = time 90 self._cancelled = False 91 92 def cancel(self): 93 with self._state.condition: 94 if self._cancelled: 95 return True 96 else: 97 behaviors_at_time = self._state.times_to_behaviors.get( 98 self._time 99 ) 100 if behaviors_at_time is None: 101 return False 102 else: 103 behaviors_at_time.remove(self._behavior) 104 if not behaviors_at_time: 105 self._state.times_to_behaviors.pop(self._time) 106 self._state.condition.notify_all() 107 self._cancelled = True 108 return True 109 110 def cancelled(self): 111 with self._state.condition: 112 return self._cancelled 113 114 def running(self): 115 raise NotImplementedError() 116 117 def done(self): 118 raise NotImplementedError() 119 120 def result(self, timeout=None): 121 raise NotImplementedError() 122 123 def exception(self, timeout=None): 124 raise NotImplementedError() 125 126 def traceback(self, timeout=None): 127 raise NotImplementedError() 128 129 def add_done_callback(self, fn): 130 raise NotImplementedError() 131 132 133class StrictRealTime(grpc_testing.Time): 134 def __init__(self): 135 self._state = _State() 136 self._active = False 137 self._calling = None 138 139 def _activity(self): 140 while True: 141 with self._state.condition: 142 while True: 143 now = _time.time() 144 delta = _process(self._state, now) 145 self._state.condition.notify_all() 146 if delta.mature_behaviors: 147 self._calling = delta.earliest_mature_time 148 break 149 self._calling = None 150 if delta.earliest_immature_time is None: 151 self._active = False 152 return 153 else: 154 timeout = max(0, delta.earliest_immature_time - now) 155 self._state.condition.wait(timeout=timeout) 156 _call(delta.mature_behaviors) 157 158 def _ensure_called_through(self, time): 159 with self._state.condition: 160 while ( 161 self._state.times_to_behaviors 162 and min(self._state.times_to_behaviors) < time 163 ) or (self._calling is not None and self._calling < time): 164 self._state.condition.wait() 165 166 def _call_at(self, behavior, time): 167 with self._state.condition: 168 self._state.times_to_behaviors[time].append(behavior) 169 if self._active: 170 self._state.condition.notify_all() 171 else: 172 activity = threading.Thread(target=self._activity) 173 activity.start() 174 self._active = True 175 return _Future(self._state, behavior, time) 176 177 def time(self): 178 return _time.time() 179 180 def call_in(self, behavior, delay): 181 return self._call_at(behavior, _time.time() + delay) 182 183 def call_at(self, behavior, time): 184 return self._call_at(behavior, time) 185 186 def sleep_for(self, duration): 187 time = _time.time() + duration 188 _time.sleep(duration) 189 self._ensure_called_through(time) 190 191 def sleep_until(self, time): 192 _time.sleep(max(0, time - _time.time())) 193 self._ensure_called_through(time) 194 195 196class StrictFakeTime(grpc_testing.Time): 197 def __init__(self, time): 198 self._state = _State() 199 self._time = time 200 201 def time(self): 202 return self._time 203 204 def call_in(self, behavior, delay): 205 if delay <= 0: 206 _call_in_thread((behavior,)) 207 else: 208 with self._state.condition: 209 time = self._time + delay 210 self._state.times_to_behaviors[time].append(behavior) 211 return _Future(self._state, behavior, time) 212 213 def call_at(self, behavior, time): 214 with self._state.condition: 215 if time <= self._time: 216 _call_in_thread((behavior,)) 217 else: 218 self._state.times_to_behaviors[time].append(behavior) 219 return _Future(self._state, behavior, time) 220 221 def sleep_for(self, duration): 222 if 0 < duration: 223 with self._state.condition: 224 self._time += duration 225 delta = _process(self._state, self._time) 226 _call_in_thread(delta.mature_behaviors) 227 228 def sleep_until(self, time): 229 with self._state.condition: 230 if self._time < time: 231 self._time = time 232 delta = _process(self._state, self._time) 233 _call_in_thread(delta.mature_behaviors) 234