(root)/
Python-3.12.0/
Lib/
test/
test_ftplib.py
       1  """Test script for ftplib module."""
       2  
       3  # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
       4  # environment
       5  
       6  import ftplib
       7  import socket
       8  import io
       9  import errno
      10  import os
      11  import threading
      12  import time
      13  import unittest
      14  try:
      15      import ssl
      16  except ImportError:
      17      ssl = None
      18  
      19  from unittest import TestCase, skipUnless
      20  from test import support
      21  from test.support import threading_helper
      22  from test.support import socket_helper
      23  from test.support import warnings_helper
      24  from test.support import asynchat
      25  from test.support import asyncore
      26  from test.support.socket_helper import HOST, HOSTv6
      27  
      28  
      29  support.requires_working_socket(module=True)
      30  
      31  TIMEOUT = support.LOOPBACK_TIMEOUT
      32  DEFAULT_ENCODING = 'utf-8'
      33  # the dummy data returned by server over the data channel when
      34  # RETR, LIST, NLST, MLSD commands are issued
      35  RETR_DATA = 'abcde12345\r\n' * 1000 + 'non-ascii char \xAE\r\n'
      36  LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
      37  NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
      38  MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
      39               "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
      40               "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
      41               "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
      42               "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
      43               "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
      44               "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
      45               "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
      46               "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
      47               "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
      48               "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
      49               "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
      50               "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
      51               "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
      52               "type=file;perm=r;unique==keVO1+1G4; file4\r\n"
      53               "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n"
      54               "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n")
      55  
      56  
      57  def default_error_handler():
      58      # bpo-44359: Silently ignore socket errors. Such errors occur when a client
      59      # socket is closed, in TestFTPClass.tearDown() and makepasv() tests, and
      60      # the server gets an error on its side.
      61      pass
      62  
      63  
      64  class ESC[4;38;5;81mDummyDTPHandler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
      65      dtp_conn_closed = False
      66  
      67      def __init__(self, conn, baseclass):
      68          asynchat.async_chat.__init__(self, conn)
      69          self.baseclass = baseclass
      70          self.baseclass.last_received_data = ''
      71          self.encoding = baseclass.encoding
      72  
      73      def handle_read(self):
      74          new_data = self.recv(1024).decode(self.encoding, 'replace')
      75          self.baseclass.last_received_data += new_data
      76  
      77      def handle_close(self):
      78          # XXX: this method can be called many times in a row for a single
      79          # connection, including in clear-text (non-TLS) mode.
      80          # (behaviour witnessed with test_data_connection)
      81          if not self.dtp_conn_closed:
      82              self.baseclass.push('226 transfer complete')
      83              self.close()
      84              self.dtp_conn_closed = True
      85  
      86      def push(self, what):
      87          if self.baseclass.next_data is not None:
      88              what = self.baseclass.next_data
      89              self.baseclass.next_data = None
      90          if not what:
      91              return self.close_when_done()
      92          super(DummyDTPHandler, self).push(what.encode(self.encoding))
      93  
      94      def handle_error(self):
      95          default_error_handler()
      96  
      97  
      98  class ESC[4;38;5;81mDummyFTPHandler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
      99  
     100      dtp_handler = DummyDTPHandler
     101  
     102      def __init__(self, conn, encoding=DEFAULT_ENCODING):
     103          asynchat.async_chat.__init__(self, conn)
     104          # tells the socket to handle urgent data inline (ABOR command)
     105          self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
     106          self.set_terminator(b"\r\n")
     107          self.in_buffer = []
     108          self.dtp = None
     109          self.last_received_cmd = None
     110          self.last_received_data = ''
     111          self.next_response = ''
     112          self.next_data = None
     113          self.rest = None
     114          self.next_retr_data = RETR_DATA
     115          self.push('220 welcome')
     116          self.encoding = encoding
     117          # We use this as the string IPv4 address to direct the client
     118          # to in response to a PASV command.  To test security behavior.
     119          # https://bugs.python.org/issue43285/.
     120          self.fake_pasv_server_ip = '252.253.254.255'
     121  
     122      def collect_incoming_data(self, data):
     123          self.in_buffer.append(data)
     124  
     125      def found_terminator(self):
     126          line = b''.join(self.in_buffer).decode(self.encoding)
     127          self.in_buffer = []
     128          if self.next_response:
     129              self.push(self.next_response)
     130              self.next_response = ''
     131          cmd = line.split(' ')[0].lower()
     132          self.last_received_cmd = cmd
     133          space = line.find(' ')
     134          if space != -1:
     135              arg = line[space + 1:]
     136          else:
     137              arg = ""
     138          if hasattr(self, 'cmd_' + cmd):
     139              method = getattr(self, 'cmd_' + cmd)
     140              method(arg)
     141          else:
     142              self.push('550 command "%s" not understood.' %cmd)
     143  
     144      def handle_error(self):
     145          default_error_handler()
     146  
     147      def push(self, data):
     148          asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')
     149  
     150      def cmd_port(self, arg):
     151          addr = list(map(int, arg.split(',')))
     152          ip = '%d.%d.%d.%d' %tuple(addr[:4])
     153          port = (addr[4] * 256) + addr[5]
     154          s = socket.create_connection((ip, port), timeout=TIMEOUT)
     155          self.dtp = self.dtp_handler(s, baseclass=self)
     156          self.push('200 active data connection established')
     157  
     158      def cmd_pasv(self, arg):
     159          with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
     160              sock.settimeout(TIMEOUT)
     161              port = sock.getsockname()[1]
     162              ip = self.fake_pasv_server_ip
     163              ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
     164              self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
     165              conn, addr = sock.accept()
     166              self.dtp = self.dtp_handler(conn, baseclass=self)
     167  
     168      def cmd_eprt(self, arg):
     169          af, ip, port = arg.split(arg[0])[1:-1]
     170          port = int(port)
     171          s = socket.create_connection((ip, port), timeout=TIMEOUT)
     172          self.dtp = self.dtp_handler(s, baseclass=self)
     173          self.push('200 active data connection established')
     174  
     175      def cmd_epsv(self, arg):
     176          with socket.create_server((self.socket.getsockname()[0], 0),
     177                                    family=socket.AF_INET6) as sock:
     178              sock.settimeout(TIMEOUT)
     179              port = sock.getsockname()[1]
     180              self.push('229 entering extended passive mode (|||%d|)' %port)
     181              conn, addr = sock.accept()
     182              self.dtp = self.dtp_handler(conn, baseclass=self)
     183  
     184      def cmd_echo(self, arg):
     185          # sends back the received string (used by the test suite)
     186          self.push(arg)
     187  
     188      def cmd_noop(self, arg):
     189          self.push('200 noop ok')
     190  
     191      def cmd_user(self, arg):
     192          self.push('331 username ok')
     193  
     194      def cmd_pass(self, arg):
     195          self.push('230 password ok')
     196  
     197      def cmd_acct(self, arg):
     198          self.push('230 acct ok')
     199  
     200      def cmd_rnfr(self, arg):
     201          self.push('350 rnfr ok')
     202  
     203      def cmd_rnto(self, arg):
     204          self.push('250 rnto ok')
     205  
     206      def cmd_dele(self, arg):
     207          self.push('250 dele ok')
     208  
     209      def cmd_cwd(self, arg):
     210          self.push('250 cwd ok')
     211  
     212      def cmd_size(self, arg):
     213          self.push('250 1000')
     214  
     215      def cmd_mkd(self, arg):
     216          self.push('257 "%s"' %arg)
     217  
     218      def cmd_rmd(self, arg):
     219          self.push('250 rmd ok')
     220  
     221      def cmd_pwd(self, arg):
     222          self.push('257 "pwd ok"')
     223  
     224      def cmd_type(self, arg):
     225          self.push('200 type ok')
     226  
     227      def cmd_quit(self, arg):
     228          self.push('221 quit ok')
     229          self.close()
     230  
     231      def cmd_abor(self, arg):
     232          self.push('226 abor ok')
     233  
     234      def cmd_stor(self, arg):
     235          self.push('125 stor ok')
     236  
     237      def cmd_rest(self, arg):
     238          self.rest = arg
     239          self.push('350 rest ok')
     240  
     241      def cmd_retr(self, arg):
     242          self.push('125 retr ok')
     243          if self.rest is not None:
     244              offset = int(self.rest)
     245          else:
     246              offset = 0
     247          self.dtp.push(self.next_retr_data[offset:])
     248          self.dtp.close_when_done()
     249          self.rest = None
     250  
     251      def cmd_list(self, arg):
     252          self.push('125 list ok')
     253          self.dtp.push(LIST_DATA)
     254          self.dtp.close_when_done()
     255  
     256      def cmd_nlst(self, arg):
     257          self.push('125 nlst ok')
     258          self.dtp.push(NLST_DATA)
     259          self.dtp.close_when_done()
     260  
     261      def cmd_opts(self, arg):
     262          self.push('200 opts ok')
     263  
     264      def cmd_mlsd(self, arg):
     265          self.push('125 mlsd ok')
     266          self.dtp.push(MLSD_DATA)
     267          self.dtp.close_when_done()
     268  
     269      def cmd_setlongretr(self, arg):
     270          # For testing. Next RETR will return long line.
     271          self.next_retr_data = 'x' * int(arg)
     272          self.push('125 setlongretr ok')
     273  
     274  
     275  class ESC[4;38;5;81mDummyFTPServer(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher, ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     276  
     277      handler = DummyFTPHandler
     278  
     279      def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
     280          threading.Thread.__init__(self)
     281          asyncore.dispatcher.__init__(self)
     282          self.daemon = True
     283          self.create_socket(af, socket.SOCK_STREAM)
     284          self.bind(address)
     285          self.listen(5)
     286          self.active = False
     287          self.active_lock = threading.Lock()
     288          self.host, self.port = self.socket.getsockname()[:2]
     289          self.handler_instance = None
     290          self.encoding = encoding
     291  
     292      def start(self):
     293          assert not self.active
     294          self.__flag = threading.Event()
     295          threading.Thread.start(self)
     296          self.__flag.wait()
     297  
     298      def run(self):
     299          self.active = True
     300          self.__flag.set()
     301          while self.active and asyncore.socket_map:
     302              self.active_lock.acquire()
     303              asyncore.loop(timeout=0.1, count=1)
     304              self.active_lock.release()
     305          asyncore.close_all(ignore_all=True)
     306  
     307      def stop(self):
     308          assert self.active
     309          self.active = False
     310          self.join()
     311  
     312      def handle_accepted(self, conn, addr):
     313          self.handler_instance = self.handler(conn, encoding=self.encoding)
     314  
     315      def handle_connect(self):
     316          self.close()
     317      handle_read = handle_connect
     318  
     319      def writable(self):
     320          return 0
     321  
     322      def handle_error(self):
     323          default_error_handler()
     324  
     325  
     326  if ssl is not None:
     327  
     328      CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
     329      CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
     330  
     331      class ESC[4;38;5;81mSSLConnection(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher):
     332          """An asyncore.dispatcher subclass supporting TLS/SSL."""
     333  
     334          _ssl_accepting = False
     335          _ssl_closing = False
     336  
     337          def secure_connection(self):
     338              context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     339              context.load_cert_chain(CERTFILE)
     340              socket = context.wrap_socket(self.socket,
     341                                           suppress_ragged_eofs=False,
     342                                           server_side=True,
     343                                           do_handshake_on_connect=False)
     344              self.del_channel()
     345              self.set_socket(socket)
     346              self._ssl_accepting = True
     347  
     348          def _do_ssl_handshake(self):
     349              try:
     350                  self.socket.do_handshake()
     351              except ssl.SSLError as err:
     352                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     353                                     ssl.SSL_ERROR_WANT_WRITE):
     354                      return
     355                  elif err.args[0] == ssl.SSL_ERROR_EOF:
     356                      return self.handle_close()
     357                  # TODO: SSLError does not expose alert information
     358                  elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
     359                      return self.handle_close()
     360                  raise
     361              except OSError as err:
     362                  if err.args[0] == errno.ECONNABORTED:
     363                      return self.handle_close()
     364              else:
     365                  self._ssl_accepting = False
     366  
     367          def _do_ssl_shutdown(self):
     368              self._ssl_closing = True
     369              try:
     370                  self.socket = self.socket.unwrap()
     371              except ssl.SSLError as err:
     372                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     373                                     ssl.SSL_ERROR_WANT_WRITE):
     374                      return
     375              except OSError:
     376                  # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
     377                  # from OpenSSL's SSL_shutdown(), corresponding to a
     378                  # closed socket condition. See also:
     379                  # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
     380                  pass
     381              self._ssl_closing = False
     382              if getattr(self, '_ccc', False) is False:
     383                  super(SSLConnection, self).close()
     384              else:
     385                  pass
     386  
     387          def handle_read_event(self):
     388              if self._ssl_accepting:
     389                  self._do_ssl_handshake()
     390              elif self._ssl_closing:
     391                  self._do_ssl_shutdown()
     392              else:
     393                  super(SSLConnection, self).handle_read_event()
     394  
     395          def handle_write_event(self):
     396              if self._ssl_accepting:
     397                  self._do_ssl_handshake()
     398              elif self._ssl_closing:
     399                  self._do_ssl_shutdown()
     400              else:
     401                  super(SSLConnection, self).handle_write_event()
     402  
     403          def send(self, data):
     404              try:
     405                  return super(SSLConnection, self).send(data)
     406              except ssl.SSLError as err:
     407                  if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
     408                                     ssl.SSL_ERROR_WANT_READ,
     409                                     ssl.SSL_ERROR_WANT_WRITE):
     410                      return 0
     411                  raise
     412  
     413          def recv(self, buffer_size):
     414              try:
     415                  return super(SSLConnection, self).recv(buffer_size)
     416              except ssl.SSLError as err:
     417                  if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     418                                     ssl.SSL_ERROR_WANT_WRITE):
     419                      return b''
     420                  if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
     421                      self.handle_close()
     422                      return b''
     423                  raise
     424  
     425          def handle_error(self):
     426              default_error_handler()
     427  
     428          def close(self):
     429              if (isinstance(self.socket, ssl.SSLSocket) and
     430                      self.socket._sslobj is not None):
     431                  self._do_ssl_shutdown()
     432              else:
     433                  super(SSLConnection, self).close()
     434  
     435  
     436      class ESC[4;38;5;81mDummyTLS_DTPHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyDTPHandler):
     437          """A DummyDTPHandler subclass supporting TLS/SSL."""
     438  
     439          def __init__(self, conn, baseclass):
     440              DummyDTPHandler.__init__(self, conn, baseclass)
     441              if self.baseclass.secure_data_channel:
     442                  self.secure_connection()
     443  
     444  
     445      class ESC[4;38;5;81mDummyTLS_FTPHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyFTPHandler):
     446          """A DummyFTPHandler subclass supporting TLS/SSL."""
     447  
     448          dtp_handler = DummyTLS_DTPHandler
     449  
     450          def __init__(self, conn, encoding=DEFAULT_ENCODING):
     451              DummyFTPHandler.__init__(self, conn, encoding=encoding)
     452              self.secure_data_channel = False
     453              self._ccc = False
     454  
     455          def cmd_auth(self, line):
     456              """Set up secure control channel."""
     457              self.push('234 AUTH TLS successful')
     458              self.secure_connection()
     459  
     460          def cmd_ccc(self, line):
     461              self.push('220 Reverting back to clear-text')
     462              self._ccc = True
     463              self._do_ssl_shutdown()
     464  
     465          def cmd_pbsz(self, line):
     466              """Negotiate size of buffer for secure data transfer.
     467              For TLS/SSL the only valid value for the parameter is '0'.
     468              Any other value is accepted but ignored.
     469              """
     470              self.push('200 PBSZ=0 successful.')
     471  
     472          def cmd_prot(self, line):
     473              """Setup un/secure data channel."""
     474              arg = line.upper()
     475              if arg == 'C':
     476                  self.push('200 Protection set to Clear')
     477                  self.secure_data_channel = False
     478              elif arg == 'P':
     479                  self.push('200 Protection set to Private')
     480                  self.secure_data_channel = True
     481              else:
     482                  self.push("502 Unrecognized PROT type (use C or P).")
     483  
     484  
     485      class ESC[4;38;5;81mDummyTLS_FTPServer(ESC[4;38;5;149mDummyFTPServer):
     486          handler = DummyTLS_FTPHandler
     487  
     488  
     489  class ESC[4;38;5;81mTestFTPClass(ESC[4;38;5;149mTestCase):
     490  
     491      def setUp(self, encoding=DEFAULT_ENCODING):
     492          self.server = DummyFTPServer((HOST, 0), encoding=encoding)
     493          self.server.start()
     494          self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
     495          self.client.connect(self.server.host, self.server.port)
     496  
     497      def tearDown(self):
     498          self.client.close()
     499          self.server.stop()
     500          # Explicitly clear the attribute to prevent dangling thread
     501          self.server = None
     502          asyncore.close_all(ignore_all=True)
     503  
     504      def check_data(self, received, expected):
     505          self.assertEqual(len(received), len(expected))
     506          self.assertEqual(received, expected)
     507  
     508      def test_getwelcome(self):
     509          self.assertEqual(self.client.getwelcome(), '220 welcome')
     510  
     511      def test_sanitize(self):
     512          self.assertEqual(self.client.sanitize('foo'), repr('foo'))
     513          self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
     514          self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
     515  
     516      def test_exceptions(self):
     517          self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
     518          self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
     519          self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
     520          self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
     521          self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
     522          self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
     523          self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
     524          self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
     525  
     526      def test_all_errors(self):
     527          exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
     528                        ftplib.error_proto, ftplib.Error, OSError,
     529                        EOFError)
     530          for x in exceptions:
     531              try:
     532                  raise x('exception not included in all_errors set')
     533              except ftplib.all_errors:
     534                  pass
     535  
     536      def test_set_pasv(self):
     537          # passive mode is supposed to be enabled by default
     538          self.assertTrue(self.client.passiveserver)
     539          self.client.set_pasv(True)
     540          self.assertTrue(self.client.passiveserver)
     541          self.client.set_pasv(False)
     542          self.assertFalse(self.client.passiveserver)
     543  
     544      def test_voidcmd(self):
     545          self.client.voidcmd('echo 200')
     546          self.client.voidcmd('echo 299')
     547          self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
     548          self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
     549  
     550      def test_login(self):
     551          self.client.login()
     552  
     553      def test_acct(self):
     554          self.client.acct('passwd')
     555  
     556      def test_rename(self):
     557          self.client.rename('a', 'b')
     558          self.server.handler_instance.next_response = '200'
     559          self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
     560  
     561      def test_delete(self):
     562          self.client.delete('foo')
     563          self.server.handler_instance.next_response = '199'
     564          self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
     565  
     566      def test_size(self):
     567          self.client.size('foo')
     568  
     569      def test_mkd(self):
     570          dir = self.client.mkd('/foo')
     571          self.assertEqual(dir, '/foo')
     572  
     573      def test_rmd(self):
     574          self.client.rmd('foo')
     575  
     576      def test_cwd(self):
     577          dir = self.client.cwd('/foo')
     578          self.assertEqual(dir, '250 cwd ok')
     579  
     580      def test_pwd(self):
     581          dir = self.client.pwd()
     582          self.assertEqual(dir, 'pwd ok')
     583  
     584      def test_quit(self):
     585          self.assertEqual(self.client.quit(), '221 quit ok')
     586          # Ensure the connection gets closed; sock attribute should be None
     587          self.assertEqual(self.client.sock, None)
     588  
     589      def test_abort(self):
     590          self.client.abort()
     591  
     592      def test_retrbinary(self):
     593          def callback(data):
     594              received.append(data.decode(self.client.encoding))
     595          received = []
     596          self.client.retrbinary('retr', callback)
     597          self.check_data(''.join(received), RETR_DATA)
     598  
     599      def test_retrbinary_rest(self):
     600          def callback(data):
     601              received.append(data.decode(self.client.encoding))
     602          for rest in (0, 10, 20):
     603              received = []
     604              self.client.retrbinary('retr', callback, rest=rest)
     605              self.check_data(''.join(received), RETR_DATA[rest:])
     606  
     607      def test_retrlines(self):
     608          received = []
     609          self.client.retrlines('retr', received.append)
     610          self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
     611  
     612      def test_storbinary(self):
     613          f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
     614          self.client.storbinary('stor', f)
     615          self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
     616          # test new callback arg
     617          flag = []
     618          f.seek(0)
     619          self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
     620          self.assertTrue(flag)
     621  
     622      def test_storbinary_rest(self):
     623          data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
     624          f = io.BytesIO(data)
     625          for r in (30, '30'):
     626              f.seek(0)
     627              self.client.storbinary('stor', f, rest=r)
     628              self.assertEqual(self.server.handler_instance.rest, str(r))
     629  
     630      def test_storlines(self):
     631          data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
     632          f = io.BytesIO(data)
     633          self.client.storlines('stor', f)
     634          self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
     635          # test new callback arg
     636          flag = []
     637          f.seek(0)
     638          self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
     639          self.assertTrue(flag)
     640  
     641          f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
     642          # storlines() expects a binary file, not a text file
     643          with warnings_helper.check_warnings(('', BytesWarning), quiet=True):
     644              self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
     645  
     646      def test_nlst(self):
     647          self.client.nlst()
     648          self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
     649  
     650      def test_dir(self):
     651          l = []
     652          self.client.dir(lambda x: l.append(x))
     653          self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
     654  
     655      def test_mlsd(self):
     656          list(self.client.mlsd())
     657          list(self.client.mlsd(path='/'))
     658          list(self.client.mlsd(path='/', facts=['size', 'type']))
     659  
     660          ls = list(self.client.mlsd())
     661          for name, facts in ls:
     662              self.assertIsInstance(name, str)
     663              self.assertIsInstance(facts, dict)
     664              self.assertTrue(name)
     665              self.assertIn('type', facts)
     666              self.assertIn('perm', facts)
     667              self.assertIn('unique', facts)
     668  
     669          def set_data(data):
     670              self.server.handler_instance.next_data = data
     671  
     672          def test_entry(line, type=None, perm=None, unique=None, name=None):
     673              type = 'type' if type is None else type
     674              perm = 'perm' if perm is None else perm
     675              unique = 'unique' if unique is None else unique
     676              name = 'name' if name is None else name
     677              set_data(line)
     678              _name, facts = next(self.client.mlsd())
     679              self.assertEqual(_name, name)
     680              self.assertEqual(facts['type'], type)
     681              self.assertEqual(facts['perm'], perm)
     682              self.assertEqual(facts['unique'], unique)
     683  
     684          # plain
     685          test_entry('type=type;perm=perm;unique=unique; name\r\n')
     686          # "=" in fact value
     687          test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
     688          test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
     689          test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
     690          test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
     691          # spaces in name
     692          test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
     693          test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
     694          test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
     695          test_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
     696          # ";" in name
     697          test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
     698          test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
     699          test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
     700          test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
     701          # case sensitiveness
     702          set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
     703          _name, facts = next(self.client.mlsd())
     704          for x in facts:
     705              self.assertTrue(x.islower())
     706          # no data (directory empty)
     707          set_data('')
     708          self.assertRaises(StopIteration, next, self.client.mlsd())
     709          set_data('')
     710          for x in self.client.mlsd():
     711              self.fail("unexpected data %s" % x)
     712  
     713      def test_makeport(self):
     714          with self.client.makeport():
     715              # IPv4 is in use, just make sure send_eprt has not been used
     716              self.assertEqual(self.server.handler_instance.last_received_cmd,
     717                                  'port')
     718  
     719      def test_makepasv(self):
     720          host, port = self.client.makepasv()
     721          conn = socket.create_connection((host, port), timeout=TIMEOUT)
     722          conn.close()
     723          # IPv4 is in use, just make sure send_epsv has not been used
     724          self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
     725  
     726      def test_makepasv_issue43285_security_disabled(self):
     727          """Test the opt-in to the old vulnerable behavior."""
     728          self.client.trust_server_pasv_ipv4_address = True
     729          bad_host, port = self.client.makepasv()
     730          self.assertEqual(
     731                  bad_host, self.server.handler_instance.fake_pasv_server_ip)
     732          # Opening and closing a connection keeps the dummy server happy
     733          # instead of timing out on accept.
     734          socket.create_connection((self.client.sock.getpeername()[0], port),
     735                                   timeout=TIMEOUT).close()
     736  
     737      def test_makepasv_issue43285_security_enabled_default(self):
     738          self.assertFalse(self.client.trust_server_pasv_ipv4_address)
     739          trusted_host, port = self.client.makepasv()
     740          self.assertNotEqual(
     741                  trusted_host, self.server.handler_instance.fake_pasv_server_ip)
     742          # Opening and closing a connection keeps the dummy server happy
     743          # instead of timing out on accept.
     744          socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()
     745  
     746      def test_with_statement(self):
     747          self.client.quit()
     748  
     749          def is_client_connected():
     750              if self.client.sock is None:
     751                  return False
     752              try:
     753                  self.client.sendcmd('noop')
     754              except (OSError, EOFError):
     755                  return False
     756              return True
     757  
     758          # base test
     759          with ftplib.FTP(timeout=TIMEOUT) as self.client:
     760              self.client.connect(self.server.host, self.server.port)
     761              self.client.sendcmd('noop')
     762              self.assertTrue(is_client_connected())
     763          self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
     764          self.assertFalse(is_client_connected())
     765  
     766          # QUIT sent inside the with block
     767          with ftplib.FTP(timeout=TIMEOUT) as self.client:
     768              self.client.connect(self.server.host, self.server.port)
     769              self.client.sendcmd('noop')
     770              self.client.quit()
     771          self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
     772          self.assertFalse(is_client_connected())
     773  
     774          # force a wrong response code to be sent on QUIT: error_perm
     775          # is expected and the connection is supposed to be closed
     776          try:
     777              with ftplib.FTP(timeout=TIMEOUT) as self.client:
     778                  self.client.connect(self.server.host, self.server.port)
     779                  self.client.sendcmd('noop')
     780                  self.server.handler_instance.next_response = '550 error on quit'
     781          except ftplib.error_perm as err:
     782              self.assertEqual(str(err), '550 error on quit')
     783          else:
     784              self.fail('Exception not raised')
     785          # needed to give the threaded server some time to set the attribute
     786          # which otherwise would still be == 'noop'
     787          time.sleep(0.1)
     788          self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
     789          self.assertFalse(is_client_connected())
     790  
     791      def test_source_address(self):
     792          self.client.quit()
     793          port = socket_helper.find_unused_port()
     794          try:
     795              self.client.connect(self.server.host, self.server.port,
     796                                  source_address=(HOST, port))
     797              self.assertEqual(self.client.sock.getsockname()[1], port)
     798              self.client.quit()
     799          except OSError as e:
     800              if e.errno == errno.EADDRINUSE:
     801                  self.skipTest("couldn't bind to port %d" % port)
     802              raise
     803  
     804      def test_source_address_passive_connection(self):
     805          port = socket_helper.find_unused_port()
     806          self.client.source_address = (HOST, port)
     807          try:
     808              with self.client.transfercmd('list') as sock:
     809                  self.assertEqual(sock.getsockname()[1], port)
     810          except OSError as e:
     811              if e.errno == errno.EADDRINUSE:
     812                  self.skipTest("couldn't bind to port %d" % port)
     813              raise
     814  
     815      def test_parse257(self):
     816          self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
     817          self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
     818          self.assertEqual(ftplib.parse257('257 ""'), '')
     819          self.assertEqual(ftplib.parse257('257 "" created'), '')
     820          self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
     821          # The 257 response is supposed to include the directory
     822          # name and in case it contains embedded double-quotes
     823          # they must be doubled (see RFC-959, chapter 7, appendix 2).
     824          self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
     825          self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
     826  
     827      def test_line_too_long(self):
     828          self.assertRaises(ftplib.Error, self.client.sendcmd,
     829                            'x' * self.client.maxline * 2)
     830  
     831      def test_retrlines_too_long(self):
     832          self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
     833          received = []
     834          self.assertRaises(ftplib.Error,
     835                            self.client.retrlines, 'retr', received.append)
     836  
     837      def test_storlines_too_long(self):
     838          f = io.BytesIO(b'x' * self.client.maxline * 2)
     839          self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
     840  
     841      def test_encoding_param(self):
     842          encodings = ['latin-1', 'utf-8']
     843          for encoding in encodings:
     844              with self.subTest(encoding=encoding):
     845                  self.tearDown()
     846                  self.setUp(encoding=encoding)
     847                  self.assertEqual(encoding, self.client.encoding)
     848                  self.test_retrbinary()
     849                  self.test_storbinary()
     850                  self.test_retrlines()
     851                  new_dir = self.client.mkd('/non-ascii dir \xAE')
     852                  self.check_data(new_dir, '/non-ascii dir \xAE')
     853          # Check default encoding
     854          client = ftplib.FTP(timeout=TIMEOUT)
     855          self.assertEqual(DEFAULT_ENCODING, client.encoding)
     856  
     857  
     858  @skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
     859  class ESC[4;38;5;81mTestIPv6Environment(ESC[4;38;5;149mTestCase):
     860  
     861      def setUp(self):
     862          self.server = DummyFTPServer((HOSTv6, 0),
     863                                       af=socket.AF_INET6,
     864                                       encoding=DEFAULT_ENCODING)
     865          self.server.start()
     866          self.client = ftplib.FTP(timeout=TIMEOUT, encoding=DEFAULT_ENCODING)
     867          self.client.connect(self.server.host, self.server.port)
     868  
     869      def tearDown(self):
     870          self.client.close()
     871          self.server.stop()
     872          # Explicitly clear the attribute to prevent dangling thread
     873          self.server = None
     874          asyncore.close_all(ignore_all=True)
     875  
     876      def test_af(self):
     877          self.assertEqual(self.client.af, socket.AF_INET6)
     878  
     879      def test_makeport(self):
     880          with self.client.makeport():
     881              self.assertEqual(self.server.handler_instance.last_received_cmd,
     882                                  'eprt')
     883  
     884      def test_makepasv(self):
     885          host, port = self.client.makepasv()
     886          conn = socket.create_connection((host, port), timeout=TIMEOUT)
     887          conn.close()
     888          self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')
     889  
     890      def test_transfer(self):
     891          def retr():
     892              def callback(data):
     893                  received.append(data.decode(self.client.encoding))
     894              received = []
     895              self.client.retrbinary('retr', callback)
     896              self.assertEqual(len(''.join(received)), len(RETR_DATA))
     897              self.assertEqual(''.join(received), RETR_DATA)
     898          self.client.set_pasv(True)
     899          retr()
     900          self.client.set_pasv(False)
     901          retr()
     902  
     903  
     904  @skipUnless(ssl, "SSL not available")
     905  class ESC[4;38;5;81mTestTLS_FTPClassMixin(ESC[4;38;5;149mTestFTPClass):
     906      """Repeat TestFTPClass tests starting the TLS layer for both control
     907      and data connections first.
     908      """
     909  
     910      def setUp(self, encoding=DEFAULT_ENCODING):
     911          self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
     912          self.server.start()
     913          self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
     914          self.client.connect(self.server.host, self.server.port)
     915          # enable TLS
     916          self.client.auth()
     917          self.client.prot_p()
     918  
     919  
     920  @skipUnless(ssl, "SSL not available")
     921  class ESC[4;38;5;81mTestTLS_FTPClass(ESC[4;38;5;149mTestCase):
     922      """Specific TLS_FTP class tests."""
     923  
     924      def setUp(self, encoding=DEFAULT_ENCODING):
     925          self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
     926          self.server.start()
     927          self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
     928          self.client.connect(self.server.host, self.server.port)
     929  
     930      def tearDown(self):
     931          self.client.close()
     932          self.server.stop()
     933          # Explicitly clear the attribute to prevent dangling thread
     934          self.server = None
     935          asyncore.close_all(ignore_all=True)
     936  
     937      def test_control_connection(self):
     938          self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
     939          self.client.auth()
     940          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     941  
     942      def test_data_connection(self):
     943          # clear text
     944          with self.client.transfercmd('list') as sock:
     945              self.assertNotIsInstance(sock, ssl.SSLSocket)
     946              self.assertEqual(sock.recv(1024),
     947                               LIST_DATA.encode(self.client.encoding))
     948          self.assertEqual(self.client.voidresp(), "226 transfer complete")
     949  
     950          # secured, after PROT P
     951          self.client.prot_p()
     952          with self.client.transfercmd('list') as sock:
     953              self.assertIsInstance(sock, ssl.SSLSocket)
     954              # consume from SSL socket to finalize handshake and avoid
     955              # "SSLError [SSL] shutdown while in init"
     956              self.assertEqual(sock.recv(1024),
     957                               LIST_DATA.encode(self.client.encoding))
     958          self.assertEqual(self.client.voidresp(), "226 transfer complete")
     959  
     960          # PROT C is issued, the connection must be in cleartext again
     961          self.client.prot_c()
     962          with self.client.transfercmd('list') as sock:
     963              self.assertNotIsInstance(sock, ssl.SSLSocket)
     964              self.assertEqual(sock.recv(1024),
     965                               LIST_DATA.encode(self.client.encoding))
     966          self.assertEqual(self.client.voidresp(), "226 transfer complete")
     967  
     968      def test_login(self):
     969          # login() is supposed to implicitly secure the control connection
     970          self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
     971          self.client.login()
     972          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     973          # make sure that AUTH TLS doesn't get issued again
     974          self.client.login()
     975  
     976      def test_auth_issued_twice(self):
     977          self.client.auth()
     978          self.assertRaises(ValueError, self.client.auth)
     979  
     980      def test_context(self):
     981          self.client.quit()
     982          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     983          ctx.check_hostname = False
     984          ctx.verify_mode = ssl.CERT_NONE
     985          self.assertRaises(TypeError, ftplib.FTP_TLS, keyfile=CERTFILE,
     986                            context=ctx)
     987          self.assertRaises(TypeError, ftplib.FTP_TLS, certfile=CERTFILE,
     988                            context=ctx)
     989          self.assertRaises(TypeError, ftplib.FTP_TLS, certfile=CERTFILE,
     990                            keyfile=CERTFILE, context=ctx)
     991  
     992          self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
     993          self.client.connect(self.server.host, self.server.port)
     994          self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
     995          self.client.auth()
     996          self.assertIs(self.client.sock.context, ctx)
     997          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
     998  
     999          self.client.prot_p()
    1000          with self.client.transfercmd('list') as sock:
    1001              self.assertIs(sock.context, ctx)
    1002              self.assertIsInstance(sock, ssl.SSLSocket)
    1003  
    1004      def test_ccc(self):
    1005          self.assertRaises(ValueError, self.client.ccc)
    1006          self.client.login(secure=True)
    1007          self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    1008          self.client.ccc()
    1009          self.assertRaises(ValueError, self.client.sock.unwrap)
    1010  
    1011      @skipUnless(False, "FIXME: bpo-32706")
    1012      def test_check_hostname(self):
    1013          self.client.quit()
    1014          ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    1015          self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    1016          self.assertEqual(ctx.check_hostname, True)
    1017          ctx.load_verify_locations(CAFILE)
    1018          self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    1019  
    1020          # 127.0.0.1 doesn't match SAN
    1021          self.client.connect(self.server.host, self.server.port)
    1022          with self.assertRaises(ssl.CertificateError):
    1023              self.client.auth()
    1024          # exception quits connection
    1025  
    1026          self.client.connect(self.server.host, self.server.port)
    1027          self.client.prot_p()
    1028          with self.assertRaises(ssl.CertificateError):
    1029              with self.client.transfercmd("list") as sock:
    1030                  pass
    1031          self.client.quit()
    1032  
    1033          self.client.connect("localhost", self.server.port)
    1034          self.client.auth()
    1035          self.client.quit()
    1036  
    1037          self.client.connect("localhost", self.server.port)
    1038          self.client.prot_p()
    1039          with self.client.transfercmd("list") as sock:
    1040              pass
    1041  
    1042  
    1043  class ESC[4;38;5;81mTestTimeouts(ESC[4;38;5;149mTestCase):
    1044  
    1045      def setUp(self):
    1046          self.evt = threading.Event()
    1047          self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    1048          self.sock.settimeout(20)
    1049          self.port = socket_helper.bind_port(self.sock)
    1050          self.server_thread = threading.Thread(target=self.server)
    1051          self.server_thread.daemon = True
    1052          self.server_thread.start()
    1053          # Wait for the server to be ready.
    1054          self.evt.wait()
    1055          self.evt.clear()
    1056          self.old_port = ftplib.FTP.port
    1057          ftplib.FTP.port = self.port
    1058  
    1059      def tearDown(self):
    1060          ftplib.FTP.port = self.old_port
    1061          self.server_thread.join()
    1062          # Explicitly clear the attribute to prevent dangling thread
    1063          self.server_thread = None
    1064  
    1065      def server(self):
    1066          # This method sets the evt 3 times:
    1067          #  1) when the connection is ready to be accepted.
    1068          #  2) when it is safe for the caller to close the connection
    1069          #  3) when we have closed the socket
    1070          self.sock.listen()
    1071          # (1) Signal the caller that we are ready to accept the connection.
    1072          self.evt.set()
    1073          try:
    1074              conn, addr = self.sock.accept()
    1075          except TimeoutError:
    1076              pass
    1077          else:
    1078              conn.sendall(b"1 Hola mundo\n")
    1079              conn.shutdown(socket.SHUT_WR)
    1080              # (2) Signal the caller that it is safe to close the socket.
    1081              self.evt.set()
    1082              conn.close()
    1083          finally:
    1084              self.sock.close()
    1085  
    1086      def testTimeoutDefault(self):
    1087          # default -- use global socket timeout
    1088          self.assertIsNone(socket.getdefaulttimeout())
    1089          socket.setdefaulttimeout(30)
    1090          try:
    1091              ftp = ftplib.FTP(HOST)
    1092          finally:
    1093              socket.setdefaulttimeout(None)
    1094          self.assertEqual(ftp.sock.gettimeout(), 30)
    1095          self.evt.wait()
    1096          ftp.close()
    1097  
    1098      def testTimeoutNone(self):
    1099          # no timeout -- do not use global socket timeout
    1100          self.assertIsNone(socket.getdefaulttimeout())
    1101          socket.setdefaulttimeout(30)
    1102          try:
    1103              ftp = ftplib.FTP(HOST, timeout=None)
    1104          finally:
    1105              socket.setdefaulttimeout(None)
    1106          self.assertIsNone(ftp.sock.gettimeout())
    1107          self.evt.wait()
    1108          ftp.close()
    1109  
    1110      def testTimeoutValue(self):
    1111          # a value
    1112          ftp = ftplib.FTP(HOST, timeout=30)
    1113          self.assertEqual(ftp.sock.gettimeout(), 30)
    1114          self.evt.wait()
    1115          ftp.close()
    1116  
    1117          # bpo-39259
    1118          with self.assertRaises(ValueError):
    1119              ftplib.FTP(HOST, timeout=0)
    1120  
    1121      def testTimeoutConnect(self):
    1122          ftp = ftplib.FTP()
    1123          ftp.connect(HOST, timeout=30)
    1124          self.assertEqual(ftp.sock.gettimeout(), 30)
    1125          self.evt.wait()
    1126          ftp.close()
    1127  
    1128      def testTimeoutDifferentOrder(self):
    1129          ftp = ftplib.FTP(timeout=30)
    1130          ftp.connect(HOST)
    1131          self.assertEqual(ftp.sock.gettimeout(), 30)
    1132          self.evt.wait()
    1133          ftp.close()
    1134  
    1135      def testTimeoutDirectAccess(self):
    1136          ftp = ftplib.FTP()
    1137          ftp.timeout = 30
    1138          ftp.connect(HOST)
    1139          self.assertEqual(ftp.sock.gettimeout(), 30)
    1140          self.evt.wait()
    1141          ftp.close()
    1142  
    1143  
    1144  class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149mTestCase):
    1145      def test__all__(self):
    1146          not_exported = {
    1147              'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF', 'Error',
    1148              'parse150', 'parse227', 'parse229', 'parse257', 'print_line',
    1149              'ftpcp', 'test'}
    1150          support.check__all__(self, ftplib, not_exported=not_exported)
    1151  
    1152  
    1153  def setUpModule():
    1154      thread_info = threading_helper.threading_setup()
    1155      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
    1156  
    1157  
    1158  if __name__ == '__main__':
    1159      unittest.main()