(root)/
Python-3.11.7/
Lib/
test/
test_asyncore.py
       1  import unittest
       2  import select
       3  import os
       4  import socket
       5  import sys
       6  import time
       7  import errno
       8  import struct
       9  import threading
      10  
      11  from test import support
      12  from test.support import os_helper
      13  from test.support import socket_helper
      14  from test.support import threading_helper
      15  from test.support import warnings_helper
      16  from io import BytesIO
      17  
# Skip the whole module when running under a PGO (profile-guided
# optimization) build.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

# NOTE(review): presumably skips the module on platforms without usable
# sockets -- confirm against test.support.requires_working_socket.
support.requires_working_socket(module=True)

# asyncore is deprecated, so it is imported through the warnings helper.
asyncore = warnings_helper.import_deprecated('asyncore')


# True when the socket module on this platform exposes AF_UNIX.
HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')
      27  
      28  class ESC[4;38;5;81mdummysocket:
      29      def __init__(self):
      30          self.closed = False
      31  
      32      def close(self):
      33          self.closed = True
      34  
      35      def fileno(self):
      36          return 42
      37  
      38  class ESC[4;38;5;81mdummychannel:
      39      def __init__(self):
      40          self.socket = dummysocket()
      41  
      42      def close(self):
      43          self.socket.close()
      44  
      45  class ESC[4;38;5;81mexitingdummy:
      46      def __init__(self):
      47          pass
      48  
      49      def handle_read_event(self):
      50          raise asyncore.ExitNow()
      51  
      52      handle_write_event = handle_read_event
      53      handle_close = handle_read_event
      54      handle_expt_event = handle_read_event
      55  
      56  class ESC[4;38;5;81mcrashingdummy:
      57      def __init__(self):
      58          self.error_handled = False
      59  
      60      def handle_read_event(self):
      61          raise Exception()
      62  
      63      handle_write_event = handle_read_event
      64      handle_close = handle_read_event
      65      handle_expt_event = handle_read_event
      66  
      67      def handle_error(self):
      68          self.error_handled = True
      69  
      70  # used when testing senders; just collects what it gets until newline is sent
      71  def capture_server(evt, buf, serv):
      72      try:
      73          serv.listen()
      74          conn, addr = serv.accept()
      75      except TimeoutError:
      76          pass
      77      else:
      78          n = 200
      79          start = time.monotonic()
      80          while n > 0 and time.monotonic() - start < 3.0:
      81              r, w, e = select.select([conn], [], [], 0.1)
      82              if r:
      83                  n -= 1
      84                  data = conn.recv(10)
      85                  # keep everything except for the newline terminator
      86                  buf.write(data.replace(b'\n', b''))
      87                  if b'\n' in data:
      88                      break
      89              time.sleep(0.01)
      90  
      91          conn.close()
      92      finally:
      93          serv.close()
      94          evt.set()
      95  
      96  def bind_af_aware(sock, addr):
      97      """Helper function to bind a socket according to its family."""
      98      if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
      99          # Make sure the path doesn't exist.
     100          os_helper.unlink(addr)
     101          socket_helper.bind_unix_socket(sock, addr)
     102      else:
     103          sock.bind(addr)
     104  
     105  
     106  class ESC[4;38;5;81mHelperFunctionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     107      def test_readwriteexc(self):
     108          # Check exception handling behavior of read, write and _exception
     109  
     110          # check that ExitNow exceptions in the object handler method
     111          # bubbles all the way up through asyncore read/write/_exception calls
     112          tr1 = exitingdummy()
     113          self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
     114          self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
     115          self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
     116  
     117          # check that an exception other than ExitNow in the object handler
     118          # method causes the handle_error method to get called
     119          tr2 = crashingdummy()
     120          asyncore.read(tr2)
     121          self.assertEqual(tr2.error_handled, True)
     122  
     123          tr2 = crashingdummy()
     124          asyncore.write(tr2)
     125          self.assertEqual(tr2.error_handled, True)
     126  
     127          tr2 = crashingdummy()
     128          asyncore._exception(tr2)
     129          self.assertEqual(tr2.error_handled, True)
     130  
     131      # asyncore.readwrite uses constants in the select module that
     132      # are not present in Windows systems (see this thread:
     133      # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
     134      # These constants should be present as long as poll is available
     135  
     136      @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
     137      def test_readwrite(self):
     138          # Check that correct methods are called by readwrite()
     139  
     140          attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
     141  
     142          expected = (
     143              (select.POLLIN, 'read'),
     144              (select.POLLPRI, 'expt'),
     145              (select.POLLOUT, 'write'),
     146              (select.POLLERR, 'closed'),
     147              (select.POLLHUP, 'closed'),
     148              (select.POLLNVAL, 'closed'),
     149              )
     150  
     151          class ESC[4;38;5;81mtestobj:
     152              def __init__(self):
     153                  self.read = False
     154                  self.write = False
     155                  self.closed = False
     156                  self.expt = False
     157                  self.error_handled = False
     158  
     159              def handle_read_event(self):
     160                  self.read = True
     161  
     162              def handle_write_event(self):
     163                  self.write = True
     164  
     165              def handle_close(self):
     166                  self.closed = True
     167  
     168              def handle_expt_event(self):
     169                  self.expt = True
     170  
     171              def handle_error(self):
     172                  self.error_handled = True
     173  
     174          for flag, expectedattr in expected:
     175              tobj = testobj()
     176              self.assertEqual(getattr(tobj, expectedattr), False)
     177              asyncore.readwrite(tobj, flag)
     178  
     179              # Only the attribute modified by the routine we expect to be
     180              # called should be True.
     181              for attr in attributes:
     182                  self.assertEqual(getattr(tobj, attr), attr==expectedattr)
     183  
     184              # check that ExitNow exceptions in the object handler method
     185              # bubbles all the way up through asyncore readwrite call
     186              tr1 = exitingdummy()
     187              self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
     188  
     189              # check that an exception other than ExitNow in the object handler
     190              # method causes the handle_error method to get called
     191              tr2 = crashingdummy()
     192              self.assertEqual(tr2.error_handled, False)
     193              asyncore.readwrite(tr2, flag)
     194              self.assertEqual(tr2.error_handled, True)
     195  
     196      def test_closeall(self):
     197          self.closeall_check(False)
     198  
     199      def test_closeall_default(self):
     200          self.closeall_check(True)
     201  
     202      def closeall_check(self, usedefault):
     203          # Check that close_all() closes everything in a given map
     204  
     205          l = []
     206          testmap = {}
     207          for i in range(10):
     208              c = dummychannel()
     209              l.append(c)
     210              self.assertEqual(c.socket.closed, False)
     211              testmap[i] = c
     212  
     213          if usedefault:
     214              socketmap = asyncore.socket_map
     215              try:
     216                  asyncore.socket_map = testmap
     217                  asyncore.close_all()
     218              finally:
     219                  testmap, asyncore.socket_map = asyncore.socket_map, socketmap
     220          else:
     221              asyncore.close_all(testmap)
     222  
     223          self.assertEqual(len(testmap), 0)
     224  
     225          for c in l:
     226              self.assertEqual(c.socket.closed, True)
     227  
     228      def test_compact_traceback(self):
     229          try:
     230              raise Exception("I don't like spam!")
     231          except:
     232              real_t, real_v, real_tb = sys.exc_info()
     233              r = asyncore.compact_traceback()
     234          else:
     235              self.fail("Expected exception")
     236  
     237          (f, function, line), t, v, info = r
     238          self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
     239          self.assertEqual(function, 'test_compact_traceback')
     240          self.assertEqual(t, real_t)
     241          self.assertEqual(v, real_v)
     242          self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
     243  
     244  
     245  class ESC[4;38;5;81mDispatcherTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     246      def setUp(self):
     247          pass
     248  
     249      def tearDown(self):
     250          asyncore.close_all()
     251  
     252      def test_basic(self):
     253          d = asyncore.dispatcher()
     254          self.assertEqual(d.readable(), True)
     255          self.assertEqual(d.writable(), True)
     256  
     257      def test_repr(self):
     258          d = asyncore.dispatcher()
     259          self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
     260  
     261      def test_log(self):
     262          d = asyncore.dispatcher()
     263  
     264          # capture output of dispatcher.log() (to stderr)
     265          l1 = "Lovely spam! Wonderful spam!"
     266          l2 = "I don't like spam!"
     267          with support.captured_stderr() as stderr:
     268              d.log(l1)
     269              d.log(l2)
     270  
     271          lines = stderr.getvalue().splitlines()
     272          self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
     273  
     274      def test_log_info(self):
     275          d = asyncore.dispatcher()
     276  
     277          # capture output of dispatcher.log_info() (to stdout via print)
     278          l1 = "Have you got anything without spam?"
     279          l2 = "Why can't she have egg bacon spam and sausage?"
     280          l3 = "THAT'S got spam in it!"
     281          with support.captured_stdout() as stdout:
     282              d.log_info(l1, 'EGGS')
     283              d.log_info(l2)
     284              d.log_info(l3, 'SPAM')
     285  
     286          lines = stdout.getvalue().splitlines()
     287          expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
     288          self.assertEqual(lines, expected)
     289  
     290      def test_unhandled(self):
     291          d = asyncore.dispatcher()
     292          d.ignore_log_types = ()
     293  
     294          # capture output of dispatcher.log_info() (to stdout via print)
     295          with support.captured_stdout() as stdout:
     296              d.handle_expt()
     297              d.handle_read()
     298              d.handle_write()
     299              d.handle_connect()
     300  
     301          lines = stdout.getvalue().splitlines()
     302          expected = ['warning: unhandled incoming priority event',
     303                      'warning: unhandled read event',
     304                      'warning: unhandled write event',
     305                      'warning: unhandled connect event']
     306          self.assertEqual(lines, expected)
     307  
     308      def test_strerror(self):
     309          # refers to bug #8573
     310          err = asyncore._strerror(errno.EPERM)
     311          if hasattr(os, 'strerror'):
     312              self.assertEqual(err, os.strerror(errno.EPERM))
     313          err = asyncore._strerror(-1)
     314          self.assertTrue(err != "")
     315  
     316  
     317  class ESC[4;38;5;81mdispatcherwithsend_noread(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher_with_send):
     318      def readable(self):
     319          return False
     320  
     321      def handle_connect(self):
     322          pass
     323  
     324  
     325  class ESC[4;38;5;81mDispatcherWithSendTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     326      def setUp(self):
     327          pass
     328  
     329      def tearDown(self):
     330          asyncore.close_all()
     331  
     332      @threading_helper.reap_threads
     333      def test_send(self):
     334          evt = threading.Event()
     335          sock = socket.socket()
     336          sock.settimeout(3)
     337          port = socket_helper.bind_port(sock)
     338  
     339          cap = BytesIO()
     340          args = (evt, cap, sock)
     341          t = threading.Thread(target=capture_server, args=args)
     342          t.start()
     343          try:
     344              # wait a little longer for the server to initialize (it sometimes
     345              # refuses connections on slow machines without this wait)
     346              time.sleep(0.2)
     347  
     348              data = b"Suppose there isn't a 16-ton weight?"
     349              d = dispatcherwithsend_noread()
     350              d.create_socket()
     351              d.connect((socket_helper.HOST, port))
     352  
     353              # give time for socket to connect
     354              time.sleep(0.1)
     355  
     356              d.send(data)
     357              d.send(data)
     358              d.send(b'\n')
     359  
     360              n = 1000
     361              while d.out_buffer and n > 0:
     362                  asyncore.poll()
     363                  n -= 1
     364  
     365              evt.wait()
     366  
     367              self.assertEqual(cap.getvalue(), data*2)
     368          finally:
     369              threading_helper.join_thread(t)
     370  
     371  
     372  @unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
     373                       'asyncore.file_wrapper required')
     374  class ESC[4;38;5;81mFileWrapperTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     375      def setUp(self):
     376          self.d = b"It's not dead, it's sleeping!"
     377          with open(os_helper.TESTFN, 'wb') as file:
     378              file.write(self.d)
     379  
     380      def tearDown(self):
     381          os_helper.unlink(os_helper.TESTFN)
     382  
     383      def test_recv(self):
     384          fd = os.open(os_helper.TESTFN, os.O_RDONLY)
     385          w = asyncore.file_wrapper(fd)
     386          os.close(fd)
     387  
     388          self.assertNotEqual(w.fd, fd)
     389          self.assertNotEqual(w.fileno(), fd)
     390          self.assertEqual(w.recv(13), b"It's not dead")
     391          self.assertEqual(w.read(6), b", it's")
     392          w.close()
     393          self.assertRaises(OSError, w.read, 1)
     394  
     395      def test_send(self):
     396          d1 = b"Come again?"
     397          d2 = b"I want to buy some cheese."
     398          fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND)
     399          w = asyncore.file_wrapper(fd)
     400          os.close(fd)
     401  
     402          w.write(d1)
     403          w.send(d2)
     404          w.close()
     405          with open(os_helper.TESTFN, 'rb') as file:
     406              self.assertEqual(file.read(), self.d + d1 + d2)
     407  
     408      @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
     409                           'asyncore.file_dispatcher required')
     410      def test_dispatcher(self):
     411          fd = os.open(os_helper.TESTFN, os.O_RDONLY)
     412          data = []
     413          class ESC[4;38;5;81mFileDispatcher(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mfile_dispatcher):
     414              def handle_read(self):
     415                  data.append(self.recv(29))
     416          s = FileDispatcher(fd)
     417          os.close(fd)
     418          asyncore.loop(timeout=0.01, use_poll=True, count=2)
     419          self.assertEqual(b"".join(data), self.d)
     420  
     421      def test_resource_warning(self):
     422          # Issue #11453
     423          fd = os.open(os_helper.TESTFN, os.O_RDONLY)
     424          f = asyncore.file_wrapper(fd)
     425  
     426          os.close(fd)
     427          with warnings_helper.check_warnings(('', ResourceWarning)):
     428              f = None
     429              support.gc_collect()
     430  
     431      def test_close_twice(self):
     432          fd = os.open(os_helper.TESTFN, os.O_RDONLY)
     433          f = asyncore.file_wrapper(fd)
     434          os.close(fd)
     435  
     436          os.close(f.fd)  # file_wrapper dupped fd
     437          with self.assertRaises(OSError):
     438              f.close()
     439  
     440          self.assertEqual(f.fd, -1)
     441          # calling close twice should not fail
     442          f.close()
     443  
     444  
     445  class ESC[4;38;5;81mBaseTestHandler(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher):
     446  
     447      def __init__(self, sock=None):
     448          asyncore.dispatcher.__init__(self, sock)
     449          self.flag = False
     450  
     451      def handle_accept(self):
     452          raise Exception("handle_accept not supposed to be called")
     453  
     454      def handle_accepted(self):
     455          raise Exception("handle_accepted not supposed to be called")
     456  
     457      def handle_connect(self):
     458          raise Exception("handle_connect not supposed to be called")
     459  
     460      def handle_expt(self):
     461          raise Exception("handle_expt not supposed to be called")
     462  
     463      def handle_close(self):
     464          raise Exception("handle_close not supposed to be called")
     465  
     466      def handle_error(self):
     467          raise
     468  
     469  
     470  class ESC[4;38;5;81mBaseServer(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher):
     471      """A server which listens on an address and dispatches the
     472      connection to a handler.
     473      """
     474  
     475      def __init__(self, family, addr, handler=BaseTestHandler):
     476          asyncore.dispatcher.__init__(self)
     477          self.create_socket(family)
     478          self.set_reuse_addr()
     479          bind_af_aware(self.socket, addr)
     480          self.listen(5)
     481          self.handler = handler
     482  
     483      @property
     484      def address(self):
     485          return self.socket.getsockname()
     486  
     487      def handle_accepted(self, sock, addr):
     488          self.handler(sock)
     489  
     490      def handle_error(self):
     491          raise
     492  
     493  
     494  class ESC[4;38;5;81mBaseClient(ESC[4;38;5;149mBaseTestHandler):
     495  
     496      def __init__(self, family, address):
     497          BaseTestHandler.__init__(self)
     498          self.create_socket(family)
     499          self.connect(address)
     500  
     501      def handle_connect(self):
     502          pass
     503  
     504  
     505  class ESC[4;38;5;81mBaseTestAPI:
     506  
    def tearDown(self):
        # Close all channels in asyncore's default map, passing
        # ignore_all=True to close_all().
        asyncore.close_all(ignore_all=True)
     509  
     510      def loop_waiting_for_flag(self, instance, timeout=5):
     511          timeout = float(timeout) / 100
     512          count = 100
     513          while asyncore.socket_map and count > 0:
     514              asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
     515              if instance.flag:
     516                  return
     517              count -= 1
     518              time.sleep(timeout)
     519          self.fail("flag not set")
     520  
     521      def test_handle_connect(self):
     522          # make sure handle_connect is called on connect()
     523  
     524          class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
     525              def handle_connect(self):
     526                  self.flag = True
     527  
     528          server = BaseServer(self.family, self.addr)
     529          client = TestClient(self.family, server.address)
     530          self.loop_waiting_for_flag(client)
     531  
     532      def test_handle_accept(self):
     533          # make sure handle_accept() is called when a client connects
     534  
     535          class ESC[4;38;5;81mTestListener(ESC[4;38;5;149mBaseTestHandler):
     536  
     537              def __init__(self, family, addr):
     538                  BaseTestHandler.__init__(self)
     539                  self.create_socket(family)
     540                  bind_af_aware(self.socket, addr)
     541                  self.listen(5)
     542                  self.address = self.socket.getsockname()
     543  
     544              def handle_accept(self):
     545                  self.flag = True
     546  
     547          server = TestListener(self.family, self.addr)
     548          client = BaseClient(self.family, server.address)
     549          self.loop_waiting_for_flag(server)
     550  
     551      def test_handle_accepted(self):
     552          # make sure handle_accepted() is called when a client connects
     553  
     554          class ESC[4;38;5;81mTestListener(ESC[4;38;5;149mBaseTestHandler):
     555  
     556              def __init__(self, family, addr):
     557                  BaseTestHandler.__init__(self)
     558                  self.create_socket(family)
     559                  bind_af_aware(self.socket, addr)
     560                  self.listen(5)
     561                  self.address = self.socket.getsockname()
     562  
     563              def handle_accept(self):
     564                  asyncore.dispatcher.handle_accept(self)
     565  
     566              def handle_accepted(self, sock, addr):
     567                  sock.close()
     568                  self.flag = True
     569  
     570          server = TestListener(self.family, self.addr)
     571          client = BaseClient(self.family, server.address)
     572          self.loop_waiting_for_flag(server)
     573  
     574  
     575      def test_handle_read(self):
     576          # make sure handle_read is called on data received
     577  
     578          class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
     579              def handle_read(self):
     580                  self.flag = True
     581  
     582          class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
     583              def __init__(self, conn):
     584                  BaseTestHandler.__init__(self, conn)
     585                  self.send(b'x' * 1024)
     586  
     587          server = BaseServer(self.family, self.addr, TestHandler)
     588          client = TestClient(self.family, server.address)
     589          self.loop_waiting_for_flag(client)
     590  
     591      def test_handle_write(self):
     592          # make sure handle_write is called
     593  
     594          class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
     595              def handle_write(self):
     596                  self.flag = True
     597  
     598          server = BaseServer(self.family, self.addr)
     599          client = TestClient(self.family, server.address)
     600          self.loop_waiting_for_flag(client)
     601  
     602      def test_handle_close(self):
     603          # make sure handle_close is called when the other end closes
     604          # the connection
     605  
     606          class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
     607  
     608              def handle_read(self):
     609                  # in order to make handle_close be called we are supposed
     610                  # to make at least one recv() call
     611                  self.recv(1024)
     612  
     613              def handle_close(self):
     614                  self.flag = True
     615                  self.close()
     616  
     617          class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
     618              def __init__(self, conn):
     619                  BaseTestHandler.__init__(self, conn)
     620                  self.close()
     621  
     622          server = BaseServer(self.family, self.addr, TestHandler)
     623          client = TestClient(self.family, server.address)
     624          self.loop_waiting_for_flag(client)
     625  
     626      def test_handle_close_after_conn_broken(self):
     627          # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
     628          # #11265).
     629  
     630          data = b'\0' * 128
     631  
     632          class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
     633  
     634              def handle_write(self):
     635                  self.send(data)
     636  
     637              def handle_close(self):
     638                  self.flag = True
     639                  self.close()
     640  
     641              def handle_expt(self):
     642                  self.flag = True
     643                  self.close()
     644  
     645          class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
     646  
     647              def handle_read(self):
     648                  self.recv(len(data))
     649                  self.close()
     650  
     651              def writable(self):
     652                  return False
     653  
     654          server = BaseServer(self.family, self.addr, TestHandler)
     655          client = TestClient(self.family, server.address)
     656          self.loop_waiting_for_flag(client)
     657  
     658      @unittest.skipIf(sys.platform.startswith("sunos"),
     659                       "OOB support is broken on Solaris")
     660      def test_handle_expt(self):
     661          # Make sure handle_expt is called on OOB data received.
     662          # Note: this might fail on some platforms as OOB data is
     663          # tenuously supported and rarely used.
     664          if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
     665              self.skipTest("Not applicable to AF_UNIX sockets.")
     666  
     667          if sys.platform == "darwin" and self.use_poll:
     668              self.skipTest("poll may fail on macOS; see issue #28087")
     669  
     670          class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
     671              def handle_expt(self):
     672                  self.socket.recv(1024, socket.MSG_OOB)
     673                  self.flag = True
     674  
     675          class ESC[4;38;5;81mTestHandler(ESC[4;38;5;149mBaseTestHandler):
     676              def __init__(self, conn):
     677                  BaseTestHandler.__init__(self, conn)
     678                  self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
     679  
     680          server = BaseServer(self.family, self.addr, TestHandler)
     681          client = TestClient(self.family, server.address)
     682          self.loop_waiting_for_flag(client)
     683  
     684      def test_handle_error(self):
     685  
     686          class ESC[4;38;5;81mTestClient(ESC[4;38;5;149mBaseClient):
     687              def handle_write(self):
     688                  1.0 / 0
     689              def handle_error(self):
     690                  self.flag = True
     691                  try:
     692                      raise
     693                  except ZeroDivisionError:
     694                      pass
     695                  else:
     696                      raise Exception("exception not raised")
     697  
     698          server = BaseServer(self.family, self.addr)
     699          client = TestClient(self.family, server.address)
     700          self.loop_waiting_for_flag(client)
     701  
     702      def test_connection_attributes(self):
     703          server = BaseServer(self.family, self.addr)
     704          client = BaseClient(self.family, server.address)
     705  
     706          # we start disconnected
     707          self.assertFalse(server.connected)
     708          self.assertTrue(server.accepting)
     709          # this can't be taken for granted across all platforms
     710          #self.assertFalse(client.connected)
     711          self.assertFalse(client.accepting)
     712  
     713          # execute some loops so that client connects to server
     714          asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
     715          self.assertFalse(server.connected)
     716          self.assertTrue(server.accepting)
     717          self.assertTrue(client.connected)
     718          self.assertFalse(client.accepting)
     719  
     720          # disconnect the client
     721          client.close()
     722          self.assertFalse(server.connected)
     723          self.assertTrue(server.accepting)
     724          self.assertFalse(client.connected)
     725          self.assertFalse(client.accepting)
     726  
     727          # stop serving
     728          server.close()
     729          self.assertFalse(server.connected)
     730          self.assertFalse(server.accepting)
     731  
     732      def test_create_socket(self):
     733          s = asyncore.dispatcher()
     734          s.create_socket(self.family)
     735          self.assertEqual(s.socket.type, socket.SOCK_STREAM)
     736          self.assertEqual(s.socket.family, self.family)
     737          self.assertEqual(s.socket.gettimeout(), 0)
     738          self.assertFalse(s.socket.get_inheritable())
     739  
     740      def test_bind(self):
     741          if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
     742              self.skipTest("Not applicable to AF_UNIX sockets.")
     743          s1 = asyncore.dispatcher()
     744          s1.create_socket(self.family)
     745          s1.bind(self.addr)
     746          s1.listen(5)
     747          port = s1.socket.getsockname()[1]
     748  
     749          s2 = asyncore.dispatcher()
     750          s2.create_socket(self.family)
     751          # EADDRINUSE indicates the socket was correctly bound
     752          self.assertRaises(OSError, s2.bind, (self.addr[0], port))
     753  
     754      def test_set_reuse_addr(self):
     755          if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
     756              self.skipTest("Not applicable to AF_UNIX sockets.")
     757  
     758          with socket.socket(self.family) as sock:
     759              try:
     760                  sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
     761              except OSError:
     762                  unittest.skip("SO_REUSEADDR not supported on this platform")
     763              else:
     764                  # if SO_REUSEADDR succeeded for sock we expect asyncore
     765                  # to do the same
     766                  s = asyncore.dispatcher(socket.socket(self.family))
     767                  self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
     768                                                       socket.SO_REUSEADDR))
     769                  s.socket.close()
     770                  s.create_socket(self.family)
     771                  s.set_reuse_addr()
     772                  self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
     773                                                       socket.SO_REUSEADDR))
     774  
     775      @threading_helper.reap_threads
     776      def test_quick_connect(self):
     777          # see: http://bugs.python.org/issue10340
     778          if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
     779              self.skipTest("test specific to AF_INET and AF_INET6")
     780  
     781          server = BaseServer(self.family, self.addr)
     782          # run the thread 500 ms: the socket should be connected in 200 ms
     783          t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
     784                                                            count=5))
     785          t.start()
     786          try:
     787              with socket.socket(self.family, socket.SOCK_STREAM) as s:
     788                  s.settimeout(.2)
     789                  s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
     790                               struct.pack('ii', 1, 0))
     791  
     792                  try:
     793                      s.connect(server.address)
     794                  except OSError:
     795                      pass
     796          finally:
     797              threading_helper.join_thread(t)
     798  
     799  class ESC[4;38;5;81mTestAPI_UseIPv4Sockets(ESC[4;38;5;149mBaseTestAPI):
     800      family = socket.AF_INET
     801      addr = (socket_helper.HOST, 0)
     802  
     803  @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
     804  class ESC[4;38;5;81mTestAPI_UseIPv6Sockets(ESC[4;38;5;149mBaseTestAPI):
     805      family = socket.AF_INET6
     806      addr = (socket_helper.HOSTv6, 0)
     807  
     808  @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
     809  class ESC[4;38;5;81mTestAPI_UseUnixSockets(ESC[4;38;5;149mBaseTestAPI):
     810      if HAS_UNIX_SOCKETS:
     811          family = socket.AF_UNIX
     812      addr = os_helper.TESTFN
     813  
     814      def tearDown(self):
     815          os_helper.unlink(self.addr)
     816          BaseTestAPI.tearDown(self)
     817  
     818  class ESC[4;38;5;81mTestAPI_UseIPv4Select(ESC[4;38;5;149mTestAPI_UseIPv4Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     819      use_poll = False
     820  
     821  @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
     822  class ESC[4;38;5;81mTestAPI_UseIPv4Poll(ESC[4;38;5;149mTestAPI_UseIPv4Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     823      use_poll = True
     824  
     825  class ESC[4;38;5;81mTestAPI_UseIPv6Select(ESC[4;38;5;149mTestAPI_UseIPv6Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     826      use_poll = False
     827  
     828  @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
     829  class ESC[4;38;5;81mTestAPI_UseIPv6Poll(ESC[4;38;5;149mTestAPI_UseIPv6Sockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     830      use_poll = True
     831  
     832  class ESC[4;38;5;81mTestAPI_UseUnixSocketsSelect(ESC[4;38;5;149mTestAPI_UseUnixSockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     833      use_poll = False
     834  
     835  @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
     836  class ESC[4;38;5;81mTestAPI_UseUnixSocketsPoll(ESC[4;38;5;149mTestAPI_UseUnixSockets, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     837      use_poll = True
     838  
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()