1 # Wrapper module for _ssl, providing some additional facilities
2 # implemented in Python. Written by Bill Janssen.
3
4 """This module provides some more Pythonic support for SSL.
5
6 Object types:
7
8 SSLSocket -- subtype of socket.socket which does SSL over the socket
9
10 Exceptions:
11
12 SSLError -- exception raised for I/O errors
13
14 Functions:
15
16 cert_time_to_seconds -- convert time string used for certificate
17 notBefore and notAfter functions to integer
18 seconds past the Epoch (the time values
19 returned from time.time())
20
21 get_server_certificate (addr, ssl_version, ca_certs, timeout) -- Retrieve the
22 certificate from the server at the specified
23 address and return it as a PEM-encoded string
24
25
26 Integer constants:
27
28 SSL_ERROR_ZERO_RETURN
29 SSL_ERROR_WANT_READ
30 SSL_ERROR_WANT_WRITE
31 SSL_ERROR_WANT_X509_LOOKUP
32 SSL_ERROR_SYSCALL
33 SSL_ERROR_SSL
34 SSL_ERROR_WANT_CONNECT
35
36 SSL_ERROR_EOF
37 SSL_ERROR_INVALID_ERROR_CODE
38
39 The following group define certificate requirements that one side is
40 allowing/requiring from the other side:
41
42 CERT_NONE - no certificates from the other side are required (or will
43 be looked at if provided)
44 CERT_OPTIONAL - certificates are not required, but if provided will be
45 validated, and if validation fails, the connection will
46 also fail
47 CERT_REQUIRED - certificates are required, and will be validated, and
48 if validation fails, the connection will also fail
49
50 The following constants identify various SSL protocol variants:
51
52 PROTOCOL_SSLv2
53 PROTOCOL_SSLv3
54 PROTOCOL_SSLv23
55 PROTOCOL_TLS
56 PROTOCOL_TLS_CLIENT
57 PROTOCOL_TLS_SERVER
58 PROTOCOL_TLSv1
59 PROTOCOL_TLSv1_1
60 PROTOCOL_TLSv1_2
61
62 The following constants identify various SSL alert message descriptions as per
63 http://www.iana.org/assignments/tls-parameters/tls-parameters.xml#tls-parameters-6
64
65 ALERT_DESCRIPTION_CLOSE_NOTIFY
66 ALERT_DESCRIPTION_UNEXPECTED_MESSAGE
67 ALERT_DESCRIPTION_BAD_RECORD_MAC
68 ALERT_DESCRIPTION_RECORD_OVERFLOW
69 ALERT_DESCRIPTION_DECOMPRESSION_FAILURE
70 ALERT_DESCRIPTION_HANDSHAKE_FAILURE
71 ALERT_DESCRIPTION_BAD_CERTIFICATE
72 ALERT_DESCRIPTION_UNSUPPORTED_CERTIFICATE
73 ALERT_DESCRIPTION_CERTIFICATE_REVOKED
74 ALERT_DESCRIPTION_CERTIFICATE_EXPIRED
75 ALERT_DESCRIPTION_CERTIFICATE_UNKNOWN
76 ALERT_DESCRIPTION_ILLEGAL_PARAMETER
77 ALERT_DESCRIPTION_UNKNOWN_CA
78 ALERT_DESCRIPTION_ACCESS_DENIED
79 ALERT_DESCRIPTION_DECODE_ERROR
80 ALERT_DESCRIPTION_DECRYPT_ERROR
81 ALERT_DESCRIPTION_PROTOCOL_VERSION
82 ALERT_DESCRIPTION_INSUFFICIENT_SECURITY
83 ALERT_DESCRIPTION_INTERNAL_ERROR
84 ALERT_DESCRIPTION_USER_CANCELLED
85 ALERT_DESCRIPTION_NO_RENEGOTIATION
86 ALERT_DESCRIPTION_UNSUPPORTED_EXTENSION
87 ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
88 ALERT_DESCRIPTION_UNRECOGNIZED_NAME
89 ALERT_DESCRIPTION_BAD_CERTIFICATE_STATUS_RESPONSE
90 ALERT_DESCRIPTION_BAD_CERTIFICATE_HASH_VALUE
91 ALERT_DESCRIPTION_UNKNOWN_PSK_IDENTITY
92 """
93
94 import sys
95 import os
96 from collections import namedtuple
97 from enum import Enum as _Enum, IntEnum as _IntEnum, IntFlag as _IntFlag
98 from enum import _simple_enum
99
100 import _ssl # if we can't import it, let the error propagate
101
102 from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION
103 from _ssl import _SSLContext, MemoryBIO, SSLSession
104 from _ssl import (
105 SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError,
106 SSLSyscallError, SSLEOFError, SSLCertVerificationError
107 )
108 from _ssl import txt2obj as _txt2obj, nid2obj as _nid2obj
109 from _ssl import RAND_status, RAND_add, RAND_bytes
110 try:
111 from _ssl import RAND_egd
112 except ImportError:
113 # LibreSSL does not provide RAND_egd
114 pass
115
116
117 from _ssl import (
118 HAS_SNI, HAS_ECDH, HAS_NPN, HAS_ALPN, HAS_SSLv2, HAS_SSLv3, HAS_TLSv1,
119 HAS_TLSv1_1, HAS_TLSv1_2, HAS_TLSv1_3
120 )
121 from _ssl import _DEFAULT_CIPHERS, _OPENSSL_API_VERSION
122
123 _IntEnum._convert_(
124 '_SSLMethod', __name__,
125 lambda name: name.startswith('PROTOCOL_') and name != 'PROTOCOL_SSLv23',
126 source=_ssl)
127
128 _IntFlag._convert_(
129 'Options', __name__,
130 lambda name: name.startswith('OP_'),
131 source=_ssl)
132
133 _IntEnum._convert_(
134 'AlertDescription', __name__,
135 lambda name: name.startswith('ALERT_DESCRIPTION_'),
136 source=_ssl)
137
138 _IntEnum._convert_(
139 'SSLErrorNumber', __name__,
140 lambda name: name.startswith('SSL_ERROR_'),
141 source=_ssl)
142
143 _IntFlag._convert_(
144 'VerifyFlags', __name__,
145 lambda name: name.startswith('VERIFY_'),
146 source=_ssl)
147
148 _IntEnum._convert_(
149 'VerifyMode', __name__,
150 lambda name: name.startswith('CERT_'),
151 source=_ssl)
152
153 PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_TLS
154 _PROTOCOL_NAMES = {value: name for name, value in _SSLMethod.__members__.items()}
155
156 _SSLv2_IF_EXISTS = getattr(_SSLMethod, 'PROTOCOL_SSLv2', None)
157
158
159 @_simple_enum(_IntEnum)
160 class ESC[4;38;5;81mTLSVersion:
161 MINIMUM_SUPPORTED = _ssl.PROTO_MINIMUM_SUPPORTED
162 SSLv3 = _ssl.PROTO_SSLv3
163 TLSv1 = _ssl.PROTO_TLSv1
164 TLSv1_1 = _ssl.PROTO_TLSv1_1
165 TLSv1_2 = _ssl.PROTO_TLSv1_2
166 TLSv1_3 = _ssl.PROTO_TLSv1_3
167 MAXIMUM_SUPPORTED = _ssl.PROTO_MAXIMUM_SUPPORTED
168
169
170 @_simple_enum(_IntEnum)
171 class ESC[4;38;5;81m_TLSContentType:
172 """Content types (record layer)
173
174 See RFC 8446, section B.1
175 """
176 CHANGE_CIPHER_SPEC = 20
177 ALERT = 21
178 HANDSHAKE = 22
179 APPLICATION_DATA = 23
180 # pseudo content types
181 HEADER = 0x100
182 INNER_CONTENT_TYPE = 0x101
183
184
185 @_simple_enum(_IntEnum)
186 class ESC[4;38;5;81m_TLSAlertType:
187 """Alert types for TLSContentType.ALERT messages
188
189 See RFC 8466, section B.2
190 """
191 CLOSE_NOTIFY = 0
192 UNEXPECTED_MESSAGE = 10
193 BAD_RECORD_MAC = 20
194 DECRYPTION_FAILED = 21
195 RECORD_OVERFLOW = 22
196 DECOMPRESSION_FAILURE = 30
197 HANDSHAKE_FAILURE = 40
198 NO_CERTIFICATE = 41
199 BAD_CERTIFICATE = 42
200 UNSUPPORTED_CERTIFICATE = 43
201 CERTIFICATE_REVOKED = 44
202 CERTIFICATE_EXPIRED = 45
203 CERTIFICATE_UNKNOWN = 46
204 ILLEGAL_PARAMETER = 47
205 UNKNOWN_CA = 48
206 ACCESS_DENIED = 49
207 DECODE_ERROR = 50
208 DECRYPT_ERROR = 51
209 EXPORT_RESTRICTION = 60
210 PROTOCOL_VERSION = 70
211 INSUFFICIENT_SECURITY = 71
212 INTERNAL_ERROR = 80
213 INAPPROPRIATE_FALLBACK = 86
214 USER_CANCELED = 90
215 NO_RENEGOTIATION = 100
216 MISSING_EXTENSION = 109
217 UNSUPPORTED_EXTENSION = 110
218 CERTIFICATE_UNOBTAINABLE = 111
219 UNRECOGNIZED_NAME = 112
220 BAD_CERTIFICATE_STATUS_RESPONSE = 113
221 BAD_CERTIFICATE_HASH_VALUE = 114
222 UNKNOWN_PSK_IDENTITY = 115
223 CERTIFICATE_REQUIRED = 116
224 NO_APPLICATION_PROTOCOL = 120
225
226
227 @_simple_enum(_IntEnum)
228 class ESC[4;38;5;81m_TLSMessageType:
229 """Message types (handshake protocol)
230
231 See RFC 8446, section B.3
232 """
233 HELLO_REQUEST = 0
234 CLIENT_HELLO = 1
235 SERVER_HELLO = 2
236 HELLO_VERIFY_REQUEST = 3
237 NEWSESSION_TICKET = 4
238 END_OF_EARLY_DATA = 5
239 HELLO_RETRY_REQUEST = 6
240 ENCRYPTED_EXTENSIONS = 8
241 CERTIFICATE = 11
242 SERVER_KEY_EXCHANGE = 12
243 CERTIFICATE_REQUEST = 13
244 SERVER_DONE = 14
245 CERTIFICATE_VERIFY = 15
246 CLIENT_KEY_EXCHANGE = 16
247 FINISHED = 20
248 CERTIFICATE_URL = 21
249 CERTIFICATE_STATUS = 22
250 SUPPLEMENTAL_DATA = 23
251 KEY_UPDATE = 24
252 NEXT_PROTO = 67
253 MESSAGE_HASH = 254
254 CHANGE_CIPHER_SPEC = 0x0101
255
256
257 if sys.platform == "win32":
258 from _ssl import enum_certificates, enum_crls
259
260 from socket import socket, SOCK_STREAM, create_connection
261 from socket import SOL_SOCKET, SO_TYPE, _GLOBAL_DEFAULT_TIMEOUT
262 import socket as _socket
263 import base64 # for DER-to-PEM translation
264 import errno
265 import warnings
266
267
268 socket_error = OSError # keep that public name in module namespace
269
270 CHANNEL_BINDING_TYPES = ['tls-unique']
271
272 HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT')
273
274
275 _RESTRICTED_SERVER_CIPHERS = _DEFAULT_CIPHERS
276
277 CertificateError = SSLCertVerificationError
278
279
280 def _dnsname_match(dn, hostname):
281 """Matching according to RFC 6125, section 6.4.3
282
283 - Hostnames are compared lower-case.
284 - For IDNA, both dn and hostname must be encoded as IDN A-label (ACE).
285 - Partial wildcards like 'www*.example.org', multiple wildcards, sole
286 wildcard or wildcards in labels other then the left-most label are not
287 supported and a CertificateError is raised.
288 - A wildcard must match at least one character.
289 """
290 if not dn:
291 return False
292
293 wildcards = dn.count('*')
294 # speed up common case w/o wildcards
295 if not wildcards:
296 return dn.lower() == hostname.lower()
297
298 if wildcards > 1:
299 raise CertificateError(
300 "too many wildcards in certificate DNS name: {!r}.".format(dn))
301
302 dn_leftmost, sep, dn_remainder = dn.partition('.')
303
304 if '*' in dn_remainder:
305 # Only match wildcard in leftmost segment.
306 raise CertificateError(
307 "wildcard can only be present in the leftmost label: "
308 "{!r}.".format(dn))
309
310 if not sep:
311 # no right side
312 raise CertificateError(
313 "sole wildcard without additional labels are not support: "
314 "{!r}.".format(dn))
315
316 if dn_leftmost != '*':
317 # no partial wildcard matching
318 raise CertificateError(
319 "partial wildcards in leftmost label are not supported: "
320 "{!r}.".format(dn))
321
322 hostname_leftmost, sep, hostname_remainder = hostname.partition('.')
323 if not hostname_leftmost or not sep:
324 # wildcard must match at least one char
325 return False
326 return dn_remainder.lower() == hostname_remainder.lower()
327
328
329 def _inet_paton(ipname):
330 """Try to convert an IP address to packed binary form
331
332 Supports IPv4 addresses on all platforms and IPv6 on platforms with IPv6
333 support.
334 """
335 # inet_aton() also accepts strings like '1', '127.1', some also trailing
336 # data like '127.0.0.1 whatever'.
337 try:
338 addr = _socket.inet_aton(ipname)
339 except OSError:
340 # not an IPv4 address
341 pass
342 else:
343 if _socket.inet_ntoa(addr) == ipname:
344 # only accept injective ipnames
345 return addr
346 else:
347 # refuse for short IPv4 notation and additional trailing data
348 raise ValueError(
349 "{!r} is not a quad-dotted IPv4 address.".format(ipname)
350 )
351
352 try:
353 return _socket.inet_pton(_socket.AF_INET6, ipname)
354 except OSError:
355 raise ValueError("{!r} is neither an IPv4 nor an IP6 "
356 "address.".format(ipname))
357 except AttributeError:
358 # AF_INET6 not available
359 pass
360
361 raise ValueError("{!r} is not an IPv4 address.".format(ipname))
362
363
364 def _ipaddress_match(cert_ipaddress, host_ip):
365 """Exact matching of IP addresses.
366
367 RFC 6125 explicitly doesn't define an algorithm for this
368 (section 1.7.2 - "Out of Scope").
369 """
370 # OpenSSL may add a trailing newline to a subjectAltName's IP address,
371 # commonly with IPv6 addresses. Strip off trailing \n.
372 ip = _inet_paton(cert_ipaddress.rstrip())
373 return ip == host_ip
374
375
376 DefaultVerifyPaths = namedtuple("DefaultVerifyPaths",
377 "cafile capath openssl_cafile_env openssl_cafile openssl_capath_env "
378 "openssl_capath")
379
380 def get_default_verify_paths():
381 """Return paths to default cafile and capath.
382 """
383 parts = _ssl.get_default_verify_paths()
384
385 # environment vars shadow paths
386 cafile = os.environ.get(parts[0], parts[1])
387 capath = os.environ.get(parts[2], parts[3])
388
389 return DefaultVerifyPaths(cafile if os.path.isfile(cafile) else None,
390 capath if os.path.isdir(capath) else None,
391 *parts)
392
393
394 class ESC[4;38;5;81m_ASN1Object(ESC[4;38;5;149mnamedtuple("_ASN1Object", "nid shortname longname oid")):
395 """ASN.1 object identifier lookup
396 """
397 __slots__ = ()
398
399 def __new__(cls, oid):
400 return super().__new__(cls, *_txt2obj(oid, name=False))
401
402 @classmethod
403 def fromnid(cls, nid):
404 """Create _ASN1Object from OpenSSL numeric ID
405 """
406 return super().__new__(cls, *_nid2obj(nid))
407
408 @classmethod
409 def fromname(cls, name):
410 """Create _ASN1Object from short name, long name or OID
411 """
412 return super().__new__(cls, *_txt2obj(name, name=True))
413
414
415 class ESC[4;38;5;81mPurpose(ESC[4;38;5;149m_ASN1Object, ESC[4;38;5;149m_Enum):
416 """SSLContext purpose flags with X509v3 Extended Key Usage objects
417 """
418 SERVER_AUTH = '1.3.6.1.5.5.7.3.1'
419 CLIENT_AUTH = '1.3.6.1.5.5.7.3.2'
420
421
422 class ESC[4;38;5;81mSSLContext(ESC[4;38;5;149m_SSLContext):
423 """An SSLContext holds various SSL-related configuration options and
424 data, such as certificates and possibly a private key."""
425 _windows_cert_stores = ("CA", "ROOT")
426
427 sslsocket_class = None # SSLSocket is assigned later.
428 sslobject_class = None # SSLObject is assigned later.
429
430 def __new__(cls, protocol=None, *args, **kwargs):
431 if protocol is None:
432 warnings.warn(
433 "ssl.SSLContext() without protocol argument is deprecated.",
434 category=DeprecationWarning,
435 stacklevel=2
436 )
437 protocol = PROTOCOL_TLS
438 self = _SSLContext.__new__(cls, protocol)
439 return self
440
441 def _encode_hostname(self, hostname):
442 if hostname is None:
443 return None
444 elif isinstance(hostname, str):
445 return hostname.encode('idna').decode('ascii')
446 else:
447 return hostname.decode('ascii')
448
449 def wrap_socket(self, sock, server_side=False,
450 do_handshake_on_connect=True,
451 suppress_ragged_eofs=True,
452 server_hostname=None, session=None):
453 # SSLSocket class handles server_hostname encoding before it calls
454 # ctx._wrap_socket()
455 return self.sslsocket_class._create(
456 sock=sock,
457 server_side=server_side,
458 do_handshake_on_connect=do_handshake_on_connect,
459 suppress_ragged_eofs=suppress_ragged_eofs,
460 server_hostname=server_hostname,
461 context=self,
462 session=session
463 )
464
465 def wrap_bio(self, incoming, outgoing, server_side=False,
466 server_hostname=None, session=None):
467 # Need to encode server_hostname here because _wrap_bio() can only
468 # handle ASCII str.
469 return self.sslobject_class._create(
470 incoming, outgoing, server_side=server_side,
471 server_hostname=self._encode_hostname(server_hostname),
472 session=session, context=self,
473 )
474
475 def set_npn_protocols(self, npn_protocols):
476 warnings.warn(
477 "ssl NPN is deprecated, use ALPN instead",
478 DeprecationWarning,
479 stacklevel=2
480 )
481 protos = bytearray()
482 for protocol in npn_protocols:
483 b = bytes(protocol, 'ascii')
484 if len(b) == 0 or len(b) > 255:
485 raise SSLError('NPN protocols must be 1 to 255 in length')
486 protos.append(len(b))
487 protos.extend(b)
488
489 self._set_npn_protocols(protos)
490
491 def set_servername_callback(self, server_name_callback):
492 if server_name_callback is None:
493 self.sni_callback = None
494 else:
495 if not callable(server_name_callback):
496 raise TypeError("not a callable object")
497
498 def shim_cb(sslobj, servername, sslctx):
499 servername = self._encode_hostname(servername)
500 return server_name_callback(sslobj, servername, sslctx)
501
502 self.sni_callback = shim_cb
503
504 def set_alpn_protocols(self, alpn_protocols):
505 protos = bytearray()
506 for protocol in alpn_protocols:
507 b = bytes(protocol, 'ascii')
508 if len(b) == 0 or len(b) > 255:
509 raise SSLError('ALPN protocols must be 1 to 255 in length')
510 protos.append(len(b))
511 protos.extend(b)
512
513 self._set_alpn_protocols(protos)
514
515 def _load_windows_store_certs(self, storename, purpose):
516 certs = bytearray()
517 try:
518 for cert, encoding, trust in enum_certificates(storename):
519 # CA certs are never PKCS#7 encoded
520 if encoding == "x509_asn":
521 if trust is True or purpose.oid in trust:
522 certs.extend(cert)
523 except PermissionError:
524 warnings.warn("unable to enumerate Windows certificate store")
525 if certs:
526 self.load_verify_locations(cadata=certs)
527 return certs
528
529 def load_default_certs(self, purpose=Purpose.SERVER_AUTH):
530 if not isinstance(purpose, _ASN1Object):
531 raise TypeError(purpose)
532 if sys.platform == "win32":
533 for storename in self._windows_cert_stores:
534 self._load_windows_store_certs(storename, purpose)
535 self.set_default_verify_paths()
536
537 if hasattr(_SSLContext, 'minimum_version'):
538 @property
539 def minimum_version(self):
540 return TLSVersion(super().minimum_version)
541
542 @minimum_version.setter
543 def minimum_version(self, value):
544 if value == TLSVersion.SSLv3:
545 self.options &= ~Options.OP_NO_SSLv3
546 super(SSLContext, SSLContext).minimum_version.__set__(self, value)
547
548 @property
549 def maximum_version(self):
550 return TLSVersion(super().maximum_version)
551
552 @maximum_version.setter
553 def maximum_version(self, value):
554 super(SSLContext, SSLContext).maximum_version.__set__(self, value)
555
556 @property
557 def options(self):
558 return Options(super().options)
559
560 @options.setter
561 def options(self, value):
562 super(SSLContext, SSLContext).options.__set__(self, value)
563
564 if hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT'):
565 @property
566 def hostname_checks_common_name(self):
567 ncs = self._host_flags & _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
568 return ncs != _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
569
570 @hostname_checks_common_name.setter
571 def hostname_checks_common_name(self, value):
572 if value:
573 self._host_flags &= ~_ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
574 else:
575 self._host_flags |= _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT
576 else:
577 @property
578 def hostname_checks_common_name(self):
579 return True
580
581 @property
582 def _msg_callback(self):
583 """TLS message callback
584
585 The message callback provides a debugging hook to analyze TLS
586 connections. The callback is called for any TLS protocol message
587 (header, handshake, alert, and more), but not for application data.
588 Due to technical limitations, the callback can't be used to filter
589 traffic or to abort a connection. Any exception raised in the
590 callback is delayed until the handshake, read, or write operation
591 has been performed.
592
593 def msg_cb(conn, direction, version, content_type, msg_type, data):
594 pass
595
596 conn
597 :class:`SSLSocket` or :class:`SSLObject` instance
598 direction
599 ``read`` or ``write``
600 version
601 :class:`TLSVersion` enum member or int for unknown version. For a
602 frame header, it's the header version.
603 content_type
604 :class:`_TLSContentType` enum member or int for unsupported
605 content type.
606 msg_type
607 Either a :class:`_TLSContentType` enum number for a header
608 message, a :class:`_TLSAlertType` enum member for an alert
609 message, a :class:`_TLSMessageType` enum member for other
610 messages, or int for unsupported message types.
611 data
612 Raw, decrypted message content as bytes
613 """
614 inner = super()._msg_callback
615 if inner is not None:
616 return inner.user_function
617 else:
618 return None
619
620 @_msg_callback.setter
621 def _msg_callback(self, callback):
622 if callback is None:
623 super(SSLContext, SSLContext)._msg_callback.__set__(self, None)
624 return
625
626 if not hasattr(callback, '__call__'):
627 raise TypeError(f"{callback} is not callable.")
628
629 def inner(conn, direction, version, content_type, msg_type, data):
630 try:
631 version = TLSVersion(version)
632 except ValueError:
633 pass
634
635 try:
636 content_type = _TLSContentType(content_type)
637 except ValueError:
638 pass
639
640 if content_type == _TLSContentType.HEADER:
641 msg_enum = _TLSContentType
642 elif content_type == _TLSContentType.ALERT:
643 msg_enum = _TLSAlertType
644 else:
645 msg_enum = _TLSMessageType
646 try:
647 msg_type = msg_enum(msg_type)
648 except ValueError:
649 pass
650
651 return callback(conn, direction, version,
652 content_type, msg_type, data)
653
654 inner.user_function = callback
655
656 super(SSLContext, SSLContext)._msg_callback.__set__(self, inner)
657
658 @property
659 def protocol(self):
660 return _SSLMethod(super().protocol)
661
662 @property
663 def verify_flags(self):
664 return VerifyFlags(super().verify_flags)
665
666 @verify_flags.setter
667 def verify_flags(self, value):
668 super(SSLContext, SSLContext).verify_flags.__set__(self, value)
669
670 @property
671 def verify_mode(self):
672 value = super().verify_mode
673 try:
674 return VerifyMode(value)
675 except ValueError:
676 return value
677
678 @verify_mode.setter
679 def verify_mode(self, value):
680 super(SSLContext, SSLContext).verify_mode.__set__(self, value)
681
682
683 def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
684 capath=None, cadata=None):
685 """Create a SSLContext object with default settings.
686
687 NOTE: The protocol and settings may change anytime without prior
688 deprecation. The values represent a fair balance between maximum
689 compatibility and security.
690 """
691 if not isinstance(purpose, _ASN1Object):
692 raise TypeError(purpose)
693
694 # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
695 # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
696 # by default.
697 if purpose == Purpose.SERVER_AUTH:
698 # verify certs and host name in client mode
699 context = SSLContext(PROTOCOL_TLS_CLIENT)
700 context.verify_mode = CERT_REQUIRED
701 context.check_hostname = True
702 elif purpose == Purpose.CLIENT_AUTH:
703 context = SSLContext(PROTOCOL_TLS_SERVER)
704 else:
705 raise ValueError(purpose)
706
707 if cafile or capath or cadata:
708 context.load_verify_locations(cafile, capath, cadata)
709 elif context.verify_mode != CERT_NONE:
710 # no explicit cafile, capath or cadata but the verify mode is
711 # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system
712 # root CA certificates for the given purpose. This may fail silently.
713 context.load_default_certs(purpose)
714 # OpenSSL 1.1.1 keylog file
715 if hasattr(context, 'keylog_filename'):
716 keylogfile = os.environ.get('SSLKEYLOGFILE')
717 if keylogfile and not sys.flags.ignore_environment:
718 context.keylog_filename = keylogfile
719 return context
720
721 def _create_unverified_context(protocol=None, *, cert_reqs=CERT_NONE,
722 check_hostname=False, purpose=Purpose.SERVER_AUTH,
723 certfile=None, keyfile=None,
724 cafile=None, capath=None, cadata=None):
725 """Create a SSLContext object for Python stdlib modules
726
727 All Python stdlib modules shall use this function to create SSLContext
728 objects in order to keep common settings in one place. The configuration
729 is less restrict than create_default_context()'s to increase backward
730 compatibility.
731 """
732 if not isinstance(purpose, _ASN1Object):
733 raise TypeError(purpose)
734
735 # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
736 # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
737 # by default.
738 if purpose == Purpose.SERVER_AUTH:
739 # verify certs and host name in client mode
740 if protocol is None:
741 protocol = PROTOCOL_TLS_CLIENT
742 elif purpose == Purpose.CLIENT_AUTH:
743 if protocol is None:
744 protocol = PROTOCOL_TLS_SERVER
745 else:
746 raise ValueError(purpose)
747
748 context = SSLContext(protocol)
749 context.check_hostname = check_hostname
750 if cert_reqs is not None:
751 context.verify_mode = cert_reqs
752 if check_hostname:
753 context.check_hostname = True
754
755 if keyfile and not certfile:
756 raise ValueError("certfile must be specified")
757 if certfile or keyfile:
758 context.load_cert_chain(certfile, keyfile)
759
760 # load CA root certs
761 if cafile or capath or cadata:
762 context.load_verify_locations(cafile, capath, cadata)
763 elif context.verify_mode != CERT_NONE:
764 # no explicit cafile, capath or cadata but the verify mode is
765 # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system
766 # root CA certificates for the given purpose. This may fail silently.
767 context.load_default_certs(purpose)
768 # OpenSSL 1.1.1 keylog file
769 if hasattr(context, 'keylog_filename'):
770 keylogfile = os.environ.get('SSLKEYLOGFILE')
771 if keylogfile and not sys.flags.ignore_environment:
772 context.keylog_filename = keylogfile
773 return context
774
775 # Used by http.client if no context is explicitly passed.
776 _create_default_https_context = create_default_context
777
778
779 # Backwards compatibility alias, even though it's not a public name.
780 _create_stdlib_context = _create_unverified_context
781
782
783 class ESC[4;38;5;81mSSLObject:
784 """This class implements an interface on top of a low-level SSL object as
785 implemented by OpenSSL. This object captures the state of an SSL connection
786 but does not provide any network IO itself. IO needs to be performed
787 through separate "BIO" objects which are OpenSSL's IO abstraction layer.
788
789 This class does not have a public constructor. Instances are returned by
790 ``SSLContext.wrap_bio``. This class is typically used by framework authors
791 that want to implement asynchronous IO for SSL through memory buffers.
792
793 When compared to ``SSLSocket``, this object lacks the following features:
794
795 * Any form of network IO, including methods such as ``recv`` and ``send``.
796 * The ``do_handshake_on_connect`` and ``suppress_ragged_eofs`` machinery.
797 """
798 def __init__(self, *args, **kwargs):
799 raise TypeError(
800 f"{self.__class__.__name__} does not have a public "
801 f"constructor. Instances are returned by SSLContext.wrap_bio()."
802 )
803
804 @classmethod
805 def _create(cls, incoming, outgoing, server_side=False,
806 server_hostname=None, session=None, context=None):
807 self = cls.__new__(cls)
808 sslobj = context._wrap_bio(
809 incoming, outgoing, server_side=server_side,
810 server_hostname=server_hostname,
811 owner=self, session=session
812 )
813 self._sslobj = sslobj
814 return self
815
816 @property
817 def context(self):
818 """The SSLContext that is currently in use."""
819 return self._sslobj.context
820
821 @context.setter
822 def context(self, ctx):
823 self._sslobj.context = ctx
824
825 @property
826 def session(self):
827 """The SSLSession for client socket."""
828 return self._sslobj.session
829
830 @session.setter
831 def session(self, session):
832 self._sslobj.session = session
833
834 @property
835 def session_reused(self):
836 """Was the client session reused during handshake"""
837 return self._sslobj.session_reused
838
839 @property
840 def server_side(self):
841 """Whether this is a server-side socket."""
842 return self._sslobj.server_side
843
844 @property
845 def server_hostname(self):
846 """The currently set server hostname (for SNI), or ``None`` if no
847 server hostname is set."""
848 return self._sslobj.server_hostname
849
850 def read(self, len=1024, buffer=None):
851 """Read up to 'len' bytes from the SSL object and return them.
852
853 If 'buffer' is provided, read into this buffer and return the number of
854 bytes read.
855 """
856 if buffer is not None:
857 v = self._sslobj.read(len, buffer)
858 else:
859 v = self._sslobj.read(len)
860 return v
861
862 def write(self, data):
863 """Write 'data' to the SSL object and return the number of bytes
864 written.
865
866 The 'data' argument must support the buffer interface.
867 """
868 return self._sslobj.write(data)
869
870 def getpeercert(self, binary_form=False):
871 """Returns a formatted version of the data in the certificate provided
872 by the other end of the SSL channel.
873
874 Return None if no certificate was provided, {} if a certificate was
875 provided, but not validated.
876 """
877 return self._sslobj.getpeercert(binary_form)
878
879 def selected_npn_protocol(self):
880 """Return the currently selected NPN protocol as a string, or ``None``
881 if a next protocol was not negotiated or if NPN is not supported by one
882 of the peers."""
883 warnings.warn(
884 "ssl NPN is deprecated, use ALPN instead",
885 DeprecationWarning,
886 stacklevel=2
887 )
888
889 def selected_alpn_protocol(self):
890 """Return the currently selected ALPN protocol as a string, or ``None``
891 if a next protocol was not negotiated or if ALPN is not supported by one
892 of the peers."""
893 return self._sslobj.selected_alpn_protocol()
894
895 def cipher(self):
896 """Return the currently selected cipher as a 3-tuple ``(name,
897 ssl_version, secret_bits)``."""
898 return self._sslobj.cipher()
899
900 def shared_ciphers(self):
901 """Return a list of ciphers shared by the client during the handshake or
902 None if this is not a valid server connection.
903 """
904 return self._sslobj.shared_ciphers()
905
906 def compression(self):
907 """Return the current compression algorithm in use, or ``None`` if
908 compression was not negotiated or not supported by one of the peers."""
909 return self._sslobj.compression()
910
911 def pending(self):
912 """Return the number of bytes that can be read immediately."""
913 return self._sslobj.pending()
914
915 def do_handshake(self):
916 """Start the SSL/TLS handshake."""
917 self._sslobj.do_handshake()
918
919 def unwrap(self):
920 """Start the SSL shutdown handshake."""
921 return self._sslobj.shutdown()
922
923 def get_channel_binding(self, cb_type="tls-unique"):
924 """Get channel binding data for current connection. Raise ValueError
925 if the requested `cb_type` is not supported. Return bytes of the data
926 or None if the data is not available (e.g. before the handshake)."""
927 return self._sslobj.get_channel_binding(cb_type)
928
929 def version(self):
930 """Return a string identifying the protocol version used by the
931 current SSL channel. """
932 return self._sslobj.version()
933
934 def verify_client_post_handshake(self):
935 return self._sslobj.verify_client_post_handshake()
936
937
938 def _sslcopydoc(func):
939 """Copy docstring from SSLObject to SSLSocket"""
940 func.__doc__ = getattr(SSLObject, func.__name__).__doc__
941 return func
942
943
944 class ESC[4;38;5;81mSSLSocket(ESC[4;38;5;149msocket):
945 """This class implements a subtype of socket.socket that wraps
946 the underlying OS socket in an SSL context when necessary, and
947 provides read and write methods over that channel. """
948
949 def __init__(self, *args, **kwargs):
950 raise TypeError(
951 f"{self.__class__.__name__} does not have a public "
952 f"constructor. Instances are returned by "
953 f"SSLContext.wrap_socket()."
954 )
955
956 @classmethod
957 def _create(cls, sock, server_side=False, do_handshake_on_connect=True,
958 suppress_ragged_eofs=True, server_hostname=None,
959 context=None, session=None):
960 if sock.getsockopt(SOL_SOCKET, SO_TYPE) != SOCK_STREAM:
961 raise NotImplementedError("only stream sockets are supported")
962 if server_side:
963 if server_hostname:
964 raise ValueError("server_hostname can only be specified "
965 "in client mode")
966 if session is not None:
967 raise ValueError("session can only be specified in "
968 "client mode")
969 if context.check_hostname and not server_hostname:
970 raise ValueError("check_hostname requires server_hostname")
971
972 kwargs = dict(
973 family=sock.family, type=sock.type, proto=sock.proto,
974 fileno=sock.fileno()
975 )
976 self = cls.__new__(cls, **kwargs)
977 super(SSLSocket, self).__init__(**kwargs)
978 sock_timeout = sock.gettimeout()
979 sock.detach()
980
981 self._context = context
982 self._session = session
983 self._closed = False
984 self._sslobj = None
985 self.server_side = server_side
986 self.server_hostname = context._encode_hostname(server_hostname)
987 self.do_handshake_on_connect = do_handshake_on_connect
988 self.suppress_ragged_eofs = suppress_ragged_eofs
989
990 # See if we are connected
991 try:
992 self.getpeername()
993 except OSError as e:
994 if e.errno != errno.ENOTCONN:
995 raise
996 connected = False
997 blocking = self.getblocking()
998 self.setblocking(False)
999 try:
1000 # We are not connected so this is not supposed to block, but
1001 # testing revealed otherwise on macOS and Windows so we do
1002 # the non-blocking dance regardless. Our raise when any data
1003 # is found means consuming the data is harmless.
1004 notconn_pre_handshake_data = self.recv(1)
1005 except OSError as e:
1006 # EINVAL occurs for recv(1) on non-connected on unix sockets.
1007 if e.errno not in (errno.ENOTCONN, errno.EINVAL):
1008 raise
1009 notconn_pre_handshake_data = b''
1010 self.setblocking(blocking)
1011 if notconn_pre_handshake_data:
1012 # This prevents pending data sent to the socket before it was
1013 # closed from escaping to the caller who could otherwise
1014 # presume it came through a successful TLS connection.
1015 reason = "Closed before TLS handshake with data in recv buffer."
1016 notconn_pre_handshake_data_error = SSLError(e.errno, reason)
1017 # Add the SSLError attributes that _ssl.c always adds.
1018 notconn_pre_handshake_data_error.reason = reason
1019 notconn_pre_handshake_data_error.library = None
1020 try:
1021 self.close()
1022 except OSError:
1023 pass
1024 try:
1025 raise notconn_pre_handshake_data_error
1026 finally:
1027 # Explicitly break the reference cycle.
1028 notconn_pre_handshake_data_error = None
1029 else:
1030 connected = True
1031
1032 self.settimeout(sock_timeout) # Must come after setblocking() calls.
1033 self._connected = connected
1034 if connected:
1035 # create the SSL object
1036 try:
1037 self._sslobj = self._context._wrap_socket(
1038 self, server_side, self.server_hostname,
1039 owner=self, session=self._session,
1040 )
1041 if do_handshake_on_connect:
1042 timeout = self.gettimeout()
1043 if timeout == 0.0:
1044 # non-blocking
1045 raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets")
1046 self.do_handshake()
1047 except (OSError, ValueError):
1048 self.close()
1049 raise
1050 return self
1051
1052 @property
1053 @_sslcopydoc
1054 def context(self):
1055 return self._context
1056
1057 @context.setter
1058 def context(self, ctx):
1059 self._context = ctx
1060 self._sslobj.context = ctx
1061
1062 @property
1063 @_sslcopydoc
1064 def session(self):
1065 if self._sslobj is not None:
1066 return self._sslobj.session
1067
1068 @session.setter
1069 def session(self, session):
1070 self._session = session
1071 if self._sslobj is not None:
1072 self._sslobj.session = session
1073
1074 @property
1075 @_sslcopydoc
1076 def session_reused(self):
1077 if self._sslobj is not None:
1078 return self._sslobj.session_reused
1079
1080 def dup(self):
1081 raise NotImplementedError("Can't dup() %s instances" %
1082 self.__class__.__name__)
1083
1084 def _checkClosed(self, msg=None):
1085 # raise an exception here if you wish to check for spurious closes
1086 pass
1087
1088 def _check_connected(self):
1089 if not self._connected:
1090 # getpeername() will raise ENOTCONN if the socket is really
1091 # not connected; note that we can be connected even without
1092 # _connected being set, e.g. if connect() first returned
1093 # EAGAIN.
1094 self.getpeername()
1095
1096 def read(self, len=1024, buffer=None):
1097 """Read up to LEN bytes and return them.
1098 Return zero-length string on EOF."""
1099
1100 self._checkClosed()
1101 if self._sslobj is None:
1102 raise ValueError("Read on closed or unwrapped SSL socket.")
1103 try:
1104 if buffer is not None:
1105 return self._sslobj.read(len, buffer)
1106 else:
1107 return self._sslobj.read(len)
1108 except SSLError as x:
1109 if x.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs:
1110 if buffer is not None:
1111 return 0
1112 else:
1113 return b''
1114 else:
1115 raise
1116
1117 def write(self, data):
1118 """Write DATA to the underlying SSL channel. Returns
1119 number of bytes of DATA actually transmitted."""
1120
1121 self._checkClosed()
1122 if self._sslobj is None:
1123 raise ValueError("Write on closed or unwrapped SSL socket.")
1124 return self._sslobj.write(data)
1125
1126 @_sslcopydoc
1127 def getpeercert(self, binary_form=False):
1128 self._checkClosed()
1129 self._check_connected()
1130 return self._sslobj.getpeercert(binary_form)
1131
1132 @_sslcopydoc
1133 def selected_npn_protocol(self):
1134 self._checkClosed()
1135 warnings.warn(
1136 "ssl NPN is deprecated, use ALPN instead",
1137 DeprecationWarning,
1138 stacklevel=2
1139 )
1140 return None
1141
1142 @_sslcopydoc
1143 def selected_alpn_protocol(self):
1144 self._checkClosed()
1145 if self._sslobj is None or not _ssl.HAS_ALPN:
1146 return None
1147 else:
1148 return self._sslobj.selected_alpn_protocol()
1149
1150 @_sslcopydoc
1151 def cipher(self):
1152 self._checkClosed()
1153 if self._sslobj is None:
1154 return None
1155 else:
1156 return self._sslobj.cipher()
1157
1158 @_sslcopydoc
1159 def shared_ciphers(self):
1160 self._checkClosed()
1161 if self._sslobj is None:
1162 return None
1163 else:
1164 return self._sslobj.shared_ciphers()
1165
1166 @_sslcopydoc
1167 def compression(self):
1168 self._checkClosed()
1169 if self._sslobj is None:
1170 return None
1171 else:
1172 return self._sslobj.compression()
1173
1174 def send(self, data, flags=0):
1175 self._checkClosed()
1176 if self._sslobj is not None:
1177 if flags != 0:
1178 raise ValueError(
1179 "non-zero flags not allowed in calls to send() on %s" %
1180 self.__class__)
1181 return self._sslobj.write(data)
1182 else:
1183 return super().send(data, flags)
1184
1185 def sendto(self, data, flags_or_addr, addr=None):
1186 self._checkClosed()
1187 if self._sslobj is not None:
1188 raise ValueError("sendto not allowed on instances of %s" %
1189 self.__class__)
1190 elif addr is None:
1191 return super().sendto(data, flags_or_addr)
1192 else:
1193 return super().sendto(data, flags_or_addr, addr)
1194
1195 def sendmsg(self, *args, **kwargs):
1196 # Ensure programs don't send data unencrypted if they try to
1197 # use this method.
1198 raise NotImplementedError("sendmsg not allowed on instances of %s" %
1199 self.__class__)
1200
1201 def sendall(self, data, flags=0):
1202 self._checkClosed()
1203 if self._sslobj is not None:
1204 if flags != 0:
1205 raise ValueError(
1206 "non-zero flags not allowed in calls to sendall() on %s" %
1207 self.__class__)
1208 count = 0
1209 with memoryview(data) as view, view.cast("B") as byte_view:
1210 amount = len(byte_view)
1211 while count < amount:
1212 v = self.send(byte_view[count:])
1213 count += v
1214 else:
1215 return super().sendall(data, flags)
1216
1217 def sendfile(self, file, offset=0, count=None):
1218 """Send a file, possibly by using os.sendfile() if this is a
1219 clear-text socket. Return the total number of bytes sent.
1220 """
1221 if self._sslobj is not None:
1222 return self._sendfile_use_send(file, offset, count)
1223 else:
1224 # os.sendfile() works with plain sockets only
1225 return super().sendfile(file, offset, count)
1226
1227 def recv(self, buflen=1024, flags=0):
1228 self._checkClosed()
1229 if self._sslobj is not None:
1230 if flags != 0:
1231 raise ValueError(
1232 "non-zero flags not allowed in calls to recv() on %s" %
1233 self.__class__)
1234 return self.read(buflen)
1235 else:
1236 return super().recv(buflen, flags)
1237
1238 def recv_into(self, buffer, nbytes=None, flags=0):
1239 self._checkClosed()
1240 if buffer and (nbytes is None):
1241 nbytes = len(buffer)
1242 elif nbytes is None:
1243 nbytes = 1024
1244 if self._sslobj is not None:
1245 if flags != 0:
1246 raise ValueError(
1247 "non-zero flags not allowed in calls to recv_into() on %s" %
1248 self.__class__)
1249 return self.read(nbytes, buffer)
1250 else:
1251 return super().recv_into(buffer, nbytes, flags)
1252
1253 def recvfrom(self, buflen=1024, flags=0):
1254 self._checkClosed()
1255 if self._sslobj is not None:
1256 raise ValueError("recvfrom not allowed on instances of %s" %
1257 self.__class__)
1258 else:
1259 return super().recvfrom(buflen, flags)
1260
1261 def recvfrom_into(self, buffer, nbytes=None, flags=0):
1262 self._checkClosed()
1263 if self._sslobj is not None:
1264 raise ValueError("recvfrom_into not allowed on instances of %s" %
1265 self.__class__)
1266 else:
1267 return super().recvfrom_into(buffer, nbytes, flags)
1268
1269 def recvmsg(self, *args, **kwargs):
1270 raise NotImplementedError("recvmsg not allowed on instances of %s" %
1271 self.__class__)
1272
1273 def recvmsg_into(self, *args, **kwargs):
1274 raise NotImplementedError("recvmsg_into not allowed on instances of "
1275 "%s" % self.__class__)
1276
1277 @_sslcopydoc
1278 def pending(self):
1279 self._checkClosed()
1280 if self._sslobj is not None:
1281 return self._sslobj.pending()
1282 else:
1283 return 0
1284
1285 def shutdown(self, how):
1286 self._checkClosed()
1287 self._sslobj = None
1288 super().shutdown(how)
1289
1290 @_sslcopydoc
1291 def unwrap(self):
1292 if self._sslobj:
1293 s = self._sslobj.shutdown()
1294 self._sslobj = None
1295 return s
1296 else:
1297 raise ValueError("No SSL wrapper around " + str(self))
1298
1299 @_sslcopydoc
1300 def verify_client_post_handshake(self):
1301 if self._sslobj:
1302 return self._sslobj.verify_client_post_handshake()
1303 else:
1304 raise ValueError("No SSL wrapper around " + str(self))
1305
1306 def _real_close(self):
1307 self._sslobj = None
1308 super()._real_close()
1309
1310 @_sslcopydoc
1311 def do_handshake(self, block=False):
1312 self._check_connected()
1313 timeout = self.gettimeout()
1314 try:
1315 if timeout == 0.0 and block:
1316 self.settimeout(None)
1317 self._sslobj.do_handshake()
1318 finally:
1319 self.settimeout(timeout)
1320
1321 def _real_connect(self, addr, connect_ex):
1322 if self.server_side:
1323 raise ValueError("can't connect in server-side mode")
1324 # Here we assume that the socket is client-side, and not
1325 # connected at the time of the call. We connect it, then wrap it.
1326 if self._connected or self._sslobj is not None:
1327 raise ValueError("attempt to connect already-connected SSLSocket!")
1328 self._sslobj = self.context._wrap_socket(
1329 self, False, self.server_hostname,
1330 owner=self, session=self._session
1331 )
1332 try:
1333 if connect_ex:
1334 rc = super().connect_ex(addr)
1335 else:
1336 rc = None
1337 super().connect(addr)
1338 if not rc:
1339 self._connected = True
1340 if self.do_handshake_on_connect:
1341 self.do_handshake()
1342 return rc
1343 except (OSError, ValueError):
1344 self._sslobj = None
1345 raise
1346
1347 def connect(self, addr):
1348 """Connects to remote ADDR, and then wraps the connection in
1349 an SSL channel."""
1350 self._real_connect(addr, False)
1351
1352 def connect_ex(self, addr):
1353 """Connects to remote ADDR, and then wraps the connection in
1354 an SSL channel."""
1355 return self._real_connect(addr, True)
1356
1357 def accept(self):
1358 """Accepts a new connection from a remote client, and returns
1359 a tuple containing that new connection wrapped with a server-side
1360 SSL channel, and the address of the remote client."""
1361
1362 newsock, addr = super().accept()
1363 newsock = self.context.wrap_socket(newsock,
1364 do_handshake_on_connect=self.do_handshake_on_connect,
1365 suppress_ragged_eofs=self.suppress_ragged_eofs,
1366 server_side=True)
1367 return newsock, addr
1368
1369 @_sslcopydoc
1370 def get_channel_binding(self, cb_type="tls-unique"):
1371 if self._sslobj is not None:
1372 return self._sslobj.get_channel_binding(cb_type)
1373 else:
1374 if cb_type not in CHANNEL_BINDING_TYPES:
1375 raise ValueError(
1376 "{0} channel binding type not implemented".format(cb_type)
1377 )
1378 return None
1379
1380 @_sslcopydoc
1381 def version(self):
1382 if self._sslobj is not None:
1383 return self._sslobj.version()
1384 else:
1385 return None
1386
1387
1388 # Python does not support forward declaration of types.
1389 SSLContext.sslsocket_class = SSLSocket
1390 SSLContext.sslobject_class = SSLObject
1391
1392
1393 # some utility functions
1394
1395 def cert_time_to_seconds(cert_time):
1396 """Return the time in seconds since the Epoch, given the timestring
1397 representing the "notBefore" or "notAfter" date from a certificate
1398 in ``"%b %d %H:%M:%S %Y %Z"`` strptime format (C locale).
1399
1400 "notBefore" or "notAfter" dates must use UTC (RFC 5280).
1401
1402 Month is one of: Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec
1403 UTC should be specified as GMT (see ASN1_TIME_print())
1404 """
1405 from time import strptime
1406 from calendar import timegm
1407
1408 months = (
1409 "Jan","Feb","Mar","Apr","May","Jun",
1410 "Jul","Aug","Sep","Oct","Nov","Dec"
1411 )
1412 time_format = ' %d %H:%M:%S %Y GMT' # NOTE: no month, fixed GMT
1413 try:
1414 month_number = months.index(cert_time[:3].title()) + 1
1415 except ValueError:
1416 raise ValueError('time data %r does not match '
1417 'format "%%b%s"' % (cert_time, time_format))
1418 else:
1419 # found valid month
1420 tt = strptime(cert_time[3:], time_format)
1421 # return an integer, the previous mktime()-based implementation
1422 # returned a float (fractional seconds are always zero here).
1423 return timegm((tt[0], month_number) + tt[2:6])
1424
1425 PEM_HEADER = "-----BEGIN CERTIFICATE-----"
1426 PEM_FOOTER = "-----END CERTIFICATE-----"
1427
1428 def DER_cert_to_PEM_cert(der_cert_bytes):
1429 """Takes a certificate in binary DER format and returns the
1430 PEM version of it as a string."""
1431
1432 f = str(base64.standard_b64encode(der_cert_bytes), 'ASCII', 'strict')
1433 ss = [PEM_HEADER]
1434 ss += [f[i:i+64] for i in range(0, len(f), 64)]
1435 ss.append(PEM_FOOTER + '\n')
1436 return '\n'.join(ss)
1437
1438 def PEM_cert_to_DER_cert(pem_cert_string):
1439 """Takes a certificate in ASCII PEM format and returns the
1440 DER-encoded version of it as a byte sequence"""
1441
1442 if not pem_cert_string.startswith(PEM_HEADER):
1443 raise ValueError("Invalid PEM encoding; must start with %s"
1444 % PEM_HEADER)
1445 if not pem_cert_string.strip().endswith(PEM_FOOTER):
1446 raise ValueError("Invalid PEM encoding; must end with %s"
1447 % PEM_FOOTER)
1448 d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)]
1449 return base64.decodebytes(d.encode('ASCII', 'strict'))
1450
1451 def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT,
1452 ca_certs=None, timeout=_GLOBAL_DEFAULT_TIMEOUT):
1453 """Retrieve the certificate from the server at the specified address,
1454 and return it as a PEM-encoded string.
1455 If 'ca_certs' is specified, validate the server cert against it.
1456 If 'ssl_version' is specified, use it in the connection attempt.
1457 If 'timeout' is specified, use it in the connection attempt.
1458 """
1459
1460 host, port = addr
1461 if ca_certs is not None:
1462 cert_reqs = CERT_REQUIRED
1463 else:
1464 cert_reqs = CERT_NONE
1465 context = _create_stdlib_context(ssl_version,
1466 cert_reqs=cert_reqs,
1467 cafile=ca_certs)
1468 with create_connection(addr, timeout=timeout) as sock:
1469 with context.wrap_socket(sock, server_hostname=host) as sslsock:
1470 dercert = sslsock.getpeercert(True)
1471 return DER_cert_to_PEM_cert(dercert)
1472
1473 def get_protocol_name(protocol_code):
1474 return _PROTOCOL_NAMES.get(protocol_code, '<unknown>')