python (3.11.7)

(root)/
lib/
python3.11/
test/
test_ftplib.py
       1  """Test script for ftplib module."""
       2  
       3  # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
       4  # environment
       5  
       6  import ftplib
       7  import socket
       8  import io
       9  import errno
      10  import os
      11  import threading
      12  import time
      13  import unittest
      14  try:
      15      import ssl
      16  except ImportError:
      17      ssl = None
      18  
      19  from unittest import TestCase, skipUnless
      20  from test import support
      21  from test.support import threading_helper
      22  from test.support import socket_helper
      23  from test.support import warnings_helper
      24  from test.support.socket_helper import HOST, HOSTv6
      25  
      26  
      27  asynchat = warnings_helper.import_deprecated('asynchat')
      28  asyncore = warnings_helper.import_deprecated('asyncore')
      29  
      30  
      31  support.requires_working_socket(module=True)
      32  
      33  TIMEOUT = support.LOOPBACK_TIMEOUT
      34  DEFAULT_ENCODING = 'utf-8'
      35  # the dummy data returned by server over the data channel when
      36  # RETR, LIST, NLST, MLSD commands are issued
      37  RETR_DATA = 'abcde\xB9\xB2\xB3\xA4\xA6\r\n' * 1000
      38  LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
      39  NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
      40  MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
      41               "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
      42               "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
      43               "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
      44               "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
      45               "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
      46               "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
      47               "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
      48               "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
      49               "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
      50               "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
      51               "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
      52               "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
      53               "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
      54               "type=file;perm=r;unique==keVO1+1G4; file4\r\n"
      55               "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n"
      56               "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n")
      57  
      58  
      59  def default_error_handler():
      60      # bpo-44359: Silently ignore socket errors. Such errors occur when a client
      61      # socket is closed, in TestFTPClass.tearDown() and makepasv() tests, and
      62      # the server gets an error on its side.
      63      pass
      64  
      65  
      66  class ESC[4;38;5;81mDummyDTPHandler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
      67      dtp_conn_closed = False
      68  
      69      def __init__(self, conn, baseclass):
      70          asynchat.async_chat.__init__(self, conn)
      71          self.baseclass = baseclass
      72          self.baseclass.last_received_data = bytearray()
      73          self.encoding = baseclass.encoding
      74  
      75      def handle_read(self):
      76          new_data = self.recv(1024)
      77          self.baseclass.last_received_data += new_data
      78  
      79      def handle_close(self):
      80          # XXX: this method can be called many times in a row for a single
      81          # connection, including in clear-text (non-TLS) mode.
      82          # (behaviour witnessed with test_data_connection)
      83          if not self.dtp_conn_closed:
      84              self.baseclass.push('226 transfer complete')
      85              self.close()
      86              self.dtp_conn_closed = True
      87  
      88      def push(self, what):
      89          if self.baseclass.next_data is not None:
      90              what = self.baseclass.next_data
      91              self.baseclass.next_data = None
      92          if not what:
      93              return self.close_when_done()
      94          super(DummyDTPHandler, self).push(what.encode(self.encoding))
      95  
      96      def handle_error(self):
      97          default_error_handler()
      98  
      99  
     100  class ESC[4;38;5;81mDummyFTPHandler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
     101  
     102      dtp_handler = DummyDTPHandler
     103  
     104      def __init__(self, conn, encoding=DEFAULT_ENCODING):
     105          asynchat.async_chat.__init__(self, conn)
     106          # tells the socket to handle urgent data inline (ABOR command)
     107          self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
     108          self.set_terminator(b"\r\n")
     109          self.in_buffer = []
     110          self.dtp = None
     111          self.last_received_cmd = None
     112          self.last_received_data = bytearray()
     113          self.next_response = ''
     114          self.next_data = None
     115          self.rest = None
     116          self.next_retr_data = RETR_DATA
     117          self.push('220 welcome')
     118          self.encoding = encoding
     119          # We use this as the string IPv4 address to direct the client
     120          # to in response to a PASV command.  To test security behavior.
     121          # https://bugs.python.org/issue43285/.
     122          self.fake_pasv_server_ip = '252.253.254.255'
     123  
     124      def collect_incoming_data(self, data):
     125          self.in_buffer.append(data)
     126  
     127      def found_terminator(self):
     128          line = b''.join(self.in_buffer).decode(self.encoding)
     129          self.in_buffer = []
     130          if self.next_response:
     131              self.push(self.next_response)
     132              self.next_response = ''
     133          cmd = line.split(' ')[0].lower()
     134          self.last_received_cmd = cmd
     135          space = line.find(' ')
     136          if space != -1:
     137              arg = line[space + 1:]
     138          else:
     139              arg = ""
     140          if hasattr(self, 'cmd_' + cmd):
     141              method = getattr(self, 'cmd_' + cmd)
     142              method(arg)
     143          else:
     144              self.push('550 command "%s" not understood.' %cmd)
     145  
     146      def handle_error(self):
     147          default_error_handler()
     148  
     149      def push(self, data):
     150          asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')
     151  
     152      def cmd_port(self, arg):
     153          addr = list(map(int, arg.split(',')))
     154          ip = '%d.%d.%d.%d' %tuple(addr[:4])
     155          port = (addr[4] * 256) + addr[5]
     156          s = socket.create_connection((ip, port), timeout=TIMEOUT)
     157          self.dtp = self.dtp_handler(s, baseclass=self)
     158          self.push('200 active data connection established')
     159  
     160      def cmd_pasv(self, arg):
     161          with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
     162              sock.settimeout(TIMEOUT)
     163              port = sock.getsockname()[1]
     164              ip = self.fake_pasv_server_ip
     165              ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
     166              self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
     167              conn, addr = sock.accept()
     168              self.dtp = self.dtp_handler(conn, baseclass=self)
     169  
     170      def cmd_eprt(self, arg):
     171          af, ip, port = arg.split(arg[0])[1:-1]
     172          port = int(port)
     173          s = socket.create_connection((ip, port), timeout=TIMEOUT)
     174          self.dtp = self.dtp_handler(s, baseclass=self)
     175          self.push('200 active data connection established')
     176  
     177      def cmd_epsv(self, arg):
     178          with socket.create_server((self.socket.getsockname()[0], 0),
     179                                    family=socket.AF_INET6) as sock:
     180              sock.settimeout(TIMEOUT)
     181              port = sock.getsockname()[1]
     182              self.push('229 entering extended passive mode (|||%d|)' %port)
     183              conn, addr = sock.accept()
     184              self.dtp = self.dtp_handler(conn, baseclass=self)
     185  
     186      def cmd_echo(self, arg):
     187          # sends back the received string (used by the test suite)
     188          self.push(arg)
     189  
     190      def cmd_noop(self, arg):
     191          self.push('200 noop ok')
     192  
     193      def cmd_user(self, arg):
     194          self.push('331 username ok')
     195  
     196      def cmd_pass(self, arg):
     197          self.push('230 password ok')
     198  
     199      def cmd_acct(self, arg):
     200          self.push('230 acct ok')
     201  
     202      def cmd_rnfr(self, arg):
     203          self.push('350 rnfr ok')
     204  
     205      def cmd_rnto(self, arg):
     206          self.push('250 rnto ok')
     207  
     208      def cmd_dele(self, arg):
     209          self.push('250 dele ok')
     210  
     211      def cmd_cwd(self, arg):
     212          self.push('250 cwd ok')
     213  
     214      def cmd_size(self, arg):
     215          self.push('250 1000')
     216  
     217      def cmd_mkd(self, arg):
     218          self.push('257 "%s"' %arg)
     219  
     220      def cmd_rmd(self, arg):
     221          self.push('250 rmd ok')
     222  
     223      def cmd_pwd(self, arg):
     224          self.push('257 "pwd ok"')
     225  
     226      def cmd_type(self, arg):
     227          self.push('200 type ok')
     228  
     229      def cmd_quit(self, arg):
     230          self.push('221 quit ok')
     231          self.close()
     232  
     233      def cmd_abor(self, arg):
     234          self.push('226 abor ok')
     235  
     236      def cmd_stor(self, arg):
     237          self.push('125 stor ok')
     238  
     239      def cmd_rest(self, arg):
     240          self.rest = arg
     241          self.push('350 rest ok')
     242  
     243      def cmd_retr(self, arg):
     244          self.push('125 retr ok')
     245          if self.rest is not None:
     246              offset = int(self.rest)
     247          else:
     248              offset = 0
     249          self.dtp.push(self.next_retr_data[offset:])
     250          self.dtp.close_when_done()
     251          self.rest = None
     252  
     253      def cmd_list(self, arg):
     254          self.push('125 list ok')
     255          self.dtp.push(LIST_DATA)
     256          self.dtp.close_when_done()
     257  
     258      def cmd_nlst(self, arg):
     259          self.push('125 nlst ok')
     260          self.dtp.push(NLST_DATA)
     261          self.dtp.close_when_done()
     262  
     263      def cmd_opts(self, arg):
     264          self.push('200 opts ok')
     265  
     266      def cmd_mlsd(self, arg):
     267          self.push('125 mlsd ok')
     268          self.dtp.push(MLSD_DATA)
     269          self.dtp.close_when_done()
     270  
     271      def cmd_setlongretr(self, arg):
     272          # For testing. Next RETR will return long line.
     273          self.next_retr_data = 'x' * int(arg)
     274          self.push('125 setlongretr ok')
     275  
     276  
     277  class ESC[4;38;5;81mDummyFTPServer(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher, ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     278  
     279      handler = DummyFTPHandler
     280  
     281      def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
     282          threading.Thread.__init__(self)
     283          asyncore.dispatcher.__init__(self)
     284          self.daemon = True
     285          self.create_socket(af, socket.SOCK_STREAM)
     286          self.bind(address)
     287          self.listen(5)
     288          self.active = False
     289          self.active_lock = threading.Lock()
     290          self.host, self.port = self.socket.getsockname()[:2]
     291          self.handler_instance = None
     292          self.encoding = encoding
     293  
     294      def start(self):
     295          assert not self.active
     296          self.__flag = threading.Event()
     297          threading.Thread.start(self)
     298          self.__flag.wait()
     299  
     300      def run(self):
     301          self.active = True
     302          self.__flag.set()
     303          while self.active and asyncore.socket_map:
     304              self.active_lock.acquire()
     305              asyncore.loop(timeout=0.1, count=1)
     306              self.active_lock.release()
     307          asyncore.close_all(ignore_all=True)
     308  
     309      def stop(self):
     310          assert self.active
     311          self.active = False
     312          self.join()
     313  
     314      def handle_accepted(self, conn, addr):
     315          self.handler_instance = self.handler(conn, encoding=self.encoding)
     316  
     317      def handle_connect(self):
     318          self.close()
     319      handle_read = handle_connect
     320  
     321      def writable(self):
     322          return 0
     323  
     324      def handle_error(self):
     325          default_error_handler()
     326  
     327  
     328  if ssl is not None:
     329  
     330      CERTFILE = os.path.join(os.path.dirname(__file__), "certdata", "keycert3.pem")
     331      CAFILE = os.path.join(os.path.dirname(__file__), "certdata", "pycacert.pem")
     332  
     333      class ESC[4;38;5;81mSSLConnection(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher):
     334          """An asyncore.dispatcher subclass supporting TLS/SSL."""
     335  
     336          _ssl_accepting = False
     337          _ssl_closing = False
     338  
     339          def secure_connection(self):
     340              context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     341              context.load_cert_chain(CERTFILE)
     342              socket = context.wrap_socket(self.socket,
     343                                           suppress_ragged_eofs=False,
     344                                           server_side=True,
     345                                           do_handshake_on_connect=False)
     346              self.del_channel()
     347              self.set_socket(socket)
     348              self._ssl_accepting = True
     349  
     350          def _do_ssl_handshake(self):
     351              try:
     352                  self.socket.do_handshake()
     353              except ssl.SSLError as err:
     354                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     355                                     ssl.SSL_ERROR_WANT_WRITE):
     356                      return
     357                  elif err.args[0] == ssl.SSL_ERROR_EOF:
     358                      return self.handle_close()
     359                  # TODO: SSLError does not expose alert information
     360                  elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
     361                      return self.handle_close()
     362                  raise
     363              except OSError as err:
     364                  if err.args[0] == errno.ECONNABORTED:
     365                      return self.handle_close()
     366              else:
     367                  self._ssl_accepting = False
     368  
     369          def _do_ssl_shutdown(self):
     370              self._ssl_closing = True
     371              try:
     372                  self.socket = self.socket.unwrap()
     373              except ssl.SSLError as err:
     374                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     375                                     ssl.SSL_ERROR_WANT_WRITE):
     376                      return
     377              except OSError:
     378                  # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
     379                  # from OpenSSL's SSL_shutdown(), corresponding to a
     380                  # closed socket condition. See also:
     381                  # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
     382                  pass
     383              self._ssl_closing = False
     384              if getattr(self, '_ccc', False) is False:
     385                  super(SSLConnection, self).close()
     386              else:
     387                  pass
     388  
     389          def handle_read_event(self):
     390              if self._ssl_accepting:
     391                  self._do_ssl_handshake()
     392              elif self._ssl_closing:
     393                  self._do_ssl_shutdown()
     394              else:
     395                  super(SSLConnection, self).handle_read_event()
     396  
     397          def handle_write_event(self):
     398              if self._ssl_accepting:
     399                  self._do_ssl_handshake()
     400              elif self._ssl_closing:
     401                  self._do_ssl_shutdown()
     402              else:
     403                  super(SSLConnection, self).handle_write_event()
     404  
     405          def send(self, data):
     406              try:
     407                  return super(SSLConnection, self).send(data)
     408              except ssl.SSLError as err:
     409                  if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
     410                                     ssl.SSL_ERROR_WANT_READ,
     411                                     ssl.SSL_ERROR_WANT_WRITE):
     412                      return 0
     413                  raise
     414  
     415          def recv(self, buffer_size):
     416              try:
     417                  return super(SSLConnection, self).recv(buffer_size)
     418              except ssl.SSLError as err:
     419                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     420                                     ssl.SSL_ERROR_WANT_WRITE):
     421                      return b''
     422                  if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
     423                      self.handle_close()
     424                      return b''
     425                  raise
     426  
     427          def handle_error(self):
     428              default_error_handler()
     429  
     430          def close(self):
     431              if (isinstance(self.socket, ssl.SSLSocket) and
     432                      self.socket._sslobj is not None):
     433                  self._do_ssl_shutdown()
     434              else:
     435                  super(SSLConnection, self).close()
     436  
     437  
     438      class ESC[4;38;5;81mDummyTLS_DTPHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyDTPHandler):
     439          """A DummyDTPHandler subclass supporting TLS/SSL."""
     440  
     441          def __init__(self, conn, baseclass):
     442              DummyDTPHandler.__init__(self, conn, baseclass)
     443              if self.baseclass.secure_data_channel:
     444                  self.secure_connection()
     445  
     446  
     447      class ESC[4;38;5;81mDummyTLS_FTPHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyFTPHandler):
     448          """A DummyFTPHandler subclass supporting TLS/SSL."""
     449  
     450          dtp_handler = DummyTLS_DTPHandler
     451  
     452          def __init__(self, conn, encoding=DEFAULT_ENCODING):
     453              DummyFTPHandler.__init__(self, conn, encoding=encoding)
     454              self.secure_data_channel = False
     455              self._ccc = False
     456  
     457          def cmd_auth(self, line):
     458              """Set up secure control channel."""
     459              self.push('234 AUTH TLS successful')
     460              self.secure_connection()
     461  
     462          def cmd_ccc(self, line):
     463              self.push('220 Reverting back to clear-text')
     464              self._ccc = True
     465              self._do_ssl_shutdown()
     466  
     467          def cmd_pbsz(self, line):
     468              """Negotiate size of buffer for secure data transfer.
     469              For TLS/SSL the only valid value for the parameter is '0'.
     470              Any other value is accepted but ignored.
     471              """
     472              self.push('200 PBSZ=0 successful.')
     473  
     474          def cmd_prot(self, line):
     475              """Setup un/secure data channel."""
     476              arg = line.upper()
     477              if arg == 'C':
     478                  self.push('200 Protection set to Clear')
     479                  self.secure_data_channel = False
     480              elif arg == 'P':
     481                  self.push('200 Protection set to Private')
     482                  self.secure_data_channel = True
     483              else:
     484                  self.push("502 Unrecognized PROT type (use C or P).")
     485  
     486  
     487      class ESC[4;38;5;81mDummyTLS_FTPServer(ESC[4;38;5;149mDummyFTPServer):
     488          handler = DummyTLS_FTPHandler
     489  
     490  
     491  class ESC[4;38;5;81mTestFTPClass(ESC[4;38;5;149mTestCase):
     492  
     493      def setUp(self, encoding=DEFAULT_ENCODING):
     494          self.server = DummyFTPServer((HOST, 0), encoding=encoding)
     495          self.server.start()
     496          self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
     497          self.client.connect(self.server.host, self.server.port)
     498  
     499      def tearDown(self):
     500          self.client.close()
     501          self.server.stop()
     502          # Explicitly clear the attribute to prevent dangling thread
     503          self.server = None
     504          asyncore.close_all(ignore_all=True)
     505  
     506      def check_data(self, received, expected):
     507          self.assertEqual(len(received), len(expected))
     508          self.assertEqual(received, expected)
     509  
     510      def test_getwelcome(self):
     511          self.assertEqual(self.client.getwelcome(), '220 welcome')
     512  
     513      def test_sanitize(self):
     514          self.assertEqual(self.client.sanitize('foo'), repr('foo'))
     515          self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
     516          self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
     517  
     518      def test_exceptions(self):
     519          self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
     520          self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
     521          self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
     522          self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
     523          self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
     524          self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
     525          self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
     526          self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
     527  
     528      def test_all_errors(self):
     529          exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
     530                        ftplib.error_proto, ftplib.Error, OSError,
     531                        EOFError)
     532          for x in exceptions:
     533              try:
     534                  raise x('exception not included in all_errors set')
     535              except ftplib.all_errors:
     536                  pass
     537  
     538      def test_set_pasv(self):
     539          # passive mode is supposed to be enabled by default
     540          self.assertTrue(self.client.passiveserver)
     541          self.client.set_pasv(True)
     542          self.assertTrue(self.client.passiveserver)
     543          self.client.set_pasv(False)
     544          self.assertFalse(self.client.passiveserver)
     545  
     546      def test_voidcmd(self):
     547          self.client.voidcmd('echo 200')
     548          self.client.voidcmd('echo 299')
     549          self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
     550          self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
     551  
     552      def test_login(self):
     553          self.client.login()
     554  
     555      def test_acct(self):
     556          self.client.acct('passwd')
     557  
     558      def test_rename(self):
     559          self.client.rename('a', 'b')
     560          self.server.handler_instance.next_response = '200'
     561          self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
     562  
     563      def test_delete(self):
     564          self.client.delete('foo')
     565          self.server.handler_instance.next_response = '199'
     566          self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
     567  
     568      def test_size(self):
     569          self.client.size('foo')
     570  
     571      def test_mkd(self):
     572          dir = self.client.mkd('/foo')
     573          self.assertEqual(dir, '/foo')
     574  
     575      def test_rmd(self):
     576          self.client.rmd('foo')
     577  
     578      def test_cwd(self):
     579          dir = self.client.cwd('/foo')
     580          self.assertEqual(dir, '250 cwd ok')
     581  
     582      def test_pwd(self):
     583          dir = self.client.pwd()
     584          self.assertEqual(dir, 'pwd ok')
     585  
     586      def test_quit(self):
     587          self.assertEqual(self.client.quit(), '221 quit ok')
     588          # Ensure the connection gets closed; sock attribute should be None
     589          self.assertEqual(self.client.sock, None)
     590  
     591      def test_abort(self):
     592          self.client.abort()
     593  
     594      def test_retrbinary(self):
     595          received = []
     596          self.client.retrbinary('retr', received.append)
     597          self.check_data(b''.join(received),
     598                          RETR_DATA.encode(self.client.encoding))
     599  
     600      def test_retrbinary_rest(self):
     601          for rest in (0, 10, 20):
     602              received = []
     603              self.client.retrbinary('retr', received.append, rest=rest)
     604              self.check_data(b''.join(received),
     605                              RETR_DATA[rest:].encode(self.client.encoding))
     606  
     607      def test_retrlines(self):
     608          received = []
     609          self.client.retrlines('retr', received.append)
     610          self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
     611  
     612      def test_storbinary(self):
     613          f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
     614          self.client.storbinary('stor', f)
     615          self.check_data(self.server.handler_instance.last_received_data,
     616                          RETR_DATA.encode(self.server.encoding))
     617          # test new callback arg
     618          flag = []
     619          f.seek(0)
     620          self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
     621          self.assertTrue(flag)
     622  
     623      def test_storbinary_rest(self):
     624          data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
     625          f = io.BytesIO(data)
     626          for r in (30, '30'):
     627              f.seek(0)
     628              self.client.storbinary('stor', f, rest=r)
     629              self.assertEqual(self.server.handler_instance.rest, str(r))
     630  
     631      def test_storlines(self):
     632          data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
     633          f = io.BytesIO(data)
     634          self.client.storlines('stor', f)
     635          self.check_data(self.server.handler_instance.last_received_data,
     636                          RETR_DATA.encode(self.server.encoding))
     637          # test new callback arg
     638          flag = []
     639          f.seek(0)
     640          self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
     641          self.assertTrue(flag)
     642  
     643          f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
     644          # storlines() expects a binary file, not a text file
     645          with warnings_helper.check_warnings(('', BytesWarning), quiet=True):
     646              self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
     647  
     648      def test_nlst(self):
     649          self.client.nlst()
     650          self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
     651  
     652      def test_dir(self):
     653          l = []
     654          self.client.dir(l.append)
     655          self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
     656  
     657      def test_mlsd(self):
     658          list(self.client.mlsd())
     659          list(self.client.mlsd(path='/'))
     660          list(self.client.mlsd(path='/', facts=['size', 'type']))
     661  
     662          ls = list(self.client.mlsd())
     663          for name, facts in ls:
     664              self.assertIsInstance(name, str)
     665              self.assertIsInstance(facts, dict)
     666              self.assertTrue(name)
     667              self.assertIn('type', facts)
     668              self.assertIn('perm', facts)
     669              self.assertIn('unique', facts)
     670  
     671          def set_data(data):
     672              self.server.handler_instance.next_data = data
     673  
     674          def test_entry(line, type=None, perm=None, unique=None, name=None):
     675              type = 'type' if type is None else type
     676              perm = 'perm' if perm is None else perm
     677              unique = 'unique' if unique is None else unique
     678              name = 'name' if name is None else name
     679              set_data(line)
     680              _name, facts = next(self.client.mlsd())
     681              self.assertEqual(_name, name)
     682              self.assertEqual(facts['type'], type)
     683              self.assertEqual(facts['perm'], perm)
     684              self.assertEqual(facts['unique'], unique)
     685  
     686          # plain
     687          test_entry('type=type;perm=perm;unique=unique; name\r\n')
     688          # "=" in fact value
     689          test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
     690          test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
     691          test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
     692          test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
     693          # spaces in name
     694          test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
     695          test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
     696          test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
     697          test_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
     698          # ";" in name
     699          test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
     700          test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
     701          test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
     702          test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
     703          # case sensitiveness
     704          set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
     705          _name, facts = next(self.client.mlsd())
     706          for x in facts:
     707              self.assertTrue(x.islower())
     708          # no data (directory empty)
     709          set_data('')
     710          self.assertRaises(StopIteration, next, self.client.mlsd())
     711          set_data('')
     712          for x in self.client.mlsd():
     713              self.fail("unexpected data %s" % x)
     714  
     715      def test_makeport(self):
     716          with self.client.makeport():
     717              # IPv4 is in use, just make sure send_eprt has not been used
     718              self.assertEqual(self.server.handler_instance.last_received_cmd,
     719                                  'port')
     720  
     721      def test_makepasv(self):
     722          host, port = self.client.makepasv()
     723          conn = socket.create_connection((host, port), timeout=TIMEOUT)
     724          conn.close()
     725          # IPv4 is in use, just make sure send_epsv has not been used
     726          self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
     727  
     728      def test_makepasv_issue43285_security_disabled(self):
     729          """Test the opt-in to the old vulnerable behavior."""
     730          self.client.trust_server_pasv_ipv4_address = True
     731          bad_host, port = self.client.makepasv()
     732          self.assertEqual(
     733                  bad_host, self.server.handler_instance.fake_pasv_server_ip)
     734          # Opening and closing a connection keeps the dummy server happy
     735          # instead of timing out on accept.
     736          socket.create_connection((self.client.sock.getpeername()[0], port),
     737                                   timeout=TIMEOUT).close()
     738  
     739      def test_makepasv_issue43285_security_enabled_default(self):
     740          self.assertFalse(self.client.trust_server_pasv_ipv4_address)
     741          trusted_host, port = self.client.makepasv()
     742          self.assertNotEqual(
     743                  trusted_host, self.server.handler_instance.fake_pasv_server_ip)
     744          # Opening and closing a connection keeps the dummy server happy
     745          # instead of timing out on accept.
     746          socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()
     747  
     748      def test_with_statement(self):
     749          self.client.quit()
     750  
     751          def is_client_connected():
     752              if self.client.sock is None:
     753                  return False
     754              try:
     755                  self.client.sendcmd('noop')
     756              except (OSError, EOFError):
     757                  return False
     758              return True
     759  
     760          # base test
     761          with ftplib.FTP(timeout=TIMEOUT) as self.client:
     762              self.client.connect(self.server.host, self.server.port)
     763              self.client.sendcmd('noop')
     764              self.assertTrue(is_client_connected())
     765          self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
     766          self.assertFalse(is_client_connected())
     767  
     768          # QUIT sent inside the with block
     769          with ftplib.FTP(timeout=TIMEOUT) as self.client:
     770              self.client.connect(self.server.host, self.server.port)
     771              self.client.sendcmd('noop')
     772              self.client.quit()
     773          self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
     774          self.assertFalse(is_client_connected())
     775  
     776          # force a wrong response code to be sent on QUIT: error_perm
     777          # is expected and the connection is supposed to be closed
     778          try:
     779              with ftplib.FTP(timeout=TIMEOUT) as self.client:
     780                  self.client.connect(self.server.host, self.server.port)
     781                  self.client.sendcmd('noop')
     782                  self.server.handler_instance.next_response = '550 error on quit'
     783          except ftplib.error_perm as err:
     784              self.assertEqual(str(err), '550 error on quit')
     785          else:
     786              self.fail('Exception not raised')
     787          # needed to give the threaded server some time to set the attribute
     788          # which otherwise would still be == 'noop'
     789          time.sleep(0.1)
     790          self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
     791          self.assertFalse(is_client_connected())
     792  
     793      def test_source_address(self):
     794          self.client.quit()
     795          port = socket_helper.find_unused_port()
     796          try:
     797              self.client.connect(self.server.host, self.server.port,
     798                                  source_address=(HOST, port))
     799              self.assertEqual(self.client.sock.getsockname()[1], port)
     800              self.client.quit()
     801          except OSError as e:
     802              if e.errno == errno.EADDRINUSE:
     803                  self.skipTest("couldn't bind to port %d" % port)
     804              raise
     805  
     806      def test_source_address_passive_connection(self):
     807          port = socket_helper.find_unused_port()
     808          self.client.source_address = (HOST, port)
     809          try:
     810              with self.client.transfercmd('list') as sock:
     811                  self.assertEqual(sock.getsockname()[1], port)
     812          except OSError as e:
     813              if e.errno == errno.EADDRINUSE:
     814                  self.skipTest("couldn't bind to port %d" % port)
     815              raise
     816  
     817      def test_parse257(self):
     818          self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
     819          self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
     820          self.assertEqual(ftplib.parse257('257 ""'), '')
     821          self.assertEqual(ftplib.parse257('257 "" created'), '')
     822          self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
     823          # The 257 response is supposed to include the directory
     824          # name and in case it contains embedded double-quotes
     825          # they must be doubled (see RFC-959, chapter 7, appendix 2).
     826          self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
     827          self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
     828  
     829      def test_line_too_long(self):
     830          self.assertRaises(ftplib.Error, self.client.sendcmd,
     831                            'x' * self.client.maxline * 2)
     832  
     833      def test_retrlines_too_long(self):
     834          self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
     835          received = []
     836          self.assertRaises(ftplib.Error,
     837                            self.client.retrlines, 'retr', received.append)
     838  
     839      def test_storlines_too_long(self):
     840          f = io.BytesIO(b'x' * self.client.maxline * 2)
     841          self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
     842  
     843      def test_encoding_param(self):
     844          encodings = ['latin-1', 'utf-8']
     845          for encoding in encodings:
     846              with self.subTest(encoding=encoding):
     847                  self.tearDown()
     848                  self.setUp(encoding=encoding)
     849                  self.assertEqual(encoding, self.client.encoding)
     850                  self.test_retrbinary()
     851                  self.test_storbinary()
     852                  self.test_retrlines()
     853                  new_dir = self.client.mkd('/non-ascii dir \xAE')
     854                  self.check_data(new_dir, '/non-ascii dir \xAE')
     855          # Check default encoding
     856          client = ftplib.FTP(timeout=TIMEOUT)
     857          self.assertEqual(DEFAULT_ENCODING, client.encoding)
     858  
     859  
     860  @skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
     861  class ESC[4;38;5;81mTestIPv6Environment(ESC[4;38;5;149mTestCase):
     862  
     863      def setUp(self):
     864          self.server = DummyFTPServer((HOSTv6, 0),
     865                                       af=socket.AF_INET6,
     866                                       encoding=DEFAULT_ENCODING)
     867          self.server.start()
     868          self.client = ftplib.FTP(timeout=TIMEOUT, encoding=DEFAULT_ENCODING)
     869          self.client.connect(self.server.host, self.server.port)
     870  
     871      def tearDown(self):
     872          self.client.close()
     873          self.server.stop()
     874          # Explicitly clear the attribute to prevent dangling thread
     875          self.server = None
     876          asyncore.close_all(ignore_all=True)
     877  
     878      def test_af(self):
     879          self.assertEqual(self.client.af, socket.AF_INET6)
     880  
     881      def test_makeport(self):
     882          with self.client.makeport():
     883              self.assertEqual(self.server.handler_instance.last_received_cmd,
     884                                  'eprt')
     885  
     886      def test_makepasv(self):
     887          host, port = self.client.makepasv()
     888          conn = socket.create_connection((host, port), timeout=TIMEOUT)
     889          conn.close()
     890          self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')
     891  
     892      def test_transfer(self):
     893          def retr():
     894              received = []
     895              self.client.retrbinary('retr', received.append)
     896              self.assertEqual(b''.join(received),
     897                               RETR_DATA.encode(self.client.encoding))
     898          self.client.set_pasv(True)
     899          retr()
     900          self.client.set_pasv(False)
     901          retr()
     902  
     903  
     904  @skipUnless(ssl, "SSL not available")
     905  class ESC[4;38;5;81mTestTLS_FTPClassMixin(ESC[4;38;5;149mTestFTPClass):
     906      """Repeat TestFTPClass tests starting the TLS layer for both control
     907      and data connections first.
     908      """
     909  
     910      def setUp(self, encoding=DEFAULT_ENCODING):
     911          self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
     912          self.server.start()
     913          self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
     914          self.client.connect(self.server.host, self.server.port)
     915          # enable TLS
     916          self.client.auth()
     917          self.client.prot_p()
     918  
     919  
     920  @skipUnless(ssl, "SSL not available")
     921  class ESC[4;38;5;81mTestTLS_FTPClass(ESC[4;38;5;149mTestCase):
     922      """Specific TLS_FTP class tests."""
     923  
     924      def setUp(self, encoding=DEFAULT_ENCODING):
     925          self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
     926          self.server.start()
     927          self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
     928          self.client.connect(self.server.host, self.server.port)
     929  
     930      def tearDown(self):
     931          self.client.close()
     932          self.server.stop()
     933          # Explicitly clear the attribute to prevent dangling thread
     934          self.server = None
     935          asyncore.close_all(ignore_all=True)
     936  
     937      def test_control_connection(self):
     938          self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
     939          self.client.auth()
     940          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     941  
     942      def test_data_connection(self):
     943          # clear text
     944          with self.client.transfercmd('list') as sock:
     945              self.assertNotIsInstance(sock, ssl.SSLSocket)
     946              self.assertEqual(sock.recv(1024),
     947                               LIST_DATA.encode(self.client.encoding))
     948          self.assertEqual(self.client.voidresp(), "226 transfer complete")
     949  
     950          # secured, after PROT P
     951          self.client.prot_p()
     952          with self.client.transfercmd('list') as sock:
     953              self.assertIsInstance(sock, ssl.SSLSocket)
     954              # consume from SSL socket to finalize handshake and avoid
     955              # "SSLError [SSL] shutdown while in init"
     956              self.assertEqual(sock.recv(1024),
     957                               LIST_DATA.encode(self.client.encoding))
     958          self.assertEqual(self.client.voidresp(), "226 transfer complete")
     959  
     960          # PROT C is issued, the connection must be in cleartext again
     961          self.client.prot_c()
     962          with self.client.transfercmd('list') as sock:
     963              self.assertNotIsInstance(sock, ssl.SSLSocket)
     964              self.assertEqual(sock.recv(1024),
     965                               LIST_DATA.encode(self.client.encoding))
     966          self.assertEqual(self.client.voidresp(), "226 transfer complete")
     967  
     968      def test_login(self):
     969          # login() is supposed to implicitly secure the control connection
     970          self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
     971          self.client.login()
     972          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     973          # make sure that AUTH TLS doesn't get issued again
     974          self.client.login()
     975  
     976      def test_auth_issued_twice(self):
     977          self.client.auth()
     978          self.assertRaises(ValueError, self.client.auth)
     979  
     980      def test_context(self):
     981          self.client.quit()
     982          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     983          ctx.check_hostname = False
     984          ctx.verify_mode = ssl.CERT_NONE
     985          self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
     986                            context=ctx)
     987          self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
     988                            context=ctx)
     989          self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
     990                            keyfile=CERTFILE, context=ctx)
     991  
     992          self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
     993          self.client.connect(self.server.host, self.server.port)
     994          self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
     995          self.client.auth()
     996          self.assertIs(self.client.sock.context, ctx)
     997          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     998  
     999          self.client.prot_p()
    1000          with self.client.transfercmd('list') as sock:
    1001              self.assertIs(sock.context, ctx)
    1002              self.assertIsInstance(sock, ssl.SSLSocket)
    1003  
    1004      def test_ccc(self):
    1005          self.assertRaises(ValueError, self.client.ccc)
    1006          self.client.login(secure=True)
    1007          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    1008          self.client.ccc()
    1009          self.assertRaises(ValueError, self.client.sock.unwrap)
    1010  
    1011      @skipUnless(False, "FIXME: bpo-32706")
    1012      def test_check_hostname(self):
    1013          self.client.quit()
    1014          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1015          self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    1016          self.assertEqual(ctx.check_hostname, True)
    1017          ctx.load_verify_locations(CAFILE)
    1018          self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    1019  
    1020          # 127.0.0.1 doesn't match SAN
    1021          self.client.connect(self.server.host, self.server.port)
    1022          with self.assertRaises(ssl.CertificateError):
    1023              self.client.auth()
    1024          # exception quits connection
    1025  
    1026          self.client.connect(self.server.host, self.server.port)
    1027          self.client.prot_p()
    1028          with self.assertRaises(ssl.CertificateError):
    1029              with self.client.transfercmd("list") as sock:
    1030                  pass
    1031          self.client.quit()
    1032  
    1033          self.client.connect("localhost", self.server.port)
    1034          self.client.auth()
    1035          self.client.quit()
    1036  
    1037          self.client.connect("localhost", self.server.port)
    1038          self.client.prot_p()
    1039          with self.client.transfercmd("list") as sock:
    1040              pass
    1041  
    1042  
    1043  class ESC[4;38;5;81mTestTimeouts(ESC[4;38;5;149mTestCase):
    1044  
    1045      def setUp(self):
    1046          self.evt = threading.Event()
    1047          self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    1048          self.sock.settimeout(20)
    1049          self.port = socket_helper.bind_port(self.sock)
    1050          self.server_thread = threading.Thread(target=self.server)
    1051          self.server_thread.daemon = True
    1052          self.server_thread.start()
    1053          # Wait for the server to be ready.
    1054          self.evt.wait()
    1055          self.evt.clear()
    1056          self.old_port = ftplib.FTP.port
    1057          ftplib.FTP.port = self.port
    1058  
    1059      def tearDown(self):
    1060          ftplib.FTP.port = self.old_port
    1061          self.server_thread.join()
    1062          # Explicitly clear the attribute to prevent dangling thread
    1063          self.server_thread = None
    1064  
    1065      def server(self):
    1066          # This method sets the evt 3 times:
    1067          #  1) when the connection is ready to be accepted.
    1068          #  2) when it is safe for the caller to close the connection
    1069          #  3) when we have closed the socket
    1070          self.sock.listen()
    1071          # (1) Signal the caller that we are ready to accept the connection.
    1072          self.evt.set()
    1073          try:
    1074              conn, addr = self.sock.accept()
    1075          except TimeoutError:
    1076              pass
    1077          else:
    1078              conn.sendall(b"1 Hola mundo\n")
    1079              conn.shutdown(socket.SHUT_WR)
    1080              # (2) Signal the caller that it is safe to close the socket.
    1081              self.evt.set()
    1082              conn.close()
    1083          finally:
    1084              self.sock.close()
    1085  
    1086      def testTimeoutDefault(self):
    1087          # default -- use global socket timeout
    1088          self.assertIsNone(socket.getdefaulttimeout())
    1089          socket.setdefaulttimeout(30)
    1090          try:
    1091              ftp = ftplib.FTP(HOST)
    1092          finally:
    1093              socket.setdefaulttimeout(None)
    1094          self.assertEqual(ftp.sock.gettimeout(), 30)
    1095          self.evt.wait()
    1096          ftp.close()
    1097  
    1098      def testTimeoutNone(self):
    1099          # no timeout -- do not use global socket timeout
    1100          self.assertIsNone(socket.getdefaulttimeout())
    1101          socket.setdefaulttimeout(30)
    1102          try:
    1103              ftp = ftplib.FTP(HOST, timeout=None)
    1104          finally:
    1105              socket.setdefaulttimeout(None)
    1106          self.assertIsNone(ftp.sock.gettimeout())
    1107          self.evt.wait()
    1108          ftp.close()
    1109  
    1110      def testTimeoutValue(self):
    1111          # a value
    1112          ftp = ftplib.FTP(HOST, timeout=30)
    1113          self.assertEqual(ftp.sock.gettimeout(), 30)
    1114          self.evt.wait()
    1115          ftp.close()
    1116  
    1117          # bpo-39259
    1118          with self.assertRaises(ValueError):
    1119              ftplib.FTP(HOST, timeout=0)
    1120  
    1121      def testTimeoutConnect(self):
    1122          ftp = ftplib.FTP()
    1123          ftp.connect(HOST, timeout=30)
    1124          self.assertEqual(ftp.sock.gettimeout(), 30)
    1125          self.evt.wait()
    1126          ftp.close()
    1127  
    1128      def testTimeoutDifferentOrder(self):
    1129          ftp = ftplib.FTP(timeout=30)
    1130          ftp.connect(HOST)
    1131          self.assertEqual(ftp.sock.gettimeout(), 30)
    1132          self.evt.wait()
    1133          ftp.close()
    1134  
    1135      def testTimeoutDirectAccess(self):
    1136          ftp = ftplib.FTP()
    1137          ftp.timeout = 30
    1138          ftp.connect(HOST)
    1139          self.assertEqual(ftp.sock.gettimeout(), 30)
    1140          self.evt.wait()
    1141          ftp.close()
    1142  
    1143  
    1144  class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149mTestCase):
    1145      def test__all__(self):
    1146          not_exported = {
    1147              'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF', 'Error',
    1148              'parse150', 'parse227', 'parse229', 'parse257', 'print_line',
    1149              'ftpcp', 'test'}
    1150          support.check__all__(self, ftplib, not_exported=not_exported)
    1151  
    1152  
    1153  def setUpModule():
    1154      thread_info = threading_helper.threading_setup()
    1155      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
    1156  
    1157  
    1158  if __name__ == '__main__':
    1159      unittest.main()