• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import random
16import threading
17import time
18import unittest
19
20import grpc_testing
21
22_QUANTUM = 0.3
23_MANY = 10000
24# Tests that run in real time can either wait for the scheduler to
25# eventually run what needs to be run (and risk timing out) or declare
26# that the scheduler didn't schedule work reasonably fast enough. We
27# choose the latter for this test.
28_PATHOLOGICAL_SCHEDULING = "pathological thread scheduling!"
29
30
31class _TimeNoter(object):
32    def __init__(self, time):
33        self._condition = threading.Condition()
34        self._time = time
35        self._call_times = []
36
37    def __call__(self):
38        with self._condition:
39            self._call_times.append(self._time.time())
40
41    def call_times(self):
42        with self._condition:
43            return tuple(self._call_times)
44
45
46class TimeTest(object):
47    def test_sleep_for(self):
48        start_time = self._time.time()
49        self._time.sleep_for(_QUANTUM)
50        end_time = self._time.time()
51
52        self.assertLessEqual(start_time + _QUANTUM, end_time)
53
54    def test_sleep_until(self):
55        start_time = self._time.time()
56        self._time.sleep_until(start_time + _QUANTUM)
57        end_time = self._time.time()
58
59        self.assertLessEqual(start_time + _QUANTUM, end_time)
60
61    def test_call_in(self):
62        time_noter = _TimeNoter(self._time)
63
64        start_time = self._time.time()
65        self._time.call_in(time_noter, _QUANTUM)
66        self._time.sleep_for(_QUANTUM * 2)
67        call_times = time_noter.call_times()
68
69        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
70        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
71
72    def test_call_at(self):
73        time_noter = _TimeNoter(self._time)
74
75        start_time = self._time.time()
76        self._time.call_at(time_noter, self._time.time() + _QUANTUM)
77        self._time.sleep_for(_QUANTUM * 2)
78        call_times = time_noter.call_times()
79
80        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
81        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
82
83    def test_cancel(self):
84        time_noter = _TimeNoter(self._time)
85
86        future = self._time.call_in(time_noter, _QUANTUM * 2)
87        self._time.sleep_for(_QUANTUM)
88        cancelled = future.cancel()
89        self._time.sleep_for(_QUANTUM * 2)
90        call_times = time_noter.call_times()
91
92        self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING)
93        self.assertTrue(cancelled)
94        self.assertTrue(future.cancelled())
95
96    def test_many(self):
97        test_events = tuple(threading.Event() for _ in range(_MANY))
98        possibly_cancelled_futures = {}
99        background_noise_futures = []
100
101        for test_event in test_events:
102            possibly_cancelled_futures[test_event] = self._time.call_in(
103                test_event.set, _QUANTUM * (2 + random.random())
104            )
105        for _ in range(_MANY):
106            background_noise_futures.append(
107                self._time.call_in(
108                    threading.Event().set, _QUANTUM * 1000 * random.random()
109                )
110            )
111        self._time.sleep_for(_QUANTUM)
112        cancelled = set()
113        for test_event, test_future in possibly_cancelled_futures.items():
114            if bool(random.randint(0, 1)) and test_future.cancel():
115                cancelled.add(test_event)
116        self._time.sleep_for(_QUANTUM * 3)
117
118        for test_event in test_events:
119            (self.assertFalse if test_event in cancelled else self.assertTrue)(
120                test_event.is_set()
121            )
122        for background_noise_future in background_noise_futures:
123            background_noise_future.cancel()
124
125    def test_same_behavior_used_several_times(self):
126        time_noter = _TimeNoter(self._time)
127
128        start_time = self._time.time()
129        first_future_at_one = self._time.call_in(time_noter, _QUANTUM)
130        second_future_at_one = self._time.call_in(time_noter, _QUANTUM)
131        first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
132        second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
133        self._time.sleep_for(_QUANTUM * 2)
134        first_future_at_one_cancelled = first_future_at_one.cancel()
135        second_future_at_one_cancelled = second_future_at_one.cancel()
136        first_future_at_three_cancelled = first_future_at_three.cancel()
137        self._time.sleep_for(_QUANTUM * 2)
138        second_future_at_three_cancelled = second_future_at_three.cancel()
139        first_future_at_three_cancelled_again = first_future_at_three.cancel()
140        call_times = time_noter.call_times()
141
142        self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING)
143        self.assertFalse(first_future_at_one_cancelled)
144        self.assertFalse(second_future_at_one_cancelled)
145        self.assertTrue(first_future_at_three_cancelled)
146        self.assertFalse(second_future_at_three_cancelled)
147        self.assertTrue(first_future_at_three_cancelled_again)
148        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
149        self.assertLessEqual(start_time + _QUANTUM, call_times[1])
150        self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2])
151
152
153class StrictRealTimeTest(TimeTest, unittest.TestCase):
154    def setUp(self):
155        self._time = grpc_testing.strict_real_time()
156
157
158class StrictFakeTimeTest(TimeTest, unittest.TestCase):
159    def setUp(self):
160        self._time = grpc_testing.strict_fake_time(
161            random.randint(0, int(time.time()))
162        )
163
164
165if __name__ == "__main__":
166    unittest.main(verbosity=2)
167