(root)/
Python-3.11.7/
Tools/
iobench/
iobench.py
       1  # -*- coding: utf-8 -*-
       2  # This file should be kept compatible with both Python 2.6 and Python >= 3.0.
       3  
       4  import itertools
       5  import os
       6  import platform
       7  import re
       8  import sys
       9  import time
      10  from optparse import OptionParser
      11  
# Stream that receives the per-test labels and results.
out = sys.stdout

# Defaults for the text tests; main() overrides them from the --encoding
# and --newlines command-line options.
TEXT_ENCODING = 'utf8'
NEWLINES = 'lf'

# Compatibility
try:
    xrange
except NameError:
    # No xrange() builtin here (Python 3): use range() under the same name.
    xrange = range
      22  
      23  def text_open(fn, mode, encoding=None):
      24      try:
      25          return open(fn, mode, encoding=encoding or TEXT_ENCODING)
      26      except TypeError:
      27          if 'r' in mode:
      28              mode += 'U' # 'U' mode is needed only in Python 2.x
      29          return open(fn, mode)
      30  
      31  def get_file_sizes():
      32      for s in ['20 KiB', '400 KiB', '10 MiB']:
      33          size, unit = s.split()
      34          size = int(size) * {'KiB': 1024, 'MiB': 1024 ** 2}[unit]
      35          yield s.replace(' ', ''), size
      36  
      37  def get_binary_files():
      38      return ((name + ".bin", size) for name, size in get_file_sizes())
      39  
      40  def get_text_files():
      41      return (("%s-%s-%s.txt" % (name, TEXT_ENCODING, NEWLINES), size)
      42          for name, size in get_file_sizes())
      43  
      44  def with_open_mode(mode):
      45      def decorate(f):
      46          f.file_open_mode = mode
      47          return f
      48      return decorate
      49  
      50  def with_sizes(*sizes):
      51      def decorate(f):
      52          f.file_sizes = sizes
      53          return f
      54      return decorate
      55  
      56  
      57  # Here begin the tests
      58  
# Mode "r" contains neither "b" nor "t", so run_test_family() keeps this test
# in both the binary and the text read families.
@with_open_mode("r")
@with_sizes("medium")
def read_bytewise(f):
    """ read one unit at a time """
    # The docstring above is printed as this test's label by run_all_tests().
    f.seek(0)
    # read() returns an empty (falsy) result once the end of file is reached.
    while f.read(1):
        pass
      66  
# Mode "r": used by both the binary and the text read families.
@with_open_mode("r")
@with_sizes("medium")
def read_small_chunks(f):
    """ read 20 units at a time """
    # Rewind, then consume the file with 20-unit reads until read() comes
    # back empty.
    f.seek(0)
    while f.read(20):
        pass
      74  
# Mode "r": used by both the binary and the text read families.
@with_open_mode("r")
@with_sizes("medium")
def read_big_chunks(f):
    """ read 4096 units at a time """
    # Rewind, then consume the file with 4096-unit reads until read() comes
    # back empty.
    f.seek(0)
    while f.read(4096):
        pass
      82  
# The only read test that runs on all three file sizes.
@with_open_mode("r")
@with_sizes("small", "medium", "large")
def read_whole_file(f):
    """ read whole contents at once """
    f.seek(0)
    # The first read() returns everything; the loop ends on the following
    # read(), which returns an empty result.
    while f.read():
        pass
      90  
# "rt" contains "t": the binary family (mode_filter "t") skips this test, so
# it only runs on the text files.
@with_open_mode("rt")
@with_sizes("medium")
def read_lines(f):
    """ read one line at a time """
    f.seek(0)
    # Iterating the file object yields one line per step; lines are discarded.
    for line in f:
        pass
      98  
# Mode "r": used by both the binary and the text read families.
@with_open_mode("r")
@with_sizes("medium")
def seek_forward_bytewise(f):
    """ seek forward one unit at a time """
    # Find the end offset by seeking to the end (whence=2), then rewind.
    f.seek(0, 2)
    size = f.tell()
    f.seek(0, 0)
    # Absolute seeks to offsets 0 .. size-2, in increasing order.
    for i in xrange(0, size - 1):
        f.seek(i, 0)
     108  
# Mode "r": used by both the binary and the text read families.
@with_open_mode("r")
@with_sizes("medium")
def seek_forward_blockwise(f):
    """ seek forward 1000 units at a time """
    # Find the end offset by seeking to the end (whence=2), then rewind.
    f.seek(0, 2)
    size = f.tell()
    f.seek(0, 0)
    # Absolute seeks to every 1000th offset below size - 1.
    for i in xrange(0, size - 1, 1000):
        f.seek(i, 0)
     118  
# "rb" contains "b": the text family (mode_filter "b") skips this test, so it
# only runs on the binary files.
@with_open_mode("rb")
@with_sizes("medium")
def read_seek_bytewise(f):
    """ alternate read & seek one unit """
    f.seek(0)
    # Each iteration reads one unit, then skips the next one with a relative
    # seek (whence=1); the loop ends when read() returns an empty result.
    while f.read(1):
        f.seek(1, 1)
     126  
# "rb" contains "b": only run on the binary files.
@with_open_mode("rb")
@with_sizes("medium")
def read_seek_blockwise(f):
    """ alternate read & seek 1000 units """
    f.seek(0)
    # Each iteration reads 1000 units, then skips the next 1000 with a
    # relative seek (whence=1).
    while f.read(1000):
        f.seek(1000, 1)
     134  
     135  
# Mode "w" without "+": run_one_test() does not warm the cache for this test.
# The mode recorded here is only consulted by run_all_tests(); the file object
# passed in is created by the open_func supplied there.
@with_open_mode("w")
@with_sizes("small")
def write_bytewise(f, source):
    """ write one unit at a time """
    # The docstring above is printed as this test's label by run_all_tests().
    # `source` is the data built by run_all_tests()' make_test_source();
    # slicing (rather than indexing) keeps each piece the same type as source.
    for i in xrange(0, len(source)):
        f.write(source[i:i+1])
     142  
# Mode "w" without "+": no cache warming before this test.
@with_open_mode("w")
@with_sizes("medium")
def write_small_chunks(f, source):
    """ write 20 units at a time """
    # Write `source` front to back in 20-unit slices (the last may be shorter).
    for i in xrange(0, len(source), 20):
        f.write(source[i:i+20])
     149  
# Mode "w" without "+": no cache warming before this test.
@with_open_mode("w")
@with_sizes("medium")
def write_medium_chunks(f, source):
    """ write 4096 units at a time """
    # Write `source` front to back in 4096-unit slices (the last may be
    # shorter).
    for i in xrange(0, len(source), 4096):
        f.write(source[i:i+4096])
     156  
# The only write test that uses the "large" file.
@with_open_mode("w")
@with_sizes("large")
def write_large_chunks(f, source):
    """ write 1e6 units at a time """
    # Write `source` front to back in 1000000-unit slices (the last may be
    # shorter).
    for i in xrange(0, len(source), 1000000):
        f.write(source[i:i+1000000])
     163  
     164  
# "w+" contains "+": run_one_test() warms the cache before this test.  The
# mode recorded here is only consulted by run_all_tests(); the file object
# passed in is created by the open_func supplied there.
@with_open_mode("w+")
@with_sizes("small")
def modify_bytewise(f, source):
    """ modify one unit at a time """
    # The docstring above is printed as this test's label by run_all_tests().
    # Rewind, then overwrite from the start with one-unit writes.
    f.seek(0)
    for i in xrange(0, len(source)):
        f.write(source[i:i+1])
     172  
# "w+" contains "+": the cache is warmed before this test.
@with_open_mode("w+")
@with_sizes("medium")
def modify_small_chunks(f, source):
    """ modify 20 units at a time """
    # Rewind, then overwrite from the start in 20-unit slices of `source`.
    f.seek(0)
    for i in xrange(0, len(source), 20):
        f.write(source[i:i+20])
     180  
# "w+" contains "+": the cache is warmed before this test.
@with_open_mode("w+")
@with_sizes("medium")
def modify_medium_chunks(f, source):
    """ modify 4096 units at a time """
    # Rewind, then overwrite from the start in 4096-unit slices of `source`.
    f.seek(0)
    for i in xrange(0, len(source), 4096):
        f.write(source[i:i+4096])
     188  
# "wb+" contains "b": the text family (mode_filter "b") skips this test.
@with_open_mode("wb+")
@with_sizes("medium")
def modify_seek_forward_bytewise(f, source):
    """ alternate write & seek one unit """
    f.seek(0)
    # For every even offset i: write the unit at i, then seek (absolute) to
    # i + 2, leaving the unit at i + 1 untouched.
    for i in xrange(0, len(source), 2):
        f.write(source[i:i+1])
        f.seek(i+2)
     197  
# "wb+" contains "b": the text family (mode_filter "b") skips this test.
@with_open_mode("wb+")
@with_sizes("medium")
def modify_seek_forward_blockwise(f, source):
    """ alternate write & seek 1000 units """
    f.seek(0)
    # For every 2000th offset i: write up to 1000 units from source[i:], then
    # seek (absolute) to i + 2000, skipping the following 1000 units.
    for i in xrange(0, len(source), 2000):
        f.write(source[i:i+1000])
        f.seek(i+2000)
     206  
# XXX the 2 following tests don't work with py3k's text IO
# (hence the "b" in their mode, which makes the text family skip them).
@with_open_mode("wb+")
@with_sizes("medium")
def read_modify_bytewise(f, source):
    """ alternate read & write one unit """
    f.seek(0)
    # For every even offset i: read one unit, then write the unit taken from
    # source[i+1] right after it.
    for i in xrange(0, len(source), 2):
        f.read(1)
        f.write(source[i+1:i+2])
     216  
# "wb+" contains "b": the text family (mode_filter "b") skips this test.
@with_open_mode("wb+")
@with_sizes("medium")
def read_modify_blockwise(f, source):
    """ alternate read & write 1000 units """
    f.seek(0)
    # For every 2000th offset i: read 1000 units, then write the slice
    # source[i+1000:i+2000] right after them.
    for i in xrange(0, len(source), 2000):
        f.read(1000)
        f.write(source[i+1000:i+2000])
     225  
     226  
# Test families, listed in the order run_test_family() executes them.
# A None entry is not a test: run_test_family() writes a blank line to the
# output at that position, visually separating groups of results.
read_tests = [
    read_bytewise, read_small_chunks, read_lines, read_big_chunks,
    None, read_whole_file, None,
    seek_forward_bytewise, seek_forward_blockwise,
    read_seek_bytewise, read_seek_blockwise,
]

# Tests that receive (f, source) and only write.
write_tests = [
    write_bytewise, write_small_chunks, write_medium_chunks, write_large_chunks,
]

# Tests that receive (f, source) and overwrite an existing file.
modify_tests = [
    modify_bytewise, modify_small_chunks, modify_medium_chunks,
    None,
    modify_seek_forward_bytewise, modify_seek_forward_blockwise,
    read_modify_bytewise, read_modify_blockwise,
]
     244  
     245  def run_during(duration, func):
     246      _t = time.time
     247      n = 0
     248      start = os.times()
     249      start_timestamp = _t()
     250      real_start = start[4] or start_timestamp
     251      while True:
     252          func()
     253          n += 1
     254          if _t() - start_timestamp > duration:
     255              break
     256      end = os.times()
     257      real = (end[4] if start[4] else time.time()) - real_start
     258      return n, real, sum(end[0:2]) - sum(start[0:2])
     259  
     260  def warm_cache(filename):
     261      with open(filename, "rb") as f:
     262          f.read()
     263  
     264  
     265  def run_all_tests(options):
     266      def print_label(filename, func):
     267          name = re.split(r'[-.]', filename)[0]
     268          out.write(
     269              ("[%s] %s... "
     270                  % (name.center(7), func.__doc__.strip())
     271              ).ljust(52))
     272          out.flush()
     273  
     274      def print_results(size, n, real, cpu):
     275          bw = n * float(size) / 1024 ** 2 / real
     276          bw = ("%4d MiB/s" if bw > 100 else "%.3g MiB/s") % bw
     277          out.write(bw.rjust(12) + "\n")
     278          if cpu < 0.90 * real:
     279              out.write("   warning: test above used only %d%% CPU, "
     280                  "result may be flawed!\n" % (100.0 * cpu / real))
     281  
     282      def run_one_test(name, size, open_func, test_func, *args):
     283          mode = test_func.file_open_mode
     284          print_label(name, test_func)
     285          if "w" not in mode or "+" in mode:
     286              warm_cache(name)
     287          with open_func(name) as f:
     288              n, real, cpu = run_during(1.5, lambda: test_func(f, *args))
     289          print_results(size, n, real, cpu)
     290  
     291      def run_test_family(tests, mode_filter, files, open_func, *make_args):
     292          for test_func in tests:
     293              if test_func is None:
     294                  out.write("\n")
     295                  continue
     296              if mode_filter in test_func.file_open_mode:
     297                  continue
     298              for s in test_func.file_sizes:
     299                  name, size = files[size_names[s]]
     300                  #name += file_ext
     301                  args = tuple(f(name, size) for f in make_args)
     302                  run_one_test(name, size,
     303                      open_func, test_func, *args)
     304  
     305      size_names = {
     306          "small": 0,
     307          "medium": 1,
     308          "large": 2,
     309      }
     310  
     311      print("Python %s" % sys.version)
     312      if sys.version_info < (3, 3):
     313          if sys.maxunicode > 0xffff:
     314              text = "UCS-4 (wide build)"
     315          else:
     316              text = "UTF-16 (narrow build)"
     317      else:
     318          text = "PEP 393"
     319      print("Unicode: %s" % text)
     320      print(platform.platform())
     321      binary_files = list(get_binary_files())
     322      text_files = list(get_text_files())
     323      if "b" in options:
     324          print("Binary unit = one byte")
     325      if "t" in options:
     326          print("Text unit = one character (%s-decoded)" % TEXT_ENCODING)
     327  
     328      # Binary reads
     329      if "b" in options and "r" in options:
     330          print("\n** Binary input **\n")
     331          run_test_family(read_tests, "t", binary_files, lambda fn: open(fn, "rb"))
     332  
     333      # Text reads
     334      if "t" in options and "r" in options:
     335          print("\n** Text input **\n")
     336          run_test_family(read_tests, "b", text_files, lambda fn: text_open(fn, "r"))
     337  
     338      # Binary writes
     339      if "b" in options and "w" in options:
     340          print("\n** Binary append **\n")
     341          def make_test_source(name, size):
     342              with open(name, "rb") as f:
     343                  return f.read()
     344          run_test_family(write_tests, "t", binary_files,
     345              lambda fn: open(os.devnull, "wb"), make_test_source)
     346  
     347      # Text writes
     348      if "t" in options and "w" in options:
     349          print("\n** Text append **\n")
     350          def make_test_source(name, size):
     351              with text_open(name, "r") as f:
     352                  return f.read()
     353          run_test_family(write_tests, "b", text_files,
     354              lambda fn: text_open(os.devnull, "w"), make_test_source)
     355  
     356      # Binary overwrites
     357      if "b" in options and "w" in options:
     358          print("\n** Binary overwrite **\n")
     359          def make_test_source(name, size):
     360              with open(name, "rb") as f:
     361                  return f.read()
     362          run_test_family(modify_tests, "t", binary_files,
     363              lambda fn: open(fn, "r+b"), make_test_source)
     364  
     365      # Text overwrites
     366      if "t" in options and "w" in options:
     367          print("\n** Text overwrite **\n")
     368          def make_test_source(name, size):
     369              with text_open(name, "r") as f:
     370                  return f.read()
     371          run_test_family(modify_tests, "b", text_files,
     372              lambda fn: text_open(fn, "r+"), make_test_source)
     373  
     374  
     375  def prepare_files():
     376      print("Preparing files...")
     377      # Binary files
     378      for name, size in get_binary_files():
     379          if os.path.isfile(name) and os.path.getsize(name) == size:
     380              continue
     381          with open(name, "wb") as f:
     382              f.write(os.urandom(size))
     383      # Text files
     384      chunk = []
     385      with text_open(__file__, "r", encoding='utf8') as f:
     386          for line in f:
     387              if line.startswith("# <iobench text chunk marker>"):
     388                  break
     389          else:
     390              raise RuntimeError(
     391                  "Couldn't find chunk marker in %s !" % __file__)
     392          if NEWLINES == "all":
     393              it = itertools.cycle(["\n", "\r", "\r\n"])
     394          else:
     395              it = itertools.repeat(
     396                  {"cr": "\r", "lf": "\n", "crlf": "\r\n"}[NEWLINES])
     397          chunk = "".join(line.replace("\n", next(it)) for line in f)
     398          if isinstance(chunk, bytes):
     399              chunk = chunk.decode('utf8')
     400          chunk = chunk.encode(TEXT_ENCODING)
     401      for name, size in get_text_files():
     402          if os.path.isfile(name) and os.path.getsize(name) == size:
     403              continue
     404          head = chunk * (size // len(chunk))
     405          tail = chunk[:size % len(chunk)]
     406          # Adjust tail to end on a character boundary
     407          while True:
     408              try:
     409                  tail.decode(TEXT_ENCODING)
     410                  break
     411              except UnicodeDecodeError:
     412                  tail = tail[:-1]
     413          with open(name, "wb") as f:
     414              f.write(head)
     415              f.write(tail)
     416  
     417  def main():
     418      global TEXT_ENCODING, NEWLINES
     419  
     420      usage = "usage: %prog [-h|--help] [options]"
     421      parser = OptionParser(usage=usage)
     422      parser.add_option("-b", "--binary",
     423                        action="store_true", dest="binary", default=False,
     424                        help="run binary I/O tests")
     425      parser.add_option("-t", "--text",
     426                        action="store_true", dest="text", default=False,
     427                        help="run text I/O tests")
     428      parser.add_option("-r", "--read",
     429                        action="store_true", dest="read", default=False,
     430                        help="run read tests")
     431      parser.add_option("-w", "--write",
     432                        action="store_true", dest="write", default=False,
     433                        help="run write & modify tests")
     434      parser.add_option("-E", "--encoding",
     435                        action="store", dest="encoding", default=None,
     436                        help="encoding for text tests (default: %s)" % TEXT_ENCODING)
     437      parser.add_option("-N", "--newlines",
     438                        action="store", dest="newlines", default='lf',
     439                        help="line endings for text tests "
     440                             "(one of: {lf (default), cr, crlf, all})")
     441      parser.add_option("-m", "--io-module",
     442                        action="store", dest="io_module", default=None,
     443                        help="io module to test (default: builtin open())")
     444      options, args = parser.parse_args()
     445      if args:
     446          parser.error("unexpected arguments")
     447      NEWLINES = options.newlines.lower()
     448      if NEWLINES not in ('lf', 'cr', 'crlf', 'all'):
     449          parser.error("invalid 'newlines' option: %r" % NEWLINES)
     450  
     451      test_options = ""
     452      if options.read:
     453          test_options += "r"
     454      if options.write:
     455          test_options += "w"
     456      elif not options.read:
     457          test_options += "rw"
     458      if options.text:
     459          test_options += "t"
     460      if options.binary:
     461          test_options += "b"
     462      elif not options.text:
     463          test_options += "tb"
     464  
     465      if options.encoding:
     466          TEXT_ENCODING = options.encoding
     467  
     468      if options.io_module:
     469          globals()['open'] = __import__(options.io_module, {}, {}, ['open']).open
     470  
     471      prepare_files()
     472      run_all_tests(test_options)
     473  
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()
     476  
     477  
     478  # -- This part to exercise text reading. Don't change anything! --
     479  # <iobench text chunk marker>
     480  
     481  """
     482  1.
     483  Gáttir allar,
     484  áðr gangi fram,
     485  um skoðask skyli,
     486  um skyggnast skyli,
     487  því at óvíst er at vita,
     488  hvar óvinir
     489  sitja á fleti fyrir.
     490  
     491  2.
     492  Gefendr heilir!
     493  Gestr er inn kominn,
     494  hvar skal sitja sjá?
     495  Mjök er bráðr,
     496  sá er á bröndum skal
     497  síns of freista frama.
     498  
     499  3.
     500  Elds er þörf,
     501  þeims inn er kominn
     502  ok á kné kalinn;
     503  matar ok váða
     504  er manni þörf,
     505  þeim er hefr um fjall farit.
     506  
     507  4.
     508  Vatns er þörf,
     509  þeim er til verðar kemr,
     510  þerru ok þjóðlaðar,
     511  góðs of æðis,
     512  ef sér geta mætti,
     513  orðs ok endrþögu.
     514  
     515  5.
     516  Vits er þörf,
     517  þeim er víða ratar;
     518  dælt er heima hvat;
     519  at augabragði verðr,
     520  sá er ekki kann
     521  ok með snotrum sitr.
     522  
     523  6.
     524  At hyggjandi sinni
     525  skyli-t maðr hræsinn vera,
     526  heldr gætinn at geði;
     527  þá er horskr ok þögull
     528  kemr heimisgarða til,
     529  sjaldan verðr víti vörum,
     530  því at óbrigðra vin
     531  fær maðr aldregi
     532  en mannvit mikit.
     533  
     534  7.
     535  Inn vari gestr,
     536  er til verðar kemr,
     537  þunnu hljóði þegir,
     538  eyrum hlýðir,
     539  en augum skoðar;
     540  svá nýsisk fróðra hverr fyrir.
     541  
     542  8.
     543  Hinn er sæll,
     544  er sér of getr
     545  lof ok líknstafi;
     546  ódælla er við þat,
     547  er maðr eiga skal
     548  annars brjóstum í.
     549  """
     550  
     551  """
     552  C'est revenir tard, je le sens, sur un sujet trop rebattu et déjà presque oublié. Mon état, qui ne me permet plus aucun travail suivi, mon aversion pour le genre polémique, ont causé ma lenteur à écrire et ma répugnance à publier. J'aurais même tout à fait supprimé ces Lettres, ou plutôt je lie les aurais point écrites, s'il n'eût été question que de moi : Mais ma patrie ne m'est pas tellement devenue étrangère que je puisse voir tranquillement opprimer ses citoyens, surtout lorsqu'ils n'ont compromis leurs droits qu'en défendant ma cause. Je serais le dernier des hommes si dans une telle occasion j'écoutais un sentiment qui n'est plus ni douceur ni patience, mais faiblesse et lâcheté, dans celui qu'il empêche de remplir son devoir.
     553  Rien de moins important pour le public, j'en conviens, que la matière de ces lettres. La constitution d'une petite République, le sort d'un petit particulier, l'exposé de quelques injustices, la réfutation de quelques sophismes ; tout cela n'a rien en soi d'assez considérable pour mériter beaucoup de lecteurs : mais si mes sujets sont petits mes objets sont grands, et dignes de l'attention de tout honnête homme. Laissons Genève à sa place, et Rousseau dans sa dépression ; mais la religion, mais la liberté, la justice ! voilà, qui que vous soyez, ce qui n'est pas au-dessous de vous.
     554  Qu'on ne cherche pas même ici dans le style le dédommagement de l'aridité de la matière. Ceux que quelques traits heureux de ma plume ont si fort irrités trouveront de quoi s'apaiser dans ces lettres, L'honneur de défendre un opprimé eût enflammé mon coeur si j'avais parlé pour un autre. Réduit au triste emploi de me défendre moi-même, j'ai dû me borner à raisonner ; m'échauffer eût été m'avilir. J'aurai donc trouvé grâce en ce point devant ceux qui s'imaginent qu'il est essentiel à la vérité d'être dite froidement ; opinion que pourtant j'ai peine à comprendre. Lorsqu'une vive persuasion nous anime, le moyen d'employer un langage glacé ? Quand Archimède tout transporté courait nu dans les rues de Syracuse, en avait-il moins trouvé la vérité parce qu'il se passionnait pour elle ? Tout au contraire, celui qui la sent ne peut s'abstenir de l'adorer ; celui qui demeure froid ne l'a pas vue.
     555  Quoi qu'il en soit, je prie les lecteurs de vouloir bien mettre à part mon beau style, et d'examiner seulement si je raisonne bien ou mal ; car enfin, de cela seul qu'un auteur s'exprime en bons termes, je ne vois pas comment il peut s'ensuivre que cet auteur ne sait ce qu'il dit.
     556  """