(root)/
Python-3.12.0/
Tools/
ccbench/
ccbench.py
       1  # This file should be kept compatible with both Python 2.6 and Python >= 3.0.
       2  
       3  from __future__ import division
       4  from __future__ import print_function
       5  
       6  """
       7  ccbench, a Python concurrency benchmark.
       8  """
       9  
      10  import time
      11  import os
      12  import sys
      13  import itertools
      14  import threading
      15  import subprocess
      16  import socket
      17  from optparse import OptionParser, SUPPRESS_HELP
      18  import platform
      19  
# Compatibility
# Python 3 has no ``xrange`` builtin: the bare name lookup raises NameError
# there, in which case ``xrange`` is aliased to ``range``.
try:
    xrange
except NameError:
    xrange = range

# Where ``itertools.imap`` exists (Python 2), use it as the module-level
# ``map``; otherwise the AttributeError is ignored and the builtin ``map``
# stays in effect.
try:
    map = itertools.imap
except AttributeError:
    pass
      30  
      31  
# Minimum duration (seconds) each timed loop runs for in
# run_throughput_test().
THROUGHPUT_DURATION = 2.0

# Delay (seconds) between two pings of the latency client, and the overall
# pinging duration; run_latency_test() sends
# int(LATENCY_DURATION / LATENCY_PING_INTERVAL) pings.
LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

# Size of each packet exchanged in run_bandwidth_test(), and the base
# duration (seconds) handed to the bandwidth client.
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
      39  
      40  
      41  def task_pidigits():
      42      """Pi calculation (Python)"""
      43      _map = map
      44      _count = itertools.count
      45      _islice = itertools.islice
      46  
      47      def calc_ndigits(n):
      48          # From http://shootout.alioth.debian.org/
      49          def gen_x():
      50              return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
      51  
      52          def compose(a, b):
      53              aq, ar, as_, at = a
      54              bq, br, bs, bt = b
      55              return (aq * bq,
      56                      aq * br + ar * bt,
      57                      as_ * bq + at * bs,
      58                      as_ * br + at * bt)
      59  
      60          def extract(z, j):
      61              q, r, s, t = z
      62              return (q*j + r) // (s*j + t)
      63  
      64          def pi_digits():
      65              z = (1, 0, 0, 1)
      66              x = gen_x()
      67              while 1:
      68                  y = extract(z, 3)
      69                  while y != extract(z, 4):
      70                      z = compose(z, next(x))
      71                      y = extract(z, 3)
      72                  z = compose((10, -10*y, 0, 1), z)
      73                  yield y
      74  
      75          return list(_islice(pi_digits(), n))
      76  
      77      return calc_ndigits, (50, )
      78  
      79  def task_regex():
      80      """regular expression (C)"""
      81      # XXX this task gives horrendous latency results.
      82      import re
      83      # Taken from the `inspect` module
      84      pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
      85      with open(__file__, "r") as f:
      86          arg = f.read(2000)
      87      return pat.findall, (arg, )
      88  
      89  def task_sort():
      90      """list sorting (C)"""
      91      def list_sort(l):
      92          l = l[::-1]
      93          l.sort()
      94  
      95      return list_sort, (list(range(1000)), )
      96  
      97  def task_compress_zlib():
      98      """zlib compression (C)"""
      99      import zlib
     100      with open(__file__, "rb") as f:
     101          arg = f.read(5000) * 3
     102  
     103      def compress(s):
     104          zlib.decompress(zlib.compress(s, 5))
     105      return compress, (arg, )
     106  
     107  def task_compress_bz2():
     108      """bz2 compression (C)"""
     109      import bz2
     110      with open(__file__, "rb") as f:
     111          arg = f.read(3000) * 2
     112  
     113      def compress(s):
     114          bz2.compress(s)
     115      return compress, (arg, )
     116  
     117  def task_hashing():
     118      """SHA1 hashing (C)"""
     119      import hashlib
     120      with open(__file__, "rb") as f:
     121          arg = f.read(5000) * 30
     122  
     123      def compute(s):
     124          hashlib.sha1(s).digest()
     125      return compute, (arg, )
     126  
     127  
     128  throughput_tasks = [task_pidigits, task_regex]
     129  for mod in 'bz2', 'hashlib':
     130      try:
     131          globals()[mod] = __import__(mod)
     132      except ImportError:
     133          globals()[mod] = None
     134  
     135  # For whatever reasons, zlib gives irregular results, so we prefer bz2 or
     136  # hashlib if available.
     137  # (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
     138  if bz2 is not None:
     139      throughput_tasks.append(task_compress_bz2)
     140  elif hashlib is not None:
     141      throughput_tasks.append(task_hashing)
     142  else:
     143      throughput_tasks.append(task_compress_zlib)
     144  
     145  latency_tasks = throughput_tasks
     146  bandwidth_tasks = [task_pidigits]
     147  
     148  
     149  class ESC[4;38;5;81mTimedLoop:
     150      def __init__(self, func, args):
     151          self.func = func
     152          self.args = args
     153  
     154      def __call__(self, start_time, min_duration, end_event, do_yield=False):
     155          step = 20
     156          niters = 0
     157          duration = 0.0
     158          _time = time.time
     159          _sleep = time.sleep
     160          _func = self.func
     161          _args = self.args
     162          t1 = start_time
     163          while True:
     164              for i in range(step):
     165                  _func(*_args)
     166              t2 = _time()
     167              # If another thread terminated, the current measurement is invalid
     168              # => return the previous one.
     169              if end_event:
     170                  return niters, duration
     171              niters += step
     172              duration = t2 - start_time
     173              if duration >= min_duration:
     174                  end_event.append(None)
     175                  return niters, duration
     176              if t2 - t1 < 0.01:
     177                  # Minimize interference of measurement on overall runtime
     178                  step = step * 3 // 2
     179              elif do_yield:
     180                  # OS scheduling of Python threads is sometimes so bad that we
     181                  # have to force thread switching ourselves, otherwise we get
     182                  # completely useless results.
     183                  _sleep(0.0001)
     184              t1 = t2
     185  
     186  
def run_throughput_test(func, args, nthreads):
    """Time ``func(*args)`` running concurrently in *nthreads* threads.

    Returns a list of the ``(niters, duration)`` tuples produced by the
    TimedLoop calls, one appended per loop that ran.  With a single thread
    the loop runs directly in the calling thread.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    # Shared flag list: TimedLoop appends to it once a loop reaches its
    # minimum duration, which makes the other loops stop as well.
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Report readiness, then block until the main thread flips `started`.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        # `started` and `start_time` are closure variables that the enclosing
        # function assigns further down, while holding start_cond.
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        # All workers measure from this single, shared start time.
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
     239  
     240  def run_throughput_tests(max_threads):
     241      for task in throughput_tasks:
     242          print(task.__doc__)
     243          print()
     244          func, args = task()
     245          nthreads = 1
     246          baseline_speed = None
     247          while nthreads <= max_threads:
     248              results = run_throughput_test(func, args, nthreads)
     249              # Taking the max duration rather than average gives pessimistic
     250              # results rather than optimistic.
     251              speed = sum(r[0] for r in results) / max(r[1] for r in results)
     252              print("threads=%d: %d" % (nthreads, speed), end="")
     253              if baseline_speed is None:
     254                  print(" iterations/s.")
     255                  baseline_speed = speed
     256              else:
     257                  print(" ( %d %%)" % (speed / baseline_speed * 100))
     258              nthreads += 1
     259          print()
     260  
     261  
# Sentinel message the latency client sends after its last ping.
LAT_END = "END"
     263  
     264  def _sendto(sock, s, addr):
     265      sock.sendto(s.encode('ascii'), addr)
     266  
     267  def _recv(sock, n):
     268      return sock.recv(n).decode('ascii')
     269  
     270  def latency_client(addr, nb_pings, interval):
     271      sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     272      try:
     273          _time = time.time
     274          _sleep = time.sleep
     275          def _ping():
     276              _sendto(sock, "%r\n" % _time(), addr)
     277          # The first ping signals the parent process that we are ready.
     278          _ping()
     279          # We give the parent a bit of time to notice.
     280          _sleep(1.0)
     281          for i in range(nb_pings):
     282              _sleep(interval)
     283              _ping()
     284          _sendto(sock, LAT_END + "\n", addr)
     285      finally:
     286          sock.close()
     287  
     288  def run_latency_client(**kwargs):
     289      cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
     290      cmd_line.extend(['--latclient', repr(kwargs)])
     291      return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
     292                              #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     293  
     294  def run_latency_test(func, args, nthreads):
     295      # Create a listening socket to receive the pings. We use UDP which should
     296      # be painlessly cross-platform.
     297      sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     298      sock.bind(("127.0.0.1", 0))
     299      addr = sock.getsockname()
     300  
     301      interval = LATENCY_PING_INTERVAL
     302      duration = LATENCY_DURATION
     303      nb_pings = int(duration / interval)
     304  
     305      results = []
     306      threads = []
     307      end_event = []
     308      start_cond = threading.Condition()
     309      started = False
     310      if nthreads > 0:
     311          # Warm up
     312          func(*args)
     313  
     314          results = []
     315          loop = TimedLoop(func, args)
     316          ready = []
     317          ready_cond = threading.Condition()
     318  
     319          def run():
     320              with ready_cond:
     321                  ready.append(None)
     322                  ready_cond.notify()
     323              with start_cond:
     324                  while not started:
     325                      start_cond.wait()
     326              loop(start_time, duration * 1.5, end_event, do_yield=False)
     327  
     328          for i in range(nthreads):
     329              threads.append(threading.Thread(target=run))
     330          for t in threads:
     331              t.daemon = True
     332              t.start()
     333          # Wait for threads to be ready
     334          with ready_cond:
     335              while len(ready) < nthreads:
     336                  ready_cond.wait()
     337  
     338      # Run the client and wait for the first ping(s) to arrive before
     339      # unblocking the background threads.
     340      chunks = []
     341      process = run_latency_client(addr=sock.getsockname(),
     342                                   nb_pings=nb_pings, interval=interval)
     343      s = _recv(sock, 4096)
     344      _time = time.time
     345  
     346      with start_cond:
     347          start_time = _time()
     348          started = True
     349          start_cond.notify(nthreads)
     350  
     351      while LAT_END not in s:
     352          s = _recv(sock, 4096)
     353          t = _time()
     354          chunks.append((t, s))
     355  
     356      # Tell the background threads to stop.
     357      end_event.append(None)
     358      for t in threads:
     359          t.join()
     360      process.wait()
     361      sock.close()
     362  
     363      for recv_time, chunk in chunks:
     364          # NOTE: it is assumed that a line sent by a client wasn't received
     365          # in two chunks because the lines are very small.
     366          for line in chunk.splitlines():
     367              line = line.strip()
     368              if line and line != LAT_END:
     369                  send_time = eval(line)
     370                  assert isinstance(send_time, float)
     371                  results.append((send_time, recv_time))
     372  
     373      return results
     374  
     375  def run_latency_tests(max_threads):
     376      for task in latency_tasks:
     377          print("Background CPU task:", task.__doc__)
     378          print()
     379          func, args = task()
     380          nthreads = 0
     381          while nthreads <= max_threads:
     382              results = run_latency_test(func, args, nthreads)
     383              n = len(results)
     384              # We print out milliseconds
     385              lats = [1000 * (t2 - t1) for (t1, t2) in results]
     386              #print(list(map(int, lats)))
     387              avg = sum(lats) / n
     388              dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
     389              print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
     390              print()
     391              #print("    [... from %d samples]" % n)
     392              nthreads += 1
     393          print()
     394  
     395  
# Sentinel payload the bandwidth client sends once its sending loop is over.
BW_END = "END"
     397  
     398  def bandwidth_client(addr, packet_size, duration):
     399      sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     400      sock.bind(("127.0.0.1", 0))
     401      local_addr = sock.getsockname()
     402      _time = time.time
     403      _sleep = time.sleep
     404      def _send_chunk(msg):
     405          _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)
     406      # We give the parent some time to be ready.
     407      _sleep(1.0)
     408      try:
     409          start_time = _time()
     410          end_time = start_time + duration * 2.0
     411          i = 0
     412          while _time() < end_time:
     413              _send_chunk(str(i))
     414              s = _recv(sock, packet_size)
     415              assert len(s) == packet_size
     416              i += 1
     417          _send_chunk(BW_END)
     418      finally:
     419          sock.close()
     420  
     421  def run_bandwidth_client(**kwargs):
     422      cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
     423      cmd_line.extend(['--bwclient', repr(kwargs)])
     424      return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
     425                              #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     426  
     427  def run_bandwidth_test(func, args, nthreads):
     428      # Create a listening socket to receive the packets. We use UDP which should
     429      # be painlessly cross-platform.
     430      with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
     431          sock.bind(("127.0.0.1", 0))
     432          addr = sock.getsockname()
     433  
     434          duration = BANDWIDTH_DURATION
     435          packet_size = BANDWIDTH_PACKET_SIZE
     436  
     437          results = []
     438          threads = []
     439          end_event = []
     440          start_cond = threading.Condition()
     441          started = False
     442          if nthreads > 0:
     443              # Warm up
     444              func(*args)
     445  
     446              results = []
     447              loop = TimedLoop(func, args)
     448              ready = []
     449              ready_cond = threading.Condition()
     450  
     451              def run():
     452                  with ready_cond:
     453                      ready.append(None)
     454                      ready_cond.notify()
     455                  with start_cond:
     456                      while not started:
     457                          start_cond.wait()
     458                  loop(start_time, duration * 1.5, end_event, do_yield=False)
     459  
     460              for i in range(nthreads):
     461                  threads.append(threading.Thread(target=run))
     462              for t in threads:
     463                  t.daemon = True
     464                  t.start()
     465              # Wait for threads to be ready
     466              with ready_cond:
     467                  while len(ready) < nthreads:
     468                      ready_cond.wait()
     469  
     470          # Run the client and wait for the first packet to arrive before
     471          # unblocking the background threads.
     472          process = run_bandwidth_client(addr=addr,
     473                                         packet_size=packet_size,
     474                                         duration=duration)
     475          _time = time.time
     476          # This will also wait for the parent to be ready
     477          s = _recv(sock, packet_size)
     478          remote_addr = eval(s.partition('#')[0])
     479  
     480          with start_cond:
     481              start_time = _time()
     482              started = True
     483              start_cond.notify(nthreads)
     484  
     485          n = 0
     486          first_time = None
     487          while not end_event and BW_END not in s:
     488              _sendto(sock, s, remote_addr)
     489              s = _recv(sock, packet_size)
     490              if first_time is None:
     491                  first_time = _time()
     492              n += 1
     493          end_time = _time()
     494  
     495      end_event.append(None)
     496      for t in threads:
     497          t.join()
     498      process.kill()
     499  
     500      return (n - 1) / (end_time - first_time)
     501  
     502  def run_bandwidth_tests(max_threads):
     503      for task in bandwidth_tasks:
     504          print("Background CPU task:", task.__doc__)
     505          print()
     506          func, args = task()
     507          nthreads = 0
     508          baseline_speed = None
     509          while nthreads <= max_threads:
     510              results = run_bandwidth_test(func, args, nthreads)
     511              speed = results
     512              #speed = len(results) * 1.0 / results[-1][0]
     513              print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
     514              if baseline_speed is None:
     515                  print(" packets/s.")
     516                  baseline_speed = speed
     517              else:
     518                  print(" ( %d %%)" % (speed / baseline_speed * 100))
     519              nthreads += 1
     520          print()
     521  
     522  
     523  def main():
     524      usage = "usage: %prog [-h|--help] [options]"
     525      parser = OptionParser(usage=usage)
     526      parser.add_option("-t", "--throughput",
     527                        action="store_true", dest="throughput", default=False,
     528                        help="run throughput tests")
     529      parser.add_option("-l", "--latency",
     530                        action="store_true", dest="latency", default=False,
     531                        help="run latency tests")
     532      parser.add_option("-b", "--bandwidth",
     533                        action="store_true", dest="bandwidth", default=False,
     534                        help="run I/O bandwidth tests")
     535      parser.add_option("-i", "--interval",
     536                        action="store", type="int", dest="check_interval", default=None,
     537                        help="sys.setcheckinterval() value "
     538                             "(Python 3.8 and older)")
     539      parser.add_option("-I", "--switch-interval",
     540                        action="store", type="float", dest="switch_interval", default=None,
     541                        help="sys.setswitchinterval() value "
     542                             "(Python 3.2 and newer)")
     543      parser.add_option("-n", "--num-threads",
     544                        action="store", type="int", dest="nthreads", default=4,
     545                        help="max number of threads in tests")
     546  
     547      # Hidden option to run the pinging and bandwidth clients
     548      parser.add_option("", "--latclient",
     549                        action="store", dest="latclient", default=None,
     550                        help=SUPPRESS_HELP)
     551      parser.add_option("", "--bwclient",
     552                        action="store", dest="bwclient", default=None,
     553                        help=SUPPRESS_HELP)
     554  
     555      options, args = parser.parse_args()
     556      if args:
     557          parser.error("unexpected arguments")
     558  
     559      if options.latclient:
     560          kwargs = eval(options.latclient)
     561          latency_client(**kwargs)
     562          return
     563  
     564      if options.bwclient:
     565          kwargs = eval(options.bwclient)
     566          bandwidth_client(**kwargs)
     567          return
     568  
     569      if not options.throughput and not options.latency and not options.bandwidth:
     570          options.throughput = options.latency = options.bandwidth = True
     571      if options.check_interval:
     572          sys.setcheckinterval(options.check_interval)
     573      if options.switch_interval:
     574          sys.setswitchinterval(options.switch_interval)
     575  
     576      print("== %s %s (%s) ==" % (
     577          platform.python_implementation(),
     578          platform.python_version(),
     579          platform.python_build()[0],
     580      ))
     581      # Processor identification often has repeated spaces
     582      cpu = ' '.join(platform.processor().split())
     583      print("== %s %s on '%s' ==" % (
     584          platform.machine(),
     585          platform.system(),
     586          cpu,
     587      ))
     588      print()
     589  
     590      if options.throughput:
     591          print("--- Throughput ---")
     592          print()
     593          run_throughput_tests(options.nthreads)
     594  
     595      if options.latency:
     596          print("--- Latency ---")
     597          print()
     598          run_latency_tests(options.nthreads)
     599  
     600      if options.bandwidth:
     601          print("--- I/O bandwidth ---")
     602          print()
     603          run_bandwidth_tests(options.nthreads)
     604  
# Run the benchmarks (or one of the hidden client modes) only when this file
# is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()