(root)/
Python-3.11.7/
Lib/
test/
test_sched.py
       1  import queue
       2  import sched
       3  import threading
       4  import time
       5  import unittest
       6  from test import support
       7  from test.support import threading_helper
       8  
       9  
      10  TIMEOUT = support.SHORT_TIMEOUT
      11  
      12  
      13  class ESC[4;38;5;81mTimer:
      14      def __init__(self):
      15          self._cond = threading.Condition()
      16          self._time = 0
      17          self._stop = 0
      18  
      19      def time(self):
      20          with self._cond:
      21              return self._time
      22  
      23      # increase the time but not beyond the established limit
      24      def sleep(self, t):
      25          assert t >= 0
      26          with self._cond:
      27              t += self._time
      28              while self._stop < t:
      29                  self._time = self._stop
      30                  self._cond.wait()
      31              self._time = t
      32  
      33      # advance time limit for user code
      34      def advance(self, t):
      35          assert t >= 0
      36          with self._cond:
      37              self._stop += t
      38              self._cond.notify_all()
      39  
      40  
      41  class ESC[4;38;5;81mTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      42  
      43      def test_enter(self):
      44          l = []
      45          fun = lambda x: l.append(x)
      46          scheduler = sched.scheduler(time.time, time.sleep)
      47          for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
      48              z = scheduler.enter(x, 1, fun, (x,))
      49          scheduler.run()
      50          self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5])
      51  
      52      def test_enterabs(self):
      53          l = []
      54          fun = lambda x: l.append(x)
      55          scheduler = sched.scheduler(time.time, time.sleep)
      56          for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
      57              z = scheduler.enterabs(x, 1, fun, (x,))
      58          scheduler.run()
      59          self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
      60  
      61      @threading_helper.requires_working_threading()
      62      def test_enter_concurrent(self):
      63          q = queue.Queue()
      64          fun = q.put
      65          timer = Timer()
      66          scheduler = sched.scheduler(timer.time, timer.sleep)
      67          scheduler.enter(1, 1, fun, (1,))
      68          scheduler.enter(3, 1, fun, (3,))
      69          t = threading.Thread(target=scheduler.run)
      70          t.start()
      71          timer.advance(1)
      72          self.assertEqual(q.get(timeout=TIMEOUT), 1)
      73          self.assertTrue(q.empty())
      74          for x in [4, 5, 2]:
      75              z = scheduler.enter(x - 1, 1, fun, (x,))
      76          timer.advance(2)
      77          self.assertEqual(q.get(timeout=TIMEOUT), 2)
      78          self.assertEqual(q.get(timeout=TIMEOUT), 3)
      79          self.assertTrue(q.empty())
      80          timer.advance(1)
      81          self.assertEqual(q.get(timeout=TIMEOUT), 4)
      82          self.assertTrue(q.empty())
      83          timer.advance(1)
      84          self.assertEqual(q.get(timeout=TIMEOUT), 5)
      85          self.assertTrue(q.empty())
      86          timer.advance(1000)
      87          threading_helper.join_thread(t)
      88          self.assertTrue(q.empty())
      89          self.assertEqual(timer.time(), 5)
      90  
      91      def test_priority(self):
      92          l = []
      93          fun = lambda x: l.append(x)
      94          scheduler = sched.scheduler(time.time, time.sleep)
      95  
      96          cases = [
      97              ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
      98              ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
      99              ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
     100              ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
     101          ]
     102          for priorities, expected in cases:
     103              with self.subTest(priorities=priorities, expected=expected):
     104                  for priority in priorities:
     105                      scheduler.enterabs(0.01, priority, fun, (priority,))
     106                  scheduler.run()
     107                  self.assertEqual(l, expected)
     108  
     109                  # Cleanup:
     110                  self.assertTrue(scheduler.empty())
     111                  l.clear()
     112  
     113      def test_cancel(self):
     114          l = []
     115          fun = lambda x: l.append(x)
     116          scheduler = sched.scheduler(time.time, time.sleep)
     117          now = time.time()
     118          event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
     119          event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
     120          event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
     121          event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
     122          event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
     123          scheduler.cancel(event1)
     124          scheduler.cancel(event5)
     125          scheduler.run()
     126          self.assertEqual(l, [0.02, 0.03, 0.04])
     127  
     128      @threading_helper.requires_working_threading()
     129      def test_cancel_concurrent(self):
     130          q = queue.Queue()
     131          fun = q.put
     132          timer = Timer()
     133          scheduler = sched.scheduler(timer.time, timer.sleep)
     134          now = timer.time()
     135          event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
     136          event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
     137          event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
     138          event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
     139          event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
     140          t = threading.Thread(target=scheduler.run)
     141          t.start()
     142          timer.advance(1)
     143          self.assertEqual(q.get(timeout=TIMEOUT), 1)
     144          self.assertTrue(q.empty())
     145          scheduler.cancel(event2)
     146          scheduler.cancel(event5)
     147          timer.advance(1)
     148          self.assertTrue(q.empty())
     149          timer.advance(1)
     150          self.assertEqual(q.get(timeout=TIMEOUT), 3)
     151          self.assertTrue(q.empty())
     152          timer.advance(1)
     153          self.assertEqual(q.get(timeout=TIMEOUT), 4)
     154          self.assertTrue(q.empty())
     155          timer.advance(1000)
     156          threading_helper.join_thread(t)
     157          self.assertTrue(q.empty())
     158          self.assertEqual(timer.time(), 4)
     159  
     160      def test_cancel_correct_event(self):
     161          # bpo-19270
     162          events = []
     163          scheduler = sched.scheduler()
     164          scheduler.enterabs(1, 1, events.append, ("a",))
     165          b = scheduler.enterabs(1, 1, events.append, ("b",))
     166          scheduler.enterabs(1, 1, events.append, ("c",))
     167          scheduler.cancel(b)
     168          scheduler.run()
     169          self.assertEqual(events, ["a", "c"])
     170  
     171      def test_empty(self):
     172          l = []
     173          fun = lambda x: l.append(x)
     174          scheduler = sched.scheduler(time.time, time.sleep)
     175          self.assertTrue(scheduler.empty())
     176          for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
     177              z = scheduler.enterabs(x, 1, fun, (x,))
     178          self.assertFalse(scheduler.empty())
     179          scheduler.run()
     180          self.assertTrue(scheduler.empty())
     181  
     182      def test_queue(self):
     183          l = []
     184          fun = lambda x: l.append(x)
     185          scheduler = sched.scheduler(time.time, time.sleep)
     186          now = time.time()
     187          e5 = scheduler.enterabs(now + 0.05, 1, fun)
     188          e1 = scheduler.enterabs(now + 0.01, 1, fun)
     189          e2 = scheduler.enterabs(now + 0.02, 1, fun)
     190          e4 = scheduler.enterabs(now + 0.04, 1, fun)
     191          e3 = scheduler.enterabs(now + 0.03, 1, fun)
     192          # queue property is supposed to return an order list of
     193          # upcoming events
     194          self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
     195  
     196      def test_args_kwargs(self):
     197          seq = []
     198          def fun(*a, **b):
     199              seq.append((a, b))
     200  
     201          now = time.time()
     202          scheduler = sched.scheduler(time.time, time.sleep)
     203          scheduler.enterabs(now, 1, fun)
     204          scheduler.enterabs(now, 1, fun, argument=(1, 2))
     205          scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
     206          scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
     207          scheduler.run()
     208          self.assertCountEqual(seq, [
     209              ((), {}),
     210              ((1, 2), {}),
     211              (('a', 'b'), {}),
     212              ((1, 2), {'foo': 3})
     213          ])
     214  
     215      def test_run_non_blocking(self):
     216          l = []
     217          fun = lambda x: l.append(x)
     218          scheduler = sched.scheduler(time.time, time.sleep)
     219          for x in [10, 9, 8, 7, 6]:
     220              scheduler.enter(x, 1, fun, (x,))
     221          scheduler.run(blocking=False)
     222          self.assertEqual(l, [])
     223  
     224  
     225  if __name__ == "__main__":
     226      unittest.main()