1 """Test script for ftplib module."""
2
3 # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4 # environment
5
6 import ftplib
7 import socket
8 import io
9 import errno
10 import os
11 import threading
12 import time
13 import unittest
14 try:
15 import ssl
16 except ImportError:
17 ssl = None
18
19 from unittest import TestCase, skipUnless
20 from test import support
21 from test.support import threading_helper
22 from test.support import socket_helper
23 from test.support import warnings_helper
24 from test.support import asynchat
25 from test.support import asyncore
26 from test.support.socket_helper import HOST, HOSTv6
27
28
29 support.requires_working_socket(module=True)
30
31 TIMEOUT = support.LOOPBACK_TIMEOUT
32 DEFAULT_ENCODING = 'utf-8'
33 # the dummy data returned by server over the data channel when
34 # RETR, LIST, NLST, MLSD commands are issued
35 RETR_DATA = 'abcde12345\r\n' * 1000 + 'non-ascii char \xAE\r\n'
36 LIST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
37 NLST_DATA = 'foo\r\nbar\r\n non-ascii char \xAE\r\n'
38 MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
39 "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
40 "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
41 "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
42 "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
43 "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
44 "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
45 "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
46 "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
47 "type=file;perm=r;unique==keVO1+IH4; leading space\r\n"
48 "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
49 "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
50 "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
51 "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
52 "type=file;perm=r;unique==keVO1+1G4; file4\r\n"
53 "type=dir;perm=cpmel;unique==SGP1; dir \xAE non-ascii char\r\n"
54 "type=file;perm=r;unique==SGP2; file \xAE non-ascii char\r\n")
55
56
57 def default_error_handler():
58 # bpo-44359: Silently ignore socket errors. Such errors occur when a client
59 # socket is closed, in TestFTPClass.tearDown() and makepasv() tests, and
60 # the server gets an error on its side.
61 pass
62
63
64 class ESC[4;38;5;81mDummyDTPHandler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
65 dtp_conn_closed = False
66
67 def __init__(self, conn, baseclass):
68 asynchat.async_chat.__init__(self, conn)
69 self.baseclass = baseclass
70 self.baseclass.last_received_data = ''
71 self.encoding = baseclass.encoding
72
73 def handle_read(self):
74 new_data = self.recv(1024).decode(self.encoding, 'replace')
75 self.baseclass.last_received_data += new_data
76
77 def handle_close(self):
78 # XXX: this method can be called many times in a row for a single
79 # connection, including in clear-text (non-TLS) mode.
80 # (behaviour witnessed with test_data_connection)
81 if not self.dtp_conn_closed:
82 self.baseclass.push('226 transfer complete')
83 self.close()
84 self.dtp_conn_closed = True
85
86 def push(self, what):
87 if self.baseclass.next_data is not None:
88 what = self.baseclass.next_data
89 self.baseclass.next_data = None
90 if not what:
91 return self.close_when_done()
92 super(DummyDTPHandler, self).push(what.encode(self.encoding))
93
94 def handle_error(self):
95 default_error_handler()
96
97
98 class ESC[4;38;5;81mDummyFTPHandler(ESC[4;38;5;149masynchatESC[4;38;5;149m.ESC[4;38;5;149masync_chat):
99
100 dtp_handler = DummyDTPHandler
101
102 def __init__(self, conn, encoding=DEFAULT_ENCODING):
103 asynchat.async_chat.__init__(self, conn)
104 # tells the socket to handle urgent data inline (ABOR command)
105 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
106 self.set_terminator(b"\r\n")
107 self.in_buffer = []
108 self.dtp = None
109 self.last_received_cmd = None
110 self.last_received_data = ''
111 self.next_response = ''
112 self.next_data = None
113 self.rest = None
114 self.next_retr_data = RETR_DATA
115 self.push('220 welcome')
116 self.encoding = encoding
117 # We use this as the string IPv4 address to direct the client
118 # to in response to a PASV command. To test security behavior.
119 # https://bugs.python.org/issue43285/.
120 self.fake_pasv_server_ip = '252.253.254.255'
121
122 def collect_incoming_data(self, data):
123 self.in_buffer.append(data)
124
125 def found_terminator(self):
126 line = b''.join(self.in_buffer).decode(self.encoding)
127 self.in_buffer = []
128 if self.next_response:
129 self.push(self.next_response)
130 self.next_response = ''
131 cmd = line.split(' ')[0].lower()
132 self.last_received_cmd = cmd
133 space = line.find(' ')
134 if space != -1:
135 arg = line[space + 1:]
136 else:
137 arg = ""
138 if hasattr(self, 'cmd_' + cmd):
139 method = getattr(self, 'cmd_' + cmd)
140 method(arg)
141 else:
142 self.push('550 command "%s" not understood.' %cmd)
143
144 def handle_error(self):
145 default_error_handler()
146
147 def push(self, data):
148 asynchat.async_chat.push(self, data.encode(self.encoding) + b'\r\n')
149
150 def cmd_port(self, arg):
151 addr = list(map(int, arg.split(',')))
152 ip = '%d.%d.%d.%d' %tuple(addr[:4])
153 port = (addr[4] * 256) + addr[5]
154 s = socket.create_connection((ip, port), timeout=TIMEOUT)
155 self.dtp = self.dtp_handler(s, baseclass=self)
156 self.push('200 active data connection established')
157
158 def cmd_pasv(self, arg):
159 with socket.create_server((self.socket.getsockname()[0], 0)) as sock:
160 sock.settimeout(TIMEOUT)
161 port = sock.getsockname()[1]
162 ip = self.fake_pasv_server_ip
163 ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
164 self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
165 conn, addr = sock.accept()
166 self.dtp = self.dtp_handler(conn, baseclass=self)
167
168 def cmd_eprt(self, arg):
169 af, ip, port = arg.split(arg[0])[1:-1]
170 port = int(port)
171 s = socket.create_connection((ip, port), timeout=TIMEOUT)
172 self.dtp = self.dtp_handler(s, baseclass=self)
173 self.push('200 active data connection established')
174
175 def cmd_epsv(self, arg):
176 with socket.create_server((self.socket.getsockname()[0], 0),
177 family=socket.AF_INET6) as sock:
178 sock.settimeout(TIMEOUT)
179 port = sock.getsockname()[1]
180 self.push('229 entering extended passive mode (|||%d|)' %port)
181 conn, addr = sock.accept()
182 self.dtp = self.dtp_handler(conn, baseclass=self)
183
184 def cmd_echo(self, arg):
185 # sends back the received string (used by the test suite)
186 self.push(arg)
187
188 def cmd_noop(self, arg):
189 self.push('200 noop ok')
190
191 def cmd_user(self, arg):
192 self.push('331 username ok')
193
194 def cmd_pass(self, arg):
195 self.push('230 password ok')
196
197 def cmd_acct(self, arg):
198 self.push('230 acct ok')
199
200 def cmd_rnfr(self, arg):
201 self.push('350 rnfr ok')
202
203 def cmd_rnto(self, arg):
204 self.push('250 rnto ok')
205
206 def cmd_dele(self, arg):
207 self.push('250 dele ok')
208
209 def cmd_cwd(self, arg):
210 self.push('250 cwd ok')
211
212 def cmd_size(self, arg):
213 self.push('250 1000')
214
215 def cmd_mkd(self, arg):
216 self.push('257 "%s"' %arg)
217
218 def cmd_rmd(self, arg):
219 self.push('250 rmd ok')
220
221 def cmd_pwd(self, arg):
222 self.push('257 "pwd ok"')
223
224 def cmd_type(self, arg):
225 self.push('200 type ok')
226
227 def cmd_quit(self, arg):
228 self.push('221 quit ok')
229 self.close()
230
231 def cmd_abor(self, arg):
232 self.push('226 abor ok')
233
234 def cmd_stor(self, arg):
235 self.push('125 stor ok')
236
237 def cmd_rest(self, arg):
238 self.rest = arg
239 self.push('350 rest ok')
240
241 def cmd_retr(self, arg):
242 self.push('125 retr ok')
243 if self.rest is not None:
244 offset = int(self.rest)
245 else:
246 offset = 0
247 self.dtp.push(self.next_retr_data[offset:])
248 self.dtp.close_when_done()
249 self.rest = None
250
251 def cmd_list(self, arg):
252 self.push('125 list ok')
253 self.dtp.push(LIST_DATA)
254 self.dtp.close_when_done()
255
256 def cmd_nlst(self, arg):
257 self.push('125 nlst ok')
258 self.dtp.push(NLST_DATA)
259 self.dtp.close_when_done()
260
261 def cmd_opts(self, arg):
262 self.push('200 opts ok')
263
264 def cmd_mlsd(self, arg):
265 self.push('125 mlsd ok')
266 self.dtp.push(MLSD_DATA)
267 self.dtp.close_when_done()
268
269 def cmd_setlongretr(self, arg):
270 # For testing. Next RETR will return long line.
271 self.next_retr_data = 'x' * int(arg)
272 self.push('125 setlongretr ok')
273
274
275 class ESC[4;38;5;81mDummyFTPServer(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher, ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
276
277 handler = DummyFTPHandler
278
279 def __init__(self, address, af=socket.AF_INET, encoding=DEFAULT_ENCODING):
280 threading.Thread.__init__(self)
281 asyncore.dispatcher.__init__(self)
282 self.daemon = True
283 self.create_socket(af, socket.SOCK_STREAM)
284 self.bind(address)
285 self.listen(5)
286 self.active = False
287 self.active_lock = threading.Lock()
288 self.host, self.port = self.socket.getsockname()[:2]
289 self.handler_instance = None
290 self.encoding = encoding
291
292 def start(self):
293 assert not self.active
294 self.__flag = threading.Event()
295 threading.Thread.start(self)
296 self.__flag.wait()
297
298 def run(self):
299 self.active = True
300 self.__flag.set()
301 while self.active and asyncore.socket_map:
302 self.active_lock.acquire()
303 asyncore.loop(timeout=0.1, count=1)
304 self.active_lock.release()
305 asyncore.close_all(ignore_all=True)
306
307 def stop(self):
308 assert self.active
309 self.active = False
310 self.join()
311
312 def handle_accepted(self, conn, addr):
313 self.handler_instance = self.handler(conn, encoding=self.encoding)
314
315 def handle_connect(self):
316 self.close()
317 handle_read = handle_connect
318
319 def writable(self):
320 return 0
321
322 def handle_error(self):
323 default_error_handler()
324
325
326 if ssl is not None:
327
328 CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
329 CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
330
331 class ESC[4;38;5;81mSSLConnection(ESC[4;38;5;149masyncoreESC[4;38;5;149m.ESC[4;38;5;149mdispatcher):
332 """An asyncore.dispatcher subclass supporting TLS/SSL."""
333
334 _ssl_accepting = False
335 _ssl_closing = False
336
337 def secure_connection(self):
338 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
339 context.load_cert_chain(CERTFILE)
340 socket = context.wrap_socket(self.socket,
341 suppress_ragged_eofs=False,
342 server_side=True,
343 do_handshake_on_connect=False)
344 self.del_channel()
345 self.set_socket(socket)
346 self._ssl_accepting = True
347
348 def _do_ssl_handshake(self):
349 try:
350 self.socket.do_handshake()
351 except ssl.SSLError as err:
352 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
353 ssl.SSL_ERROR_WANT_WRITE):
354 return
355 elif err.args[0] == ssl.SSL_ERROR_EOF:
356 return self.handle_close()
357 # TODO: SSLError does not expose alert information
358 elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
359 return self.handle_close()
360 raise
361 except OSError as err:
362 if err.args[0] == errno.ECONNABORTED:
363 return self.handle_close()
364 else:
365 self._ssl_accepting = False
366
367 def _do_ssl_shutdown(self):
368 self._ssl_closing = True
369 try:
370 self.socket = self.socket.unwrap()
371 except ssl.SSLError as err:
372 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
373 ssl.SSL_ERROR_WANT_WRITE):
374 return
375 except OSError:
376 # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
377 # from OpenSSL's SSL_shutdown(), corresponding to a
378 # closed socket condition. See also:
379 # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
380 pass
381 self._ssl_closing = False
382 if getattr(self, '_ccc', False) is False:
383 super(SSLConnection, self).close()
384 else:
385 pass
386
387 def handle_read_event(self):
388 if self._ssl_accepting:
389 self._do_ssl_handshake()
390 elif self._ssl_closing:
391 self._do_ssl_shutdown()
392 else:
393 super(SSLConnection, self).handle_read_event()
394
395 def handle_write_event(self):
396 if self._ssl_accepting:
397 self._do_ssl_handshake()
398 elif self._ssl_closing:
399 self._do_ssl_shutdown()
400 else:
401 super(SSLConnection, self).handle_write_event()
402
403 def send(self, data):
404 try:
405 return super(SSLConnection, self).send(data)
406 except ssl.SSLError as err:
407 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
408 ssl.SSL_ERROR_WANT_READ,
409 ssl.SSL_ERROR_WANT_WRITE):
410 return 0
411 raise
412
413 def recv(self, buffer_size):
414 try:
415 return super(SSLConnection, self).recv(buffer_size)
416 except ssl.SSLError as err:
417 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
418 ssl.SSL_ERROR_WANT_WRITE):
419 return b''
420 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
421 self.handle_close()
422 return b''
423 raise
424
425 def handle_error(self):
426 default_error_handler()
427
428 def close(self):
429 if (isinstance(self.socket, ssl.SSLSocket) and
430 self.socket._sslobj is not None):
431 self._do_ssl_shutdown()
432 else:
433 super(SSLConnection, self).close()
434
435
436 class ESC[4;38;5;81mDummyTLS_DTPHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyDTPHandler):
437 """A DummyDTPHandler subclass supporting TLS/SSL."""
438
439 def __init__(self, conn, baseclass):
440 DummyDTPHandler.__init__(self, conn, baseclass)
441 if self.baseclass.secure_data_channel:
442 self.secure_connection()
443
444
445 class ESC[4;38;5;81mDummyTLS_FTPHandler(ESC[4;38;5;149mSSLConnection, ESC[4;38;5;149mDummyFTPHandler):
446 """A DummyFTPHandler subclass supporting TLS/SSL."""
447
448 dtp_handler = DummyTLS_DTPHandler
449
450 def __init__(self, conn, encoding=DEFAULT_ENCODING):
451 DummyFTPHandler.__init__(self, conn, encoding=encoding)
452 self.secure_data_channel = False
453 self._ccc = False
454
455 def cmd_auth(self, line):
456 """Set up secure control channel."""
457 self.push('234 AUTH TLS successful')
458 self.secure_connection()
459
460 def cmd_ccc(self, line):
461 self.push('220 Reverting back to clear-text')
462 self._ccc = True
463 self._do_ssl_shutdown()
464
465 def cmd_pbsz(self, line):
466 """Negotiate size of buffer for secure data transfer.
467 For TLS/SSL the only valid value for the parameter is '0'.
468 Any other value is accepted but ignored.
469 """
470 self.push('200 PBSZ=0 successful.')
471
472 def cmd_prot(self, line):
473 """Setup un/secure data channel."""
474 arg = line.upper()
475 if arg == 'C':
476 self.push('200 Protection set to Clear')
477 self.secure_data_channel = False
478 elif arg == 'P':
479 self.push('200 Protection set to Private')
480 self.secure_data_channel = True
481 else:
482 self.push("502 Unrecognized PROT type (use C or P).")
483
484
485 class ESC[4;38;5;81mDummyTLS_FTPServer(ESC[4;38;5;149mDummyFTPServer):
486 handler = DummyTLS_FTPHandler
487
488
489 class ESC[4;38;5;81mTestFTPClass(ESC[4;38;5;149mTestCase):
490
491 def setUp(self, encoding=DEFAULT_ENCODING):
492 self.server = DummyFTPServer((HOST, 0), encoding=encoding)
493 self.server.start()
494 self.client = ftplib.FTP(timeout=TIMEOUT, encoding=encoding)
495 self.client.connect(self.server.host, self.server.port)
496
497 def tearDown(self):
498 self.client.close()
499 self.server.stop()
500 # Explicitly clear the attribute to prevent dangling thread
501 self.server = None
502 asyncore.close_all(ignore_all=True)
503
504 def check_data(self, received, expected):
505 self.assertEqual(len(received), len(expected))
506 self.assertEqual(received, expected)
507
508 def test_getwelcome(self):
509 self.assertEqual(self.client.getwelcome(), '220 welcome')
510
511 def test_sanitize(self):
512 self.assertEqual(self.client.sanitize('foo'), repr('foo'))
513 self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
514 self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
515
516 def test_exceptions(self):
517 self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
518 self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
519 self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
520 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
521 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
522 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
523 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
524 self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
525
526 def test_all_errors(self):
527 exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
528 ftplib.error_proto, ftplib.Error, OSError,
529 EOFError)
530 for x in exceptions:
531 try:
532 raise x('exception not included in all_errors set')
533 except ftplib.all_errors:
534 pass
535
536 def test_set_pasv(self):
537 # passive mode is supposed to be enabled by default
538 self.assertTrue(self.client.passiveserver)
539 self.client.set_pasv(True)
540 self.assertTrue(self.client.passiveserver)
541 self.client.set_pasv(False)
542 self.assertFalse(self.client.passiveserver)
543
544 def test_voidcmd(self):
545 self.client.voidcmd('echo 200')
546 self.client.voidcmd('echo 299')
547 self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
548 self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
549
550 def test_login(self):
551 self.client.login()
552
553 def test_acct(self):
554 self.client.acct('passwd')
555
556 def test_rename(self):
557 self.client.rename('a', 'b')
558 self.server.handler_instance.next_response = '200'
559 self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
560
561 def test_delete(self):
562 self.client.delete('foo')
563 self.server.handler_instance.next_response = '199'
564 self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
565
566 def test_size(self):
567 self.client.size('foo')
568
569 def test_mkd(self):
570 dir = self.client.mkd('/foo')
571 self.assertEqual(dir, '/foo')
572
573 def test_rmd(self):
574 self.client.rmd('foo')
575
576 def test_cwd(self):
577 dir = self.client.cwd('/foo')
578 self.assertEqual(dir, '250 cwd ok')
579
580 def test_pwd(self):
581 dir = self.client.pwd()
582 self.assertEqual(dir, 'pwd ok')
583
584 def test_quit(self):
585 self.assertEqual(self.client.quit(), '221 quit ok')
586 # Ensure the connection gets closed; sock attribute should be None
587 self.assertEqual(self.client.sock, None)
588
589 def test_abort(self):
590 self.client.abort()
591
592 def test_retrbinary(self):
593 def callback(data):
594 received.append(data.decode(self.client.encoding))
595 received = []
596 self.client.retrbinary('retr', callback)
597 self.check_data(''.join(received), RETR_DATA)
598
599 def test_retrbinary_rest(self):
600 def callback(data):
601 received.append(data.decode(self.client.encoding))
602 for rest in (0, 10, 20):
603 received = []
604 self.client.retrbinary('retr', callback, rest=rest)
605 self.check_data(''.join(received), RETR_DATA[rest:])
606
607 def test_retrlines(self):
608 received = []
609 self.client.retrlines('retr', received.append)
610 self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
611
612 def test_storbinary(self):
613 f = io.BytesIO(RETR_DATA.encode(self.client.encoding))
614 self.client.storbinary('stor', f)
615 self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
616 # test new callback arg
617 flag = []
618 f.seek(0)
619 self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
620 self.assertTrue(flag)
621
622 def test_storbinary_rest(self):
623 data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
624 f = io.BytesIO(data)
625 for r in (30, '30'):
626 f.seek(0)
627 self.client.storbinary('stor', f, rest=r)
628 self.assertEqual(self.server.handler_instance.rest, str(r))
629
630 def test_storlines(self):
631 data = RETR_DATA.replace('\r\n', '\n').encode(self.client.encoding)
632 f = io.BytesIO(data)
633 self.client.storlines('stor', f)
634 self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
635 # test new callback arg
636 flag = []
637 f.seek(0)
638 self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
639 self.assertTrue(flag)
640
641 f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
642 # storlines() expects a binary file, not a text file
643 with warnings_helper.check_warnings(('', BytesWarning), quiet=True):
644 self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
645
646 def test_nlst(self):
647 self.client.nlst()
648 self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
649
650 def test_dir(self):
651 l = []
652 self.client.dir(lambda x: l.append(x))
653 self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
654
655 def test_mlsd(self):
656 list(self.client.mlsd())
657 list(self.client.mlsd(path='/'))
658 list(self.client.mlsd(path='/', facts=['size', 'type']))
659
660 ls = list(self.client.mlsd())
661 for name, facts in ls:
662 self.assertIsInstance(name, str)
663 self.assertIsInstance(facts, dict)
664 self.assertTrue(name)
665 self.assertIn('type', facts)
666 self.assertIn('perm', facts)
667 self.assertIn('unique', facts)
668
669 def set_data(data):
670 self.server.handler_instance.next_data = data
671
672 def test_entry(line, type=None, perm=None, unique=None, name=None):
673 type = 'type' if type is None else type
674 perm = 'perm' if perm is None else perm
675 unique = 'unique' if unique is None else unique
676 name = 'name' if name is None else name
677 set_data(line)
678 _name, facts = next(self.client.mlsd())
679 self.assertEqual(_name, name)
680 self.assertEqual(facts['type'], type)
681 self.assertEqual(facts['perm'], perm)
682 self.assertEqual(facts['unique'], unique)
683
684 # plain
685 test_entry('type=type;perm=perm;unique=unique; name\r\n')
686 # "=" in fact value
687 test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
688 test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
689 test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
690 test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
691 # spaces in name
692 test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
693 test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
694 test_entry('type=type;perm=perm;unique=unique; name\r\n', name=" name")
695 test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e")
696 # ";" in name
697 test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
698 test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
699 test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
700 test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
701 # case sensitiveness
702 set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
703 _name, facts = next(self.client.mlsd())
704 for x in facts:
705 self.assertTrue(x.islower())
706 # no data (directory empty)
707 set_data('')
708 self.assertRaises(StopIteration, next, self.client.mlsd())
709 set_data('')
710 for x in self.client.mlsd():
711 self.fail("unexpected data %s" % x)
712
713 def test_makeport(self):
714 with self.client.makeport():
715 # IPv4 is in use, just make sure send_eprt has not been used
716 self.assertEqual(self.server.handler_instance.last_received_cmd,
717 'port')
718
719 def test_makepasv(self):
720 host, port = self.client.makepasv()
721 conn = socket.create_connection((host, port), timeout=TIMEOUT)
722 conn.close()
723 # IPv4 is in use, just make sure send_epsv has not been used
724 self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
725
726 def test_makepasv_issue43285_security_disabled(self):
727 """Test the opt-in to the old vulnerable behavior."""
728 self.client.trust_server_pasv_ipv4_address = True
729 bad_host, port = self.client.makepasv()
730 self.assertEqual(
731 bad_host, self.server.handler_instance.fake_pasv_server_ip)
732 # Opening and closing a connection keeps the dummy server happy
733 # instead of timing out on accept.
734 socket.create_connection((self.client.sock.getpeername()[0], port),
735 timeout=TIMEOUT).close()
736
737 def test_makepasv_issue43285_security_enabled_default(self):
738 self.assertFalse(self.client.trust_server_pasv_ipv4_address)
739 trusted_host, port = self.client.makepasv()
740 self.assertNotEqual(
741 trusted_host, self.server.handler_instance.fake_pasv_server_ip)
742 # Opening and closing a connection keeps the dummy server happy
743 # instead of timing out on accept.
744 socket.create_connection((trusted_host, port), timeout=TIMEOUT).close()
745
746 def test_with_statement(self):
747 self.client.quit()
748
749 def is_client_connected():
750 if self.client.sock is None:
751 return False
752 try:
753 self.client.sendcmd('noop')
754 except (OSError, EOFError):
755 return False
756 return True
757
758 # base test
759 with ftplib.FTP(timeout=TIMEOUT) as self.client:
760 self.client.connect(self.server.host, self.server.port)
761 self.client.sendcmd('noop')
762 self.assertTrue(is_client_connected())
763 self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
764 self.assertFalse(is_client_connected())
765
766 # QUIT sent inside the with block
767 with ftplib.FTP(timeout=TIMEOUT) as self.client:
768 self.client.connect(self.server.host, self.server.port)
769 self.client.sendcmd('noop')
770 self.client.quit()
771 self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
772 self.assertFalse(is_client_connected())
773
774 # force a wrong response code to be sent on QUIT: error_perm
775 # is expected and the connection is supposed to be closed
776 try:
777 with ftplib.FTP(timeout=TIMEOUT) as self.client:
778 self.client.connect(self.server.host, self.server.port)
779 self.client.sendcmd('noop')
780 self.server.handler_instance.next_response = '550 error on quit'
781 except ftplib.error_perm as err:
782 self.assertEqual(str(err), '550 error on quit')
783 else:
784 self.fail('Exception not raised')
785 # needed to give the threaded server some time to set the attribute
786 # which otherwise would still be == 'noop'
787 time.sleep(0.1)
788 self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
789 self.assertFalse(is_client_connected())
790
791 def test_source_address(self):
792 self.client.quit()
793 port = socket_helper.find_unused_port()
794 try:
795 self.client.connect(self.server.host, self.server.port,
796 source_address=(HOST, port))
797 self.assertEqual(self.client.sock.getsockname()[1], port)
798 self.client.quit()
799 except OSError as e:
800 if e.errno == errno.EADDRINUSE:
801 self.skipTest("couldn't bind to port %d" % port)
802 raise
803
804 def test_source_address_passive_connection(self):
805 port = socket_helper.find_unused_port()
806 self.client.source_address = (HOST, port)
807 try:
808 with self.client.transfercmd('list') as sock:
809 self.assertEqual(sock.getsockname()[1], port)
810 except OSError as e:
811 if e.errno == errno.EADDRINUSE:
812 self.skipTest("couldn't bind to port %d" % port)
813 raise
814
815 def test_parse257(self):
816 self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
817 self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
818 self.assertEqual(ftplib.parse257('257 ""'), '')
819 self.assertEqual(ftplib.parse257('257 "" created'), '')
820 self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
821 # The 257 response is supposed to include the directory
822 # name and in case it contains embedded double-quotes
823 # they must be doubled (see RFC-959, chapter 7, appendix 2).
824 self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
825 self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
826
827 def test_line_too_long(self):
828 self.assertRaises(ftplib.Error, self.client.sendcmd,
829 'x' * self.client.maxline * 2)
830
831 def test_retrlines_too_long(self):
832 self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
833 received = []
834 self.assertRaises(ftplib.Error,
835 self.client.retrlines, 'retr', received.append)
836
837 def test_storlines_too_long(self):
838 f = io.BytesIO(b'x' * self.client.maxline * 2)
839 self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
840
841 def test_encoding_param(self):
842 encodings = ['latin-1', 'utf-8']
843 for encoding in encodings:
844 with self.subTest(encoding=encoding):
845 self.tearDown()
846 self.setUp(encoding=encoding)
847 self.assertEqual(encoding, self.client.encoding)
848 self.test_retrbinary()
849 self.test_storbinary()
850 self.test_retrlines()
851 new_dir = self.client.mkd('/non-ascii dir \xAE')
852 self.check_data(new_dir, '/non-ascii dir \xAE')
853 # Check default encoding
854 client = ftplib.FTP(timeout=TIMEOUT)
855 self.assertEqual(DEFAULT_ENCODING, client.encoding)
856
857
858 @skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
859 class ESC[4;38;5;81mTestIPv6Environment(ESC[4;38;5;149mTestCase):
860
861 def setUp(self):
862 self.server = DummyFTPServer((HOSTv6, 0),
863 af=socket.AF_INET6,
864 encoding=DEFAULT_ENCODING)
865 self.server.start()
866 self.client = ftplib.FTP(timeout=TIMEOUT, encoding=DEFAULT_ENCODING)
867 self.client.connect(self.server.host, self.server.port)
868
869 def tearDown(self):
870 self.client.close()
871 self.server.stop()
872 # Explicitly clear the attribute to prevent dangling thread
873 self.server = None
874 asyncore.close_all(ignore_all=True)
875
876 def test_af(self):
877 self.assertEqual(self.client.af, socket.AF_INET6)
878
879 def test_makeport(self):
880 with self.client.makeport():
881 self.assertEqual(self.server.handler_instance.last_received_cmd,
882 'eprt')
883
884 def test_makepasv(self):
885 host, port = self.client.makepasv()
886 conn = socket.create_connection((host, port), timeout=TIMEOUT)
887 conn.close()
888 self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')
889
890 def test_transfer(self):
891 def retr():
892 def callback(data):
893 received.append(data.decode(self.client.encoding))
894 received = []
895 self.client.retrbinary('retr', callback)
896 self.assertEqual(len(''.join(received)), len(RETR_DATA))
897 self.assertEqual(''.join(received), RETR_DATA)
898 self.client.set_pasv(True)
899 retr()
900 self.client.set_pasv(False)
901 retr()
902
903
904 @skipUnless(ssl, "SSL not available")
905 class ESC[4;38;5;81mTestTLS_FTPClassMixin(ESC[4;38;5;149mTestFTPClass):
906 """Repeat TestFTPClass tests starting the TLS layer for both control
907 and data connections first.
908 """
909
910 def setUp(self, encoding=DEFAULT_ENCODING):
911 self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
912 self.server.start()
913 self.client = ftplib.FTP_TLS(timeout=TIMEOUT, encoding=encoding)
914 self.client.connect(self.server.host, self.server.port)
915 # enable TLS
916 self.client.auth()
917 self.client.prot_p()
918
919
920 @skipUnless(ssl, "SSL not available")
921 class ESC[4;38;5;81mTestTLS_FTPClass(ESC[4;38;5;149mTestCase):
922 """Specific TLS_FTP class tests."""
923
924 def setUp(self, encoding=DEFAULT_ENCODING):
925 self.server = DummyTLS_FTPServer((HOST, 0), encoding=encoding)
926 self.server.start()
927 self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
928 self.client.connect(self.server.host, self.server.port)
929
930 def tearDown(self):
931 self.client.close()
932 self.server.stop()
933 # Explicitly clear the attribute to prevent dangling thread
934 self.server = None
935 asyncore.close_all(ignore_all=True)
936
937 def test_control_connection(self):
938 self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
939 self.client.auth()
940 self.assertIsInstance(self.client.sock, ssl.SSLSocket)
941
942 def test_data_connection(self):
943 # clear text
944 with self.client.transfercmd('list') as sock:
945 self.assertNotIsInstance(sock, ssl.SSLSocket)
946 self.assertEqual(sock.recv(1024),
947 LIST_DATA.encode(self.client.encoding))
948 self.assertEqual(self.client.voidresp(), "226 transfer complete")
949
950 # secured, after PROT P
951 self.client.prot_p()
952 with self.client.transfercmd('list') as sock:
953 self.assertIsInstance(sock, ssl.SSLSocket)
954 # consume from SSL socket to finalize handshake and avoid
955 # "SSLError [SSL] shutdown while in init"
956 self.assertEqual(sock.recv(1024),
957 LIST_DATA.encode(self.client.encoding))
958 self.assertEqual(self.client.voidresp(), "226 transfer complete")
959
960 # PROT C is issued, the connection must be in cleartext again
961 self.client.prot_c()
962 with self.client.transfercmd('list') as sock:
963 self.assertNotIsInstance(sock, ssl.SSLSocket)
964 self.assertEqual(sock.recv(1024),
965 LIST_DATA.encode(self.client.encoding))
966 self.assertEqual(self.client.voidresp(), "226 transfer complete")
967
968 def test_login(self):
969 # login() is supposed to implicitly secure the control connection
970 self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
971 self.client.login()
972 self.assertIsInstance(self.client.sock, ssl.SSLSocket)
973 # make sure that AUTH TLS doesn't get issued again
974 self.client.login()
975
976 def test_auth_issued_twice(self):
977 self.client.auth()
978 self.assertRaises(ValueError, self.client.auth)
979
980 def test_context(self):
981 self.client.quit()
982 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
983 ctx.check_hostname = False
984 ctx.verify_mode = ssl.CERT_NONE
985 self.assertRaises(TypeError, ftplib.FTP_TLS, keyfile=CERTFILE,
986 context=ctx)
987 self.assertRaises(TypeError, ftplib.FTP_TLS, certfile=CERTFILE,
988 context=ctx)
989 self.assertRaises(TypeError, ftplib.FTP_TLS, certfile=CERTFILE,
990 keyfile=CERTFILE, context=ctx)
991
992 self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
993 self.client.connect(self.server.host, self.server.port)
994 self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
995 self.client.auth()
996 self.assertIs(self.client.sock.context, ctx)
997 self.assertIsInstance(self.client.sock, ssl.SSLSocket)
998
999 self.client.prot_p()
1000 with self.client.transfercmd('list') as sock:
1001 self.assertIs(sock.context, ctx)
1002 self.assertIsInstance(sock, ssl.SSLSocket)
1003
1004 def test_ccc(self):
1005 self.assertRaises(ValueError, self.client.ccc)
1006 self.client.login(secure=True)
1007 self.assertIsInstance(self.client.sock, ssl.SSLSocket)
1008 self.client.ccc()
1009 self.assertRaises(ValueError, self.client.sock.unwrap)
1010
1011 @skipUnless(False, "FIXME: bpo-32706")
1012 def test_check_hostname(self):
1013 self.client.quit()
1014 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1015 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1016 self.assertEqual(ctx.check_hostname, True)
1017 ctx.load_verify_locations(CAFILE)
1018 self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
1019
1020 # 127.0.0.1 doesn't match SAN
1021 self.client.connect(self.server.host, self.server.port)
1022 with self.assertRaises(ssl.CertificateError):
1023 self.client.auth()
1024 # exception quits connection
1025
1026 self.client.connect(self.server.host, self.server.port)
1027 self.client.prot_p()
1028 with self.assertRaises(ssl.CertificateError):
1029 with self.client.transfercmd("list") as sock:
1030 pass
1031 self.client.quit()
1032
1033 self.client.connect("localhost", self.server.port)
1034 self.client.auth()
1035 self.client.quit()
1036
1037 self.client.connect("localhost", self.server.port)
1038 self.client.prot_p()
1039 with self.client.transfercmd("list") as sock:
1040 pass
1041
1042
1043 class ESC[4;38;5;81mTestTimeouts(ESC[4;38;5;149mTestCase):
1044
1045 def setUp(self):
1046 self.evt = threading.Event()
1047 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1048 self.sock.settimeout(20)
1049 self.port = socket_helper.bind_port(self.sock)
1050 self.server_thread = threading.Thread(target=self.server)
1051 self.server_thread.daemon = True
1052 self.server_thread.start()
1053 # Wait for the server to be ready.
1054 self.evt.wait()
1055 self.evt.clear()
1056 self.old_port = ftplib.FTP.port
1057 ftplib.FTP.port = self.port
1058
1059 def tearDown(self):
1060 ftplib.FTP.port = self.old_port
1061 self.server_thread.join()
1062 # Explicitly clear the attribute to prevent dangling thread
1063 self.server_thread = None
1064
1065 def server(self):
1066 # This method sets the evt 3 times:
1067 # 1) when the connection is ready to be accepted.
1068 # 2) when it is safe for the caller to close the connection
1069 # 3) when we have closed the socket
1070 self.sock.listen()
1071 # (1) Signal the caller that we are ready to accept the connection.
1072 self.evt.set()
1073 try:
1074 conn, addr = self.sock.accept()
1075 except TimeoutError:
1076 pass
1077 else:
1078 conn.sendall(b"1 Hola mundo\n")
1079 conn.shutdown(socket.SHUT_WR)
1080 # (2) Signal the caller that it is safe to close the socket.
1081 self.evt.set()
1082 conn.close()
1083 finally:
1084 self.sock.close()
1085
1086 def testTimeoutDefault(self):
1087 # default -- use global socket timeout
1088 self.assertIsNone(socket.getdefaulttimeout())
1089 socket.setdefaulttimeout(30)
1090 try:
1091 ftp = ftplib.FTP(HOST)
1092 finally:
1093 socket.setdefaulttimeout(None)
1094 self.assertEqual(ftp.sock.gettimeout(), 30)
1095 self.evt.wait()
1096 ftp.close()
1097
1098 def testTimeoutNone(self):
1099 # no timeout -- do not use global socket timeout
1100 self.assertIsNone(socket.getdefaulttimeout())
1101 socket.setdefaulttimeout(30)
1102 try:
1103 ftp = ftplib.FTP(HOST, timeout=None)
1104 finally:
1105 socket.setdefaulttimeout(None)
1106 self.assertIsNone(ftp.sock.gettimeout())
1107 self.evt.wait()
1108 ftp.close()
1109
1110 def testTimeoutValue(self):
1111 # a value
1112 ftp = ftplib.FTP(HOST, timeout=30)
1113 self.assertEqual(ftp.sock.gettimeout(), 30)
1114 self.evt.wait()
1115 ftp.close()
1116
1117 # bpo-39259
1118 with self.assertRaises(ValueError):
1119 ftplib.FTP(HOST, timeout=0)
1120
1121 def testTimeoutConnect(self):
1122 ftp = ftplib.FTP()
1123 ftp.connect(HOST, timeout=30)
1124 self.assertEqual(ftp.sock.gettimeout(), 30)
1125 self.evt.wait()
1126 ftp.close()
1127
1128 def testTimeoutDifferentOrder(self):
1129 ftp = ftplib.FTP(timeout=30)
1130 ftp.connect(HOST)
1131 self.assertEqual(ftp.sock.gettimeout(), 30)
1132 self.evt.wait()
1133 ftp.close()
1134
1135 def testTimeoutDirectAccess(self):
1136 ftp = ftplib.FTP()
1137 ftp.timeout = 30
1138 ftp.connect(HOST)
1139 self.assertEqual(ftp.sock.gettimeout(), 30)
1140 self.evt.wait()
1141 ftp.close()
1142
1143
1144 class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149mTestCase):
1145 def test__all__(self):
1146 not_exported = {
1147 'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF', 'Error',
1148 'parse150', 'parse227', 'parse229', 'parse257', 'print_line',
1149 'ftpcp', 'test'}
1150 support.check__all__(self, ftplib, not_exported=not_exported)
1151
1152
1153 def setUpModule():
1154 thread_info = threading_helper.threading_setup()
1155 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
1156
1157
1158 if __name__ == '__main__':
1159 unittest.main()