python (3.12.0)
1 # TODO: This module was deprecated and removed from CPython 3.12
2 # Now it is a test-only helper. Any attempts to rewrite existing tests that
3 # are using this module and remove it completely are appreciated!
4 # See: https://github.com/python/cpython/issues/72719
5
6 # -*- Mode: Python -*-
7 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
8 # Author: Sam Rushing <rushing@nightmare.com>
9
10 # ======================================================================
11 # Copyright 1996 by Sam Rushing
12 #
13 # All Rights Reserved
14 #
15 # Permission to use, copy, modify, and distribute this software and
16 # its documentation for any purpose and without fee is hereby
17 # granted, provided that the above copyright notice appear in all
18 # copies and that both that copyright notice and this permission
19 # notice appear in supporting documentation, and that the name of Sam
20 # Rushing not be used in advertising or publicity pertaining to
21 # distribution of the software without specific, written prior
22 # permission.
23 #
24 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
25 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
26 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
27 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
28 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
29 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
30 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
31 # ======================================================================
32
33 """Basic infrastructure for asynchronous socket service clients and servers.
34
35 There are only two ways to have a program on a single processor do "more
36 than one thing at a time". Multi-threaded programming is the simplest and
37 most popular way to do it, but there is another very different technique,
38 that lets you have nearly all the advantages of multi-threading, without
39 actually using multiple threads. It's really only practical if your program
40 is largely I/O bound. If your program is CPU bound, then pre-emptive
41 scheduled threads are probably what you really need. Network servers are
42 rarely CPU-bound, however.
43
44 If your operating system supports the select() system call in its I/O
45 library (and nearly all do), then you can use it to juggle multiple
46 communication channels at once; doing other work while your I/O is taking
47 place in the "background." Although this strategy can seem strange and
48 complex, especially at first, it is in many ways easier to understand and
49 control than multi-threaded programming. The module documented here solves
50 many of the difficult problems for you, making the task of building
51 sophisticated high-performance network servers and clients a snap.
52 """
53
54 import select
55 import socket
56 import sys
57 import time
58 import warnings
59
60 import os
61 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
62 ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
63 errorcode
64
65
# errno values that send(), recv() and readwrite() treat as "the connection
# is gone" (they close the channel instead of reporting an error).
_DISCONNECTED = frozenset({
    ECONNRESET,
    ENOTCONN,
    ESHUTDOWN,
    ECONNABORTED,
    EPIPE,
    EBADF,
})

# Default channel map (file descriptor -> dispatcher).  The NameError probe
# leaves an already-bound socket_map untouched and only creates the dict when
# the name does not exist yet.
try:
    socket_map
except NameError:
    socket_map = {}
73
def _strerror(err):
    """Return a message describing the errno value *err*.

    os.strerror() is tried first.  If it raises ValueError, OverflowError or
    NameError, the symbolic name from errno.errorcode is used when *err* is
    listed there, otherwise the text "Unknown error <err>".
    """
    try:
        message = os.strerror(err)
    except (ValueError, OverflowError, NameError):
        message = errorcode.get(err, "Unknown error %s" % err)
    return message
81
class ExitNow(Exception):
    """Exception that the event helpers re-raise instead of reporting.

    It is listed in _reraised_exceptions, so read(), write(), _exception()
    and readwrite() let it propagate rather than calling handle_error().
    """


# Exceptions that must propagate out of the event helpers untouched.
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
86
def read(obj):
    """Run obj.handle_read_event(), routing failures to obj.handle_error().

    Exceptions listed in _reraised_exceptions are re-raised unchanged; every
    other exception is handed to obj.handle_error() while it is still the
    active exception.
    """
    try:
        obj.handle_read_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
94
def write(obj):
    """Run obj.handle_write_event(), routing failures to obj.handle_error().

    Exceptions listed in _reraised_exceptions are re-raised unchanged; every
    other exception is handed to obj.handle_error() while it is still the
    active exception.
    """
    try:
        obj.handle_write_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
102
def _exception(obj):
    """Run obj.handle_expt_event(), routing failures to obj.handle_error().

    Exceptions listed in _reraised_exceptions are re-raised unchanged; every
    other exception is handed to obj.handle_error() while it is still the
    active exception.
    """
    try:
        obj.handle_expt_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
110
def readwrite(obj, flags):
    """Dispatch the select.poll() event mask *flags* to *obj*.

    POLLIN, POLLOUT and POLLPRI trigger the read, write and exceptional
    event handlers in that order; POLLHUP, POLLERR or POLLNVAL trigger
    handle_close().  An OSError whose errno is in _DISCONNECTED closes the
    channel, exceptions in _reraised_exceptions propagate, and anything else
    goes to obj.handle_error().
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        hangup_mask = select.POLLHUP | select.POLLERR | select.POLLNVAL
        if flags & hangup_mask:
            obj.handle_close()
    except OSError as err:
        if err.errno in _DISCONNECTED:
            obj.handle_close()
        else:
            obj.handle_error()
    except BaseException as err:
        if isinstance(err, _reraised_exceptions):
            raise
        obj.handle_error()
130
def poll(timeout=0.0, map=None):
    """Run one select()-based polling pass over the channels in *map*.

    *map* defaults to the module-level socket_map.  Each channel's
    readable()/writable() decides which select() lists its descriptor joins;
    channels flagged as accepting are never watched for writability.  If no
    descriptor is of interest the call just sleeps for *timeout* seconds.
    Ready descriptors are dispatched through read(), write() and
    _exception(), in that order.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers, writers, excepts = [], [], []
    # Iterate over a snapshot: readable()/writable() are user code.
    for fd, channel in list(map.items()):
        wants_read = channel.readable()
        wants_write = channel.writable()
        if wants_read:
            readers.append(fd)
        # accepting sockets should not be writable
        if wants_write and not channel.accepting:
            writers.append(fd)
        if wants_read or wants_write:
            excepts.append(fd)

    if not (readers or writers or excepts):
        time.sleep(timeout)
        return

    readers, writers, excepts = select.select(readers, writers, excepts,
                                              timeout)

    # A handler may have removed a channel from the map in the meantime,
    # hence the lookup-and-skip on every descriptor.
    for fd in readers:
        channel = map.get(fd)
        if channel is not None:
            read(channel)

    for fd in writers:
        channel = map.get(fd)
        if channel is not None:
            write(channel)

    for fd in excepts:
        channel = map.get(fd)
        if channel is not None:
            _exception(channel)
169
def poll2(timeout=0.0, map=None):
    """Run one select.poll()-based polling pass over the channels in *map*.

    *map* defaults to the module-level socket_map.  *timeout* is given in
    seconds and converted to whole milliseconds for select.poll(); None is
    passed through unchanged.  Readable channels are registered for
    POLLIN | POLLPRI, writable non-accepting channels for POLLOUT, and every
    reported event mask is handed to readwrite().
    """
    if map is None:
        map = socket_map
    # timeout is in milliseconds
    timeout_ms = None if timeout is None else int(timeout*1000)
    pollster = select.poll()
    if not map:
        return

    for fd, channel in list(map.items()):
        mask = 0
        if channel.readable():
            mask |= select.POLLIN | select.POLLPRI
        # accepting sockets should not be writable
        if channel.writable() and not channel.accepting:
            mask |= select.POLLOUT
        if mask:
            pollster.register(fd, mask)

    for fd, events in pollster.poll(timeout_ms):
        channel = map.get(fd)
        if channel is not None:
            readwrite(channel, events)


poll3 = poll2                           # Alias for backward compatibility
197
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Keep polling until *map* is empty or *count* passes have run.

    *map* defaults to the module-level socket_map.  poll2() is used when
    *use_poll* is true and the select module provides poll(); otherwise
    poll() is used.  With *count* left as None the loop only ends once the
    map is empty.
    """
    if map is None:
        map = socket_map

    poll_fun = poll2 if (use_poll and hasattr(select, 'poll')) else poll

    if count is None:
        while map:
            poll_fun(timeout, map)
        return

    remaining = count
    while map and remaining > 0:
        poll_fun(timeout, map)
        remaining -= 1
215
class dispatcher:
    """Wrapper around a non-blocking socket driven by the polling functions.

    An instance registers itself in a channel map keyed by file descriptor.
    The pollers consult readable()/writable() and then call the
    handle_*_event() methods, which update the connection state and invoke
    the overridable handle_*() callbacks.
    """

    debug = False
    # Connection state flags, maintained by listen()/connect()/close() and
    # the handle_*_event() methods.
    connected = False
    accepting = False
    connecting = False
    closing = False
    # Address recorded by bind()/connect(), or the peer address of a socket
    # handed to __init__.
    addr = None
    # log_info() drops messages whose type is listed here.
    ignore_log_types = frozenset({'warning'})

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally wrapping an existing socket.

        sock -- socket to adopt; it is switched to non-blocking mode and
                registered immediately.  When omitted, self.socket is None
                until create_socket()/set_socket() is called.
        map  -- channel map to register in; defaults to the module-level
                socket_map.

        Raises OSError if getpeername() fails with anything other than
        ENOTCONN or EINVAL; the channel is removed from the map first.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(False)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except OSError as err:
                if err.errno in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.Class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel in *map* (default: its own map)."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (default: its own map).

        self._fileno is reset to None whether or not an entry was found.
        """
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a non-blocking socket of the given kind and register it."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(False)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as this channel's socket and register it in *map*."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Set SO_REUSEADDR on the socket; an OSError is silently ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except OSError:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the pollers should watch for read events."""
        return True

    def writable(self):
        """Return True if the pollers should watch for write events."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and call socket.listen().

        On Windows (os.name == 'nt') the backlog *num* is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Record *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        If the connect is still in progress the method returns and the
        connection is completed later by a read or write event.  If it has
        already succeeded, handle_connect_event() runs immediately.  Any
        other result raises OSError carrying that errno.
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
        or err == EINVAL and os.name == 'nt':
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # errno.errorcode does not list every possible value; fall back
            # to a generic message so the caller always gets the OSError
            # rather than a KeyError from the lookup.
            raise OSError(err, errorcode.get(err, "Unknown error %s" % err))

    def accept(self):
        """Accept a connection; return (conn, addr) or None.

        None is returned when socket.accept() raises TypeError or an OSError
        with EWOULDBLOCK, ECONNABORTED or EAGAIN.  Other OSErrors propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except OSError as why:
            if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes accepted.

        Returns 0 on EWOULDBLOCK.  On an errno in _DISCONNECTED the channel
        is closed via handle_close() and 0 is returned.  Other OSErrors
        propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except OSError as why:
            if why.errno == EWOULDBLOCK:
                return 0
            elif why.errno in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or an OSError whose errno is in _DISCONNECTED, closes
        the channel via handle_close() and returns b''.  Other OSErrors
        propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data
        except OSError as why:
            # winsock sometimes raises ENOTCONN
            if why.errno in _DISCONNECTED:
                self.handle_close()
                return b''
            else:
                raise

    def close(self):
        """Reset the state flags, unregister the channel and close the socket.

        ENOTCONN and EBADF raised by socket.close() are ignored.
        """
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.errno not in (ENOTCONN, EBADF):
                    raise

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write *message* to sys.stderr with a 'log: ' prefix."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a low-level read event into the matching callback."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                # first event on a pending connect: finish it before reading
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises OSError if SO_ERROR reports a failure; otherwise calls
        handle_connect() and then marks the channel as connected.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Translate a low-level write event into the matching callback."""
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Handle an exceptional condition: close on error, else handle_expt()."""
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    # Default callbacks: each logs a 'warning' (ignored by default) so that
    # subclasses only need to override the events they care about.

    def handle_expt(self):
        """Default callback for an exceptional condition without an error."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Default callback for a read event."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default callback for a write event."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default callback for a completed connect."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Accept a pending connection and pass it to handle_accepted()."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Default callback for an accepted connection: close it at once."""
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    def handle_close(self):
        """Default callback for a closed connection: log and close()."""
        self.log_info('unhandled close event', 'warning')
        self.close()
506
507 # ---------------------------------------------------------------------------
508 # adds simple buffered output capability, useful for simple clients.
509 # [for more sophisticated usage use asynchat.async_chat]
510 # ---------------------------------------------------------------------------
511
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing byte buffer.

    send() appends to out_buffer and tries to flush it at once; whatever
    could not be written is flushed from handle_write() on later write
    events.
    """

    def __init__(self, sock=None, map=None):
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b''

    def initiate_send(self):
        """Push at most 65536 buffered bytes to the socket, keep the rest."""
        chunk = self.out_buffer[:65536]
        sent = dispatcher.send(self, chunk)
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        self.initiate_send()

    def writable(self):
        """Truthy while not connected or while data is still buffered."""
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue *data* and attempt to send it immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
534
535 # ---------------------------------------------------------------------------
536 # used for debugging.
537 # ---------------------------------------------------------------------------
538
539 def compact_traceback():
540 exc = sys.exception()
541 tb = exc.__traceback__
542 if not tb: # Must have a traceback
543 raise AssertionError("traceback does not exist")
544 tbinfo = []
545 while tb:
546 tbinfo.append((
547 tb.tb_frame.f_code.co_filename,
548 tb.tb_frame.f_code.co_name,
549 str(tb.tb_lineno)
550 ))
551 tb = tb.tb_next
552
553 # just to be safe
554 del tb
555
556 file, function, line = tbinfo[-1]
557 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
558 return (file, function, line), type(exc), exc, info
559
def close_all(map=None, ignore_all=False):
    """Close every channel in *map* and then empty the map.

    *map* defaults to the module-level socket_map.  An OSError with errno
    EBADF raised by a channel's close() is always ignored.  Exceptions in
    _reraised_exceptions always propagate.  Any other exception propagates
    unless *ignore_all* is true.
    """
    if map is None:
        map = socket_map
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.errno != EBADF and not ignore_all:
                raise
        except BaseException as err:
            if isinstance(err, _reraised_exceptions) or not ignore_all:
                raise
    map.clear()
577
578 # Asynchronous File I/O:
579 #
580 # After a little research (reading man pages on various unixen, and
581 # digging through the linux kernel), I've determined that select()
582 # isn't meant for doing asynchronous file i/o.
583 # Heartening, though - reading linux/mm/filemap.c shows that linux
584 # supports asynchronous read-ahead. So _MOST_ of the time, the data
585 # will be sitting in memory for us already when we go to read it.
586 #
587 # What other OS's (besides NT) support async file i/o? [VMS?]
588 #
589 # Regardless, this is useful for pipes, and stdin/stdout...
590
if os.name == 'posix':
    class file_wrapper:
        """Make a file descriptor look enough like a socket for asyncore."""
        # Here we override just enough to make a file
        # look like a socket for the purposes of asyncore.
        # The passed fd is automatically os.dup()'d

        # Class-level default so that __del__() and close() are harmless
        # no-ops on an instance whose __init__() failed inside os.dup()
        # before self.fd was assigned.
        fd = -1

        def __init__(self, fd):
            """Duplicate *fd*; the wrapper owns (and closes) the duplicate."""
            self.fd = os.dup(fd)

        def __del__(self):
            # Warn about a wrapper that was dropped without close(), then
            # release the descriptor anyway.
            if self.fd >= 0:
                warnings.warn("unclosed file %r" % self, ResourceWarning,
                              source=self)
            self.close()

        def recv(self, *args):
            """Read from the descriptor; arguments are passed to os.read()."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; arguments are passed to os.write()."""
            return os.write(self.fd, *args)

        def getsockopt(self, level, optname, buflen=None):
            """Answer the SO_ERROR query asyncore makes; always report 0.

            Any other option raises NotImplementedError.
            """
            if (level == socket.SOL_SOCKET and
                optname == socket.SO_ERROR and
                not buflen):
                return 0
            raise NotImplementedError("Only asyncore specific behaviour "
                                      "implemented.")

        read = recv
        write = send

        def close(self):
            """Close the duplicated descriptor; later calls do nothing."""
            if self.fd < 0:
                return
            fd = self.fd
            # mark as closed first so a failing os.close() is not retried
            self.fd = -1
            os.close(fd)

        def fileno(self):
            """Return the duplicated descriptor, or -1 once closed."""
            return self.fd

    class file_dispatcher(dispatcher):
        """dispatcher that polls a file descriptor instead of a socket."""

        def __init__(self, fd, map=None):
            """Wrap *fd*, which is a descriptor or an object with fileno().

            The channel is registered in *map* (default: the module-level
            socket_map) and the original descriptor is switched to
            non-blocking mode.
            """
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                # already a plain descriptor
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            os.set_blocking(fd, False)

        def set_file(self, fd):
            """Install a file_wrapper around *fd* and register the channel."""
            self.socket = file_wrapper(fd)
            self._fileno = self.socket.fileno()
            self.add_channel()