1 """Test script for ftplib module."""
2
3 # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4 # environment
5
6 import ftplib
7 import socket
8 import io
9 import errno
10 import os
11 import threading
12 import time
13 import unittest
14 try:
15 import ssl
16 except ImportError:
17 ssl = None
18
19 from unittest import TestCase, skipUnless
20 from test import support
21 from test.support import threading_helper
22 from test.support import socket_helper
23 from test.support import warnings_helper
24 from test.support.socket_helper import HOST, HOSTv6
25
26
# asynchat/asyncore are loaded through the warnings helper rather than a
# plain import (presumably to keep their deprecation warning out of the
# test output -- confirm against test.support.warnings_helper).
asynchat = warnings_helper.import_deprecated('asynchat')
asyncore = warnings_helper.import_deprecated('asyncore')


# Module-wide requirement: the dummy servers below need real sockets.
support.requires_working_socket(module=True)

# Timeout handed to every client, data-channel and listening socket created
# in this module.
TIMEOUT = support.LOOPBACK_TIMEOUT
DEFAULT_ENCODING = 'utf-8'

# Canned payloads the dummy server sends over the data channel in reply to
# the RETR, LIST, NLST and MLSD commands.
RETR_DATA = 1000 * 'abcde\xB9\xB2\xB3\xA4\xA6\r\n'
LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
# One MLSD entry ("facts; name") per item; each one is sent CRLF-terminated.
MLSD_DATA = ''.join(entry + '\r\n' for entry in (
    "type=cdir;perm=el;unique==keVO1+ZF4; test",
    "type=pdir;perm=e;unique==keVO1+d?3; ..",
    "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar",
    "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device",
    "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block",
    "type=file;perm=awr;unique==keVO1+8G4; writable",
    "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous",
    "type=dir;perm=;unique==keVO1+1t2; no-exec",
    "type=file;perm=r;unique==keVO1+EG4; two words",
    "type=file;perm=r;unique==keVO1+IH4; leading space",
    "type=file;perm=r;unique==keVO1+1G4; file1",
    "type=dir;perm=cpmel;unique==keVO1+7G4; incoming",
    "type=file;perm=r;unique==keVO1+1G4; file2",
    "type=file;perm=r;unique==keVO1+1G4; file3",
    "type=file;perm=r;unique==keVO1+1G4; file4",
    "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char",
    "type=file;perm=r;unique==SGP2; file \xAE non-ascii char",
))
57
58
def default_error_handler():
    """Do nothing: shared ``handle_error`` body for the dummy server channels.

    bpo-44359: socket errors are silently ignored.  They occur when a client
    socket is closed (in TestFTPClass.tearDown() and the makepasv() tests)
    and the server then gets an error on its side.
    """
    return None
64
65
class DummyDTPHandler(asynchat.async_chat):
    """Data-connection channel created by the dummy FTP control handler.

    Incoming bytes are appended to ``baseclass.last_received_data``;
    outgoing text is encoded with the control handler's encoding.
    """

    # Flipped by handle_close() so the completion reply is sent only once.
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # Every new data connection starts with an empty receive buffer.
        self.baseclass.last_received_data = bytearray()
        self.encoding = baseclass.encoding

    def handle_read(self):
        chunk = self.recv(1024)
        self.baseclass.last_received_data += chunk

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if self.dtp_conn_closed:
            return
        self.baseclass.push('226 transfer complete')
        self.close()
        self.dtp_conn_closed = True

    def push(self, what):
        # A payload staged by a test on the control handler replaces
        # whatever the command wanted to send, and is consumed once.
        staged = self.baseclass.next_data
        if staged is not None:
            what = staged
            self.baseclass.next_data = None
        if not what:
            # Nothing to send: just schedule the channel for closing.
            return self.close_when_done()
        super().push(what.encode(self.encoding))

    def handle_error(self):
        default_error_handler()
98
99
class DummyFTPHandler(asynchat.async_chat):
    """Control-connection channel of the dummy FTP server.

    Each CRLF-terminated line received from the client is split into a
    command and an argument and dispatched to the matching ``cmd_<name>``
    method; a command without such a method gets a 550 reply.  Tests steer
    the handler through ``next_response``, ``next_data`` and
    ``next_retr_data`` and inspect ``last_received_cmd``,
    ``last_received_data`` and ``rest``.
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn, encoding=DEFAULT_ENCODING):
        asynchat.async_chat.__init__(self, conn)
        # tells the socket to handle urgent data inline (ABOR command)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
        self.set_terminator(b"\r\n")
        # push() encodes with self.encoding, so the requested encoding must
        # be in place before the welcome banner is queued below.
        self.encoding = encoding
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = bytearray()
        self.next_response = ''
        self.next_data = None
        self.rest = None
        self.next_retr_data = RETR_DATA
        # We use this as the string IPv4 address to direct the client
        # to in response to a PASV command.  To test security behavior.
        # https://bugs.python.org/issue43285/.
        self.fake_pasv_server_ip = '252.253.254.255'
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        line = b''.join(self.in_buffer).decode(self.encoding)
        self.in_buffer = []
        # A canned response staged by a test is sent first (and only once);
        # the command is still dispatched afterwards.
        if self.next_response:
            self.push(self.next_response)
            self.next_response = ''
        # Everything before the first space is the command, the rest (or
        # the empty string when there is no space) is its argument.
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is None:
            self.push('550 command "%s" not understood.' % cmd)
        else:
            method(arg)

    def handle_error(self):
        default_error_handler()

    def push(self, data):
        # Replies are text; encode them and add the line terminator.
        asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')

    def cmd_port(self, arg):
        # arg is "h1,h2,h3,h4,p1,p2"
        fields = [int(field) for field in arg.split(',')]
        ip = '%d.%d.%d.%d' % tuple(fields[:4])
        port = (fields[4] * 256) + fields[5]
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            # Advertise the fake address, not the one actually listened on.
            ip = self.fake_pasv_server_ip.replace('.', ',')
            # Integer split of the port into its high and low bytes.
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' % (ip, p1, p2))
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        # The first character of arg is the field delimiter.
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=TIMEOUT)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        with socket.create_server((self.socket.getsockname()[0], 0),
                                  family=socket.AF_INET6) as sock:
            sock.settimeout(TIMEOUT)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' % port)
            conn, addr = sock.accept()
            self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_noop(self, arg):
        self.push('200 noop ok')

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' % arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_abor(self, arg):
        self.push('226 abor ok')

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # Remembered (as received) for the next RETR.
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        self.push('125 retr ok')
        offset = int(self.rest) if self.rest is not None else 0
        self.dtp.push(self.next_retr_data[offset:])
        self.dtp.close_when_done()
        # A REST offset applies to a single transfer only.
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()

    def cmd_opts(self, arg):
        self.push('200 opts ok')

    def cmd_mlsd(self, arg):
        self.push('125 mlsd ok')
        self.dtp.push(MLSD_DATA)
        self.dtp.close_when_done()

    def cmd_setlongretr(self, arg):
        # For testing. Next RETR will return long line.
        self.next_retr_data = 'x' * int(arg)
        self.push('125 setlongretr ok')
275
276
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    Each accepted connection is wrapped in ``handler``; the most recent
    handler is kept in ``handler_instance`` so tests can inspect and steer
    it.  ``start()`` returns only once the thread has begun running;
    ``stop()`` ends the loop and joins the thread.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        # With port 0 in `address` the OS picks the port; read it back.
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self.encoding = encoding

    def start(self):
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        # Block until run() has marked the server active.
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # The context manager releases the lock even if a loop
            # iteration raises.
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn, encoding=self.encoding)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        # The listening socket never has anything to write.
        return 0

    def handle_error(self):
        default_error_handler()
326
327
if ssl is not None:

    CERTFILE = os.path.join(os.path.dirname(__file__), "certdata", "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__), "certdata", "pycacert.pem")

    class SSLConnection(asyncore.dispatcher):
        """An asyncore.dispatcher subclass supporting TLS/SSL."""

        # True while the TLS handshake still has to complete.
        _ssl_accepting = False
        # True while a TLS shutdown (unwrap) is in progress.
        _ssl_closing = False

        def secure_connection(self):
            # Wrap the plain socket server-side; the handshake itself is
            # driven later from the read/write event handlers.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(CERTFILE)
            tls_sock = context.wrap_socket(self.socket,
                                           suppress_ragged_eofs=False,
                                           server_side=True,
                                           do_handshake_on_connect=False)
            self.del_channel()
            self.set_socket(tls_sock)
            self._ssl_accepting = True

        def _do_ssl_handshake(self):
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                code = err.args[0]
                if code in (ssl.SSL_ERROR_WANT_READ,
                            ssl.SSL_ERROR_WANT_WRITE):
                    # Not finished yet; retried on the next event.
                    return
                # TODO: SSLError does not expose alert information
                if (code == ssl.SSL_ERROR_EOF
                        or "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]):
                    return self.handle_close()
                raise
            except OSError as err:
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self._ssl_accepting = False

        def _do_ssl_shutdown(self):
            self._ssl_closing = True
            try:
                self.socket = self.socket.unwrap()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Shutdown still pending; retried on the next event.
                    return
            except OSError:
                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
                # from OpenSSL's SSL_shutdown(), corresponding to a
                # closed socket condition. See also:
                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
                pass
            self._ssl_closing = False
            # After CCC the connection continues in clear text, so the
            # channel is closed only when _ccc is unset/False.
            if getattr(self, '_ccc', False) is not False:
                return
            super().close()

        def handle_read_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
                return
            if self._ssl_closing:
                self._do_ssl_shutdown()
                return
            super().handle_read_event()

        def handle_write_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
                return
            if self._ssl_closing:
                self._do_ssl_shutdown()
                return
            super().handle_write_event()

        def send(self, data):
            try:
                return super().send(data)
            except ssl.SSLError as err:
                ignorable = (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
                             ssl.SSL_ERROR_WANT_READ,
                             ssl.SSL_ERROR_WANT_WRITE)
                if err.args[0] not in ignorable:
                    raise
                # Report "nothing sent" for these conditions.
                return 0

        def recv(self, buffer_size):
            try:
                return super().recv(buffer_size)
            except ssl.SSLError as err:
                code = err.args[0]
                if code in (ssl.SSL_ERROR_WANT_READ,
                            ssl.SSL_ERROR_WANT_WRITE):
                    return b''
                if code in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                    self.handle_close()
                    return b''
                raise

        def handle_error(self):
            default_error_handler()

        def close(self):
            # A socket with a live TLS layer is shut down through TLS;
            # anything else is closed directly.
            tls_active = (isinstance(self.socket, ssl.SSLSocket) and
                          self.socket._sslobj is not None)
            if tls_active:
                self._do_ssl_shutdown()
            else:
                super().close()


    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
        """A DummyDTPHandler subclass supporting TLS/SSL."""

        def __init__(self, conn, baseclass):
            DummyDTPHandler.__init__(self, conn, baseclass)
            # Secure the data channel only when the control handler has
            # been switched to a protected data channel (PROT P).
            if self.baseclass.secure_data_channel:
                self.secure_connection()


    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
        """A DummyFTPHandler subclass supporting TLS/SSL."""

        dtp_handler = DummyTLS_DTPHandler

        def __init__(self, conn, encoding=DEFAULT_ENCODING):
            DummyFTPHandler.__init__(self, conn, encoding=encoding)
            self.secure_data_channel = False
            self._ccc = False

        def cmd_auth(self, line):
            """Set up secure control channel."""
            self.push('234 AUTH TLS successful')
            self.secure_connection()

        def cmd_ccc(self, line):
            # Reply first, then drop the TLS layer of the control channel.
            self.push('220 Reverting back to clear-text')
            self._ccc = True
            self._do_ssl_shutdown()

        def cmd_pbsz(self, line):
            """Negotiate size of buffer for secure data transfer.
            For TLS/SSL the only valid value for the parameter is '0'.
            Any other value is accepted but ignored.
            """
            self.push('200 PBSZ=0 successful.')

        def cmd_prot(self, line):
            """Setup un/secure data channel."""
            # protection level -> (reply, secure_data_channel value)
            levels = {
                'C': ('200 Protection set to Clear', False),
                'P': ('200 Protection set to Private', True),
            }
            selected = levels.get(line.upper())
            if selected is None:
                self.push("502 Unrecognized PROT type (use C or P).")
                return
            reply, secure = selected
            self.push(reply)
            self.secure_data_channel = secure


    class DummyTLS_FTPServer(DummyFTPServer):
        handler = DummyTLS_FTPHandler
489
490
class TestFTPClass(TestCase):
    """Exercise ftplib.FTP against a DummyFTPServer on the IPv4 loopback.

    setUp() accepts an ``encoding`` argument so that test_encoding_param()
    and subclasses can rebuild the client/server pair with another encoding.
    """

    def setUp(self, encoding=DEFAULT_ENCODING):
        self.server = DummyFTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def check_data(self, received, expected):
        # The length is compared first so that a size mismatch is reported
        # before the full-content comparison.
        self.assertEqual(len(received), len(expected))
        self.assertEqual(received, expected)

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        sanitize = self.client.sanitize
        self.assertEqual(sanitize('foo'), repr('foo'))
        self.assertEqual(sanitize('pass 12345'), repr('pass *****'))
        self.assertEqual(sanitize('PASS 12345'), repr('PASS *****'))

    def test_exceptions(self):
        sendcmd = self.client.sendcmd
        # Line breaks inside a command are rejected by the client.
        for command in ('echo 40\r\n0', 'echo 40\n0', 'echo 40\r0'):
            self.assertRaises(ValueError, sendcmd, command)
        # The echoed text is interpreted as the reply.
        for exc, command in (
            (ftplib.error_temp, 'echo 400'),
            (ftplib.error_temp, 'echo 499'),
            (ftplib.error_perm, 'echo 500'),
            (ftplib.error_perm, 'echo 599'),
            (ftplib.error_proto, 'echo 999'),
        ):
            self.assertRaises(exc, sendcmd, command)

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, OSError,
                      EOFError)
        for exc in exceptions:
            # An exception missing from all_errors propagates and makes
            # the test error out.
            try:
                raise exc('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        self.server.handler_instance.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        self.server.handler_instance.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        created = self.client.mkd('/foo')
        self.assertEqual(created, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_cwd(self):
        response = self.client.cwd('/foo')
        self.assertEqual(response, '250 cwd ok')

    def test_pwd(self):
        current = self.client.pwd()
        self.assertEqual(current, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertIsNone(self.client.sock)

    def test_abort(self):
        self.client.abort()

    def test_retrbinary(self):
        received = []
        self.client.retrbinary('retr', received.append)
        self.check_data(b''.join(received),
                        RETR_DATA.encode(self.client.encoding))

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', received.append, rest=rest)
            self.check_data(b''.join(received),
                            RETR_DATA[rest:].encode(self.client.encoding))

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
        self.client.storbinary('stor', f)
        self.check_data(self.server.handler_instance.last_received_data,
                        RETR_DATA.encode(self.server.encoding))
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
        f = io.BytesIO(data)
        # rest is accepted both as an int and as a string
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler_instance.rest, str(r))

    def test_storlines(self):
        data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
        f = io.BytesIO(data)
        self.client.storlines('stor', f)
        self.check_data(self.server.handler_instance.last_received_data,
                        RETR_DATA.encode(self.server.encoding))
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
        self.assertTrue(flag)

        f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
        # storlines() expects a binary file, not a text file
        with warnings_helper.check_warnings(('', BytesWarning), quiet=True):
            self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        lines = []
        self.client.dir(lines.append)
        self.assertEqual(''.join(lines), LIST_DATA.replace('\r\n', ''))

    def test_mlsd(self):
        list(self.client.mlsd())
        list(self.client.mlsd(path='/'))
        list(self.client.mlsd(path='/', facts=['size', 'type']))

        ls = list(self.client.mlsd())
        for name, facts in ls:
            self.assertIsInstance(name, str)
            self.assertIsInstance(facts, dict)
            self.assertTrue(name)
            self.assertIn('type', facts)
            self.assertIn('perm', facts)
            self.assertIn('unique', facts)

        def set_data(data):
            # Stage the payload the server sends on the next data transfer.
            self.server.handler_instance.next_data = data

        def test_entry(line, type='type', perm='perm', unique='unique',
                       name='name'):
            # Serve `line` as the only MLSD entry and check how it parses.
            set_data(line)
            _name, facts = next(self.client.mlsd())
            self.assertEqual(_name, name)
            self.assertEqual(facts['type'], type)
            self.assertEqual(facts['perm'], perm)
            self.assertEqual(facts['unique'], unique)

        # plain
        test_entry('type=type;perm=perm;unique=unique; name\r\n')
        # "=" in fact value
        test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
        test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
        test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
        test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
        # spaces in name
        test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
        test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
        # The first space only separates the facts from the name, so a name
        # with a leading space needs two spaces after the last ";".
        test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
        test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e")
        # ";" in name
        test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
        test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
        test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
        test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
        # case sensitiveness
        set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
        _name, facts = next(self.client.mlsd())
        for x in facts:
            self.assertTrue(x.islower())
        # no data (directory empty)
        set_data('')
        self.assertRaises(StopIteration, next, self.client.mlsd())
        set_data('')
        for x in self.client.mlsd():
            self.fail("unexpected data %s" % x)

    def test_makeport(self):
        with self.client.makeport():
            # IPv4 is in use, just make sure send_eprt has not been used
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), timeout=TIMEOUT)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')

    def test_makepasv_issue43285_security_disabled(self):
        """Test the opt-in to the old vulnerable behavior."""
        self.client.trust_server_pasv_ipv4_address = True
        bad_host, port = self.client.makepasv()
        self.assertEqual(
                bad_host, self.server.handler_instance.fake_pasv_server_ip)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((self.client.sock.getpeername()[0], port),
                                 timeout=TIMEOUT).close()

    def test_makepasv_issue43285_security_enabled_default(self):
        self.assertFalse(self.client.trust_server_pasv_ipv4_address)
        trusted_host, port = self.client.makepasv()
        self.assertNotEqual(
                trusted_host, self.server.handler_instance.fake_pasv_server_ip)
        # Opening and closing a connection keeps the dummy server happy
        # instead of timing out on accept.
        socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()

    def test_with_statement(self):
        self.client.quit()

        def is_client_connected():
            if self.client.sock is None:
                return False
            try:
                self.client.sendcmd('noop')
            except (OSError, EOFError):
                return False
            return True

        # base test
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.assertTrue(is_client_connected())
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # QUIT sent inside the with block
        with ftplib.FTP(timeout=TIMEOUT) as self.client:
            self.client.connect(self.server.host, self.server.port)
            self.client.sendcmd('noop')
            self.client.quit()
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

        # force a wrong response code to be sent on QUIT: error_perm
        # is expected and the connection is supposed to be closed
        try:
            with ftplib.FTP(timeout=TIMEOUT) as self.client:
                self.client.connect(self.server.host, self.server.port)
                self.client.sendcmd('noop')
                self.server.handler_instance.next_response = '550 error on quit'
        except ftplib.error_perm as err:
            self.assertEqual(str(err), '550 error on quit')
        else:
            self.fail('Exception not raised')
        # needed to give the threaded server some time to set the attribute
        # which otherwise would still be == 'noop'
        time.sleep(0.1)
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
        self.assertFalse(is_client_connected())

    def test_source_address(self):
        self.client.quit()
        port = socket_helper.find_unused_port()
        try:
            self.client.connect(self.server.host, self.server.port,
                                source_address=(HOST, port))
            self.assertEqual(self.client.sock.getsockname()[1], port)
            self.client.quit()
        except OSError as e:
            # The port may have been taken since find_unused_port().
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_source_address_passive_connection(self):
        port = socket_helper.find_unused_port()
        self.client.source_address = (HOST, port)
        try:
            with self.client.transfercmd('list') as sock:
                self.assertEqual(sock.getsockname()[1], port)
        except OSError as e:
            # The port may have been taken since find_unused_port().
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def test_parse257(self):
        parse257 = ftplib.parse257
        self.assertEqual(parse257('257 "/foo/bar"'), '/foo/bar')
        self.assertEqual(parse257('257 "/foo/bar" created'), '/foo/bar')
        self.assertEqual(parse257('257 ""'), '')
        self.assertEqual(parse257('257 "" created'), '')
        self.assertRaises(ftplib.error_reply, parse257, '250 "/foo/bar"')
        # The 257 response is supposed to include the directory
        # name and in case it contains embedded double-quotes
        # they must be doubled (see RFC-959, chapter 7, appendix 2).
        self.assertEqual(parse257('257 "/foo/b""ar"'), '/foo/b"ar')
        self.assertEqual(parse257('257 "/foo/b""ar" created'), '/foo/b"ar')

    def test_line_too_long(self):
        self.assertRaises(ftplib.Error, self.client.sendcmd,
                          'x' * self.client.maxline * 2)

    def test_retrlines_too_long(self):
        self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
        received = []
        self.assertRaises(ftplib.Error,
                          self.client.retrlines, 'retr', received.append)

    def test_storlines_too_long(self):
        f = io.BytesIO(b'x' * self.client.maxline * 2)
        self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)

    def test_encoding_param(self):
        encodings = ['latin-1', 'utf-8']
        for encoding in encodings:
            with self.subTest(encoding=encoding):
                # Rebuild the client/server pair with the encoding under
                # test, then rerun a few transfer tests against it.
                self.tearDown()
                self.setUp(encoding=encoding)
                self.assertEqual(encoding, self.client.encoding)
                self.test_retrbinary()
                self.test_storbinary()
                self.test_retrlines()
                new_dir = self.client.mkd('/non-ascii dir \xAE')
                self.check_data(new_dir, '/non-ascii dir \xAE')
        # Check default encoding
        client = ftplib.FTP(timeout=TIMEOUT)
        self.assertEqual(DEFAULT_ENCODING, client.encoding)
858
859
@skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
    """Run a few ftplib.FTP checks against a server bound to HOSTv6."""

    def setUp(self):
        self.server = server = DummyFTPServer((HOSTv6, 0),
                                              af=socket.AF_INET6,
                                              encoding=DEFAULT_ENCODING)
        server.start()
        self.client = client = ftplib.FTP(timeout=TIMEOUT,
                                          encoding=DEFAULT_ENCODING)
        client.connect(server.host, server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def test_af(self):
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        with self.client.makeport():
            # Over IPv6 the client is expected to have sent EPRT.
            self.assertEqual(self.server.handler_instance.last_received_cmd,
                             'eprt')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), timeout=TIMEOUT)
        conn.close()
        # Over IPv6 the client is expected to have sent EPSV.
        self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')

    def test_transfer(self):
        expected = RETR_DATA.encode(self.client.encoding)
        # Same download in passive mode first, then in active mode.
        for passive in (True, False):
            self.client.set_pasv(passive)
            received = []
            self.client.retrbinary('retr', received.append)
            self.assertEqual(b''.join(received), expected)
902
903
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
    """Repeat TestFTPClass tests starting the TLS layer for both control
    and data connections first.
    """

    def setUp(self, encoding=DEFAULT_ENCODING):
        self.server = server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
        server.start()
        self.client = client = ftplib.FTP_TLS(timeout=TIMEOUT,
                                              encoding=encoding)
        client.connect(server.host, server.port)
        # enable TLS
        client.auth()
        client.prot_p()
918
919
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self, encoding=DEFAULT_ENCODING):
        """Start a dummy TLS server and connect a not-yet-secured client.

        *encoding* is handed to both the server and the client.
        """
        self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
        self.server.start()
        # Keep client and server on the same encoding: the data-channel
        # checks below encode LIST_DATA with self.client.encoding and
        # compare it with the raw bytes received from the server.
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None
        asyncore.close_all(ignore_all=True)

    def _check_list_transfer(self, secured):
        """Run one LIST transfer and check the data socket's TLS state.

        *secured* says whether the data socket is expected to be an
        ssl.SSLSocket.
        """
        if secured:
            check_type = self.assertIsInstance
        else:
            check_type = self.assertNotIsInstance
        with self.client.transfercmd('list') as sock:
            check_type(sock, ssl.SSLSocket)
            # consume from SSL socket to finalize handshake and avoid
            # "SSLError [SSL] shutdown while in init"
            self.assertEqual(sock.recv(1024),
                             LIST_DATA.encode(self.client.encoding))
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_control_connection(self):
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        self._check_list_transfer(secured=False)

        # secured, after PROT P
        self.client.prot_p()
        self._check_list_transfer(secured=True)

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        self._check_list_transfer(secured=False)

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        # context cannot be combined with keyfile and/or certfile
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        # the supplied context must be used for the control connection ...
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        # ... and for the data connection
        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)

    def test_ccc(self):
        # CCC is refused while the control connection is still clear text
        self.assertRaises(ValueError, self.client.ccc)
        self.client.login(secure=True)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.ccc()
        self.assertRaises(ValueError, self.client.sock.unwrap)

    @unittest.skip("FIXME: bpo-32706")
    def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with self.client.transfercmd("list"):
                pass
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        with self.client.transfercmd("list"):
            pass
1041
1042
class TestTimeouts(TestCase):
    """Check which timeout ends up on the FTP control socket.

    Every test connects to a one-shot server thread started in setUp(),
    which accepts a single connection, sends one greeting line and exits.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bound the accept() in server() so the thread always terminates.
        self.sock.settimeout(20)
        self.port = socket_helper.bind_port(self.sock)
        self.server_thread = threading.Thread(target=self.server)
        self.server_thread.daemon = True
        self.server_thread.start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # Point the class-level default port at the server; restored in
        # tearDown().
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        ftplib.FTP.port = self.old_port
        self.server_thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.server_thread = None

    def server(self):
        # This method sets the evt twice:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        self.sock.listen()
        # (1) Signal the caller that we are ready to accept the connection.
        self.evt.set()
        try:
            conn, addr = self.sock.accept()
        except TimeoutError:
            pass
        else:
            conn.sendall(b"1 Hola mundo\n")
            conn.shutdown(socket.SHUT_WR)
            # (2) Signal the caller that it is safe to close the socket.
            self.evt.set()
            conn.close()
        finally:
            self.sock.close()

    def _check_timeout(self, ftp, expected):
        """Assert the timeout of ftp's socket, then close the connection.

        *expected* is the timeout the control socket must report; None
        means the socket must have no timeout.
        """
        # Registered before asserting so the socket is released even when
        # the assertion fails.  Cleanups run after tearDown(), which has
        # joined the server thread by then.  On the success path the
        # explicit close() below has already cleared the socket.
        self.addCleanup(ftp.close)
        if expected is None:
            self.assertIsNone(ftp.sock.gettimeout())
        else:
            self.assertEqual(ftp.sock.gettimeout(), expected)
        # Wait for signal (2) from server() before closing our end.
        self.evt.wait()
        ftp.close()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self._check_timeout(ftp, 30)

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self._check_timeout(ftp, None)

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self._check_timeout(ftp, 30)

        # bpo-39259
        with self.assertRaises(ValueError):
            ftplib.FTP(HOST, timeout=0)

    def testTimeoutConnect(self):
        # timeout given to connect() rather than to the constructor
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self._check_timeout(ftp, 30)

    def testTimeoutDifferentOrder(self):
        # timeout given to the constructor, connection made later
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self._check_timeout(ftp, 30)

    def testTimeoutDirectAccess(self):
        # timeout assigned on the instance before connecting
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self._check_timeout(ftp, 30)
1142
1143
class MiscTestCase(TestCase):
    """Module-level checks that need no FTP connection."""

    def test__all__(self):
        # Names handed to check__all__() as not exported by ftplib.
        excluded = {
            'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF',
            'Error',
            'parse150', 'parse227', 'parse229', 'parse257',
            'print_line', 'ftpcp', 'test',
        }
        support.check__all__(self, ftplib, not_exported=excluded)
1151
1152
def setUpModule():
    """Record the thread state now and schedule its cleanup for module exit."""
    saved_state = threading_helper.threading_setup()
    unittest.addModuleCleanup(
        threading_helper.threading_cleanup, *saved_state)
1156
1157
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()