1  # -*- coding: utf-8 -*-
       2  """Tests for distutils.archive_util."""
       3  import unittest
       4  import os
       5  import sys
       6  import tarfile
       7  from os.path import splitdrive
       8  import warnings
       9  
      10  from distutils import archive_util
      11  from distutils.archive_util import (check_archive_formats, make_tarball,
      12                                      make_zipfile, make_archive,
      13                                      ARCHIVE_FORMATS)
      14  from distutils.spawn import find_executable, spawn
      15  from distutils.tests import support
      16  from test.support import patch
      17  from test.support.os_helper import change_cwd
      18  from test.support.warnings_helper import check_warnings
      19  
      20  try:
      21      import grp
      22      import pwd
      23      UID_GID_SUPPORT = True
      24  except ImportError:
      25      UID_GID_SUPPORT = False
      26  
      27  try:
      28      import zipfile
      29      ZIP_SUPPORT = True
      30  except ImportError:
      31      ZIP_SUPPORT = find_executable('zip')
      32  
      33  try:
      34      import zlib
      35      ZLIB_SUPPORT = True
      36  except ImportError:
      37      ZLIB_SUPPORT = False
      38  
      39  try:
      40      import bz2
      41  except ImportError:
      42      bz2 = None
      43  
      44  try:
      45      import lzma
      46  except ImportError:
      47      lzma = None
      48  
      49  def can_fs_encode(filename):
      50      """
      51      Return True if the filename can be saved in the file system.
      52      """
      53      if os.path.supports_unicode_filenames:
      54          return True
      55      try:
      56          filename.encode(sys.getfilesystemencoding())
      57      except UnicodeEncodeError:
      58          return False
      59      return True
      60  
      61  
class ArchiveUtilTestCase(support.TempdirManager,
                          support.LoggingSilencer,
                          unittest.TestCase):
      65  
      66      @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
      67      def test_make_tarball(self, name='archive'):
      68          # creating something to tar
      69          tmpdir = self._create_files()
      70          self._make_tarball(tmpdir, name, '.tar.gz')
      71          # trying an uncompressed one
      72          self._make_tarball(tmpdir, name, '.tar', compress=None)
      73  
      74      @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
      75      def test_make_tarball_gzip(self):
      76          tmpdir = self._create_files()
      77          self._make_tarball(tmpdir, 'archive', '.tar.gz', compress='gzip')
      78  
      79      @unittest.skipUnless(bz2, 'Need bz2 support to run')
      80      def test_make_tarball_bzip2(self):
      81          tmpdir = self._create_files()
      82          self._make_tarball(tmpdir, 'archive', '.tar.bz2', compress='bzip2')
      83  
      84      @unittest.skipUnless(lzma, 'Need lzma support to run')
      85      def test_make_tarball_xz(self):
      86          tmpdir = self._create_files()
      87          self._make_tarball(tmpdir, 'archive', '.tar.xz', compress='xz')
      88  
      89      @unittest.skipUnless(can_fs_encode('årchiv'),
      90          'File system cannot handle this filename')
      91      def test_make_tarball_latin1(self):
      92          """
      93          Mirror test_make_tarball, except filename contains latin characters.
      94          """
      95          self.test_make_tarball('årchiv') # note this isn't a real word
      96  
      97      @unittest.skipUnless(can_fs_encode('のアーカイブ'),
      98          'File system cannot handle this filename')
      99      def test_make_tarball_extended(self):
     100          """
     101          Mirror test_make_tarball, except filename contains extended
     102          characters outside the latin charset.
     103          """
     104          self.test_make_tarball('のアーカイブ') # japanese for archive
     105  
     106      def _make_tarball(self, tmpdir, target_name, suffix, **kwargs):
     107          tmpdir2 = self.mkdtemp()
     108          unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
     109                              "source and target should be on same drive")
     110  
     111          base_name = os.path.join(tmpdir2, target_name)
     112  
     113          # working with relative paths to avoid tar warnings
     114          with change_cwd(tmpdir):
     115              make_tarball(splitdrive(base_name)[1], 'dist', **kwargs)
     116  
     117          # check if the compressed tarball was created
     118          tarball = base_name + suffix
     119          self.assertTrue(os.path.exists(tarball))
     120          self.assertEqual(self._tarinfo(tarball), self._created_files)
     121  
     122      def _tarinfo(self, path):
     123          tar = tarfile.open(path)
     124          try:
     125              names = tar.getnames()
     126              names.sort()
     127              return names
     128          finally:
     129              tar.close()
     130  
     131      _zip_created_files = ['dist/', 'dist/file1', 'dist/file2',
     132                            'dist/sub/', 'dist/sub/file3', 'dist/sub2/']
     133      _created_files = [p.rstrip('/') for p in _zip_created_files]
     134  
     135      def _create_files(self):
     136          # creating something to tar
     137          tmpdir = self.mkdtemp()
     138          dist = os.path.join(tmpdir, 'dist')
     139          os.mkdir(dist)
     140          self.write_file([dist, 'file1'], 'xxx')
     141          self.write_file([dist, 'file2'], 'xxx')
     142          os.mkdir(os.path.join(dist, 'sub'))
     143          self.write_file([dist, 'sub', 'file3'], 'xxx')
     144          os.mkdir(os.path.join(dist, 'sub2'))
     145          return tmpdir
     146  
     147      @unittest.skipUnless(find_executable('tar') and find_executable('gzip')
     148                           and ZLIB_SUPPORT,
     149                           'Need the tar, gzip and zlib command to run')
     150      def test_tarfile_vs_tar(self):
     151          tmpdir =  self._create_files()
     152          tmpdir2 = self.mkdtemp()
     153          base_name = os.path.join(tmpdir2, 'archive')
     154          old_dir = os.getcwd()
     155          os.chdir(tmpdir)
     156          try:
     157              make_tarball(base_name, 'dist')
     158          finally:
     159              os.chdir(old_dir)
     160  
     161          # check if the compressed tarball was created
     162          tarball = base_name + '.tar.gz'
     163          self.assertTrue(os.path.exists(tarball))
     164  
     165          # now create another tarball using `tar`
     166          tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
     167          tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
     168          gzip_cmd = ['gzip', '-f', '-9', 'archive2.tar']
     169          old_dir = os.getcwd()
     170          os.chdir(tmpdir)
     171          try:
     172              spawn(tar_cmd)
     173              spawn(gzip_cmd)
     174          finally:
     175              os.chdir(old_dir)
     176  
     177          self.assertTrue(os.path.exists(tarball2))
     178          # let's compare both tarballs
     179          self.assertEqual(self._tarinfo(tarball), self._created_files)
     180          self.assertEqual(self._tarinfo(tarball2), self._created_files)
     181  
     182          # trying an uncompressed one
     183          base_name = os.path.join(tmpdir2, 'archive')
     184          old_dir = os.getcwd()
     185          os.chdir(tmpdir)
     186          try:
     187              make_tarball(base_name, 'dist', compress=None)
     188          finally:
     189              os.chdir(old_dir)
     190          tarball = base_name + '.tar'
     191          self.assertTrue(os.path.exists(tarball))
     192  
     193          # now for a dry_run
     194          base_name = os.path.join(tmpdir2, 'archive')
     195          old_dir = os.getcwd()
     196          os.chdir(tmpdir)
     197          try:
     198              make_tarball(base_name, 'dist', compress=None, dry_run=True)
     199          finally:
     200              os.chdir(old_dir)
     201          tarball = base_name + '.tar'
     202          self.assertTrue(os.path.exists(tarball))
     203  
     204      @unittest.skipUnless(find_executable('compress'),
     205                           'The compress program is required')
     206      def test_compress_deprecated(self):
     207          tmpdir =  self._create_files()
     208          base_name = os.path.join(self.mkdtemp(), 'archive')
     209  
     210          # using compress and testing the PendingDeprecationWarning
     211          old_dir = os.getcwd()
     212          os.chdir(tmpdir)
     213          try:
     214              with check_warnings() as w:
     215                  warnings.simplefilter("always")
     216                  make_tarball(base_name, 'dist', compress='compress')
     217          finally:
     218              os.chdir(old_dir)
     219          tarball = base_name + '.tar.Z'
     220          self.assertTrue(os.path.exists(tarball))
     221          self.assertEqual(len(w.warnings), 1)
     222  
     223          # same test with dry_run
     224          os.remove(tarball)
     225          old_dir = os.getcwd()
     226          os.chdir(tmpdir)
     227          try:
     228              with check_warnings() as w:
     229                  warnings.simplefilter("always")
     230                  make_tarball(base_name, 'dist', compress='compress',
     231                               dry_run=True)
     232          finally:
     233              os.chdir(old_dir)
     234          self.assertFalse(os.path.exists(tarball))
     235          self.assertEqual(len(w.warnings), 1)
     236  
     237      @unittest.skipUnless(ZIP_SUPPORT and ZLIB_SUPPORT,
     238                           'Need zip and zlib support to run')
     239      def test_make_zipfile(self):
     240          # creating something to tar
     241          tmpdir = self._create_files()
     242          base_name = os.path.join(self.mkdtemp(), 'archive')
     243          with change_cwd(tmpdir):
     244              make_zipfile(base_name, 'dist')
     245  
     246          # check if the compressed tarball was created
     247          tarball = base_name + '.zip'
     248          self.assertTrue(os.path.exists(tarball))
     249          with zipfile.ZipFile(tarball) as zf:
     250              self.assertEqual(sorted(zf.namelist()), self._zip_created_files)
     251  
     252      @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
     253      def test_make_zipfile_no_zlib(self):
     254          patch(self, archive_util.zipfile, 'zlib', None)  # force zlib ImportError
     255  
     256          called = []
     257          zipfile_class = zipfile.ZipFile
     258          def fake_zipfile(*a, **kw):
     259              if kw.get('compression', None) == zipfile.ZIP_STORED:
     260                  called.append((a, kw))
     261              return zipfile_class(*a, **kw)
     262  
     263          patch(self, archive_util.zipfile, 'ZipFile', fake_zipfile)
     264  
     265          # create something to tar and compress
     266          tmpdir = self._create_files()
     267          base_name = os.path.join(self.mkdtemp(), 'archive')
     268          with change_cwd(tmpdir):
     269              make_zipfile(base_name, 'dist')
     270  
     271          tarball = base_name + '.zip'
     272          self.assertEqual(called,
     273                           [((tarball, "w"), {'compression': zipfile.ZIP_STORED})])
     274          self.assertTrue(os.path.exists(tarball))
     275          with zipfile.ZipFile(tarball) as zf:
     276              self.assertEqual(sorted(zf.namelist()), self._zip_created_files)
     277  
     278      def test_check_archive_formats(self):
     279          self.assertEqual(check_archive_formats(['gztar', 'xxx', 'zip']),
     280                           'xxx')
     281          self.assertIsNone(check_archive_formats(['gztar', 'bztar', 'xztar',
     282                                                   'ztar', 'tar', 'zip']))
     283  
     284      def test_make_archive(self):
     285          tmpdir = self.mkdtemp()
     286          base_name = os.path.join(tmpdir, 'archive')
     287          self.assertRaises(ValueError, make_archive, base_name, 'xxx')
     288  
     289      def test_make_archive_cwd(self):
     290          current_dir = os.getcwd()
     291          def _breaks(*args, **kw):
     292              raise RuntimeError()
     293          ARCHIVE_FORMATS['xxx'] = (_breaks, [], 'xxx file')
     294          try:
     295              try:
     296                  make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
     297              except:
     298                  pass
     299              self.assertEqual(os.getcwd(), current_dir)
     300          finally:
     301              del ARCHIVE_FORMATS['xxx']
     302  
     303      def test_make_archive_tar(self):
     304          base_dir =  self._create_files()
     305          base_name = os.path.join(self.mkdtemp() , 'archive')
     306          res = make_archive(base_name, 'tar', base_dir, 'dist')
     307          self.assertTrue(os.path.exists(res))
     308          self.assertEqual(os.path.basename(res), 'archive.tar')
     309          self.assertEqual(self._tarinfo(res), self._created_files)
     310  
     311      @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
     312      def test_make_archive_gztar(self):
     313          base_dir =  self._create_files()
     314          base_name = os.path.join(self.mkdtemp() , 'archive')
     315          res = make_archive(base_name, 'gztar', base_dir, 'dist')
     316          self.assertTrue(os.path.exists(res))
     317          self.assertEqual(os.path.basename(res), 'archive.tar.gz')
     318          self.assertEqual(self._tarinfo(res), self._created_files)
     319  
     320      @unittest.skipUnless(bz2, 'Need bz2 support to run')
     321      def test_make_archive_bztar(self):
     322          base_dir =  self._create_files()
     323          base_name = os.path.join(self.mkdtemp() , 'archive')
     324          res = make_archive(base_name, 'bztar', base_dir, 'dist')
     325          self.assertTrue(os.path.exists(res))
     326          self.assertEqual(os.path.basename(res), 'archive.tar.bz2')
     327          self.assertEqual(self._tarinfo(res), self._created_files)
     328  
     329      @unittest.skipUnless(lzma, 'Need xz support to run')
     330      def test_make_archive_xztar(self):
     331          base_dir =  self._create_files()
     332          base_name = os.path.join(self.mkdtemp() , 'archive')
     333          res = make_archive(base_name, 'xztar', base_dir, 'dist')
     334          self.assertTrue(os.path.exists(res))
     335          self.assertEqual(os.path.basename(res), 'archive.tar.xz')
     336          self.assertEqual(self._tarinfo(res), self._created_files)
     337  
     338      def test_make_archive_owner_group(self):
     339          # testing make_archive with owner and group, with various combinations
     340          # this works even if there's not gid/uid support
     341          if UID_GID_SUPPORT:
     342              group = grp.getgrgid(0)[0]
     343              owner = pwd.getpwuid(0)[0]
     344          else:
     345              group = owner = 'root'
     346  
     347          base_dir =  self._create_files()
     348          root_dir = self.mkdtemp()
     349          base_name = os.path.join(self.mkdtemp() , 'archive')
     350          res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
     351                             group=group)
     352          self.assertTrue(os.path.exists(res))
     353  
     354          res = make_archive(base_name, 'zip', root_dir, base_dir)
     355          self.assertTrue(os.path.exists(res))
     356  
     357          res = make_archive(base_name, 'tar', root_dir, base_dir,
     358                             owner=owner, group=group)
     359          self.assertTrue(os.path.exists(res))
     360  
     361          res = make_archive(base_name, 'tar', root_dir, base_dir,
     362                             owner='kjhkjhkjg', group='oihohoh')
     363          self.assertTrue(os.path.exists(res))
     364  
     365      @unittest.skipUnless(ZLIB_SUPPORT, "Requires zlib")
     366      @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
     367      def test_tarfile_root_owner(self):
     368          tmpdir =  self._create_files()
     369          base_name = os.path.join(self.mkdtemp(), 'archive')
     370          old_dir = os.getcwd()
     371          os.chdir(tmpdir)
     372          group = grp.getgrgid(0)[0]
     373          owner = pwd.getpwuid(0)[0]
     374          try:
     375              archive_name = make_tarball(base_name, 'dist', compress=None,
     376                                          owner=owner, group=group)
     377          finally:
     378              os.chdir(old_dir)
     379  
     380          # check if the compressed tarball was created
     381          self.assertTrue(os.path.exists(archive_name))
     382  
     383          # now checks the rights
     384          archive = tarfile.open(archive_name)
     385          try:
     386              for member in archive.getmembers():
     387                  self.assertEqual(member.uid, 0)
     388                  self.assertEqual(member.gid, 0)
     389          finally:
     390              archive.close()
     391  
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()