1 import os
2 import sys
3 import threading
4
5 from . import process
6 from . import reduction
7
8 __all__ = ()
9
10 #
11 # Exceptions
12 #
13
14 class ESC[4;38;5;81mProcessError(ESC[4;38;5;149mException):
15 pass
16
17 class ESC[4;38;5;81mBufferTooShort(ESC[4;38;5;149mProcessError):
18 pass
19
20 class ESC[4;38;5;81mTimeoutError(ESC[4;38;5;149mProcessError):
21 pass
22
23 class ESC[4;38;5;81mAuthenticationError(ESC[4;38;5;149mProcessError):
24 pass
25
26 #
27 # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
28 #
29
30 class ESC[4;38;5;81mBaseContext(ESC[4;38;5;149mobject):
31
32 ProcessError = ProcessError
33 BufferTooShort = BufferTooShort
34 TimeoutError = TimeoutError
35 AuthenticationError = AuthenticationError
36
37 current_process = staticmethod(process.current_process)
38 parent_process = staticmethod(process.parent_process)
39 active_children = staticmethod(process.active_children)
40
41 def cpu_count(self):
42 '''Returns the number of CPUs in the system'''
43 num = os.cpu_count()
44 if num is None:
45 raise NotImplementedError('cannot determine number of cpus')
46 else:
47 return num
48
49 def Manager(self):
50 '''Returns a manager associated with a running server process
51
52 The managers methods such as `Lock()`, `Condition()` and `Queue()`
53 can be used to create shared objects.
54 '''
55 from .managers import SyncManager
56 m = SyncManager(ctx=self.get_context())
57 m.start()
58 return m
59
60 def Pipe(self, duplex=True):
61 '''Returns two connection object connected by a pipe'''
62 from .connection import Pipe
63 return Pipe(duplex)
64
65 def Lock(self):
66 '''Returns a non-recursive lock object'''
67 from .synchronize import Lock
68 return Lock(ctx=self.get_context())
69
70 def RLock(self):
71 '''Returns a recursive lock object'''
72 from .synchronize import RLock
73 return RLock(ctx=self.get_context())
74
75 def Condition(self, lock=None):
76 '''Returns a condition object'''
77 from .synchronize import Condition
78 return Condition(lock, ctx=self.get_context())
79
80 def Semaphore(self, value=1):
81 '''Returns a semaphore object'''
82 from .synchronize import Semaphore
83 return Semaphore(value, ctx=self.get_context())
84
85 def BoundedSemaphore(self, value=1):
86 '''Returns a bounded semaphore object'''
87 from .synchronize import BoundedSemaphore
88 return BoundedSemaphore(value, ctx=self.get_context())
89
90 def Event(self):
91 '''Returns an event object'''
92 from .synchronize import Event
93 return Event(ctx=self.get_context())
94
95 def Barrier(self, parties, action=None, timeout=None):
96 '''Returns a barrier object'''
97 from .synchronize import Barrier
98 return Barrier(parties, action, timeout, ctx=self.get_context())
99
100 def Queue(self, maxsize=0):
101 '''Returns a queue object'''
102 from .queues import Queue
103 return Queue(maxsize, ctx=self.get_context())
104
105 def JoinableQueue(self, maxsize=0):
106 '''Returns a queue object'''
107 from .queues import JoinableQueue
108 return JoinableQueue(maxsize, ctx=self.get_context())
109
110 def SimpleQueue(self):
111 '''Returns a queue object'''
112 from .queues import SimpleQueue
113 return SimpleQueue(ctx=self.get_context())
114
115 def Pool(self, processes=None, initializer=None, initargs=(),
116 maxtasksperchild=None):
117 '''Returns a process pool object'''
118 from .pool import Pool
119 return Pool(processes, initializer, initargs, maxtasksperchild,
120 context=self.get_context())
121
122 def RawValue(self, typecode_or_type, *args):
123 '''Returns a shared object'''
124 from .sharedctypes import RawValue
125 return RawValue(typecode_or_type, *args)
126
127 def RawArray(self, typecode_or_type, size_or_initializer):
128 '''Returns a shared array'''
129 from .sharedctypes import RawArray
130 return RawArray(typecode_or_type, size_or_initializer)
131
132 def Value(self, typecode_or_type, *args, lock=True):
133 '''Returns a synchronized shared object'''
134 from .sharedctypes import Value
135 return Value(typecode_or_type, *args, lock=lock,
136 ctx=self.get_context())
137
138 def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
139 '''Returns a synchronized shared array'''
140 from .sharedctypes import Array
141 return Array(typecode_or_type, size_or_initializer, lock=lock,
142 ctx=self.get_context())
143
144 def freeze_support(self):
145 '''Check whether this is a fake forked process in a frozen executable.
146 If so then run code specified by commandline and exit.
147 '''
148 if sys.platform == 'win32' and getattr(sys, 'frozen', False):
149 from .spawn import freeze_support
150 freeze_support()
151
152 def get_logger(self):
153 '''Return package logger -- if it does not already exist then
154 it is created.
155 '''
156 from .util import get_logger
157 return get_logger()
158
159 def log_to_stderr(self, level=None):
160 '''Turn on logging and add a handler which prints to stderr'''
161 from .util import log_to_stderr
162 return log_to_stderr(level)
163
164 def allow_connection_pickling(self):
165 '''Install support for sending connections and sockets
166 between processes
167 '''
168 # This is undocumented. In previous versions of multiprocessing
169 # its only effect was to make socket objects inheritable on Windows.
170 from . import connection
171
172 def set_executable(self, executable):
173 '''Sets the path to a python.exe or pythonw.exe binary used to run
174 child processes instead of sys.executable when using the 'spawn'
175 start method. Useful for people embedding Python.
176 '''
177 from .spawn import set_executable
178 set_executable(executable)
179
180 def set_forkserver_preload(self, module_names):
181 '''Set list of module names to try to load in forkserver process.
182 This is really just a hint.
183 '''
184 from .forkserver import set_forkserver_preload
185 set_forkserver_preload(module_names)
186
187 def get_context(self, method=None):
188 if method is None:
189 return self
190 try:
191 ctx = _concrete_contexts[method]
192 except KeyError:
193 raise ValueError('cannot find context for %r' % method) from None
194 ctx._check_available()
195 return ctx
196
197 def get_start_method(self, allow_none=False):
198 return self._name
199
200 def set_start_method(self, method, force=False):
201 raise ValueError('cannot set start method of concrete context')
202
203 @property
204 def reducer(self):
205 '''Controls how objects will be reduced to a form that can be
206 shared with other processes.'''
207 return globals().get('reduction')
208
209 @reducer.setter
210 def reducer(self, reduction):
211 globals()['reduction'] = reduction
212
213 def _check_available(self):
214 pass
215
216 #
217 # Type of default context -- underlying context can be set at most once
218 #
219
220 class ESC[4;38;5;81mProcess(ESC[4;38;5;149mprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseProcess):
221 _start_method = None
222 @staticmethod
223 def _Popen(process_obj):
224 return _default_context.get_context().Process._Popen(process_obj)
225
226 @staticmethod
227 def _after_fork():
228 return _default_context.get_context().Process._after_fork()
229
230 class ESC[4;38;5;81mDefaultContext(ESC[4;38;5;149mBaseContext):
231 Process = Process
232
233 def __init__(self, context):
234 self._default_context = context
235 self._actual_context = None
236
237 def get_context(self, method=None):
238 if method is None:
239 if self._actual_context is None:
240 self._actual_context = self._default_context
241 return self._actual_context
242 else:
243 return super().get_context(method)
244
245 def set_start_method(self, method, force=False):
246 if self._actual_context is not None and not force:
247 raise RuntimeError('context has already been set')
248 if method is None and force:
249 self._actual_context = None
250 return
251 self._actual_context = self.get_context(method)
252
253 def get_start_method(self, allow_none=False):
254 if self._actual_context is None:
255 if allow_none:
256 return None
257 self._actual_context = self._default_context
258 return self._actual_context._name
259
260 def get_all_start_methods(self):
261 if sys.platform == 'win32':
262 return ['spawn']
263 else:
264 methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn']
265 if reduction.HAVE_SEND_HANDLE:
266 methods.append('forkserver')
267 return methods
268
269
270 #
271 # Context types for fixed start method
272 #
273
274 if sys.platform != 'win32':
275
276 class ESC[4;38;5;81mForkProcess(ESC[4;38;5;149mprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseProcess):
277 _start_method = 'fork'
278 @staticmethod
279 def _Popen(process_obj):
280 from .popen_fork import Popen
281 return Popen(process_obj)
282
283 class ESC[4;38;5;81mSpawnProcess(ESC[4;38;5;149mprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseProcess):
284 _start_method = 'spawn'
285 @staticmethod
286 def _Popen(process_obj):
287 from .popen_spawn_posix import Popen
288 return Popen(process_obj)
289
290 @staticmethod
291 def _after_fork():
292 # process is spawned, nothing to do
293 pass
294
295 class ESC[4;38;5;81mForkServerProcess(ESC[4;38;5;149mprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseProcess):
296 _start_method = 'forkserver'
297 @staticmethod
298 def _Popen(process_obj):
299 from .popen_forkserver import Popen
300 return Popen(process_obj)
301
302 class ESC[4;38;5;81mForkContext(ESC[4;38;5;149mBaseContext):
303 _name = 'fork'
304 Process = ForkProcess
305
306 class ESC[4;38;5;81mSpawnContext(ESC[4;38;5;149mBaseContext):
307 _name = 'spawn'
308 Process = SpawnProcess
309
310 class ESC[4;38;5;81mForkServerContext(ESC[4;38;5;149mBaseContext):
311 _name = 'forkserver'
312 Process = ForkServerProcess
313 def _check_available(self):
314 if not reduction.HAVE_SEND_HANDLE:
315 raise ValueError('forkserver start method not available')
316
317 _concrete_contexts = {
318 'fork': ForkContext(),
319 'spawn': SpawnContext(),
320 'forkserver': ForkServerContext(),
321 }
322 if sys.platform == 'darwin':
323 # bpo-33725: running arbitrary code after fork() is no longer reliable
324 # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
325 _default_context = DefaultContext(_concrete_contexts['spawn'])
326 else:
327 _default_context = DefaultContext(_concrete_contexts['fork'])
328
329 else:
330
331 class ESC[4;38;5;81mSpawnProcess(ESC[4;38;5;149mprocessESC[4;38;5;149m.ESC[4;38;5;149mBaseProcess):
332 _start_method = 'spawn'
333 @staticmethod
334 def _Popen(process_obj):
335 from .popen_spawn_win32 import Popen
336 return Popen(process_obj)
337
338 @staticmethod
339 def _after_fork():
340 # process is spawned, nothing to do
341 pass
342
343 class ESC[4;38;5;81mSpawnContext(ESC[4;38;5;149mBaseContext):
344 _name = 'spawn'
345 Process = SpawnProcess
346
347 _concrete_contexts = {
348 'spawn': SpawnContext(),
349 }
350 _default_context = DefaultContext(_concrete_contexts['spawn'])
351
352 #
353 # Force the start method
354 #
355
356 def _force_start_method(method):
357 _default_context._actual_context = _concrete_contexts[method]
358
359 #
360 # Check that the current thread is spawning a child process
361 #
362
363 _tls = threading.local()
364
365 def get_spawning_popen():
366 return getattr(_tls, 'spawning_popen', None)
367
368 def set_spawning_popen(popen):
369 _tls.spawning_popen = popen
370
371 def assert_spawning(obj):
372 if get_spawning_popen() is None:
373 raise RuntimeError(
374 '%s objects should only be shared between processes'
375 ' through inheritance' % type(obj).__name__
376 )