1  """Test result object"""
       2  
       3  import io
       4  import sys
       5  import traceback
       6  
       7  from . import util
       8  from functools import wraps
       9  
# Marker name looked up by TestResult._is_relevant_tb_level(): a traceback
# frame whose module globals contain '__unittest' is treated as a unittest
# frame when tracebacks are cleaned for reporting.  Defining it here makes
# frames from this module count as such.
__unittest = True
      11  
      12  def failfast(method):
      13      @wraps(method)
      14      def inner(self, *args, **kw):
      15          if getattr(self, 'failfast', False):
      16              self.stop()
      17          return method(self, *args, **kw)
      18      return inner
      19  
# Templates used to label captured output: '%s' receives the text captured
# from stdout / stderr (the callers newline-terminate it before formatting).
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
      22  
      23  
      24  class ESC[4;38;5;81mTestResult(ESC[4;38;5;149mobject):
      25      """Holder for test result information.
      26  
      27      Test results are automatically managed by the TestCase and TestSuite
      28      classes, and do not need to be explicitly manipulated by writers of tests.
      29  
      30      Each instance holds the total number of tests run, and collections of
      31      failures and errors that occurred among those test runs. The collections
      32      contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
      33      formatted traceback of the error that occurred.
      34      """
      35      _previousTestClass = None
      36      _testRunEntered = False
      37      _moduleSetUpFailed = False
      38      def __init__(self, stream=None, descriptions=None, verbosity=None):
      39          self.failfast = False
      40          self.failures = []
      41          self.errors = []
      42          self.testsRun = 0
      43          self.skipped = []
      44          self.expectedFailures = []
      45          self.unexpectedSuccesses = []
      46          self.collectedDurations = []
      47          self.shouldStop = False
      48          self.buffer = False
      49          self.tb_locals = False
      50          self._stdout_buffer = None
      51          self._stderr_buffer = None
      52          self._original_stdout = sys.stdout
      53          self._original_stderr = sys.stderr
      54          self._mirrorOutput = False
      55  
      56      def printErrors(self):
      57          "Called by TestRunner after test run"
      58  
      59      def startTest(self, test):
      60          "Called when the given test is about to be run"
      61          self.testsRun += 1
      62          self._mirrorOutput = False
      63          self._setupStdout()
      64  
      65      def _setupStdout(self):
      66          if self.buffer:
      67              if self._stderr_buffer is None:
      68                  self._stderr_buffer = io.StringIO()
      69                  self._stdout_buffer = io.StringIO()
      70              sys.stdout = self._stdout_buffer
      71              sys.stderr = self._stderr_buffer
      72  
      73      def startTestRun(self):
      74          """Called once before any tests are executed.
      75  
      76          See startTest for a method called before each test.
      77          """
      78  
      79      def stopTest(self, test):
      80          """Called when the given test has been run"""
      81          self._restoreStdout()
      82          self._mirrorOutput = False
      83  
      84      def _restoreStdout(self):
      85          if self.buffer:
      86              if self._mirrorOutput:
      87                  output = sys.stdout.getvalue()
      88                  error = sys.stderr.getvalue()
      89                  if output:
      90                      if not output.endswith('\n'):
      91                          output += '\n'
      92                      self._original_stdout.write(STDOUT_LINE % output)
      93                  if error:
      94                      if not error.endswith('\n'):
      95                          error += '\n'
      96                      self._original_stderr.write(STDERR_LINE % error)
      97  
      98              sys.stdout = self._original_stdout
      99              sys.stderr = self._original_stderr
     100              self._stdout_buffer.seek(0)
     101              self._stdout_buffer.truncate()
     102              self._stderr_buffer.seek(0)
     103              self._stderr_buffer.truncate()
     104  
     105      def stopTestRun(self):
     106          """Called once after all tests are executed.
     107  
     108          See stopTest for a method called after each test.
     109          """
     110  
     111      @failfast
     112      def addError(self, test, err):
     113          """Called when an error has occurred. 'err' is a tuple of values as
     114          returned by sys.exc_info().
     115          """
     116          self.errors.append((test, self._exc_info_to_string(err, test)))
     117          self._mirrorOutput = True
     118  
     119      @failfast
     120      def addFailure(self, test, err):
     121          """Called when an error has occurred. 'err' is a tuple of values as
     122          returned by sys.exc_info()."""
     123          self.failures.append((test, self._exc_info_to_string(err, test)))
     124          self._mirrorOutput = True
     125  
     126      def addSubTest(self, test, subtest, err):
     127          """Called at the end of a subtest.
     128          'err' is None if the subtest ended successfully, otherwise it's a
     129          tuple of values as returned by sys.exc_info().
     130          """
     131          # By default, we don't do anything with successful subtests, but
     132          # more sophisticated test results might want to record them.
     133          if err is not None:
     134              if getattr(self, 'failfast', False):
     135                  self.stop()
     136              if issubclass(err[0], test.failureException):
     137                  errors = self.failures
     138              else:
     139                  errors = self.errors
     140              errors.append((subtest, self._exc_info_to_string(err, test)))
     141              self._mirrorOutput = True
     142  
     143      def addSuccess(self, test):
     144          "Called when a test has completed successfully"
     145          pass
     146  
     147      def addSkip(self, test, reason):
     148          """Called when a test is skipped."""
     149          self.skipped.append((test, reason))
     150  
     151      def addExpectedFailure(self, test, err):
     152          """Called when an expected failure/error occurred."""
     153          self.expectedFailures.append(
     154              (test, self._exc_info_to_string(err, test)))
     155  
     156      @failfast
     157      def addUnexpectedSuccess(self, test):
     158          """Called when a test was expected to fail, but succeed."""
     159          self.unexpectedSuccesses.append(test)
     160  
     161      def addDuration(self, test, elapsed):
     162          """Called when a test finished to run, regardless of its outcome.
     163          *test* is the test case corresponding to the test method.
     164          *elapsed* is the time represented in seconds, and it includes the
     165          execution of cleanup functions.
     166          """
     167          # support for a TextTestRunner using an old TestResult class
     168          if hasattr(self, "collectedDurations"):
     169              # Pass test repr and not the test object itself to avoid resources leak
     170              self.collectedDurations.append((str(test), elapsed))
     171  
     172      def wasSuccessful(self):
     173          """Tells whether or not this result was a success."""
     174          # The hasattr check is for test_result's OldResult test.  That
     175          # way this method works on objects that lack the attribute.
     176          # (where would such result instances come from? old stored pickles?)
     177          return ((len(self.failures) == len(self.errors) == 0) and
     178                  (not hasattr(self, 'unexpectedSuccesses') or
     179                   len(self.unexpectedSuccesses) == 0))
     180  
     181      def stop(self):
     182          """Indicates that the tests should be aborted."""
     183          self.shouldStop = True
     184  
     185      def _exc_info_to_string(self, err, test):
     186          """Converts a sys.exc_info()-style tuple of values into a string."""
     187          exctype, value, tb = err
     188          tb = self._clean_tracebacks(exctype, value, tb, test)
     189          tb_e = traceback.TracebackException(
     190              exctype, value, tb,
     191              capture_locals=self.tb_locals, compact=True)
     192          msgLines = list(tb_e.format())
     193  
     194          if self.buffer:
     195              output = sys.stdout.getvalue()
     196              error = sys.stderr.getvalue()
     197              if output:
     198                  if not output.endswith('\n'):
     199                      output += '\n'
     200                  msgLines.append(STDOUT_LINE % output)
     201              if error:
     202                  if not error.endswith('\n'):
     203                      error += '\n'
     204                  msgLines.append(STDERR_LINE % error)
     205          return ''.join(msgLines)
     206  
     207      def _clean_tracebacks(self, exctype, value, tb, test):
     208          ret = None
     209          first = True
     210          excs = [(exctype, value, tb)]
     211          seen = {id(value)}  # Detect loops in chained exceptions.
     212          while excs:
     213              (exctype, value, tb) = excs.pop()
     214              # Skip test runner traceback levels
     215              while tb and self._is_relevant_tb_level(tb):
     216                  tb = tb.tb_next
     217  
     218              # Skip assert*() traceback levels
     219              if exctype is test.failureException:
     220                  self._remove_unittest_tb_frames(tb)
     221  
     222              if first:
     223                  ret = tb
     224                  first = False
     225              else:
     226                  value.__traceback__ = tb
     227  
     228              if value is not None:
     229                  for c in (value.__cause__, value.__context__):
     230                      if c is not None and id(c) not in seen:
     231                          excs.append((type(c), c, c.__traceback__))
     232                          seen.add(id(c))
     233          return ret
     234  
     235      def _is_relevant_tb_level(self, tb):
     236          return '__unittest' in tb.tb_frame.f_globals
     237  
     238      def _remove_unittest_tb_frames(self, tb):
     239          '''Truncates usercode tb at the first unittest frame.
     240  
     241          If the first frame of the traceback is in user code,
     242          the prefix up to the first unittest frame is returned.
     243          If the first frame is already in the unittest module,
     244          the traceback is not modified.
     245          '''
     246          prev = None
     247          while tb and not self._is_relevant_tb_level(tb):
     248              prev = tb
     249              tb = tb.tb_next
     250          if prev is not None:
     251              prev.tb_next = None
     252  
     253      def __repr__(self):
     254          return ("<%s run=%i errors=%i failures=%i>" %
     255                 (util.strclass(self.__class__), self.testsRun, len(self.errors),
     256                  len(self.failures)))