1 #
2 # A higher level module for using sockets (or Windows named pipes)
3 #
4 # multiprocessing/connection.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
# Names exported by ``from <this module> import *``.
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
11
12 import errno
13 import io
14 import os
15 import sys
16 import socket
17 import struct
18 import time
19 import tempfile
20 import itertools
21
22 import _multiprocessing
23
24 from . import util
25
26 from . import AuthenticationError, BufferTooShort
27 from .context import reduction
28 _ForkingPickler = reduction.ForkingPickler
29
30 try:
31 import _winapi
32 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
33 except ImportError:
34 if sys.platform == 'win32':
35 raise
36 _winapi = None
37
38 #
39 #
40 #
41
# Size, in bytes, passed as the in/out buffer size when named pipes are created.
BUFSIZE = 8192
# A very generous timeout (in seconds) when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Source of unique numbers used when building named-pipe addresses.
_mmap_counter = itertools.count()

# 'AF_INET' is always offered.  'AF_UNIX' is preferred when the socket
# module provides it, and 'AF_PIPE' takes precedence on Windows.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families.append('AF_UNIX')

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families.append('AF_PIPE')
58
59
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the monotonic-clock deadline lying `timeout` seconds ahead."""
    deadline = time.monotonic() + timeout
    return deadline
62
def _check_timeout(t):
    """Return True once the monotonic clock has moved past deadline `t`."""
    now = time.monotonic()
    return t < now
65
66 #
67 #
68 #
69
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    if family == 'AF_UNIX':
        # A not-yet-existing path inside the multiprocessing temp directory.
        temp_dir = util.get_temp_dir()
        return tempfile.mktemp(prefix='listener-', dir=temp_dir)
    if family == 'AF_PIPE':
        # The pid plus a per-process counter keep pipe names distinct.
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                                next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix, dir="")
    raise ValueError('unrecognized family')
83
def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError for 'AF_PIPE' off Windows, and for 'AF_UNIX' on a
    Windows build whose socket module lacks that attribute.
    '''
    on_windows = sys.platform == 'win32'
    if family == 'AF_PIPE':
        unsupported = not on_windows
    elif family == 'AF_UNIX':
        # Only Windows needs the double check against the socket module.
        unsupported = on_windows and not hasattr(socket, family)
    else:
        unsupported = False

    if unsupported:
        raise ValueError('Family %s is not recognized.' % family)
95
def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'.  A tuple is 'AF_INET';
    a str starting with two backslashes is 'AF_PIPE'; any other str, or an
    abstract-namespace socket address, is 'AF_UNIX'.  Anything else raises
    ValueError.
    '''
    kind = type(address)
    if kind is tuple:
        return 'AF_INET'
    if kind is str:
        return 'AF_PIPE' if address.startswith('\\\\') else 'AF_UNIX'
    if util.is_abstract_socket_namespace(address):
        return 'AF_UNIX'
    raise ValueError('address type of %r unrecognized' % address)
110
111 #
112 # Connection classes
113 #
114
class _ConnectionBase:
    """
    Shared behaviour of connection objects wrapping an OS handle.

    Subclasses supply `_close()`, `_send_bytes(buf)`, `_recv_bytes(maxsize)`
    and `_poll(timeout)`; this class adds argument validation, open/closed
    tracking and the public send/recv API on top of them.
    """

    # Class-level default so __del__ is safe even if __init__ failed early.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not (readable or writable):
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is None:
            return
        self._close()

    def _check_closed(self):
        """Raise OSError if the handle has already been closed."""
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        """Raise OSError if this end cannot be read from."""
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        """Raise OSError if this end cannot be written to."""
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        """
        Disable further reads after an over-long message, then raise.

        A writable connection is merely marked unreadable; a read-only one
        is closed.  OSError is always raised.
        """
        if not self._writable:
            self.close()
        else:
            self._readable = False
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is None:
            return
        try:
            self._close()
        finally:
            # Mark closed even when the low-level close raised.
            self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object"""
        self._check_closed()
        self._check_writable()
        view = memoryview(buf)
        if view.itemsize > 1:
            # Work in single bytes whatever the item type of `buf` is.
            view = view.cast('B')
        total = view.nbytes
        if offset < 0:
            raise ValueError("offset is negative")
        if offset > total:
            raise ValueError("buffer length < offset")
        if size is None:
            end = total
        else:
            if size < 0:
                raise ValueError("size is negative")
            end = offset + size
            if end > total:
                raise ValueError("buffer length < offset + size")
        self._send_bytes(view[offset:end])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        payload = _ForkingPickler.dumps(obj)
        self._send_bytes(payload)

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        received = self._recv_bytes(maxlength)
        if received is None:
            # The subclass signals "message longer than maxlength" with None.
            self._bad_message_length()
        return received.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as view:
            # Capacity of the destination in bytes, whatever its item type.
            item_width = view.itemsize
            capacity = item_width * len(view)
            if offset < 0:
                raise ValueError("negative offset")
            if offset > capacity:
                raise ValueError("offset too large")
            received = self._recv_bytes()
            nbytes = received.tell()
            end = offset + nbytes
            if end > capacity:
                raise BufferTooShort(received.getvalue())
            # The message fits: copy it into the destination slice.
            received.seek(0)
            received.readinto(view[offset // item_width:end // item_width])
            return nbytes

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        received = self._recv_bytes()
        return _ForkingPickler.loads(received.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
264
265
# PipeConnection is only defined when the _winapi module could be imported.
if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set (by wait()) when a zero-length message has already been consumed
        # from the pipe; the next _recv_bytes() then returns an empty buffer.
        _got_empty_message = False
        # Overlapped object of the write currently in progress, if any.
        _send_ov = None

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            # Cancel any in-flight overlapped write, then close the handle.
            # CloseHandle is bound as a default argument at definition time.
            ov = self._send_ov
            if ov is not None:
                # Interrupt WaitForMultipleObjects() in _send_bytes()
                ov.cancel()
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            # Write `buf` as one overlapped WriteFile() and wait for completion.
            # Raises ValueError when another send is already in progress and
            # OSError(EPIPE) when the write was aborted by close().
            if self._send_ov is not None:
                # A connection should only be used by a single thread
                raise ValueError("concurrent send_bytes() calls "
                                 "are not supported")
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            self._send_ov = ov
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Any interruption (including KeyboardInterrupt) cancels the
                # pending write before the exception propagates.
                ov.cancel()
                raise
            finally:
                # Always clear the in-progress marker and collect the result.
                self._send_ov = None
                nwritten, err = ov.GetOverlappedResult(True)
            if err == _winapi.ERROR_OPERATION_ABORTED:
                # close() was called by another thread while
                # WaitForMultipleObjects() was waiting for the overlapped
                # operation.
                raise OSError(errno.EPIPE, "handle is closed")
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            # Read one message and return it in an io.BytesIO.  May return None
            # (via _get_more_data) when the message exceeds `maxsize`, and
            # raises EOFError when the pipe is broken.
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                # First read at most 128 bytes; longer messages are completed
                # by _get_more_data().
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        ov.cancel()
                        raise
                    finally:
                        # Runs on success and on interruption alike; the
                        # returns below are taken from inside this finally.
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            # Ready immediately if an empty message is pending or
            # PeekNamedPipe() reports a non-zero first field (presumably the
            # number of bytes available -- confirm against _winapi).
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            # Complete a message whose first chunk (held by `ov`) did not fit
            # in the initial read buffer.  Returns the whole message in an
            # io.BytesIO.
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            # Second field of PeekNamedPipe(): presumably the bytes left in
            # the current message -- confirm against _winapi.
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f
362
363
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).

    Wire format: a 4-byte signed big-endian length header followed by the
    payload.  A header of -1 means an 8-byte unsigned big-endian length
    follows instead.
    """

    # Choose the low-level primitives once, at class-creation time.
    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf` to the handle, retrying after short writes."""
        pending = len(buf)
        written = write(self._handle, buf)
        pending -= written
        while pending != 0:
            buf = buf[written:]
            written = write(self._handle, buf)
            pending -= written

    def _recv(self, size, read=_read):
        """
        Read exactly `size` bytes and return them in an io.BytesIO.

        Raises EOFError if the stream ends before any byte was read, and
        OSError if it ends part-way through.
        """
        stream = io.BytesIO()
        handle = self._handle
        missing = size
        while missing > 0:
            chunk = read(handle, missing)
            if not chunk:
                if missing == size:
                    raise EOFError
                raise OSError("got end of file during message")
            stream.write(chunk)
            missing -= len(chunk)
        return stream

    def _send_bytes(self, buf):
        """Send `buf` as one length-prefixed message."""
        length = len(buf)
        if length > 0x7fffffff:
            # Too long for the signed 32-bit header: send the -1 marker,
            # then the real length as an unsigned 64-bit value.
            marker = struct.pack("!i", -1)
            wide_header = struct.pack("!Q", length)
            self._send(marker)
            self._send(wide_header)
            self._send(buf)
            return

        # For wire compatibility with 3.7 and lower
        header = struct.pack("!i", length)
        if length > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
            return

        # Issue #20540: concatenate before sending, to avoid delays due
        # to Nagle's algorithm on a TCP socket.
        # Also note we want to avoid sending a 0-length buffer separately,
        # to avoid "broken pipe" errors if the other end closed the pipe.
        self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """
        Read one length-prefixed message.

        Returns an io.BytesIO holding the payload, or None when the
        announced length is greater than `maxsize`.
        """
        (size,) = struct.unpack("!i", self._recv(4).getvalue())
        if size == -1:
            (size,) = struct.unpack("!Q", self._recv(8).getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        """Return True if wait() reports this connection as ready."""
        return bool(wait([self], timeout))
442
443
444 #
445 # Public functions
446 #
447
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.

    `address` and `family` default to an arbitrary address of the
    platform's default family.  `backlog` is forwarded to the underlying
    listener.  `authkey`, when given, must be a byte string; every accepted
    connection then has to pass a mutual challenge/response handshake.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate the key before any OS resource is created, so that a bad
        # argument cannot leave a bound socket or named pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  Raises OSError if the listener has
        been closed.  If authentication fails the new connection is closed
        and the error is re-raised.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # Same test as Client(): an empty byte string is still a key, so
        # both ends agree on whether the handshake takes place.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # Do not leave a half-authenticated connection open.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.

        Calling it again after the listener is closed does nothing.
        '''
        listener = self._listener
        if listener is not None:
            # Clear the reference first so a failing close() is not retried.
            self._listener = None
            listener.close()

    @property
    def address(self):
        # Address the underlying listener is bound to.
        return self._listener._address

    @property
    def last_accepted(self):
        # Peer address recorded by the underlying listener's last accept().
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
507
508
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family deduced from `address`.  `authkey`,
    when given, must be a byte string (TypeError otherwise) and triggers
    the mutual challenge/response handshake.  If the handshake fails the
    connection is closed and the error is re-raised.
    '''
    # Validate the key before connecting so that a bad argument cannot
    # leave an open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            # Mirror image of Listener.accept(): answer first, then challenge.
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            c.close()
            raise

    return c
528
529
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends are readable and writable (a socket
        pair); otherwise the first end is read-only and the second is
        write-only (an OS pipe).
        '''
        if not duplex:
            read_fd, write_fd = os.pipe()
            return (Connection(read_fd, writable=False),
                    Connection(write_fd, readable=False))

        left, right = socket.socketpair()
        left.setblocking(True)
        right.setblocking(True)
        # detach() hands ownership of the descriptors to the Connections.
        return Connection(left.detach()), Connection(right.detach())

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends are readable and writable; otherwise
        the first end is read-only and the second is write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            server_mode = _winapi.PIPE_ACCESS_DUPLEX
            client_access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            out_size = BUFSIZE
        else:
            server_mode = _winapi.PIPE_ACCESS_INBOUND
            client_access = _winapi.GENERIC_WRITE
            out_size = 0
        in_size = BUFSIZE

        open_flags = (server_mode | _winapi.FILE_FLAG_OVERLAPPED |
                      _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE)
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        server_handle = _winapi.CreateNamedPipe(
            address, open_flags, pipe_mode,
            1, out_size, in_size, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        client_handle = _winapi.CreateFile(
            address, client_access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            client_handle, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        # Complete the server side of the connection before wrapping.
        connect_ov = _winapi.ConnectNamedPipe(server_handle, overlapped=True)
        _, err = connect_ov.GetOverlappedResult(True)
        assert err == 0

        return (PipeConnection(server_handle, writable=duplex),
                PipeConnection(client_handle, readable=duplex))
590
591 #
592 # Definitions for connections based on sockets
593 #
594
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening

    `family` is the name of a socket-module address family constant, for
    example 'AF_INET' or 'AF_UNIX'.
    '''
    def __init__(self, address, family, backlog=1):
        sock = socket.socket(getattr(socket, family))
        self._socket = sock
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(True)
            sock.bind(address)
            sock.listen(backlog)
            self._address = sock.getsockname()
        except OSError:
            # Do not leak the socket when binding or listening fails.
            sock.close()
            raise
        self._family = family
        self._last_accepted = None

        needs_unlink = (family == 'AF_UNIX' and
                        not util.is_abstract_socket_namespace(address))
        if needs_unlink:
            # Linux abstract socket namespaces do not need to be explicitly
            # unlinked; filesystem socket paths do.
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept one connection, record the peer address in `_last_accepted`
        and return a `Connection` owning the new descriptor.
        '''
        client, peer = self._socket.accept()
        self._last_accepted = peer
        client.setblocking(True)
        return Connection(client.detach())

    def close(self):
        '''
        Close the listening socket and, if one was registered, run the
        finalizer that unlinks the socket path (at most once).
        '''
        try:
            self._socket.close()
        finally:
            finalizer, self._unlink = self._unlink, None
            if finalizer is not None:
                finalizer()
637
638
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    The address family is deduced from `address`.  If connecting fails the
    temporary socket is closed by the `with` block.
    '''
    family_const = getattr(socket, address_type(address))
    with socket.socket(family_const) as sock:
        sock.setblocking(True)
        sock.connect(address)
        # detach() transfers the descriptor to the Connection, so leaving
        # the `with` block does not close it.
        handle = sock.detach()
        return Connection(handle)
648
649 #
650 # Definitions for connections based on named pipes
651 #
652
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe

        One pipe instance is always kept ready in `_handle_queue`;
        `accept()` adds a fresh instance before waiting on the oldest one.
        `backlog` is accepted for signature compatibility and is not used.
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # close() is a finalizer that closes every queued handle.
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            '''
            Create and return another instance of the named pipe.

            `first` adds FILE_FLAG_FIRST_PIPE_INSTANCE to the open flags.
            '''
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            '''
            Wait for a client on the oldest pipe instance and return a
            `PipeConnection` wrapping it.
            '''
            # Queue a replacement instance before taking the oldest one.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    # Interrupted (e.g. KeyboardInterrupt): abandon this
                    # instance before propagating.
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            # Close every pipe instance still waiting in the queue.
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Retries while the pipe is busy or the wait times out, until the
        connection timeout expires; the last OSError is then re-raised.
        Any other OSError is raised immediately.
        '''
        t = _init_timeout()
        # The loop is left only through `break` (success) or `raise`.
        while True:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)
737
738 #
739 # Authentication stuff
740 #
741
# Number of random bytes generated for each authentication challenge.
MESSAGE_LENGTH = 20

# Byte-string markers exchanged during the challenge/response handshake.
CHALLENGE = b'#CHALLENGE#'  # prefix of every challenge message
WELCOME = b'#WELCOME#'      # reply sent when the received digest matches
FAILURE = b'#FAILURE#'      # reply sent when the received digest is wrong
747
def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE plus MESSAGE_LENGTH random bytes, expects the HMAC-MD5
    digest of those bytes keyed with `authkey` in return, and replies with
    WELCOME or FAILURE.

    Raises ValueError if `authkey` is not bytes and AuthenticationError if
    the peer's digest is wrong.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison: the response comes from an unauthenticated
    # peer, so timing must not reveal how much of the digest matched.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')
762
def answer_challenge(connection, authkey):
    '''
    Answer the challenge sent by the peer on `connection`.

    Reads a message that must start with CHALLENGE, replies with the
    HMAC-MD5 digest of the rest keyed with `authkey`, then expects WELCOME.

    Raises ValueError if `authkey` is not bytes and AuthenticationError if
    the peer sends something other than a challenge or rejects the digest.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # The message comes from an unauthenticated peer, so validate it with a
    # real check rather than an `assert` (asserts are stripped under -O).
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
776
777 #
778 # Support for using xmlrpclib for serialization
779 #
780
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use custom serializers.

    `dumps` turns an object into the bytes handed to `conn.send_bytes()`;
    `loads` turns the bytes from `conn.recv_bytes()` back into an object.
    fileno, close, poll, recv_bytes and send_bytes are taken directly from
    `conn`.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the wrapped connection's bound methods unchanged.
        for name in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        '''Serialize `obj` with `dumps` and send the resulting bytes.'''
        self._conn.send_bytes(self._dumps(obj))

    def recv(self):
        '''Receive one message and return it decoded with `loads`.'''
        raw = self._conn.recv_bytes()
        return self._loads(raw)
795
def _xml_dumps(obj):
    """Return `obj` marshalled as a UTF-8 encoded XML-RPC parameter list.

    None values are allowed (the `allow_none` flag is set).
    """
    # Import locally rather than relying on the module-level `xmlrpclib`
    # global, which only exists once XmlListener.accept() or XmlClient()
    # has been called.
    import xmlrpc.client as xmlrpclib
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
798
def _xml_loads(s):
    """Decode UTF-8 XML-RPC bytes `s` and return the single object inside.

    Raises ValueError if the document does not hold exactly one parameter.
    """
    # Import locally rather than relying on the module-level `xmlrpclib`
    # global, which only exists once XmlListener.accept() or XmlClient()
    # has been called.
    import xmlrpc.client as xmlrpclib
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj
802
class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects as XML-RPC.
    '''
    def accept(self):
        '''
        Accept a connection and return it wrapped in a `ConnectionWrapper`
        that uses the XML-RPC dumps/loads helpers.
        '''
        # Imported on first use and published as a module global.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        return ConnectionWrapper(Listener.accept(self), _xml_dumps, _xml_loads)
809
def XmlClient(*args, **kwds):
    '''
    Like `Client()`, but the returned connection serializes objects as
    XML-RPC.  All arguments are forwarded to `Client()`.
    '''
    # Imported on first use and published as a module global.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    conn = Client(*args, **kwds)
    return ConnectionWrapper(conn, _xml_dumps, _xml_loads)
814
815 #
816 # Wait
817 #
818
if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        L = list(handles)
        ready = []
        while L:
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
                res -= WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
                res -= WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(L[res])
            # Keep polling only the handles after the one just reported,
            # and never block again once something was found.
            L = L[res+1:]
            timeout = 0
        return ready

    # Errors which mean "the other end is gone": such objects count as ready.
    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.

        `timeout` is in seconds; None waits indefinitely and a negative
        value is treated as zero.
        '''
        # Convert to the millisecond timeout used by the wait primitive.
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        waithandle_to_obj = {}
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    # No fileno(): the object itself is used as a handle.
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        # Preserve the caller's ordering in the result.
        return [o for o in object_list if o in ready_objects]

else:

    import selectors

    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrarily to epoll/kqueue (also, they require a single
    # syscall).
    _WaitSelector = getattr(selectors, 'PollSelector',
                            selectors.SelectSelector)

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.

        `timeout` is in seconds; None waits indefinitely.
        '''
        with _WaitSelector() as selector:
            for candidate in object_list:
                selector.register(candidate, selectors.EVENT_READ)

            deadline = None
            if timeout is not None:
                deadline = time.monotonic() + timeout

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, _events) in ready]
                if deadline is not None:
                    # select() came back empty: give up once the deadline
                    # has passed, otherwise retry with the time remaining.
                    timeout = deadline - time.monotonic()
                    if timeout < 0:
                        return ready
955
956 #
957 # Make connection and socket objects shareable if possible
958 #
959
if sys.platform == 'win32':
    def reduce_connection(conn):
        '''Pickle support: reduce a socket-based Connection.'''
        with socket.fromfd(conn.fileno(), socket.AF_INET,
                           socket.SOCK_STREAM) as dup_sock:
            from . import resource_sharer
            shared = resource_sharer.DupSocket(dup_sock)
            return rebuild_connection, (shared, conn.readable, conn.writable)

    def rebuild_connection(ds, readable, writable):
        '''Unpickle support: rebuild a Connection from a shared socket.'''
        return Connection(ds.detach().detach(), readable, writable)

    reduction.register(Connection, reduce_connection)

    def reduce_pipe_connection(conn):
        '''Pickle support: reduce a PipeConnection.'''
        # Request only the access rights this end of the pipe actually has.
        access = 0
        if conn.readable:
            access |= _winapi.FILE_GENERIC_READ
        if conn.writable:
            access |= _winapi.FILE_GENERIC_WRITE
        shared = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (shared, conn.readable, conn.writable)

    def rebuild_pipe_connection(dh, readable, writable):
        '''Unpickle support: rebuild a PipeConnection from a shared handle.'''
        return PipeConnection(dh.detach(), readable, writable)

    reduction.register(PipeConnection, reduce_pipe_connection)

else:
    def reduce_connection(conn):
        '''Pickle support: reduce a Connection to a shareable descriptor.'''
        shared = reduction.DupFd(conn.fileno())
        return rebuild_connection, (shared, conn.readable, conn.writable)

    def rebuild_connection(df, readable, writable):
        '''Unpickle support: rebuild a Connection from a shared descriptor.'''
        return Connection(df.detach(), readable, writable)

    reduction.register(Connection, reduce_connection)