1  import multiprocessing
       2  import time
       3  import random
       4  import sys
       5  
       6  #
       7  # Functions used by test code
       8  #
       9  
      10  def calculate(func, args):
      11      result = func(*args)
      12      return '%s says that %s%s = %s' % (
      13          multiprocessing.current_process().name,
      14          func.__name__, args, result
      15          )
      16  
      17  def calculatestar(args):
      18      return calculate(*args)
      19  
      20  def mul(a, b):
      21      time.sleep(0.5 * random.random())
      22      return a * b
      23  
      24  def plus(a, b):
      25      time.sleep(0.5 * random.random())
      26      return a + b
      27  
      28  def f(x):
      29      return 1.0 / (x - 5.0)
      30  
      31  def pow3(x):
      32      return x ** 3
      33  
      34  def noop(x):
      35      pass
      36  
      37  #
      38  # Test code
      39  #
      40  
      41  def test():
      42      PROCESSES = 4
      43      print('Creating pool with %d processes\n' % PROCESSES)
      44  
      45      with multiprocessing.Pool(PROCESSES) as pool:
      46          #
      47          # Tests
      48          #
      49  
      50          TASKS = [(mul, (i, 7)) for i in range(10)] + \
      51                  [(plus, (i, 8)) for i in range(10)]
      52  
      53          results = [pool.apply_async(calculate, t) for t in TASKS]
      54          imap_it = pool.imap(calculatestar, TASKS)
      55          imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
      56  
      57          print('Ordered results using pool.apply_async():')
      58          for r in results:
      59              print('\t', r.get())
      60          print()
      61  
      62          print('Ordered results using pool.imap():')
      63          for x in imap_it:
      64              print('\t', x)
      65          print()
      66  
      67          print('Unordered results using pool.imap_unordered():')
      68          for x in imap_unordered_it:
      69              print('\t', x)
      70          print()
      71  
      72          print('Ordered results using pool.map() --- will block till complete:')
      73          for x in pool.map(calculatestar, TASKS):
      74              print('\t', x)
      75          print()
      76  
      77          #
      78          # Test error handling
      79          #
      80  
      81          print('Testing error handling:')
      82  
      83          try:
      84              print(pool.apply(f, (5,)))
      85          except ZeroDivisionError:
      86              print('\tGot ZeroDivisionError as expected from pool.apply()')
      87          else:
      88              raise AssertionError('expected ZeroDivisionError')
      89  
      90          try:
      91              print(pool.map(f, list(range(10))))
      92          except ZeroDivisionError:
      93              print('\tGot ZeroDivisionError as expected from pool.map()')
      94          else:
      95              raise AssertionError('expected ZeroDivisionError')
      96  
      97          try:
      98              print(list(pool.imap(f, list(range(10)))))
      99          except ZeroDivisionError:
     100              print('\tGot ZeroDivisionError as expected from list(pool.imap())')
     101          else:
     102              raise AssertionError('expected ZeroDivisionError')
     103  
     104          it = pool.imap(f, list(range(10)))
     105          for i in range(10):
     106              try:
     107                  x = next(it)
     108              except ZeroDivisionError:
     109                  if i == 5:
     110                      pass
     111              except StopIteration:
     112                  break
     113              else:
     114                  if i == 5:
     115                      raise AssertionError('expected ZeroDivisionError')
     116  
     117          assert i == 9
     118          print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
     119          print()
     120  
     121          #
     122          # Testing timeouts
     123          #
     124  
     125          print('Testing ApplyResult.get() with timeout:', end=' ')
     126          res = pool.apply_async(calculate, TASKS[0])
     127          while 1:
     128              sys.stdout.flush()
     129              try:
     130                  sys.stdout.write('\n\t%s' % res.get(0.02))
     131                  break
     132              except multiprocessing.TimeoutError:
     133                  sys.stdout.write('.')
     134          print()
     135          print()
     136  
     137          print('Testing IMapIterator.next() with timeout:', end=' ')
     138          it = pool.imap(calculatestar, TASKS)
     139          while 1:
     140              sys.stdout.flush()
     141              try:
     142                  sys.stdout.write('\n\t%s' % it.next(0.02))
     143              except StopIteration:
     144                  break
     145              except multiprocessing.TimeoutError:
     146                  sys.stdout.write('.')
     147          print()
     148          print()
     149  
     150  
     151  if __name__ == '__main__':
     152      multiprocessing.freeze_support()
     153      test()