(root)/
Python-3.11.7/
Lib/
test/
_test_venv_multiprocessing.py
       1  import multiprocessing
       2  import random
       3  import sys
       4  import time
       5  
       6  def fill_queue(queue, code):
       7      queue.put(code)
       8  
       9  
      10  def drain_queue(queue, code):
      11      if code != queue.get():
      12          sys.exit(1)
      13  
      14  
      15  def test_func():
      16      code = random.randrange(0, 1000)
      17      queue = multiprocessing.Queue()
      18      fill_pool = multiprocessing.Process(
      19          target=fill_queue,
      20          args=(queue, code)
      21      )
      22      drain_pool = multiprocessing.Process(
      23          target=drain_queue,
      24          args=(queue, code)
      25      )
      26      drain_pool.start()
      27      fill_pool.start()
      28      fill_pool.join()
      29      drain_pool.join()
      30  
      31  
      32  def main():
      33      test_pool = multiprocessing.Process(target=test_func)
      34      test_pool.start()
      35      test_pool.join()
      36      sys.exit(test_pool.exitcode)
      37  
      38  
      39  if __name__ == "__main__":
      40      main()