1 import queue
2 import sched
3 import threading
4 import time
5 import unittest
6 from test import support
7 from test.support import threading_helper
8
9
10 TIMEOUT = support.SHORT_TIMEOUT
11
12
13 class ESC[4;38;5;81mTimer:
14 def __init__(self):
15 self._cond = threading.Condition()
16 self._time = 0
17 self._stop = 0
18
19 def time(self):
20 with self._cond:
21 return self._time
22
23 # increase the time but not beyond the established limit
24 def sleep(self, t):
25 assert t >= 0
26 with self._cond:
27 t += self._time
28 while self._stop < t:
29 self._time = self._stop
30 self._cond.wait()
31 self._time = t
32
33 # advance time limit for user code
34 def advance(self, t):
35 assert t >= 0
36 with self._cond:
37 self._stop += t
38 self._cond.notify_all()
39
40
41 class ESC[4;38;5;81mTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
42
43 def test_enter(self):
44 l = []
45 fun = lambda x: l.append(x)
46 scheduler = sched.scheduler(time.time, time.sleep)
47 for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
48 z = scheduler.enter(x, 1, fun, (x,))
49 scheduler.run()
50 self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5])
51
52 def test_enterabs(self):
53 l = []
54 fun = lambda x: l.append(x)
55 scheduler = sched.scheduler(time.time, time.sleep)
56 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
57 z = scheduler.enterabs(x, 1, fun, (x,))
58 scheduler.run()
59 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
60
61 @threading_helper.requires_working_threading()
62 def test_enter_concurrent(self):
63 q = queue.Queue()
64 fun = q.put
65 timer = Timer()
66 scheduler = sched.scheduler(timer.time, timer.sleep)
67 scheduler.enter(1, 1, fun, (1,))
68 scheduler.enter(3, 1, fun, (3,))
69 t = threading.Thread(target=scheduler.run)
70 t.start()
71 timer.advance(1)
72 self.assertEqual(q.get(timeout=TIMEOUT), 1)
73 self.assertTrue(q.empty())
74 for x in [4, 5, 2]:
75 z = scheduler.enter(x - 1, 1, fun, (x,))
76 timer.advance(2)
77 self.assertEqual(q.get(timeout=TIMEOUT), 2)
78 self.assertEqual(q.get(timeout=TIMEOUT), 3)
79 self.assertTrue(q.empty())
80 timer.advance(1)
81 self.assertEqual(q.get(timeout=TIMEOUT), 4)
82 self.assertTrue(q.empty())
83 timer.advance(1)
84 self.assertEqual(q.get(timeout=TIMEOUT), 5)
85 self.assertTrue(q.empty())
86 timer.advance(1000)
87 threading_helper.join_thread(t)
88 self.assertTrue(q.empty())
89 self.assertEqual(timer.time(), 5)
90
91 def test_priority(self):
92 l = []
93 fun = lambda x: l.append(x)
94 scheduler = sched.scheduler(time.time, time.sleep)
95
96 cases = [
97 ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
98 ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
99 ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
100 ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
101 ]
102 for priorities, expected in cases:
103 with self.subTest(priorities=priorities, expected=expected):
104 for priority in priorities:
105 scheduler.enterabs(0.01, priority, fun, (priority,))
106 scheduler.run()
107 self.assertEqual(l, expected)
108
109 # Cleanup:
110 self.assertTrue(scheduler.empty())
111 l.clear()
112
113 def test_cancel(self):
114 l = []
115 fun = lambda x: l.append(x)
116 scheduler = sched.scheduler(time.time, time.sleep)
117 now = time.time()
118 event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
119 event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
120 event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
121 event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
122 event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
123 scheduler.cancel(event1)
124 scheduler.cancel(event5)
125 scheduler.run()
126 self.assertEqual(l, [0.02, 0.03, 0.04])
127
128 @threading_helper.requires_working_threading()
129 def test_cancel_concurrent(self):
130 q = queue.Queue()
131 fun = q.put
132 timer = Timer()
133 scheduler = sched.scheduler(timer.time, timer.sleep)
134 now = timer.time()
135 event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
136 event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
137 event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
138 event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
139 event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
140 t = threading.Thread(target=scheduler.run)
141 t.start()
142 timer.advance(1)
143 self.assertEqual(q.get(timeout=TIMEOUT), 1)
144 self.assertTrue(q.empty())
145 scheduler.cancel(event2)
146 scheduler.cancel(event5)
147 timer.advance(1)
148 self.assertTrue(q.empty())
149 timer.advance(1)
150 self.assertEqual(q.get(timeout=TIMEOUT), 3)
151 self.assertTrue(q.empty())
152 timer.advance(1)
153 self.assertEqual(q.get(timeout=TIMEOUT), 4)
154 self.assertTrue(q.empty())
155 timer.advance(1000)
156 threading_helper.join_thread(t)
157 self.assertTrue(q.empty())
158 self.assertEqual(timer.time(), 4)
159
160 def test_cancel_correct_event(self):
161 # bpo-19270
162 events = []
163 scheduler = sched.scheduler()
164 scheduler.enterabs(1, 1, events.append, ("a",))
165 b = scheduler.enterabs(1, 1, events.append, ("b",))
166 scheduler.enterabs(1, 1, events.append, ("c",))
167 scheduler.cancel(b)
168 scheduler.run()
169 self.assertEqual(events, ["a", "c"])
170
171 def test_empty(self):
172 l = []
173 fun = lambda x: l.append(x)
174 scheduler = sched.scheduler(time.time, time.sleep)
175 self.assertTrue(scheduler.empty())
176 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
177 z = scheduler.enterabs(x, 1, fun, (x,))
178 self.assertFalse(scheduler.empty())
179 scheduler.run()
180 self.assertTrue(scheduler.empty())
181
182 def test_queue(self):
183 l = []
184 fun = lambda x: l.append(x)
185 scheduler = sched.scheduler(time.time, time.sleep)
186 now = time.time()
187 e5 = scheduler.enterabs(now + 0.05, 1, fun)
188 e1 = scheduler.enterabs(now + 0.01, 1, fun)
189 e2 = scheduler.enterabs(now + 0.02, 1, fun)
190 e4 = scheduler.enterabs(now + 0.04, 1, fun)
191 e3 = scheduler.enterabs(now + 0.03, 1, fun)
192 # queue property is supposed to return an order list of
193 # upcoming events
194 self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
195
196 def test_args_kwargs(self):
197 seq = []
198 def fun(*a, **b):
199 seq.append((a, b))
200
201 now = time.time()
202 scheduler = sched.scheduler(time.time, time.sleep)
203 scheduler.enterabs(now, 1, fun)
204 scheduler.enterabs(now, 1, fun, argument=(1, 2))
205 scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
206 scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
207 scheduler.run()
208 self.assertCountEqual(seq, [
209 ((), {}),
210 ((1, 2), {}),
211 (('a', 'b'), {}),
212 ((1, 2), {'foo': 3})
213 ])
214
215 def test_run_non_blocking(self):
216 l = []
217 fun = lambda x: l.append(x)
218 scheduler = sched.scheduler(time.time, time.sleep)
219 for x in [10, 9, 8, 7, 6]:
220 scheduler.enter(x, 1, fun, (x,))
221 scheduler.run(blocking=False)
222 self.assertEqual(l, [])
223
224
225 if __name__ == "__main__":
226 unittest.main()