(root)/
Python-3.11.7/
Lib/
wsgiref/
validate.py
       1  # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
       2  # Licensed under the MIT license: https://opensource.org/licenses/mit-license.php
       3  # Also licenced under the Apache License, 2.0: https://opensource.org/licenses/apache2.0.php
       4  # Licensed to PSF under a Contributor Agreement
       5  """
       6  Middleware to check for obedience to the WSGI specification.
       7  
       8  Some of the things this checks:
       9  
      10  * Signature of the application and start_response (including that
      11    keyword arguments are not used).
      12  
      13  * Environment checks:
      14  
      15    - Environment is a dictionary (and not a subclass).
      16  
      17    - That all the required keys are in the environment: REQUEST_METHOD,
      18      SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors,
      19      wsgi.multithread, wsgi.multiprocess, wsgi.run_once
      20  
      21    - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the
      22      environment (these headers should appear as CONTENT_LENGTH and
      23      CONTENT_TYPE).
      24  
      25    - Warns if QUERY_STRING is missing, as the cgi module acts
      26      unpredictably in that case.
      27  
      28    - That CGI-style variables (that don't contain a .) have
      29      (non-unicode) string values
      30  
      31    - That wsgi.version is a tuple
      32  
      33    - That wsgi.url_scheme is 'http' or 'https' (@@: is this too
      34      restrictive?)
      35  
      36    - Warns if the REQUEST_METHOD is not known (@@: probably too
      37      restrictive).
      38  
      39    - That SCRIPT_NAME and PATH_INFO are empty or start with /
      40  
      41    - That at least one of SCRIPT_NAME or PATH_INFO are set.
      42  
  - That CONTENT_LENGTH is a non-negative integer.
      44  
      45    - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should
      46      be '/').
      47  
      48    - That wsgi.input has the methods read, readline, readlines, and
      49      __iter__
      50  
      51    - That wsgi.errors has the methods flush, write, writelines
      52  
* The status is a string, contains a space, starts with an integer,
  and that integer is in range (>= 100).
      55  
      56  * That the headers is a list (not a subclass, not another kind of
      57    sequence).
      58  
      59  * That the items of the headers are tuples of strings.
      60  
      61  * That there is no 'status' header (that is used in CGI, but not in
      62    WSGI).
      63  
* That the header names don't contain newlines or colons or end in _ or -,
  and that the header values don't contain character codes 037 (octal) or
  below.
      66  
      67  * That Content-Type is given if there is content (CGI often has a
      68    default content type, but WSGI does not).
      69  
      70  * That no Content-Type is given when there is no content (@@: is this
      71    too restrictive?)
      72  
      73  * That the exc_info argument to start_response is a tuple or None.
      74  
      75  * That all calls to the writer are with strings, and no other methods
      76    on the writer are accessed.
      77  
      78  * That wsgi.input is used properly:
      79  
      80    - .read() is called with exactly one argument
      81  
      82    - That it returns a string
      83  
      84    - That readline, readlines, and __iter__ return strings
      85  
      86    - That .close() is not called
      87  
      88    - No other methods are provided
      89  
      90  * That wsgi.errors is used properly:
      91  
  - .write() and .writelines() are called with a string
      93  
      94    - That .close() is not called, and no other methods are provided.
      95  
      96  * The response iterator:
      97  
      98    - That it is not a string (it should be a list of a single string; a
      99      string will work, but perform horribly).
     100  
     101    - That .__next__() returns a string
     102  
     103    - That the iterator is not iterated over until start_response has
     104      been called (that can signal either a server or application
     105      error).
     106  
     107    - That .close() is called (doesn't raise exception, only prints to
     108      sys.stderr, because we only know it isn't called when the object
     109      is garbage collected).
     110  """
# Public API of this module: only the ``validator`` middleware factory is
# exported by ``from ... import *``.
__all__ = ['validator']
     112  
     113  
     114  import re
     115  import sys
     116  import warnings
     117  
# A header name: one ASCII letter followed by any number of ASCII letters,
# digits, '-' or '_'.
header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$')
# Any single character whose code is in the octal range 000-037 (inclusive);
# used to find characters that are not allowed in a header value.
bad_header_value_re = re.compile(r'[\000-\037]')
     120  
     121  class ESC[4;38;5;81mWSGIWarning(ESC[4;38;5;149mWarning):
     122      """
     123      Raised in response to WSGI-spec-related warnings
     124      """
     125  
     126  def assert_(cond, *args):
     127      if not cond:
     128          raise AssertionError(*args)
     129  
     130  def check_string_type(value, title):
     131      if type (value) is str:
     132          return value
     133      raise AssertionError(
     134          "{0} must be of type str (got {1})".format(title, repr(value)))
     135  
     136  def validator(application):
     137  
     138      """
     139      When applied between a WSGI server and a WSGI application, this
     140      middleware will check for WSGI compliance on a number of levels.
     141      This middleware does not modify the request or response in any
     142      way, but will raise an AssertionError if anything seems off
     143      (except for a failure to close the application iterator, which
     144      will be printed to stderr -- there's no way to raise an exception
     145      at that point).
     146      """
     147  
     148      def lint_app(*args, **kw):
     149          assert_(len(args) == 2, "Two arguments required")
     150          assert_(not kw, "No keyword arguments allowed")
     151          environ, start_response = args
     152  
     153          check_environ(environ)
     154  
     155          # We use this to check if the application returns without
     156          # calling start_response:
     157          start_response_started = []
     158  
     159          def start_response_wrapper(*args, **kw):
     160              assert_(len(args) == 2 or len(args) == 3, (
     161                  "Invalid number of arguments: %s" % (args,)))
     162              assert_(not kw, "No keyword arguments allowed")
     163              status = args[0]
     164              headers = args[1]
     165              if len(args) == 3:
     166                  exc_info = args[2]
     167              else:
     168                  exc_info = None
     169  
     170              check_status(status)
     171              check_headers(headers)
     172              check_content_type(status, headers)
     173              check_exc_info(exc_info)
     174  
     175              start_response_started.append(None)
     176              return WriteWrapper(start_response(*args))
     177  
     178          environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
     179          environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])
     180  
     181          iterator = application(environ, start_response_wrapper)
     182          assert_(iterator is not None and iterator != False,
     183              "The application must return an iterator, if only an empty list")
     184  
     185          check_iterator(iterator)
     186  
     187          return IteratorWrapper(iterator, start_response_started)
     188  
     189      return lint_app
     190  
     191  class ESC[4;38;5;81mInputWrapper:
     192  
     193      def __init__(self, wsgi_input):
     194          self.input = wsgi_input
     195  
     196      def read(self, *args):
     197          assert_(len(args) == 1)
     198          v = self.input.read(*args)
     199          assert_(type(v) is bytes)
     200          return v
     201  
     202      def readline(self, *args):
     203          assert_(len(args) <= 1)
     204          v = self.input.readline(*args)
     205          assert_(type(v) is bytes)
     206          return v
     207  
     208      def readlines(self, *args):
     209          assert_(len(args) <= 1)
     210          lines = self.input.readlines(*args)
     211          assert_(type(lines) is list)
     212          for line in lines:
     213              assert_(type(line) is bytes)
     214          return lines
     215  
     216      def __iter__(self):
     217          while 1:
     218              line = self.readline()
     219              if not line:
     220                  return
     221              yield line
     222  
     223      def close(self):
     224          assert_(0, "input.close() must not be called")
     225  
     226  class ESC[4;38;5;81mErrorWrapper:
     227  
     228      def __init__(self, wsgi_errors):
     229          self.errors = wsgi_errors
     230  
     231      def write(self, s):
     232          assert_(type(s) is str)
     233          self.errors.write(s)
     234  
     235      def flush(self):
     236          self.errors.flush()
     237  
     238      def writelines(self, seq):
     239          for line in seq:
     240              self.write(line)
     241  
     242      def close(self):
     243          assert_(0, "errors.close() must not be called")
     244  
     245  class ESC[4;38;5;81mWriteWrapper:
     246  
     247      def __init__(self, wsgi_writer):
     248          self.writer = wsgi_writer
     249  
     250      def __call__(self, s):
     251          assert_(type(s) is bytes)
     252          self.writer(s)
     253  
     254  class ESC[4;38;5;81mPartialIteratorWrapper:
     255  
     256      def __init__(self, wsgi_iterator):
     257          self.iterator = wsgi_iterator
     258  
     259      def __iter__(self):
     260          # We want to make sure __iter__ is called
     261          return IteratorWrapper(self.iterator, None)
     262  
     263  class ESC[4;38;5;81mIteratorWrapper:
     264  
     265      def __init__(self, wsgi_iterator, check_start_response):
     266          self.original_iterator = wsgi_iterator
     267          self.iterator = iter(wsgi_iterator)
     268          self.closed = False
     269          self.check_start_response = check_start_response
     270  
     271      def __iter__(self):
     272          return self
     273  
     274      def __next__(self):
     275          assert_(not self.closed,
     276              "Iterator read after closed")
     277          v = next(self.iterator)
     278          if type(v) is not bytes:
     279              assert_(False, "Iterator yielded non-bytestring (%r)" % (v,))
     280          if self.check_start_response is not None:
     281              assert_(self.check_start_response,
     282                  "The application returns and we started iterating over its body, but start_response has not yet been called")
     283              self.check_start_response = None
     284          return v
     285  
     286      def close(self):
     287          self.closed = True
     288          if hasattr(self.original_iterator, 'close'):
     289              self.original_iterator.close()
     290  
     291      def __del__(self):
     292          if not self.closed:
     293              sys.stderr.write(
     294                  "Iterator garbage collected without being closed")
     295          assert_(self.closed,
     296              "Iterator garbage collected without being closed")
     297  
     298  def check_environ(environ):
     299      assert_(type(environ) is dict,
     300          "Environment is not of the right type: %r (environment: %r)"
     301          % (type(environ), environ))
     302  
     303      for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
     304                  'wsgi.version', 'wsgi.input', 'wsgi.errors',
     305                  'wsgi.multithread', 'wsgi.multiprocess',
     306                  'wsgi.run_once']:
     307          assert_(key in environ,
     308              "Environment missing required key: %r" % (key,))
     309  
     310      for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']:
     311          assert_(key not in environ,
     312              "Environment should not have the key: %s "
     313              "(use %s instead)" % (key, key[5:]))
     314  
     315      if 'QUERY_STRING' not in environ:
     316          warnings.warn(
     317              'QUERY_STRING is not in the WSGI environment; the cgi '
     318              'module will use sys.argv when this variable is missing, '
     319              'so application errors are more likely',
     320              WSGIWarning)
     321  
     322      for key in environ.keys():
     323          if '.' in key:
     324              # Extension, we don't care about its type
     325              continue
     326          assert_(type(environ[key]) is str,
     327              "Environmental variable %s is not a string: %r (value: %r)"
     328              % (key, type(environ[key]), environ[key]))
     329  
     330      assert_(type(environ['wsgi.version']) is tuple,
     331          "wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],))
     332      assert_(environ['wsgi.url_scheme'] in ('http', 'https'),
     333          "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])
     334  
     335      check_input(environ['wsgi.input'])
     336      check_errors(environ['wsgi.errors'])
     337  
     338      # @@: these need filling out:
     339      if environ['REQUEST_METHOD'] not in (
     340          'GET', 'HEAD', 'POST', 'OPTIONS', 'PATCH', 'PUT', 'DELETE', 'TRACE'):
     341          warnings.warn(
     342              "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'],
     343              WSGIWarning)
     344  
     345      assert_(not environ.get('SCRIPT_NAME')
     346              or environ['SCRIPT_NAME'].startswith('/'),
     347          "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'])
     348      assert_(not environ.get('PATH_INFO')
     349              or environ['PATH_INFO'].startswith('/'),
     350          "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO'])
     351      if environ.get('CONTENT_LENGTH'):
     352          assert_(int(environ['CONTENT_LENGTH']) >= 0,
     353              "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'])
     354  
     355      if not environ.get('SCRIPT_NAME'):
     356          assert_('PATH_INFO' in environ,
     357              "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO "
     358              "should at least be '/' if SCRIPT_NAME is empty)")
     359      assert_(environ.get('SCRIPT_NAME') != '/',
     360          "SCRIPT_NAME cannot be '/'; it should instead be '', and "
     361          "PATH_INFO should be '/'")
     362  
     363  def check_input(wsgi_input):
     364      for attr in ['read', 'readline', 'readlines', '__iter__']:
     365          assert_(hasattr(wsgi_input, attr),
     366              "wsgi.input (%r) doesn't have the attribute %s"
     367              % (wsgi_input, attr))
     368  
     369  def check_errors(wsgi_errors):
     370      for attr in ['flush', 'write', 'writelines']:
     371          assert_(hasattr(wsgi_errors, attr),
     372              "wsgi.errors (%r) doesn't have the attribute %s"
     373              % (wsgi_errors, attr))
     374  
     375  def check_status(status):
     376      status = check_string_type(status, "Status")
     377      # Implicitly check that we can turn it into an integer:
     378      status_code = status.split(None, 1)[0]
     379      assert_(len(status_code) == 3,
     380          "Status codes must be three characters: %r" % status_code)
     381      status_int = int(status_code)
     382      assert_(status_int >= 100, "Status code is invalid: %r" % status_int)
     383      if len(status) < 4 or status[3] != ' ':
     384          warnings.warn(
     385              "The status string (%r) should be a three-digit integer "
     386              "followed by a single space and a status explanation"
     387              % status, WSGIWarning)
     388  
     389  def check_headers(headers):
     390      assert_(type(headers) is list,
     391          "Headers (%r) must be of type list: %r"
     392          % (headers, type(headers)))
     393      for item in headers:
     394          assert_(type(item) is tuple,
     395              "Individual headers (%r) must be of type tuple: %r"
     396              % (item, type(item)))
     397          assert_(len(item) == 2)
     398          name, value = item
     399          name = check_string_type(name, "Header name")
     400          value = check_string_type(value, "Header value")
     401          assert_(name.lower() != 'status',
     402              "The Status header cannot be used; it conflicts with CGI "
     403              "script, and HTTP status is not given through headers "
     404              "(value: %r)." % value)
     405          assert_('\n' not in name and ':' not in name,
     406              "Header names may not contain ':' or '\\n': %r" % name)
     407          assert_(header_re.search(name), "Bad header name: %r" % name)
     408          assert_(not name.endswith('-') and not name.endswith('_'),
     409              "Names may not end in '-' or '_': %r" % name)
     410          if bad_header_value_re.search(value):
     411              assert_(0, "Bad header value: %r (bad char: %r)"
     412              % (value, bad_header_value_re.search(value).group(0)))
     413  
     414  def check_content_type(status, headers):
     415      status = check_string_type(status, "Status")
     416      code = int(status.split(None, 1)[0])
     417      # @@: need one more person to verify this interpretation of RFC 2616
     418      #     http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
     419      NO_MESSAGE_BODY = (204, 304)
     420      for name, value in headers:
     421          name = check_string_type(name, "Header name")
     422          if name.lower() == 'content-type':
     423              if code not in NO_MESSAGE_BODY:
     424                  return
     425              assert_(0, ("Content-Type header found in a %s response, "
     426                          "which must not return content.") % code)
     427      if code not in NO_MESSAGE_BODY:
     428          assert_(0, "No Content-Type header found in headers (%s)" % headers)
     429  
     430  def check_exc_info(exc_info):
     431      assert_(exc_info is None or type(exc_info) is tuple,
     432          "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
     433      # More exc_info checks?
     434  
     435  def check_iterator(iterator):
     436      # Technically a bytestring is legal, which is why it's a really bad
     437      # idea, because it may cause the response to be returned
     438      # character-by-character
     439      assert_(not isinstance(iterator, (str, bytes)),
     440          "You should not return a string as your application iterator, "
     441          "instead return a single-item list containing a bytestring.")