1 import contextlib
2 import logging
3 import queue
4 import time
5 import unittest
6 from concurrent.futures._base import BrokenExecutor
7 from logging.handlers import QueueHandler
8
9 from test import support
10
11 from .util import ExecutorMixin, create_executor_tests, setup_module
12
13
# Marker written by init() and read back by get_init_status(); it starts out
# (and is reset by InitializerMixin.setUp() to) the 'uninitialized' value.
INITIALIZER_STATUS = "uninitialized"
15
def init(x):
    """Store *x* in the module-level ``INITIALIZER_STATUS`` marker.

    Passed as the executor ``initializer`` by InitializerMixin, with
    ``initargs=('initialized',)``.
    """
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x
19
def get_init_status():
    """Return the current value of the module-level ``INITIALIZER_STATUS``."""
    return INITIALIZER_STATUS
22
def init_fail(log_queue=None):
    """Executor initializer that always fails with ``ValueError``.

    If *log_queue* is given, the 'concurrent.futures' logger is first
    reconfigured to forward its records to that queue only: a QueueHandler
    is attached, the level is set to CRITICAL and propagation to ancestor
    loggers is turned off.

    Raises:
        ValueError: always, with the message 'error in initializer'.
    """
    if log_queue is not None:
        futures_logger = logging.getLogger('concurrent.futures')
        futures_logger.propagate = False
        futures_logger.setLevel('CRITICAL')
        futures_logger.addHandler(QueueHandler(log_queue))
    # Give the caller a moment to schedule some futures before failing.
    time.sleep(0.1)
    raise ValueError('error in initializer')
31
32
class InitializerMixin(ExecutorMixin):
    """Check that a succeeding initializer has run before jobs execute.

    The executor is configured with ``init`` as its initializer, so every
    job is expected to observe ``INITIALIZER_STATUS == 'initialized'``.
    """

    worker_count = 2

    def setUp(self):
        # Reset the marker in this process before the executor is created.
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        # Set before super().setUp(); presumably ExecutorMixin.setUp() reads
        # executor_kwargs when it builds self.executor -- confirm in .util.
        self.executor_kwargs = {
            'initializer': init,
            'initargs': ('initialized',),
        }
        super().setUp()

    def test_initializer(self):
        """Each submitted job must report the value stored by init()."""
        pending = []
        for _ in range(self.worker_count):
            pending.append(self.executor.submit(get_init_status))

        for future in pending:
            self.assertEqual(future.result(), 'initialized')
49
50
class FailingInitializerMixin(ExecutorMixin):
    """Check that an executor whose initializer raises ends up broken.

    The executor is configured with ``init_fail`` as its initializer. The
    test expects the failure to be logged on the 'concurrent.futures'
    logger and expects submit()/result() to raise BrokenExecutor.
    """

    worker_count = 2

    def setUp(self):
        """Choose how the initializer's log output will be captured.

        When the instance has a ``ctx`` attribute (presumably only the
        process-pool mixins define it -- confirm in .util), a queue from the
        multiprocessing context is handed to ``init_fail`` so the child's
        log records come back to this process. Otherwise no queue is used
        and the records are captured with assertLogs().
        """
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        """A failing initializer must break the executor and be logged."""
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()

            # At some point, the executor should break.  Use the shared
            # test-suite timeout rather than a hard-coded number of seconds
            # so that slow or heavily loaded machines (and runs with a
            # scaled regrtest timeout) do not fail spuriously.
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
                                            "executor not broken"):
                if self.executor._broken:
                    break

            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        """Context manager asserting that *msg* is logged by the body.

        With a log queue, the body runs first and the queue is then drained
        without blocking, collecting each record's getMessage() text.
        Without one, the body runs inside assertLogs() for the
        'concurrent.futures' logger at CRITICAL level. In both cases the
        check passes if *msg* is a substring of at least one collected line.
        """
        if self.log_queue is not None:
            yield
            output = []
            try:
                # Drain everything currently available; Empty ends the loop.
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        # Pass the captured output as the failure message to aid debugging.
        self.assertTrue(any(msg in line for line in output),
                        output)
105
106
# Register the generated test classes for both mixins in this module's
# namespace via create_executor_tests().
for _mixin in (InitializerMixin, FailingInitializerMixin):
    create_executor_tests(globals(), _mixin)
del _mixin  # do not leave the loop variable behind as a module global
109
110
def setUpModule():
    """Module-level unittest fixture; delegates to ``setup_module()``."""
    setup_module()
113
114
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()