(root)/
Python-3.12.0/
Lib/
wsgiref/
validate.py
       1  # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
       2  # Licensed under the MIT license: https://opensource.org/licenses/mit-license.php
       3  # Also licenced under the Apache License, 2.0: https://opensource.org/licenses/apache2.0.php
       4  # Licensed to PSF under a Contributor Agreement
       5  """
       6  Middleware to check for obedience to the WSGI specification.
       7  
       8  Some of the things this checks:
       9  
      10  * Signature of the application and start_response (including that
      11    keyword arguments are not used).
      12  
      13  * Environment checks:
      14  
      15    - Environment is a dictionary (and not a subclass).
      16  
      17    - That all the required keys are in the environment: REQUEST_METHOD,
      18      SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors,
      19      wsgi.multithread, wsgi.multiprocess, wsgi.run_once
      20  
      21    - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the
      22      environment (these headers should appear as CONTENT_LENGTH and
      23      CONTENT_TYPE).
      24  
      25    - Warns if QUERY_STRING is missing, as the cgi module acts
      26      unpredictably in that case.
      27  
      28    - That CGI-style variables (that don't contain a .) have
      29      (non-unicode) string values
      30  
      31    - That wsgi.version is a tuple
      32  
      33    - That wsgi.url_scheme is 'http' or 'https' (@@: is this too
      34      restrictive?)
      35  
      36    - Warns if the REQUEST_METHOD is not known (@@: probably too
      37      restrictive).
      38  
      39    - That SCRIPT_NAME and PATH_INFO are empty or start with /
      40  
      41    - That at least one of SCRIPT_NAME or PATH_INFO are set.
      42  
      43    - That CONTENT_LENGTH is a non-negative integer.
      44  
      45    - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should
      46      be '/').
      47  
      48    - That wsgi.input has the methods read, readline, readlines, and
      49      __iter__
      50  
      51    - That wsgi.errors has the methods flush, write, writelines
      52  
      53  * The status is a string, contains a space, starts with an integer,
      54    and that integer is in range (>= 100).
      55  
      56  * That the headers is a list (not a subclass, not another kind of
      57    sequence).
      58  
      59  * That the items of the headers are tuples of strings.
      60  
      61  * That there is no 'status' header (that is used in CGI, but not in
      62    WSGI).
      63  
      64  * That the headers don't contain newlines or colons, end in _ or -, or
      65    contain character codes of 037 (octal) or below.
      66  
      67  * That Content-Type is given if there is content (CGI often has a
      68    default content type, but WSGI does not).
      69  
      70  * That no Content-Type is given when there is no content (@@: is this
      71    too restrictive?)
      72  
      73  * That the exc_info argument to start_response is a tuple or None.
      74  
      75  * That all calls to the writer are with bytestrings, and no other methods
      76    on the writer are accessed.
      77  
      78  * That wsgi.input is used properly:
      79  
      80    - .read() is called with exactly one argument
      81  
      82    - That it returns a string
      83  
      84    - That readline, readlines, and __iter__ return strings
      85  
      86    - That .close() is not called
      87  
      88    - No other methods are provided
      89  
      90  * That wsgi.errors is used properly:
      91  
      92    - .write() and .writelines() is called with a string
      93  
      94    - That .close() is not called, and no other methods are provided.
      95  
      96  * The response iterator:
      97  
      98    - That it is not a string (it should be a list of a single string; a
      99      string will work, but perform horribly).
     100  
     101    - That .__next__() returns a string
     102  
     103    - That the iterator is not iterated over until start_response has
     104      been called (that can signal either a server or application
     105      error).
     106  
     107    - That .close() is called (doesn't raise exception, only prints to
     108      sys.stderr, because we only know it isn't called when the object
     109      is garbage collected).
     110  """
     111  __all__ = ['validator']
     112  
     113  
     114  import re
     115  import sys
     116  import warnings
     117  
# Header names: an ASCII letter followed by letters, digits, '-' or '_'.
header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$')
# Matches any character with an octal code from 000 through 037; used by
# check_headers to reject such characters in header values.
bad_header_value_re = re.compile(r'[\000-\037]')
     120  
     121  class ESC[4;38;5;81mWSGIWarning(ESC[4;38;5;149mWarning):
     122      """
     123      Raised in response to WSGI-spec-related warnings
     124      """
     125  
     126  def assert_(cond, *args):
     127      if not cond:
     128          raise AssertionError(*args)
     129  
     130  def check_string_type(value, title):
     131      if type (value) is str:
     132          return value
     133      raise AssertionError(
     134          "{0} must be of type str (got {1})".format(title, repr(value)))
     135  
     136  def validator(application):
     137  
     138      """
     139      When applied between a WSGI server and a WSGI application, this
     140      middleware will check for WSGI compliance on a number of levels.
     141      This middleware does not modify the request or response in any
     142      way, but will raise an AssertionError if anything seems off
     143      (except for a failure to close the application iterator, which
     144      will be printed to stderr -- there's no way to raise an exception
     145      at that point).
     146      """
     147  
     148      def lint_app(*args, **kw):
     149          assert_(len(args) == 2, "Two arguments required")
     150          assert_(not kw, "No keyword arguments allowed")
     151          environ, start_response = args
     152  
     153          check_environ(environ)
     154  
     155          # We use this to check if the application returns without
     156          # calling start_response:
     157          start_response_started = []
     158  
     159          def start_response_wrapper(*args, **kw):
     160              assert_(len(args) == 2 or len(args) == 3, (
     161                  "Invalid number of arguments: %s" % (args,)))
     162              assert_(not kw, "No keyword arguments allowed")
     163              status = args[0]
     164              headers = args[1]
     165              if len(args) == 3:
     166                  exc_info = args[2]
     167              else:
     168                  exc_info = None
     169  
     170              check_status(status)
     171              check_headers(headers)
     172              check_content_type(status, headers)
     173              check_exc_info(exc_info)
     174  
     175              start_response_started.append(None)
     176              return WriteWrapper(start_response(*args))
     177  
     178          environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
     179          environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])
     180  
     181          iterator = application(environ, start_response_wrapper)
     182          assert_(iterator is not None and iterator != False,
     183              "The application must return an iterator, if only an empty list")
     184  
     185          check_iterator(iterator)
     186  
     187          return IteratorWrapper(iterator, start_response_started)
     188  
     189      return lint_app
     190  
     191  class ESC[4;38;5;81mInputWrapper:
     192  
     193      def __init__(self, wsgi_input):
     194          self.input = wsgi_input
     195  
     196      def read(self, *args):
     197          assert_(len(args) == 1)
     198          v = self.input.read(*args)
     199          assert_(type(v) is bytes)
     200          return v
     201  
     202      def readline(self, *args):
     203          assert_(len(args) <= 1)
     204          v = self.input.readline(*args)
     205          assert_(type(v) is bytes)
     206          return v
     207  
     208      def readlines(self, *args):
     209          assert_(len(args) <= 1)
     210          lines = self.input.readlines(*args)
     211          assert_(type(lines) is list)
     212          for line in lines:
     213              assert_(type(line) is bytes)
     214          return lines
     215  
     216      def __iter__(self):
     217          while line := self.readline():
     218              yield line
     219  
     220      def close(self):
     221          assert_(0, "input.close() must not be called")
     222  
     223  class ESC[4;38;5;81mErrorWrapper:
     224  
     225      def __init__(self, wsgi_errors):
     226          self.errors = wsgi_errors
     227  
     228      def write(self, s):
     229          assert_(type(s) is str)
     230          self.errors.write(s)
     231  
     232      def flush(self):
     233          self.errors.flush()
     234  
     235      def writelines(self, seq):
     236          for line in seq:
     237              self.write(line)
     238  
     239      def close(self):
     240          assert_(0, "errors.close() must not be called")
     241  
     242  class ESC[4;38;5;81mWriteWrapper:
     243  
     244      def __init__(self, wsgi_writer):
     245          self.writer = wsgi_writer
     246  
     247      def __call__(self, s):
     248          assert_(type(s) is bytes)
     249          self.writer(s)
     250  
     251  class ESC[4;38;5;81mPartialIteratorWrapper:
     252  
     253      def __init__(self, wsgi_iterator):
     254          self.iterator = wsgi_iterator
     255  
     256      def __iter__(self):
     257          # We want to make sure __iter__ is called
     258          return IteratorWrapper(self.iterator, None)
     259  
     260  class ESC[4;38;5;81mIteratorWrapper:
     261  
     262      def __init__(self, wsgi_iterator, check_start_response):
     263          self.original_iterator = wsgi_iterator
     264          self.iterator = iter(wsgi_iterator)
     265          self.closed = False
     266          self.check_start_response = check_start_response
     267  
     268      def __iter__(self):
     269          return self
     270  
     271      def __next__(self):
     272          assert_(not self.closed,
     273              "Iterator read after closed")
     274          v = next(self.iterator)
     275          if type(v) is not bytes:
     276              assert_(False, "Iterator yielded non-bytestring (%r)" % (v,))
     277          if self.check_start_response is not None:
     278              assert_(self.check_start_response,
     279                  "The application returns and we started iterating over its body, but start_response has not yet been called")
     280              self.check_start_response = None
     281          return v
     282  
     283      def close(self):
     284          self.closed = True
     285          if hasattr(self.original_iterator, 'close'):
     286              self.original_iterator.close()
     287  
     288      def __del__(self):
     289          if not self.closed:
     290              sys.stderr.write(
     291                  "Iterator garbage collected without being closed")
     292          assert_(self.closed,
     293              "Iterator garbage collected without being closed")
     294  
     295  def check_environ(environ):
     296      assert_(type(environ) is dict,
     297          "Environment is not of the right type: %r (environment: %r)"
     298          % (type(environ), environ))
     299  
     300      for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
     301                  'wsgi.version', 'wsgi.input', 'wsgi.errors',
     302                  'wsgi.multithread', 'wsgi.multiprocess',
     303                  'wsgi.run_once']:
     304          assert_(key in environ,
     305              "Environment missing required key: %r" % (key,))
     306  
     307      for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']:
     308          assert_(key not in environ,
     309              "Environment should not have the key: %s "
     310              "(use %s instead)" % (key, key[5:]))
     311  
     312      if 'QUERY_STRING' not in environ:
     313          warnings.warn(
     314              'QUERY_STRING is not in the WSGI environment; the cgi '
     315              'module will use sys.argv when this variable is missing, '
     316              'so application errors are more likely',
     317              WSGIWarning)
     318  
     319      for key in environ.keys():
     320          if '.' in key:
     321              # Extension, we don't care about its type
     322              continue
     323          assert_(type(environ[key]) is str,
     324              "Environmental variable %s is not a string: %r (value: %r)"
     325              % (key, type(environ[key]), environ[key]))
     326  
     327      assert_(type(environ['wsgi.version']) is tuple,
     328          "wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],))
     329      assert_(environ['wsgi.url_scheme'] in ('http', 'https'),
     330          "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])
     331  
     332      check_input(environ['wsgi.input'])
     333      check_errors(environ['wsgi.errors'])
     334  
     335      # @@: these need filling out:
     336      if environ['REQUEST_METHOD'] not in (
     337          'GET', 'HEAD', 'POST', 'OPTIONS', 'PATCH', 'PUT', 'DELETE', 'TRACE'):
     338          warnings.warn(
     339              "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'],
     340              WSGIWarning)
     341  
     342      assert_(not environ.get('SCRIPT_NAME')
     343              or environ['SCRIPT_NAME'].startswith('/'),
     344          "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'])
     345      assert_(not environ.get('PATH_INFO')
     346              or environ['PATH_INFO'].startswith('/'),
     347          "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO'])
     348      if environ.get('CONTENT_LENGTH'):
     349          assert_(int(environ['CONTENT_LENGTH']) >= 0,
     350              "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'])
     351  
     352      if not environ.get('SCRIPT_NAME'):
     353          assert_('PATH_INFO' in environ,
     354              "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO "
     355              "should at least be '/' if SCRIPT_NAME is empty)")
     356      assert_(environ.get('SCRIPT_NAME') != '/',
     357          "SCRIPT_NAME cannot be '/'; it should instead be '', and "
     358          "PATH_INFO should be '/'")
     359  
     360  def check_input(wsgi_input):
     361      for attr in ['read', 'readline', 'readlines', '__iter__']:
     362          assert_(hasattr(wsgi_input, attr),
     363              "wsgi.input (%r) doesn't have the attribute %s"
     364              % (wsgi_input, attr))
     365  
     366  def check_errors(wsgi_errors):
     367      for attr in ['flush', 'write', 'writelines']:
     368          assert_(hasattr(wsgi_errors, attr),
     369              "wsgi.errors (%r) doesn't have the attribute %s"
     370              % (wsgi_errors, attr))
     371  
     372  def check_status(status):
     373      status = check_string_type(status, "Status")
     374      # Implicitly check that we can turn it into an integer:
     375      status_code = status.split(None, 1)[0]
     376      assert_(len(status_code) == 3,
     377          "Status codes must be three characters: %r" % status_code)
     378      status_int = int(status_code)
     379      assert_(status_int >= 100, "Status code is invalid: %r" % status_int)
     380      if len(status) < 4 or status[3] != ' ':
     381          warnings.warn(
     382              "The status string (%r) should be a three-digit integer "
     383              "followed by a single space and a status explanation"
     384              % status, WSGIWarning)
     385  
     386  def check_headers(headers):
     387      assert_(type(headers) is list,
     388          "Headers (%r) must be of type list: %r"
     389          % (headers, type(headers)))
     390      for item in headers:
     391          assert_(type(item) is tuple,
     392              "Individual headers (%r) must be of type tuple: %r"
     393              % (item, type(item)))
     394          assert_(len(item) == 2)
     395          name, value = item
     396          name = check_string_type(name, "Header name")
     397          value = check_string_type(value, "Header value")
     398          assert_(name.lower() != 'status',
     399              "The Status header cannot be used; it conflicts with CGI "
     400              "script, and HTTP status is not given through headers "
     401              "(value: %r)." % value)
     402          assert_('\n' not in name and ':' not in name,
     403              "Header names may not contain ':' or '\\n': %r" % name)
     404          assert_(header_re.search(name), "Bad header name: %r" % name)
     405          assert_(not name.endswith('-') and not name.endswith('_'),
     406              "Names may not end in '-' or '_': %r" % name)
     407          if bad_header_value_re.search(value):
     408              assert_(0, "Bad header value: %r (bad char: %r)"
     409              % (value, bad_header_value_re.search(value).group(0)))
     410  
     411  def check_content_type(status, headers):
     412      status = check_string_type(status, "Status")
     413      code = int(status.split(None, 1)[0])
     414      # @@: need one more person to verify this interpretation of RFC 2616
     415      #     http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
     416      NO_MESSAGE_BODY = (204, 304)
     417      for name, value in headers:
     418          name = check_string_type(name, "Header name")
     419          if name.lower() == 'content-type':
     420              if code not in NO_MESSAGE_BODY:
     421                  return
     422              assert_(0, ("Content-Type header found in a %s response, "
     423                          "which must not return content.") % code)
     424      if code not in NO_MESSAGE_BODY:
     425          assert_(0, "No Content-Type header found in headers (%s)" % headers)
     426  
     427  def check_exc_info(exc_info):
     428      assert_(exc_info is None or type(exc_info) is tuple,
     429          "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
     430      # More exc_info checks?
     431  
     432  def check_iterator(iterator):
     433      # Technically a bytestring is legal, which is why it's a really bad
     434      # idea, because it may cause the response to be returned
     435      # character-by-character
     436      assert_(not isinstance(iterator, (str, bytes)),
     437          "You should not return a string as your application iterator, "
     438          "instead return a single-item list containing a bytestring.")