1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import random 16import threading 17import time 18import unittest 19 20import grpc_testing 21 22_QUANTUM = 0.3 23_MANY = 10000 24# Tests that run in real time can either wait for the scheduler to 25# eventually run what needs to be run (and risk timing out) or declare 26# that the scheduler didn't schedule work reasonably fast enough. We 27# choose the latter for this test. 28_PATHOLOGICAL_SCHEDULING = "pathological thread scheduling!" 
class _TimeNoter(object):
    """Callable that records the time at which each of its calls happens."""

    def __init__(self, time):
        # `time` is any object exposing a time() method; the tests in this
        # file pass the time object under test. The record is guarded by a
        # condition used as a lock because the callable may be invoked from
        # another thread than the one reading the record.
        self._condition = threading.Condition()
        self._time = time
        self._call_times = []

    def __call__(self):
        # Note the current time as reported by the stored time object.
        with self._condition:
            self._call_times.append(self._time.time())

    def call_times(self):
        # Return an immutable snapshot of the times recorded so far.
        with self._condition:
            return tuple(self._call_times)


class TimeTest(object):
    """Test methods shared by the concrete test cases in this file.

    This class is a mixin: concrete subclasses also derive from
    unittest.TestCase and assign the time object under test to self._time
    in setUp.

    The test methods deliberately carry no docstrings, because unittest
    prints a test's docstring instead of its name in verbose mode.
    """

    def test_sleep_for(self):
        # Sleeping for a duration must advance the clock by at least that
        # duration.
        start_time = self._time.time()
        self._time.sleep_for(_QUANTUM)
        end_time = self._time.time()

        self.assertLessEqual(start_time + _QUANTUM, end_time)

    def test_sleep_until(self):
        # Sleeping until a point in time must not return before that point.
        start_time = self._time.time()
        self._time.sleep_until(start_time + _QUANTUM)
        end_time = self._time.time()

        self.assertLessEqual(start_time + _QUANTUM, end_time)

    def test_call_in(self):
        # A behavior scheduled with a delay must have been called, and not
        # before the delay elapsed, once twice the delay has been slept.
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        self._time.call_in(time_noter, _QUANTUM)
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])

    def test_call_at(self):
        # Same as test_call_in, but scheduling at an absolute time.
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        self._time.call_at(time_noter, self._time.time() + _QUANTUM)
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])

    def test_cancel(self):
        # A future cancelled before its scheduled time must never call its
        # behavior, and must report the cancellation.
        time_noter = _TimeNoter(self._time)

        future = self._time.call_in(time_noter, _QUANTUM * 2)
        self._time.sleep_for(_QUANTUM)
        cancelled = future.cancel()
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertTrue(cancelled)
        self.assertTrue(future.cancelled())

    def test_many(self):
        # Schedule _MANY events to be set between two and three quanta from
        # now, plus _MANY background-noise futures with much longer delays;
        # cancel a random subset of the event futures and check that exactly
        # the events that were not cancelled got set.
        test_events = tuple(threading.Event() for _ in range(_MANY))
        possibly_cancelled_futures = {}
        background_noise_futures = []

        try:
            for test_event in test_events:
                possibly_cancelled_futures[test_event] = self._time.call_in(
                    test_event.set, _QUANTUM * (2 + random.random())
                )
            for _ in range(_MANY):
                background_noise_futures.append(
                    self._time.call_in(
                        threading.Event().set,
                        _QUANTUM * 1000 * random.random(),
                    )
                )
            self._time.sleep_for(_QUANTUM)
            cancelled = set()
            for test_event, test_future in possibly_cancelled_futures.items():
                # Only events whose cancellation actually succeeded are
                # expected to stay unset.
                if bool(random.randint(0, 1)) and test_future.cancel():
                    cancelled.add(test_event)
            self._time.sleep_for(_QUANTUM * 3)

            for test_event in test_events:
                if test_event in cancelled:
                    self.assertFalse(test_event.is_set())
                else:
                    self.assertTrue(test_event.is_set())
        finally:
            # The noise futures have delays of up to _QUANTUM * 1000; cancel
            # them even when something above failed so that they are not
            # left scheduled after the test.
            for background_noise_future in background_noise_futures:
                background_noise_future.cancel()

    def test_same_behavior_used_several_times(self):
        # The same behavior is scheduled four times: twice at one quantum
        # and twice at three quanta. After two quanta, the two early futures
        # can no longer be cancelled while one of the late ones still can;
        # after two more quanta, the remaining late future has run. In total
        # the behavior is expected to have been called three times.
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        first_future_at_one = self._time.call_in(time_noter, _QUANTUM)
        second_future_at_one = self._time.call_in(time_noter, _QUANTUM)
        first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
        second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
        self._time.sleep_for(_QUANTUM * 2)
        first_future_at_one_cancelled = first_future_at_one.cancel()
        second_future_at_one_cancelled = second_future_at_one.cancel()
        first_future_at_three_cancelled = first_future_at_three.cancel()
        self._time.sleep_for(_QUANTUM * 2)
        second_future_at_three_cancelled = second_future_at_three.cancel()
        # Cancelling an already-cancelled future is expected to report
        # success again.
        first_future_at_three_cancelled_again = first_future_at_three.cancel()
        call_times = time_noter.call_times()

        self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING)
        self.assertFalse(first_future_at_one_cancelled)
        self.assertFalse(second_future_at_one_cancelled)
        self.assertTrue(first_future_at_three_cancelled)
        self.assertFalse(second_future_at_three_cancelled)
        self.assertTrue(first_future_at_three_cancelled_again)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
        self.assertLessEqual(start_time + _QUANTUM, call_times[1])
        self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2])


class StrictRealTimeTest(TimeTest, unittest.TestCase):
    """Runs the shared tests against grpc_testing.strict_real_time()."""

    def setUp(self):
        self._time = grpc_testing.strict_real_time()


class StrictFakeTimeTest(TimeTest, unittest.TestCase):
    """Runs the shared tests against grpc_testing.strict_fake_time().

    The fake time is started at a random integer between zero and the
    current real time.
    """

    def setUp(self):
        self._time = grpc_testing.strict_fake_time(
            random.randint(0, int(time.time()))
        )


if __name__ == "__main__":
    unittest.main(verbosity=2)