1 ###############################################################################
# Server process to keep track of named resources (like shared memory
# segments, semaphores, etc.) that have not been unlinked yet, and to clean
# them up.
4 #
# On Unix we run a server process which keeps track of resources that have
# not been unlinked yet. The server ignores SIGINT and SIGTERM and reads from
# a pipe. Every other process of the program has a copy of the writable end
# of the pipe, so we get EOF when all other processes have exited. Then the
# server process unlinks any remaining resource names.
10 #
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in RAM. If a Python process leaks such a
# resource, the resource will not be removed until the next reboot. Without
# this resource tracker process, "killall python" would probably leave behind
# resources that were never unlinked.
17
18 import os
19 import signal
20 import sys
21 import threading
22 import warnings
23
24 from . import spawn
25 from . import util
26
# Public API of this module: thin wrappers around a process-wide
# ResourceTracker instance.
__all__ = [
    'ensure_running',
    'register',
    'unregister',
]
28
# Whether signals can be blocked per-thread (used to protect the tracker
# child between spawn and installing its own signal handlers).
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
# Signals the tracker process ignores so that ^C / "killall python" do not
# stop it before it has cleaned up.
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Resource type name -> callable that releases a resource given its name.
# 'noop' is the type used by liveness probes and never needs any cleanup.
_CLEANUP_FUNCS = {'noop': lambda: None}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    # Named semaphores are released with sem_unlink(). That function may be
    # absent if the build found no POSIX named semaphore support; in that
    # case no named semaphore can ever have been opened, so nothing needs
    # cleaning up and the type is simply not registered.
    if hasattr(_multiprocessing, 'sem_unlink'):
        _CLEANUP_FUNCS['semaphore'] = _multiprocessing.sem_unlink
    _CLEANUP_FUNCS['shared_memory'] = _posixshmem.shm_unlink
52
53
class ReentrantCallError(RuntimeError):
    """Raised when the resource tracker is re-entered on the same thread.

    This can happen when a call into ResourceTracker is interrupted by a
    garbage collection whose finalizer (e.g. a SemLock finalizer) calls back
    into the tracker while its lock is already held.
    """
56
57
58 class ESC[4;38;5;81mResourceTracker(ESC[4;38;5;149mobject):
59
60 def __init__(self):
61 self._lock = threading.RLock()
62 self._fd = None
63 self._pid = None
64
65 def _reentrant_call_error(self):
66 # gh-109629: this happens if an explicit call to the ResourceTracker
67 # gets interrupted by a garbage collection, invoking a finalizer (*)
68 # that itself calls back into ResourceTracker.
69 # (*) for example the SemLock finalizer
70 raise ReentrantCallError(
71 "Reentrant call into the multiprocessing resource tracker")
72
73 def _stop(self):
74 with self._lock:
75 # This should not happen (_stop() isn't called by a finalizer)
76 # but we check for it anyway.
77 if self._lock._recursion_count() > 1:
78 return self._reentrant_call_error()
79 if self._fd is None:
80 # not running
81 return
82
83 # closing the "alive" file descriptor stops main()
84 os.close(self._fd)
85 self._fd = None
86
87 os.waitpid(self._pid, 0)
88 self._pid = None
89
90 def getfd(self):
91 self.ensure_running()
92 return self._fd
93
94 def ensure_running(self):
95 '''Make sure that resource tracker process is running.
96
97 This can be run from any process. Usually a child process will use
98 the resource created by its parent.'''
99 with self._lock:
100 if self._lock._recursion_count() > 1:
101 # The code below is certainly not reentrant-safe, so bail out
102 return self._reentrant_call_error()
103 if self._fd is not None:
104 # resource tracker was launched before, is it still running?
105 if self._check_alive():
106 # => still alive
107 return
108 # => dead, launch it again
109 os.close(self._fd)
110
111 # Clean-up to avoid dangling processes.
112 try:
113 # _pid can be None if this process is a child from another
114 # python process, which has started the resource_tracker.
115 if self._pid is not None:
116 os.waitpid(self._pid, 0)
117 except ChildProcessError:
118 # The resource_tracker has already been terminated.
119 pass
120 self._fd = None
121 self._pid = None
122
123 warnings.warn('resource_tracker: process died unexpectedly, '
124 'relaunching. Some resources might leak.')
125
126 fds_to_pass = []
127 try:
128 fds_to_pass.append(sys.stderr.fileno())
129 except Exception:
130 pass
131 cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
132 r, w = os.pipe()
133 try:
134 fds_to_pass.append(r)
135 # process will out live us, so no need to wait on pid
136 exe = spawn.get_executable()
137 args = [exe] + util._args_from_interpreter_flags()
138 args += ['-c', cmd % r]
139 # bpo-33613: Register a signal mask that will block the signals.
140 # This signal mask will be inherited by the child that is going
141 # to be spawned and will protect the child from a race condition
142 # that can make the child die before it registers signal handlers
143 # for SIGINT and SIGTERM. The mask is unregistered after spawning
144 # the child.
145 try:
146 if _HAVE_SIGMASK:
147 signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
148 pid = util.spawnv_passfds(exe, args, fds_to_pass)
149 finally:
150 if _HAVE_SIGMASK:
151 signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
152 except:
153 os.close(w)
154 raise
155 else:
156 self._fd = w
157 self._pid = pid
158 finally:
159 os.close(r)
160
161 def _check_alive(self):
162 '''Check that the pipe has not been closed by sending a probe.'''
163 try:
164 # We cannot use send here as it calls ensure_running, creating
165 # a cycle.
166 os.write(self._fd, b'PROBE:0:noop\n')
167 except OSError:
168 return False
169 else:
170 return True
171
172 def register(self, name, rtype):
173 '''Register name of resource with resource tracker.'''
174 self._send('REGISTER', name, rtype)
175
176 def unregister(self, name, rtype):
177 '''Unregister name of resource with resource tracker.'''
178 self._send('UNREGISTER', name, rtype)
179
180 def _send(self, cmd, name, rtype):
181 try:
182 self.ensure_running()
183 except ReentrantCallError:
184 # The code below might or might not work, depending on whether
185 # the resource tracker was already running and still alive.
186 # Better warn the user.
187 # (XXX is warnings.warn itself reentrant-safe? :-)
188 warnings.warn(
189 f"ResourceTracker called reentrantly for resource cleanup, "
190 f"which is unsupported. "
191 f"The {rtype} object {name!r} might leak.")
192 msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
193 if len(msg) > 512:
194 # posix guarantees that writes to a pipe of less than PIPE_BUF
195 # bytes are atomic, and that PIPE_BUF >= 512
196 raise ValueError('msg too long')
197 nbytes = os.write(self._fd, msg)
198 assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
199 nbytes, len(msg))
200
201
# Process-wide tracker instance; the module-level functions below are its
# bound methods, so every caller in this process shares one tracker pipe.
_resource_tracker = ResourceTracker()
ensure_running, register, unregister, getfd = (
    _resource_tracker.ensure_running,
    _resource_tracker.register,
    _resource_tracker.unregister,
    _resource_tracker.getfd,
)
207
208
def main(fd):
    '''Run resource tracker.

    ``fd`` is the read end of the pipe whose write ends are held by the
    tracked processes. Each line is ``CMD:NAME:RTYPE`` where CMD is
    REGISTER, UNREGISTER or PROBE. When every writer has closed the pipe
    (EOF), any resource still registered is released with its cleanup
    function from ``_CLEANUP_FUNCS`` and a warning is emitted.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    # The command and the resource type never contain ':',
                    # but a user-chosen resource name may. Split only on the
                    # first and the last ':' so such names are still tracked
                    # (a plain split(':') rejected them).
                    text = line.strip().decode('ascii')
                    cmd, _, rest = text.partition(':')
                    name, _, rtype = rest.rpartition(':')
                    cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
                    if cleanup_func is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # A bad message must not kill the tracker: report it and
                    # keep serving the remaining lines.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died.  We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    warnings.warn('resource_tracker: %r: %s' % (name, e))