python (3.12.0)
1 #
2 # A higher level module for using sockets (or Windows named pipes)
3 #
4 # multiprocessing/connection.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
10 __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
11
12 import io
13 import os
14 import sys
15 import socket
16 import struct
17 import time
18 import tempfile
19 import itertools
20
21 import _multiprocessing
22
23 from . import util
24
25 from . import AuthenticationError, BufferTooShort
26 from .context import reduction
27 _ForkingPickler = reduction.ForkingPickler
28
29 try:
30 import _winapi
31 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
32 except ImportError:
33 if sys.platform == 'win32':
34 raise
35 _winapi = None
36
37 #
38 #
39 #
40
# Buffer size handed to CreateNamedPipe for the in/out buffers of named pipes.
BUFSIZE = 8192
# Deliberately generous, since connections are expected to be local.
CONNECTION_TIMEOUT = 20.0

# Source of unique sequence numbers for generated named-pipe addresses.
_mmap_counter = itertools.count()

# 'AF_INET' is always offered; each family appended later overrides the
# default, so the default is simply the last entry of the list.
families = ['AF_INET']
if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')
if sys.platform == 'win32':
    families.append('AF_PIPE')
default_family = families[-1]
57
58
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the monotonic-clock deadline lying `timeout` seconds ahead."""
    deadline = time.monotonic() + timeout
    return deadline
61
62 def _check_timeout(t):
63 return time.monotonic() > t
64
65 #
66 #
67 #
68
def arbitrary_address(family):
    '''
    Pick an arbitrary, presumably unused, address for `family`.

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    if family == 'AF_UNIX':
        temp_dir = util.get_temp_dir()
        return tempfile.mktemp(prefix='listener-', dir=temp_dir)
    if family == 'AF_PIPE':
        # The pid plus a per-process counter keeps generated names distinct.
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                                next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix, dir="")
    raise ValueError('unrecognized family')
82
83 def _validate_family(family):
84 '''
85 Checks if the family is valid for the current environment.
86 '''
87 if sys.platform != 'win32' and family == 'AF_PIPE':
88 raise ValueError('Family %s is not recognized.' % family)
89
90 if sys.platform == 'win32' and family == 'AF_UNIX':
91 # double check
92 if not hasattr(socket, family):
93 raise ValueError('Family %s is not recognized.' % family)
94
def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'.  A tuple is taken to be
    an 'AF_INET' address, a str starting with two backslashes an 'AF_PIPE'
    address, and any other str (or an abstract-namespace socket name) an
    'AF_UNIX' address.  Anything else raises ValueError.
    '''
    # Exact type checks (not isinstance) are intentional and now use `is`
    # consistently in every branch.
    kind = type(address)
    if kind is tuple:
        return 'AF_INET'
    elif kind is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif kind is str or util.is_abstract_socket_namespace(address):
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)
109
110 #
111 # Connection classes
112 #
113
114 class ESC[4;38;5;81m_ConnectionBase:
115 _handle = None
116
117 def __init__(self, handle, readable=True, writable=True):
118 handle = handle.__index__()
119 if handle < 0:
120 raise ValueError("invalid handle")
121 if not readable and not writable:
122 raise ValueError(
123 "at least one of `readable` and `writable` must be True")
124 self._handle = handle
125 self._readable = readable
126 self._writable = writable
127
128 # XXX should we use util.Finalize instead of a __del__?
129
130 def __del__(self):
131 if self._handle is not None:
132 self._close()
133
134 def _check_closed(self):
135 if self._handle is None:
136 raise OSError("handle is closed")
137
138 def _check_readable(self):
139 if not self._readable:
140 raise OSError("connection is write-only")
141
142 def _check_writable(self):
143 if not self._writable:
144 raise OSError("connection is read-only")
145
146 def _bad_message_length(self):
147 if self._writable:
148 self._readable = False
149 else:
150 self.close()
151 raise OSError("bad message length")
152
153 @property
154 def closed(self):
155 """True if the connection is closed"""
156 return self._handle is None
157
158 @property
159 def readable(self):
160 """True if the connection is readable"""
161 return self._readable
162
163 @property
164 def writable(self):
165 """True if the connection is writable"""
166 return self._writable
167
168 def fileno(self):
169 """File descriptor or handle of the connection"""
170 self._check_closed()
171 return self._handle
172
173 def close(self):
174 """Close the connection"""
175 if self._handle is not None:
176 try:
177 self._close()
178 finally:
179 self._handle = None
180
181 def send_bytes(self, buf, offset=0, size=None):
182 """Send the bytes data from a bytes-like object"""
183 self._check_closed()
184 self._check_writable()
185 m = memoryview(buf)
186 if m.itemsize > 1:
187 m = m.cast('B')
188 n = m.nbytes
189 if offset < 0:
190 raise ValueError("offset is negative")
191 if n < offset:
192 raise ValueError("buffer length < offset")
193 if size is None:
194 size = n - offset
195 elif size < 0:
196 raise ValueError("size is negative")
197 elif offset + size > n:
198 raise ValueError("buffer length < offset + size")
199 self._send_bytes(m[offset:offset + size])
200
201 def send(self, obj):
202 """Send a (picklable) object"""
203 self._check_closed()
204 self._check_writable()
205 self._send_bytes(_ForkingPickler.dumps(obj))
206
207 def recv_bytes(self, maxlength=None):
208 """
209 Receive bytes data as a bytes object.
210 """
211 self._check_closed()
212 self._check_readable()
213 if maxlength is not None and maxlength < 0:
214 raise ValueError("negative maxlength")
215 buf = self._recv_bytes(maxlength)
216 if buf is None:
217 self._bad_message_length()
218 return buf.getvalue()
219
220 def recv_bytes_into(self, buf, offset=0):
221 """
222 Receive bytes data into a writeable bytes-like object.
223 Return the number of bytes read.
224 """
225 self._check_closed()
226 self._check_readable()
227 with memoryview(buf) as m:
228 # Get bytesize of arbitrary buffer
229 itemsize = m.itemsize
230 bytesize = itemsize * len(m)
231 if offset < 0:
232 raise ValueError("negative offset")
233 elif offset > bytesize:
234 raise ValueError("offset too large")
235 result = self._recv_bytes()
236 size = result.tell()
237 if bytesize < offset + size:
238 raise BufferTooShort(result.getvalue())
239 # Message can fit in dest
240 result.seek(0)
241 result.readinto(m[offset // itemsize :
242 (offset + size) // itemsize])
243 return size
244
245 def recv(self):
246 """Receive a (picklable) object"""
247 self._check_closed()
248 self._check_readable()
249 buf = self._recv_bytes()
250 return _ForkingPickler.loads(buf.getbuffer())
251
252 def poll(self, timeout=0.0):
253 """Whether there is any input available to be read"""
254 self._check_closed()
255 self._check_readable()
256 return self._poll(timeout)
257
258 def __enter__(self):
259 return self
260
261 def __exit__(self, exc_type, exc_value, exc_tb):
262 self.close()
263
264
265 if _winapi:
266
class PipeConnection(_ConnectionBase):
    """
    Connection class based on a Windows named pipe.
    Overlapped I/O is used, so the handles must have been created
    with FILE_FLAG_OVERLAPPED.
    """
    # Set (by wait()) when a zero-length message has already been consumed
    # from the pipe; the next _recv_bytes() then returns an empty message.
    _got_empty_message = False

    def _close(self, _CloseHandle=_winapi.CloseHandle):
        # CloseHandle is bound as a default argument at definition time.
        _CloseHandle(self._handle)

    def _send_bytes(self, buf):
        """Write `buf` with one overlapped WriteFile and wait for it."""
        ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
        try:
            if err == _winapi.ERROR_IO_PENDING:
                waitres = _winapi.WaitForMultipleObjects(
                    [ov.event], False, INFINITE)
                assert waitres == WAIT_OBJECT_0
        except:
            # Any interruption of the wait cancels the pending write
            # before the exception propagates.
            ov.cancel()
            raise
        finally:
            # Always collect the result of the overlapped operation.
            nwritten, err = ov.GetOverlappedResult(True)
        assert err == 0
        assert nwritten == len(buf)

    def _recv_bytes(self, maxsize=None):
        """Read one message and return it in an io.BytesIO.

        Raises EOFError when the pipe is broken.
        """
        if self._got_empty_message:
            self._got_empty_message = False
            return io.BytesIO()
        else:
            # The first read asks for at most 128 bytes; a longer message
            # is reported as ERROR_MORE_DATA and completed below.
            bsize = 128 if maxsize is None else min(maxsize, 128)
            try:
                ov, err = _winapi.ReadFile(self._handle, bsize,
                                            overlapped=True)
                try:
                    if err == _winapi.ERROR_IO_PENDING:
                        waitres = _winapi.WaitForMultipleObjects(
                            [ov.event], False, INFINITE)
                        assert waitres == WAIT_OBJECT_0
                except:
                    ov.cancel()
                    raise
                finally:
                    # The result is inspected inside `finally`, so these
                    # returns also run when an exception is in flight.
                    nread, err = ov.GetOverlappedResult(True)
                    if err == 0:
                        f = io.BytesIO()
                        f.write(ov.getbuffer())
                        return f
                    elif err == _winapi.ERROR_MORE_DATA:
                        return self._get_more_data(ov, maxsize)
            except OSError as e:
                if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                    raise EOFError
                else:
                    raise
        raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

    def _poll(self, timeout):
        """Return True if a message is available within `timeout`."""
        if (self._got_empty_message or
                    _winapi.PeekNamedPipe(self._handle)[0] != 0):
            return True
        return bool(wait([self], timeout))

    def _get_more_data(self, ov, maxsize):
        """Finish reading a message that did not fit in the first read."""
        buf = ov.getbuffer()
        f = io.BytesIO()
        f.write(buf)
        # NOTE(review): element [1] of PeekNamedPipe is treated as the
        # number of bytes still left in the current message -- confirm.
        left = _winapi.PeekNamedPipe(self._handle)[1]
        assert left > 0
        if maxsize is not None and len(buf) + left > maxsize:
            self._bad_message_length()
        ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
        rbytes, err = ov.GetOverlappedResult(True)
        assert err == 0
        assert rbytes == left
        f.write(ov.getbuffer())
        return f
345
346
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    # Pick the low-level primitives once, when the class body is executed.
    if _winapi:
        _write = _multiprocessing.send
        _read = _multiprocessing.recv

        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
    else:
        _write = os.write
        _read = os.read

        def _close(self, _close=os.close):
            _close(self._handle)

    def _send(self, buf, write=_write):
        """Write all of `buf`, retrying after partial writes."""
        outstanding = len(buf)
        while True:
            written = write(self._handle, buf)
            outstanding -= written
            if outstanding == 0:
                return
            buf = buf[written:]

    def _recv(self, size, read=_read):
        """Read exactly `size` bytes and return them in an io.BytesIO.

        Raises EOFError if the stream ends before any byte arrived, and
        OSError if it ends part-way through.
        """
        collected = io.BytesIO()
        fd = self._handle
        pending = size
        while pending > 0:
            piece = read(fd, pending)
            got = len(piece)
            if got == 0:
                if pending == size:
                    raise EOFError
                raise OSError("got end of file during message")
            collected.write(piece)
            pending -= got
        return collected

    def _send_bytes(self, buf):
        """Send `buf` preceded by its length header."""
        length = len(buf)
        if length > 0x7fffffff:
            # Too long for the signed 32-bit header: send -1 as a marker,
            # followed by the real length as an unsigned 64-bit value.
            marker = struct.pack("!i", -1)
            long_header = struct.pack("!Q", length)
            self._send(marker)
            self._send(long_header)
            self._send(buf)
            return

        # For wire compatibility with 3.7 and lower
        header = struct.pack("!i", length)
        if length > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """Read one length-prefixed message; None if it exceeds `maxsize`."""
        (size,) = struct.unpack("!i", self._recv(4).getvalue())
        if size == -1:
            (size,) = struct.unpack("!Q", self._recv(8).getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        return bool(wait([self], timeout))
425
426
427 #
428 # Public functions
429 #
430
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Bind a listener of the requested (or inferred) family to `address`.

        `authkey`, when given, must be bytes, otherwise TypeError is raised.
        A non-None authkey makes accept() authenticate every connection.
        '''
        # Validate arguments before acquiring any OS resource, so that a bad
        # authkey cannot leave a bound socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  Raises OSError if the listener has
        been closed.  If authentication fails the new connection is closed
        before the exception propagates.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # `is not None` matches the test used by Client(), so an empty
        # authkey is handled the same way on both sides.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # The caller never sees `c`, so release it here.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            # Drop the reference first so that close() is idempotent.
            self._listener = None
            listener.close()

    @property
    def address(self):
        return self._listener._address

    @property
    def last_accepted(self):
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
490
491
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` is inferred from `address` when not given.  `authkey`, when
    given, must be bytes (TypeError otherwise) and triggers the mutual
    authentication handshake.  If the handshake fails the connection is
    closed before the exception propagates.
    '''
    # Validate arguments before connecting, so that a bad authkey does not
    # open a connection that is immediately abandoned.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            # The caller never receives `c`, so release it here.
            c.close()
            raise

    return c
510 return c
511
512
513 if sys.platform != 'win32':
514
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe

    With `duplex` true both ends can send and receive; otherwise the first
    connection is read-only and the second is write-only.
    '''
    if not duplex:
        read_fd, write_fd = os.pipe()
        receiver = Connection(read_fd, writable=False)
        sender = Connection(write_fd, readable=False)
        return receiver, sender

    left, right = socket.socketpair()
    left.setblocking(True)
    right.setblocking(True)
    # detach() hands ownership of each descriptor over to the Connection.
    first = Connection(left.detach())
    second = Connection(right.detach())
    return first, second
531
532 else:
533
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe

    With `duplex` true both ends can send and receive; otherwise the first
    connection is read-only and the second is write-only.
    '''
    address = arbitrary_address('AF_PIPE')
    if duplex:
        openmode = _winapi.PIPE_ACCESS_DUPLEX
        access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
        obsize = ibsize = BUFSIZE
    else:
        openmode = _winapi.PIPE_ACCESS_INBOUND
        access = _winapi.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    open_flags = (openmode | _winapi.FILE_FLAG_OVERLAPPED |
                  _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE)
    pipe_mode = (_winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                 _winapi.PIPE_WAIT)
    server_handle = _winapi.CreateNamedPipe(
        address, open_flags, pipe_mode,
        1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
        # default security descriptor: the handle cannot be inherited
        _winapi.NULL
        )
    client_handle = _winapi.CreateFile(
        address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
        _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
        )
    _winapi.SetNamedPipeHandleState(
        client_handle, _winapi.PIPE_READMODE_MESSAGE, None, None
        )

    # Complete the server side of the connection before wrapping the handles.
    pending = _winapi.ConnectNamedPipe(server_handle, overlapped=True)
    _, err = pending.GetOverlappedResult(True)
    assert err == 0

    c1 = PipeConnection(server_handle, writable=duplex)
    c2 = PipeConnection(client_handle, readable=duplex)

    return c1, c2
573
574 #
575 # Definitions for connections based on sockets
576 #
577
class SocketListener(object):
    '''
    A socket bound to an address and listening for connections
    '''
    def __init__(self, address, family, backlog=1):
        sock = socket.socket(getattr(socket, family))
        self._socket = sock
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(True)
            sock.bind(address)
            sock.listen(backlog)
            self._address = sock.getsockname()
        except OSError:
            # Do not keep a half-configured socket open.
            sock.close()
            raise
        self._family = family
        self._last_accepted = None

        # Linux abstract socket namespaces do not need to be explicitly
        # unlinked; filesystem-backed AF_UNIX sockets do.
        needs_unlink = (family == 'AF_UNIX' and
                        not util.is_abstract_socket_namespace(address))
        if needs_unlink:
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''Accept one connection and wrap it in a `Connection`.'''
        conn_sock, self._last_accepted = self._socket.accept()
        conn_sock.setblocking(True)
        return Connection(conn_sock.detach())

    def close(self):
        '''Close the socket, then run the pending unlink at most once.'''
        try:
            self._socket.close()
        finally:
            pending_unlink = self._unlink
            if pending_unlink is not None:
                self._unlink = None
                pending_unlink()
619 unlink()
620
621
def SocketClient(address):
    '''
    Connect a blocking socket to `address` and return it as a `Connection`
    '''
    af_constant = getattr(socket, address_type(address))
    with socket.socket(af_constant) as sock:
        sock.setblocking(True)
        sock.connect(address)
        # detach() transfers the descriptor, so leaving the `with` block
        # does not close the connection that is being returned.
        fd = sock.detach()
        return Connection(fd)
631
632 #
633 # Definitions for connections based on named pipes
634 #
635
636 if sys.platform == 'win32':
637
class PipeListener(object):
    '''
    Representation of a named pipe
    '''
    def __init__(self, address, backlog=None):
        # `backlog` is accepted for signature parity with SocketListener
        # but is not used in this method.
        self._address = address
        # One pipe instance is always kept ready for the next client.
        self._handle_queue = [self._new_handle(first=True)]

        self._last_accepted = None
        util.sub_debug('listener created with address=%r', self._address)
        # close() is a Finalize object that closes every queued handle.
        self.close = util.Finalize(
            self, PipeListener._finalize_pipe_listener,
            args=(self._handle_queue, self._address), exitpriority=0
            )

    def _new_handle(self, first=False):
        '''Create another instance of the named pipe and return its handle.'''
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        return _winapi.CreateNamedPipe(
            self._address, flags,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
            )

    def accept(self):
        '''Wait for a client on the oldest queued handle and wrap it.'''
        # Queue a fresh instance before taking the oldest one, so the queue
        # is never empty while this connection is being served.
        self._handle_queue.append(self._new_handle())
        handle = self._handle_queue.pop(0)
        try:
            ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
        except OSError as e:
            if e.winerror != _winapi.ERROR_NO_DATA:
                raise
            # ERROR_NO_DATA can occur if a client has already connected,
            # written data and then disconnected -- see Issue 14725.
        else:
            try:
                res = _winapi.WaitForMultipleObjects(
                    [ov.event], False, INFINITE)
            except:
                # Interrupted while waiting: cancel the pending connect and
                # release the handle before propagating.
                ov.cancel()
                _winapi.CloseHandle(handle)
                raise
            finally:
                _, err = ov.GetOverlappedResult(True)
                assert err == 0
        return PipeConnection(handle)

    @staticmethod
    def _finalize_pipe_listener(queue, address):
        '''Close every pipe handle that is still queued.'''
        util.sub_debug('closing listener with address=%r', address)
        for handle in queue:
            _winapi.CloseHandle(handle)
693
def PipeClient(address):
    '''
    Return a connection object connected to the pipe given by `address`

    Connection attempts are retried while the pipe reports a timeout or is
    busy, until the deadline from _init_timeout() has passed; the last
    OSError is then re-raised.  Any other OSError propagates at once.
    '''
    t = _init_timeout()
    # The loop is only left via `break` (success) or an exception, so no
    # `else` clause is needed on the `while`.
    while True:
        try:
            _winapi.WaitNamedPipe(address, 1000)
            h = _winapi.CreateFile(
                address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                0, _winapi.NULL, _winapi.OPEN_EXISTING,
                _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                )
        except OSError as e:
            if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                  _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                raise
        else:
            break

    _winapi.SetNamedPipeHandleState(
        h, _winapi.PIPE_READMODE_MESSAGE, None, None
        )
    return PipeConnection(h)
720
721 #
722 # Authentication stuff
723 #
724
# Number of random bytes generated for each challenge by deliver_challenge().
MESSAGE_LENGTH = 40  # MUST be > 20

# Marker prepended to the challenge message sent by deliver_challenge().
_CHALLENGE = b'#CHALLENGE#'
# Replies sent by deliver_challenge() after verifying the peer's response.
_WELCOME = b'#WELCOME#'
_FAILURE = b'#FAILURE#'
730
731 # multiprocessing.connection Authentication Handshake Protocol Description
732 # (as documented for reference after reading the existing code)
733 # =============================================================================
734 #
735 # On Windows: native pipes with "overlapped IO" are used to send the bytes,
736 # instead of the length prefix SIZE scheme described below. (ie: the OS deals
737 # with message sizes for us)
738 #
739 # Protocol error behaviors:
740 #
741 # On POSIX, any failure to receive the length prefix into SIZE, for SIZE greater
742 # than the requested maxsize to receive, or receiving fewer than SIZE bytes
743 # results in the connection being closed and auth to fail.
744 #
745 # On Windows, receiving too few bytes is never a low level _recv_bytes read
746 # error, receiving too many will trigger an error only if receive maxsize
747 # value was larger than 128 OR if the data arrived in smaller pieces.
748 #
749 # Serving side Client side
750 # ------------------------------ ---------------------------------------
751 # 0. Open a connection on the pipe.
752 # 1. Accept connection.
753 # 2. Random 20+ bytes -> MESSAGE
754 # Modern servers always send
755 # more than 20 bytes and include
756 # a {digest} prefix on it with
757 # their preferred HMAC digest.
758 # Legacy ones send ==20 bytes.
759 # 3. send 4 byte length (net order)
760 # prefix followed by:
761 # b'#CHALLENGE#' + MESSAGE
762 # 4. Receive 4 bytes, parse as network byte
763 # order integer. If it is -1, receive an
764 # additional 8 bytes, parse that as network
765 # byte order. The result is the length of
766 # the data that follows -> SIZE.
767 # 5. Receive min(SIZE, 256) bytes -> M1
768 # 6. Assert that M1 starts with:
769 # b'#CHALLENGE#'
770 # 7. Strip that prefix from M1 into -> M2
771 # 7.1. Parse M2: if it is exactly 20 bytes in
772 # length this indicates a legacy server
773 # supporting only HMAC-MD5. Otherwise the
774 # 7.2. preferred digest is looked up from an
775 # expected "{digest}" prefix on M2. No prefix
776 # or unsupported digest? <- AuthenticationError
777 # 7.3. Put divined algorithm name in -> D_NAME
778 # 8. Compute HMAC-D_NAME of AUTHKEY, M2 -> C_DIGEST
779 # 9. Send 4 byte length prefix (net order)
780 # followed by C_DIGEST bytes.
781 # 10. Receive 4 or 4+8 byte length
782 # prefix (#4 dance) -> SIZE.
783 # 11. Receive min(SIZE, 256) -> C_D.
784 # 11.1. Parse C_D: legacy servers
785 # accept it as is, "md5" -> D_NAME
786 # 11.2. modern servers check the length
787 # of C_D, IF it is 16 bytes?
788 # 11.2.1. "md5" -> D_NAME
789 # and skip to step 12.
790 # 11.3. longer? expect and parse a "{digest}"
791 # prefix into -> D_NAME.
792 # Strip the prefix and store remaining
793 # bytes in -> C_D.
794 # 11.4. Don't like D_NAME? <- AuthenticationError
795 # 12. Compute HMAC-D_NAME of AUTHKEY,
796 # MESSAGE into -> M_DIGEST.
797 # 13. Compare M_DIGEST == C_D:
798 # 14a: Match? Send length prefix &
799 # b'#WELCOME#'
800 # <- RETURN
801 # 14b: Mismatch? Send len prefix &
802 # b'#FAILURE#'
803 # <- CLOSE & AuthenticationError
804 # 15. Receive 4 or 4+8 byte length prefix (net
805 # order) again as in #4 into -> SIZE.
806 # 16. Receive min(SIZE, 256) bytes -> M3.
807 # 17. Compare M3 == b'#WELCOME#':
808 # 17a. Match? <- RETURN
809 # 17b. Mismatch? <- CLOSE & AuthenticationError
810 #
811 # If this RETURNed, the connection remains open: it has been authenticated.
812 #
813 # Length prefixes are used consistently. Even on the legacy protocol, this
814 # was good fortune and allowed us to evolve the protocol by using the length
815 # of the opening challenge or length of the returned digest as a signal as
816 # to which protocol the other end supports.
817
818 _ALLOWED_DIGESTS = frozenset(
819 {b'md5', b'sha256', b'sha384', b'sha3_256', b'sha3_384'})
820 _MAX_DIGEST_LEN = max(len(_) for _ in _ALLOWED_DIGESTS)
821
822 # Old hmac-md5 only server versions from Python <=3.11 sent a message of this
823 # length. It happens to not match the length of any supported digest so we can
824 # use a message of this length to indicate that we should work in backwards
825 # compatible md5-only mode without a {digest_name} prefix on our response.
826 _MD5ONLY_MESSAGE_LENGTH = 20
827 _MD5_DIGEST_LEN = 16
828 _LEGACY_LENGTHS = (_MD5ONLY_MESSAGE_LENGTH, _MD5_DIGEST_LEN)
829
830
831 def _get_digest_name_and_payload(message: bytes) -> (str, bytes):
832 """Returns a digest name and the payload for a response hash.
833
834 If a legacy protocol is detected based on the message length
835 or contents the digest name returned will be empty to indicate
836 legacy mode where MD5 and no digest prefix should be sent.
837 """
838 # modern message format: b"{digest}payload" longer than 20 bytes
839 # legacy message format: 16 or 20 byte b"payload"
840 if len(message) in _LEGACY_LENGTHS:
841 # Either this was a legacy server challenge, or we're processing
842 # a reply from a legacy client that sent an unprefixed 16-byte
843 # HMAC-MD5 response. All messages using the modern protocol will
844 # be longer than either of these lengths.
845 return '', message
846 if (message.startswith(b'{') and
847 (curly := message.find(b'}', 1, _MAX_DIGEST_LEN+2)) > 0):
848 digest = message[1:curly]
849 if digest in _ALLOWED_DIGESTS:
850 payload = message[curly+1:]
851 return digest.decode('ascii'), payload
852 raise AuthenticationError(
853 'unsupported message length, missing digest prefix, '
854 f'or unsupported digest: {message=}')
855
856
def _create_response(authkey, message):
    """Build the MAC that answers challenge `message` under `authkey`.

    A challenge without a '{digest_name}' prefix is answered with a raw
    HMAC-MD5.  If MD5 is unavailable, or the challenge names a digest, the
    reply is the MAC prefixed with '{digest_name}',
    e.g. b'{sha256}abcdefg...'

    Note: The MAC covers the entire message including the digest_name prefix.
    """
    import hmac
    digest_name, _payload = _get_digest_name_and_payload(message)
    if not digest_name:
        # Legacy server: its challenge carried no {digest} prefix, so it
        # expects a legacy non-prefixed HMAC-MD5 reply.
        try:
            return hmac.new(authkey, message, 'md5').digest()
        except ValueError:
            # HMAC-MD5 is not available (FIPS mode?), fall back to
            # HMAC-SHA2-256 modern protocol. The legacy server probably
            # doesn't support it and will reject us anyways. :shrug:
            digest_name = 'sha256'
    # Modern protocol: name the digest that was used in the reply.
    mac = hmac.new(authkey, message, digest_name).digest()
    prefix = digest_name.encode('ascii')
    return b'{%s}%s' % (prefix, mac)
883
884
def _verify_challenge(authkey, message, response):
    """Check that `response` is a valid MAC of `message` under `authkey`.

    A response without a digest_name prefix is verified as HMAC-MD5;
    otherwise the digest named in the response is used.

    A client cannot downgrade a prefixed challenge to a weaker algorithm,
    because the MAC is calculated over the entire message including its
    '{digest_name}' prefix.

    Raises AuthenticationError on any mismatch.
    """
    import hmac
    response_digest, received_mac = _get_digest_name_and_payload(response)
    if not response_digest:
        response_digest = 'md5'
    try:
        expected = hmac.new(authkey, message, response_digest).digest()
    except ValueError:
        raise AuthenticationError(f'{response_digest=} unsupported')
    if len(expected) != len(received_mac):
        raise AuthenticationError(
                f'expected {response_digest!r} of length {len(expected)} '
                f'got {len(received_mac)}')
    if hmac.compare_digest(expected, received_mac):
        return
    raise AuthenticationError('digest received was wrong')
908
909
def deliver_challenge(connection, authkey: bytes, digest_name='sha256'):
    """Send a random challenge over `connection` and verify the reply.

    Sends the welcome marker on success; on failure sends the failure
    marker and re-raises AuthenticationError.  Raises ValueError if
    `authkey` is not bytes.
    """
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    assert MESSAGE_LENGTH > _MD5ONLY_MESSAGE_LENGTH, "protocol constraint"
    nonce = os.urandom(MESSAGE_LENGTH)
    message = b'{%s}%s' % (digest_name.encode('ascii'), nonce)
    # Even when sending a challenge to a legacy client that does not support
    # digest prefixes, they'll take the entire thing as a challenge and
    # respond to it with a raw HMAC-MD5.
    connection.send_bytes(_CHALLENGE + message)
    response = connection.recv_bytes(256)        # reject large message
    try:
        _verify_challenge(authkey, message, response)
    except AuthenticationError:
        connection.send_bytes(_FAILURE)
        raise
    connection.send_bytes(_WELCOME)
929
930
def answer_challenge(connection, authkey: bytes):
    """Answer the challenge received on `connection` using `authkey`.

    Raises ValueError if `authkey` is not bytes, and AuthenticationError if
    the message is not a challenge, the challenge is too short, or the
    peer does not reply with the welcome marker.
    """
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    if not message.startswith(_CHALLENGE):
        raise AuthenticationError(
                f'Protocol error, expected challenge: {message=}')
    message = message[len(_CHALLENGE):]
    if len(message) < _MD5ONLY_MESSAGE_LENGTH:
        # The f-prefix is required for the length to be interpolated.
        raise AuthenticationError(
                f'challenge too short: {len(message)} bytes')
    digest = _create_response(authkey, message)
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != _WELCOME:
        raise AuthenticationError('digest sent was rejected')
946 raise AuthenticationError('digest sent was rejected')
947
948 #
949 # Support for using xmlrpclib for serialization
950 #
951
class ConnectionWrapper(object):
    """Connection proxy whose send()/recv() use custom (de)serialisers.

    `dumps` turns an object into bytes and `loads` does the reverse; the
    byte-level and housekeeping methods are forwarded to `conn` unchanged.
    """

    _FORWARDED = ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes')

    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the bound methods of the wrapped connection directly.
        for name in self._FORWARDED:
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        payload = self._dumps(obj)
        self._conn.send_bytes(payload)

    def recv(self):
        payload = self._conn.recv_bytes()
        return self._loads(payload)
966
967 def _xml_dumps(obj):
968 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
969
970 def _xml_loads(s):
971 (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
972 return obj
973
class XmlListener(Listener):
    """Listener whose accepted connections serialise objects via XML-RPC."""

    def accept(self):
        # xmlrpc.client is imported lazily and published as a module global
        # because _xml_dumps/_xml_loads look it up there.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        connection = Listener.accept(self)
        return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
979 return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
980
def XmlClient(*args, **kwds):
    """Like Client(), but the connection serialises objects via XML-RPC."""
    # xmlrpc.client is imported lazily and published as a module global
    # because _xml_dumps/_xml_loads look it up there.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    connection = Client(*args, **kwds)
    return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
985
986 #
987 # Wait
988 #
989
990 if sys.platform == 'win32':
991
992 def _exhaustive_wait(handles, timeout):
993 # Return ALL handles which are currently signalled. (Only
994 # returning the first signalled might create starvation issues.)
995 L = list(handles)
996 ready = []
997 while L:
998 res = _winapi.WaitForMultipleObjects(L, False, timeout)
999 if res == WAIT_TIMEOUT:
1000 break
1001 elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
1002 res -= WAIT_OBJECT_0
1003 elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
1004 res -= WAIT_ABANDONED_0
1005 else:
1006 raise RuntimeError('Should not get here')
1007 ready.append(L[res])
1008 L = L[res+1:]
1009 timeout = 0
1010 return ready
1011
# Error codes that wait() tolerates from its zero-length overlapped reads;
# any other OSError is re-raised.
_ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
1013
def wait(object_list, timeout=None):
    '''
    Wait till an object in object_list is ready/readable.

    Returns list of those objects in object_list which are ready/readable.

    `timeout` is in seconds; None waits indefinitely and a negative value
    is treated as zero.
    '''
    # Convert seconds to the integer milliseconds passed to the wait call.
    if timeout is None:
        timeout = INFINITE
    elif timeout < 0:
        timeout = 0
    else:
        timeout = int(timeout * 1000 + 0.5)

    object_list = list(object_list)
    # Maps each handle that is waited on back to the object it stands for.
    waithandle_to_obj = {}
    # Overlapped reads still pending; they must be cancelled before returning.
    ov_list = []
    ready_objects = set()
    ready_handles = set()

    try:
        for o in object_list:
            try:
                fileno = getattr(o, 'fileno')
            except AttributeError:
                # No fileno(): the object itself is used as a wait handle.
                waithandle_to_obj[o.__index__()] = o
            else:
                # start an overlapped read of length zero
                try:
                    ov, err = _winapi.ReadFile(fileno(), 0, True)
                except OSError as e:
                    ov, err = None, e.winerror
                    if err not in _ready_errors:
                        raise
                if err == _winapi.ERROR_IO_PENDING:
                    ov_list.append(ov)
                    waithandle_to_obj[ov.event] = o
                else:
                    # If o.fileno() is an overlapped pipe handle and
                    # err == 0 then there is a zero length message
                    # in the pipe, but it HAS NOT been consumed...
                    if ov and sys.getwindowsversion()[:2] >= (6, 2):
                        # ... except on Windows 8 and later, where
                        # the message HAS been consumed.
                        try:
                            _, err = ov.GetOverlappedResult(False)
                        except OSError as e:
                            err = e.winerror
                        if not err and hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True
                    ready_objects.add(o)
                    # Something is already ready, so do not block below.
                    timeout = 0

        ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
    finally:
        # request that overlapped reads stop
        for ov in ov_list:
            ov.cancel()

        # wait for all overlapped reads to stop
        for ov in ov_list:
            try:
                _, err = ov.GetOverlappedResult(True)
            except OSError as e:
                err = e.winerror
                if err not in _ready_errors:
                    raise
            # A read that was not aborted by the cancel above completed,
            # which means its object is ready.
            if err != _winapi.ERROR_OPERATION_ABORTED:
                o = waithandle_to_obj[ov.event]
                ready_objects.add(o)
                if err == 0:
                    # If o.fileno() is an overlapped pipe handle then
                    # a zero length message HAS been consumed.
                    if hasattr(o, '_got_empty_message'):
                        o._got_empty_message = True

    ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
    # Report the ready objects in the order the caller supplied them.
    return [o for o in object_list if o in ready_objects]
1091
1092 else:
1093
1094 import selectors
1095
1096 # poll/select have the advantage of not requiring any extra file
1097 # descriptor, contrarily to epoll/kqueue (also, they require a single
1098 # syscall).
1099 if hasattr(selectors, 'PollSelector'):
1100 _WaitSelector = selectors.PollSelector
1101 else:
1102 _WaitSelector = selectors.SelectSelector
1103
def wait(object_list, timeout=None):
    '''
    Block until at least one object in object_list is ready/readable.

    Returns the ready objects as a list.  With a `timeout` (in seconds) an
    empty list is returned once the deadline passes; with None the call
    waits indefinitely.
    '''
    bounded = timeout is not None
    with _WaitSelector() as selector:
        for obj in object_list:
            selector.register(obj, selectors.EVENT_READ)

        if bounded:
            deadline = time.monotonic() + timeout

        while True:
            events = selector.select(timeout)
            if events:
                return [key.fileobj for key, _mask in events]
            if bounded:
                # select() returned nothing: retry with whatever time is
                # left, and give up once the deadline has passed.
                timeout = deadline - time.monotonic()
                if timeout < 0:
                    return events
1126
1127 #
1128 # Make connection and socket objects shareable if possible
1129 #
1130
1131 if sys.platform == 'win32':
def reduce_connection(conn):
    """Reduce a socket-based Connection for pickling via DupSocket."""
    with socket.fromfd(conn.fileno(), socket.AF_INET,
                       socket.SOCK_STREAM) as sock:
        from . import resource_sharer
        dup = resource_sharer.DupSocket(sock)
        state = (dup, conn.readable, conn.writable)
        return rebuild_connection, state
def rebuild_connection(ds, readable, writable):
    """Recreate a Connection from the state built by reduce_connection()."""
    handle = ds.detach().detach()
    return Connection(handle, readable, writable)
1141 reduction.register(Connection, reduce_connection)
1142
def reduce_pipe_connection(conn):
    """Reduce a PipeConnection for pickling via DupHandle."""
    # Request only the access rights this end of the pipe actually has.
    read_access = _winapi.FILE_GENERIC_READ if conn.readable else 0
    write_access = _winapi.FILE_GENERIC_WRITE if conn.writable else 0
    dup = reduction.DupHandle(conn.fileno(), read_access | write_access)
    return rebuild_pipe_connection, (dup, conn.readable, conn.writable)
def rebuild_pipe_connection(dh, readable, writable):
    """Recreate a PipeConnection from reduce_pipe_connection() state."""
    return PipeConnection(dh.detach(), readable, writable)
1151 reduction.register(PipeConnection, reduce_pipe_connection)
1152
1153 else:
def reduce_connection(conn):
    """Reduce a Connection for pickling by duplicating its descriptor."""
    dup = reduction.DupFd(conn.fileno())
    state = (dup, conn.readable, conn.writable)
    return rebuild_connection, state
def rebuild_connection(df, readable, writable):
    """Recreate a Connection from the state built by reduce_connection()."""
    return Connection(df.detach(), readable, writable)
1160 reduction.register(Connection, reduce_connection)