1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Test times.""" 15 16import collections 17import logging 18import threading 19import time as _time 20 21import grpc 22import grpc_testing 23 24logging.basicConfig() 25_LOGGER = logging.getLogger(__name__) 26 27 28def _call(behaviors): 29 for behavior in behaviors: 30 try: 31 behavior() 32 except Exception: # pylint: disable=broad-except 33 _LOGGER.exception('Exception calling behavior "%r"!', behavior) 34 35 36def _call_in_thread(behaviors): 37 calling = threading.Thread(target=_call, args=(behaviors,)) 38 calling.start() 39 # NOTE(nathaniel): Because this function is called from "strict" Time 40 # implementations, it blocks until after all behaviors have terminated. 41 calling.join() 42 43 44class _State(object): 45 46 def __init__(self): 47 self.condition = threading.Condition() 48 self.times_to_behaviors = collections.defaultdict(list) 49 50 51class _Delta( 52 collections.namedtuple('_Delta', ( 53 'mature_behaviors', 54 'earliest_mature_time', 55 'earliest_immature_time', 56 ))): 57 pass 58 59 60def _process(state, now): 61 mature_behaviors = [] 62 earliest_mature_time = None 63 while state.times_to_behaviors: 64 earliest_time = min(state.times_to_behaviors) 65 if earliest_time <= now: 66 if earliest_mature_time is None: 67 earliest_mature_time = earliest_time 68 earliest_mature_behaviors = state.times_to_behaviors.pop( 69 earliest_time) 70 mature_behaviors.extend(earliest_mature_behaviors) 71 else: 72 earliest_immature_time = earliest_time 73 break 74 else: 75 earliest_immature_time = None 76 return _Delta(mature_behaviors, earliest_mature_time, 77 earliest_immature_time) 78 79 80class _Future(grpc.Future): 81 82 def __init__(self, state, behavior, time): 83 self._state = state 84 self._behavior = behavior 85 self._time = time 86 self._cancelled = False 87 88 def cancel(self): 89 with self._state.condition: 90 if self._cancelled: 91 return True 92 else: 93 behaviors_at_time = self._state.times_to_behaviors.get( 94 self._time) 95 if behaviors_at_time is None: 96 return False 97 else: 98 behaviors_at_time.remove(self._behavior) 99 if not behaviors_at_time: 100 self._state.times_to_behaviors.pop(self._time) 101 self._state.condition.notify_all() 102 self._cancelled = True 103 return True 104 105 def cancelled(self): 106 with self._state.condition: 107 return self._cancelled 108 109 def running(self): 110 raise NotImplementedError() 111 112 def done(self): 113 raise NotImplementedError() 114 115 def result(self, timeout=None): 116 raise NotImplementedError() 117 118 def exception(self, timeout=None): 119 raise NotImplementedError() 120 121 def traceback(self, timeout=None): 122 raise NotImplementedError() 123 124 def add_done_callback(self, fn): 125 raise NotImplementedError() 126 127 128class StrictRealTime(grpc_testing.Time): 129 130 def __init__(self): 131 self._state = _State() 132 self._active = False 133 self._calling = None 134 135 def _activity(self): 136 while True: 137 with self._state.condition: 138 while True: 139 now = _time.time() 140 delta = _process(self._state, now) 141 self._state.condition.notify_all() 142 if delta.mature_behaviors: 143 self._calling = delta.earliest_mature_time 144 break 145 self._calling = None 146 if delta.earliest_immature_time is None: 147 self._active = False 148 return 149 else: 150 timeout = max(0, delta.earliest_immature_time - now) 151 self._state.condition.wait(timeout=timeout) 152 _call(delta.mature_behaviors) 153 154 def _ensure_called_through(self, time): 155 with self._state.condition: 156 while ((self._state.times_to_behaviors and 157 min(self._state.times_to_behaviors) < time) or 158 (self._calling is not None and self._calling < time)): 159 self._state.condition.wait() 160 161 def _call_at(self, behavior, time): 162 with self._state.condition: 163 self._state.times_to_behaviors[time].append(behavior) 164 if self._active: 165 self._state.condition.notify_all() 166 else: 167 activity = threading.Thread(target=self._activity) 168 activity.start() 169 self._active = True 170 return _Future(self._state, behavior, time) 171 172 def time(self): 173 return _time.time() 174 175 def call_in(self, behavior, delay): 176 return self._call_at(behavior, _time.time() + delay) 177 178 def call_at(self, behavior, time): 179 return self._call_at(behavior, time) 180 181 def sleep_for(self, duration): 182 time = _time.time() + duration 183 _time.sleep(duration) 184 self._ensure_called_through(time) 185 186 def sleep_until(self, time): 187 _time.sleep(max(0, time - _time.time())) 188 self._ensure_called_through(time) 189 190 191class StrictFakeTime(grpc_testing.Time): 192 193 def __init__(self, time): 194 self._state = _State() 195 self._time = time 196 197 def time(self): 198 return self._time 199 200 def call_in(self, behavior, delay): 201 if delay <= 0: 202 _call_in_thread((behavior,)) 203 else: 204 with self._state.condition: 205 time = self._time + delay 206 self._state.times_to_behaviors[time].append(behavior) 207 return _Future(self._state, behavior, time) 208 209 def call_at(self, behavior, time): 210 with self._state.condition: 211 if time <= self._time: 212 _call_in_thread((behavior,)) 213 else: 214 self._state.times_to_behaviors[time].append(behavior) 215 return _Future(self._state, behavior, time) 216 217 def sleep_for(self, duration): 218 if 0 < duration: 219 with self._state.condition: 220 self._time += duration 221 delta = _process(self._state, self._time) 222 _call_in_thread(delta.mature_behaviors) 223 224 def sleep_until(self, time): 225 with self._state.condition: 226 if self._time < time: 227 self._time = time 228 delta = _process(self._state, self._time) 229 _call_in_thread(delta.mature_behaviors) 230