# Tests for the sched module.
import queue
import sched
import threading
import time
import unittest
from test import support
7
8
# Timeout passed to every blocking q.get() in the concurrent tests.
TIMEOUT = support.SHORT_TIMEOUT
10
11
class Timer:
    """Manually driven clock used in place of time.time/time.sleep.

    ``time`` and ``sleep`` are passed to sched.scheduler as its time and
    delay functions.  Simulated time only moves forward inside ``sleep``,
    and never past the limit that ``advance`` has established so far.
    """

    def __init__(self):
        # One condition guards both counters and wakes blocked sleepers.
        self._cond = threading.Condition()
        self._time = 0  # current simulated time
        self._stop = 0  # limit that simulated time may not exceed

    # return the current simulated time
    def time(self):
        with self._cond:
            return self._time

    # increase the time but not beyond the established limit
    def sleep(self, t):
        assert t >= 0
        with self._cond:
            # From here on t is the absolute simulated time to reach.
            t += self._time
            while self._stop < t:
                # Go as far as currently allowed, then block until
                # advance() raises the limit and notifies us.
                self._time = self._stop
                self._cond.wait()
            self._time = t

    # advance time limit for user code
    def advance(self, t):
        assert t >= 0
        with self._cond:
            self._stop += t
            # Wake every thread blocked in sleep() so it re-checks the limit.
            self._cond.notify_all()
38
39
class TestCase(unittest.TestCase):

    def test_enter(self):
        # Events entered with relative delays must run in order of
        # increasing delay, whatever order they were entered in.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(fired, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        # Same ordering check as test_enter, using absolute times.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(fired, [0.01, 0.02, 0.03, 0.04, 0.05])

    def test_enter_concurrent(self):
        # Enter events from this thread while scheduler.run() is executing
        # in another thread, driving time forward by hand through Timer.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        # Cleanups run last-registered-first: raise the time limit, then
        # join.  If an assertion below fails, this releases the scheduler
        # thread instead of leaving it blocked in timer.sleep().
        self.addCleanup(support.join_thread, t)
        self.addCleanup(timer.advance, 1000)
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        # Simulated time is now 1, so a delay of x - 1 is due at time x.
        for x in [4, 5, 2]:
            scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        timer.advance(1000)
        support.join_thread(t)
        self.assertTrue(q.empty())
        # The last event was due at 5, so simulated time stops there.
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        # Events due at the same time must run in order of priority value.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
            scheduler.enterabs(0.01, priority, fun, (priority,))
        scheduler.run()
        self.assertEqual(fired, [1, 2, 3, 4, 5])

    def test_cancel(self):
        # Cancelled events must not run; the remaining ones still do.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
        scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
        scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
        scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
        scheduler.cancel(event1)
        scheduler.cancel(event5)
        scheduler.run()
        self.assertEqual(fired, [0.02, 0.03, 0.04])

    def test_cancel_concurrent(self):
        # Cancel events from this thread while scheduler.run() is executing
        # in another thread, driving time forward by hand through Timer.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        # Only the events cancelled later need to be kept; the entry order
        # (1, 2, 4, 5, 3) is deliberately not the firing order.
        scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        scheduler.enterabs(now + 3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        # Cleanups run last-registered-first: raise the time limit, then
        # join.  If an assertion below fails, this releases the scheduler
        # thread instead of leaving it blocked in timer.sleep().
        self.addCleanup(support.join_thread, t)
        self.addCleanup(timer.advance, 1000)
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        scheduler.cancel(event2)
        scheduler.cancel(event5)
        timer.advance(1)
        # Time 2 has been reached but event2 was cancelled.
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1000)
        support.join_thread(t)
        self.assertTrue(q.empty())
        # event5 was cancelled, so the last event to run was due at 4.
        self.assertEqual(timer.time(), 4)

    def test_empty(self):
        # empty() must be true before any event is entered and again
        # after run() has processed all of them.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        # The events are only queued here, never run.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
        # queue property is supposed to return an ordered list of
        # upcoming events
        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        # Positional and keyword arguments given at entry time must be
        # passed through to the action unchanged.
        seq = []

        def fun(*a, **b):
            seq.append((a, b))

        now = time.time()
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(now, 1, fun)
        scheduler.enterabs(now, 1, fun, argument=(1, 2))
        scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
        scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
        scheduler.run()
        # All four events share time and priority, so only the set of
        # calls is checked, not their order.
        self.assertCountEqual(seq, [
            ((), {}),
            ((1, 2), {}),
            (('a', 'b'), {}),
            ((1, 2), {'foo': 3})
        ])

    def test_run_non_blocking(self):
        # With blocking=False, run() must return without executing
        # events that are still several seconds in the future.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [10, 9, 8, 7, 6]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run(blocking=False)
        self.assertEqual(fired, [])
196
197
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
200