"""Unit tests for the standard-library ``sched`` event scheduler."""

import queue
import sched
import time
import unittest
try:
    import threading
except ImportError:
    threading = None

# Upper bound, in seconds, on every blocking wait performed by the
# threaded tests (queue reads and thread joins).
TIMEOUT = 10


class Timer:
    """Manually driven fake clock usable as a scheduler's time/delay pair.

    ``time()`` reports the current fake time, ``sleep()`` moves it forward
    but blocks whenever that would pass the limit, and ``advance()`` (called
    from the controlling test thread) raises the limit and wakes sleepers.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._time = 0   # current fake time
        self._stop = 0   # limit that sleep() is not allowed to pass

    def time(self):
        """Return the current fake time."""
        with self._cond:
            return self._time

    # increase the time but not beyond the established limit
    def sleep(self, t):
        """Move the fake time forward by ``t``, blocking at the limit.

        While the target lies beyond the limit, the reported time is pinned
        to the limit and the caller waits for ``advance()`` to raise it.
        """
        assert t >= 0
        with self._cond:
            t += self._time
            while self._stop < t:
                self._time = self._stop
                self._cond.wait()
            self._time = t

    # advance time limit for user code
    def advance(self, t):
        """Raise the limit by ``t`` and wake every thread blocked in sleep()."""
        assert t >= 0
        with self._cond:
            self._stop += t
            self._cond.notify_all()


class TestCase(unittest.TestCase):
    """Behavioural tests for ``sched.scheduler``."""

    def _start_scheduler(self, scheduler, timer):
        """Run ``scheduler.run`` in a new thread and return that thread.

        Cleanups are registered so that a failing assertion cannot leave the
        worker blocked forever inside ``timer.sleep()``.
        """
        worker = threading.Thread(target=scheduler.run)
        worker.start()
        # Cleanups run last-in, first-out: the fake clock is released first
        # so that run() can drain its queue, and only then is the worker
        # joined.
        self.addCleanup(worker.join, TIMEOUT)
        self.addCleanup(timer.advance, 1000)
        return worker

    def test_enter(self):
        """Events added with enter() run in order of increasing delay."""
        calls = []
        fun = calls.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(calls, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        """Events added with enterabs() run in order of absolute time."""
        calls = []
        fun = calls.append
        scheduler = sched.scheduler(time.time, time.sleep)
        # These absolute times are long past for time.time(), so run() has
        # nothing to wait for.
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(calls, [0.01, 0.02, 0.03, 0.04, 0.05])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_enter_concurrent(self):
        """Events can be entered while run() is executing in another thread."""
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = self._start_scheduler(scheduler, timer)
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        # Entered while the fake clock reads 1, so a delay of x - 1 targets
        # fake time x.
        for x in [4, 5, 2]:
            scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        # Nothing is left in the queue, so this only lets run() return.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        """Events due at the same time run lowest priority number first."""
        calls = []
        fun = calls.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
            scheduler.enterabs(0.01, priority, fun, (priority,))
        scheduler.run()
        self.assertEqual(calls, [1, 2, 3, 4, 5])

    def test_cancel(self):
        """Cancelled events are not run; the remaining ones still are."""
        calls = []
        fun = calls.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
        scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
        scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
        scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
        scheduler.cancel(event1)
        scheduler.cancel(event5)
        scheduler.run()
        self.assertEqual(calls, [0.02, 0.03, 0.04])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_cancel_concurrent(self):
        """Events can be cancelled while run() is executing in another thread."""
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        # Deliberately entered out of chronological order.
        scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        scheduler.enterabs(now + 3, 1, fun, (3,))
        t = self._start_scheduler(scheduler, timer)
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        scheduler.cancel(event2)
        scheduler.cancel(event5)
        timer.advance(1)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        # Event 5 was cancelled, so the clock must stop at 4 even though the
        # limit is raised far beyond it.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 4)

    def test_empty(self):
        """empty() is true before any event is added and again after run()."""
        calls = []
        fun = calls.append
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        """The queue attribute lists pending events in the order they will run."""
        calls = []
        fun = calls.append  # never invoked: run() is not called in this test
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
        # queue property is supposed to return an ordered list of
        # upcoming events
        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        """Positional and keyword arguments are passed through to the action."""
        seq = []

        def fun(*a, **b):
            seq.append((a, b))

        now = time.time()
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(now, 1, fun)
        scheduler.enterabs(now, 1, fun, argument=(1, 2))
        scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
        scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
        scheduler.run()
        # All four events share one time and priority, so only the set of
        # calls is checked, not their order.
        self.assertCountEqual(seq, [
            ((), {}),
            ((1, 2), {}),
            (('a', 'b'), {}),
            ((1, 2), {'foo': 3})
        ])

    def test_run_non_blocking(self):
        """run(blocking=False) returns without running events that are not due."""
        calls = []
        fun = calls.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [10, 9, 8, 7, 6]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run(blocking=False)
        self.assertEqual(calls, [])


if __name__ == "__main__":
    unittest.main()