1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
4
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
7 #
8 # All Rights Reserved
9 #
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
18 #
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
27
28 """Basic infrastructure for asynchronous socket service clients and servers.
29
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
38
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
48
49 import select
50 import socket
51 import sys
52 import time
53 import warnings
54
55 import os
56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57 ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
58 errorcode
59
60 _DEPRECATION_MSG = ('The {name} module is deprecated and will be removed in '
61 'Python {remove}. The recommended replacement is asyncio')
62 warnings._deprecated(__name__, _DEPRECATION_MSG, remove=(3, 12))
63
64
65 _DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
66 EBADF})
67
68 try:
69 socket_map
70 except NameError:
71 socket_map = {}
72
73 def _strerror(err):
74 try:
75 return os.strerror(err)
76 except (ValueError, OverflowError, NameError):
77 if err in errorcode:
78 return errorcode[err]
79 return "Unknown error %s" %err
80
81 class ESC[4;38;5;81mExitNow(ESC[4;38;5;149mException):
82 pass
83
84 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
85
86 def read(obj):
87 try:
88 obj.handle_read_event()
89 except _reraised_exceptions:
90 raise
91 except:
92 obj.handle_error()
93
94 def write(obj):
95 try:
96 obj.handle_write_event()
97 except _reraised_exceptions:
98 raise
99 except:
100 obj.handle_error()
101
102 def _exception(obj):
103 try:
104 obj.handle_expt_event()
105 except _reraised_exceptions:
106 raise
107 except:
108 obj.handle_error()
109
110 def readwrite(obj, flags):
111 try:
112 if flags & select.POLLIN:
113 obj.handle_read_event()
114 if flags & select.POLLOUT:
115 obj.handle_write_event()
116 if flags & select.POLLPRI:
117 obj.handle_expt_event()
118 if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
119 obj.handle_close()
120 except OSError as e:
121 if e.errno not in _DISCONNECTED:
122 obj.handle_error()
123 else:
124 obj.handle_close()
125 except _reraised_exceptions:
126 raise
127 except:
128 obj.handle_error()
129
130 def poll(timeout=0.0, map=None):
131 if map is None:
132 map = socket_map
133 if map:
134 r = []; w = []; e = []
135 for fd, obj in list(map.items()):
136 is_r = obj.readable()
137 is_w = obj.writable()
138 if is_r:
139 r.append(fd)
140 # accepting sockets should not be writable
141 if is_w and not obj.accepting:
142 w.append(fd)
143 if is_r or is_w:
144 e.append(fd)
145 if [] == r == w == e:
146 time.sleep(timeout)
147 return
148
149 r, w, e = select.select(r, w, e, timeout)
150
151 for fd in r:
152 obj = map.get(fd)
153 if obj is None:
154 continue
155 read(obj)
156
157 for fd in w:
158 obj = map.get(fd)
159 if obj is None:
160 continue
161 write(obj)
162
163 for fd in e:
164 obj = map.get(fd)
165 if obj is None:
166 continue
167 _exception(obj)
168
169 def poll2(timeout=0.0, map=None):
170 # Use the poll() support added to the select module in Python 2.0
171 if map is None:
172 map = socket_map
173 if timeout is not None:
174 # timeout is in milliseconds
175 timeout = int(timeout*1000)
176 pollster = select.poll()
177 if map:
178 for fd, obj in list(map.items()):
179 flags = 0
180 if obj.readable():
181 flags |= select.POLLIN | select.POLLPRI
182 # accepting sockets should not be writable
183 if obj.writable() and not obj.accepting:
184 flags |= select.POLLOUT
185 if flags:
186 pollster.register(fd, flags)
187
188 r = pollster.poll(timeout)
189 for fd, flags in r:
190 obj = map.get(fd)
191 if obj is None:
192 continue
193 readwrite(obj, flags)
194
195 poll3 = poll2 # Alias for backward compatibility
196
197 def loop(timeout=30.0, use_poll=False, map=None, count=None):
198 if map is None:
199 map = socket_map
200
201 if use_poll and hasattr(select, 'poll'):
202 poll_fun = poll2
203 else:
204 poll_fun = poll
205
206 if count is None:
207 while map:
208 poll_fun(timeout, map)
209
210 else:
211 while map and count > 0:
212 poll_fun(timeout, map)
213 count = count - 1
214
215 class ESC[4;38;5;81mdispatcher:
216
217 debug = False
218 connected = False
219 accepting = False
220 connecting = False
221 closing = False
222 addr = None
223 ignore_log_types = frozenset({'warning'})
224
225 def __init__(self, sock=None, map=None):
226 if map is None:
227 self._map = socket_map
228 else:
229 self._map = map
230
231 self._fileno = None
232
233 if sock:
234 # Set to nonblocking just to make sure for cases where we
235 # get a socket from a blocking source.
236 sock.setblocking(False)
237 self.set_socket(sock, map)
238 self.connected = True
239 # The constructor no longer requires that the socket
240 # passed be connected.
241 try:
242 self.addr = sock.getpeername()
243 except OSError as err:
244 if err.errno in (ENOTCONN, EINVAL):
245 # To handle the case where we got an unconnected
246 # socket.
247 self.connected = False
248 else:
249 # The socket is broken in some unknown way, alert
250 # the user and remove it from the map (to prevent
251 # polling of broken sockets).
252 self.del_channel(map)
253 raise
254 else:
255 self.socket = None
256
257 def __repr__(self):
258 status = [self.__class__.__module__+"."+self.__class__.__qualname__]
259 if self.accepting and self.addr:
260 status.append('listening')
261 elif self.connected:
262 status.append('connected')
263 if self.addr is not None:
264 try:
265 status.append('%s:%d' % self.addr)
266 except TypeError:
267 status.append(repr(self.addr))
268 return '<%s at %#x>' % (' '.join(status), id(self))
269
270 def add_channel(self, map=None):
271 #self.log_info('adding channel %s' % self)
272 if map is None:
273 map = self._map
274 map[self._fileno] = self
275
276 def del_channel(self, map=None):
277 fd = self._fileno
278 if map is None:
279 map = self._map
280 if fd in map:
281 #self.log_info('closing channel %d:%s' % (fd, self))
282 del map[fd]
283 self._fileno = None
284
285 def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
286 self.family_and_type = family, type
287 sock = socket.socket(family, type)
288 sock.setblocking(False)
289 self.set_socket(sock)
290
291 def set_socket(self, sock, map=None):
292 self.socket = sock
293 self._fileno = sock.fileno()
294 self.add_channel(map)
295
296 def set_reuse_addr(self):
297 # try to re-use a server port if possible
298 try:
299 self.socket.setsockopt(
300 socket.SOL_SOCKET, socket.SO_REUSEADDR,
301 self.socket.getsockopt(socket.SOL_SOCKET,
302 socket.SO_REUSEADDR) | 1
303 )
304 except OSError:
305 pass
306
307 # ==================================================
308 # predicates for select()
309 # these are used as filters for the lists of sockets
310 # to pass to select().
311 # ==================================================
312
313 def readable(self):
314 return True
315
316 def writable(self):
317 return True
318
319 # ==================================================
320 # socket object methods.
321 # ==================================================
322
323 def listen(self, num):
324 self.accepting = True
325 if os.name == 'nt' and num > 5:
326 num = 5
327 return self.socket.listen(num)
328
329 def bind(self, addr):
330 self.addr = addr
331 return self.socket.bind(addr)
332
333 def connect(self, address):
334 self.connected = False
335 self.connecting = True
336 err = self.socket.connect_ex(address)
337 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
338 or err == EINVAL and os.name == 'nt':
339 self.addr = address
340 return
341 if err in (0, EISCONN):
342 self.addr = address
343 self.handle_connect_event()
344 else:
345 raise OSError(err, errorcode[err])
346
347 def accept(self):
348 # XXX can return either an address pair or None
349 try:
350 conn, addr = self.socket.accept()
351 except TypeError:
352 return None
353 except OSError as why:
354 if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
355 return None
356 else:
357 raise
358 else:
359 return conn, addr
360
361 def send(self, data):
362 try:
363 result = self.socket.send(data)
364 return result
365 except OSError as why:
366 if why.errno == EWOULDBLOCK:
367 return 0
368 elif why.errno in _DISCONNECTED:
369 self.handle_close()
370 return 0
371 else:
372 raise
373
374 def recv(self, buffer_size):
375 try:
376 data = self.socket.recv(buffer_size)
377 if not data:
378 # a closed connection is indicated by signaling
379 # a read condition, and having recv() return 0.
380 self.handle_close()
381 return b''
382 else:
383 return data
384 except OSError as why:
385 # winsock sometimes raises ENOTCONN
386 if why.errno in _DISCONNECTED:
387 self.handle_close()
388 return b''
389 else:
390 raise
391
392 def close(self):
393 self.connected = False
394 self.accepting = False
395 self.connecting = False
396 self.del_channel()
397 if self.socket is not None:
398 try:
399 self.socket.close()
400 except OSError as why:
401 if why.errno not in (ENOTCONN, EBADF):
402 raise
403
404 # log and log_info may be overridden to provide more sophisticated
405 # logging and warning methods. In general, log is for 'hit' logging
406 # and 'log_info' is for informational, warning and error logging.
407
408 def log(self, message):
409 sys.stderr.write('log: %s\n' % str(message))
410
411 def log_info(self, message, type='info'):
412 if type not in self.ignore_log_types:
413 print('%s: %s' % (type, message))
414
415 def handle_read_event(self):
416 if self.accepting:
417 # accepting sockets are never connected, they "spawn" new
418 # sockets that are connected
419 self.handle_accept()
420 elif not self.connected:
421 if self.connecting:
422 self.handle_connect_event()
423 self.handle_read()
424 else:
425 self.handle_read()
426
427 def handle_connect_event(self):
428 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
429 if err != 0:
430 raise OSError(err, _strerror(err))
431 self.handle_connect()
432 self.connected = True
433 self.connecting = False
434
435 def handle_write_event(self):
436 if self.accepting:
437 # Accepting sockets shouldn't get a write event.
438 # We will pretend it didn't happen.
439 return
440
441 if not self.connected:
442 if self.connecting:
443 self.handle_connect_event()
444 self.handle_write()
445
446 def handle_expt_event(self):
447 # handle_expt_event() is called if there might be an error on the
448 # socket, or if there is OOB data
449 # check for the error condition first
450 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
451 if err != 0:
452 # we can get here when select.select() says that there is an
453 # exceptional condition on the socket
454 # since there is an error, we'll go ahead and close the socket
455 # like we would in a subclassed handle_read() that received no
456 # data
457 self.handle_close()
458 else:
459 self.handle_expt()
460
461 def handle_error(self):
462 nil, t, v, tbinfo = compact_traceback()
463
464 # sometimes a user repr method will crash.
465 try:
466 self_repr = repr(self)
467 except:
468 self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
469
470 self.log_info(
471 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
472 self_repr,
473 t,
474 v,
475 tbinfo
476 ),
477 'error'
478 )
479 self.handle_close()
480
481 def handle_expt(self):
482 self.log_info('unhandled incoming priority event', 'warning')
483
484 def handle_read(self):
485 self.log_info('unhandled read event', 'warning')
486
487 def handle_write(self):
488 self.log_info('unhandled write event', 'warning')
489
490 def handle_connect(self):
491 self.log_info('unhandled connect event', 'warning')
492
493 def handle_accept(self):
494 pair = self.accept()
495 if pair is not None:
496 self.handle_accepted(*pair)
497
498 def handle_accepted(self, sock, addr):
499 sock.close()
500 self.log_info('unhandled accepted event', 'warning')
501
502 def handle_close(self):
503 self.log_info('unhandled close event', 'warning')
504 self.close()
505
506 # ---------------------------------------------------------------------------
507 # adds simple buffered output capability, useful for simple clients.
508 # [for more sophisticated usage use asynchat.async_chat]
509 # ---------------------------------------------------------------------------
510
511 class ESC[4;38;5;81mdispatcher_with_send(ESC[4;38;5;149mdispatcher):
512
513 def __init__(self, sock=None, map=None):
514 dispatcher.__init__(self, sock, map)
515 self.out_buffer = b''
516
517 def initiate_send(self):
518 num_sent = 0
519 num_sent = dispatcher.send(self, self.out_buffer[:65536])
520 self.out_buffer = self.out_buffer[num_sent:]
521
522 def handle_write(self):
523 self.initiate_send()
524
525 def writable(self):
526 return (not self.connected) or len(self.out_buffer)
527
528 def send(self, data):
529 if self.debug:
530 self.log_info('sending %s' % repr(data))
531 self.out_buffer = self.out_buffer + data
532 self.initiate_send()
533
534 # ---------------------------------------------------------------------------
535 # used for debugging.
536 # ---------------------------------------------------------------------------
537
538 def compact_traceback():
539 t, v, tb = sys.exc_info()
540 tbinfo = []
541 if not tb: # Must have a traceback
542 raise AssertionError("traceback does not exist")
543 while tb:
544 tbinfo.append((
545 tb.tb_frame.f_code.co_filename,
546 tb.tb_frame.f_code.co_name,
547 str(tb.tb_lineno)
548 ))
549 tb = tb.tb_next
550
551 # just to be safe
552 del tb
553
554 file, function, line = tbinfo[-1]
555 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
556 return (file, function, line), t, v, info
557
558 def close_all(map=None, ignore_all=False):
559 if map is None:
560 map = socket_map
561 for x in list(map.values()):
562 try:
563 x.close()
564 except OSError as x:
565 if x.errno == EBADF:
566 pass
567 elif not ignore_all:
568 raise
569 except _reraised_exceptions:
570 raise
571 except:
572 if not ignore_all:
573 raise
574 map.clear()
575
576 # Asynchronous File I/O:
577 #
578 # After a little research (reading man pages on various unixen, and
579 # digging through the linux kernel), I've determined that select()
580 # isn't meant for doing asynchronous file i/o.
581 # Heartening, though - reading linux/mm/filemap.c shows that linux
582 # supports asynchronous read-ahead. So _MOST_ of the time, the data
583 # will be sitting in memory for us already when we go to read it.
584 #
585 # What other OS's (besides NT) support async file i/o? [VMS?]
586 #
587 # Regardless, this is useful for pipes, and stdin/stdout...
588
589 if os.name == 'posix':
590 class ESC[4;38;5;81mfile_wrapper:
591 # Here we override just enough to make a file
592 # look like a socket for the purposes of asyncore.
593 # The passed fd is automatically os.dup()'d
594
595 def __init__(self, fd):
596 self.fd = os.dup(fd)
597
598 def __del__(self):
599 if self.fd >= 0:
600 warnings.warn("unclosed file %r" % self, ResourceWarning,
601 source=self)
602 self.close()
603
604 def recv(self, *args):
605 return os.read(self.fd, *args)
606
607 def send(self, *args):
608 return os.write(self.fd, *args)
609
610 def getsockopt(self, level, optname, buflen=None):
611 if (level == socket.SOL_SOCKET and
612 optname == socket.SO_ERROR and
613 not buflen):
614 return 0
615 raise NotImplementedError("Only asyncore specific behaviour "
616 "implemented.")
617
618 read = recv
619 write = send
620
621 def close(self):
622 if self.fd < 0:
623 return
624 fd = self.fd
625 self.fd = -1
626 os.close(fd)
627
628 def fileno(self):
629 return self.fd
630
631 class ESC[4;38;5;81mfile_dispatcher(ESC[4;38;5;149mdispatcher):
632
633 def __init__(self, fd, map=None):
634 dispatcher.__init__(self, None, map)
635 self.connected = True
636 try:
637 fd = fd.fileno()
638 except AttributeError:
639 pass
640 self.set_file(fd)
641 # set it to non-blocking mode
642 os.set_blocking(fd, False)
643
644 def set_file(self, fd):
645 self.socket = file_wrapper(fd)
646 self._fileno = self.socket.fileno()
647 self.add_channel()