(root)/
Python-3.12.0/
Lib/
test/
test_xmlrpc.py
       1  import base64
       2  import datetime
       3  import decimal
       4  import sys
       5  import time
       6  import unittest
       7  from unittest import mock
       8  import xmlrpc.client as xmlrpclib
       9  import xmlrpc.server
      10  import http.client
      11  import http, http.server
      12  import socket
      13  import threading
      14  import re
      15  import io
      16  import contextlib
      17  from test import support
      18  from test.support import os_helper
      19  from test.support import socket_helper
      20  from test.support import threading_helper
      21  from test.support import ALWAYS_EQ, LARGEST, SMALLEST
      22  
      23  try:
      24      import gzip
      25  except ImportError:
      26      gzip = None
      27  
      28  support.requires_working_socket(module=True)
      29  
# Shared fixture for the dumps()/loads() round-trip tests: a one-element list
# holding a single struct whose members cover str, float, int, a nested list,
# Binary / bytes / bytearray payloads, bool, non-ASCII text (both as a value
# and as a key) and DateTime objects built from a string, a time tuple and a
# datetime.datetime.
alist = [{'astring': 'foo@bar.baz.spam',
          'afloat': 7283.43,
          'anint': 2**20,
          'ashortlong': 2,
          'anotherlist': ['.zyx.41'],
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
          'boolean': False,
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
          'datetime3': xmlrpclib.DateTime(
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
          }]
      47  
      48  class ESC[4;38;5;81mXMLRPCTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      49  
      50      def test_dump_load(self):
      51          dump = xmlrpclib.dumps((alist,))
      52          load = xmlrpclib.loads(dump)
      53          self.assertEqual(alist, load[0][0])
      54  
      55      def test_dump_bare_datetime(self):
      56          # This checks that an unwrapped datetime.date object can be handled
      57          # by the marshalling code.  This can't be done via test_dump_load()
      58          # since with use_builtin_types set to 1 the unmarshaller would create
      59          # datetime objects for the 'datetime[123]' keys as well
      60          dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
      61          self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
      62          s = xmlrpclib.dumps((dt,))
      63  
      64          result, m = xmlrpclib.loads(s, use_builtin_types=True)
      65          (newdt,) = result
      66          self.assertEqual(newdt, dt)
      67          self.assertIs(type(newdt), datetime.datetime)
      68          self.assertIsNone(m)
      69  
      70          result, m = xmlrpclib.loads(s, use_builtin_types=False)
      71          (newdt,) = result
      72          self.assertEqual(newdt, dt)
      73          self.assertIs(type(newdt), xmlrpclib.DateTime)
      74          self.assertIsNone(m)
      75  
      76          result, m = xmlrpclib.loads(s, use_datetime=True)
      77          (newdt,) = result
      78          self.assertEqual(newdt, dt)
      79          self.assertIs(type(newdt), datetime.datetime)
      80          self.assertIsNone(m)
      81  
      82          result, m = xmlrpclib.loads(s, use_datetime=False)
      83          (newdt,) = result
      84          self.assertEqual(newdt, dt)
      85          self.assertIs(type(newdt), xmlrpclib.DateTime)
      86          self.assertIsNone(m)
      87  
      88  
      89      def test_datetime_before_1900(self):
      90          # same as before but with a date before 1900
      91          dt = datetime.datetime(1,  2, 10, 11, 41, 23)
      92          self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
      93          s = xmlrpclib.dumps((dt,))
      94  
      95          result, m = xmlrpclib.loads(s, use_builtin_types=True)
      96          (newdt,) = result
      97          self.assertEqual(newdt, dt)
      98          self.assertIs(type(newdt), datetime.datetime)
      99          self.assertIsNone(m)
     100  
     101          result, m = xmlrpclib.loads(s, use_builtin_types=False)
     102          (newdt,) = result
     103          self.assertEqual(newdt, dt)
     104          self.assertIs(type(newdt), xmlrpclib.DateTime)
     105          self.assertIsNone(m)
     106  
     107      def test_bug_1164912 (self):
     108          d = xmlrpclib.DateTime()
     109          ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
     110                                              methodresponse=True))
     111          self.assertIsInstance(new_d.value, str)
     112  
     113          # Check that the output of dumps() is still an 8-bit string
     114          s = xmlrpclib.dumps((new_d,), methodresponse=True)
     115          self.assertIsInstance(s, str)
     116  
     117      def test_newstyle_class(self):
     118          class ESC[4;38;5;81mT(ESC[4;38;5;149mobject):
     119              pass
     120          t = T()
     121          t.x = 100
     122          t.y = "Hello"
     123          ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
     124          self.assertEqual(t2, t.__dict__)
     125  
     126      def test_dump_big_long(self):
     127          self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
     128  
     129      def test_dump_bad_dict(self):
     130          self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
     131  
     132      def test_dump_recursive_seq(self):
     133          l = [1,2,3]
     134          t = [3,4,5,l]
     135          l.append(t)
     136          self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
     137  
     138      def test_dump_recursive_dict(self):
     139          d = {'1':1, '2':1}
     140          t = {'3':3, 'd':d}
     141          d['t'] = t
     142          self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
     143  
     144      def test_dump_big_int(self):
     145          if sys.maxsize > 2**31-1:
     146              self.assertRaises(OverflowError, xmlrpclib.dumps,
     147                                (int(2**34),))
     148  
     149          xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
     150          self.assertRaises(OverflowError, xmlrpclib.dumps,
     151                            (xmlrpclib.MAXINT+1,))
     152          self.assertRaises(OverflowError, xmlrpclib.dumps,
     153                            (xmlrpclib.MININT-1,))
     154  
     155          def dummy_write(s):
     156              pass
     157  
     158          m = xmlrpclib.Marshaller()
     159          m.dump_int(xmlrpclib.MAXINT, dummy_write)
     160          m.dump_int(xmlrpclib.MININT, dummy_write)
     161          self.assertRaises(OverflowError, m.dump_int,
     162                            xmlrpclib.MAXINT+1, dummy_write)
     163          self.assertRaises(OverflowError, m.dump_int,
     164                            xmlrpclib.MININT-1, dummy_write)
     165  
     166      def test_dump_double(self):
     167          xmlrpclib.dumps((float(2 ** 34),))
     168          xmlrpclib.dumps((float(xmlrpclib.MAXINT),
     169                           float(xmlrpclib.MININT)))
     170          xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
     171                           float(xmlrpclib.MININT - 42)))
     172  
     173          def dummy_write(s):
     174              pass
     175  
     176          m = xmlrpclib.Marshaller()
     177          m.dump_double(xmlrpclib.MAXINT, dummy_write)
     178          m.dump_double(xmlrpclib.MININT, dummy_write)
     179          m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
     180          m.dump_double(xmlrpclib.MININT - 42, dummy_write)
     181  
     182      def test_dump_none(self):
     183          value = alist + [None]
     184          arg1 = (alist + [None],)
     185          strg = xmlrpclib.dumps(arg1, allow_none=True)
     186          self.assertEqual(value,
     187                            xmlrpclib.loads(strg)[0][0])
     188          self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
     189  
     190      def test_dump_encoding(self):
     191          value = {'key\u20ac\xa4':
     192                   'value\u20ac\xa4'}
     193          strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
     194          strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
     195          self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
     196          strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
     197          self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
     198  
     199          strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
     200                                 methodresponse=True)
     201          self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
     202          strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
     203          self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
     204  
     205          methodname = 'method\u20ac\xa4'
     206          strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
     207                                 methodname=methodname)
     208          self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
     209          self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
     210  
     211      def test_dump_bytes(self):
     212          sample = b"my dog has fleas"
     213          self.assertEqual(sample, xmlrpclib.Binary(sample))
     214          for type_ in bytes, bytearray, xmlrpclib.Binary:
     215              value = type_(sample)
     216              s = xmlrpclib.dumps((value,))
     217  
     218              result, m = xmlrpclib.loads(s, use_builtin_types=True)
     219              (newvalue,) = result
     220              self.assertEqual(newvalue, sample)
     221              self.assertIs(type(newvalue), bytes)
     222              self.assertIsNone(m)
     223  
     224              result, m = xmlrpclib.loads(s, use_builtin_types=False)
     225              (newvalue,) = result
     226              self.assertEqual(newvalue, sample)
     227              self.assertIs(type(newvalue), xmlrpclib.Binary)
     228              self.assertIsNone(m)
     229  
     230      def test_loads_unsupported(self):
     231          ResponseError = xmlrpclib.ResponseError
     232          data = '<params><param><value><spam/></value></param></params>'
     233          self.assertRaises(ResponseError, xmlrpclib.loads, data)
     234          data = ('<params><param><value><array>'
     235                  '<value><spam/></value>'
     236                  '</array></value></param></params>')
     237          self.assertRaises(ResponseError, xmlrpclib.loads, data)
     238          data = ('<params><param><value><struct>'
     239                  '<member><name>a</name><value><spam/></value></member>'
     240                  '<member><name>b</name><value><spam/></value></member>'
     241                  '</struct></value></param></params>')
     242          self.assertRaises(ResponseError, xmlrpclib.loads, data)
     243  
     244      def check_loads(self, s, value, **kwargs):
     245          dump = '<params><param><value>%s</value></param></params>' % s
     246          result, m = xmlrpclib.loads(dump, **kwargs)
     247          (newvalue,) = result
     248          self.assertEqual(newvalue, value)
     249          self.assertIs(type(newvalue), type(value))
     250          self.assertIsNone(m)
     251  
     252      def test_load_standard_types(self):
     253          check = self.check_loads
     254          check('string', 'string')
     255          check('<string>string</string>', 'string')
     256          check('<string>𝔘𝔫𝔦𝔠𝔬𝔡𝔢 string</string>', '𝔘𝔫𝔦𝔠𝔬𝔡𝔢 string')
     257          check('<int>2056183947</int>', 2056183947)
     258          check('<int>-2056183947</int>', -2056183947)
     259          check('<i4>2056183947</i4>', 2056183947)
     260          check('<double>46093.78125</double>', 46093.78125)
     261          check('<boolean>0</boolean>', False)
     262          check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
     263                xmlrpclib.Binary(b'\x00byte string\xff'))
     264          check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
     265                b'\x00byte string\xff', use_builtin_types=True)
     266          check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
     267                xmlrpclib.DateTime('20050210T11:41:23'))
     268          check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
     269                datetime.datetime(2005, 2, 10, 11, 41, 23),
     270                use_builtin_types=True)
     271          check('<array><data>'
     272                '<value><int>1</int></value><value><int>2</int></value>'
     273                '</data></array>', [1, 2])
     274          check('<struct>'
     275                '<member><name>b</name><value><int>2</int></value></member>'
     276                '<member><name>a</name><value><int>1</int></value></member>'
     277                '</struct>', {'a': 1, 'b': 2})
     278  
     279      def test_load_extension_types(self):
     280          check = self.check_loads
     281          check('<nil/>', None)
     282          check('<ex:nil/>', None)
     283          check('<i1>205</i1>', 205)
     284          check('<i2>20561</i2>', 20561)
     285          check('<i8>9876543210</i8>', 9876543210)
     286          check('<biginteger>98765432100123456789</biginteger>',
     287                98765432100123456789)
     288          check('<float>93.78125</float>', 93.78125)
     289          check('<bigdecimal>9876543210.0123456789</bigdecimal>',
     290                decimal.Decimal('9876543210.0123456789'))
     291  
     292      def test_limit_int(self):
     293          check = self.check_loads
     294          maxdigits = 5000
     295          with support.adjust_int_max_str_digits(maxdigits):
     296              s = '1' * (maxdigits + 1)
     297              with self.assertRaises(ValueError):
     298                  check(f'<int>{s}</int>', None)
     299              with self.assertRaises(ValueError):
     300                  check(f'<biginteger>{s}</biginteger>', None)
     301  
     302      def test_get_host_info(self):
     303          # see bug #3613, this raised a TypeError
     304          transp = xmlrpc.client.Transport()
     305          self.assertEqual(transp.get_host_info("user@host.tld"),
     306                            ('host.tld',
     307                             [('Authorization', 'Basic dXNlcg==')], {}))
     308  
     309      def test_ssl_presence(self):
     310          try:
     311              import ssl
     312          except ImportError:
     313              has_ssl = False
     314          else:
     315              has_ssl = True
     316          try:
     317              xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
     318          except NotImplementedError:
     319              self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
     320          except OSError:
     321              self.assertTrue(has_ssl)
     322  
     323      def test_keepalive_disconnect(self):
     324          class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
     325              protocol_version = "HTTP/1.1"
     326              handled = False
     327  
     328              def do_POST(self):
     329                  length = int(self.headers.get("Content-Length"))
     330                  self.rfile.read(length)
     331                  if self.handled:
     332                      self.close_connection = True
     333                      return
     334                  response = xmlrpclib.dumps((5,), methodresponse=True)
     335                  response = response.encode()
     336                  self.send_response(http.HTTPStatus.OK)
     337                  self.send_header("Content-Length", len(response))
     338                  self.end_headers()
     339                  self.wfile.write(response)
     340                  self.handled = True
     341                  self.close_connection = False
     342  
     343              def log_message(self, format, *args):
     344                  # don't clobber sys.stderr
     345                  pass
     346  
     347          def run_server():
     348              server.socket.settimeout(float(1))  # Don't hang if client fails
     349              server.handle_request()  # First request and attempt at second
     350              server.handle_request()  # Retried second request
     351  
     352          server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
     353          self.addCleanup(server.server_close)
     354          thread = threading.Thread(target=run_server)
     355          thread.start()
     356          self.addCleanup(thread.join)
     357          url = "http://{}:{}/".format(*server.server_address)
     358          with xmlrpclib.ServerProxy(url) as p:
     359              self.assertEqual(p.method(), 5)
     360              self.assertEqual(p.method(), 5)
     361  
     362  
     363  class ESC[4;38;5;81mSimpleXMLRPCDispatcherTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     364      class ESC[4;38;5;81mDispatchExc(ESC[4;38;5;149mException):
     365          """Raised inside the dispatched functions when checking for
     366          chained exceptions"""
     367  
     368      def test_call_registered_func(self):
     369          """Calls explicitly registered function"""
     370          # Makes sure any exception raised inside the function has no other
     371          # exception chained to it
     372  
     373          exp_params = 1, 2, 3
     374  
     375          def dispatched_func(*params):
     376              raise self.DispatchExc(params)
     377  
     378          dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
     379          dispatcher.register_function(dispatched_func)
     380          with self.assertRaises(self.DispatchExc) as exc_ctx:
     381              dispatcher._dispatch('dispatched_func', exp_params)
     382          self.assertEqual(exc_ctx.exception.args, (exp_params,))
     383          self.assertIsNone(exc_ctx.exception.__cause__)
     384          self.assertIsNone(exc_ctx.exception.__context__)
     385  
     386      def test_call_instance_func(self):
     387          """Calls a registered instance attribute as a function"""
     388          # Makes sure any exception raised inside the function has no other
     389          # exception chained to it
     390  
     391          exp_params = 1, 2, 3
     392  
     393          class ESC[4;38;5;81mDispatchedClass:
     394              def dispatched_func(self, *params):
     395                  raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)
     396  
     397          dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
     398          dispatcher.register_instance(DispatchedClass())
     399          with self.assertRaises(self.DispatchExc) as exc_ctx:
     400              dispatcher._dispatch('dispatched_func', exp_params)
     401          self.assertEqual(exc_ctx.exception.args, (exp_params,))
     402          self.assertIsNone(exc_ctx.exception.__cause__)
     403          self.assertIsNone(exc_ctx.exception.__context__)
     404  
     405      def test_call_dispatch_func(self):
     406          """Calls the registered instance's `_dispatch` function"""
     407          # Makes sure any exception raised inside the function has no other
     408          # exception chained to it
     409  
     410          exp_method = 'method'
     411          exp_params = 1, 2, 3
     412  
     413          class ESC[4;38;5;81mTestInstance:
     414              def _dispatch(self, method, params):
     415                  raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
     416                      method, params)
     417  
     418          dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
     419          dispatcher.register_instance(TestInstance())
     420          with self.assertRaises(self.DispatchExc) as exc_ctx:
     421              dispatcher._dispatch(exp_method, exp_params)
     422          self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params))
     423          self.assertIsNone(exc_ctx.exception.__cause__)
     424          self.assertIsNone(exc_ctx.exception.__context__)
     425  
     426      def test_registered_func_is_none(self):
     427          """Calls explicitly registered function which is None"""
     428  
     429          dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
     430          dispatcher.register_function(None, name='method')
     431          with self.assertRaisesRegex(Exception, 'method'):
     432              dispatcher._dispatch('method', ('param',))
     433  
     434      def test_instance_has_no_func(self):
     435          """Attempts to call nonexistent function on a registered instance"""
     436  
     437          dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
     438          dispatcher.register_instance(object())
     439          with self.assertRaisesRegex(Exception, 'method'):
     440              dispatcher._dispatch('method', ('param',))
     441  
     442      def test_cannot_locate_func(self):
     443          """Calls a function that the dispatcher cannot locate"""
     444  
     445          dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
     446          with self.assertRaisesRegex(Exception, 'method'):
     447              dispatcher._dispatch('method', ('param',))
     448  
     449  
     450  class ESC[4;38;5;81mHelperTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     451      def test_escape(self):
     452          self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
     453          self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
     454          self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")
     455  
     456  class ESC[4;38;5;81mFaultTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     457      def test_repr(self):
     458          f = xmlrpclib.Fault(42, 'Test Fault')
     459          self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
     460          self.assertEqual(repr(f), str(f))
     461  
     462      def test_dump_fault(self):
     463          f = xmlrpclib.Fault(42, 'Test Fault')
     464          s = xmlrpclib.dumps((f,))
     465          (newf,), m = xmlrpclib.loads(s)
     466          self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
     467          self.assertEqual(m, None)
     468  
     469          s = xmlrpclib.Marshaller().dumps(f)
     470          self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
     471  
     472      def test_dotted_attribute(self):
     473          # this will raise AttributeError because code don't want us to use
     474          # private methods
     475          self.assertRaises(AttributeError,
     476                            xmlrpc.server.resolve_dotted_attribute, str, '__add')
     477          self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
     478  
     479  class ESC[4;38;5;81mDateTimeTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     480      def test_default(self):
     481          with mock.patch('time.localtime') as localtime_mock:
     482              time_struct = time.struct_time(
     483                  [2013, 7, 15, 0, 24, 49, 0, 196, 0])
     484              localtime_mock.return_value = time_struct
     485              localtime = time.localtime()
     486              t = xmlrpclib.DateTime()
     487              self.assertEqual(str(t),
     488                               time.strftime("%Y%m%dT%H:%M:%S", localtime))
     489  
     490      def test_time(self):
     491          d = 1181399930.036952
     492          t = xmlrpclib.DateTime(d)
     493          self.assertEqual(str(t),
     494                           time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
     495  
     496      def test_time_tuple(self):
     497          d = (2007,6,9,10,38,50,5,160,0)
     498          t = xmlrpclib.DateTime(d)
     499          self.assertEqual(str(t), '20070609T10:38:50')
     500  
     501      def test_time_struct(self):
     502          d = time.localtime(1181399930.036952)
     503          t = xmlrpclib.DateTime(d)
     504          self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
     505  
     506      def test_datetime_datetime(self):
     507          d = datetime.datetime(2007,1,2,3,4,5)
     508          t = xmlrpclib.DateTime(d)
     509          self.assertEqual(str(t), '20070102T03:04:05')
     510  
     511      def test_repr(self):
     512          d = datetime.datetime(2007,1,2,3,4,5)
     513          t = xmlrpclib.DateTime(d)
     514          val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
     515          self.assertEqual(repr(t), val)
     516  
     517      def test_decode(self):
     518          d = ' 20070908T07:11:13  '
     519          t1 = xmlrpclib.DateTime()
     520          t1.decode(d)
     521          tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
     522          self.assertEqual(t1, tref)
     523  
     524          t2 = xmlrpclib._datetime(d)
     525          self.assertEqual(t2, tref)
     526  
     527      def test_comparison(self):
     528          now = datetime.datetime.now()
     529          dtime = xmlrpclib.DateTime(now.timetuple())
     530  
     531          # datetime vs. DateTime
     532          self.assertTrue(dtime == now)
     533          self.assertTrue(now == dtime)
     534          then = now + datetime.timedelta(seconds=4)
     535          self.assertTrue(then >= dtime)
     536          self.assertTrue(dtime < then)
     537  
     538          # str vs. DateTime
     539          dstr = now.strftime("%Y%m%dT%H:%M:%S")
     540          self.assertTrue(dtime == dstr)
     541          self.assertTrue(dstr == dtime)
     542          dtime_then = xmlrpclib.DateTime(then.timetuple())
     543          self.assertTrue(dtime_then >= dstr)
     544          self.assertTrue(dstr < dtime_then)
     545  
     546          # some other types
     547          dbytes = dstr.encode('ascii')
     548          dtuple = now.timetuple()
     549          self.assertFalse(dtime == 1970)
     550          self.assertTrue(dtime != dbytes)
     551          self.assertFalse(dtime == bytearray(dbytes))
     552          self.assertTrue(dtime != dtuple)
     553          with self.assertRaises(TypeError):
     554              dtime < float(1970)
     555          with self.assertRaises(TypeError):
     556              dtime > dbytes
     557          with self.assertRaises(TypeError):
     558              dtime <= bytearray(dbytes)
     559          with self.assertRaises(TypeError):
     560              dtime >= dtuple
     561  
     562          self.assertTrue(dtime == ALWAYS_EQ)
     563          self.assertFalse(dtime != ALWAYS_EQ)
     564          self.assertTrue(dtime < LARGEST)
     565          self.assertFalse(dtime > LARGEST)
     566          self.assertTrue(dtime <= LARGEST)
     567          self.assertFalse(dtime >= LARGEST)
     568          self.assertFalse(dtime < SMALLEST)
     569          self.assertTrue(dtime > SMALLEST)
     570          self.assertFalse(dtime <= SMALLEST)
     571          self.assertTrue(dtime >= SMALLEST)
     572  
     573  
     574  class ESC[4;38;5;81mBinaryTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     575  
     576      # XXX What should str(Binary(b"\xff")) return?  I'm choosing "\xff"
     577      # for now (i.e. interpreting the binary data as Latin-1-encoded
     578      # text).  But this feels very unsatisfactory.  Perhaps we should
     579      # only define repr(), and return r"Binary(b'\xff')" instead?
     580  
     581      def test_default(self):
     582          t = xmlrpclib.Binary()
     583          self.assertEqual(str(t), '')
     584  
     585      def test_string(self):
     586          d = b'\x01\x02\x03abc123\xff\xfe'
     587          t = xmlrpclib.Binary(d)
     588          self.assertEqual(str(t), str(d, "latin-1"))
     589  
     590      def test_decode(self):
     591          d = b'\x01\x02\x03abc123\xff\xfe'
     592          de = base64.encodebytes(d)
     593          t1 = xmlrpclib.Binary()
     594          t1.decode(de)
     595          self.assertEqual(str(t1), str(d, "latin-1"))
     596  
     597          t2 = xmlrpclib._binary(de)
     598          self.assertEqual(str(t2), str(d, "latin-1"))
     599  
     600  
# Address, port and base URL of the test server.  http_server() fills these
# in once its socket is bound and resets PORT to None when it shuts down.
ADDR = PORT = URL = None
     602  
     603  # The evt is set twice.  First when the server is ready to serve.
     604  # Second when the server has been shutdown.  The user must clear
     605  # the event after it has been set the first time to catch the second set.
     606  def http_server(evt, numrequests, requestHandler=None, encoding=None):
     607      class ESC[4;38;5;81mTestInstanceClass:
     608          def div(self, x, y):
     609              return x // y
     610  
     611          def _methodHelp(self, name):
     612              if name == 'div':
     613                  return 'This is the div function'
     614  
     615          class ESC[4;38;5;81mFixture:
     616              @staticmethod
     617              def getData():
     618                  return '42'
     619  
     620      class ESC[4;38;5;81mMyXMLRPCServer(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCServer):
     621          def get_request(self):
     622              # Ensure the socket is always non-blocking.  On Linux, socket
     623              # attributes are not inherited like they are on *BSD and Windows.
     624              s, port = self.socket.accept()
     625              s.setblocking(True)
     626              return s, port
     627  
     628      if not requestHandler:
     629          requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
     630      serv = MyXMLRPCServer(("localhost", 0), requestHandler,
     631                            encoding=encoding,
     632                            logRequests=False, bind_and_activate=False)
     633      try:
     634          serv.server_bind()
     635          global ADDR, PORT, URL
     636          ADDR, PORT = serv.socket.getsockname()
     637          #connect to IP address directly.  This avoids socket.create_connection()
     638          #trying to connect to "localhost" using all address families, which
     639          #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
     640          #on AF_INET only.
     641          URL = "http://%s:%d"%(ADDR, PORT)
     642          serv.server_activate()
     643          serv.register_introspection_functions()
     644          serv.register_multicall_functions()
     645          serv.register_function(pow)
     646          serv.register_function(lambda x: x, 'têšt')
     647          @serv.register_function
     648          def my_function():
     649              '''This is my function'''
     650              return True
     651          @serv.register_function(name='add')
     652          def _(x, y):
     653              return x + y
     654          testInstance = TestInstanceClass()
     655          serv.register_instance(testInstance, allow_dotted_names=True)
     656          evt.set()
     657  
     658          # handle up to 'numrequests' requests
     659          while numrequests > 0:
     660              serv.handle_request()
     661              numrequests -= 1
     662  
     663      except TimeoutError:
     664          pass
     665      finally:
     666          serv.socket.close()
     667          PORT = None
     668          evt.set()
     669  
     670  def http_multi_server(evt, numrequests, requestHandler=None):
     671      class ESC[4;38;5;81mTestInstanceClass:
     672          def div(self, x, y):
     673              return x // y
     674  
     675          def _methodHelp(self, name):
     676              if name == 'div':
     677                  return 'This is the div function'
     678  
     679      def my_function():
     680          '''This is my function'''
     681          return True
     682  
     683      class ESC[4;38;5;81mMyXMLRPCServer(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mMultiPathXMLRPCServer):
     684          def get_request(self):
     685              # Ensure the socket is always non-blocking.  On Linux, socket
     686              # attributes are not inherited like they are on *BSD and Windows.
     687              s, port = self.socket.accept()
     688              s.setblocking(True)
     689              return s, port
     690  
     691      if not requestHandler:
     692          requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
     693      class ESC[4;38;5;81mMyRequestHandler(ESC[4;38;5;149mrequestHandler):
     694          rpc_paths = []
     695  
     696      class ESC[4;38;5;81mBrokenDispatcher:
     697          def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
     698              raise RuntimeError("broken dispatcher")
     699  
     700      serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
     701                            logRequests=False, bind_and_activate=False)
     702      serv.socket.settimeout(3)
     703      serv.server_bind()
     704      try:
     705          global ADDR, PORT, URL
     706          ADDR, PORT = serv.socket.getsockname()
     707          #connect to IP address directly.  This avoids socket.create_connection()
     708          #trying to connect to "localhost" using all address families, which
     709          #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
     710          #on AF_INET only.
     711          URL = "http://%s:%d"%(ADDR, PORT)
     712          serv.server_activate()
     713          paths = [
     714              "/foo", "/foo/bar",
     715              "/foo?k=v", "/foo#frag", "/foo?k=v#frag",
     716              "", "/", "/RPC2", "?k=v", "#frag",
     717          ]
     718          for path in paths:
     719              d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
     720              d.register_introspection_functions()
     721              d.register_multicall_functions()
     722              d.register_function(lambda p=path: p, 'test')
     723          serv.get_dispatcher(paths[0]).register_function(pow)
     724          serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
     725          serv.add_dispatcher("/is/broken", BrokenDispatcher())
     726          evt.set()
     727  
     728          # handle up to 'numrequests' requests
     729          while numrequests > 0:
     730              serv.handle_request()
     731              numrequests -= 1
     732  
     733      except TimeoutError:
     734          pass
     735      finally:
     736          serv.socket.close()
     737          PORT = None
     738          evt.set()
     739  
     740  # This function prevents errors like:
     741  #    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
     742  def is_unavailable_exception(e):
     743      '''Returns True if the given ProtocolError is the product of a server-side
     744         exception caused by the 'temporarily unavailable' response sometimes
     745         given by operations on non-blocking sockets.'''
     746  
     747      # sometimes we get a -1 error code and/or empty headers
     748      try:
     749          if e.errcode == -1 or e.headers is None:
     750              return True
     751          exc_mess = e.headers.get('X-exception')
     752      except AttributeError:
     753          # Ignore OSErrors here.
     754          exc_mess = str(e)
     755  
     756      if exc_mess and 'temporarily unavailable' in exc_mess.lower():
     757          return True
     758  
     759  def make_request_and_skipIf(condition, reason):
     760      # If we skip the test, we have to make a request because
     761      # the server created in setUp blocks expecting one to come in.
     762      if not condition:
     763          return lambda func: func
     764      def decorator(func):
     765          def make_request_and_skip(self):
     766              try:
     767                  xmlrpclib.ServerProxy(URL).my_function()
     768              except (xmlrpclib.ProtocolError, OSError) as e:
     769                  if not is_unavailable_exception(e):
     770                      raise
     771              raise unittest.SkipTest(reason)
     772          return make_request_and_skip
     773      return decorator
     774  
     775  class ESC[4;38;5;81mBaseServerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     776      requestHandler = None
     777      request_count = 1
     778      threadFunc = staticmethod(http_server)
     779  
     780      def setUp(self):
     781          # enable traceback reporting
     782          xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
     783  
     784          self.evt = threading.Event()
     785          # start server thread to handle requests
     786          serv_args = (self.evt, self.request_count, self.requestHandler)
     787          thread = threading.Thread(target=self.threadFunc, args=serv_args)
     788          thread.start()
     789          self.addCleanup(thread.join)
     790  
     791          # wait for the server to be ready
     792          self.evt.wait()
     793          self.evt.clear()
     794  
     795      def tearDown(self):
     796          # wait on the server thread to terminate
     797          self.evt.wait()
     798  
     799          # disable traceback reporting
     800          xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
     801  
     802  class ESC[4;38;5;81mSimpleServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
     803      def test_simple1(self):
     804          try:
     805              p = xmlrpclib.ServerProxy(URL)
     806              self.assertEqual(p.pow(6,8), 6**8)
     807          except (xmlrpclib.ProtocolError, OSError) as e:
     808              # ignore failures due to non-blocking socket 'unavailable' errors
     809              if not is_unavailable_exception(e):
     810                  # protocol error; provide additional information in test output
     811                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     812  
     813      def test_nonascii(self):
     814          start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
     815          end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
     816          try:
     817              p = xmlrpclib.ServerProxy(URL)
     818              self.assertEqual(p.add(start_string, end_string),
     819                               start_string + end_string)
     820          except (xmlrpclib.ProtocolError, OSError) as e:
     821              # ignore failures due to non-blocking socket 'unavailable' errors
     822              if not is_unavailable_exception(e):
     823                  # protocol error; provide additional information in test output
     824                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     825  
     826      def test_client_encoding(self):
     827          start_string = '\u20ac'
     828          end_string = '\xa4'
     829  
     830          try:
     831              p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
     832              self.assertEqual(p.add(start_string, end_string),
     833                               start_string + end_string)
     834          except (xmlrpclib.ProtocolError, socket.error) as e:
     835              # ignore failures due to non-blocking socket unavailable errors.
     836              if not is_unavailable_exception(e):
     837                  # protocol error; provide additional information in test output
     838                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     839  
     840      def test_nonascii_methodname(self):
     841          try:
     842              p = xmlrpclib.ServerProxy(URL, encoding='ascii')
     843              self.assertEqual(p.têšt(42), 42)
     844          except (xmlrpclib.ProtocolError, socket.error) as e:
     845              # ignore failures due to non-blocking socket unavailable errors.
     846              if not is_unavailable_exception(e):
     847                  # protocol error; provide additional information in test output
     848                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     849  
     850      def test_404(self):
     851          # send POST with http.client, it should return 404 header and
     852          # 'Not Found' message.
     853          with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
     854              conn.request('POST', '/this-is-not-valid')
     855              response = conn.getresponse()
     856  
     857          self.assertEqual(response.status, 404)
     858          self.assertEqual(response.reason, 'Not Found')
     859  
     860      def test_introspection1(self):
     861          expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt',
     862                                  'system.listMethods', 'system.methodHelp',
     863                                  'system.methodSignature', 'system.multicall',
     864                                  'Fixture'])
     865          try:
     866              p = xmlrpclib.ServerProxy(URL)
     867              meth = p.system.listMethods()
     868              self.assertEqual(set(meth), expected_methods)
     869          except (xmlrpclib.ProtocolError, OSError) as e:
     870              # ignore failures due to non-blocking socket 'unavailable' errors
     871              if not is_unavailable_exception(e):
     872                  # protocol error; provide additional information in test output
     873                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     874  
     875  
     876      def test_introspection2(self):
     877          try:
     878              # test _methodHelp()
     879              p = xmlrpclib.ServerProxy(URL)
     880              divhelp = p.system.methodHelp('div')
     881              self.assertEqual(divhelp, 'This is the div function')
     882          except (xmlrpclib.ProtocolError, OSError) as e:
     883              # ignore failures due to non-blocking socket 'unavailable' errors
     884              if not is_unavailable_exception(e):
     885                  # protocol error; provide additional information in test output
     886                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     887  
     888      @make_request_and_skipIf(sys.flags.optimize >= 2,
     889                       "Docstrings are omitted with -O2 and above")
     890      def test_introspection3(self):
     891          try:
     892              # test native doc
     893              p = xmlrpclib.ServerProxy(URL)
     894              myfunction = p.system.methodHelp('my_function')
     895              self.assertEqual(myfunction, 'This is my function')
     896          except (xmlrpclib.ProtocolError, OSError) as e:
     897              # ignore failures due to non-blocking socket 'unavailable' errors
     898              if not is_unavailable_exception(e):
     899                  # protocol error; provide additional information in test output
     900                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     901  
     902      def test_introspection4(self):
     903          # the SimpleXMLRPCServer doesn't support signatures, but
     904          # at least check that we can try making the call
     905          try:
     906              p = xmlrpclib.ServerProxy(URL)
     907              divsig = p.system.methodSignature('div')
     908              self.assertEqual(divsig, 'signatures not supported')
     909          except (xmlrpclib.ProtocolError, OSError) as e:
     910              # ignore failures due to non-blocking socket 'unavailable' errors
     911              if not is_unavailable_exception(e):
     912                  # protocol error; provide additional information in test output
     913                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     914  
     915      def test_multicall(self):
     916          try:
     917              p = xmlrpclib.ServerProxy(URL)
     918              multicall = xmlrpclib.MultiCall(p)
     919              multicall.add(2,3)
     920              multicall.pow(6,8)
     921              multicall.div(127,42)
     922              add_result, pow_result, div_result = multicall()
     923              self.assertEqual(add_result, 2+3)
     924              self.assertEqual(pow_result, 6**8)
     925              self.assertEqual(div_result, 127//42)
     926          except (xmlrpclib.ProtocolError, OSError) as e:
     927              # ignore failures due to non-blocking socket 'unavailable' errors
     928              if not is_unavailable_exception(e):
     929                  # protocol error; provide additional information in test output
     930                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     931  
     932      def test_non_existing_multicall(self):
     933          try:
     934              p = xmlrpclib.ServerProxy(URL)
     935              multicall = xmlrpclib.MultiCall(p)
     936              multicall.this_is_not_exists()
     937              result = multicall()
     938  
     939              # result.results contains;
     940              # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
     941              #   'method "this_is_not_exists" is not supported'>}]
     942  
     943              self.assertEqual(result.results[0]['faultCode'], 1)
     944              self.assertEqual(result.results[0]['faultString'],
     945                  '<class \'Exception\'>:method "this_is_not_exists" '
     946                  'is not supported')
     947          except (xmlrpclib.ProtocolError, OSError) as e:
     948              # ignore failures due to non-blocking socket 'unavailable' errors
     949              if not is_unavailable_exception(e):
     950                  # protocol error; provide additional information in test output
     951                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
     952  
     953      def test_dotted_attribute(self):
     954          # Raises an AttributeError because private methods are not allowed.
     955          self.assertRaises(AttributeError,
     956                            xmlrpc.server.resolve_dotted_attribute, str, '__add')
     957  
     958          self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
     959          # Get the test to run faster by sending a request with test_simple1.
     960          # This avoids waiting for the socket timeout.
     961          self.test_simple1()
     962  
     963      def test_allow_dotted_names_true(self):
     964          # XXX also need allow_dotted_names_false test.
     965          server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
     966          data = server.Fixture.getData()
     967          self.assertEqual(data, '42')
     968  
     969      def test_unicode_host(self):
     970          server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
     971          self.assertEqual(server.add("a", "\xe9"), "a\xe9")
     972  
     973      def test_partial_post(self):
     974          # Check that a partial POST doesn't make the server loop: issue #14001.
     975          with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
     976              conn.send('POST /RPC2 HTTP/1.0\r\n'
     977                        'Content-Length: 100\r\n\r\n'
     978                        'bye HTTP/1.1\r\n'
     979                        f'Host: {ADDR}:{PORT}\r\n'
     980                        'Accept-Encoding: identity\r\n'
     981                        'Content-Length: 0\r\n\r\n'.encode('ascii'))
     982  
     983      def test_context_manager(self):
     984          with xmlrpclib.ServerProxy(URL) as server:
     985              server.add(2, 3)
     986              self.assertNotEqual(server('transport')._connection,
     987                                  (None, None))
     988          self.assertEqual(server('transport')._connection,
     989                           (None, None))
     990  
     991      def test_context_manager_method_error(self):
     992          try:
     993              with xmlrpclib.ServerProxy(URL) as server:
     994                  server.add(2, "a")
     995          except xmlrpclib.Fault:
     996              pass
     997          self.assertEqual(server('transport')._connection,
     998                           (None, None))
     999  
    1000  
    1001  class ESC[4;38;5;81mSimpleServerEncodingTestCase(ESC[4;38;5;149mBaseServerTestCase):
    1002      @staticmethod
    1003      def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
    1004          http_server(evt, numrequests, requestHandler, 'iso-8859-15')
    1005  
    1006      def test_server_encoding(self):
    1007          start_string = '\u20ac'
    1008          end_string = '\xa4'
    1009  
    1010          try:
    1011              p = xmlrpclib.ServerProxy(URL)
    1012              self.assertEqual(p.add(start_string, end_string),
    1013                               start_string + end_string)
    1014          except (xmlrpclib.ProtocolError, socket.error) as e:
    1015              # ignore failures due to non-blocking socket unavailable errors.
    1016              if not is_unavailable_exception(e):
    1017                  # protocol error; provide additional information in test output
    1018                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    1019  
    1020  
    1021  class ESC[4;38;5;81mMultiPathServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
    1022      threadFunc = staticmethod(http_multi_server)
    1023      request_count = 2
    1024      def test_path1(self):
    1025          p = xmlrpclib.ServerProxy(URL+"/foo")
    1026          self.assertEqual(p.pow(6,8), 6**8)
    1027          self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
    1028  
    1029      def test_path2(self):
    1030          p = xmlrpclib.ServerProxy(URL+"/foo/bar")
    1031          self.assertEqual(p.add(6,8), 6+8)
    1032          self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
    1033  
    1034      @support.requires_resource('walltime')
    1035      def test_path3(self):
    1036          p = xmlrpclib.ServerProxy(URL+"/is/broken")
    1037          self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
    1038  
    1039      @support.requires_resource('walltime')
    1040      def test_invalid_path(self):
    1041          p = xmlrpclib.ServerProxy(URL+"/invalid")
    1042          self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
    1043  
    1044      @support.requires_resource('walltime')
    1045      def test_path_query_fragment(self):
    1046          p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag")
    1047          self.assertEqual(p.test(), "/foo?k=v#frag")
    1048  
    1049      @support.requires_resource('walltime')
    1050      def test_path_fragment(self):
    1051          p = xmlrpclib.ServerProxy(URL+"/foo#frag")
    1052          self.assertEqual(p.test(), "/foo#frag")
    1053  
    1054      @support.requires_resource('walltime')
    1055      def test_path_query(self):
    1056          p = xmlrpclib.ServerProxy(URL+"/foo?k=v")
    1057          self.assertEqual(p.test(), "/foo?k=v")
    1058  
    1059      @support.requires_resource('walltime')
    1060      def test_empty_path(self):
    1061          p = xmlrpclib.ServerProxy(URL)
    1062          self.assertEqual(p.test(), "/RPC2")
    1063  
    1064      @support.requires_resource('walltime')
    1065      def test_root_path(self):
    1066          p = xmlrpclib.ServerProxy(URL + "/")
    1067          self.assertEqual(p.test(), "/")
    1068  
    1069      @support.requires_resource('walltime')
    1070      def test_empty_path_query(self):
    1071          p = xmlrpclib.ServerProxy(URL + "?k=v")
    1072          self.assertEqual(p.test(), "?k=v")
    1073  
    1074      @support.requires_resource('walltime')
    1075      def test_empty_path_fragment(self):
    1076          p = xmlrpclib.ServerProxy(URL + "#frag")
    1077          self.assertEqual(p.test(), "#frag")
    1078  
    1079  
# Shared fixture for the keep-alive tests below: it installs a request handler
# that speaks HTTP/1.1 and records the request lines it receives.
    1082  class ESC[4;38;5;81mBaseKeepaliveServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
    1083      #a request handler that supports keep-alive and logs requests into a
    1084      #class variable
    1085      class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCRequestHandler):
    1086          parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
    1087          protocol_version = 'HTTP/1.1'
    1088          myRequests = []
    1089          def handle(self):
    1090              self.myRequests.append([])
    1091              self.reqidx = len(self.myRequests)-1
    1092              return self.parentClass.handle(self)
    1093          def handle_one_request(self):
    1094              result = self.parentClass.handle_one_request(self)
    1095              self.myRequests[self.reqidx].append(self.raw_requestline)
    1096              return result
    1097  
    1098      requestHandler = RequestHandler
    1099      def setUp(self):
    1100          #clear request log
    1101          self.RequestHandler.myRequests = []
    1102          return BaseServerTestCase.setUp(self)
    1103  
    1104  #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
    1105  #does indeed serve subsequent requests on the same connection
    1106  class ESC[4;38;5;81mKeepaliveServerTestCase1(ESC[4;38;5;149mBaseKeepaliveServerTestCase):
    1107      def test_two(self):
    1108          p = xmlrpclib.ServerProxy(URL)
    1109          #do three requests.
    1110          self.assertEqual(p.pow(6,8), 6**8)
    1111          self.assertEqual(p.pow(6,8), 6**8)
    1112          self.assertEqual(p.pow(6,8), 6**8)
    1113          p("close")()
    1114  
    1115          #they should have all been handled by a single request handler
    1116          self.assertEqual(len(self.RequestHandler.myRequests), 1)
    1117  
    1118          #check that we did at least two (the third may be pending append
    1119          #due to thread scheduling)
    1120          self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    1121  
    1122  
    1123  #test special attribute access on the serverproxy, through the __call__
    1124  #function.
    1125  class ESC[4;38;5;81mKeepaliveServerTestCase2(ESC[4;38;5;149mBaseKeepaliveServerTestCase):
    1126      #ask for two keepalive requests to be handled.
    1127      request_count=2
    1128  
    1129      def test_close(self):
    1130          p = xmlrpclib.ServerProxy(URL)
    1131          #do some requests with close.
    1132          self.assertEqual(p.pow(6,8), 6**8)
    1133          self.assertEqual(p.pow(6,8), 6**8)
    1134          self.assertEqual(p.pow(6,8), 6**8)
    1135          p("close")() #this should trigger a new keep-alive request
    1136          self.assertEqual(p.pow(6,8), 6**8)
    1137          self.assertEqual(p.pow(6,8), 6**8)
    1138          self.assertEqual(p.pow(6,8), 6**8)
    1139          p("close")()
    1140  
    1141          #they should have all been two request handlers, each having logged at least
    1142          #two complete requests
    1143          self.assertEqual(len(self.RequestHandler.myRequests), 2)
    1144          self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    1145          self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
    1146  
    1147  
    1148      def test_transport(self):
    1149          p = xmlrpclib.ServerProxy(URL)
    1150          #do some requests with close.
    1151          self.assertEqual(p.pow(6,8), 6**8)
    1152          p("transport").close() #same as above, really.
    1153          self.assertEqual(p.pow(6,8), 6**8)
    1154          p("close")()
    1155          self.assertEqual(len(self.RequestHandler.myRequests), 2)
    1156  
    1157  #A test case that verifies that gzip encoding works in both directions
    1158  #(for a request and the response)
    1159  @unittest.skipIf(gzip is None, 'requires gzip')
    1160  class ESC[4;38;5;81mGzipServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
    1161      #a request handler that supports keep-alive and logs requests into a
    1162      #class variable
    1163      class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCRequestHandler):
    1164          parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
    1165          protocol_version = 'HTTP/1.1'
    1166  
    1167          def do_POST(self):
    1168              #store content of last request in class
    1169              self.__class__.content_length = int(self.headers["content-length"])
    1170              return self.parentClass.do_POST(self)
    1171      requestHandler = RequestHandler
    1172  
    1173      class ESC[4;38;5;81mTransport(ESC[4;38;5;149mxmlrpclibESC[4;38;5;149m.ESC[4;38;5;149mTransport):
    1174          #custom transport, stores the response length for our perusal
    1175          fake_gzip = False
    1176          def parse_response(self, response):
    1177              self.response_length=int(response.getheader("content-length", 0))
    1178              return xmlrpclib.Transport.parse_response(self, response)
    1179  
    1180          def send_content(self, connection, body):
    1181              if self.fake_gzip:
    1182                  #add a lone gzip header to induce decode error remotely
    1183                  connection.putheader("Content-Encoding", "gzip")
    1184              return xmlrpclib.Transport.send_content(self, connection, body)
    1185  
    1186      def setUp(self):
    1187          BaseServerTestCase.setUp(self)
    1188  
    1189      def test_gzip_request(self):
    1190          t = self.Transport()
    1191          t.encode_threshold = None
    1192          p = xmlrpclib.ServerProxy(URL, transport=t)
    1193          self.assertEqual(p.pow(6,8), 6**8)
    1194          a = self.RequestHandler.content_length
    1195          t.encode_threshold = 0 #turn on request encoding
    1196          self.assertEqual(p.pow(6,8), 6**8)
    1197          b = self.RequestHandler.content_length
    1198          self.assertTrue(a>b)
    1199          p("close")()
    1200  
    1201      def test_bad_gzip_request(self):
    1202          t = self.Transport()
    1203          t.encode_threshold = None
    1204          t.fake_gzip = True
    1205          p = xmlrpclib.ServerProxy(URL, transport=t)
    1206          cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
    1207                                      re.compile(r"\b400\b"))
    1208          with cm:
    1209              p.pow(6, 8)
    1210          p("close")()
    1211  
    1212      def test_gzip_response(self):
    1213          t = self.Transport()
    1214          p = xmlrpclib.ServerProxy(URL, transport=t)
    1215          old = self.requestHandler.encode_threshold
    1216          self.requestHandler.encode_threshold = None #no encoding
    1217          self.assertEqual(p.pow(6,8), 6**8)
    1218          a = t.response_length
    1219          self.requestHandler.encode_threshold = 0 #always encode
    1220          self.assertEqual(p.pow(6,8), 6**8)
    1221          p("close")()
    1222          b = t.response_length
    1223          self.requestHandler.encode_threshold = old
    1224          self.assertTrue(a>b)
    1225  
    1226  
    1227  @unittest.skipIf(gzip is None, 'requires gzip')
    1228  class ESC[4;38;5;81mGzipUtilTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1229  
    1230      def test_gzip_decode_limit(self):
    1231          max_gzip_decode = 20 * 1024 * 1024
    1232          data = b'\0' * max_gzip_decode
    1233          encoded = xmlrpclib.gzip_encode(data)
    1234          decoded = xmlrpclib.gzip_decode(encoded)
    1235          self.assertEqual(len(decoded), max_gzip_decode)
    1236  
    1237          data = b'\0' * (max_gzip_decode + 1)
    1238          encoded = xmlrpclib.gzip_encode(data)
    1239  
    1240          with self.assertRaisesRegex(ValueError,
    1241                                      "max gzipped payload length exceeded"):
    1242              xmlrpclib.gzip_decode(encoded)
    1243  
    1244          xmlrpclib.gzip_decode(encoded, max_decode=-1)
    1245  
    1246  
    1247  class ESC[4;38;5;81mHeadersServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
    1248      class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCRequestHandler):
    1249          test_headers = None
    1250  
    1251          def do_POST(self):
    1252              self.__class__.test_headers = self.headers
    1253              return super().do_POST()
    1254      requestHandler = RequestHandler
    1255      standard_headers = [
    1256          'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
    1257          'Content-Length']
    1258  
    1259      def setUp(self):
    1260          self.RequestHandler.test_headers = None
    1261          return super().setUp()
    1262  
    1263      def assertContainsAdditionalHeaders(self, headers, additional):
    1264          expected_keys = sorted(self.standard_headers + list(additional.keys()))
    1265          self.assertListEqual(sorted(headers.keys()), expected_keys)
    1266  
    1267          for key, value in additional.items():
    1268              self.assertEqual(headers.get(key), value)
    1269  
    1270      def test_header(self):
    1271          p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')])
    1272          self.assertEqual(p.pow(6, 8), 6**8)
    1273  
    1274          headers = self.RequestHandler.test_headers
    1275          self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})
    1276  
    1277      def test_header_many(self):
    1278          p = xmlrpclib.ServerProxy(
    1279              URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')])
    1280          self.assertEqual(p.pow(6, 8), 6**8)
    1281  
    1282          headers = self.RequestHandler.test_headers
    1283          self.assertContainsAdditionalHeaders(
    1284              headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'})
    1285  
    1286      def test_header_empty(self):
    1287          p = xmlrpclib.ServerProxy(URL, headers=[])
    1288          self.assertEqual(p.pow(6, 8), 6**8)
    1289  
    1290          headers = self.RequestHandler.test_headers
    1291          self.assertContainsAdditionalHeaders(headers, {})
    1292  
    1293      def test_header_tuple(self):
    1294          p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),))
    1295          self.assertEqual(p.pow(6, 8), 6**8)
    1296  
    1297          headers = self.RequestHandler.test_headers
    1298          self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})
    1299  
    1300      def test_header_items(self):
    1301          p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items())
    1302          self.assertEqual(p.pow(6, 8), 6**8)
    1303  
    1304          headers = self.RequestHandler.test_headers
    1305          self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})
    1306  
    1307  
    1308  #Test special attributes of the ServerProxy object
    1309  class ESC[4;38;5;81mServerProxyTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1310      def setUp(self):
    1311          unittest.TestCase.setUp(self)
    1312          # Actual value of the URL doesn't matter if it is a string in
    1313          # the correct format.
    1314          self.url = 'http://fake.localhost'
    1315  
    1316      def test_close(self):
    1317          p = xmlrpclib.ServerProxy(self.url)
    1318          self.assertEqual(p('close')(), None)
    1319  
    1320      def test_transport(self):
    1321          t = xmlrpclib.Transport()
    1322          p = xmlrpclib.ServerProxy(self.url, transport=t)
    1323          self.assertEqual(p('transport'), t)
    1324  
    1325  
    1326  # This is a contrived way to make a failure occur on the server side
    1327  # in order to test the _send_traceback_header flag on the server
    1328  class ESC[4;38;5;81mFailingMessageClass(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPMessage):
    1329      def get(self, key, failobj=None):
    1330          key = key.lower()
    1331          if key == 'content-length':
    1332              return 'I am broken'
    1333          return super().get(key, failobj)
    1334  
    1335  
    1336  class ESC[4;38;5;81mFailingServerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1337      def setUp(self):
    1338          self.evt = threading.Event()
    1339          # start server thread to handle requests
    1340          serv_args = (self.evt, 1)
    1341          thread = threading.Thread(target=http_server, args=serv_args)
    1342          thread.start()
    1343          self.addCleanup(thread.join)
    1344  
    1345          # wait for the server to be ready
    1346          self.evt.wait()
    1347          self.evt.clear()
    1348  
    1349      def tearDown(self):
    1350          # wait on the server thread to terminate
    1351          self.evt.wait()
    1352          # reset flag
    1353          xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
    1354          # reset message class
    1355          default_class = http.client.HTTPMessage
    1356          xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class
    1357  
    1358      def test_basic(self):
    1359          # check that flag is false by default
    1360          flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
    1361          self.assertEqual(flagval, False)
    1362  
    1363          # enable traceback reporting
    1364          xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
    1365  
    1366          # test a call that shouldn't fail just as a smoke test
    1367          try:
    1368              p = xmlrpclib.ServerProxy(URL)
    1369              self.assertEqual(p.pow(6,8), 6**8)
    1370          except (xmlrpclib.ProtocolError, OSError) as e:
    1371              # ignore failures due to non-blocking socket 'unavailable' errors
    1372              if not is_unavailable_exception(e):
    1373                  # protocol error; provide additional information in test output
    1374                  self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
    1375  
    1376      def test_fail_no_info(self):
    1377          # use the broken message class
    1378          xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    1379  
    1380          try:
    1381              p = xmlrpclib.ServerProxy(URL)
    1382              p.pow(6,8)
    1383          except (xmlrpclib.ProtocolError, OSError) as e:
    1384              # ignore failures due to non-blocking socket 'unavailable' errors
    1385              if not is_unavailable_exception(e) and hasattr(e, "headers"):
    1386                  # The two server-side error headers shouldn't be sent back in this case
    1387                  self.assertTrue(e.headers.get("X-exception") is None)
    1388                  self.assertTrue(e.headers.get("X-traceback") is None)
    1389          else:
    1390              self.fail('ProtocolError not raised')
    1391  
    1392      def test_fail_with_info(self):
    1393          # use the broken message class
    1394          xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    1395  
    1396          # Check that errors in the server send back exception/traceback
    1397          # info when flag is set
    1398          xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
    1399  
    1400          try:
    1401              p = xmlrpclib.ServerProxy(URL)
    1402              p.pow(6,8)
    1403          except (xmlrpclib.ProtocolError, OSError) as e:
    1404              # ignore failures due to non-blocking socket 'unavailable' errors
    1405              if not is_unavailable_exception(e) and hasattr(e, "headers"):
    1406                  # We should get error info in the response
    1407                  expected_err = "invalid literal for int() with base 10: 'I am broken'"
    1408                  self.assertEqual(e.headers.get("X-exception"), expected_err)
    1409                  self.assertTrue(e.headers.get("X-traceback") is not None)
    1410          else:
    1411              self.fail('ProtocolError not raised')
    1412  
    1413  
    1414  @contextlib.contextmanager
    1415  def captured_stdout(encoding='utf-8'):
    1416      """A variation on support.captured_stdout() which gives a text stream
    1417      having a `buffer` attribute.
    1418      """
    1419      orig_stdout = sys.stdout
    1420      sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    1421      try:
    1422          yield sys.stdout
    1423      finally:
    1424          sys.stdout = orig_stdout
    1425  
    1426  
    1427  class ESC[4;38;5;81mCGIHandlerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1428      def setUp(self):
    1429          self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
    1430  
    1431      def tearDown(self):
    1432          self.cgi = None
    1433  
    1434      def test_cgi_get(self):
    1435          with os_helper.EnvironmentVarGuard() as env:
    1436              env['REQUEST_METHOD'] = 'GET'
    1437              # if the method is GET and no request_text is given, it runs handle_get
    1438              # get sysout output
    1439              with captured_stdout(encoding=self.cgi.encoding) as data_out:
    1440                  self.cgi.handle_request()
    1441  
    1442              # parse Status header
    1443              data_out.seek(0)
    1444              handle = data_out.read()
    1445              status = handle.split()[1]
    1446              message = ' '.join(handle.split()[2:4])
    1447  
    1448              self.assertEqual(status, '400')
    1449              self.assertEqual(message, 'Bad Request')
    1450  
    1451  
    1452      def test_cgi_xmlrpc_response(self):
    1453          data = """<?xml version='1.0'?>
    1454          <methodCall>
    1455              <methodName>test_method</methodName>
    1456              <params>
    1457                  <param>
    1458                      <value><string>foo</string></value>
    1459                  </param>
    1460                  <param>
    1461                      <value><string>bar</string></value>
    1462                  </param>
    1463              </params>
    1464          </methodCall>
    1465          """
    1466  
    1467          with os_helper.EnvironmentVarGuard() as env, \
    1468               captured_stdout(encoding=self.cgi.encoding) as data_out, \
    1469               support.captured_stdin() as data_in:
    1470              data_in.write(data)
    1471              data_in.seek(0)
    1472              env['CONTENT_LENGTH'] = str(len(data))
    1473              self.cgi.handle_request()
    1474          data_out.seek(0)
    1475  
    1476          # will respond exception, if so, our goal is achieved ;)
    1477          handle = data_out.read()
    1478  
    1479          # start with 44th char so as not to get http header, we just
    1480          # need only xml
    1481          self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
    1482  
    1483          # Also test the content-length returned  by handle_request
    1484          # Using the same test method inorder to avoid all the datapassing
    1485          # boilerplate code.
    1486          # Test for bug: http://bugs.python.org/issue5040
    1487  
    1488          content = handle[handle.find("<?xml"):]
    1489  
    1490          self.assertEqual(
    1491              int(re.search(r'Content-Length: (\d+)', handle).group(1)),
    1492              len(content))
    1493  
    1494  
    1495  class ESC[4;38;5;81mUseBuiltinTypesTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1496  
    1497      def test_use_builtin_types(self):
    1498          # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
    1499          # makes all dispatch of binary data as bytes instances, and all
    1500          # dispatch of datetime argument as datetime.datetime instances.
    1501          self.log = []
    1502          expected_bytes = b"my dog has fleas"
    1503          expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
    1504          marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
    1505          def foobar(*args):
    1506              self.log.extend(args)
    1507          handler = xmlrpc.server.SimpleXMLRPCDispatcher(
    1508              allow_none=True, encoding=None, use_builtin_types=True)
    1509          handler.register_function(foobar)
    1510          handler._marshaled_dispatch(marshaled)
    1511          self.assertEqual(len(self.log), 2)
    1512          mybytes, mydate = self.log
    1513          self.assertEqual(self.log, [expected_bytes, expected_date])
    1514          self.assertIs(type(mydate), datetime.datetime)
    1515          self.assertIs(type(mybytes), bytes)
    1516  
    1517      def test_cgihandler_has_use_builtin_types_flag(self):
    1518          handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True)
    1519          self.assertTrue(handler.use_builtin_types)
    1520  
    1521      def test_xmlrpcserver_has_use_builtin_types_flag(self):
    1522          server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0),
    1523              use_builtin_types=True)
    1524          server.server_close()
    1525          self.assertTrue(server.use_builtin_types)
    1526  
    1527  
    1528  def setUpModule():
    1529      thread_info = threading_helper.threading_setup()
    1530      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
    1531  
    1532  
    1533  if __name__ == "__main__":
    1534      unittest.main()