1 """Thread module emulating a subset of Java's threading model."""
2
3 import os as _os
4 import sys as _sys
5 import _thread
6 import functools
7
8 from time import monotonic as _time
9 from _weakrefset import WeakSet
10 from itertools import islice as _islice, count as _count
11 try:
12 from _collections import deque as _deque
13 except ImportError:
14 from collections import deque as _deque
15
16 # Note regarding PEP 8 compliant names
17 # This threading model was originally inspired by Java, and inherited
18 # the convention of camelCase function and method names from that
19 # language. Those original names are not in any imminent danger of
20 # being deprecated (even for Py3k),so this module provides them as an
21 # alias for the PEP 8 compliant names
22 # Note that using the new PEP 8 compliant names facilitates substitution
23 # with the multiprocessing module, which doesn't provide the old
24 # Java inspired names.
25
26 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
27 'enumerate', 'main_thread', 'TIMEOUT_MAX',
28 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
29 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
30 'setprofile', 'settrace', 'local', 'stack_size',
31 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile']
32
33 # Rename some stuff so "from threading import *" is safe
34 _start_new_thread = _thread.start_new_thread
35 _allocate_lock = _thread.allocate_lock
36 _set_sentinel = _thread._set_sentinel
37 get_ident = _thread.get_ident
38 try:
39 get_native_id = _thread.get_native_id
40 _HAVE_THREAD_NATIVE_ID = True
41 __all__.append('get_native_id')
42 except AttributeError:
43 _HAVE_THREAD_NATIVE_ID = False
44 ThreadError = _thread.error
45 try:
46 _CRLock = _thread.RLock
47 except AttributeError:
48 _CRLock = None
49 TIMEOUT_MAX = _thread.TIMEOUT_MAX
50 del _thread
51
52
53 # Support for profile and trace hooks
54
55 _profile_hook = None
56 _trace_hook = None
57
58 def setprofile(func):
59 """Set a profile function for all threads started from the threading module.
60
61 The func will be passed to sys.setprofile() for each thread, before its
62 run() method is called.
63
64 """
65 global _profile_hook
66 _profile_hook = func
67
68 def getprofile():
69 """Get the profiler function as set by threading.setprofile()."""
70 return _profile_hook
71
72 def settrace(func):
73 """Set a trace function for all threads started from the threading module.
74
75 The func will be passed to sys.settrace() for each thread, before its run()
76 method is called.
77
78 """
79 global _trace_hook
80 _trace_hook = func
81
82 def gettrace():
83 """Get the trace function as set by threading.settrace()."""
84 return _trace_hook
85
86 # Synchronization classes
87
88 Lock = _allocate_lock
89
90 def RLock(*args, **kwargs):
91 """Factory function that returns a new reentrant lock.
92
93 A reentrant lock must be released by the thread that acquired it. Once a
94 thread has acquired a reentrant lock, the same thread may acquire it again
95 without blocking; the thread must release it once for each time it has
96 acquired it.
97
98 """
99 if _CRLock is None:
100 return _PyRLock(*args, **kwargs)
101 return _CRLock(*args, **kwargs)
102
103 class ESC[4;38;5;81m_RLock:
104 """This class implements reentrant lock objects.
105
106 A reentrant lock must be released by the thread that acquired it. Once a
107 thread has acquired a reentrant lock, the same thread may acquire it
108 again without blocking; the thread must release it once for each time it
109 has acquired it.
110
111 """
112
113 def __init__(self):
114 self._block = _allocate_lock()
115 self._owner = None
116 self._count = 0
117
118 def __repr__(self):
119 owner = self._owner
120 try:
121 owner = _active[owner].name
122 except KeyError:
123 pass
124 return "<%s %s.%s object owner=%r count=%d at %s>" % (
125 "locked" if self._block.locked() else "unlocked",
126 self.__class__.__module__,
127 self.__class__.__qualname__,
128 owner,
129 self._count,
130 hex(id(self))
131 )
132
133 def _at_fork_reinit(self):
134 self._block._at_fork_reinit()
135 self._owner = None
136 self._count = 0
137
138 def acquire(self, blocking=True, timeout=-1):
139 """Acquire a lock, blocking or non-blocking.
140
141 When invoked without arguments: if this thread already owns the lock,
142 increment the recursion level by one, and return immediately. Otherwise,
143 if another thread owns the lock, block until the lock is unlocked. Once
144 the lock is unlocked (not owned by any thread), then grab ownership, set
145 the recursion level to one, and return. If more than one thread is
146 blocked waiting until the lock is unlocked, only one at a time will be
147 able to grab ownership of the lock. There is no return value in this
148 case.
149
150 When invoked with the blocking argument set to true, do the same thing
151 as when called without arguments, and return true.
152
153 When invoked with the blocking argument set to false, do not block. If a
154 call without an argument would block, return false immediately;
155 otherwise, do the same thing as when called without arguments, and
156 return true.
157
158 When invoked with the floating-point timeout argument set to a positive
159 value, block for at most the number of seconds specified by timeout
160 and as long as the lock cannot be acquired. Return true if the lock has
161 been acquired, false if the timeout has elapsed.
162
163 """
164 me = get_ident()
165 if self._owner == me:
166 self._count += 1
167 return 1
168 rc = self._block.acquire(blocking, timeout)
169 if rc:
170 self._owner = me
171 self._count = 1
172 return rc
173
174 __enter__ = acquire
175
176 def release(self):
177 """Release a lock, decrementing the recursion level.
178
179 If after the decrement it is zero, reset the lock to unlocked (not owned
180 by any thread), and if any other threads are blocked waiting for the
181 lock to become unlocked, allow exactly one of them to proceed. If after
182 the decrement the recursion level is still nonzero, the lock remains
183 locked and owned by the calling thread.
184
185 Only call this method when the calling thread owns the lock. A
186 RuntimeError is raised if this method is called when the lock is
187 unlocked.
188
189 There is no return value.
190
191 """
192 if self._owner != get_ident():
193 raise RuntimeError("cannot release un-acquired lock")
194 self._count = count = self._count - 1
195 if not count:
196 self._owner = None
197 self._block.release()
198
199 def __exit__(self, t, v, tb):
200 self.release()
201
202 # Internal methods used by condition variables
203
204 def _acquire_restore(self, state):
205 self._block.acquire()
206 self._count, self._owner = state
207
208 def _release_save(self):
209 if self._count == 0:
210 raise RuntimeError("cannot release un-acquired lock")
211 count = self._count
212 self._count = 0
213 owner = self._owner
214 self._owner = None
215 self._block.release()
216 return (count, owner)
217
218 def _is_owned(self):
219 return self._owner == get_ident()
220
221 # Internal method used for reentrancy checks
222
223 def _recursion_count(self):
224 if self._owner != get_ident():
225 return 0
226 return self._count
227
228 _PyRLock = _RLock
229
230
231 class ESC[4;38;5;81mCondition:
232 """Class that implements a condition variable.
233
234 A condition variable allows one or more threads to wait until they are
235 notified by another thread.
236
237 If the lock argument is given and not None, it must be a Lock or RLock
238 object, and it is used as the underlying lock. Otherwise, a new RLock object
239 is created and used as the underlying lock.
240
241 """
242
243 def __init__(self, lock=None):
244 if lock is None:
245 lock = RLock()
246 self._lock = lock
247 # Export the lock's acquire() and release() methods
248 self.acquire = lock.acquire
249 self.release = lock.release
250 # If the lock defines _release_save() and/or _acquire_restore(),
251 # these override the default implementations (which just call
252 # release() and acquire() on the lock). Ditto for _is_owned().
253 try:
254 self._release_save = lock._release_save
255 except AttributeError:
256 pass
257 try:
258 self._acquire_restore = lock._acquire_restore
259 except AttributeError:
260 pass
261 try:
262 self._is_owned = lock._is_owned
263 except AttributeError:
264 pass
265 self._waiters = _deque()
266
267 def _at_fork_reinit(self):
268 self._lock._at_fork_reinit()
269 self._waiters.clear()
270
271 def __enter__(self):
272 return self._lock.__enter__()
273
274 def __exit__(self, *args):
275 return self._lock.__exit__(*args)
276
277 def __repr__(self):
278 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
279
280 def _release_save(self):
281 self._lock.release() # No state to save
282
283 def _acquire_restore(self, x):
284 self._lock.acquire() # Ignore saved state
285
286 def _is_owned(self):
287 # Return True if lock is owned by current_thread.
288 # This method is called only if _lock doesn't have _is_owned().
289 if self._lock.acquire(False):
290 self._lock.release()
291 return False
292 else:
293 return True
294
295 def wait(self, timeout=None):
296 """Wait until notified or until a timeout occurs.
297
298 If the calling thread has not acquired the lock when this method is
299 called, a RuntimeError is raised.
300
301 This method releases the underlying lock, and then blocks until it is
302 awakened by a notify() or notify_all() call for the same condition
303 variable in another thread, or until the optional timeout occurs. Once
304 awakened or timed out, it re-acquires the lock and returns.
305
306 When the timeout argument is present and not None, it should be a
307 floating point number specifying a timeout for the operation in seconds
308 (or fractions thereof).
309
310 When the underlying lock is an RLock, it is not released using its
311 release() method, since this may not actually unlock the lock when it
312 was acquired multiple times recursively. Instead, an internal interface
313 of the RLock class is used, which really unlocks it even when it has
314 been recursively acquired several times. Another internal interface is
315 then used to restore the recursion level when the lock is reacquired.
316
317 """
318 if not self._is_owned():
319 raise RuntimeError("cannot wait on un-acquired lock")
320 waiter = _allocate_lock()
321 waiter.acquire()
322 self._waiters.append(waiter)
323 saved_state = self._release_save()
324 gotit = False
325 try: # restore state no matter what (e.g., KeyboardInterrupt)
326 if timeout is None:
327 waiter.acquire()
328 gotit = True
329 else:
330 if timeout > 0:
331 gotit = waiter.acquire(True, timeout)
332 else:
333 gotit = waiter.acquire(False)
334 return gotit
335 finally:
336 self._acquire_restore(saved_state)
337 if not gotit:
338 try:
339 self._waiters.remove(waiter)
340 except ValueError:
341 pass
342
343 def wait_for(self, predicate, timeout=None):
344 """Wait until a condition evaluates to True.
345
346 predicate should be a callable which result will be interpreted as a
347 boolean value. A timeout may be provided giving the maximum time to
348 wait.
349
350 """
351 endtime = None
352 waittime = timeout
353 result = predicate()
354 while not result:
355 if waittime is not None:
356 if endtime is None:
357 endtime = _time() + waittime
358 else:
359 waittime = endtime - _time()
360 if waittime <= 0:
361 break
362 self.wait(waittime)
363 result = predicate()
364 return result
365
366 def notify(self, n=1):
367 """Wake up one or more threads waiting on this condition, if any.
368
369 If the calling thread has not acquired the lock when this method is
370 called, a RuntimeError is raised.
371
372 This method wakes up at most n of the threads waiting for the condition
373 variable; it is a no-op if no threads are waiting.
374
375 """
376 if not self._is_owned():
377 raise RuntimeError("cannot notify on un-acquired lock")
378 waiters = self._waiters
379 while waiters and n > 0:
380 waiter = waiters[0]
381 try:
382 waiter.release()
383 except RuntimeError:
384 # gh-92530: The previous call of notify() released the lock,
385 # but was interrupted before removing it from the queue.
386 # It can happen if a signal handler raises an exception,
387 # like CTRL+C which raises KeyboardInterrupt.
388 pass
389 else:
390 n -= 1
391 try:
392 waiters.remove(waiter)
393 except ValueError:
394 pass
395
396 def notify_all(self):
397 """Wake up all threads waiting on this condition.
398
399 If the calling thread has not acquired the lock when this method
400 is called, a RuntimeError is raised.
401
402 """
403 self.notify(len(self._waiters))
404
405 def notifyAll(self):
406 """Wake up all threads waiting on this condition.
407
408 This method is deprecated, use notify_all() instead.
409
410 """
411 import warnings
412 warnings.warn('notifyAll() is deprecated, use notify_all() instead',
413 DeprecationWarning, stacklevel=2)
414 self.notify_all()
415
416
417 class ESC[4;38;5;81mSemaphore:
418 """This class implements semaphore objects.
419
420 Semaphores manage a counter representing the number of release() calls minus
421 the number of acquire() calls, plus an initial value. The acquire() method
422 blocks if necessary until it can return without making the counter
423 negative. If not given, value defaults to 1.
424
425 """
426
427 # After Tim Peters' semaphore class, but not quite the same (no maximum)
428
429 def __init__(self, value=1):
430 if value < 0:
431 raise ValueError("semaphore initial value must be >= 0")
432 self._cond = Condition(Lock())
433 self._value = value
434
435 def __repr__(self):
436 cls = self.__class__
437 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
438 f" value={self._value}>")
439
440 def acquire(self, blocking=True, timeout=None):
441 """Acquire a semaphore, decrementing the internal counter by one.
442
443 When invoked without arguments: if the internal counter is larger than
444 zero on entry, decrement it by one and return immediately. If it is zero
445 on entry, block, waiting until some other thread has called release() to
446 make it larger than zero. This is done with proper interlocking so that
447 if multiple acquire() calls are blocked, release() will wake exactly one
448 of them up. The implementation may pick one at random, so the order in
449 which blocked threads are awakened should not be relied on. There is no
450 return value in this case.
451
452 When invoked with blocking set to true, do the same thing as when called
453 without arguments, and return true.
454
455 When invoked with blocking set to false, do not block. If a call without
456 an argument would block, return false immediately; otherwise, do the
457 same thing as when called without arguments, and return true.
458
459 When invoked with a timeout other than None, it will block for at
460 most timeout seconds. If acquire does not complete successfully in
461 that interval, return false. Return true otherwise.
462
463 """
464 if not blocking and timeout is not None:
465 raise ValueError("can't specify timeout for non-blocking acquire")
466 rc = False
467 endtime = None
468 with self._cond:
469 while self._value == 0:
470 if not blocking:
471 break
472 if timeout is not None:
473 if endtime is None:
474 endtime = _time() + timeout
475 else:
476 timeout = endtime - _time()
477 if timeout <= 0:
478 break
479 self._cond.wait(timeout)
480 else:
481 self._value -= 1
482 rc = True
483 return rc
484
485 __enter__ = acquire
486
487 def release(self, n=1):
488 """Release a semaphore, incrementing the internal counter by one or more.
489
490 When the counter is zero on entry and another thread is waiting for it
491 to become larger than zero again, wake up that thread.
492
493 """
494 if n < 1:
495 raise ValueError('n must be one or more')
496 with self._cond:
497 self._value += n
498 for i in range(n):
499 self._cond.notify()
500
501 def __exit__(self, t, v, tb):
502 self.release()
503
504
505 class ESC[4;38;5;81mBoundedSemaphore(ESC[4;38;5;149mSemaphore):
506 """Implements a bounded semaphore.
507
508 A bounded semaphore checks to make sure its current value doesn't exceed its
509 initial value. If it does, ValueError is raised. In most situations
510 semaphores are used to guard resources with limited capacity.
511
512 If the semaphore is released too many times it's a sign of a bug. If not
513 given, value defaults to 1.
514
515 Like regular semaphores, bounded semaphores manage a counter representing
516 the number of release() calls minus the number of acquire() calls, plus an
517 initial value. The acquire() method blocks if necessary until it can return
518 without making the counter negative. If not given, value defaults to 1.
519
520 """
521
522 def __init__(self, value=1):
523 Semaphore.__init__(self, value)
524 self._initial_value = value
525
526 def __repr__(self):
527 cls = self.__class__
528 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
529 f" value={self._value}/{self._initial_value}>")
530
531 def release(self, n=1):
532 """Release a semaphore, incrementing the internal counter by one or more.
533
534 When the counter is zero on entry and another thread is waiting for it
535 to become larger than zero again, wake up that thread.
536
537 If the number of releases exceeds the number of acquires,
538 raise a ValueError.
539
540 """
541 if n < 1:
542 raise ValueError('n must be one or more')
543 with self._cond:
544 if self._value + n > self._initial_value:
545 raise ValueError("Semaphore released too many times")
546 self._value += n
547 for i in range(n):
548 self._cond.notify()
549
550
551 class ESC[4;38;5;81mEvent:
552 """Class implementing event objects.
553
554 Events manage a flag that can be set to true with the set() method and reset
555 to false with the clear() method. The wait() method blocks until the flag is
556 true. The flag is initially false.
557
558 """
559
560 # After Tim Peters' event class (without is_posted())
561
562 def __init__(self):
563 self._cond = Condition(Lock())
564 self._flag = False
565
566 def __repr__(self):
567 cls = self.__class__
568 status = 'set' if self._flag else 'unset'
569 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"
570
571 def _at_fork_reinit(self):
572 # Private method called by Thread._reset_internal_locks()
573 self._cond._at_fork_reinit()
574
575 def is_set(self):
576 """Return true if and only if the internal flag is true."""
577 return self._flag
578
579 def isSet(self):
580 """Return true if and only if the internal flag is true.
581
582 This method is deprecated, use is_set() instead.
583
584 """
585 import warnings
586 warnings.warn('isSet() is deprecated, use is_set() instead',
587 DeprecationWarning, stacklevel=2)
588 return self.is_set()
589
590 def set(self):
591 """Set the internal flag to true.
592
593 All threads waiting for it to become true are awakened. Threads
594 that call wait() once the flag is true will not block at all.
595
596 """
597 with self._cond:
598 self._flag = True
599 self._cond.notify_all()
600
601 def clear(self):
602 """Reset the internal flag to false.
603
604 Subsequently, threads calling wait() will block until set() is called to
605 set the internal flag to true again.
606
607 """
608 with self._cond:
609 self._flag = False
610
611 def wait(self, timeout=None):
612 """Block until the internal flag is true.
613
614 If the internal flag is true on entry, return immediately. Otherwise,
615 block until another thread calls set() to set the flag to true, or until
616 the optional timeout occurs.
617
618 When the timeout argument is present and not None, it should be a
619 floating point number specifying a timeout for the operation in seconds
620 (or fractions thereof).
621
622 This method returns the internal flag on exit, so it will always return
623 True except if a timeout is given and the operation times out.
624
625 """
626 with self._cond:
627 signaled = self._flag
628 if not signaled:
629 signaled = self._cond.wait(timeout)
630 return signaled
631
632
633 # A barrier class. Inspired in part by the pthread_barrier_* api and
634 # the CyclicBarrier class from Java. See
635 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
636 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
637 # CyclicBarrier.html
638 # for information.
639 # We maintain two main states, 'filling' and 'draining' enabling the barrier
640 # to be cyclic. Threads are not allowed into it until it has fully drained
641 # since the previous cycle. In addition, a 'resetting' state exists which is
642 # similar to 'draining' except that threads leave with a BrokenBarrierError,
643 # and a 'broken' state in which all threads get the exception.
644 class ESC[4;38;5;81mBarrier:
645 """Implements a Barrier.
646
647 Useful for synchronizing a fixed number of threads at known synchronization
648 points. Threads block on 'wait()' and are simultaneously awoken once they
649 have all made that call.
650
651 """
652
653 def __init__(self, parties, action=None, timeout=None):
654 """Create a barrier, initialised to 'parties' threads.
655
656 'action' is a callable which, when supplied, will be called by one of
657 the threads after they have all entered the barrier and just prior to
658 releasing them all. If a 'timeout' is provided, it is used as the
659 default for all subsequent 'wait()' calls.
660
661 """
662 self._cond = Condition(Lock())
663 self._action = action
664 self._timeout = timeout
665 self._parties = parties
666 self._state = 0 # 0 filling, 1 draining, -1 resetting, -2 broken
667 self._count = 0
668
669 def __repr__(self):
670 cls = self.__class__
671 if self.broken:
672 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: broken>"
673 return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
674 f" waiters={self.n_waiting}/{self.parties}>")
675
676 def wait(self, timeout=None):
677 """Wait for the barrier.
678
679 When the specified number of threads have started waiting, they are all
680 simultaneously awoken. If an 'action' was provided for the barrier, one
681 of the threads will have executed that callback prior to returning.
682 Returns an individual index number from 0 to 'parties-1'.
683
684 """
685 if timeout is None:
686 timeout = self._timeout
687 with self._cond:
688 self._enter() # Block while the barrier drains.
689 index = self._count
690 self._count += 1
691 try:
692 if index + 1 == self._parties:
693 # We release the barrier
694 self._release()
695 else:
696 # We wait until someone releases us
697 self._wait(timeout)
698 return index
699 finally:
700 self._count -= 1
701 # Wake up any threads waiting for barrier to drain.
702 self._exit()
703
704 # Block until the barrier is ready for us, or raise an exception
705 # if it is broken.
706 def _enter(self):
707 while self._state in (-1, 1):
708 # It is draining or resetting, wait until done
709 self._cond.wait()
710 #see if the barrier is in a broken state
711 if self._state < 0:
712 raise BrokenBarrierError
713 assert self._state == 0
714
715 # Optionally run the 'action' and release the threads waiting
716 # in the barrier.
717 def _release(self):
718 try:
719 if self._action:
720 self._action()
721 # enter draining state
722 self._state = 1
723 self._cond.notify_all()
724 except:
725 #an exception during the _action handler. Break and reraise
726 self._break()
727 raise
728
729 # Wait in the barrier until we are released. Raise an exception
730 # if the barrier is reset or broken.
731 def _wait(self, timeout):
732 if not self._cond.wait_for(lambda : self._state != 0, timeout):
733 #timed out. Break the barrier
734 self._break()
735 raise BrokenBarrierError
736 if self._state < 0:
737 raise BrokenBarrierError
738 assert self._state == 1
739
740 # If we are the last thread to exit the barrier, signal any threads
741 # waiting for the barrier to drain.
742 def _exit(self):
743 if self._count == 0:
744 if self._state in (-1, 1):
745 #resetting or draining
746 self._state = 0
747 self._cond.notify_all()
748
749 def reset(self):
750 """Reset the barrier to the initial state.
751
752 Any threads currently waiting will get the BrokenBarrier exception
753 raised.
754
755 """
756 with self._cond:
757 if self._count > 0:
758 if self._state == 0:
759 #reset the barrier, waking up threads
760 self._state = -1
761 elif self._state == -2:
762 #was broken, set it to reset state
763 #which clears when the last thread exits
764 self._state = -1
765 else:
766 self._state = 0
767 self._cond.notify_all()
768
769 def abort(self):
770 """Place the barrier into a 'broken' state.
771
772 Useful in case of error. Any currently waiting threads and threads
773 attempting to 'wait()' will have BrokenBarrierError raised.
774
775 """
776 with self._cond:
777 self._break()
778
779 def _break(self):
780 # An internal error was detected. The barrier is set to
781 # a broken state all parties awakened.
782 self._state = -2
783 self._cond.notify_all()
784
785 @property
786 def parties(self):
787 """Return the number of threads required to trip the barrier."""
788 return self._parties
789
790 @property
791 def n_waiting(self):
792 """Return the number of threads currently waiting at the barrier."""
793 # We don't need synchronization here since this is an ephemeral result
794 # anyway. It returns the correct value in the steady state.
795 if self._state == 0:
796 return self._count
797 return 0
798
799 @property
800 def broken(self):
801 """Return True if the barrier is in a broken state."""
802 return self._state == -2
803
804 # exception raised by the Barrier class
805 class ESC[4;38;5;81mBrokenBarrierError(ESC[4;38;5;149mRuntimeError):
806 pass
807
808
809 # Helper to generate new thread names
810 _counter = _count(1).__next__
811 def _newname(name_template):
812 return name_template % _counter()
813
814 # Active thread administration.
815 #
816 # bpo-44422: Use a reentrant lock to allow reentrant calls to functions like
817 # threading.enumerate().
818 _active_limbo_lock = RLock()
819 _active = {} # maps thread id to Thread object
820 _limbo = {}
821 _dangling = WeakSet()
822
823 # Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
824 # to wait until all Python thread states get deleted:
825 # see Thread._set_tstate_lock().
826 _shutdown_locks_lock = _allocate_lock()
827 _shutdown_locks = set()
828
829 def _maintain_shutdown_locks():
830 """
831 Drop any shutdown locks that don't correspond to running threads anymore.
832
833 Calling this from time to time avoids an ever-growing _shutdown_locks
834 set when Thread objects are not joined explicitly. See bpo-37788.
835
836 This must be called with _shutdown_locks_lock acquired.
837 """
838 # If a lock was released, the corresponding thread has exited
839 to_remove = [lock for lock in _shutdown_locks if not lock.locked()]
840 _shutdown_locks.difference_update(to_remove)
841
842
843 # Main class for threads
844
845 class ESC[4;38;5;81mThread:
846 """A class that represents a thread of control.
847
848 This class can be safely subclassed in a limited fashion. There are two ways
849 to specify the activity: by passing a callable object to the constructor, or
850 by overriding the run() method in a subclass.
851
852 """
853
854 _initialized = False
855
856 def __init__(self, group=None, target=None, name=None,
857 args=(), kwargs=None, *, daemon=None):
858 """This constructor should always be called with keyword arguments. Arguments are:
859
860 *group* should be None; reserved for future extension when a ThreadGroup
861 class is implemented.
862
863 *target* is the callable object to be invoked by the run()
864 method. Defaults to None, meaning nothing is called.
865
866 *name* is the thread name. By default, a unique name is constructed of
867 the form "Thread-N" where N is a small decimal number.
868
869 *args* is a list or tuple of arguments for the target invocation. Defaults to ().
870
871 *kwargs* is a dictionary of keyword arguments for the target
872 invocation. Defaults to {}.
873
874 If a subclass overrides the constructor, it must make sure to invoke
875 the base class constructor (Thread.__init__()) before doing anything
876 else to the thread.
877
878 """
879 assert group is None, "group argument must be None for now"
880 if kwargs is None:
881 kwargs = {}
882 if name:
883 name = str(name)
884 else:
885 name = _newname("Thread-%d")
886 if target is not None:
887 try:
888 target_name = target.__name__
889 name += f" ({target_name})"
890 except AttributeError:
891 pass
892
893 self._target = target
894 self._name = name
895 self._args = args
896 self._kwargs = kwargs
897 if daemon is not None:
898 self._daemonic = daemon
899 else:
900 self._daemonic = current_thread().daemon
901 self._ident = None
902 if _HAVE_THREAD_NATIVE_ID:
903 self._native_id = None
904 self._tstate_lock = None
905 self._started = Event()
906 self._is_stopped = False
907 self._initialized = True
908 # Copy of sys.stderr used by self._invoke_excepthook()
909 self._stderr = _sys.stderr
910 self._invoke_excepthook = _make_invoke_excepthook()
911 # For debugging and _after_fork()
912 _dangling.add(self)
913
914 def _reset_internal_locks(self, is_alive):
915 # private! Called by _after_fork() to reset our internal locks as
916 # they may be in an invalid state leading to a deadlock or crash.
917 self._started._at_fork_reinit()
918 if is_alive:
919 # bpo-42350: If the fork happens when the thread is already stopped
920 # (ex: after threading._shutdown() has been called), _tstate_lock
921 # is None. Do nothing in this case.
922 if self._tstate_lock is not None:
923 self._tstate_lock._at_fork_reinit()
924 self._tstate_lock.acquire()
925 else:
926 # The thread isn't alive after fork: it doesn't have a tstate
927 # anymore.
928 self._is_stopped = True
929 self._tstate_lock = None
930
931 def __repr__(self):
932 assert self._initialized, "Thread.__init__() was not called"
933 status = "initial"
934 if self._started.is_set():
935 status = "started"
936 self.is_alive() # easy way to get ._is_stopped set when appropriate
937 if self._is_stopped:
938 status = "stopped"
939 if self._daemonic:
940 status += " daemon"
941 if self._ident is not None:
942 status += " %s" % self._ident
943 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
944
945 def start(self):
946 """Start the thread's activity.
947
948 It must be called at most once per thread object. It arranges for the
949 object's run() method to be invoked in a separate thread of control.
950
951 This method will raise a RuntimeError if called more than once on the
952 same thread object.
953
954 """
955 if not self._initialized:
956 raise RuntimeError("thread.__init__() not called")
957
958 if self._started.is_set():
959 raise RuntimeError("threads can only be started once")
960
961 with _active_limbo_lock:
962 _limbo[self] = self
963 try:
964 _start_new_thread(self._bootstrap, ())
965 except Exception:
966 with _active_limbo_lock:
967 del _limbo[self]
968 raise
969 self._started.wait()
970
971 def run(self):
972 """Method representing the thread's activity.
973
974 You may override this method in a subclass. The standard run() method
975 invokes the callable object passed to the object's constructor as the
976 target argument, if any, with sequential and keyword arguments taken
977 from the args and kwargs arguments, respectively.
978
979 """
980 try:
981 if self._target is not None:
982 self._target(*self._args, **self._kwargs)
983 finally:
984 # Avoid a refcycle if the thread is running a function with
985 # an argument that has a member that points to the thread.
986 del self._target, self._args, self._kwargs
987
988 def _bootstrap(self):
989 # Wrapper around the real bootstrap code that ignores
990 # exceptions during interpreter cleanup. Those typically
991 # happen when a daemon thread wakes up at an unfortunate
992 # moment, finds the world around it destroyed, and raises some
993 # random exception *** while trying to report the exception in
994 # _bootstrap_inner() below ***. Those random exceptions
995 # don't help anybody, and they confuse users, so we suppress
996 # them. We suppress them only when it appears that the world
997 # indeed has already been destroyed, so that exceptions in
998 # _bootstrap_inner() during normal business hours are properly
999 # reported. Also, we only suppress them for daemonic threads;
1000 # if a non-daemonic encounters this, something else is wrong.
1001 try:
1002 self._bootstrap_inner()
1003 except:
1004 if self._daemonic and _sys is None:
1005 return
1006 raise
1007
1008 def _set_ident(self):
1009 self._ident = get_ident()
1010
1011 if _HAVE_THREAD_NATIVE_ID:
1012 def _set_native_id(self):
1013 self._native_id = get_native_id()
1014
1015 def _set_tstate_lock(self):
1016 """
1017 Set a lock object which will be released by the interpreter when
1018 the underlying thread state (see pystate.h) gets deleted.
1019 """
1020 self._tstate_lock = _set_sentinel()
1021 self._tstate_lock.acquire()
1022
1023 if not self.daemon:
1024 with _shutdown_locks_lock:
1025 _maintain_shutdown_locks()
1026 _shutdown_locks.add(self._tstate_lock)
1027
1028 def _bootstrap_inner(self):
1029 try:
1030 self._set_ident()
1031 self._set_tstate_lock()
1032 if _HAVE_THREAD_NATIVE_ID:
1033 self._set_native_id()
1034 self._started.set()
1035 with _active_limbo_lock:
1036 _active[self._ident] = self
1037 del _limbo[self]
1038
1039 if _trace_hook:
1040 _sys.settrace(_trace_hook)
1041 if _profile_hook:
1042 _sys.setprofile(_profile_hook)
1043
1044 try:
1045 self.run()
1046 except:
1047 self._invoke_excepthook(self)
1048 finally:
1049 self._delete()
1050
1051 def _stop(self):
1052 # After calling ._stop(), .is_alive() returns False and .join() returns
1053 # immediately. ._tstate_lock must be released before calling ._stop().
1054 #
1055 # Normal case: C code at the end of the thread's life
1056 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
1057 # that's detected by our ._wait_for_tstate_lock(), called by .join()
1058 # and .is_alive(). Any number of threads _may_ call ._stop()
1059 # simultaneously (for example, if multiple threads are blocked in
1060 # .join() calls), and they're not serialized. That's harmless -
1061 # they'll just make redundant rebindings of ._is_stopped and
1062 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the
1063 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
1064 # (the assert is executed only if ._tstate_lock is None).
1065 #
1066 # Special case: _main_thread releases ._tstate_lock via this
1067 # module's _shutdown() function.
1068 lock = self._tstate_lock
1069 if lock is not None:
1070 assert not lock.locked()
1071 self._is_stopped = True
1072 self._tstate_lock = None
1073 if not self.daemon:
1074 with _shutdown_locks_lock:
1075 # Remove our lock and other released locks from _shutdown_locks
1076 _maintain_shutdown_locks()
1077
1078 def _delete(self):
1079 "Remove current thread from the dict of currently running threads."
1080 with _active_limbo_lock:
1081 del _active[get_ident()]
1082 # There must not be any python code between the previous line
1083 # and after the lock is released. Otherwise a tracing function
1084 # could try to acquire the lock again in the same thread, (in
1085 # current_thread()), and would block.
1086
1087 def join(self, timeout=None):
1088 """Wait until the thread terminates.
1089
1090 This blocks the calling thread until the thread whose join() method is
1091 called terminates -- either normally or through an unhandled exception
1092 or until the optional timeout occurs.
1093
1094 When the timeout argument is present and not None, it should be a
1095 floating point number specifying a timeout for the operation in seconds
1096 (or fractions thereof). As join() always returns None, you must call
1097 is_alive() after join() to decide whether a timeout happened -- if the
1098 thread is still alive, the join() call timed out.
1099
1100 When the timeout argument is not present or None, the operation will
1101 block until the thread terminates.
1102
1103 A thread can be join()ed many times.
1104
1105 join() raises a RuntimeError if an attempt is made to join the current
1106 thread as that would cause a deadlock. It is also an error to join() a
1107 thread before it has been started and attempts to do so raises the same
1108 exception.
1109
1110 """
1111 if not self._initialized:
1112 raise RuntimeError("Thread.__init__() not called")
1113 if not self._started.is_set():
1114 raise RuntimeError("cannot join thread before it is started")
1115 if self is current_thread():
1116 raise RuntimeError("cannot join current thread")
1117
1118 if timeout is None:
1119 self._wait_for_tstate_lock()
1120 else:
1121 # the behavior of a negative timeout isn't documented, but
1122 # historically .join(timeout=x) for x<0 has acted as if timeout=0
1123 self._wait_for_tstate_lock(timeout=max(timeout, 0))
1124
1125 def _wait_for_tstate_lock(self, block=True, timeout=-1):
1126 # Issue #18808: wait for the thread state to be gone.
1127 # At the end of the thread's life, after all knowledge of the thread
1128 # is removed from C data structures, C code releases our _tstate_lock.
1129 # This method passes its arguments to _tstate_lock.acquire().
1130 # If the lock is acquired, the C code is done, and self._stop() is
1131 # called. That sets ._is_stopped to True, and ._tstate_lock to None.
1132 lock = self._tstate_lock
1133 if lock is None:
1134 # already determined that the C code is done
1135 assert self._is_stopped
1136 return
1137
1138 try:
1139 if lock.acquire(block, timeout):
1140 lock.release()
1141 self._stop()
1142 except:
1143 if lock.locked():
1144 # bpo-45274: lock.acquire() acquired the lock, but the function
1145 # was interrupted with an exception before reaching the
1146 # lock.release(). It can happen if a signal handler raises an
1147 # exception, like CTRL+C which raises KeyboardInterrupt.
1148 lock.release()
1149 self._stop()
1150 raise
1151
1152 @property
1153 def name(self):
1154 """A string used for identification purposes only.
1155
1156 It has no semantics. Multiple threads may be given the same name. The
1157 initial name is set by the constructor.
1158
1159 """
1160 assert self._initialized, "Thread.__init__() not called"
1161 return self._name
1162
1163 @name.setter
1164 def name(self, name):
1165 assert self._initialized, "Thread.__init__() not called"
1166 self._name = str(name)
1167
1168 @property
1169 def ident(self):
1170 """Thread identifier of this thread or None if it has not been started.
1171
1172 This is a nonzero integer. See the get_ident() function. Thread
1173 identifiers may be recycled when a thread exits and another thread is
1174 created. The identifier is available even after the thread has exited.
1175
1176 """
1177 assert self._initialized, "Thread.__init__() not called"
1178 return self._ident
1179
1180 if _HAVE_THREAD_NATIVE_ID:
1181 @property
1182 def native_id(self):
1183 """Native integral thread ID of this thread, or None if it has not been started.
1184
1185 This is a non-negative integer. See the get_native_id() function.
1186 This represents the Thread ID as reported by the kernel.
1187
1188 """
1189 assert self._initialized, "Thread.__init__() not called"
1190 return self._native_id
1191
1192 def is_alive(self):
1193 """Return whether the thread is alive.
1194
1195 This method returns True just before the run() method starts until just
1196 after the run() method terminates. See also the module function
1197 enumerate().
1198
1199 """
1200 assert self._initialized, "Thread.__init__() not called"
1201 if self._is_stopped or not self._started.is_set():
1202 return False
1203 self._wait_for_tstate_lock(False)
1204 return not self._is_stopped
1205
1206 @property
1207 def daemon(self):
1208 """A boolean value indicating whether this thread is a daemon thread.
1209
1210 This must be set before start() is called, otherwise RuntimeError is
1211 raised. Its initial value is inherited from the creating thread; the
1212 main thread is not a daemon thread and therefore all threads created in
1213 the main thread default to daemon = False.
1214
1215 The entire Python program exits when only daemon threads are left.
1216
1217 """
1218 assert self._initialized, "Thread.__init__() not called"
1219 return self._daemonic
1220
1221 @daemon.setter
1222 def daemon(self, daemonic):
1223 if not self._initialized:
1224 raise RuntimeError("Thread.__init__() not called")
1225 if self._started.is_set():
1226 raise RuntimeError("cannot set daemon status of active thread")
1227 self._daemonic = daemonic
1228
1229 def isDaemon(self):
1230 """Return whether this thread is a daemon.
1231
1232 This method is deprecated, use the daemon attribute instead.
1233
1234 """
1235 import warnings
1236 warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
1237 DeprecationWarning, stacklevel=2)
1238 return self.daemon
1239
1240 def setDaemon(self, daemonic):
1241 """Set whether this thread is a daemon.
1242
1243 This method is deprecated, use the .daemon property instead.
1244
1245 """
1246 import warnings
1247 warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
1248 DeprecationWarning, stacklevel=2)
1249 self.daemon = daemonic
1250
1251 def getName(self):
1252 """Return a string used for identification purposes only.
1253
1254 This method is deprecated, use the name attribute instead.
1255
1256 """
1257 import warnings
1258 warnings.warn('getName() is deprecated, get the name attribute instead',
1259 DeprecationWarning, stacklevel=2)
1260 return self.name
1261
1262 def setName(self, name):
1263 """Set the name string for this thread.
1264
1265 This method is deprecated, use the name attribute instead.
1266
1267 """
1268 import warnings
1269 warnings.warn('setName() is deprecated, set the name attribute instead',
1270 DeprecationWarning, stacklevel=2)
1271 self.name = name
1272
1273
1274 try:
1275 from _thread import (_excepthook as excepthook,
1276 _ExceptHookArgs as ExceptHookArgs)
1277 except ImportError:
1278 # Simple Python implementation if _thread._excepthook() is not available
1279 from traceback import print_exception as _print_exception
1280 from collections import namedtuple
1281
1282 _ExceptHookArgs = namedtuple(
1283 'ExceptHookArgs',
1284 'exc_type exc_value exc_traceback thread')
1285
1286 def ExceptHookArgs(args):
1287 return _ExceptHookArgs(*args)
1288
1289 def excepthook(args, /):
1290 """
1291 Handle uncaught Thread.run() exception.
1292 """
1293 if args.exc_type == SystemExit:
1294 # silently ignore SystemExit
1295 return
1296
1297 if _sys is not None and _sys.stderr is not None:
1298 stderr = _sys.stderr
1299 elif args.thread is not None:
1300 stderr = args.thread._stderr
1301 if stderr is None:
1302 # do nothing if sys.stderr is None and sys.stderr was None
1303 # when the thread was created
1304 return
1305 else:
1306 # do nothing if sys.stderr is None and args.thread is None
1307 return
1308
1309 if args.thread is not None:
1310 name = args.thread.name
1311 else:
1312 name = get_ident()
1313 print(f"Exception in thread {name}:",
1314 file=stderr, flush=True)
1315 _print_exception(args.exc_type, args.exc_value, args.exc_traceback,
1316 file=stderr)
1317 stderr.flush()
1318
1319
1320 # Original value of threading.excepthook
1321 __excepthook__ = excepthook
1322
1323
1324 def _make_invoke_excepthook():
1325 # Create a local namespace to ensure that variables remain alive
1326 # when _invoke_excepthook() is called, even if it is called late during
1327 # Python shutdown. It is mostly needed for daemon threads.
1328
1329 old_excepthook = excepthook
1330 old_sys_excepthook = _sys.excepthook
1331 if old_excepthook is None:
1332 raise RuntimeError("threading.excepthook is None")
1333 if old_sys_excepthook is None:
1334 raise RuntimeError("sys.excepthook is None")
1335
1336 sys_exc_info = _sys.exc_info
1337 local_print = print
1338 local_sys = _sys
1339
1340 def invoke_excepthook(thread):
1341 global excepthook
1342 try:
1343 hook = excepthook
1344 if hook is None:
1345 hook = old_excepthook
1346
1347 args = ExceptHookArgs([*sys_exc_info(), thread])
1348
1349 hook(args)
1350 except Exception as exc:
1351 exc.__suppress_context__ = True
1352 del exc
1353
1354 if local_sys is not None and local_sys.stderr is not None:
1355 stderr = local_sys.stderr
1356 else:
1357 stderr = thread._stderr
1358
1359 local_print("Exception in threading.excepthook:",
1360 file=stderr, flush=True)
1361
1362 if local_sys is not None and local_sys.excepthook is not None:
1363 sys_excepthook = local_sys.excepthook
1364 else:
1365 sys_excepthook = old_sys_excepthook
1366
1367 sys_excepthook(*sys_exc_info())
1368 finally:
1369 # Break reference cycle (exception stored in a variable)
1370 args = None
1371
1372 return invoke_excepthook
1373
1374
1375 # The timer class was contributed by Itamar Shtull-Trauring
1376
1377 class ESC[4;38;5;81mTimer(ESC[4;38;5;149mThread):
1378 """Call a function after a specified number of seconds:
1379
1380 t = Timer(30.0, f, args=None, kwargs=None)
1381 t.start()
1382 t.cancel() # stop the timer's action if it's still waiting
1383
1384 """
1385
1386 def __init__(self, interval, function, args=None, kwargs=None):
1387 Thread.__init__(self)
1388 self.interval = interval
1389 self.function = function
1390 self.args = args if args is not None else []
1391 self.kwargs = kwargs if kwargs is not None else {}
1392 self.finished = Event()
1393
1394 def cancel(self):
1395 """Stop the timer if it hasn't finished yet."""
1396 self.finished.set()
1397
1398 def run(self):
1399 self.finished.wait(self.interval)
1400 if not self.finished.is_set():
1401 self.function(*self.args, **self.kwargs)
1402 self.finished.set()
1403
1404
1405 # Special thread class to represent the main thread
1406
1407 class ESC[4;38;5;81m_MainThread(ESC[4;38;5;149mThread):
1408
1409 def __init__(self):
1410 Thread.__init__(self, name="MainThread", daemon=False)
1411 self._set_tstate_lock()
1412 self._started.set()
1413 self._set_ident()
1414 if _HAVE_THREAD_NATIVE_ID:
1415 self._set_native_id()
1416 with _active_limbo_lock:
1417 _active[self._ident] = self
1418
1419
1420 # Dummy thread class to represent threads not started here.
1421 # These aren't garbage collected when they die, nor can they be waited for.
1422 # If they invoke anything in threading.py that calls current_thread(), they
1423 # leave an entry in the _active dict forever after.
1424 # Their purpose is to return *something* from current_thread().
1425 # They are marked as daemon threads so we won't wait for them
1426 # when we exit (conform previous semantics).
1427
1428 class ESC[4;38;5;81m_DummyThread(ESC[4;38;5;149mThread):
1429
1430 def __init__(self):
1431 Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
1432
1433 self._started.set()
1434 self._set_ident()
1435 if _HAVE_THREAD_NATIVE_ID:
1436 self._set_native_id()
1437 with _active_limbo_lock:
1438 _active[self._ident] = self
1439
1440 def _stop(self):
1441 pass
1442
1443 def is_alive(self):
1444 assert not self._is_stopped and self._started.is_set()
1445 return True
1446
1447 def join(self, timeout=None):
1448 assert False, "cannot join a dummy thread"
1449
1450
1451 # Global API functions
1452
1453 def current_thread():
1454 """Return the current Thread object, corresponding to the caller's thread of control.
1455
1456 If the caller's thread of control was not created through the threading
1457 module, a dummy thread object with limited functionality is returned.
1458
1459 """
1460 try:
1461 return _active[get_ident()]
1462 except KeyError:
1463 return _DummyThread()
1464
1465 def currentThread():
1466 """Return the current Thread object, corresponding to the caller's thread of control.
1467
1468 This function is deprecated, use current_thread() instead.
1469
1470 """
1471 import warnings
1472 warnings.warn('currentThread() is deprecated, use current_thread() instead',
1473 DeprecationWarning, stacklevel=2)
1474 return current_thread()
1475
1476 def active_count():
1477 """Return the number of Thread objects currently alive.
1478
1479 The returned count is equal to the length of the list returned by
1480 enumerate().
1481
1482 """
1483 with _active_limbo_lock:
1484 return len(_active) + len(_limbo)
1485
1486 def activeCount():
1487 """Return the number of Thread objects currently alive.
1488
1489 This function is deprecated, use active_count() instead.
1490
1491 """
1492 import warnings
1493 warnings.warn('activeCount() is deprecated, use active_count() instead',
1494 DeprecationWarning, stacklevel=2)
1495 return active_count()
1496
1497 def _enumerate():
1498 # Same as enumerate(), but without the lock. Internal use only.
1499 return list(_active.values()) + list(_limbo.values())
1500
1501 def enumerate():
1502 """Return a list of all Thread objects currently alive.
1503
1504 The list includes daemonic threads, dummy thread objects created by
1505 current_thread(), and the main thread. It excludes terminated threads and
1506 threads that have not yet been started.
1507
1508 """
1509 with _active_limbo_lock:
1510 return list(_active.values()) + list(_limbo.values())
1511
1512
1513 _threading_atexits = []
1514 _SHUTTING_DOWN = False
1515
1516 def _register_atexit(func, *arg, **kwargs):
1517 """CPython internal: register *func* to be called before joining threads.
1518
1519 The registered *func* is called with its arguments just before all
1520 non-daemon threads are joined in `_shutdown()`. It provides a similar
1521 purpose to `atexit.register()`, but its functions are called prior to
1522 threading shutdown instead of interpreter shutdown.
1523
1524 For similarity to atexit, the registered functions are called in reverse.
1525 """
1526 if _SHUTTING_DOWN:
1527 raise RuntimeError("can't register atexit after shutdown")
1528
1529 call = functools.partial(func, *arg, **kwargs)
1530 _threading_atexits.append(call)
1531
1532
1533 from _thread import stack_size
1534
1535 # Create the main thread object,
1536 # and make it available for the interpreter
1537 # (Py_Main) as threading._shutdown.
1538
1539 _main_thread = _MainThread()
1540
1541 def _shutdown():
1542 """
1543 Wait until the Python thread state of all non-daemon threads get deleted.
1544 """
1545 # Obscure: other threads may be waiting to join _main_thread. That's
1546 # dubious, but some code does it. We can't wait for C code to release
1547 # the main thread's tstate_lock - that won't happen until the interpreter
1548 # is nearly dead. So we release it here. Note that just calling _stop()
1549 # isn't enough: other threads may already be waiting on _tstate_lock.
1550 if _main_thread._is_stopped:
1551 # _shutdown() was already called
1552 return
1553
1554 global _SHUTTING_DOWN
1555 _SHUTTING_DOWN = True
1556
1557 # Call registered threading atexit functions before threads are joined.
1558 # Order is reversed, similar to atexit.
1559 for atexit_call in reversed(_threading_atexits):
1560 atexit_call()
1561
1562 # Main thread
1563 if _main_thread.ident == get_ident():
1564 tlock = _main_thread._tstate_lock
1565 # The main thread isn't finished yet, so its thread state lock can't
1566 # have been released.
1567 assert tlock is not None
1568 assert tlock.locked()
1569 tlock.release()
1570 _main_thread._stop()
1571 else:
1572 # bpo-1596321: _shutdown() must be called in the main thread.
1573 # If the threading module was not imported by the main thread,
1574 # _main_thread is the thread which imported the threading module.
1575 # In this case, ignore _main_thread, similar behavior than for threads
1576 # spawned by C libraries or using _thread.start_new_thread().
1577 pass
1578
1579 # Join all non-deamon threads
1580 while True:
1581 with _shutdown_locks_lock:
1582 locks = list(_shutdown_locks)
1583 _shutdown_locks.clear()
1584
1585 if not locks:
1586 break
1587
1588 for lock in locks:
1589 # mimic Thread.join()
1590 lock.acquire()
1591 lock.release()
1592
1593 # new threads can be spawned while we were waiting for the other
1594 # threads to complete
1595
1596
1597 def main_thread():
1598 """Return the main thread object.
1599
1600 In normal conditions, the main thread is the thread from which the
1601 Python interpreter was started.
1602 """
1603 return _main_thread
1604
1605 # get thread-local implementation, either from the thread
1606 # module, or from the python fallback
1607
1608 try:
1609 from _thread import _local as local
1610 except ImportError:
1611 from _threading_local import local
1612
1613
1614 def _after_fork():
1615 """
1616 Cleanup threading module state that should not exist after a fork.
1617 """
1618 # Reset _active_limbo_lock, in case we forked while the lock was held
1619 # by another (non-forked) thread. http://bugs.python.org/issue874900
1620 global _active_limbo_lock, _main_thread
1621 global _shutdown_locks_lock, _shutdown_locks
1622 _active_limbo_lock = RLock()
1623
1624 # fork() only copied the current thread; clear references to others.
1625 new_active = {}
1626
1627 try:
1628 current = _active[get_ident()]
1629 except KeyError:
1630 # fork() was called in a thread which was not spawned
1631 # by threading.Thread. For example, a thread spawned
1632 # by thread.start_new_thread().
1633 current = _MainThread()
1634
1635 _main_thread = current
1636
1637 # reset _shutdown() locks: threads re-register their _tstate_lock below
1638 _shutdown_locks_lock = _allocate_lock()
1639 _shutdown_locks = set()
1640
1641 with _active_limbo_lock:
1642 # Dangling thread instances must still have their locks reset,
1643 # because someone may join() them.
1644 threads = set(_enumerate())
1645 threads.update(_dangling)
1646 for thread in threads:
1647 # Any lock/condition variable may be currently locked or in an
1648 # invalid state, so we reinitialize them.
1649 if thread is current:
1650 # There is only one active thread. We reset the ident to
1651 # its new value since it can have changed.
1652 thread._reset_internal_locks(True)
1653 ident = get_ident()
1654 thread._ident = ident
1655 new_active[ident] = thread
1656 else:
1657 # All the others are already stopped.
1658 thread._reset_internal_locks(False)
1659 thread._stop()
1660
1661 _limbo.clear()
1662 _active.clear()
1663 _active.update(new_active)
1664 assert len(_active) == 1
1665
1666
1667 if hasattr(_os, "register_at_fork"):
1668 _os.register_at_fork(after_in_child=_after_fork)