1 import base64
2 import datetime
3 import decimal
4 import sys
5 import time
6 import unittest
7 from unittest import mock
8 import xmlrpc.client as xmlrpclib
9 import xmlrpc.server
10 import http.client
11 import http, http.server
12 import socket
13 import threading
14 import re
15 import io
16 import contextlib
17 from test import support
18 from test.support import os_helper
19 from test.support import socket_helper
20 from test.support import threading_helper
21 from test.support import ALWAYS_EQ, LARGEST, SMALLEST
22
23 try:
24 import gzip
25 except ImportError:
26 gzip = None
27
28 support.requires_working_socket(module=True)
29
30 alist = [{'astring': 'foo@bar.baz.spam',
31 'afloat': 7283.43,
32 'anint': 2**20,
33 'ashortlong': 2,
34 'anotherlist': ['.zyx.41'],
35 'abase64': xmlrpclib.Binary(b"my dog has fleas"),
36 'b64bytes': b"my dog has fleas",
37 'b64bytearray': bytearray(b"my dog has fleas"),
38 'boolean': False,
39 'unicode': '\u4000\u6000\u8000',
40 'ukey\u4000': 'regular value',
41 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
42 'datetime2': xmlrpclib.DateTime(
43 (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
44 'datetime3': xmlrpclib.DateTime(
45 datetime.datetime(2005, 2, 10, 11, 41, 23)),
46 }]
47
48 class ESC[4;38;5;81mXMLRPCTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
49
50 def test_dump_load(self):
51 dump = xmlrpclib.dumps((alist,))
52 load = xmlrpclib.loads(dump)
53 self.assertEqual(alist, load[0][0])
54
55 def test_dump_bare_datetime(self):
56 # This checks that an unwrapped datetime.date object can be handled
57 # by the marshalling code. This can't be done via test_dump_load()
58 # since with use_builtin_types set to 1 the unmarshaller would create
59 # datetime objects for the 'datetime[123]' keys as well
60 dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
61 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
62 s = xmlrpclib.dumps((dt,))
63
64 result, m = xmlrpclib.loads(s, use_builtin_types=True)
65 (newdt,) = result
66 self.assertEqual(newdt, dt)
67 self.assertIs(type(newdt), datetime.datetime)
68 self.assertIsNone(m)
69
70 result, m = xmlrpclib.loads(s, use_builtin_types=False)
71 (newdt,) = result
72 self.assertEqual(newdt, dt)
73 self.assertIs(type(newdt), xmlrpclib.DateTime)
74 self.assertIsNone(m)
75
76 result, m = xmlrpclib.loads(s, use_datetime=True)
77 (newdt,) = result
78 self.assertEqual(newdt, dt)
79 self.assertIs(type(newdt), datetime.datetime)
80 self.assertIsNone(m)
81
82 result, m = xmlrpclib.loads(s, use_datetime=False)
83 (newdt,) = result
84 self.assertEqual(newdt, dt)
85 self.assertIs(type(newdt), xmlrpclib.DateTime)
86 self.assertIsNone(m)
87
88
89 def test_datetime_before_1900(self):
90 # same as before but with a date before 1900
91 dt = datetime.datetime(1, 2, 10, 11, 41, 23)
92 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
93 s = xmlrpclib.dumps((dt,))
94
95 result, m = xmlrpclib.loads(s, use_builtin_types=True)
96 (newdt,) = result
97 self.assertEqual(newdt, dt)
98 self.assertIs(type(newdt), datetime.datetime)
99 self.assertIsNone(m)
100
101 result, m = xmlrpclib.loads(s, use_builtin_types=False)
102 (newdt,) = result
103 self.assertEqual(newdt, dt)
104 self.assertIs(type(newdt), xmlrpclib.DateTime)
105 self.assertIsNone(m)
106
107 def test_bug_1164912 (self):
108 d = xmlrpclib.DateTime()
109 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
110 methodresponse=True))
111 self.assertIsInstance(new_d.value, str)
112
113 # Check that the output of dumps() is still an 8-bit string
114 s = xmlrpclib.dumps((new_d,), methodresponse=True)
115 self.assertIsInstance(s, str)
116
117 def test_newstyle_class(self):
118 class ESC[4;38;5;81mT(ESC[4;38;5;149mobject):
119 pass
120 t = T()
121 t.x = 100
122 t.y = "Hello"
123 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
124 self.assertEqual(t2, t.__dict__)
125
126 def test_dump_big_long(self):
127 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
128
129 def test_dump_bad_dict(self):
130 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
131
132 def test_dump_recursive_seq(self):
133 l = [1,2,3]
134 t = [3,4,5,l]
135 l.append(t)
136 self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
137
138 def test_dump_recursive_dict(self):
139 d = {'1':1, '2':1}
140 t = {'3':3, 'd':d}
141 d['t'] = t
142 self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
143
144 def test_dump_big_int(self):
145 if sys.maxsize > 2**31-1:
146 self.assertRaises(OverflowError, xmlrpclib.dumps,
147 (int(2**34),))
148
149 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
150 self.assertRaises(OverflowError, xmlrpclib.dumps,
151 (xmlrpclib.MAXINT+1,))
152 self.assertRaises(OverflowError, xmlrpclib.dumps,
153 (xmlrpclib.MININT-1,))
154
155 def dummy_write(s):
156 pass
157
158 m = xmlrpclib.Marshaller()
159 m.dump_int(xmlrpclib.MAXINT, dummy_write)
160 m.dump_int(xmlrpclib.MININT, dummy_write)
161 self.assertRaises(OverflowError, m.dump_int,
162 xmlrpclib.MAXINT+1, dummy_write)
163 self.assertRaises(OverflowError, m.dump_int,
164 xmlrpclib.MININT-1, dummy_write)
165
166 def test_dump_double(self):
167 xmlrpclib.dumps((float(2 ** 34),))
168 xmlrpclib.dumps((float(xmlrpclib.MAXINT),
169 float(xmlrpclib.MININT)))
170 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
171 float(xmlrpclib.MININT - 42)))
172
173 def dummy_write(s):
174 pass
175
176 m = xmlrpclib.Marshaller()
177 m.dump_double(xmlrpclib.MAXINT, dummy_write)
178 m.dump_double(xmlrpclib.MININT, dummy_write)
179 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
180 m.dump_double(xmlrpclib.MININT - 42, dummy_write)
181
182 def test_dump_none(self):
183 value = alist + [None]
184 arg1 = (alist + [None],)
185 strg = xmlrpclib.dumps(arg1, allow_none=True)
186 self.assertEqual(value,
187 xmlrpclib.loads(strg)[0][0])
188 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
189
190 def test_dump_encoding(self):
191 value = {'key\u20ac\xa4':
192 'value\u20ac\xa4'}
193 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
194 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
195 self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
196 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
197 self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
198
199 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
200 methodresponse=True)
201 self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
202 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
203 self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
204
205 methodname = 'method\u20ac\xa4'
206 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
207 methodname=methodname)
208 self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
209 self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
210
211 def test_dump_bytes(self):
212 sample = b"my dog has fleas"
213 self.assertEqual(sample, xmlrpclib.Binary(sample))
214 for type_ in bytes, bytearray, xmlrpclib.Binary:
215 value = type_(sample)
216 s = xmlrpclib.dumps((value,))
217
218 result, m = xmlrpclib.loads(s, use_builtin_types=True)
219 (newvalue,) = result
220 self.assertEqual(newvalue, sample)
221 self.assertIs(type(newvalue), bytes)
222 self.assertIsNone(m)
223
224 result, m = xmlrpclib.loads(s, use_builtin_types=False)
225 (newvalue,) = result
226 self.assertEqual(newvalue, sample)
227 self.assertIs(type(newvalue), xmlrpclib.Binary)
228 self.assertIsNone(m)
229
230 def test_loads_unsupported(self):
231 ResponseError = xmlrpclib.ResponseError
232 data = '<params><param><value><spam/></value></param></params>'
233 self.assertRaises(ResponseError, xmlrpclib.loads, data)
234 data = ('<params><param><value><array>'
235 '<value><spam/></value>'
236 '</array></value></param></params>')
237 self.assertRaises(ResponseError, xmlrpclib.loads, data)
238 data = ('<params><param><value><struct>'
239 '<member><name>a</name><value><spam/></value></member>'
240 '<member><name>b</name><value><spam/></value></member>'
241 '</struct></value></param></params>')
242 self.assertRaises(ResponseError, xmlrpclib.loads, data)
243
244 def check_loads(self, s, value, **kwargs):
245 dump = '<params><param><value>%s</value></param></params>' % s
246 result, m = xmlrpclib.loads(dump, **kwargs)
247 (newvalue,) = result
248 self.assertEqual(newvalue, value)
249 self.assertIs(type(newvalue), type(value))
250 self.assertIsNone(m)
251
252 def test_load_standard_types(self):
253 check = self.check_loads
254 check('string', 'string')
255 check('<string>string</string>', 'string')
256 check('<string>𝔘𝔫𝔦𝔠𝔬𝔡𝔢 string</string>', '𝔘𝔫𝔦𝔠𝔬𝔡𝔢 string')
257 check('<int>2056183947</int>', 2056183947)
258 check('<int>-2056183947</int>', -2056183947)
259 check('<i4>2056183947</i4>', 2056183947)
260 check('<double>46093.78125</double>', 46093.78125)
261 check('<boolean>0</boolean>', False)
262 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
263 xmlrpclib.Binary(b'\x00byte string\xff'))
264 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
265 b'\x00byte string\xff', use_builtin_types=True)
266 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
267 xmlrpclib.DateTime('20050210T11:41:23'))
268 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
269 datetime.datetime(2005, 2, 10, 11, 41, 23),
270 use_builtin_types=True)
271 check('<array><data>'
272 '<value><int>1</int></value><value><int>2</int></value>'
273 '</data></array>', [1, 2])
274 check('<struct>'
275 '<member><name>b</name><value><int>2</int></value></member>'
276 '<member><name>a</name><value><int>1</int></value></member>'
277 '</struct>', {'a': 1, 'b': 2})
278
279 def test_load_extension_types(self):
280 check = self.check_loads
281 check('<nil/>', None)
282 check('<ex:nil/>', None)
283 check('<i1>205</i1>', 205)
284 check('<i2>20561</i2>', 20561)
285 check('<i8>9876543210</i8>', 9876543210)
286 check('<biginteger>98765432100123456789</biginteger>',
287 98765432100123456789)
288 check('<float>93.78125</float>', 93.78125)
289 check('<bigdecimal>9876543210.0123456789</bigdecimal>',
290 decimal.Decimal('9876543210.0123456789'))
291
292 def test_limit_int(self):
293 check = self.check_loads
294 maxdigits = 5000
295 with support.adjust_int_max_str_digits(maxdigits):
296 s = '1' * (maxdigits + 1)
297 with self.assertRaises(ValueError):
298 check(f'<int>{s}</int>', None)
299 with self.assertRaises(ValueError):
300 check(f'<biginteger>{s}</biginteger>', None)
301
302 def test_get_host_info(self):
303 # see bug #3613, this raised a TypeError
304 transp = xmlrpc.client.Transport()
305 self.assertEqual(transp.get_host_info("user@host.tld"),
306 ('host.tld',
307 [('Authorization', 'Basic dXNlcg==')], {}))
308
309 def test_ssl_presence(self):
310 try:
311 import ssl
312 except ImportError:
313 has_ssl = False
314 else:
315 has_ssl = True
316 try:
317 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
318 except NotImplementedError:
319 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
320 except OSError:
321 self.assertTrue(has_ssl)
322
323 def test_keepalive_disconnect(self):
324 class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mBaseHTTPRequestHandler):
325 protocol_version = "HTTP/1.1"
326 handled = False
327
328 def do_POST(self):
329 length = int(self.headers.get("Content-Length"))
330 self.rfile.read(length)
331 if self.handled:
332 self.close_connection = True
333 return
334 response = xmlrpclib.dumps((5,), methodresponse=True)
335 response = response.encode()
336 self.send_response(http.HTTPStatus.OK)
337 self.send_header("Content-Length", len(response))
338 self.end_headers()
339 self.wfile.write(response)
340 self.handled = True
341 self.close_connection = False
342
343 def log_message(self, format, *args):
344 # don't clobber sys.stderr
345 pass
346
347 def run_server():
348 server.socket.settimeout(float(1)) # Don't hang if client fails
349 server.handle_request() # First request and attempt at second
350 server.handle_request() # Retried second request
351
352 server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
353 self.addCleanup(server.server_close)
354 thread = threading.Thread(target=run_server)
355 thread.start()
356 self.addCleanup(thread.join)
357 url = "http://{}:{}/".format(*server.server_address)
358 with xmlrpclib.ServerProxy(url) as p:
359 self.assertEqual(p.method(), 5)
360 self.assertEqual(p.method(), 5)
361
362
363 class ESC[4;38;5;81mSimpleXMLRPCDispatcherTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
364 class ESC[4;38;5;81mDispatchExc(ESC[4;38;5;149mException):
365 """Raised inside the dispatched functions when checking for
366 chained exceptions"""
367
368 def test_call_registered_func(self):
369 """Calls explicitly registered function"""
370 # Makes sure any exception raised inside the function has no other
371 # exception chained to it
372
373 exp_params = 1, 2, 3
374
375 def dispatched_func(*params):
376 raise self.DispatchExc(params)
377
378 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
379 dispatcher.register_function(dispatched_func)
380 with self.assertRaises(self.DispatchExc) as exc_ctx:
381 dispatcher._dispatch('dispatched_func', exp_params)
382 self.assertEqual(exc_ctx.exception.args, (exp_params,))
383 self.assertIsNone(exc_ctx.exception.__cause__)
384 self.assertIsNone(exc_ctx.exception.__context__)
385
386 def test_call_instance_func(self):
387 """Calls a registered instance attribute as a function"""
388 # Makes sure any exception raised inside the function has no other
389 # exception chained to it
390
391 exp_params = 1, 2, 3
392
393 class ESC[4;38;5;81mDispatchedClass:
394 def dispatched_func(self, *params):
395 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)
396
397 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
398 dispatcher.register_instance(DispatchedClass())
399 with self.assertRaises(self.DispatchExc) as exc_ctx:
400 dispatcher._dispatch('dispatched_func', exp_params)
401 self.assertEqual(exc_ctx.exception.args, (exp_params,))
402 self.assertIsNone(exc_ctx.exception.__cause__)
403 self.assertIsNone(exc_ctx.exception.__context__)
404
405 def test_call_dispatch_func(self):
406 """Calls the registered instance's `_dispatch` function"""
407 # Makes sure any exception raised inside the function has no other
408 # exception chained to it
409
410 exp_method = 'method'
411 exp_params = 1, 2, 3
412
413 class ESC[4;38;5;81mTestInstance:
414 def _dispatch(self, method, params):
415 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
416 method, params)
417
418 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
419 dispatcher.register_instance(TestInstance())
420 with self.assertRaises(self.DispatchExc) as exc_ctx:
421 dispatcher._dispatch(exp_method, exp_params)
422 self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params))
423 self.assertIsNone(exc_ctx.exception.__cause__)
424 self.assertIsNone(exc_ctx.exception.__context__)
425
426 def test_registered_func_is_none(self):
427 """Calls explicitly registered function which is None"""
428
429 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
430 dispatcher.register_function(None, name='method')
431 with self.assertRaisesRegex(Exception, 'method'):
432 dispatcher._dispatch('method', ('param',))
433
434 def test_instance_has_no_func(self):
435 """Attempts to call nonexistent function on a registered instance"""
436
437 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
438 dispatcher.register_instance(object())
439 with self.assertRaisesRegex(Exception, 'method'):
440 dispatcher._dispatch('method', ('param',))
441
442 def test_cannot_locate_func(self):
443 """Calls a function that the dispatcher cannot locate"""
444
445 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
446 with self.assertRaisesRegex(Exception, 'method'):
447 dispatcher._dispatch('method', ('param',))
448
449
450 class ESC[4;38;5;81mHelperTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
451 def test_escape(self):
452 self.assertEqual(xmlrpclib.escape("a&b"), "a&b")
453 self.assertEqual(xmlrpclib.escape("a<b"), "a<b")
454 self.assertEqual(xmlrpclib.escape("a>b"), "a>b")
455
456 class ESC[4;38;5;81mFaultTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
457 def test_repr(self):
458 f = xmlrpclib.Fault(42, 'Test Fault')
459 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
460 self.assertEqual(repr(f), str(f))
461
462 def test_dump_fault(self):
463 f = xmlrpclib.Fault(42, 'Test Fault')
464 s = xmlrpclib.dumps((f,))
465 (newf,), m = xmlrpclib.loads(s)
466 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
467 self.assertEqual(m, None)
468
469 s = xmlrpclib.Marshaller().dumps(f)
470 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
471
472 def test_dotted_attribute(self):
473 # this will raise AttributeError because code don't want us to use
474 # private methods
475 self.assertRaises(AttributeError,
476 xmlrpc.server.resolve_dotted_attribute, str, '__add')
477 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
478
479 class ESC[4;38;5;81mDateTimeTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
480 def test_default(self):
481 with mock.patch('time.localtime') as localtime_mock:
482 time_struct = time.struct_time(
483 [2013, 7, 15, 0, 24, 49, 0, 196, 0])
484 localtime_mock.return_value = time_struct
485 localtime = time.localtime()
486 t = xmlrpclib.DateTime()
487 self.assertEqual(str(t),
488 time.strftime("%Y%m%dT%H:%M:%S", localtime))
489
490 def test_time(self):
491 d = 1181399930.036952
492 t = xmlrpclib.DateTime(d)
493 self.assertEqual(str(t),
494 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
495
496 def test_time_tuple(self):
497 d = (2007,6,9,10,38,50,5,160,0)
498 t = xmlrpclib.DateTime(d)
499 self.assertEqual(str(t), '20070609T10:38:50')
500
501 def test_time_struct(self):
502 d = time.localtime(1181399930.036952)
503 t = xmlrpclib.DateTime(d)
504 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
505
506 def test_datetime_datetime(self):
507 d = datetime.datetime(2007,1,2,3,4,5)
508 t = xmlrpclib.DateTime(d)
509 self.assertEqual(str(t), '20070102T03:04:05')
510
511 def test_repr(self):
512 d = datetime.datetime(2007,1,2,3,4,5)
513 t = xmlrpclib.DateTime(d)
514 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
515 self.assertEqual(repr(t), val)
516
517 def test_decode(self):
518 d = ' 20070908T07:11:13 '
519 t1 = xmlrpclib.DateTime()
520 t1.decode(d)
521 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
522 self.assertEqual(t1, tref)
523
524 t2 = xmlrpclib._datetime(d)
525 self.assertEqual(t2, tref)
526
527 def test_comparison(self):
528 now = datetime.datetime.now()
529 dtime = xmlrpclib.DateTime(now.timetuple())
530
531 # datetime vs. DateTime
532 self.assertTrue(dtime == now)
533 self.assertTrue(now == dtime)
534 then = now + datetime.timedelta(seconds=4)
535 self.assertTrue(then >= dtime)
536 self.assertTrue(dtime < then)
537
538 # str vs. DateTime
539 dstr = now.strftime("%Y%m%dT%H:%M:%S")
540 self.assertTrue(dtime == dstr)
541 self.assertTrue(dstr == dtime)
542 dtime_then = xmlrpclib.DateTime(then.timetuple())
543 self.assertTrue(dtime_then >= dstr)
544 self.assertTrue(dstr < dtime_then)
545
546 # some other types
547 dbytes = dstr.encode('ascii')
548 dtuple = now.timetuple()
549 self.assertFalse(dtime == 1970)
550 self.assertTrue(dtime != dbytes)
551 self.assertFalse(dtime == bytearray(dbytes))
552 self.assertTrue(dtime != dtuple)
553 with self.assertRaises(TypeError):
554 dtime < float(1970)
555 with self.assertRaises(TypeError):
556 dtime > dbytes
557 with self.assertRaises(TypeError):
558 dtime <= bytearray(dbytes)
559 with self.assertRaises(TypeError):
560 dtime >= dtuple
561
562 self.assertTrue(dtime == ALWAYS_EQ)
563 self.assertFalse(dtime != ALWAYS_EQ)
564 self.assertTrue(dtime < LARGEST)
565 self.assertFalse(dtime > LARGEST)
566 self.assertTrue(dtime <= LARGEST)
567 self.assertFalse(dtime >= LARGEST)
568 self.assertFalse(dtime < SMALLEST)
569 self.assertTrue(dtime > SMALLEST)
570 self.assertFalse(dtime <= SMALLEST)
571 self.assertTrue(dtime >= SMALLEST)
572
573
574 class ESC[4;38;5;81mBinaryTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
575
576 # XXX What should str(Binary(b"\xff")) return? I'm choosing "\xff"
577 # for now (i.e. interpreting the binary data as Latin-1-encoded
578 # text). But this feels very unsatisfactory. Perhaps we should
579 # only define repr(), and return r"Binary(b'\xff')" instead?
580
581 def test_default(self):
582 t = xmlrpclib.Binary()
583 self.assertEqual(str(t), '')
584
585 def test_string(self):
586 d = b'\x01\x02\x03abc123\xff\xfe'
587 t = xmlrpclib.Binary(d)
588 self.assertEqual(str(t), str(d, "latin-1"))
589
590 def test_decode(self):
591 d = b'\x01\x02\x03abc123\xff\xfe'
592 de = base64.encodebytes(d)
593 t1 = xmlrpclib.Binary()
594 t1.decode(de)
595 self.assertEqual(str(t1), str(d, "latin-1"))
596
597 t2 = xmlrpclib._binary(de)
598 self.assertEqual(str(t2), str(d, "latin-1"))
599
600
601 ADDR = PORT = URL = None
602
603 # The evt is set twice. First when the server is ready to serve.
604 # Second when the server has been shutdown. The user must clear
605 # the event after it has been set the first time to catch the second set.
606 def http_server(evt, numrequests, requestHandler=None, encoding=None):
607 class ESC[4;38;5;81mTestInstanceClass:
608 def div(self, x, y):
609 return x // y
610
611 def _methodHelp(self, name):
612 if name == 'div':
613 return 'This is the div function'
614
615 class ESC[4;38;5;81mFixture:
616 @staticmethod
617 def getData():
618 return '42'
619
620 class ESC[4;38;5;81mMyXMLRPCServer(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCServer):
621 def get_request(self):
622 # Ensure the socket is always non-blocking. On Linux, socket
623 # attributes are not inherited like they are on *BSD and Windows.
624 s, port = self.socket.accept()
625 s.setblocking(True)
626 return s, port
627
628 if not requestHandler:
629 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
630 serv = MyXMLRPCServer(("localhost", 0), requestHandler,
631 encoding=encoding,
632 logRequests=False, bind_and_activate=False)
633 try:
634 serv.server_bind()
635 global ADDR, PORT, URL
636 ADDR, PORT = serv.socket.getsockname()
637 #connect to IP address directly. This avoids socket.create_connection()
638 #trying to connect to "localhost" using all address families, which
639 #causes slowdown e.g. on vista which supports AF_INET6. The server listens
640 #on AF_INET only.
641 URL = "http://%s:%d"%(ADDR, PORT)
642 serv.server_activate()
643 serv.register_introspection_functions()
644 serv.register_multicall_functions()
645 serv.register_function(pow)
646 serv.register_function(lambda x: x, 'têšt')
647 @serv.register_function
648 def my_function():
649 '''This is my function'''
650 return True
651 @serv.register_function(name='add')
652 def _(x, y):
653 return x + y
654 testInstance = TestInstanceClass()
655 serv.register_instance(testInstance, allow_dotted_names=True)
656 evt.set()
657
658 # handle up to 'numrequests' requests
659 while numrequests > 0:
660 serv.handle_request()
661 numrequests -= 1
662
663 except TimeoutError:
664 pass
665 finally:
666 serv.socket.close()
667 PORT = None
668 evt.set()
669
670 def http_multi_server(evt, numrequests, requestHandler=None):
671 class ESC[4;38;5;81mTestInstanceClass:
672 def div(self, x, y):
673 return x // y
674
675 def _methodHelp(self, name):
676 if name == 'div':
677 return 'This is the div function'
678
679 def my_function():
680 '''This is my function'''
681 return True
682
683 class ESC[4;38;5;81mMyXMLRPCServer(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mMultiPathXMLRPCServer):
684 def get_request(self):
685 # Ensure the socket is always non-blocking. On Linux, socket
686 # attributes are not inherited like they are on *BSD and Windows.
687 s, port = self.socket.accept()
688 s.setblocking(True)
689 return s, port
690
691 if not requestHandler:
692 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
693 class ESC[4;38;5;81mMyRequestHandler(ESC[4;38;5;149mrequestHandler):
694 rpc_paths = []
695
696 class ESC[4;38;5;81mBrokenDispatcher:
697 def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
698 raise RuntimeError("broken dispatcher")
699
700 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
701 logRequests=False, bind_and_activate=False)
702 serv.socket.settimeout(3)
703 serv.server_bind()
704 try:
705 global ADDR, PORT, URL
706 ADDR, PORT = serv.socket.getsockname()
707 #connect to IP address directly. This avoids socket.create_connection()
708 #trying to connect to "localhost" using all address families, which
709 #causes slowdown e.g. on vista which supports AF_INET6. The server listens
710 #on AF_INET only.
711 URL = "http://%s:%d"%(ADDR, PORT)
712 serv.server_activate()
713 paths = [
714 "/foo", "/foo/bar",
715 "/foo?k=v", "/foo#frag", "/foo?k=v#frag",
716 "", "/", "/RPC2", "?k=v", "#frag",
717 ]
718 for path in paths:
719 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
720 d.register_introspection_functions()
721 d.register_multicall_functions()
722 d.register_function(lambda p=path: p, 'test')
723 serv.get_dispatcher(paths[0]).register_function(pow)
724 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
725 serv.add_dispatcher("/is/broken", BrokenDispatcher())
726 evt.set()
727
728 # handle up to 'numrequests' requests
729 while numrequests > 0:
730 serv.handle_request()
731 numrequests -= 1
732
733 except TimeoutError:
734 pass
735 finally:
736 serv.socket.close()
737 PORT = None
738 evt.set()
739
740 # This function prevents errors like:
741 # <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
742 def is_unavailable_exception(e):
743 '''Returns True if the given ProtocolError is the product of a server-side
744 exception caused by the 'temporarily unavailable' response sometimes
745 given by operations on non-blocking sockets.'''
746
747 # sometimes we get a -1 error code and/or empty headers
748 try:
749 if e.errcode == -1 or e.headers is None:
750 return True
751 exc_mess = e.headers.get('X-exception')
752 except AttributeError:
753 # Ignore OSErrors here.
754 exc_mess = str(e)
755
756 if exc_mess and 'temporarily unavailable' in exc_mess.lower():
757 return True
758
759 def make_request_and_skipIf(condition, reason):
760 # If we skip the test, we have to make a request because
761 # the server created in setUp blocks expecting one to come in.
762 if not condition:
763 return lambda func: func
764 def decorator(func):
765 def make_request_and_skip(self):
766 try:
767 xmlrpclib.ServerProxy(URL).my_function()
768 except (xmlrpclib.ProtocolError, OSError) as e:
769 if not is_unavailable_exception(e):
770 raise
771 raise unittest.SkipTest(reason)
772 return make_request_and_skip
773 return decorator
774
775 class ESC[4;38;5;81mBaseServerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
776 requestHandler = None
777 request_count = 1
778 threadFunc = staticmethod(http_server)
779
780 def setUp(self):
781 # enable traceback reporting
782 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
783
784 self.evt = threading.Event()
785 # start server thread to handle requests
786 serv_args = (self.evt, self.request_count, self.requestHandler)
787 thread = threading.Thread(target=self.threadFunc, args=serv_args)
788 thread.start()
789 self.addCleanup(thread.join)
790
791 # wait for the server to be ready
792 self.evt.wait()
793 self.evt.clear()
794
795 def tearDown(self):
796 # wait on the server thread to terminate
797 self.evt.wait()
798
799 # disable traceback reporting
800 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
801
802 class ESC[4;38;5;81mSimpleServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
803 def test_simple1(self):
804 try:
805 p = xmlrpclib.ServerProxy(URL)
806 self.assertEqual(p.pow(6,8), 6**8)
807 except (xmlrpclib.ProtocolError, OSError) as e:
808 # ignore failures due to non-blocking socket 'unavailable' errors
809 if not is_unavailable_exception(e):
810 # protocol error; provide additional information in test output
811 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
812
813 def test_nonascii(self):
814 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
815 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
816 try:
817 p = xmlrpclib.ServerProxy(URL)
818 self.assertEqual(p.add(start_string, end_string),
819 start_string + end_string)
820 except (xmlrpclib.ProtocolError, OSError) as e:
821 # ignore failures due to non-blocking socket 'unavailable' errors
822 if not is_unavailable_exception(e):
823 # protocol error; provide additional information in test output
824 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
825
826 def test_client_encoding(self):
827 start_string = '\u20ac'
828 end_string = '\xa4'
829
830 try:
831 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
832 self.assertEqual(p.add(start_string, end_string),
833 start_string + end_string)
834 except (xmlrpclib.ProtocolError, socket.error) as e:
835 # ignore failures due to non-blocking socket unavailable errors.
836 if not is_unavailable_exception(e):
837 # protocol error; provide additional information in test output
838 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
839
840 def test_nonascii_methodname(self):
841 try:
842 p = xmlrpclib.ServerProxy(URL, encoding='ascii')
843 self.assertEqual(p.têšt(42), 42)
844 except (xmlrpclib.ProtocolError, socket.error) as e:
845 # ignore failures due to non-blocking socket unavailable errors.
846 if not is_unavailable_exception(e):
847 # protocol error; provide additional information in test output
848 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
849
850 def test_404(self):
851 # send POST with http.client, it should return 404 header and
852 # 'Not Found' message.
853 with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
854 conn.request('POST', '/this-is-not-valid')
855 response = conn.getresponse()
856
857 self.assertEqual(response.status, 404)
858 self.assertEqual(response.reason, 'Not Found')
859
860 def test_introspection1(self):
861 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt',
862 'system.listMethods', 'system.methodHelp',
863 'system.methodSignature', 'system.multicall',
864 'Fixture'])
865 try:
866 p = xmlrpclib.ServerProxy(URL)
867 meth = p.system.listMethods()
868 self.assertEqual(set(meth), expected_methods)
869 except (xmlrpclib.ProtocolError, OSError) as e:
870 # ignore failures due to non-blocking socket 'unavailable' errors
871 if not is_unavailable_exception(e):
872 # protocol error; provide additional information in test output
873 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
874
875
876 def test_introspection2(self):
877 try:
878 # test _methodHelp()
879 p = xmlrpclib.ServerProxy(URL)
880 divhelp = p.system.methodHelp('div')
881 self.assertEqual(divhelp, 'This is the div function')
882 except (xmlrpclib.ProtocolError, OSError) as e:
883 # ignore failures due to non-blocking socket 'unavailable' errors
884 if not is_unavailable_exception(e):
885 # protocol error; provide additional information in test output
886 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
887
888 @make_request_and_skipIf(sys.flags.optimize >= 2,
889 "Docstrings are omitted with -O2 and above")
890 def test_introspection3(self):
891 try:
892 # test native doc
893 p = xmlrpclib.ServerProxy(URL)
894 myfunction = p.system.methodHelp('my_function')
895 self.assertEqual(myfunction, 'This is my function')
896 except (xmlrpclib.ProtocolError, OSError) as e:
897 # ignore failures due to non-blocking socket 'unavailable' errors
898 if not is_unavailable_exception(e):
899 # protocol error; provide additional information in test output
900 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
901
902 def test_introspection4(self):
903 # the SimpleXMLRPCServer doesn't support signatures, but
904 # at least check that we can try making the call
905 try:
906 p = xmlrpclib.ServerProxy(URL)
907 divsig = p.system.methodSignature('div')
908 self.assertEqual(divsig, 'signatures not supported')
909 except (xmlrpclib.ProtocolError, OSError) as e:
910 # ignore failures due to non-blocking socket 'unavailable' errors
911 if not is_unavailable_exception(e):
912 # protocol error; provide additional information in test output
913 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
914
915 def test_multicall(self):
916 try:
917 p = xmlrpclib.ServerProxy(URL)
918 multicall = xmlrpclib.MultiCall(p)
919 multicall.add(2,3)
920 multicall.pow(6,8)
921 multicall.div(127,42)
922 add_result, pow_result, div_result = multicall()
923 self.assertEqual(add_result, 2+3)
924 self.assertEqual(pow_result, 6**8)
925 self.assertEqual(div_result, 127//42)
926 except (xmlrpclib.ProtocolError, OSError) as e:
927 # ignore failures due to non-blocking socket 'unavailable' errors
928 if not is_unavailable_exception(e):
929 # protocol error; provide additional information in test output
930 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
931
932 def test_non_existing_multicall(self):
933 try:
934 p = xmlrpclib.ServerProxy(URL)
935 multicall = xmlrpclib.MultiCall(p)
936 multicall.this_is_not_exists()
937 result = multicall()
938
939 # result.results contains;
940 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
941 # 'method "this_is_not_exists" is not supported'>}]
942
943 self.assertEqual(result.results[0]['faultCode'], 1)
944 self.assertEqual(result.results[0]['faultString'],
945 '<class \'Exception\'>:method "this_is_not_exists" '
946 'is not supported')
947 except (xmlrpclib.ProtocolError, OSError) as e:
948 # ignore failures due to non-blocking socket 'unavailable' errors
949 if not is_unavailable_exception(e):
950 # protocol error; provide additional information in test output
951 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
952
953 def test_dotted_attribute(self):
954 # Raises an AttributeError because private methods are not allowed.
955 self.assertRaises(AttributeError,
956 xmlrpc.server.resolve_dotted_attribute, str, '__add')
957
958 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
959 # Get the test to run faster by sending a request with test_simple1.
960 # This avoids waiting for the socket timeout.
961 self.test_simple1()
962
963 def test_allow_dotted_names_true(self):
964 # XXX also need allow_dotted_names_false test.
965 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
966 data = server.Fixture.getData()
967 self.assertEqual(data, '42')
968
969 def test_unicode_host(self):
970 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
971 self.assertEqual(server.add("a", "\xe9"), "a\xe9")
972
973 def test_partial_post(self):
974 # Check that a partial POST doesn't make the server loop: issue #14001.
975 with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
976 conn.send('POST /RPC2 HTTP/1.0\r\n'
977 'Content-Length: 100\r\n\r\n'
978 'bye HTTP/1.1\r\n'
979 f'Host: {ADDR}:{PORT}\r\n'
980 'Accept-Encoding: identity\r\n'
981 'Content-Length: 0\r\n\r\n'.encode('ascii'))
982
983 def test_context_manager(self):
984 with xmlrpclib.ServerProxy(URL) as server:
985 server.add(2, 3)
986 self.assertNotEqual(server('transport')._connection,
987 (None, None))
988 self.assertEqual(server('transport')._connection,
989 (None, None))
990
991 def test_context_manager_method_error(self):
992 try:
993 with xmlrpclib.ServerProxy(URL) as server:
994 server.add(2, "a")
995 except xmlrpclib.Fault:
996 pass
997 self.assertEqual(server('transport')._connection,
998 (None, None))
999
1000
1001 class ESC[4;38;5;81mSimpleServerEncodingTestCase(ESC[4;38;5;149mBaseServerTestCase):
1002 @staticmethod
1003 def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
1004 http_server(evt, numrequests, requestHandler, 'iso-8859-15')
1005
1006 def test_server_encoding(self):
1007 start_string = '\u20ac'
1008 end_string = '\xa4'
1009
1010 try:
1011 p = xmlrpclib.ServerProxy(URL)
1012 self.assertEqual(p.add(start_string, end_string),
1013 start_string + end_string)
1014 except (xmlrpclib.ProtocolError, socket.error) as e:
1015 # ignore failures due to non-blocking socket unavailable errors.
1016 if not is_unavailable_exception(e):
1017 # protocol error; provide additional information in test output
1018 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
1019
1020
1021 class ESC[4;38;5;81mMultiPathServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
1022 threadFunc = staticmethod(http_multi_server)
1023 request_count = 2
1024 def test_path1(self):
1025 p = xmlrpclib.ServerProxy(URL+"/foo")
1026 self.assertEqual(p.pow(6,8), 6**8)
1027 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
1028
1029 def test_path2(self):
1030 p = xmlrpclib.ServerProxy(URL+"/foo/bar")
1031 self.assertEqual(p.add(6,8), 6+8)
1032 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
1033
1034 @support.requires_resource('walltime')
1035 def test_path3(self):
1036 p = xmlrpclib.ServerProxy(URL+"/is/broken")
1037 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
1038
1039 @support.requires_resource('walltime')
1040 def test_invalid_path(self):
1041 p = xmlrpclib.ServerProxy(URL+"/invalid")
1042 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
1043
1044 @support.requires_resource('walltime')
1045 def test_path_query_fragment(self):
1046 p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag")
1047 self.assertEqual(p.test(), "/foo?k=v#frag")
1048
1049 @support.requires_resource('walltime')
1050 def test_path_fragment(self):
1051 p = xmlrpclib.ServerProxy(URL+"/foo#frag")
1052 self.assertEqual(p.test(), "/foo#frag")
1053
1054 @support.requires_resource('walltime')
1055 def test_path_query(self):
1056 p = xmlrpclib.ServerProxy(URL+"/foo?k=v")
1057 self.assertEqual(p.test(), "/foo?k=v")
1058
1059 @support.requires_resource('walltime')
1060 def test_empty_path(self):
1061 p = xmlrpclib.ServerProxy(URL)
1062 self.assertEqual(p.test(), "/RPC2")
1063
1064 @support.requires_resource('walltime')
1065 def test_root_path(self):
1066 p = xmlrpclib.ServerProxy(URL + "/")
1067 self.assertEqual(p.test(), "/")
1068
1069 @support.requires_resource('walltime')
1070 def test_empty_path_query(self):
1071 p = xmlrpclib.ServerProxy(URL + "?k=v")
1072 self.assertEqual(p.test(), "?k=v")
1073
1074 @support.requires_resource('walltime')
1075 def test_empty_path_fragment(self):
1076 p = xmlrpclib.ServerProxy(URL + "#frag")
1077 self.assertEqual(p.test(), "#frag")
1078
1079
1080 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1081 #does indeed serve subsequent requests on the same connection
1082 class ESC[4;38;5;81mBaseKeepaliveServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
1083 #a request handler that supports keep-alive and logs requests into a
1084 #class variable
1085 class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCRequestHandler):
1086 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
1087 protocol_version = 'HTTP/1.1'
1088 myRequests = []
1089 def handle(self):
1090 self.myRequests.append([])
1091 self.reqidx = len(self.myRequests)-1
1092 return self.parentClass.handle(self)
1093 def handle_one_request(self):
1094 result = self.parentClass.handle_one_request(self)
1095 self.myRequests[self.reqidx].append(self.raw_requestline)
1096 return result
1097
1098 requestHandler = RequestHandler
1099 def setUp(self):
1100 #clear request log
1101 self.RequestHandler.myRequests = []
1102 return BaseServerTestCase.setUp(self)
1103
1104 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1105 #does indeed serve subsequent requests on the same connection
1106 class ESC[4;38;5;81mKeepaliveServerTestCase1(ESC[4;38;5;149mBaseKeepaliveServerTestCase):
1107 def test_two(self):
1108 p = xmlrpclib.ServerProxy(URL)
1109 #do three requests.
1110 self.assertEqual(p.pow(6,8), 6**8)
1111 self.assertEqual(p.pow(6,8), 6**8)
1112 self.assertEqual(p.pow(6,8), 6**8)
1113 p("close")()
1114
1115 #they should have all been handled by a single request handler
1116 self.assertEqual(len(self.RequestHandler.myRequests), 1)
1117
1118 #check that we did at least two (the third may be pending append
1119 #due to thread scheduling)
1120 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
1121
1122
1123 #test special attribute access on the serverproxy, through the __call__
1124 #function.
1125 class ESC[4;38;5;81mKeepaliveServerTestCase2(ESC[4;38;5;149mBaseKeepaliveServerTestCase):
1126 #ask for two keepalive requests to be handled.
1127 request_count=2
1128
1129 def test_close(self):
1130 p = xmlrpclib.ServerProxy(URL)
1131 #do some requests with close.
1132 self.assertEqual(p.pow(6,8), 6**8)
1133 self.assertEqual(p.pow(6,8), 6**8)
1134 self.assertEqual(p.pow(6,8), 6**8)
1135 p("close")() #this should trigger a new keep-alive request
1136 self.assertEqual(p.pow(6,8), 6**8)
1137 self.assertEqual(p.pow(6,8), 6**8)
1138 self.assertEqual(p.pow(6,8), 6**8)
1139 p("close")()
1140
1141 #they should have all been two request handlers, each having logged at least
1142 #two complete requests
1143 self.assertEqual(len(self.RequestHandler.myRequests), 2)
1144 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
1145 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
1146
1147
1148 def test_transport(self):
1149 p = xmlrpclib.ServerProxy(URL)
1150 #do some requests with close.
1151 self.assertEqual(p.pow(6,8), 6**8)
1152 p("transport").close() #same as above, really.
1153 self.assertEqual(p.pow(6,8), 6**8)
1154 p("close")()
1155 self.assertEqual(len(self.RequestHandler.myRequests), 2)
1156
1157 #A test case that verifies that gzip encoding works in both directions
1158 #(for a request and the response)
1159 @unittest.skipIf(gzip is None, 'requires gzip')
1160 class ESC[4;38;5;81mGzipServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
1161 #a request handler that supports keep-alive and logs requests into a
1162 #class variable
1163 class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCRequestHandler):
1164 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
1165 protocol_version = 'HTTP/1.1'
1166
1167 def do_POST(self):
1168 #store content of last request in class
1169 self.__class__.content_length = int(self.headers["content-length"])
1170 return self.parentClass.do_POST(self)
1171 requestHandler = RequestHandler
1172
1173 class ESC[4;38;5;81mTransport(ESC[4;38;5;149mxmlrpclibESC[4;38;5;149m.ESC[4;38;5;149mTransport):
1174 #custom transport, stores the response length for our perusal
1175 fake_gzip = False
1176 def parse_response(self, response):
1177 self.response_length=int(response.getheader("content-length", 0))
1178 return xmlrpclib.Transport.parse_response(self, response)
1179
1180 def send_content(self, connection, body):
1181 if self.fake_gzip:
1182 #add a lone gzip header to induce decode error remotely
1183 connection.putheader("Content-Encoding", "gzip")
1184 return xmlrpclib.Transport.send_content(self, connection, body)
1185
1186 def setUp(self):
1187 BaseServerTestCase.setUp(self)
1188
1189 def test_gzip_request(self):
1190 t = self.Transport()
1191 t.encode_threshold = None
1192 p = xmlrpclib.ServerProxy(URL, transport=t)
1193 self.assertEqual(p.pow(6,8), 6**8)
1194 a = self.RequestHandler.content_length
1195 t.encode_threshold = 0 #turn on request encoding
1196 self.assertEqual(p.pow(6,8), 6**8)
1197 b = self.RequestHandler.content_length
1198 self.assertTrue(a>b)
1199 p("close")()
1200
1201 def test_bad_gzip_request(self):
1202 t = self.Transport()
1203 t.encode_threshold = None
1204 t.fake_gzip = True
1205 p = xmlrpclib.ServerProxy(URL, transport=t)
1206 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
1207 re.compile(r"\b400\b"))
1208 with cm:
1209 p.pow(6, 8)
1210 p("close")()
1211
1212 def test_gzip_response(self):
1213 t = self.Transport()
1214 p = xmlrpclib.ServerProxy(URL, transport=t)
1215 old = self.requestHandler.encode_threshold
1216 self.requestHandler.encode_threshold = None #no encoding
1217 self.assertEqual(p.pow(6,8), 6**8)
1218 a = t.response_length
1219 self.requestHandler.encode_threshold = 0 #always encode
1220 self.assertEqual(p.pow(6,8), 6**8)
1221 p("close")()
1222 b = t.response_length
1223 self.requestHandler.encode_threshold = old
1224 self.assertTrue(a>b)
1225
1226
1227 @unittest.skipIf(gzip is None, 'requires gzip')
1228 class ESC[4;38;5;81mGzipUtilTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1229
1230 def test_gzip_decode_limit(self):
1231 max_gzip_decode = 20 * 1024 * 1024
1232 data = b'\0' * max_gzip_decode
1233 encoded = xmlrpclib.gzip_encode(data)
1234 decoded = xmlrpclib.gzip_decode(encoded)
1235 self.assertEqual(len(decoded), max_gzip_decode)
1236
1237 data = b'\0' * (max_gzip_decode + 1)
1238 encoded = xmlrpclib.gzip_encode(data)
1239
1240 with self.assertRaisesRegex(ValueError,
1241 "max gzipped payload length exceeded"):
1242 xmlrpclib.gzip_decode(encoded)
1243
1244 xmlrpclib.gzip_decode(encoded, max_decode=-1)
1245
1246
1247 class ESC[4;38;5;81mHeadersServerTestCase(ESC[4;38;5;149mBaseServerTestCase):
1248 class ESC[4;38;5;81mRequestHandler(ESC[4;38;5;149mxmlrpcESC[4;38;5;149m.ESC[4;38;5;149mserverESC[4;38;5;149m.ESC[4;38;5;149mSimpleXMLRPCRequestHandler):
1249 test_headers = None
1250
1251 def do_POST(self):
1252 self.__class__.test_headers = self.headers
1253 return super().do_POST()
1254 requestHandler = RequestHandler
1255 standard_headers = [
1256 'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
1257 'Content-Length']
1258
1259 def setUp(self):
1260 self.RequestHandler.test_headers = None
1261 return super().setUp()
1262
1263 def assertContainsAdditionalHeaders(self, headers, additional):
1264 expected_keys = sorted(self.standard_headers + list(additional.keys()))
1265 self.assertListEqual(sorted(headers.keys()), expected_keys)
1266
1267 for key, value in additional.items():
1268 self.assertEqual(headers.get(key), value)
1269
1270 def test_header(self):
1271 p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')])
1272 self.assertEqual(p.pow(6, 8), 6**8)
1273
1274 headers = self.RequestHandler.test_headers
1275 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})
1276
1277 def test_header_many(self):
1278 p = xmlrpclib.ServerProxy(
1279 URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')])
1280 self.assertEqual(p.pow(6, 8), 6**8)
1281
1282 headers = self.RequestHandler.test_headers
1283 self.assertContainsAdditionalHeaders(
1284 headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'})
1285
1286 def test_header_empty(self):
1287 p = xmlrpclib.ServerProxy(URL, headers=[])
1288 self.assertEqual(p.pow(6, 8), 6**8)
1289
1290 headers = self.RequestHandler.test_headers
1291 self.assertContainsAdditionalHeaders(headers, {})
1292
1293 def test_header_tuple(self):
1294 p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),))
1295 self.assertEqual(p.pow(6, 8), 6**8)
1296
1297 headers = self.RequestHandler.test_headers
1298 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})
1299
1300 def test_header_items(self):
1301 p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items())
1302 self.assertEqual(p.pow(6, 8), 6**8)
1303
1304 headers = self.RequestHandler.test_headers
1305 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})
1306
1307
1308 #Test special attributes of the ServerProxy object
1309 class ESC[4;38;5;81mServerProxyTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1310 def setUp(self):
1311 unittest.TestCase.setUp(self)
1312 # Actual value of the URL doesn't matter if it is a string in
1313 # the correct format.
1314 self.url = 'http://fake.localhost'
1315
1316 def test_close(self):
1317 p = xmlrpclib.ServerProxy(self.url)
1318 self.assertEqual(p('close')(), None)
1319
1320 def test_transport(self):
1321 t = xmlrpclib.Transport()
1322 p = xmlrpclib.ServerProxy(self.url, transport=t)
1323 self.assertEqual(p('transport'), t)
1324
1325
1326 # This is a contrived way to make a failure occur on the server side
1327 # in order to test the _send_traceback_header flag on the server
1328 class ESC[4;38;5;81mFailingMessageClass(ESC[4;38;5;149mhttpESC[4;38;5;149m.ESC[4;38;5;149mclientESC[4;38;5;149m.ESC[4;38;5;149mHTTPMessage):
1329 def get(self, key, failobj=None):
1330 key = key.lower()
1331 if key == 'content-length':
1332 return 'I am broken'
1333 return super().get(key, failobj)
1334
1335
1336 class ESC[4;38;5;81mFailingServerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1337 def setUp(self):
1338 self.evt = threading.Event()
1339 # start server thread to handle requests
1340 serv_args = (self.evt, 1)
1341 thread = threading.Thread(target=http_server, args=serv_args)
1342 thread.start()
1343 self.addCleanup(thread.join)
1344
1345 # wait for the server to be ready
1346 self.evt.wait()
1347 self.evt.clear()
1348
1349 def tearDown(self):
1350 # wait on the server thread to terminate
1351 self.evt.wait()
1352 # reset flag
1353 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
1354 # reset message class
1355 default_class = http.client.HTTPMessage
1356 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class
1357
1358 def test_basic(self):
1359 # check that flag is false by default
1360 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
1361 self.assertEqual(flagval, False)
1362
1363 # enable traceback reporting
1364 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
1365
1366 # test a call that shouldn't fail just as a smoke test
1367 try:
1368 p = xmlrpclib.ServerProxy(URL)
1369 self.assertEqual(p.pow(6,8), 6**8)
1370 except (xmlrpclib.ProtocolError, OSError) as e:
1371 # ignore failures due to non-blocking socket 'unavailable' errors
1372 if not is_unavailable_exception(e):
1373 # protocol error; provide additional information in test output
1374 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
1375
1376 def test_fail_no_info(self):
1377 # use the broken message class
1378 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
1379
1380 try:
1381 p = xmlrpclib.ServerProxy(URL)
1382 p.pow(6,8)
1383 except (xmlrpclib.ProtocolError, OSError) as e:
1384 # ignore failures due to non-blocking socket 'unavailable' errors
1385 if not is_unavailable_exception(e) and hasattr(e, "headers"):
1386 # The two server-side error headers shouldn't be sent back in this case
1387 self.assertTrue(e.headers.get("X-exception") is None)
1388 self.assertTrue(e.headers.get("X-traceback") is None)
1389 else:
1390 self.fail('ProtocolError not raised')
1391
1392 def test_fail_with_info(self):
1393 # use the broken message class
1394 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
1395
1396 # Check that errors in the server send back exception/traceback
1397 # info when flag is set
1398 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
1399
1400 try:
1401 p = xmlrpclib.ServerProxy(URL)
1402 p.pow(6,8)
1403 except (xmlrpclib.ProtocolError, OSError) as e:
1404 # ignore failures due to non-blocking socket 'unavailable' errors
1405 if not is_unavailable_exception(e) and hasattr(e, "headers"):
1406 # We should get error info in the response
1407 expected_err = "invalid literal for int() with base 10: 'I am broken'"
1408 self.assertEqual(e.headers.get("X-exception"), expected_err)
1409 self.assertTrue(e.headers.get("X-traceback") is not None)
1410 else:
1411 self.fail('ProtocolError not raised')
1412
1413
1414 @contextlib.contextmanager
1415 def captured_stdout(encoding='utf-8'):
1416 """A variation on support.captured_stdout() which gives a text stream
1417 having a `buffer` attribute.
1418 """
1419 orig_stdout = sys.stdout
1420 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
1421 try:
1422 yield sys.stdout
1423 finally:
1424 sys.stdout = orig_stdout
1425
1426
1427 class ESC[4;38;5;81mCGIHandlerTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1428 def setUp(self):
1429 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
1430
1431 def tearDown(self):
1432 self.cgi = None
1433
1434 def test_cgi_get(self):
1435 with os_helper.EnvironmentVarGuard() as env:
1436 env['REQUEST_METHOD'] = 'GET'
1437 # if the method is GET and no request_text is given, it runs handle_get
1438 # get sysout output
1439 with captured_stdout(encoding=self.cgi.encoding) as data_out:
1440 self.cgi.handle_request()
1441
1442 # parse Status header
1443 data_out.seek(0)
1444 handle = data_out.read()
1445 status = handle.split()[1]
1446 message = ' '.join(handle.split()[2:4])
1447
1448 self.assertEqual(status, '400')
1449 self.assertEqual(message, 'Bad Request')
1450
1451
1452 def test_cgi_xmlrpc_response(self):
1453 data = """<?xml version='1.0'?>
1454 <methodCall>
1455 <methodName>test_method</methodName>
1456 <params>
1457 <param>
1458 <value><string>foo</string></value>
1459 </param>
1460 <param>
1461 <value><string>bar</string></value>
1462 </param>
1463 </params>
1464 </methodCall>
1465 """
1466
1467 with os_helper.EnvironmentVarGuard() as env, \
1468 captured_stdout(encoding=self.cgi.encoding) as data_out, \
1469 support.captured_stdin() as data_in:
1470 data_in.write(data)
1471 data_in.seek(0)
1472 env['CONTENT_LENGTH'] = str(len(data))
1473 self.cgi.handle_request()
1474 data_out.seek(0)
1475
1476 # will respond exception, if so, our goal is achieved ;)
1477 handle = data_out.read()
1478
1479 # start with 44th char so as not to get http header, we just
1480 # need only xml
1481 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
1482
1483 # Also test the content-length returned by handle_request
1484 # Using the same test method inorder to avoid all the datapassing
1485 # boilerplate code.
1486 # Test for bug: http://bugs.python.org/issue5040
1487
1488 content = handle[handle.find("<?xml"):]
1489
1490 self.assertEqual(
1491 int(re.search(r'Content-Length: (\d+)', handle).group(1)),
1492 len(content))
1493
1494
1495 class ESC[4;38;5;81mUseBuiltinTypesTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1496
1497 def test_use_builtin_types(self):
1498 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
1499 # makes all dispatch of binary data as bytes instances, and all
1500 # dispatch of datetime argument as datetime.datetime instances.
1501 self.log = []
1502 expected_bytes = b"my dog has fleas"
1503 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
1504 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
1505 def foobar(*args):
1506 self.log.extend(args)
1507 handler = xmlrpc.server.SimpleXMLRPCDispatcher(
1508 allow_none=True, encoding=None, use_builtin_types=True)
1509 handler.register_function(foobar)
1510 handler._marshaled_dispatch(marshaled)
1511 self.assertEqual(len(self.log), 2)
1512 mybytes, mydate = self.log
1513 self.assertEqual(self.log, [expected_bytes, expected_date])
1514 self.assertIs(type(mydate), datetime.datetime)
1515 self.assertIs(type(mybytes), bytes)
1516
1517 def test_cgihandler_has_use_builtin_types_flag(self):
1518 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True)
1519 self.assertTrue(handler.use_builtin_types)
1520
1521 def test_xmlrpcserver_has_use_builtin_types_flag(self):
1522 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0),
1523 use_builtin_types=True)
1524 server.server_close()
1525 self.assertTrue(server.use_builtin_types)
1526
1527
1528 def setUpModule():
1529 thread_info = threading_helper.threading_setup()
1530 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
1531
1532
1533 if __name__ == "__main__":
1534 unittest.main()