python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
setuptools/
_distutils/
command/
upload.py
       1  """
       2  distutils.command.upload
       3  
       4  Implements the Distutils 'upload' subcommand (upload package to a package
       5  index).
       6  """
       7  
       8  import os
       9  import io
      10  import hashlib
      11  from base64 import standard_b64encode
      12  from urllib.request import urlopen, Request, HTTPError
      13  from urllib.parse import urlparse
      14  from distutils.errors import DistutilsError, DistutilsOptionError
      15  from distutils.core import PyPIRCCommand
      16  from distutils.spawn import spawn
      17  from distutils import log
      18  
      19  
      20  # PyPI Warehouse supports MD5, SHA256, and Blake2 (blake2-256)
      21  # https://bugs.python.org/issue40698
      22  _FILE_CONTENT_DIGESTS = {
      23      "md5_digest": getattr(hashlib, "md5", None),
      24      "sha256_digest": getattr(hashlib, "sha256", None),
      25      "blake2_256_digest": getattr(hashlib, "blake2b", None),
      26  }
      27  
      28  
      29  class ESC[4;38;5;81mupload(ESC[4;38;5;149mPyPIRCCommand):
      30  
      31      description = "upload binary package to PyPI"
      32  
      33      user_options = PyPIRCCommand.user_options + [
      34          ('sign', 's', 'sign files to upload using gpg'),
      35          ('identity=', 'i', 'GPG identity used to sign files'),
      36      ]
      37  
      38      boolean_options = PyPIRCCommand.boolean_options + ['sign']
      39  
      40      def initialize_options(self):
      41          PyPIRCCommand.initialize_options(self)
      42          self.username = ''
      43          self.password = ''
      44          self.show_response = 0
      45          self.sign = False
      46          self.identity = None
      47  
      48      def finalize_options(self):
      49          PyPIRCCommand.finalize_options(self)
      50          if self.identity and not self.sign:
      51              raise DistutilsOptionError("Must use --sign for --identity to have meaning")
      52          config = self._read_pypirc()
      53          if config != {}:
      54              self.username = config['username']
      55              self.password = config['password']
      56              self.repository = config['repository']
      57              self.realm = config['realm']
      58  
      59          # getting the password from the distribution
      60          # if previously set by the register command
      61          if not self.password and self.distribution.password:
      62              self.password = self.distribution.password
      63  
      64      def run(self):
      65          if not self.distribution.dist_files:
      66              msg = (
      67                  "Must create and upload files in one command "
      68                  "(e.g. setup.py sdist upload)"
      69              )
      70              raise DistutilsOptionError(msg)
      71          for command, pyversion, filename in self.distribution.dist_files:
      72              self.upload_file(command, pyversion, filename)
      73  
      74      def upload_file(self, command, pyversion, filename):  # noqa: C901
      75          # Makes sure the repository URL is compliant
      76          schema, netloc, url, params, query, fragments = urlparse(self.repository)
      77          if params or query or fragments:
      78              raise AssertionError("Incompatible url %s" % self.repository)
      79  
      80          if schema not in ('http', 'https'):
      81              raise AssertionError("unsupported schema " + schema)
      82  
      83          # Sign if requested
      84          if self.sign:
      85              gpg_args = ["gpg", "--detach-sign", "-a", filename]
      86              if self.identity:
      87                  gpg_args[2:2] = ["--local-user", self.identity]
      88              spawn(gpg_args, dry_run=self.dry_run)
      89  
      90          # Fill in the data - send all the meta-data in case we need to
      91          # register a new release
      92          f = open(filename, 'rb')
      93          try:
      94              content = f.read()
      95          finally:
      96              f.close()
      97  
      98          meta = self.distribution.metadata
      99          data = {
     100              # action
     101              ':action': 'file_upload',
     102              'protocol_version': '1',
     103              # identify release
     104              'name': meta.get_name(),
     105              'version': meta.get_version(),
     106              # file content
     107              'content': (os.path.basename(filename), content),
     108              'filetype': command,
     109              'pyversion': pyversion,
     110              # additional meta-data
     111              'metadata_version': '1.0',
     112              'summary': meta.get_description(),
     113              'home_page': meta.get_url(),
     114              'author': meta.get_contact(),
     115              'author_email': meta.get_contact_email(),
     116              'license': meta.get_licence(),
     117              'description': meta.get_long_description(),
     118              'keywords': meta.get_keywords(),
     119              'platform': meta.get_platforms(),
     120              'classifiers': meta.get_classifiers(),
     121              'download_url': meta.get_download_url(),
     122              # PEP 314
     123              'provides': meta.get_provides(),
     124              'requires': meta.get_requires(),
     125              'obsoletes': meta.get_obsoletes(),
     126          }
     127  
     128          data['comment'] = ''
     129  
     130          # file content digests
     131          for digest_name, digest_cons in _FILE_CONTENT_DIGESTS.items():
     132              if digest_cons is None:
     133                  continue
     134              try:
     135                  data[digest_name] = digest_cons(content).hexdigest()
     136              except ValueError:
     137                  # hash digest not available or blocked by security policy
     138                  pass
     139  
     140          if self.sign:
     141              with open(filename + ".asc", "rb") as f:
     142                  data['gpg_signature'] = (os.path.basename(filename) + ".asc", f.read())
     143  
     144          # set up the authentication
     145          user_pass = (self.username + ":" + self.password).encode('ascii')
     146          # The exact encoding of the authentication string is debated.
     147          # Anyway PyPI only accepts ascii for both username or password.
     148          auth = "Basic " + standard_b64encode(user_pass).decode('ascii')
     149  
     150          # Build up the MIME payload for the POST data
     151          boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
     152          sep_boundary = b'\r\n--' + boundary.encode('ascii')
     153          end_boundary = sep_boundary + b'--\r\n'
     154          body = io.BytesIO()
     155          for key, value in data.items():
     156              title = '\r\nContent-Disposition: form-data; name="%s"' % key
     157              # handle multiple entries for the same name
     158              if not isinstance(value, list):
     159                  value = [value]
     160              for value in value:
     161                  if type(value) is tuple:
     162                      title += '; filename="%s"' % value[0]
     163                      value = value[1]
     164                  else:
     165                      value = str(value).encode('utf-8')
     166                  body.write(sep_boundary)
     167                  body.write(title.encode('utf-8'))
     168                  body.write(b"\r\n\r\n")
     169                  body.write(value)
     170          body.write(end_boundary)
     171          body = body.getvalue()
     172  
     173          msg = "Submitting {} to {}".format(filename, self.repository)
     174          self.announce(msg, log.INFO)
     175  
     176          # build the Request
     177          headers = {
     178              'Content-type': 'multipart/form-data; boundary=%s' % boundary,
     179              'Content-length': str(len(body)),
     180              'Authorization': auth,
     181          }
     182  
     183          request = Request(self.repository, data=body, headers=headers)
     184          # send the data
     185          try:
     186              result = urlopen(request)
     187              status = result.getcode()
     188              reason = result.msg
     189          except HTTPError as e:
     190              status = e.code
     191              reason = e.msg
     192          except OSError as e:
     193              self.announce(str(e), log.ERROR)
     194              raise
     195  
     196          if status == 200:
     197              self.announce('Server response ({}): {}'.format(status, reason), log.INFO)
     198              if self.show_response:
     199                  text = self._read_pypi_response(result)
     200                  msg = '\n'.join(('-' * 75, text, '-' * 75))
     201                  self.announce(msg, log.INFO)
     202          else:
     203              msg = 'Upload failed ({}): {}'.format(status, reason)
     204              self.announce(msg, log.ERROR)
     205              raise DistutilsError(msg)