1 import unittest
2 import select
3 import os
4 import socket
5 import sys
6 import time
7 import errno
8 import struct
9 import threading
10
11 from test import support
12 from test.support import os_helper
13 from test.support import socket_helper
14 from test.support import threading_helper
15 from test.support import warnings_helper
16 from io import BytesIO
17
18 if support.PGO:
19 raise unittest.SkipTest("test is not helpful for PGO")
20
21 support.requires_working_socket(module=True)
22
23 asyncore = warnings_helper.import_deprecated('asyncore')
24
25
26 HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')
27
28 class ESC[4;38;5;81mdummysocket:
29 def __init__(self):
30 self.closed = False
31
32 def close(self):
33 self.closed = True
34
35 def fileno(self):
36 return 42
37
38 class ESC[4;38;5;81mdummychannel:
39 def __init__(self):
40 self.socket = dummysocket()
41
42 def close(self):
43 self.socket.close()
44
45 class ESC[4;38;5;81mexitingdummy:
46 def __init__(self):
47 pass
48
49 def handle_read_event(self):
50 raise asyncore.ExitNow()
51
52 handle_write_event = handle_read_event
53 handle_close = handle_read_event
54 handle_expt_event = handle_read_event
55
56 class ESC[4;38;5;81mcrashingdummy:
57 def __init__(self):
58 self.error_handled = False
59
60 def handle_read_event(self):
61 raise Exception()
62
63 handle_write_event = handle_read_event
64 handle_close = handle_read_event
65 handle_expt_event = handle_read_event
66
67 def handle_error(self):
68 self.error_handled = True
69
70 # used when testing senders; just collects what it gets until newline is sent
71 def capture_server(evt, buf, serv):
72 try:
73 serv.listen()
74 conn, addr = serv.accept()
75 except TimeoutError:
76 pass
77 else:
78 n = 200
79 start = time.monotonic()
80 while n > 0 and time.monotonic() - start < 3.0:
81 r, w, e = select.select([conn], [], [], 0.1)
82 if r:
83 n -= 1
84 data = conn.recv(10)
85 # keep everything except for the newline terminator
86 buf.write(data.replace(b'\n', b''))
87 if b'\n' in data:
88 break
89 time.sleep(0.01)
90
91 conn.close()
92 finally:
93 serv.close()
94 evt.set()
95
96 def bind_af_aware(sock, addr):
97 """Helper function to bind a socket according to its family."""
98 if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
99 # Make sure the path doesn't exist.
100 os_helper.unlink(addr)
101 socket_helper.bind_unix_socket(sock, addr)
102 else:
103 sock.bind(addr)
104
105
106 class ESC[4;38;5;81mHelperFunctionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
107 def test_readwriteexc(self):
108 # Check exception handling behavior of read, write and _exception
109
110 # check that ExitNow exceptions in the object handler method
111 # bubbles all the way up through asyncore read/write/_exception calls
112 tr1 = exitingdummy()
113 self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
114 self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
115 self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
116
117 # check that an exception other than ExitNow in the object handler
118 # method causes the handle_error method to get called
119 tr2 = crashingdummy()
120 asyncore.read(tr2)
121 self.assertEqual(tr2.error_handled, True)
122
123 tr2 = crashingdummy()
124 asyncore.write(tr2)
125 self.assertEqual(tr2.error_handled, True)
126
127 tr2 = crashingdummy()
128 asyncore._exception(tr2)
129 self.assertEqual(tr2.error_handled, True)
130
131 # asyncore.readwrite uses constants in the select module that
132 # are not present in Windows systems (see this thread:
133 # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
134 # These constants should be present as long as poll is available
135
136 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
137 def test_readwrite(self):
138 # Check that correct methods are called by readwrite()
139
140 attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
141
142 expected = (
143 (select.POLLIN, 'read'),
144 (select.POLLPRI, 'expt'),
145 (select.POLLOUT, 'write'),
146 (select.POLLERR, 'closed'),
147 (select.POLLHUP, 'closed'),
148 (select.POLLNVAL, 'closed'),
149 )
150
151 class ESC[4;38;5;81mtestobj:
152 def __init__(self):
153 self.read = False
154 self.write = False
155 self.closed = False
156 self.expt = False
157 self.error_handled = False
158
159 def handle_read_event(self):
160 self.read = True
161
162 def handle_write_event(self):
163 self.write = True
164
165 def handle_close(self):
166 self.closed = True
167
168 def handle_expt_event(self):
169 self.expt = True
170
171 def handle_error(self):
172 self.error_handled = True
173
174 for flag, expectedattr in expected:
175 tobj = testobj()
176 self.assertEqual(getattr(tobj, expectedattr), False)
177 asyncore.readwrite(tobj, flag)
178
179 # Only the attribute modified by the routine we expect to be
180 # called should be True.
181 for attr in attributes:
182 self.assertEqual(getattr(tobj, attr), attr==expectedattr)
183
184 # check that ExitNow exceptions in the object handler method
185 # bubbles all the way up through asyncore readwrite call
186 tr1 = exitingdummy()
187 self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
188
189 # check that an exception other than ExitNow in the object handler
190 # method causes the handle_error method to get called
191 tr2 = crashingdummy()
192 self.assertEqual(tr2.error_handled, False)
193 asyncore.readwrite(tr2, flag)
194 self.assertEqual(tr2.error_handled, True)
195
196 def test_closeall(self):
197 self.closeall_check(False)
198
199 def test_closeall_default(self):
200 self.closeall_check(True)
201
202 def closeall_check(self, usedefault):
203 # Check that close_all() closes everything in a given map
204
205 l = []
206 testmap = {}
207 for i in range(10):
208 c = dummychannel()
209 l.append(c)
210 self.assertEqual(c.socket.closed, False)
211 testmap[i] = c
212
213 if usedefault:
214 socketmap = asyncore.socket_map
215 try:
216 asyncore.socket_map = testmap
217 asyncore.close_all()
218 finally:
219 testmap, asyncore.socket_map = asyncore.socket_map, socketmap
220 else:
221 asyncore.close_all(testmap)
222
223 self.assertEqual(len(testmap), 0)
224
225 for c in l:
226 self.assertEqual(c.socket.closed, True)
227
228 def test_compact_traceback(self):
229 try:
230 raise Exception("I don't like spam!")
231 except:
232 real_t, real_v, real_tb = sys.exc_info()
233 r = asyncore.compact_traceback()
234 else:
235 self.fail("Expected exception")
236
237 (f, function, line), t, v, info = r
238 self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
239 self.assertEqual(function, 'test_compact_traceback')
240 self.assertEqual(t, real_t)
241 self.assertEqual(v, real_v)
242 self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
243
244
245 class ESC[4;38;5;81mDispatcherTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
246 def setUp(self):
247 pass
248
249 def tearDown(self):
250 asyncore.close_all()
251
252 def test_basic(self):
253 d = asyncore.dispatcher()
254 self.assertEqual(d.readable(), True)
255 self.assertEqual(d.writable(), True)
256
257 def test_repr(self):
258 d = asyncore.dispatcher()
259 self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
260
261 def test_log(self):
262 d = asyncore.dispatcher()
263
264 # capture output of dispatcher.log() (to stderr)
265 l1 = "Lovely spam! Wonderful spam!"
266 l2 = "I don't like spam!"
267 with support.captured_stderr() as stderr:
268 d.log(l1)
269 d.log(l2)
270
271 lines = stderr.getvalue().splitlines()
272 self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
273
274 def test_log_info(self):
275 d = asyncore.dispatcher()
276
277 # capture output of dispatcher.log_info() (to stdout via print)
278 l1 = "Have you got anything without spam?"
279 l2 = "Why can't she have egg bacon spam and sausage?"
280 l3 = "THAT'S got spam in it!"
281 with support.captured_stdout() as stdout:
282 d.log_info(l1, 'EGGS')
283 d.log_info(l2)
284 d.log_info(l3, 'SPAM')
285
286 lines = stdout.getvalue().splitlines()
287 expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
288 self.assertEqual(lines, expected)
289
290 def test_unhandled(self):
291 d = asyncore.dispatcher()
292 d.ignore_log_types = ()
293
294 # capture output of dispatcher.log_info() (to stdout via print)
295 with support.captured_stdout() as stdout:
296 d.handle_expt()
297 d.handle_read()
298 d.handle_write()
299 d.handle_connect()
300
301 lines = stdout.getvalue().splitlines()
302 expected = ['warning: unhandled incoming priority event',
303 'warning: unhandled read event',
304 'warning: unhandled write event',
305 'warning: unhandled connect event']
306 self.assertEqual(lines, expected)
307
308 def test_strerror(self):
309 # refers to bug #8573
310 err = asyncore._strerror(errno.EPERM)
311 if hasattr(os, 'strerror'):
312 self.assertEqual(err, os.strerror(errno.EPERM))
313 err = asyncore._strerror(-1)
314 self.assertTrue(err != "")
315
316
317 class ESC[4;38;5;81mdispatcherwithsend_noread(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher_with_send):
318 def readable(self):
319 return False
320
321 def handle_connect(self):
322 pass
323
324
325 class ESC[4;38;5;81mDispatcherWithSendTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
326 def setUp(self):
327 pass
328
329 def tearDown(self):
330 asyncore.close_all()
331
332 @threading_helper.reap_threads
333 def test_send(self):
334 evt = threading.Event()
335 sock = socket.socket()
336 sock.settimeout(3)
337 port = socket_helper.bind_port(sock)
338
339 cap = BytesIO()
340 args = (evt, cap, sock)
341 t = threading.Thread(target=capture_server, args=args)
342 t.start()
343 try:
344 # wait a little longer for the server to initialize (it sometimes
345 # refuses connections on slow machines without this wait)
346 time.sleep(0.2)
347
348 data = b"Suppose there isn't a 16-ton weight?"
349 d = dispatcherwithsend_noread()
350 d.create_socket()
351 d.connect((socket_helper.HOST, port))
352
353 # give time for socket to connect
354 time.sleep(0.1)
355
356 d.send(data)
357 d.send(data)
358 d.send(b'\n')
359
360 n = 1000
361 while d.out_buffer and n > 0:
362 asyncore.poll()
363 n -= 1
364
365 evt.wait()
366
367 self.assertEqual(cap.getvalue(), data*2)
368 finally:
369 threading_helper.join_thread(t)
370
371
372 @unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
373 'asyncore.file_wrapper required')
374 class ESC[4;38;5;81mFileWrapperTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
375 def setUp(self):
376 self.d = b"It's not dead, it's sleeping!"
377 with open(os_helper.TESTFN, 'wb') as file:
378 file.write(self.d)
379
380 def tearDown(self):
381 os_helper.unlink(os_helper.TESTFN)
382
383 def test_recv(self):
384 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
385 w = asyncore.file_wrapper(fd)
386 os.close(fd)
387
388 self.assertNotEqual(w.fd, fd)
389 self.assertNotEqual(w.fileno(), fd)
390 self.assertEqual(w.recv(13), b"It's not dead")
391 self.assertEqual(w.read(6), b", it's")
392 w.close()
393 self.assertRaises(OSError, w.read, 1)
394
395 def test_send(self):
396 d1 = b"Come again?"
397 d2 = b"I want to buy some cheese."
398 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND)
399 w = asyncore.file_wrapper(fd)
400 os.close(fd)
401
402 w.write(d1)
403 w.send(d2)
404 w.close()
405 with open(os_helper.TESTFN, 'rb') as file:
406 self.assertEqual(file.read(), self.d + d1 + d2)
407
408 @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
409 'asyncore.file_dispatcher required')
410 def test_dispatcher(self):
411 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
412 data = []
413 class ESC[4;38;5;81mFileDispatcher(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mfile_dispatcher):
414 def handle_read(self):
415 data.append(self.recv(29))
416 s = FileDispatcher(fd)
417 os.close(fd)
418 asyncore.loop(timeout=0.01, use_poll=True, count=2)
419 self.assertEqual(b"".join(data), self.d)
420
421 def test_resource_warning(self):
422 # Issue #11453
423 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
424 f = asyncore.file_wrapper(fd)
425
426 os.close(fd)
427 with warnings_helper.check_warnings(('', ResourceWarning)):
428 f = None
429 support.gc_collect()
430
431 def test_close_twice(self):
432 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
433 f = asyncore.file_wrapper(fd)
434 os.close(fd)
435
436 os.close(f.fd) # file_wrapper dupped fd
437 with self.assertRaises(OSError):
438 f.close()
439
440 self.assertEqual(f.fd, -1)
441 # calling close twice should not fail
442 f.close()
443
444
445 class ESC[4;38;5;81mBaseTestHandler(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher):
446
447 def __init__(self, sock=None):
448 asyncore.dispatcher.__init__(self, sock)
449 self.flag = False
450
451 def handle_accept(self):
452 raise Exception("handle_accept not supposed to be called")
453
454 def handle_accepted(self):
455 raise Exception("handle_accepted not supposed to be called")
456
457 def handle_connect(self):
458 raise Exception("handle_connect not supposed to be called")
459
460 def handle_expt(self):
461 raise Exception("handle_expt not supposed to be called")
462
463 def handle_close(self):
464 raise Exception("handle_close not supposed to be called")
465
466 def handle_error(self):
467 raise
468
469
470 class ESC[4;38;5;81mBaseServer(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher):
471 """A server which listens on an address and dispatches the
472 connection to a handler.
473 """
474
475 def __init__(self, family, addr, handler=BaseTestHandler):
476 asyncore.dispatcher.__init__(self)
477 self.create_socket(family)
478 self.set_reuse_addr()
479 bind_af_aware(self.socket, addr)
480 self.listen(5)
481 self.handler = handler
482
483 @property
484 def address(self):
485 return self.socket.getsockname()
486
487 def handle_accepted(self, sock, addr):
488 self.handler(sock)
489
490 def handle_error(self):
491 raise
492
493
494 class ESC[4;38;5;81mBaseClient(ESC[4;38;5;149mBaseTestHandler):
495
496 def __init__(self, family, address):
497 BaseTestHandler.__init__(self)
498 self.create_socket(family)
499 self.connect(address)
500
501 def handle_connect(self):
502 pass
503
504
505 class ESC[4;38;5;81mBaseTestAPI:
506
507 def tearDown(self):
508 asyncore.close_all(ignore_all=True)
509
510 def loop_waiting_for_flag(self, instance, timeout=5):
511 timeout = float(timeout) / 100
512 count = 100
513 while asyncore.socket_map and count > 0:
514 asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
515 if instance.flag:
516 return
517 count -= 1
518 time.sleep(timeout)
519 self.fail("flag not set")
520
521 def test_handle_connect(self):
522 # make sure handle_connect is called on connect()
523
524 class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
525 def handle_connect(self):
526 self.flag = True
527
528 server = BaseServer(self.family, self.addr)
529 client = TestClient(self.family, server.address)
530 self.loop_waiting_for_flag(client)
531
532 def test_handle_accept(self):
533 # make sure handle_accept() is called when a client connects
534
535 class ESC[4;38;5;81mTestListener(ESC[4;38;5;149mBaseTestHandler):
536
537 def __init__(self, family, addr):
538 BaseTestHandler.__init__(self)
539 self.create_socket(family)
540 bind_af_aware(self.socket, addr)
541 self.listen(5)
542 self.address = self.socket.getsockname()
543
544 def handle_accept(self):
545 self.flag = True
546
547 server = TestListener(self.family, self.addr)
548 client = BaseClient(self.family, server.address)
549 self.loop_waiting_for_flag(server)
550
551 def test_handle_accepted(self):
552 # make sure handle_accepted() is called when a client connects
553
554 class ESC[4;38;5;81mTestListener(ESC[4;38;5;149mBaseTestHandler):
555
556 def __init__(self, family, addr):
557 BaseTestHandler.__init__(self)
558 self.create_socket(family)
559 bind_af_aware(self.socket, addr)
560 self.listen(5)
561 self.address = self.socket.getsockname()
562
563 def handle_accept(self):
564 asyncore.dispatcher.handle_accept(self)
565
566 def handle_accepted(self, sock, addr):
567 sock.close()
568 self.flag = True
569
570 server = TestListener(self.family, self.addr)
571 client = BaseClient(self.family, server.address)
572 self.loop_waiting_for_flag(server)
573
574
575 def test_handle_read(self):
576 # make sure handle_read is called on data received
577
578 class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
579 def handle_read(self):
580 self.flag = True
581
582 class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
583 def __init__(self, conn):
584 BaseTestHandler.__init__(self, conn)
585 self.send(b'x' * 1024)
586
587 server = BaseServer(self.family, self.addr, TestHandler)
588 client = TestClient(self.family, server.address)
589 self.loop_waiting_for_flag(client)
590
591 def test_handle_write(self):
592 # make sure handle_write is called
593
594 class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
595 def handle_write(self):
596 self.flag = True
597
598 server = BaseServer(self.family, self.addr)
599 client = TestClient(self.family, server.address)
600 self.loop_waiting_for_flag(client)
601
602 def test_handle_close(self):
603 # make sure handle_close is called when the other end closes
604 # the connection
605
606 class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
607
608 def handle_read(self):
609 # in order to make handle_close be called we are supposed
610 # to make at least one recv() call
611 self.recv(1024)
612
613 def handle_close(self):
614 self.flag = True
615 self.close()
616
617 class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
618 def __init__(self, conn):
619 BaseTestHandler.__init__(self, conn)
620 self.close()
621
622 server = BaseServer(self.family, self.addr, TestHandler)
623 client = TestClient(self.family, server.address)
624 self.loop_waiting_for_flag(client)
625
626 def test_handle_close_after_conn_broken(self):
627 # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
628 # #11265).
629
630 data = b'\0' * 128
631
632 class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
633
634 def handle_write(self):
635 self.send(data)
636
637 def handle_close(self):
638 self.flag = True
639 self.close()
640
641 def handle_expt(self):
642 self.flag = True
643 self.close()
644
645 class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
646
647 def handle_read(self):
648 self.recv(len(data))
649 self.close()
650
651 def writable(self):
652 return False
653
654 server = BaseServer(self.family, self.addr, TestHandler)
655 client = TestClient(self.family, server.address)
656 self.loop_waiting_for_flag(client)
657
658 @unittest.skipIf(sys.platform.startswith("sunos"),
659 "OOB support is broken on Solaris")
660 def test_handle_expt(self):
661 # Make sure handle_expt is called on OOB data received.
662 # Note: this might fail on some platforms as OOB data is
663 # tenuously supported and rarely used.
664 if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
665 self.skipTest("Not applicable to AF_UNIX sockets.")
666
667 if sys.platform == "darwin" and self.use_poll:
668 self.skipTest("poll may fail on macOS; see issue #28087")
669
670 class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
671 def handle_expt(self):
672 self.socket.recv(1024, socket.MSG_OOB)
673 self.flag = True
674
675 class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
676 def __init__(self, conn):
677 BaseTestHandler.__init__(self, conn)
678 self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
679
680 server = BaseServer(self.family, self.addr, TestHandler)
681 client = TestClient(self.family, server.address)
682 self.loop_waiting_for_flag(client)
683
684 def test_handle_error(self):
685
686 class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
687 def handle_write(self):
688 1.0 / 0
689 def handle_error(self):
690 self.flag = True
691 try:
692 raise
693 except ZeroDivisionError:
694 pass
695 else:
696 raise Exception("exception not raised")
697
698 server = BaseServer(self.family, self.addr)
699 client = TestClient(self.family, server.address)
700 self.loop_waiting_for_flag(client)
701
702 def test_connection_attributes(self):
703 server = BaseServer(self.family, self.addr)
704 client = BaseClient(self.family, server.address)
705
706 # we start disconnected
707 self.assertFalse(server.connected)
708 self.assertTrue(server.accepting)
709 # this can't be taken for granted across all platforms
710 #self.assertFalse(client.connected)
711 self.assertFalse(client.accepting)
712
713 # execute some loops so that client connects to server
714 asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
715 self.assertFalse(server.connected)
716 self.assertTrue(server.accepting)
717 self.assertTrue(client.connected)
718 self.assertFalse(client.accepting)
719
720 # disconnect the client
721 client.close()
722 self.assertFalse(server.connected)
723 self.assertTrue(server.accepting)
724 self.assertFalse(client.connected)
725 self.assertFalse(client.accepting)
726
727 # stop serving
728 server.close()
729 self.assertFalse(server.connected)
730 self.assertFalse(server.accepting)
731
732 def test_create_socket(self):
733 s = asyncore.dispatcher()
734 s.create_socket(self.family)
735 self.assertEqual(s.socket.type, socket.SOCK_STREAM)
736 self.assertEqual(s.socket.family, self.family)
737 self.assertEqual(s.socket.gettimeout(), 0)
738 self.assertFalse(s.socket.get_inheritable())
739
740 def test_bind(self):
741 if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
742 self.skipTest("Not applicable to AF_UNIX sockets.")
743 s1 = asyncore.dispatcher()
744 s1.create_socket(self.family)
745 s1.bind(self.addr)
746 s1.listen(5)
747 port = s1.socket.getsockname()[1]
748
749 s2 = asyncore.dispatcher()
750 s2.create_socket(self.family)
751 # EADDRINUSE indicates the socket was correctly bound
752 self.assertRaises(OSError, s2.bind, (self.addr[0], port))
753
754 def test_set_reuse_addr(self):
755 if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
756 self.skipTest("Not applicable to AF_UNIX sockets.")
757
758 with socket.socket(self.family) as sock:
759 try:
760 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
761 except OSError:
762 unittest.skip("SO_REUSEADDR not supported on this platform")
763 else:
764 # if SO_REUSEADDR succeeded for sock we expect asyncore
765 # to do the same
766 s = asyncore.dispatcher(socket.socket(self.family))
767 self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
768 socket.SO_REUSEADDR))
769 s.socket.close()
770 s.create_socket(self.family)
771 s.set_reuse_addr()
772 self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
773 socket.SO_REUSEADDR))
774
775 @threading_helper.reap_threads
776 def test_quick_connect(self):
777 # see: http://bugs.python.org/issue10340
778 if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
779 self.skipTest("test specific to AF_INET and AF_INET6")
780
781 server = BaseServer(self.family, self.addr)
782 # run the thread 500 ms: the socket should be connected in 200 ms
783 t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
784 count=5))
785 t.start()
786 try:
787 with socket.socket(self.family, socket.SOCK_STREAM) as s:
788 s.settimeout(.2)
789 s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
790 struct.pack('ii', 1, 0))
791
792 try:
793 s.connect(server.address)
794 except OSError:
795 pass
796 finally:
797 threading_helper.join_thread(t)
798
799 class ESC[4;38;5;81mTestAPI_UseIPv4Sockets(ESC[4;38;5;149mBaseTestAPI):
800 family = socket.AF_INET
801 addr = (socket_helper.HOST, 0)
802
803 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
804 class ESC[4;38;5;81mTestAPI_UseIPv6Sockets(ESC[4;38;5;149mBaseTestAPI):
805 family = socket.AF_INET6
806 addr = (socket_helper.HOSTv6, 0)
807
808 @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
809 class ESC[4;38;5;81mTestAPI_UseUnixSockets(ESC[4;38;5;149mBaseTestAPI):
810 if HAS_UNIX_SOCKETS:
811 family = socket.AF_UNIX
812 addr = os_helper.TESTFN
813
814 def tearDown(self):
815 os_helper.unlink(self.addr)
816 BaseTestAPI.tearDown(self)
817
818 class ESC[4;38;5;81mTestAPI_UseIPv4Select(ESC[4;38;5;149mTestAPI_UseIPv4Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
819 use_poll = False
820
821 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
822 class ESC[4;38;5;81mTestAPI_UseIPv4Poll(ESC[4;38;5;149mTestAPI_UseIPv4Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
823 use_poll = True
824
825 class ESC[4;38;5;81mTestAPI_UseIPv6Select(ESC[4;38;5;149mTestAPI_UseIPv6Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
826 use_poll = False
827
828 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
829 class ESC[4;38;5;81mTestAPI_UseIPv6Poll(ESC[4;38;5;149mTestAPI_UseIPv6Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
830 use_poll = True
831
832 class ESC[4;38;5;81mTestAPI_UseUnixSocketsSelect(ESC[4;38;5;149mTestAPI_UseUnixSockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
833 use_poll = False
834
835 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
836 class ESC[4;38;5;81mTestAPI_UseUnixSocketsPoll(ESC[4;38;5;149mTestAPI_UseUnixSockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
837 use_poll = True
838
839 if __name__ == "__main__":
840 unittest.main()