# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import threading
import time
import unittest

import grpc_testing

_QUANTUM = 0.3
_MANY = 10000
# Tests that run in real time can either wait for the scheduler to
# eventually run what needs to be run (and risk timing out) or declare
# that the scheduler didn't schedule work reasonably fast enough. We
# choose the latter for this test.
_PATHOLOGICAL_SCHEDULING = 'pathological thread scheduling!'


class _TimeNoter(object):
    """Callable that records the clock reading each time it is invoked."""

    def __init__(self, time):
        """Remember the clock to sample.

        Args:
          time: Object whose ``time()`` method is read on every invocation.
        """
        self._condition = threading.Condition()
        self._time = time
        self._call_times = []

    def __call__(self):
        """Append the clock's current reading to the recorded call times."""
        with self._condition:
            self._call_times.append(self._time.time())

    def call_times(self):
        """Return a tuple snapshot of the readings recorded so far."""
        with self._condition:
            return tuple(self._call_times)


class TimeTest(object):
    """Test methods shared by the concrete test cases in this module.

    Concrete cases combine this class with ``unittest.TestCase`` and assign
    the clock under test to ``self._time`` in ``setUp``.
    """

    def test_sleep_for(self):
        """sleep_for must advance the clock by at least the given duration."""
        start_time = self._time.time()
        self._time.sleep_for(_QUANTUM)
        end_time = self._time.time()

        self.assertLessEqual(start_time + _QUANTUM, end_time)

    def test_sleep_until(self):
        """sleep_until must not return before the requested time."""
        start_time = self._time.time()
        self._time.sleep_until(start_time + _QUANTUM)
        end_time = self._time.time()

        self.assertLessEqual(start_time + _QUANTUM, end_time)

    def test_call_in(self):
        """A behavior passed to call_in must not run before its delay."""
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        self._time.call_in(time_noter, _QUANTUM)
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])

    def test_call_at(self):
        """A behavior passed to call_at must not run before its time."""
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        self._time.call_at(time_noter, self._time.time() + _QUANTUM)
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])

    def test_cancel(self):
        """A call cancelled before its delay elapses must never run."""
        time_noter = _TimeNoter(self._time)

        future = self._time.call_in(time_noter, _QUANTUM * 2)
        self._time.sleep_for(_QUANTUM)
        cancelled = future.cancel()
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertTrue(cancelled)
        self.assertTrue(future.cancelled())

    def test_many(self):
        """Schedule many calls, cancel a random subset, check each outcome.

        _MANY events are scheduled to be set between two and three quanta
        from now, alongside _MANY long-delay "noise" calls. After one
        quantum roughly half of the event calls are cancelled; after three
        more quanta an event must be set exactly when its call was not
        successfully cancelled.
        """
        test_events = tuple(threading.Event() for _ in range(_MANY))
        possibly_cancelled_futures = {}
        background_noise_futures = []

        try:
            for test_event in test_events:
                possibly_cancelled_futures[test_event] = self._time.call_in(
                    test_event.set, _QUANTUM * (2 + random.random()))
            for _ in range(_MANY):
                background_noise_futures.append(
                    self._time.call_in(threading.Event().set,
                                       _QUANTUM * 1000 * random.random()))
            self._time.sleep_for(_QUANTUM)
            cancelled = set()
            for test_event, test_future in possibly_cancelled_futures.items():
                if bool(random.randint(0, 1)) and test_future.cancel():
                    cancelled.add(test_event)
            self._time.sleep_for(_QUANTUM * 3)

            for test_event in test_events:
                if test_event in cancelled:
                    self.assertFalse(test_event.is_set())
                else:
                    self.assertTrue(
                        test_event.is_set(), msg=_PATHOLOGICAL_SCHEDULING)
        finally:
            # The noise calls are scheduled up to _QUANTUM * 1000 seconds
            # out; cancel them even when an assertion above fails so they
            # are not left pending after the test ends.
            for background_noise_future in background_noise_futures:
                background_noise_future.cancel()

    def test_same_behavior_used_several_times(self):
        """One behavior scheduled four times yields four independent futures.

        Two calls are scheduled at one quantum and two at three quanta.
        After two quanta the first pair has already run (so cancelling them
        reports False) and one of the later pair is cancelled. After two
        more quanta the remaining call has run, and cancelling the already
        cancelled future again still reports True.
        """
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        first_future_at_one = self._time.call_in(time_noter, _QUANTUM)
        second_future_at_one = self._time.call_in(time_noter, _QUANTUM)
        first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
        second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
        self._time.sleep_for(_QUANTUM * 2)
        first_future_at_one_cancelled = first_future_at_one.cancel()
        second_future_at_one_cancelled = second_future_at_one.cancel()
        first_future_at_three_cancelled = first_future_at_three.cancel()
        self._time.sleep_for(_QUANTUM * 2)
        second_future_at_three_cancelled = second_future_at_three.cancel()
        first_future_at_three_cancelled_again = first_future_at_three.cancel()
        call_times = time_noter.call_times()

        self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING)
        self.assertFalse(first_future_at_one_cancelled)
        self.assertFalse(second_future_at_one_cancelled)
        self.assertTrue(first_future_at_three_cancelled)
        self.assertFalse(second_future_at_three_cancelled)
        self.assertTrue(first_future_at_three_cancelled_again)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
        self.assertLessEqual(start_time + _QUANTUM, call_times[1])
        self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2])


class StrictRealTimeTest(TimeTest, unittest.TestCase):
    """Runs the TimeTest methods against grpc_testing.strict_real_time()."""

    def setUp(self):
        self._time = grpc_testing.strict_real_time()


class StrictFakeTimeTest(TimeTest, unittest.TestCase):
    """Runs the TimeTest methods against grpc_testing.strict_fake_time()."""

    def setUp(self):
        # The fake clock is constructed with a random integer between 0 and
        # the current value of time.time(), so no test can depend on a
        # particular starting value.
        self._time = grpc_testing.strict_fake_time(
            random.randint(0, int(time.time())))


if __name__ == '__main__':
    unittest.main(verbosity=2)