python (3.11.7)
1 #
2 # Module providing the `Process` class which emulates `threading.Thread`
3 #
4 # multiprocessing/process.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
# Public names of this module.
__all__ = [
    'BaseProcess',
    'current_process',
    'active_children',
    'parent_process',
]
12
13 #
14 # Imports
15 #
16
17 import os
18 import sys
19 import signal
20 import itertools
21 import threading
22 from _weakrefset import WeakSet
23
24 #
25 #
26 #
27
# Record the absolute working directory seen when this module is imported.
# os.getcwd() can fail with OSError; in that case no directory is recorded
# and ORIGINAL_DIR is None.
try:
    ORIGINAL_DIR = os.getcwd()
except OSError:
    ORIGINAL_DIR = None
else:
    ORIGINAL_DIR = os.path.abspath(ORIGINAL_DIR)
32
33 #
34 # Public functions
35 #
36
def current_process():
    '''
    Return the process object that represents the calling process

    This is whatever the module-level `_current_process` is bound to at
    the time of the call.
    '''
    return _current_process
42
def active_children():
    '''
    Return a new list holding the process objects of live child processes

    `_cleanup()` runs first, so children that have already finished are
    dropped from the bookkeeping set before the list is built.
    '''
    _cleanup()
    return [*_children]
49
50
def parent_process():
    '''
    Return the process object that represents the parent process

    This is whatever the module-level `_parent_process` is bound to at
    the time of the call.
    '''
    return _parent_process
56
57 #
58 #
59 #
60
def _cleanup():
    # Forget children that have finished.
    # Walk a snapshot, because entries are removed from `_children` while
    # looping.
    for proc in tuple(_children):
        # Read `_popen` once into a local and only use the local afterwards,
        # so the truth test and the poll() call see the same object.
        popen = proc._popen
        if not popen:
            continue
        if popen.poll() is not None:
            _children.discard(proc)
66
67 #
68 # The `Process` class
69 #
70
class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analogous to `threading.Thread`
    '''
    def _Popen(self):
        # Hook that produces the object stored in `self._popen`; `start()`
        # invokes it as `self._Popen(self)`.  No implementation is provided
        # here.
        raise NotImplementedError

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None,
                 *, daemon=None):
        '''
        Set up a process object without starting it

        group  -- must be None (checked with an assertion)
        target -- callable invoked by run() as target(*args, **kwargs)
        name   -- process name; when falsy, a name is built from the class
                  name and the identity tuple, e.g. 'BaseProcess-1'
        args   -- positional arguments for target; stored as a tuple
        kwargs -- keyword arguments for target; stored as a fresh dict.
                  None (the default) means "no keyword arguments"
        daemon -- when not None, assigned to the `daemon` property;
                  otherwise the flag comes from the copied configuration of
                  the creating process
        '''
        assert group is None, 'group argument must be None for now'
        count = next(_process_counter)
        # Identity is the creating process' identity extended by a counter.
        self._identity = _current_process._identity + (count,)
        # Work on a copy so changes do not affect the creating process.
        self._config = _current_process._config.copy()
        self._parent_pid = os.getpid()
        self._parent_name = _current_process.name
        self._popen = None
        self._closed = False
        self._target = target
        self._args = tuple(args)
        # No mutable default in the signature; None stands for "empty".
        self._kwargs = {} if kwargs is None else dict(kwargs)
        self._name = name or type(self).__name__ + '-' + \
                     ':'.join(str(i) for i in self._identity)
        if daemon is not None:
            self.daemon = daemon
        _dangling.add(self)

    def _check_closed(self):
        # Raise ValueError once close() has marked this object as closed.
        if self._closed:
            raise ValueError("process object is closed")

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

    def start(self):
        '''
        Start child process

        Raises ValueError if the object is closed.  Assertions reject a
        second start, a start from a process other than the creator, and a
        start from a process whose configuration marks it as daemonic.
        '''
        self._check_closed()
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
        assert not _current_process._config.get('daemon'), \
               'daemonic processes are not allowed to have children'
        _cleanup()
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        # Avoid a refcycle if the target function holds an indirect
        # reference to the process object (see bpo-30775)
        del self._target, self._args, self._kwargs
        _children.add(self)

    def terminate(self):
        '''
        Terminate process; sends SIGTERM signal or uses TerminateProcess()

        Raises ValueError if the object is closed.  The call is forwarded
        to `self._popen`, which is None until start() has run.
        '''
        self._check_closed()
        self._popen.terminate()

    def kill(self):
        '''
        Terminate process; sends SIGKILL signal or uses TerminateProcess()

        Raises ValueError if the object is closed.  The call is forwarded
        to `self._popen`, which is None until start() has run.
        '''
        self._check_closed()
        self._popen.kill()

    def join(self, timeout=None):
        '''
        Wait until child process terminates

        `timeout` is passed through to the popen object's wait().  When
        wait() returns something other than None, the process is removed
        from the set of tracked children.
        '''
        self._check_closed()
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _children.discard(self)

    def is_alive(self):
        '''
        Return whether process is alive

        The current process always reports True and a process that was
        never started reports False.  Otherwise the popen object is polled;
        a finished process is also removed from the tracked children.
        '''
        self._check_closed()
        if self is _current_process:
            return True
        assert self._parent_pid == os.getpid(), 'can only test a child process'

        if self._popen is None:
            return False

        returncode = self._popen.poll()
        if returncode is None:
            return True
        else:
            _children.discard(self)
            return False

    def close(self):
        '''
        Close the Process object.

        This method releases resources held by the Process object.  It is
        an error to call this method if the child process is still running.
        '''
        if self._popen is not None:
            if self._popen.poll() is None:
                raise ValueError("Cannot close a process while it is still running. "
                                 "You should first call join() or terminate().")
            self._popen.close()
            self._popen = None
            del self._sentinel
            _children.discard(self)
        self._closed = True

    @property
    def name(self):
        '''
        Return the name of the process
        '''
        return self._name

    @name.setter
    def name(self, name):
        '''
        Set the name of the process; it must be a str
        '''
        assert isinstance(name, str), 'name must be a string'
        self._name = name

    @property
    def daemon(self):
        '''
        Return whether process is a daemon
        '''
        return self._config.get('daemon', False)

    @daemon.setter
    def daemon(self, daemonic):
        '''
        Set whether process is a daemon
        '''
        assert self._popen is None, 'process has already started'
        self._config['daemon'] = daemonic

    @property
    def authkey(self):
        '''
        Return the authorization key stored in the process configuration

        Raises KeyError when the configuration has no 'authkey' entry.
        '''
        return self._config['authkey']

    @authkey.setter
    def authkey(self, authkey):
        '''
        Set authorization key of process
        '''
        self._config['authkey'] = AuthenticationString(authkey)

    @property
    def exitcode(self):
        '''
        Return exit code of process or `None` if it has yet to stop
        '''
        self._check_closed()
        if self._popen is None:
            return self._popen
        return self._popen.poll()

    @property
    def ident(self):
        '''
        Return identifier (PID) of process or `None` if it has yet to start
        '''
        self._check_closed()
        if self is _current_process:
            return os.getpid()
        else:
            return self._popen and self._popen.pid

    # `pid` is an alias for the `ident` property.
    pid = ident

    @property
    def sentinel(self):
        '''
        Return a file descriptor (Unix) or handle (Windows) suitable for
        waiting for process termination.

        Raises ValueError if the object is closed or was never started.
        '''
        self._check_closed()
        try:
            return self._sentinel
        except AttributeError:
            raise ValueError("process not started") from None

    def __repr__(self):
        '''
        Return '<ClassName name=... [pid=...] parent=... status
        [exitcode=...] [daemon]>'

        The status is one of 'started', 'closed', 'unknown', 'initial' or
        'stopped'.
        '''
        exitcode = None
        if self is _current_process:
            status = 'started'
        elif self._closed:
            status = 'closed'
        elif self._parent_pid != os.getpid():
            # Only the creating process polls the popen object.
            status = 'unknown'
        elif self._popen is None:
            status = 'initial'
        else:
            exitcode = self._popen.poll()
            if exitcode is not None:
                status = 'stopped'
            else:
                status = 'started'

        info = [type(self).__name__, 'name=%r' % self._name]
        if self._popen is not None:
            info.append('pid=%s' % self._popen.pid)
        info.append('parent=%s' % self._parent_pid)
        info.append(status)
        if exitcode is not None:
            # Show a symbolic name when the exit code has one.
            exitcode = _exitcode_to_name.get(exitcode, exitcode)
            info.append('exitcode=%s' % exitcode)
        if self.daemon:
            info.append('daemon')
        return '<%s>' % ' '.join(info)

    ##

    def _bootstrap(self, parent_sentinel=None):
        '''
        Install this object as the current process, call run() and return
        the resulting exit code

        The module-level bookkeeping (`_process_counter`, `_children`,
        `_current_process`, `_parent_process`) is rebound first.  The exit
        code is 0 when run() returns; a SystemExit yields 0 for a code of
        None, the code itself when it is an int, and otherwise 1 after the
        code is written to stderr; any other exception yields 1 after the
        traceback is printed.
        '''
        from . import util, context
        global _current_process, _parent_process, _process_counter, _children

        try:
            if self._start_method is not None:
                context._force_start_method(self._start_method)
            # Start with fresh child bookkeeping.
            _process_counter = itertools.count(1)
            _children = set()
            util._close_stdin()
            old_process = _current_process
            _current_process = self
            _parent_process = _ParentProcess(
                self._parent_name, self._parent_pid, parent_sentinel)
            if threading._HAVE_THREAD_NATIVE_ID:
                threading.main_thread()._set_native_id()
            try:
                self._after_fork()
            finally:
                # delay finalization of the old process object until after
                # _run_after_forkers() is executed
                del old_process
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()
        except SystemExit as e:
            if e.code is None:
                exitcode = 0
            elif isinstance(e.code, int):
                exitcode = e.code
            else:
                sys.stderr.write(str(e.code) + '\n')
                exitcode = 1
        except:
            # Catch-all: every other exception, including non-Exception
            # ones, becomes exit code 1 instead of propagating.
            exitcode = 1
            import traceback
            sys.stderr.write('Process %s:\n' % self.name)
            traceback.print_exc()
        finally:
            threading._shutdown()
            util.info('process exiting with exitcode %d' % exitcode)
            util._flush_std_streams()

        return exitcode

    @staticmethod
    def _after_fork():
        # Clear the finalizer registry, then run the registered after-fork
        # callbacks; called from _bootstrap().
        from . import util
        util._finalizer_registry.clear()
        util._run_after_forkers()
343
344
345 #
346 # We subclass bytes to avoid accidental transmission of auth keys over network
347 #
348
class AuthenticationString(bytes):
    '''
    bytes subclass used for authorization keys

    Reducing (pickling) an instance is refused unless
    `get_spawning_popen()` returns something other than None.
    '''
    def __reduce__(self):
        from .context import get_spawning_popen
        if get_spawning_popen() is not None:
            # Rebuild from a plain bytes copy of the key.
            return AuthenticationString, (bytes(self),)
        raise TypeError(
            'Pickling an AuthenticationString object is '
            'disallowed for security reasons'
            )
358
359
360 #
361 # Create object representing the parent process
362 #
363
class _ParentProcess(BaseProcess):
    '''
    Process object representing the parent of the current process

    It is built from a name, a pid and a sentinel, and never owns a popen
    object.
    '''

    def __init__(self, name, pid, sentinel):
        # BaseProcess.__init__ is not called; the attributes the inherited
        # methods read are assigned directly.
        self._name = name
        self._pid = pid
        self._sentinel = sentinel
        self._identity = ()
        self._parent_pid = None
        self._popen = None
        self._closed = False
        self._config = {}

    @property
    def ident(self):
        '''
        Return the pid given at construction time
        '''
        return self._pid

    # `pid` is an alias for the overridden `ident` property.
    pid = ident

    def is_alive(self):
        '''
        Return True while a zero-timeout wait on the sentinel reports
        nothing ready
        '''
        from multiprocessing.connection import wait
        ready = wait([self._sentinel], timeout=0)
        return not ready

    def join(self, timeout=None):
        '''
        Wait until parent process terminates
        '''
        from multiprocessing.connection import wait
        wait([self._sentinel], timeout=timeout)
392
393 #
394 # Create object representing the main process
395 #
396
class _MainProcess(BaseProcess):
    '''
    Process object representing the main process

    It is named 'MainProcess', has an empty identity tuple and no popen
    object, and its close() does nothing.
    '''

    def __init__(self):
        # BaseProcess.__init__ is not called; the attributes are assigned
        # directly.
        self._name = 'MainProcess'
        self._identity = ()
        self._parent_pid = None
        self._popen = None
        self._closed = False

        # Note that some versions of FreeBSD only allow named
        # semaphores to have names of up to 14 characters.  Therefore
        # we choose a short prefix.
        #
        # On MacOSX in a sandbox it may be necessary to use a
        # different prefix -- see #19478.
        #
        # Everything in self._config will be inherited by descendant
        # processes.
        random_key = AuthenticationString(os.urandom(32))
        self._config = dict(authkey=random_key, semprefix='/mp')

    def close(self):
        # Intentionally a no-op override of BaseProcess.close().
        pass
419
420
# Module-level bookkeeping for the process that imported this module.
# BaseProcess._bootstrap() rebinds all four names.
_children = set()
_process_counter = itertools.count(1)
_current_process = _MainProcess()
_parent_process = None
# The class was only needed to create the single instance above.
del _MainProcess
426
427 #
428 # Give names to some return codes
429 #
430
# Map each negated signal number to '-SIGNAME'.  Only names from the signal
# module that start with 'SIG' and contain no underscore are used; when
# several names share a number, the one seen last wins.
_exitcode_to_name = {
    -number: f'-{signame}'
    for signame, number in list(signal.__dict__.items())
    if signame.startswith('SIG') and '_' not in signame
}
437
# For debug and leak testing
# BaseProcess.__init__ adds every new process object to this set; being a
# WeakSet, it does not keep those objects alive by itself.
_dangling = WeakSet()