1import queue 2import sched 3import threading 4import time 5import unittest 6from test import support 7 8 9TIMEOUT = support.SHORT_TIMEOUT 10 11 12class Timer: 13 def __init__(self): 14 self._cond = threading.Condition() 15 self._time = 0 16 self._stop = 0 17 18 def time(self): 19 with self._cond: 20 return self._time 21 22 # increase the time but not beyond the established limit 23 def sleep(self, t): 24 assert t >= 0 25 with self._cond: 26 t += self._time 27 while self._stop < t: 28 self._time = self._stop 29 self._cond.wait() 30 self._time = t 31 32 # advance time limit for user code 33 def advance(self, t): 34 assert t >= 0 35 with self._cond: 36 self._stop += t 37 self._cond.notify_all() 38 39 40class TestCase(unittest.TestCase): 41 42 def test_enter(self): 43 l = [] 44 fun = lambda x: l.append(x) 45 scheduler = sched.scheduler(time.time, time.sleep) 46 for x in [0.5, 0.4, 0.3, 0.2, 0.1]: 47 z = scheduler.enter(x, 1, fun, (x,)) 48 scheduler.run() 49 self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5]) 50 51 def test_enterabs(self): 52 l = [] 53 fun = lambda x: l.append(x) 54 scheduler = sched.scheduler(time.time, time.sleep) 55 for x in [0.05, 0.04, 0.03, 0.02, 0.01]: 56 z = scheduler.enterabs(x, 1, fun, (x,)) 57 scheduler.run() 58 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) 59 60 def test_enter_concurrent(self): 61 q = queue.Queue() 62 fun = q.put 63 timer = Timer() 64 scheduler = sched.scheduler(timer.time, timer.sleep) 65 scheduler.enter(1, 1, fun, (1,)) 66 scheduler.enter(3, 1, fun, (3,)) 67 t = threading.Thread(target=scheduler.run) 68 t.start() 69 timer.advance(1) 70 self.assertEqual(q.get(timeout=TIMEOUT), 1) 71 self.assertTrue(q.empty()) 72 for x in [4, 5, 2]: 73 z = scheduler.enter(x - 1, 1, fun, (x,)) 74 timer.advance(2) 75 self.assertEqual(q.get(timeout=TIMEOUT), 2) 76 self.assertEqual(q.get(timeout=TIMEOUT), 3) 77 self.assertTrue(q.empty()) 78 timer.advance(1) 79 self.assertEqual(q.get(timeout=TIMEOUT), 4) 80 self.assertTrue(q.empty()) 81 timer.advance(1) 82 self.assertEqual(q.get(timeout=TIMEOUT), 5) 83 self.assertTrue(q.empty()) 84 timer.advance(1000) 85 support.join_thread(t) 86 self.assertTrue(q.empty()) 87 self.assertEqual(timer.time(), 5) 88 89 def test_priority(self): 90 l = [] 91 fun = lambda x: l.append(x) 92 scheduler = sched.scheduler(time.time, time.sleep) 93 for priority in [1, 2, 3, 4, 5]: 94 z = scheduler.enterabs(0.01, priority, fun, (priority,)) 95 scheduler.run() 96 self.assertEqual(l, [1, 2, 3, 4, 5]) 97 98 def test_cancel(self): 99 l = [] 100 fun = lambda x: l.append(x) 101 scheduler = sched.scheduler(time.time, time.sleep) 102 now = time.time() 103 event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,)) 104 event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,)) 105 event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,)) 106 event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,)) 107 event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,)) 108 scheduler.cancel(event1) 109 scheduler.cancel(event5) 110 scheduler.run() 111 self.assertEqual(l, [0.02, 0.03, 0.04]) 112 113 def test_cancel_concurrent(self): 114 q = queue.Queue() 115 fun = q.put 116 timer = Timer() 117 scheduler = sched.scheduler(timer.time, timer.sleep) 118 now = timer.time() 119 event1 = scheduler.enterabs(now + 1, 1, fun, (1,)) 120 event2 = scheduler.enterabs(now + 2, 1, fun, (2,)) 121 event4 = scheduler.enterabs(now + 4, 1, fun, (4,)) 122 event5 = scheduler.enterabs(now + 5, 1, fun, (5,)) 123 event3 = scheduler.enterabs(now + 3, 1, fun, (3,)) 124 t = threading.Thread(target=scheduler.run) 125 t.start() 126 timer.advance(1) 127 self.assertEqual(q.get(timeout=TIMEOUT), 1) 128 self.assertTrue(q.empty()) 129 scheduler.cancel(event2) 130 scheduler.cancel(event5) 131 timer.advance(1) 132 self.assertTrue(q.empty()) 133 timer.advance(1) 134 self.assertEqual(q.get(timeout=TIMEOUT), 3) 135 self.assertTrue(q.empty()) 136 timer.advance(1) 137 self.assertEqual(q.get(timeout=TIMEOUT), 4) 138 self.assertTrue(q.empty()) 139 timer.advance(1000) 140 support.join_thread(t) 141 self.assertTrue(q.empty()) 142 self.assertEqual(timer.time(), 4) 143 144 def test_empty(self): 145 l = [] 146 fun = lambda x: l.append(x) 147 scheduler = sched.scheduler(time.time, time.sleep) 148 self.assertTrue(scheduler.empty()) 149 for x in [0.05, 0.04, 0.03, 0.02, 0.01]: 150 z = scheduler.enterabs(x, 1, fun, (x,)) 151 self.assertFalse(scheduler.empty()) 152 scheduler.run() 153 self.assertTrue(scheduler.empty()) 154 155 def test_queue(self): 156 l = [] 157 fun = lambda x: l.append(x) 158 scheduler = sched.scheduler(time.time, time.sleep) 159 now = time.time() 160 e5 = scheduler.enterabs(now + 0.05, 1, fun) 161 e1 = scheduler.enterabs(now + 0.01, 1, fun) 162 e2 = scheduler.enterabs(now + 0.02, 1, fun) 163 e4 = scheduler.enterabs(now + 0.04, 1, fun) 164 e3 = scheduler.enterabs(now + 0.03, 1, fun) 165 # queue property is supposed to return an order list of 166 # upcoming events 167 self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5]) 168 169 def test_args_kwargs(self): 170 seq = [] 171 def fun(*a, **b): 172 seq.append((a, b)) 173 174 now = time.time() 175 scheduler = sched.scheduler(time.time, time.sleep) 176 scheduler.enterabs(now, 1, fun) 177 scheduler.enterabs(now, 1, fun, argument=(1, 2)) 178 scheduler.enterabs(now, 1, fun, argument=('a', 'b')) 179 scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3}) 180 scheduler.run() 181 self.assertCountEqual(seq, [ 182 ((), {}), 183 ((1, 2), {}), 184 (('a', 'b'), {}), 185 ((1, 2), {'foo': 3}) 186 ]) 187 188 def test_run_non_blocking(self): 189 l = [] 190 fun = lambda x: l.append(x) 191 scheduler = sched.scheduler(time.time, time.sleep) 192 for x in [10, 9, 8, 7, 6]: 193 scheduler.enter(x, 1, fun, (x,)) 194 scheduler.run(blocking=False) 195 self.assertEqual(l, []) 196 197 198if __name__ == "__main__": 199 unittest.main() 200