1  """Running tests"""
       2  
       3  import sys
       4  import time
       5  import warnings
       6  
       7  from . import result
       8  from .case import _SubTest
       9  from .signals import registerResult
      10  
      11  __unittest = True
      12  
      13  
      14  class ESC[4;38;5;81m_WritelnDecorator(ESC[4;38;5;149mobject):
      15      """Used to decorate file-like objects with a handy 'writeln' method"""
      16      def __init__(self,stream):
      17          self.stream = stream
      18  
      19      def __getattr__(self, attr):
      20          if attr in ('stream', '__getstate__'):
      21              raise AttributeError(attr)
      22          return getattr(self.stream,attr)
      23  
      24      def writeln(self, arg=None):
      25          if arg:
      26              self.write(arg)
      27          self.write('\n') # text-mode streams translate to \r\n if needed
      28  
      29  
      30  class ESC[4;38;5;81mTextTestResult(ESC[4;38;5;149mresultESC[4;38;5;149m.ESC[4;38;5;149mTestResult):
      31      """A test result class that can print formatted text results to a stream.
      32  
      33      Used by TextTestRunner.
      34      """
      35      separator1 = '=' * 70
      36      separator2 = '-' * 70
      37  
      38      def __init__(self, stream, descriptions, verbosity, *, durations=None):
      39          """Construct a TextTestResult. Subclasses should accept **kwargs
      40          to ensure compatibility as the interface changes."""
      41          super(TextTestResult, self).__init__(stream, descriptions, verbosity)
      42          self.stream = stream
      43          self.showAll = verbosity > 1
      44          self.dots = verbosity == 1
      45          self.descriptions = descriptions
      46          self._newline = True
      47          self.durations = durations
      48  
      49      def getDescription(self, test):
      50          doc_first_line = test.shortDescription()
      51          if self.descriptions and doc_first_line:
      52              return '\n'.join((str(test), doc_first_line))
      53          else:
      54              return str(test)
      55  
      56      def startTest(self, test):
      57          super(TextTestResult, self).startTest(test)
      58          if self.showAll:
      59              self.stream.write(self.getDescription(test))
      60              self.stream.write(" ... ")
      61              self.stream.flush()
      62              self._newline = False
      63  
      64      def _write_status(self, test, status):
      65          is_subtest = isinstance(test, _SubTest)
      66          if is_subtest or self._newline:
      67              if not self._newline:
      68                  self.stream.writeln()
      69              if is_subtest:
      70                  self.stream.write("  ")
      71              self.stream.write(self.getDescription(test))
      72              self.stream.write(" ... ")
      73          self.stream.writeln(status)
      74          self.stream.flush()
      75          self._newline = True
      76  
      77      def addSubTest(self, test, subtest, err):
      78          if err is not None:
      79              if self.showAll:
      80                  if issubclass(err[0], subtest.failureException):
      81                      self._write_status(subtest, "FAIL")
      82                  else:
      83                      self._write_status(subtest, "ERROR")
      84              elif self.dots:
      85                  if issubclass(err[0], subtest.failureException):
      86                      self.stream.write('F')
      87                  else:
      88                      self.stream.write('E')
      89                  self.stream.flush()
      90          super(TextTestResult, self).addSubTest(test, subtest, err)
      91  
      92      def addSuccess(self, test):
      93          super(TextTestResult, self).addSuccess(test)
      94          if self.showAll:
      95              self._write_status(test, "ok")
      96          elif self.dots:
      97              self.stream.write('.')
      98              self.stream.flush()
      99  
     100      def addError(self, test, err):
     101          super(TextTestResult, self).addError(test, err)
     102          if self.showAll:
     103              self._write_status(test, "ERROR")
     104          elif self.dots:
     105              self.stream.write('E')
     106              self.stream.flush()
     107  
     108      def addFailure(self, test, err):
     109          super(TextTestResult, self).addFailure(test, err)
     110          if self.showAll:
     111              self._write_status(test, "FAIL")
     112          elif self.dots:
     113              self.stream.write('F')
     114              self.stream.flush()
     115  
     116      def addSkip(self, test, reason):
     117          super(TextTestResult, self).addSkip(test, reason)
     118          if self.showAll:
     119              self._write_status(test, "skipped {0!r}".format(reason))
     120          elif self.dots:
     121              self.stream.write("s")
     122              self.stream.flush()
     123  
     124      def addExpectedFailure(self, test, err):
     125          super(TextTestResult, self).addExpectedFailure(test, err)
     126          if self.showAll:
     127              self.stream.writeln("expected failure")
     128              self.stream.flush()
     129          elif self.dots:
     130              self.stream.write("x")
     131              self.stream.flush()
     132  
     133      def addUnexpectedSuccess(self, test):
     134          super(TextTestResult, self).addUnexpectedSuccess(test)
     135          if self.showAll:
     136              self.stream.writeln("unexpected success")
     137              self.stream.flush()
     138          elif self.dots:
     139              self.stream.write("u")
     140              self.stream.flush()
     141  
     142      def printErrors(self):
     143          if self.dots or self.showAll:
     144              self.stream.writeln()
     145              self.stream.flush()
     146          self.printErrorList('ERROR', self.errors)
     147          self.printErrorList('FAIL', self.failures)
     148          unexpectedSuccesses = getattr(self, 'unexpectedSuccesses', ())
     149          if unexpectedSuccesses:
     150              self.stream.writeln(self.separator1)
     151              for test in unexpectedSuccesses:
     152                  self.stream.writeln(f"UNEXPECTED SUCCESS: {self.getDescription(test)}")
     153              self.stream.flush()
     154  
     155      def printErrorList(self, flavour, errors):
     156          for test, err in errors:
     157              self.stream.writeln(self.separator1)
     158              self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
     159              self.stream.writeln(self.separator2)
     160              self.stream.writeln("%s" % err)
     161              self.stream.flush()
     162  
     163  
     164  class ESC[4;38;5;81mTextTestRunner(ESC[4;38;5;149mobject):
     165      """A test runner class that displays results in textual form.
     166  
     167      It prints out the names of tests as they are run, errors as they
     168      occur, and a summary of the results at the end of the test run.
     169      """
     170      resultclass = TextTestResult
     171  
     172      def __init__(self, stream=None, descriptions=True, verbosity=1,
     173                   failfast=False, buffer=False, resultclass=None, warnings=None,
     174                   *, tb_locals=False, durations=None):
     175          """Construct a TextTestRunner.
     176  
     177          Subclasses should accept **kwargs to ensure compatibility as the
     178          interface changes.
     179          """
     180          if stream is None:
     181              stream = sys.stderr
     182          self.stream = _WritelnDecorator(stream)
     183          self.descriptions = descriptions
     184          self.verbosity = verbosity
     185          self.failfast = failfast
     186          self.buffer = buffer
     187          self.tb_locals = tb_locals
     188          self.durations = durations
     189          self.warnings = warnings
     190          if resultclass is not None:
     191              self.resultclass = resultclass
     192  
     193      def _makeResult(self):
     194          try:
     195              return self.resultclass(self.stream, self.descriptions,
     196                                      self.verbosity, durations=self.durations)
     197          except TypeError:
     198              # didn't accept the durations argument
     199              return self.resultclass(self.stream, self.descriptions,
     200                                      self.verbosity)
     201  
     202      def _printDurations(self, result):
     203          if not result.collectedDurations:
     204              return
     205          ls = sorted(result.collectedDurations, key=lambda x: x[1],
     206                      reverse=True)
     207          if self.durations > 0:
     208              ls = ls[:self.durations]
     209          self.stream.writeln("Slowest test durations")
     210          if hasattr(result, 'separator2'):
     211              self.stream.writeln(result.separator2)
     212          hidden = False
     213          for test, elapsed in ls:
     214              if self.verbosity < 2 and elapsed < 0.001:
     215                  hidden = True
     216                  continue
     217              self.stream.writeln("%-10s %s" % ("%.3fs" % elapsed, test))
     218          if hidden:
     219              self.stream.writeln("\n(durations < 0.001s were hidden; "
     220                                  "use -v to show these durations)")
     221          else:
     222              self.stream.writeln("")
     223  
     224      def run(self, test):
     225          "Run the given test case or test suite."
     226          result = self._makeResult()
     227          registerResult(result)
     228          result.failfast = self.failfast
     229          result.buffer = self.buffer
     230          result.tb_locals = self.tb_locals
     231          with warnings.catch_warnings():
     232              if self.warnings:
     233                  # if self.warnings is set, use it to filter all the warnings
     234                  warnings.simplefilter(self.warnings)
     235              startTime = time.perf_counter()
     236              startTestRun = getattr(result, 'startTestRun', None)
     237              if startTestRun is not None:
     238                  startTestRun()
     239              try:
     240                  test(result)
     241              finally:
     242                  stopTestRun = getattr(result, 'stopTestRun', None)
     243                  if stopTestRun is not None:
     244                      stopTestRun()
     245              stopTime = time.perf_counter()
     246          timeTaken = stopTime - startTime
     247          result.printErrors()
     248          if self.durations is not None:
     249              self._printDurations(result)
     250  
     251          if hasattr(result, 'separator2'):
     252              self.stream.writeln(result.separator2)
     253  
     254          run = result.testsRun
     255          self.stream.writeln("Ran %d test%s in %.3fs" %
     256                              (run, run != 1 and "s" or "", timeTaken))
     257          self.stream.writeln()
     258  
     259          expectedFails = unexpectedSuccesses = skipped = 0
     260          try:
     261              results = map(len, (result.expectedFailures,
     262                                  result.unexpectedSuccesses,
     263                                  result.skipped))
     264          except AttributeError:
     265              pass
     266          else:
     267              expectedFails, unexpectedSuccesses, skipped = results
     268  
     269          infos = []
     270          if not result.wasSuccessful():
     271              self.stream.write("FAILED")
     272              failed, errored = len(result.failures), len(result.errors)
     273              if failed:
     274                  infos.append("failures=%d" % failed)
     275              if errored:
     276                  infos.append("errors=%d" % errored)
     277          elif run == 0:
     278              self.stream.write("NO TESTS RAN")
     279          else:
     280              self.stream.write("OK")
     281          if skipped:
     282              infos.append("skipped=%d" % skipped)
     283          if expectedFails:
     284              infos.append("expected failures=%d" % expectedFails)
     285          if unexpectedSuccesses:
     286              infos.append("unexpected successes=%d" % unexpectedSuccesses)
     287          if infos:
     288              self.stream.writeln(" (%s)" % (", ".join(infos),))
     289          else:
     290              self.stream.write("\n")
     291          self.stream.flush()
     292          return result