1 """Thread module emulating a subset of Java's threading model."""
2
3 import os as _os
4 import sys as _sys
5 import _thread
6 import functools
7
8 from time import monotonic as _time
9 from _weakrefset import WeakSet
10 from itertools import count as _count
11 try:
12 from _collections import deque as _deque
13 except ImportError:
14 from collections import deque as _deque
15
16 # Note regarding PEP 8 compliant names
17 # This threading model was originally inspired by Java, and inherited
18 # the convention of camelCase function and method names from that
19 # language. Those original names are not in any imminent danger of
20 # being deprecated (even for Py3k),so this module provides them as an
21 # alias for the PEP 8 compliant names
22 # Note that using the new PEP 8 compliant names facilitates substitution
23 # with the multiprocessing module, which doesn't provide the old
24 # Java inspired names.
25
26 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
27 'enumerate', 'main_thread', 'TIMEOUT_MAX',
28 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
29 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
30 'setprofile', 'settrace', 'local', 'stack_size',
31 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile',
32 'setprofile_all_threads','settrace_all_threads']
33
34 # Rename some stuff so "from threading import *" is safe
35 _start_new_thread = _thread.start_new_thread
36 _daemon_threads_allowed = _thread.daemon_threads_allowed
37 _allocate_lock = _thread.allocate_lock
38 _set_sentinel = _thread._set_sentinel
39 get_ident = _thread.get_ident
40 try:
41 get_native_id = _thread.get_native_id
42 _HAVE_THREAD_NATIVE_ID = True
43 __all__.append('get_native_id')
44 except AttributeError:
45 _HAVE_THREAD_NATIVE_ID = False
46 ThreadError = _thread.error
47 try:
48 _CRLock = _thread.RLock
49 except AttributeError:
50 _CRLock = None
51 TIMEOUT_MAX = _thread.TIMEOUT_MAX
52 del _thread
53
54
55 # Support for profile and trace hooks
56
57 _profile_hook = None
58 _trace_hook = None
59
60 def setprofile(func):
61 """Set a profile function for all threads started from the threading module.
62
63 The func will be passed to sys.setprofile() for each thread, before its
64 run() method is called.
65 """
66 global _profile_hook
67 _profile_hook = func
68
69 def setprofile_all_threads(func):
70 """Set a profile function for all threads started from the threading module
71 and all Python threads that are currently executing.
72
73 The func will be passed to sys.setprofile() for each thread, before its
74 run() method is called.
75 """
76 setprofile(func)
77 _sys._setprofileallthreads(func)
78
79 def getprofile():
80 """Get the profiler function as set by threading.setprofile()."""
81 return _profile_hook
82
83 def settrace(func):
84 """Set a trace function for all threads started from the threading module.
85
86 The func will be passed to sys.settrace() for each thread, before its run()
87 method is called.
88 """
89 global _trace_hook
90 _trace_hook = func
91
92 def settrace_all_threads(func):
93 """Set a trace function for all threads started from the threading module
94 and all Python threads that are currently executing.
95
96 The func will be passed to sys.settrace() for each thread, before its run()
97 method is called.
98 """
99 settrace(func)
100 _sys._settraceallthreads(func)
101
102 def gettrace():
103 """Get the trace function as set by threading.settrace()."""
104 return _trace_hook
105
106 # Synchronization classes
107
108 Lock = _allocate_lock
109
110 def RLock(*args, **kwargs):
111 """Factory function that returns a new reentrant lock.
112
113 A reentrant lock must be released by the thread that acquired it. Once a
114 thread has acquired a reentrant lock, the same thread may acquire it again
115 without blocking; the thread must release it once for each time it has
116 acquired it.
117
118 """
119 if _CRLock is None:
120 return _PyRLock(*args, **kwargs)
121 return _CRLock(*args, **kwargs)
122
123 class ESC[4;38;5;81m_RLock:
124 """This class implements reentrant lock objects.
125
126 A reentrant lock must be released by the thread that acquired it. Once a
127 thread has acquired a reentrant lock, the same thread may acquire it
128 again without blocking; the thread must release it once for each time it
129 has acquired it.
130
131 """
132
133 def __init__(self):
134 self._block = _allocate_lock()
135 self._owner = None
136 self._count = 0
137
138 def __repr__(self):
139 owner = self._owner
140 try:
141 owner = _active[owner].name
142 except KeyError:
143 pass
144 return "<%s %s.%s object owner=%r count=%d at %s>" % (
145 "locked" if self._block.locked() else "unlocked",
146 self.__class__.__module__,
147 self.__class__.__qualname__,
148 owner,
149 self._count,
150 hex(id(self))
151 )
152
153 def _at_fork_reinit(self):
154 self._block._at_fork_reinit()
155 self._owner = None
156 self._count = 0
157
158 def acquire(self, blocking=True, timeout=-1):
159 """Acquire a lock, blocking or non-blocking.
160
161 When invoked without arguments: if this thread already owns the lock,
162 increment the recursion level by one, and return immediately. Otherwise,
163 if another thread owns the lock, block until the lock is unlocked. Once
164 the lock is unlocked (not owned by any thread), then grab ownership, set
165 the recursion level to one, and return. If more than one thread is
166 blocked waiting until the lock is unlocked, only one at a time will be
167 able to grab ownership of the lock. There is no return value in this
168 case.
169
170 When invoked with the blocking argument set to true, do the same thing
171 as when called without arguments, and return true.
172
173 When invoked with the blocking argument set to false, do not block. If a
174 call without an argument would block, return false immediately;
175 otherwise, do the same thing as when called without arguments, and
176 return true.
177
178 When invoked with the floating-point timeout argument set to a positive
179 value, block for at most the number of seconds specified by timeout
180 and as long as the lock cannot be acquired. Return true if the lock has
181 been acquired, false if the timeout has elapsed.
182
183 """
184 me = get_ident()
185 if self._owner == me:
186 self._count += 1
187 return 1
188 rc = self._block.acquire(blocking, timeout)
189 if rc:
190 self._owner = me
191 self._count = 1
192 return rc
193
194 __enter__ = acquire
195
196 def release(self):
197 """Release a lock, decrementing the recursion level.
198
199 If after the decrement it is zero, reset the lock to unlocked (not owned
200 by any thread), and if any other threads are blocked waiting for the
201 lock to become unlocked, allow exactly one of them to proceed. If after
202 the decrement the recursion level is still nonzero, the lock remains
203 locked and owned by the calling thread.
204
205 Only call this method when the calling thread owns the lock. A
206 RuntimeError is raised if this method is called when the lock is
207 unlocked.
208
209 There is no return value.
210
211 """
212 if self._owner != get_ident():
213 raise RuntimeError("cannot release un-acquired lock")
214 self._count = count = self._count - 1
215 if not count:
216 self._owner = None
217 self._block.release()
218
219 def __exit__(self, t, v, tb):
220 self.release()
221
222 # Internal methods used by condition variables
223
224 def _acquire_restore(self, state):
225 self._block.acquire()
226 self._count, self._owner = state
227
228 def _release_save(self):
229 if self._count == 0:
230 raise RuntimeError("cannot release un-acquired lock")
231 count = self._count
232 self._count = 0
233 owner = self._owner
234 self._owner = None
235 self._block.release()
236 return (count, owner)
237
238 def _is_owned(self):
239 return self._owner == get_ident()
240
241 _PyRLock = _RLock
242
243
244 class ESC[4;38;5;81mCondition:
245 """Class that implements a condition variable.
246
247 A condition variable allows one or more threads to wait until they are
248 notified by another thread.
249
250 If the lock argument is given and not None, it must be a Lock or RLock
251 object, and it is used as the underlying lock. Otherwise, a new RLock object
252 is created and used as the underlying lock.
253
254 """
255
256 def __init__(self, lock=None):
257 if lock is None:
258 lock = RLock()
259 self._lock = lock
260 # Export the lock's acquire() and release() methods
261 self.acquire = lock.acquire
262 self.release = lock.release
263 # If the lock defines _release_save() and/or _acquire_restore(),
264 # these override the default implementations (which just call
265 # release() and acquire() on the lock). Ditto for _is_owned().
266 if hasattr(lock, '_release_save'):
267 self._release_save = lock._release_save
268 if hasattr(lock, '_acquire_restore'):
269 self._acquire_restore = lock._acquire_restore
270 if hasattr(lock, '_is_owned'):
271 self._is_owned = lock._is_owned
272 self._waiters = _deque()
273
274 def _at_fork_reinit(self):
275 self._lock._at_fork_reinit()
276 self._waiters.clear()
277
278 def __enter__(self):
279 return self._lock.__enter__()
280
281 def __exit__(self, *args):
282 return self._lock.__exit__(*args)
283
284 def __repr__(self):
285 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
286
287 def _release_save(self):
288 self._lock.release() # No state to save
289
290 def _acquire_restore(self, x):
291 self._lock.acquire() # Ignore saved state
292
293 def _is_owned(self):
294 # Return True if lock is owned by current_thread.
295 # This method is called only if _lock doesn't have _is_owned().
296 if self._lock.acquire(False):
297 self._lock.release()
298 return False
299 else:
300 return True
301
302 def wait(self, timeout=None):
303 """Wait until notified or until a timeout occurs.
304
305 If the calling thread has not acquired the lock when this method is
306 called, a RuntimeError is raised.
307
308 This method releases the underlying lock, and then blocks until it is
309 awakened by a notify() or notify_all() call for the same condition
310 variable in another thread, or until the optional timeout occurs. Once
311 awakened or timed out, it re-acquires the lock and returns.
312
313 When the timeout argument is present and not None, it should be a
314 floating point number specifying a timeout for the operation in seconds
315 (or fractions thereof).
316
317 When the underlying lock is an RLock, it is not released using its
318 release() method, since this may not actually unlock the lock when it
319 was acquired multiple times recursively. Instead, an internal interface
320 of the RLock class is used, which really unlocks it even when it has
321 been recursively acquired several times. Another internal interface is
322 then used to restore the recursion level when the lock is reacquired.
323
324 """
325 if not self._is_owned():
326 raise RuntimeError("cannot wait on un-acquired lock")
327 waiter = _allocate_lock()
328 waiter.acquire()
329 self._waiters.append(waiter)
330 saved_state = self._release_save()
331 gotit = False
332 try: # restore state no matter what (e.g., KeyboardInterrupt)
333 if timeout is None:
334 waiter.acquire()
335 gotit = True
336 else:
337 if timeout > 0:
338 gotit = waiter.acquire(True, timeout)
339 else:
340 gotit = waiter.acquire(False)
341 return gotit
342 finally:
343 self._acquire_restore(saved_state)
344 if not gotit:
345 try:
346 self._waiters.remove(waiter)
347 except ValueError:
348 pass
349
350 def wait_for(self, predicate, timeout=None):
351 """Wait until a condition evaluates to True.
352
353 predicate should be a callable which result will be interpreted as a
354 boolean value. A timeout may be provided giving the maximum time to
355 wait.
356
357 """
358 endtime = None
359 waittime = timeout
360 result = predicate()
361 while not result:
362 if waittime is not None:
363 if endtime is None:
364 endtime = _time() + waittime
365 else:
366 waittime = endtime - _time()
367 if waittime <= 0:
368 break
369 self.wait(waittime)
370 result = predicate()
371 return result
372
373 def notify(self, n=1):
374 """Wake up one or more threads waiting on this condition, if any.
375
376 If the calling thread has not acquired the lock when this method is
377 called, a RuntimeError is raised.
378
379 This method wakes up at most n of the threads waiting for the condition
380 variable; it is a no-op if no threads are waiting.
381
382 """
383 if not self._is_owned():
384 raise RuntimeError("cannot notify on un-acquired lock")
385 waiters = self._waiters
386 while waiters and n > 0:
387 waiter = waiters[0]
388 try:
389 waiter.release()
390 except RuntimeError:
391 # gh-92530: The previous call of notify() released the lock,
392 # but was interrupted before removing it from the queue.
393 # It can happen if a signal handler raises an exception,
394 # like CTRL+C which raises KeyboardInterrupt.
395 pass
396 else:
397 n -= 1
398 try:
399 waiters.remove(waiter)
400 except ValueError:
401 pass
402
403 def notify_all(self):
404 """Wake up all threads waiting on this condition.
405
406 If the calling thread has not acquired the lock when this method
407 is called, a RuntimeError is raised.
408
409 """
410 self.notify(len(self._waiters))
411
412 def notifyAll(self):
413 """Wake up all threads waiting on this condition.
414
415 This method is deprecated, use notify_all() instead.
416
417 """
418 import warnings
419 warnings.warn('notifyAll() is deprecated, use notify_all() instead',
420 DeprecationWarning, stacklevel=2)
421 self.notify_all()
422
423
424 class ESC[4;38;5;81mSemaphore:
425 """This class implements semaphore objects.
426
427 Semaphores manage a counter representing the number of release() calls minus
428 the number of acquire() calls, plus an initial value. The acquire() method
429 blocks if necessary until it can return without making the counter
430 negative. If not given, value defaults to 1.
431
432 """
433
434 # After Tim Peters' semaphore class, but not quite the same (no maximum)
435
436 def __init__(self, value=1):
437 if value < 0:
438 raise ValueError("semaphore initial value must be >= 0")
439 self._cond = Condition(Lock())
440 self._value = value
441
442 def __repr__(self):
443 cls = self.__class__
444 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
445 f" value={self._value}>")
446
447 def acquire(self, blocking=True, timeout=None):
448 """Acquire a semaphore, decrementing the internal counter by one.
449
450 When invoked without arguments: if the internal counter is larger than
451 zero on entry, decrement it by one and return immediately. If it is zero
452 on entry, block, waiting until some other thread has called release() to
453 make it larger than zero. This is done with proper interlocking so that
454 if multiple acquire() calls are blocked, release() will wake exactly one
455 of them up. The implementation may pick one at random, so the order in
456 which blocked threads are awakened should not be relied on. There is no
457 return value in this case.
458
459 When invoked with blocking set to true, do the same thing as when called
460 without arguments, and return true.
461
462 When invoked with blocking set to false, do not block. If a call without
463 an argument would block, return false immediately; otherwise, do the
464 same thing as when called without arguments, and return true.
465
466 When invoked with a timeout other than None, it will block for at
467 most timeout seconds. If acquire does not complete successfully in
468 that interval, return false. Return true otherwise.
469
470 """
471 if not blocking and timeout is not None:
472 raise ValueError("can't specify timeout for non-blocking acquire")
473 rc = False
474 endtime = None
475 with self._cond:
476 while self._value == 0:
477 if not blocking:
478 break
479 if timeout is not None:
480 if endtime is None:
481 endtime = _time() + timeout
482 else:
483 timeout = endtime - _time()
484 if timeout <= 0:
485 break
486 self._cond.wait(timeout)
487 else:
488 self._value -= 1
489 rc = True
490 return rc
491
492 __enter__ = acquire
493
494 def release(self, n=1):
495 """Release a semaphore, incrementing the internal counter by one or more.
496
497 When the counter is zero on entry and another thread is waiting for it
498 to become larger than zero again, wake up that thread.
499
500 """
501 if n < 1:
502 raise ValueError('n must be one or more')
503 with self._cond:
504 self._value += n
505 self._cond.notify(n)
506
507 def __exit__(self, t, v, tb):
508 self.release()
509
510
511 class ESC[4;38;5;81mBoundedSemaphore(ESC[4;38;5;149mSemaphore):
512 """Implements a bounded semaphore.
513
514 A bounded semaphore checks to make sure its current value doesn't exceed its
515 initial value. If it does, ValueError is raised. In most situations
516 semaphores are used to guard resources with limited capacity.
517
518 If the semaphore is released too many times it's a sign of a bug. If not
519 given, value defaults to 1.
520
521 Like regular semaphores, bounded semaphores manage a counter representing
522 the number of release() calls minus the number of acquire() calls, plus an
523 initial value. The acquire() method blocks if necessary until it can return
524 without making the counter negative. If not given, value defaults to 1.
525
526 """
527
528 def __init__(self, value=1):
529 super().__init__(value)
530 self._initial_value = value
531
532 def __repr__(self):
533 cls = self.__class__
534 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
535 f" value={self._value}/{self._initial_value}>")
536
537 def release(self, n=1):
538 """Release a semaphore, incrementing the internal counter by one or more.
539
540 When the counter is zero on entry and another thread is waiting for it
541 to become larger than zero again, wake up that thread.
542
543 If the number of releases exceeds the number of acquires,
544 raise a ValueError.
545
546 """
547 if n < 1:
548 raise ValueError('n must be one or more')
549 with self._cond:
550 if self._value + n > self._initial_value:
551 raise ValueError("Semaphore released too many times")
552 self._value += n
553 self._cond.notify(n)
554
555
556 class ESC[4;38;5;81mEvent:
557 """Class implementing event objects.
558
559 Events manage a flag that can be set to true with the set() method and reset
560 to false with the clear() method. The wait() method blocks until the flag is
561 true. The flag is initially false.
562
563 """
564
565 # After Tim Peters' event class (without is_posted())
566
567 def __init__(self):
568 self._cond = Condition(Lock())
569 self._flag = False
570
571 def __repr__(self):
572 cls = self.__class__
573 status = 'set' if self._flag else 'unset'
574 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"
575
576 def _at_fork_reinit(self):
577 # Private method called by Thread._reset_internal_locks()
578 self._cond._at_fork_reinit()
579
580 def is_set(self):
581 """Return true if and only if the internal flag is true."""
582 return self._flag
583
584 def isSet(self):
585 """Return true if and only if the internal flag is true.
586
587 This method is deprecated, use is_set() instead.
588
589 """
590 import warnings
591 warnings.warn('isSet() is deprecated, use is_set() instead',
592 DeprecationWarning, stacklevel=2)
593 return self.is_set()
594
595 def set(self):
596 """Set the internal flag to true.
597
598 All threads waiting for it to become true are awakened. Threads
599 that call wait() once the flag is true will not block at all.
600
601 """
602 with self._cond:
603 self._flag = True
604 self._cond.notify_all()
605
606 def clear(self):
607 """Reset the internal flag to false.
608
609 Subsequently, threads calling wait() will block until set() is called to
610 set the internal flag to true again.
611
612 """
613 with self._cond:
614 self._flag = False
615
616 def wait(self, timeout=None):
617 """Block until the internal flag is true.
618
619 If the internal flag is true on entry, return immediately. Otherwise,
620 block until another thread calls set() to set the flag to true, or until
621 the optional timeout occurs.
622
623 When the timeout argument is present and not None, it should be a
624 floating point number specifying a timeout for the operation in seconds
625 (or fractions thereof).
626
627 This method returns the internal flag on exit, so it will always return
628 True except if a timeout is given and the operation times out.
629
630 """
631 with self._cond:
632 signaled = self._flag
633 if not signaled:
634 signaled = self._cond.wait(timeout)
635 return signaled
636
637
638 # A barrier class. Inspired in part by the pthread_barrier_* api and
639 # the CyclicBarrier class from Java. See
640 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
641 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
642 # CyclicBarrier.html
643 # for information.
644 # We maintain two main states, 'filling' and 'draining' enabling the barrier
645 # to be cyclic. Threads are not allowed into it until it has fully drained
646 # since the previous cycle. In addition, a 'resetting' state exists which is
647 # similar to 'draining' except that threads leave with a BrokenBarrierError,
648 # and a 'broken' state in which all threads get the exception.
649 class ESC[4;38;5;81mBarrier:
650 """Implements a Barrier.
651
652 Useful for synchronizing a fixed number of threads at known synchronization
653 points. Threads block on 'wait()' and are simultaneously awoken once they
654 have all made that call.
655
656 """
657
658 def __init__(self, parties, action=None, timeout=None):
659 """Create a barrier, initialised to 'parties' threads.
660
661 'action' is a callable which, when supplied, will be called by one of
662 the threads after they have all entered the barrier and just prior to
663 releasing them all. If a 'timeout' is provided, it is used as the
664 default for all subsequent 'wait()' calls.
665
666 """
667 self._cond = Condition(Lock())
668 self._action = action
669 self._timeout = timeout
670 self._parties = parties
671 self._state = 0 # 0 filling, 1 draining, -1 resetting, -2 broken
672 self._count = 0
673
674 def __repr__(self):
675 cls = self.__class__
676 if self.broken:
677 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: broken>"
678 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
679 f" waiters={self.n_waiting}/{self.parties}>")
680
681 def wait(self, timeout=None):
682 """Wait for the barrier.
683
684 When the specified number of threads have started waiting, they are all
685 simultaneously awoken. If an 'action' was provided for the barrier, one
686 of the threads will have executed that callback prior to returning.
687 Returns an individual index number from 0 to 'parties-1'.
688
689 """
690 if timeout is None:
691 timeout = self._timeout
692 with self._cond:
693 self._enter() # Block while the barrier drains.
694 index = self._count
695 self._count += 1
696 try:
697 if index + 1 == self._parties:
698 # We release the barrier
699 self._release()
700 else:
701 # We wait until someone releases us
702 self._wait(timeout)
703 return index
704 finally:
705 self._count -= 1
706 # Wake up any threads waiting for barrier to drain.
707 self._exit()
708
709 # Block until the barrier is ready for us, or raise an exception
710 # if it is broken.
711 def _enter(self):
712 while self._state in (-1, 1):
713 # It is draining or resetting, wait until done
714 self._cond.wait()
715 #see if the barrier is in a broken state
716 if self._state < 0:
717 raise BrokenBarrierError
718 assert self._state == 0
719
720 # Optionally run the 'action' and release the threads waiting
721 # in the barrier.
722 def _release(self):
723 try:
724 if self._action:
725 self._action()
726 # enter draining state
727 self._state = 1
728 self._cond.notify_all()
729 except:
730 #an exception during the _action handler. Break and reraise
731 self._break()
732 raise
733
734 # Wait in the barrier until we are released. Raise an exception
735 # if the barrier is reset or broken.
736 def _wait(self, timeout):
737 if not self._cond.wait_for(lambda : self._state != 0, timeout):
738 #timed out. Break the barrier
739 self._break()
740 raise BrokenBarrierError
741 if self._state < 0:
742 raise BrokenBarrierError
743 assert self._state == 1
744
745 # If we are the last thread to exit the barrier, signal any threads
746 # waiting for the barrier to drain.
747 def _exit(self):
748 if self._count == 0:
749 if self._state in (-1, 1):
750 #resetting or draining
751 self._state = 0
752 self._cond.notify_all()
753
754 def reset(self):
755 """Reset the barrier to the initial state.
756
757 Any threads currently waiting will get the BrokenBarrier exception
758 raised.
759
760 """
761 with self._cond:
762 if self._count > 0:
763 if self._state == 0:
764 #reset the barrier, waking up threads
765 self._state = -1
766 elif self._state == -2:
767 #was broken, set it to reset state
768 #which clears when the last thread exits
769 self._state = -1
770 else:
771 self._state = 0
772 self._cond.notify_all()
773
774 def abort(self):
775 """Place the barrier into a 'broken' state.
776
777 Useful in case of error. Any currently waiting threads and threads
778 attempting to 'wait()' will have BrokenBarrierError raised.
779
780 """
781 with self._cond:
782 self._break()
783
784 def _break(self):
785 # An internal error was detected. The barrier is set to
786 # a broken state all parties awakened.
787 self._state = -2
788 self._cond.notify_all()
789
790 @property
791 def parties(self):
792 """Return the number of threads required to trip the barrier."""
793 return self._parties
794
795 @property
796 def n_waiting(self):
797 """Return the number of threads currently waiting at the barrier."""
798 # We don't need synchronization here since this is an ephemeral result
799 # anyway. It returns the correct value in the steady state.
800 if self._state == 0:
801 return self._count
802 return 0
803
804 @property
805 def broken(self):
806 """Return True if the barrier is in a broken state."""
807 return self._state == -2
808
809 # exception raised by the Barrier class
810 class ESC[4;38;5;81mBrokenBarrierError(ESC[4;38;5;149mRuntimeError):
811 pass
812
813
814 # Helper to generate new thread names
815 _counter = _count(1).__next__
816 def _newname(name_template):
817 return name_template % _counter()
818
819 # Active thread administration.
820 #
821 # bpo-44422: Use a reentrant lock to allow reentrant calls to functions like
822 # threading.enumerate().
823 _active_limbo_lock = RLock()
824 _active = {} # maps thread id to Thread object
825 _limbo = {}
826 _dangling = WeakSet()
827
828 # Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
829 # to wait until all Python thread states get deleted:
830 # see Thread._set_tstate_lock().
831 _shutdown_locks_lock = _allocate_lock()
832 _shutdown_locks = set()
833
834 def _maintain_shutdown_locks():
835 """
836 Drop any shutdown locks that don't correspond to running threads anymore.
837
838 Calling this from time to time avoids an ever-growing _shutdown_locks
839 set when Thread objects are not joined explicitly. See bpo-37788.
840
841 This must be called with _shutdown_locks_lock acquired.
842 """
843 # If a lock was released, the corresponding thread has exited
844 to_remove = [lock for lock in _shutdown_locks if not lock.locked()]
845 _shutdown_locks.difference_update(to_remove)
846
847
848 # Main class for threads
849
850 class ESC[4;38;5;81mThread:
851 """A class that represents a thread of control.
852
853 This class can be safely subclassed in a limited fashion. There are two ways
854 to specify the activity: by passing a callable object to the constructor, or
855 by overriding the run() method in a subclass.
856
857 """
858
859 _initialized = False
860
861 def __init__(self, group=None, target=None, name=None,
862 args=(), kwargs=None, *, daemon=None):
863 """This constructor should always be called with keyword arguments. Arguments are:
864
865 *group* should be None; reserved for future extension when a ThreadGroup
866 class is implemented.
867
868 *target* is the callable object to be invoked by the run()
869 method. Defaults to None, meaning nothing is called.
870
871 *name* is the thread name. By default, a unique name is constructed of
872 the form "Thread-N" where N is a small decimal number.
873
874 *args* is a list or tuple of arguments for the target invocation. Defaults to ().
875
876 *kwargs* is a dictionary of keyword arguments for the target
877 invocation. Defaults to {}.
878
879 If a subclass overrides the constructor, it must make sure to invoke
880 the base class constructor (Thread.__init__()) before doing anything
881 else to the thread.
882
883 """
884 assert group is None, "group argument must be None for now"
885 if kwargs is None:
886 kwargs = {}
887 if name:
888 name = str(name)
889 else:
890 name = _newname("Thread-%d")
891 if target is not None:
892 try:
893 target_name = target.__name__
894 name += f" ({target_name})"
895 except AttributeError:
896 pass
897
898 self._target = target
899 self._name = name
900 self._args = args
901 self._kwargs = kwargs
902 if daemon is not None:
903 if daemon and not _daemon_threads_allowed():
904 raise RuntimeError('daemon threads are disabled in this (sub)interpreter')
905 self._daemonic = daemon
906 else:
907 self._daemonic = current_thread().daemon
908 self._ident = None
909 if _HAVE_THREAD_NATIVE_ID:
910 self._native_id = None
911 self._tstate_lock = None
912 self._started = Event()
913 self._is_stopped = False
914 self._initialized = True
915 # Copy of sys.stderr used by self._invoke_excepthook()
916 self._stderr = _sys.stderr
917 self._invoke_excepthook = _make_invoke_excepthook()
918 # For debugging and _after_fork()
919 _dangling.add(self)
920
921 def _reset_internal_locks(self, is_alive):
922 # private! Called by _after_fork() to reset our internal locks as
923 # they may be in an invalid state leading to a deadlock or crash.
924 self._started._at_fork_reinit()
925 if is_alive:
926 # bpo-42350: If the fork happens when the thread is already stopped
927 # (ex: after threading._shutdown() has been called), _tstate_lock
928 # is None. Do nothing in this case.
929 if self._tstate_lock is not None:
930 self._tstate_lock._at_fork_reinit()
931 self._tstate_lock.acquire()
932 else:
933 # The thread isn't alive after fork: it doesn't have a tstate
934 # anymore.
935 self._is_stopped = True
936 self._tstate_lock = None
937
938 def __repr__(self):
939 assert self._initialized, "Thread.__init__() was not called"
940 status = "initial"
941 if self._started.is_set():
942 status = "started"
943 self.is_alive() # easy way to get ._is_stopped set when appropriate
944 if self._is_stopped:
945 status = "stopped"
946 if self._daemonic:
947 status += " daemon"
948 if self._ident is not None:
949 status += " %s" % self._ident
950 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
951
952 def start(self):
953 """Start the thread's activity.
954
955 It must be called at most once per thread object. It arranges for the
956 object's run() method to be invoked in a separate thread of control.
957
958 This method will raise a RuntimeError if called more than once on the
959 same thread object.
960
961 """
962 if not self._initialized:
963 raise RuntimeError("thread.__init__() not called")
964
965 if self._started.is_set():
966 raise RuntimeError("threads can only be started once")
967
968 with _active_limbo_lock:
969 _limbo[self] = self
970 try:
971 _start_new_thread(self._bootstrap, ())
972 except Exception:
973 with _active_limbo_lock:
974 del _limbo[self]
975 raise
976 self._started.wait()
977
978 def run(self):
979 """Method representing the thread's activity.
980
981 You may override this method in a subclass. The standard run() method
982 invokes the callable object passed to the object's constructor as the
983 target argument, if any, with sequential and keyword arguments taken
984 from the args and kwargs arguments, respectively.
985
986 """
987 try:
988 if self._target is not None:
989 self._target(*self._args, **self._kwargs)
990 finally:
991 # Avoid a refcycle if the thread is running a function with
992 # an argument that has a member that points to the thread.
993 del self._target, self._args, self._kwargs
994
995 def _bootstrap(self):
996 # Wrapper around the real bootstrap code that ignores
997 # exceptions during interpreter cleanup. Those typically
998 # happen when a daemon thread wakes up at an unfortunate
999 # moment, finds the world around it destroyed, and raises some
1000 # random exception *** while trying to report the exception in
1001 # _bootstrap_inner() below ***. Those random exceptions
1002 # don't help anybody, and they confuse users, so we suppress
1003 # them. We suppress them only when it appears that the world
1004 # indeed has already been destroyed, so that exceptions in
1005 # _bootstrap_inner() during normal business hours are properly
1006 # reported. Also, we only suppress them for daemonic threads;
1007 # if a non-daemonic encounters this, something else is wrong.
1008 try:
1009 self._bootstrap_inner()
1010 except:
1011 if self._daemonic and _sys is None:
1012 return
1013 raise
1014
1015 def _set_ident(self):
1016 self._ident = get_ident()
1017
1018 if _HAVE_THREAD_NATIVE_ID:
1019 def _set_native_id(self):
1020 self._native_id = get_native_id()
1021
1022 def _set_tstate_lock(self):
1023 """
1024 Set a lock object which will be released by the interpreter when
1025 the underlying thread state (see pystate.h) gets deleted.
1026 """
1027 self._tstate_lock = _set_sentinel()
1028 self._tstate_lock.acquire()
1029
1030 if not self.daemon:
1031 with _shutdown_locks_lock:
1032 _maintain_shutdown_locks()
1033 _shutdown_locks.add(self._tstate_lock)
1034
1035 def _bootstrap_inner(self):
1036 try:
1037 self._set_ident()
1038 self._set_tstate_lock()
1039 if _HAVE_THREAD_NATIVE_ID:
1040 self._set_native_id()
1041 self._started.set()
1042 with _active_limbo_lock:
1043 _active[self._ident] = self
1044 del _limbo[self]
1045
1046 if _trace_hook:
1047 _sys.settrace(_trace_hook)
1048 if _profile_hook:
1049 _sys.setprofile(_profile_hook)
1050
1051 try:
1052 self.run()
1053 except:
1054 self._invoke_excepthook(self)
1055 finally:
1056 self._delete()
1057
1058 def _stop(self):
1059 # After calling ._stop(), .is_alive() returns False and .join() returns
1060 # immediately. ._tstate_lock must be released before calling ._stop().
1061 #
1062 # Normal case: C code at the end of the thread's life
1063 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
1064 # that's detected by our ._wait_for_tstate_lock(), called by .join()
1065 # and .is_alive(). Any number of threads _may_ call ._stop()
1066 # simultaneously (for example, if multiple threads are blocked in
1067 # .join() calls), and they're not serialized. That's harmless -
1068 # they'll just make redundant rebindings of ._is_stopped and
1069 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the
1070 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
1071 # (the assert is executed only if ._tstate_lock is None).
1072 #
1073 # Special case: _main_thread releases ._tstate_lock via this
1074 # module's _shutdown() function.
1075 lock = self._tstate_lock
1076 if lock is not None:
1077 assert not lock.locked()
1078 self._is_stopped = True
1079 self._tstate_lock = None
1080 if not self.daemon:
1081 with _shutdown_locks_lock:
1082 # Remove our lock and other released locks from _shutdown_locks
1083 _maintain_shutdown_locks()
1084
1085 def _delete(self):
1086 "Remove current thread from the dict of currently running threads."
1087 with _active_limbo_lock:
1088 del _active[get_ident()]
1089 # There must not be any python code between the previous line
1090 # and after the lock is released. Otherwise a tracing function
1091 # could try to acquire the lock again in the same thread, (in
1092 # current_thread()), and would block.
1093
1094 def join(self, timeout=None):
1095 """Wait until the thread terminates.
1096
1097 This blocks the calling thread until the thread whose join() method is
1098 called terminates -- either normally or through an unhandled exception
1099 or until the optional timeout occurs.
1100
1101 When the timeout argument is present and not None, it should be a
1102 floating point number specifying a timeout for the operation in seconds
1103 (or fractions thereof). As join() always returns None, you must call
1104 is_alive() after join() to decide whether a timeout happened -- if the
1105 thread is still alive, the join() call timed out.
1106
1107 When the timeout argument is not present or None, the operation will
1108 block until the thread terminates.
1109
1110 A thread can be join()ed many times.
1111
1112 join() raises a RuntimeError if an attempt is made to join the current
1113 thread as that would cause a deadlock. It is also an error to join() a
1114 thread before it has been started and attempts to do so raises the same
1115 exception.
1116
1117 """
1118 if not self._initialized:
1119 raise RuntimeError("Thread.__init__() not called")
1120 if not self._started.is_set():
1121 raise RuntimeError("cannot join thread before it is started")
1122 if self is current_thread():
1123 raise RuntimeError("cannot join current thread")
1124
1125 if timeout is None:
1126 self._wait_for_tstate_lock()
1127 else:
1128 # the behavior of a negative timeout isn't documented, but
1129 # historically .join(timeout=x) for x<0 has acted as if timeout=0
1130 self._wait_for_tstate_lock(timeout=max(timeout, 0))
1131
1132 def _wait_for_tstate_lock(self, block=True, timeout=-1):
1133 # Issue #18808: wait for the thread state to be gone.
1134 # At the end of the thread's life, after all knowledge of the thread
1135 # is removed from C data structures, C code releases our _tstate_lock.
1136 # This method passes its arguments to _tstate_lock.acquire().
1137 # If the lock is acquired, the C code is done, and self._stop() is
1138 # called. That sets ._is_stopped to True, and ._tstate_lock to None.
1139 lock = self._tstate_lock
1140 if lock is None:
1141 # already determined that the C code is done
1142 assert self._is_stopped
1143 return
1144
1145 try:
1146 if lock.acquire(block, timeout):
1147 lock.release()
1148 self._stop()
1149 except:
1150 if lock.locked():
1151 # bpo-45274: lock.acquire() acquired the lock, but the function
1152 # was interrupted with an exception before reaching the
1153 # lock.release(). It can happen if a signal handler raises an
1154 # exception, like CTRL+C which raises KeyboardInterrupt.
1155 lock.release()
1156 self._stop()
1157 raise
1158
1159 @property
1160 def name(self):
1161 """A string used for identification purposes only.
1162
1163 It has no semantics. Multiple threads may be given the same name. The
1164 initial name is set by the constructor.
1165
1166 """
1167 assert self._initialized, "Thread.__init__() not called"
1168 return self._name
1169
1170 @name.setter
1171 def name(self, name):
1172 assert self._initialized, "Thread.__init__() not called"
1173 self._name = str(name)
1174
1175 @property
1176 def ident(self):
1177 """Thread identifier of this thread or None if it has not been started.
1178
1179 This is a nonzero integer. See the get_ident() function. Thread
1180 identifiers may be recycled when a thread exits and another thread is
1181 created. The identifier is available even after the thread has exited.
1182
1183 """
1184 assert self._initialized, "Thread.__init__() not called"
1185 return self._ident
1186
1187 if _HAVE_THREAD_NATIVE_ID:
1188 @property
1189 def native_id(self):
1190 """Native integral thread ID of this thread, or None if it has not been started.
1191
1192 This is a non-negative integer. See the get_native_id() function.
1193 This represents the Thread ID as reported by the kernel.
1194
1195 """
1196 assert self._initialized, "Thread.__init__() not called"
1197 return self._native_id
1198
1199 def is_alive(self):
1200 """Return whether the thread is alive.
1201
1202 This method returns True just before the run() method starts until just
1203 after the run() method terminates. See also the module function
1204 enumerate().
1205
1206 """
1207 assert self._initialized, "Thread.__init__() not called"
1208 if self._is_stopped or not self._started.is_set():
1209 return False
1210 self._wait_for_tstate_lock(False)
1211 return not self._is_stopped
1212
1213 @property
1214 def daemon(self):
1215 """A boolean value indicating whether this thread is a daemon thread.
1216
1217 This must be set before start() is called, otherwise RuntimeError is
1218 raised. Its initial value is inherited from the creating thread; the
1219 main thread is not a daemon thread and therefore all threads created in
1220 the main thread default to daemon = False.
1221
1222 The entire Python program exits when only daemon threads are left.
1223
1224 """
1225 assert self._initialized, "Thread.__init__() not called"
1226 return self._daemonic
1227
1228 @daemon.setter
1229 def daemon(self, daemonic):
1230 if not self._initialized:
1231 raise RuntimeError("Thread.__init__() not called")
1232 if daemonic and not _daemon_threads_allowed():
1233 raise RuntimeError('daemon threads are disabled in this interpreter')
1234 if self._started.is_set():
1235 raise RuntimeError("cannot set daemon status of active thread")
1236 self._daemonic = daemonic
1237
1238 def isDaemon(self):
1239 """Return whether this thread is a daemon.
1240
1241 This method is deprecated, use the daemon attribute instead.
1242
1243 """
1244 import warnings
1245 warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
1246 DeprecationWarning, stacklevel=2)
1247 return self.daemon
1248
1249 def setDaemon(self, daemonic):
1250 """Set whether this thread is a daemon.
1251
1252 This method is deprecated, use the .daemon property instead.
1253
1254 """
1255 import warnings
1256 warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
1257 DeprecationWarning, stacklevel=2)
1258 self.daemon = daemonic
1259
1260 def getName(self):
1261 """Return a string used for identification purposes only.
1262
1263 This method is deprecated, use the name attribute instead.
1264
1265 """
1266 import warnings
1267 warnings.warn('getName() is deprecated, get the name attribute instead',
1268 DeprecationWarning, stacklevel=2)
1269 return self.name
1270
1271 def setName(self, name):
1272 """Set the name string for this thread.
1273
1274 This method is deprecated, use the name attribute instead.
1275
1276 """
1277 import warnings
1278 warnings.warn('setName() is deprecated, set the name attribute instead',
1279 DeprecationWarning, stacklevel=2)
1280 self.name = name
1281
1282
1283 try:
1284 from _thread import (_excepthook as excepthook,
1285 _ExceptHookArgs as ExceptHookArgs)
1286 except ImportError:
1287 # Simple Python implementation if _thread._excepthook() is not available
1288 from traceback import print_exception as _print_exception
1289 from collections import namedtuple
1290
1291 _ExceptHookArgs = namedtuple(
1292 'ExceptHookArgs',
1293 'exc_type exc_value exc_traceback thread')
1294
1295 def ExceptHookArgs(args):
1296 return _ExceptHookArgs(*args)
1297
1298 def excepthook(args, /):
1299 """
1300 Handle uncaught Thread.run() exception.
1301 """
1302 if args.exc_type == SystemExit:
1303 # silently ignore SystemExit
1304 return
1305
1306 if _sys is not None and _sys.stderr is not None:
1307 stderr = _sys.stderr
1308 elif args.thread is not None:
1309 stderr = args.thread._stderr
1310 if stderr is None:
1311 # do nothing if sys.stderr is None and sys.stderr was None
1312 # when the thread was created
1313 return
1314 else:
1315 # do nothing if sys.stderr is None and args.thread is None
1316 return
1317
1318 if args.thread is not None:
1319 name = args.thread.name
1320 else:
1321 name = get_ident()
1322 print(f"Exception in thread {name}:",
1323 file=stderr, flush=True)
1324 _print_exception(args.exc_type, args.exc_value, args.exc_traceback,
1325 file=stderr)
1326 stderr.flush()
1327
1328
1329 # Original value of threading.excepthook
1330 __excepthook__ = excepthook
1331
1332
1333 def _make_invoke_excepthook():
1334 # Create a local namespace to ensure that variables remain alive
1335 # when _invoke_excepthook() is called, even if it is called late during
1336 # Python shutdown. It is mostly needed for daemon threads.
1337
1338 old_excepthook = excepthook
1339 old_sys_excepthook = _sys.excepthook
1340 if old_excepthook is None:
1341 raise RuntimeError("threading.excepthook is None")
1342 if old_sys_excepthook is None:
1343 raise RuntimeError("sys.excepthook is None")
1344
1345 sys_exc_info = _sys.exc_info
1346 local_print = print
1347 local_sys = _sys
1348
1349 def invoke_excepthook(thread):
1350 global excepthook
1351 try:
1352 hook = excepthook
1353 if hook is None:
1354 hook = old_excepthook
1355
1356 args = ExceptHookArgs([*sys_exc_info(), thread])
1357
1358 hook(args)
1359 except Exception as exc:
1360 exc.__suppress_context__ = True
1361 del exc
1362
1363 if local_sys is not None and local_sys.stderr is not None:
1364 stderr = local_sys.stderr
1365 else:
1366 stderr = thread._stderr
1367
1368 local_print("Exception in threading.excepthook:",
1369 file=stderr, flush=True)
1370
1371 if local_sys is not None and local_sys.excepthook is not None:
1372 sys_excepthook = local_sys.excepthook
1373 else:
1374 sys_excepthook = old_sys_excepthook
1375
1376 sys_excepthook(*sys_exc_info())
1377 finally:
1378 # Break reference cycle (exception stored in a variable)
1379 args = None
1380
1381 return invoke_excepthook
1382
1383
1384 # The timer class was contributed by Itamar Shtull-Trauring
1385
1386 class ESC[4;38;5;81mTimer(ESC[4;38;5;149mThread):
1387 """Call a function after a specified number of seconds:
1388
1389 t = Timer(30.0, f, args=None, kwargs=None)
1390 t.start()
1391 t.cancel() # stop the timer's action if it's still waiting
1392
1393 """
1394
1395 def __init__(self, interval, function, args=None, kwargs=None):
1396 Thread.__init__(self)
1397 self.interval = interval
1398 self.function = function
1399 self.args = args if args is not None else []
1400 self.kwargs = kwargs if kwargs is not None else {}
1401 self.finished = Event()
1402
1403 def cancel(self):
1404 """Stop the timer if it hasn't finished yet."""
1405 self.finished.set()
1406
1407 def run(self):
1408 self.finished.wait(self.interval)
1409 if not self.finished.is_set():
1410 self.function(*self.args, **self.kwargs)
1411 self.finished.set()
1412
1413
1414 # Special thread class to represent the main thread
1415
1416 class ESC[4;38;5;81m_MainThread(ESC[4;38;5;149mThread):
1417
1418 def __init__(self):
1419 Thread.__init__(self, name="MainThread", daemon=False)
1420 self._set_tstate_lock()
1421 self._started.set()
1422 self._set_ident()
1423 if _HAVE_THREAD_NATIVE_ID:
1424 self._set_native_id()
1425 with _active_limbo_lock:
1426 _active[self._ident] = self
1427
1428
1429 # Dummy thread class to represent threads not started here.
1430 # These aren't garbage collected when they die, nor can they be waited for.
1431 # If they invoke anything in threading.py that calls current_thread(), they
1432 # leave an entry in the _active dict forever after.
1433 # Their purpose is to return *something* from current_thread().
1434 # They are marked as daemon threads so we won't wait for them
1435 # when we exit (conform previous semantics).
1436
1437 class ESC[4;38;5;81m_DummyThread(ESC[4;38;5;149mThread):
1438
1439 def __init__(self):
1440 Thread.__init__(self, name=_newname("Dummy-%d"),
1441 daemon=_daemon_threads_allowed())
1442
1443 self._started.set()
1444 self._set_ident()
1445 if _HAVE_THREAD_NATIVE_ID:
1446 self._set_native_id()
1447 with _active_limbo_lock:
1448 _active[self._ident] = self
1449
1450 def _stop(self):
1451 pass
1452
1453 def is_alive(self):
1454 assert not self._is_stopped and self._started.is_set()
1455 return True
1456
1457 def join(self, timeout=None):
1458 assert False, "cannot join a dummy thread"
1459
1460
1461 # Global API functions
1462
1463 def current_thread():
1464 """Return the current Thread object, corresponding to the caller's thread of control.
1465
1466 If the caller's thread of control was not created through the threading
1467 module, a dummy thread object with limited functionality is returned.
1468
1469 """
1470 try:
1471 return _active[get_ident()]
1472 except KeyError:
1473 return _DummyThread()
1474
1475 def currentThread():
1476 """Return the current Thread object, corresponding to the caller's thread of control.
1477
1478 This function is deprecated, use current_thread() instead.
1479
1480 """
1481 import warnings
1482 warnings.warn('currentThread() is deprecated, use current_thread() instead',
1483 DeprecationWarning, stacklevel=2)
1484 return current_thread()
1485
1486 def active_count():
1487 """Return the number of Thread objects currently alive.
1488
1489 The returned count is equal to the length of the list returned by
1490 enumerate().
1491
1492 """
1493 # NOTE: if the logic in here ever changes, update Modules/posixmodule.c
1494 # warn_about_fork_with_threads() to match.
1495 with _active_limbo_lock:
1496 return len(_active) + len(_limbo)
1497
1498 def activeCount():
1499 """Return the number of Thread objects currently alive.
1500
1501 This function is deprecated, use active_count() instead.
1502
1503 """
1504 import warnings
1505 warnings.warn('activeCount() is deprecated, use active_count() instead',
1506 DeprecationWarning, stacklevel=2)
1507 return active_count()
1508
1509 def _enumerate():
1510 # Same as enumerate(), but without the lock. Internal use only.
1511 return list(_active.values()) + list(_limbo.values())
1512
1513 def enumerate():
1514 """Return a list of all Thread objects currently alive.
1515
1516 The list includes daemonic threads, dummy thread objects created by
1517 current_thread(), and the main thread. It excludes terminated threads and
1518 threads that have not yet been started.
1519
1520 """
1521 with _active_limbo_lock:
1522 return list(_active.values()) + list(_limbo.values())
1523
1524
1525 _threading_atexits = []
1526 _SHUTTING_DOWN = False
1527
1528 def _register_atexit(func, *arg, **kwargs):
1529 """CPython internal: register *func* to be called before joining threads.
1530
1531 The registered *func* is called with its arguments just before all
1532 non-daemon threads are joined in `_shutdown()`. It provides a similar
1533 purpose to `atexit.register()`, but its functions are called prior to
1534 threading shutdown instead of interpreter shutdown.
1535
1536 For similarity to atexit, the registered functions are called in reverse.
1537 """
1538 if _SHUTTING_DOWN:
1539 raise RuntimeError("can't register atexit after shutdown")
1540
1541 call = functools.partial(func, *arg, **kwargs)
1542 _threading_atexits.append(call)
1543
1544
1545 from _thread import stack_size
1546
1547 # Create the main thread object,
1548 # and make it available for the interpreter
1549 # (Py_Main) as threading._shutdown.
1550
1551 _main_thread = _MainThread()
1552
1553 def _shutdown():
1554 """
1555 Wait until the Python thread state of all non-daemon threads get deleted.
1556 """
1557 # Obscure: other threads may be waiting to join _main_thread. That's
1558 # dubious, but some code does it. We can't wait for C code to release
1559 # the main thread's tstate_lock - that won't happen until the interpreter
1560 # is nearly dead. So we release it here. Note that just calling _stop()
1561 # isn't enough: other threads may already be waiting on _tstate_lock.
1562 if _main_thread._is_stopped:
1563 # _shutdown() was already called
1564 return
1565
1566 global _SHUTTING_DOWN
1567 _SHUTTING_DOWN = True
1568
1569 # Call registered threading atexit functions before threads are joined.
1570 # Order is reversed, similar to atexit.
1571 for atexit_call in reversed(_threading_atexits):
1572 atexit_call()
1573
1574 # Main thread
1575 if _main_thread.ident == get_ident():
1576 tlock = _main_thread._tstate_lock
1577 # The main thread isn't finished yet, so its thread state lock can't
1578 # have been released.
1579 assert tlock is not None
1580 assert tlock.locked()
1581 tlock.release()
1582 _main_thread._stop()
1583 else:
1584 # bpo-1596321: _shutdown() must be called in the main thread.
1585 # If the threading module was not imported by the main thread,
1586 # _main_thread is the thread which imported the threading module.
1587 # In this case, ignore _main_thread, similar behavior than for threads
1588 # spawned by C libraries or using _thread.start_new_thread().
1589 pass
1590
1591 # Join all non-deamon threads
1592 while True:
1593 with _shutdown_locks_lock:
1594 locks = list(_shutdown_locks)
1595 _shutdown_locks.clear()
1596
1597 if not locks:
1598 break
1599
1600 for lock in locks:
1601 # mimic Thread.join()
1602 lock.acquire()
1603 lock.release()
1604
1605 # new threads can be spawned while we were waiting for the other
1606 # threads to complete
1607
1608
1609 def main_thread():
1610 """Return the main thread object.
1611
1612 In normal conditions, the main thread is the thread from which the
1613 Python interpreter was started.
1614 """
1615 return _main_thread
1616
1617 # get thread-local implementation, either from the thread
1618 # module, or from the python fallback
1619
1620 try:
1621 from _thread import _local as local
1622 except ImportError:
1623 from _threading_local import local
1624
1625
1626 def _after_fork():
1627 """
1628 Cleanup threading module state that should not exist after a fork.
1629 """
1630 # Reset _active_limbo_lock, in case we forked while the lock was held
1631 # by another (non-forked) thread. http://bugs.python.org/issue874900
1632 global _active_limbo_lock, _main_thread
1633 global _shutdown_locks_lock, _shutdown_locks
1634 _active_limbo_lock = RLock()
1635
1636 # fork() only copied the current thread; clear references to others.
1637 new_active = {}
1638
1639 try:
1640 current = _active[get_ident()]
1641 except KeyError:
1642 # fork() was called in a thread which was not spawned
1643 # by threading.Thread. For example, a thread spawned
1644 # by thread.start_new_thread().
1645 current = _MainThread()
1646
1647 _main_thread = current
1648
1649 # reset _shutdown() locks: threads re-register their _tstate_lock below
1650 _shutdown_locks_lock = _allocate_lock()
1651 _shutdown_locks = set()
1652
1653 with _active_limbo_lock:
1654 # Dangling thread instances must still have their locks reset,
1655 # because someone may join() them.
1656 threads = set(_enumerate())
1657 threads.update(_dangling)
1658 for thread in threads:
1659 # Any lock/condition variable may be currently locked or in an
1660 # invalid state, so we reinitialize them.
1661 if thread is current:
1662 # There is only one active thread. We reset the ident to
1663 # its new value since it can have changed.
1664 thread._reset_internal_locks(True)
1665 ident = get_ident()
1666 thread._ident = ident
1667 new_active[ident] = thread
1668 else:
1669 # All the others are already stopped.
1670 thread._reset_internal_locks(False)
1671 thread._stop()
1672
1673 _limbo.clear()
1674 _active.clear()
1675 _active.update(new_active)
1676 assert len(_active) == 1
1677
1678
1679 if hasattr(_os, "register_at_fork"):
1680 _os.register_at_fork(after_in_child=_after_fork)