1 import io
2 import socket
3 import datetime
4 import textwrap
5 import unittest
6 import functools
7 import contextlib
8 import os.path
9 import re
10 import threading
11
12 from test import support
13 from test.support import socket_helper, warnings_helper
14 nntplib = warnings_helper.import_deprecated("nntplib")
15 from nntplib import NNTP, GroupInfo
16 from unittest.mock import patch
17 try:
18 import ssl
19 except ImportError:
20 ssl = None
21
22
# Path of the 'keycert3.pem' file inside the 'certdata' directory that sits
# next to this module.
certfile = os.path.join(os.path.dirname(__file__), 'certdata', 'keycert3.pem')

# Expose a module-level ``SSLError`` name whether or not ``ssl`` is available,
# so that ``except SSLError`` clauses are always valid.
if ssl is None:
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
31
32 # TODO:
33 # - test the `file` arg to more commands
34 # - test error conditions
35 # - test auth and `usenetrc`
36
37
class NetworkedNNTPTestsMixin:
    """Test methods that talk to a real NNTP server.

    The methods read ``self.server`` as well as the ``NNTP_HOST``,
    ``NNTP_CLASS``, ``GROUP_NAME``, ``GROUP_PAT`` and ``DESC`` attributes,
    all of which must be provided by the class this mixin is combined with.
    """

    # Passed to NNTP_CLASS as ``ssl_context`` when it is not None.
    ssl_context = None

    def _check_group_list(self, groups):
        # Type-check the first entry of a (possibly empty) group listing.
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        self._check_group_list(groups)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        self._check_group_list(groups)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        self._check_group_list(groups)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn(self.DESC, desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(resp.startswith(("215 ", "282 ")), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (plus one
        # year of slack) rather than a fixed year, which would make this
        # test start failing once that year has passed.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        if not lines:
            # Same guard as test_xover: nothing to index into.
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        if not lines:
            self.skipTest("no articles retrieved")
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Sanity-check an ArticleInfo result.

        When *art_num* is given, the article number must match it.
        """
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        if article.lines:
            self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def denylist(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not denylist(line)]
        filtered_lines = [line for line in article.lines
                          if not denylist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Tell the class that there is no connection left to close.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every callable ``test_*`` attribute of *cls* so that it runs
        inside ``socket_helper.transient_internet(self.NNTP_HOST)``."""
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            # A closure keeps each wrapper bound to its own *meth*.
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    return meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if callable(meth):
                setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            # Looks up ``server`` at call time, i.e. whichever connection
            # the enclosing test created most recently.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if self.ssl_context is not None:
            kwargs["ssl_context"] = self.ssl_context

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            # ``reason`` may be None; fall back to '' so that re.search()
            # cannot raise TypeError and hide the original SSL error.
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise
302
303
# Make every networked test method run inside transient_internet().
NetworkedNNTPTestsMixin.wrap_methods()


# EOFError, plus ssl.SSLEOFError when the ssl module could be imported.
EOF_ERRORS = (EOFError, ssl.SSLEOFError) if ssl is not None else (EOFError,)
310
311
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests against NNTP_HOST with a plain NNTP client.

    One connection is opened in setUpClass() and stored as ``cls.server``.
    """

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'
    DESC = 'Python'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Connect to NNTP_HOST; skip the class on EOF or too-small-key errors."""
        support.requires("network")
        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if cls.ssl_context is not None:
            kwargs["ssl_context"] = cls.ssl_context
        with socket_helper.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # ``reason`` may be None; fall back to '' so that re.search()
                # cannot raise TypeError and hide the original SSL error.
                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                # Identify the failing host before propagating the error.
                print(cls.NNTP_HOST)
                raise
            except EOF_ERRORS:
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")

    @classmethod
    def tearDownClass(cls):
        """Close the shared connection unless a test already did so."""
        # test_zzquit() resets ``server`` to None after quitting; getattr()
        # additionally tolerates the attribute never having been set.
        server = getattr(cls, 'server', None)
        if server is not None:
            server.quit()
348
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """The networked tests, run through ``nntplib.NNTP_SSL`` when present."""

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."

    NNTP_HOST = 'nntp.aioe.org'
    # bpo-42794: aioe.test is one of the official groups on this server
    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
    GROUP_NAME = 'aioe.test'
    GROUP_PAT = 'aioe.*'
    DESC = 'test'

    # None when nntplib was built without an NNTP_SSL class.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None

    if ssl is not None:
        # Unverified context, default cipher list, TLS capped at 1.2.
        ssl_context = ssl._create_unverified_context()
        ssl_context.set_ciphers("DEFAULT")
        ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2
375
376 #
377 # Non-networked tests using a local server (or something mocking it).
378 #
379
class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object."""

    def __init__(self, handler):
        """Create both channels and start *handler*.

        *handler* must provide ``start(readline, push_data)`` and
        ``process_pending()``.
        """
        super().__init__()
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        self.handler.start(self.c2s.readline, self.push_data)

    @staticmethod
    def _append(channel, data):
        """Append *data* at the end of *channel*, keeping its read position."""
        pos = channel.tell()
        channel.seek(0, io.SEEK_END)
        channel.write(data)
        channel.seek(pos)

    def readable(self):
        return True

    def writable(self):
        return True

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        self._append(self.s2c, data)

    def write(self, b):
        """The client sends us some data"""
        # Append at the end of the channel (as push_data() does) rather
        # than at the read cursor, so that data the handler has not consumed
        # yet is never overwritten.
        self._append(self.c2s, b)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response"""
        self.handler.process_pending()
        b = self.s2c.read(len(buf))
        n = len(b)
        buf[:n] = b
        return n
422
423
def make_mock_file(handler):
    """Return ``(sio, file)``: a _NNTPServerIO driven by *handler* and a
    buffered read/write pair wrapping it."""
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    return (raw, io.BufferedRWPair(raw, raw))
430
431
class NNTPServer(nntplib.NNTP):
    """NNTP client that uses a caller-supplied file object."""

    def __init__(self, f, host, readermode=None):
        # The base-class constructor is not called here; the instance is set
        # up directly from the given file and handed to _base_init().
        self.host = host
        self.file = f
        self._base_init(readermode)

    def _close(self):
        """Close the file object and remove the ``file`` attribute."""
        self.file.close()
        del self.file
442
443
class MockedNNTPTestsMixin:
    """Give each test a fresh ``handler``, ``sio`` and ``server``."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        """Build a new handler and an NNTPServer on top of it.

        Extra arguments are forwarded to NNTPServer after the file object
        and the host name.  The new server is stored on ``self`` and returned.
        """
        self.handler = handler = self.handler_class()
        self.sio, mock_file = make_mock_file(handler)
        server = NNTPServer(mock_file, 'test.server', *args, **kwargs)
        self.server = server
        return server
461
462
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Same as MockedNNTPTestsMixin, with a server built with readermode=True."""

    def setUp(self):
        # The parent setUp() builds a default server first; it is then
        # replaced by one created with readermode=True.
        super().setUp()
        self.make_server(readermode=True)
467
468
class NNTPv1Handler:
    """A handler for RFC 977"""

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the I/O callables, reset the session state and greet.

        *readline* returns the next line sent by the client (empty when
        nothing is pending); *push_data* queues bytes for the client.
        """
        self.in_body = False
        # Set while a multi-line request body is being collected.
        self.body = None
        self.body_callback = None
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        A pending request body is collected first and handed to the command
        that asked for it; remaining lines are dispatched to the matching
        ``handle_<COMMAND>`` method.  Unknown commands and blank lines get
        the handle_unknown() reply.

        Raises ValueError for a line not ending in CRLF or when a command
        handler raises.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; wait for more data.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            words = line.split()
            if not words:
                # A blank line carries no command name; answer it like any
                # other unrecognised command instead of failing to unpack.
                self.handle_unknown()
                continue
            cmd, *tokens = words
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
                continue
            try:
                meth(*tokens)
            except Exception as e:
                raise ValueError("command failed: {!r}".format(line)) from e
            if self.in_body:
                # The command asked for a body: remember whom to call back.
                self.body_callback = meth, tokens
                self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per posting: first without *body* (the command line),
        # then again by process_pending() with the collected body.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_article_part(self, code, message_spec, payload):
        """Answer ARTICLE/HEAD/BODY: status line, *payload*, terminating dot.

        *code* is the success status code for the command; an unknown
        *message_spec* gets the 430 reply and no payload.
        """
        if message_spec is None:
            self.push_lit("{} 3000237 <45223423@example.com>".format(code))
        elif message_spec == "<45223423@example.com>":
            self.push_lit("{} 0 <45223423@example.com>".format(code))
        elif message_spec == "3000234":
            self.push_lit("{} 3000234 <45223423@example.com>".format(code))
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_article_part("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_article_part("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_article_part("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
792
793
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        """Send the capability list; AUTHINFO is listed only before login."""
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""

        # The inserted line carries the same leading whitespace as the
        # template lines so that push_lit()'s dedent sees a uniform margin.
        if self._logged_in:
            extra = ''
        else:
            extra = '\n            AUTHINFO USER'
        self.push_lit(fmt.format(extra))

    def handle_MODE(self, _):
        # READER is already advertised, so any MODE command is an error.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
819
820
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        # Not authenticated yet: refuse with 480.
        self.push_lit('480 You must log in.')
829
830
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once the client has sent "MODE reader".
        self._switched = False

    def handle_CAPABILITIES(self):
        """Advertise MODE-READER before the switch and READER after it."""
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        prefix = '' if self._switched else 'MODE-'
        self.push_lit(fmt.format(prefix))

    def handle_MODE(self, what):
        # Only a single switch, and only to reader mode, is expected.
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
857
858
859 class ESC[4;38;5;81mNNTPv1v2TestsMixin:
860
def setUp(self):
    # Nothing specific to set up here: simply continue along the MRO.
    super().setUp()
863
def test_welcome(self):
    """The client's ``welcome`` equals the handler's ``welcome``."""
    expected = self.handler.welcome
    self.assertEqual(self.server.welcome, expected)
866
def test_authinfo(self):
    """AUTHINFO must be absent from the cached capabilities after login."""
    caps_before_login = self.nntp_version == 2
    if caps_before_login:
        self.assertIn('AUTHINFO', self.server._caps)
    self.server.login('testuser', 'testpw')
    # if AUTHINFO is gone from _caps we also know that getcapabilities()
    # has been called after login as it should
    self.assertNotIn('AUTHINFO', self.server._caps)
874
def test_date(self):
    """date() returns the raw response and the parsed datetime."""
    response, parsed = self.server.date()
    self.assertEqual(response, "111 20100914001155")
    expected = datetime.datetime(2010, 9, 14, 0, 11, 55)
    self.assertEqual(parsed, expected)
879
def test_quit(self):
    """quit() returns the farewell response and closes the raw stream."""
    raw = self.sio
    self.assertFalse(raw.closed)
    farewell = self.server.quit()
    self.assertEqual(farewell, "205 Bye!")
    self.assertTrue(raw.closed)
885
def test_help(self):
    """help() returns the status line and the help text as a list of lines."""
    expected_lines = [
        ' authinfo user Name|pass Password|generic <prog> <args>',
        ' date',
        ' help',
        'Report problems to <root@example.org>',
    ]
    resp, help_lines = self.server.help()
    self.assertEqual(resp, "100 Legal commands")
    self.assertEqual(help_lines, expected_lines)
895
def test_list(self):
    """list() without and with a group pattern."""
    _, all_groups = self.server.list()
    self.assertEqual(len(all_groups), 6)
    expected = GroupInfo("comp.lang.python.announce", "0000001153",
                         "0000000993", "m")
    self.assertEqual(all_groups[1], expected)

    _, matching = self.server.list("*distutils*")
    self.assertEqual(len(matching), 2)
    expected = GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                         "0000000001", "m")
    self.assertEqual(matching[0], expected)
909
def test_stat(self):
    """stat() by article number and by message id, plus both error replies."""
    # (argument, expected response, expected article number)
    successes = (
        (3000234, "223 3000234 <45223423@example.com>", 3000234),
        ("<45223423@example.com>", "223 0 <45223423@example.com>", 0),
    )
    for spec, expected_resp, expected_num in successes:
        resp, art_num, message_id = self.server.stat(spec)
        self.assertEqual(resp, expected_resp)
        self.assertEqual(art_num, expected_num)
        self.assertEqual(message_id, "<45223423@example.com>")

    # (positional arguments, expected error response)
    failures = (
        (("<non.existent.id>",), "430 No Such Article Found"),
        ((), "412 No newsgroup selected"),
    )
    for args, expected_resp in failures:
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat(*args)
        self.assertEqual(cm.exception.response, expected_resp)
925
def test_next(self):
    """next() returns the response, article number and message id."""
    result = self.server.next()
    resp, art_num, message_id = result
    self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<668929@example.org>")
931
def test_last(self):
    """last() returns the response, article number and message id."""
    result = self.server.last()
    resp, art_num, message_id = result
    self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
    self.assertEqual(art_num, 3000234)
    self.assertEqual(message_id, "<45223423@example.com>")
937
def test_description(self):
    """description() of a known group, then of an unknown one."""
    known = self.server.description("comp.lang.python")
    self.assertEqual(known, "The Python computer language.")
    unknown = self.server.description("comp.lang.pythonx")
    self.assertEqual(unknown, "")
943
def test_descriptions(self):
    """descriptions() for an exact name, a wildcard and an unknown group."""
    python_desc = "The Python computer language."
    announce_desc = "Announcements about the Python language. (Moderated)"

    resp, groups = self.server.descriptions("comp.lang.python")
    self.assertEqual(resp, '215 Descriptions in form "group description".')
    self.assertEqual(groups, {"comp.lang.python": python_desc})

    resp, groups = self.server.descriptions("comp.lang.python*")
    expected = {
        "comp.lang.python": python_desc,
        "comp.lang.python.announce": announce_desc,
    }
    self.assertEqual(groups, expected)

    resp, groups = self.server.descriptions("comp.lang.pythonx")
    self.assertEqual(groups, {})
957
def test_group(self):
    """group() for a known group, and the 411 error for an unknown one."""
    result = self.server.group("fr.comp.lang.python")
    resp, count, first, last, group = result
    self.assertTrue(resp.startswith("211 "), resp)
    self.assertEqual(first, 761)
    self.assertEqual(last, 1265)
    self.assertEqual(count, 486)
    self.assertEqual(group, "fr.comp.lang.python")

    with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
        self.server.group("comp.lang.python.devel")
    error_response = cm.exception.response
    self.assertTrue(error_response.startswith("411 No such group"),
                    error_response)
970
def test_newnews(self):
    """newnews() for a group with new articles and for one without."""
    since = datetime.datetime(2010, 9, 13, 8, 20, 4)

    # NEWNEWS comp.lang.python [20]100913 082004
    resp, ids = self.server.newnews("comp.lang.python", since)
    template = (
        "230 list of newsarticles (NNTP v{0}) "
        "created after Mon Sep 13 08:20:04 2010 follows"
    )
    self.assertEqual(resp, template.format(self.nntp_version))
    expected_ids = [
        "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
        "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
    ]
    self.assertEqual(ids, expected_ids)

    # NEWNEWS fr.comp.lang.python [20]100913 082004
    resp, ids = self.server.newnews("fr.comp.lang.python", since)
    self.assertEqual(resp, "230 An empty list of newsarticles follows")
    self.assertEqual(ids, [])
989
def _check_article_body(self, lines):
    """Assert that *lines* is the four-line body of the sample article.

    *lines* holds bytes objects without line terminators.  The signature
    line is compared after decoding it as UTF-8.
    """
    self.assertEqual(len(lines), 4)
    opening, dotted, blank, signature = lines[-4:]
    self.assertEqual(signature.decode('utf-8'), "-- Signed by André.")
    self.assertEqual(blank, b"")
    self.assertEqual(dotted, b".Here is a dot-starting line.")
    self.assertEqual(opening, b"This is just a test article.")
996
def _check_article_head(self, lines):
    """Assert that *lines* is the four-line header block of the sample article.

    Only the first (From) and last (Message-ID) header lines are compared.
    """
    self.assertEqual(len(lines), 4)
    checked = (
        (0, b'From: "Demo User" <nobody@example.net>'),
        (3, b"Message-ID: <i.am.an.article.you.will.want@example.com>"),
    )
    for position, wanted in checked:
        self.assertEqual(lines[position], wanted)
1001
def _check_article_data(self, lines):
    """Assert that *lines* is the complete nine-line sample article.

    The first four lines go to the header check, the last four to the
    body check, and the line in between must be empty.
    """
    self.assertEqual(len(lines), 9)
    head, separator, body = lines[:4], lines[4], lines[-4:]
    self._check_article_head(head)
    self._check_article_body(body)
    self.assertEqual(separator, b"")
1007
def test_article(self):
    # Each row: arguments for ARTICLE, expected status line, expected
    # article number.  Rows cover ARTICLE with no argument, by number and
    # by message id.
    message_id = "<45223423@example.com>"
    scenarios = (
        ((), "220 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
        ((message_id,), "220 0 <45223423@example.com>", 0),
    )
    for call_args, wanted_resp, wanted_num in scenarios:
        resp, info = self.server.article(*call_args)
        self.assertEqual(resp, wanted_resp)
        art_num, found_id, lines = info
        self.assertEqual(art_num, wanted_num)
        self.assertEqual(found_id, message_id)
        self._check_article_data(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.article("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
1034
def test_article_file(self):
    # With a "file" argument the article text goes to the file object and
    # the returned line list is empty.
    sink = io.BytesIO()
    resp, info = self.server.article(file=sink)
    self.assertEqual(resp, "220 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    data = sink.getvalue()
    head_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    body_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    # A full article must contain both the headers and the body.
    self.assertTrue(data.startswith(head_start), ascii(data))
    self.assertTrue(data.endswith(body_end), ascii(data))
1055
def test_head(self):
    # Each row: arguments for HEAD, expected status line, expected article
    # number.  Rows cover HEAD with no argument, by number and by
    # message id.
    message_id = "<45223423@example.com>"
    scenarios = (
        ((), "221 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
        ((message_id,), "221 0 <45223423@example.com>", 0),
    )
    for call_args, wanted_resp, wanted_num in scenarios:
        resp, info = self.server.head(*call_args)
        self.assertEqual(resp, wanted_resp)
        art_num, found_id, lines = info
        self.assertEqual(art_num, wanted_num)
        self.assertEqual(found_id, message_id)
        self._check_article_head(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.head("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
1082
def test_head_file(self):
    # HEAD with a "file" argument: the headers go to the file object and
    # the returned line list is empty.
    sink = io.BytesIO()
    resp, info = self.server.head(file=sink)
    self.assertEqual(resp, "221 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    data = sink.getvalue()
    head_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    body_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    # The headers must be present; the body must not be.
    self.assertTrue(data.startswith(head_start), ascii(data))
    self.assertFalse(data.endswith(body_end), ascii(data))
1102
def test_body(self):
    # Each row: arguments for BODY, expected status line, expected article
    # number.  Rows cover BODY with no argument, by number and by
    # message id.
    message_id = "<45223423@example.com>"
    scenarios = (
        ((), "222 3000237 <45223423@example.com>", 3000237),
        ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
        ((message_id,), "222 0 <45223423@example.com>", 0),
    )
    for call_args, wanted_resp, wanted_num in scenarios:
        resp, info = self.server.body(*call_args)
        self.assertEqual(resp, wanted_resp)
        art_num, found_id, lines = info
        self.assertEqual(art_num, wanted_num)
        self.assertEqual(found_id, message_id)
        self._check_article_body(lines)
    # Non-existent id
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.body("<non-existent@example.com>")
    self.assertEqual(caught.exception.response, "430 No Such Article Found")
1129
def test_body_file(self):
    # BODY with a "file" argument: the body goes to the file object and
    # the returned line list is empty.
    sink = io.BytesIO()
    resp, info = self.server.body(file=sink)
    self.assertEqual(resp, "222 3000237 <45223423@example.com>")
    art_num, message_id, lines = info
    self.assertEqual(art_num, 3000237)
    self.assertEqual(message_id, "<45223423@example.com>")
    self.assertEqual(lines, [])
    data = sink.getvalue()
    head_start = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    body_end = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
    # The headers must be absent; the body must be present.
    self.assertFalse(data.startswith(head_start), ascii(data))
    self.assertTrue(data.endswith(body_end), ascii(data))
1149
def check_over_xover_resp(self, resp, overviews):
    """Validate the reply shared by the OVER and XOVER tests.

    *resp* must start with "224 " and *overviews* must hold three
    (article number, field mapping) pairs: the first is compared in
    full, the second must have a None "xref", and only the subject of
    the third is compared.
    """
    self.assertTrue(resp.startswith("224 "), resp)
    self.assertEqual(len(overviews), 3)
    first, second, third = overviews
    number, fields = first
    self.assertEqual(number, 57)
    self.assertEqual(fields, {
        "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
        "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
        "date": "Sat, 19 Jun 2010 18:04:08 -0400",
        "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
        "references": "<hvalf7$ort$1@dough.gmane.org>",
        ":bytes": "7103",
        ":lines": "16",
        "xref": "news.gmane.io gmane.comp.python.authors:57"
    })
    _, fields = second
    self.assertEqual(fields["xref"], None)
    _, fields = third
    self.assertEqual(fields["subject"],
                     "Re: Message d'erreur incompréhensible (par moi)")
1170
def test_xover(self):
    # XOVER over articles 57..59; the reply is validated by the checker
    # shared with test_over.
    reply = self.server.xover(57, 59)
    status, overviews = reply
    self.check_over_xover_resp(status, overviews)
1174
def test_over(self):
    # In NNTP "v1", this will fallback on XOVER
    article_range = (57, 59)
    status, overviews = self.server.over(article_range)
    self.check_over_xover_resp(status, overviews)
1179
# Raw sample article with CRLF line endings.  The trailing empty element
# makes the joined bytes end with a final CRLF.
sample_post = b'\r\n'.join([
    b'From: "Demo User" <nobody@example.net>',
    b'Subject: I am just a test article',
    b'Content-Type: text/plain; charset=UTF-8; format=flowed',
    b'Message-ID: <i.am.an.article.you.will.want@example.com>',
    b'',
    b'This is just a test article.',
    b'.Here is a dot-starting line.',
    b'',
    b'-- Signed by Andr\xc3\xa9.',
    b'',
])
1191
def _check_posted_body(self):
    """Check the raw body as received by the server.

    Reads ``self.handler.posted_body`` and expects ten CRLF-terminated
    lines: the nine lines of the sample post plus the "." terminator,
    with the dot-starting line carrying a doubled leading dot.
    """
    received = self.handler.posted_body
    # One additional line for the "." terminator
    self.assertEqual(len(received), 10)
    checked = (
        (-1, b'.\r\n'),
        (-2, b'-- Signed by Andr\xc3\xa9.\r\n'),
        (-3, b'\r\n'),
        (-4, b'..Here is a dot-starting line.\r\n'),
        (0, b'From: "Demo User" <nobody@example.net>\r\n'),
    )
    for position, wanted in checked:
        self.assertEqual(received[position], wanted)
1202
def _check_post_ihave_sub(self, func, *args, file_factory):
    """Send the sample post twice through *func* and verify what was received.

    The first submission uses the prepared post with CRLF endings; the
    second uses the same post with "normal" line endings, which should
    be converted by NNTP.post and NNTP.ihave.  Each time the post is
    wrapped with *file_factory* and passed after *args*.  Returns the
    response of the second call.
    """
    variants = (
        self.sample_post,
        self.sample_post.replace(b"\r\n", b"\n"),
    )
    for post in variants:
        payload = file_factory(post)
        # Reset so the check below only sees what this call delivered.
        self.handler.posted_body = None
        resp = func(*args, payload)
        self._check_posted_body()
    return resp
1218
def check_post_ihave(self, func, success_resp, *args):
    """Exercise *func* with every supported kind of post data.

    The sample post is submitted as a bytes object, a bytearray, a file
    object, an iterable of terminated lines and an iterable of
    non-terminated lines; each submission must return *success_resp*.
    """
    def terminated_lines(b):
        return iter(b.splitlines(keepends=True))

    def bare_lines(b):
        return iter(b.splitlines(keepends=False))

    factories = (bytes, bytearray, io.BytesIO, terminated_lines, bare_lines)
    for factory in factories:
        resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
        self.assertEqual(resp, success_resp)
1239
def test_post(self):
    # Posting must succeed for every supported kind of post data.
    post = self.server.post
    self.check_post_ihave(post, "240 Article received OK")
    # With posting switched off on the handler, POST must be refused.
    self.handler.allow_posting = False
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.post(self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "440 Posting not permitted")
1247
def test_ihave(self):
    # Offering this message id must be accepted for every kind of post
    # data.
    wanted_id = "<i.am.an.article.you.will.want@example.com>"
    self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                          wanted_id)
    # A different message id must be declined.
    with self.assertRaises(nntplib.NNTPTemporaryError) as caught:
        self.server.ihave("<another.message.id>", self.sample_post)
    refusal = caught.exception.response
    self.assertEqual(refusal, "435 Article not wanted")
1255
def test_too_long_lines(self):
    # NEWNEWS with this timestamp is expected to fail with NNTPDataError.
    since = datetime.datetime(2010, 1, 1, 9, 0, 0)
    with self.assertRaises(nntplib.NNTPDataError):
        self.server.newnews("comp.lang.python", since)
1260
1261
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Run the shared v1/v2 tests against a mocked NNTP v1 server.

    Such a server reports no capabilities."""

    handler_class = NNTPv1Handler
    nntp_version = 1

    def test_caps(self):
        server = self.server
        # An empty capability mapping goes with version 1 and no
        # implementation string.
        self.assertEqual(server.getcapabilities(), {})
        self.assertEqual(server.nntp_version, 1)
        self.assertEqual(server.nntp_implementation, None)
1273
1274
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Run the shared v1/v2 tests against a mocked NNTP v2 server.

    Such a server reports capabilities."""

    handler_class = NNTPv2Handler
    nntp_version = 2

    def test_caps(self):
        server = self.server
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(server.getcapabilities(), expected_caps)
        # VERSION lists both '2' and '3'; the client is expected to
        # report 3.
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
1296
1297
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably-v2 NNTP server that exposes capabilities only
    after login."""

    handler_class = CapsAfterLoginNNTPv2Handler
    nntp_version = 2

    def test_caps_only_after_login(self):
        server = self.server
        # No cached capabilities before logging in ...
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # ... and a VERSION entry afterwards.
        self.assertIn('VERSION', server._caps)
1308
1309
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2, but we tell NNTP to send MODE READER to a
    server that isn't in READER mode by default."""

    handler_class = ModeSwitchingNNTPv2Handler
    nntp_version = 2

    def test_we_are_in_reader_mode_after_connect(self):
        cached_caps = self.server._caps
        self.assertIn('READER', cached_caps)
1320
1321
class MiscTests(unittest.TestCase):
    """Tests for nntplib's header decoding, overview parsing and
    date/time conversion helpers."""

    def test_decode_header(self):
        # (raw header, expected decoded text)
        cases = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        # (LIST OVERVIEW.FMT lines, expected field names)
        cases = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for lines, wanted in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), wanted)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]

        def parse_single(line):
            # Parse one overview line, insisting on exactly one result.
            (entry,) = nntplib._parse_overview([line], fmt)
            return entry

        # First example from RFC 3977
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363'
        )
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t'
        )
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        art_num, fields = parse_single(
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t'
        )
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        # (date string, time string or None, expected datetime fields)
        cases = [
            # Output of DATE command
            ("19990623135624", None, (1999, 6, 23, 13, 56, 24)),
            # Variations
            ("19990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("090623", "135624", (2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, parts in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode, both by default and with an explicit False.
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
        ]
        for parts, wanted in datetime_cases:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment), wanted)
            self.assertEqual(nntplib._unparse_datetime(moment, False), wanted)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("19990623", "000000")),
            ((2000, 6, 23), ("20000623", "000000")),
            ((2010, 6, 5), ("20100605", "000000")),
        ]
        for parts, wanted in date_cases:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day), wanted)
            self.assertEqual(nntplib._unparse_datetime(day, False), wanted)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("100605", "010203")),
        ]
        for parts, wanted in datetime_cases:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment, True), wanted)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("990623", "000000")),
            ((2000, 6, 23), ("000623", "000000")),
            ((2010, 6, 5), ("100605", "000000")),
        ]
        for parts, wanted in date_cases:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day, True), wanted)

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        has_ssl_class = hasattr(nntplib, 'NNTP_SSL')
        self.assertTrue(has_ssl_class)
1482
1483
class PublicAPITests(unittest.TestCase):
    """Checks that nntplib.__all__ exposes exactly the intended names."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            expected.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected)
1495
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that a failing constructor raises and releases its resources.

        ``nntplib.socket`` is patched with a stand-in whose connections
        are served by a fresh *handler_class* instance.  Constructing
        ``self.nntp_class`` must raise *expected_error_type* matching
        *expected_error_msg*; afterwards the stand-in socket and every
        file made from it must have been closed.
        """
        state = {"socket_closed": False}
        opened_files = []

        class FakeSocket:
            def close(self):
                state["socket_closed"] = True

            def makefile(self, mode):
                _, file = make_mock_file(handler_class())
                opened_files.append(file)
                return file

        class FakeSocketModule:
            # Looked up on the class itself (the class is what replaces
            # the module), hence no "self" parameter.
            def create_connection(address, timeout):
                return FakeSocket()

        with patch('nntplib.socket', FakeSocketModule):
            with self.assertRaisesRegex(expected_error_type,
                                        expected_error_msg):
                self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(state["socket_closed"])
        for opened in opened_files:
            self.assertTrue(opened.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class BadWelcomeHandler(NNTPv1Handler):
            welcome = 'Bad Welcome'

        self.check_constructor_error_conditions(
            BadWelcomeHandler, nntplib.NNTPProtocolError,
            BadWelcomeHandler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class TemporarilyDownHandler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'

        self.check_constructor_error_conditions(
            TemporarilyDownHandler, nntplib.NNTPTemporaryError,
            TemporarilyDownHandler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class PermanentlyDownHandler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'

        self.check_constructor_error_conditions(
            PermanentlyDownHandler, nntplib.NNTPPermanentError,
            PermanentlyDownHandler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class BadCapsHandler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)

        self.check_constructor_error_conditions(
            BadCapsHandler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        login = 't@e.com'
        password = 'python'
        authinfo_response = '503 Mechanism not recognized'

        class BadAuthHandler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)

        self.check_constructor_error_conditions(
            BadAuthHandler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)
1573
class bypass_context:
    """Bypass encryption and actual SSL module"""

    @staticmethod
    def wrap_socket(sock, **args):
        """Return *sock* unchanged, ignoring any keyword arguments.

        Declared static so the call works the same whether the class
        itself or an instance of it is supplied as the SSL context.
        """
        return sock
1578
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Repeat the MockSocketTests checks with NNTP_SSL as the client class."""

    @staticmethod
    def nntp_class(*args, **kwargs):
        # bypass_context stands in for a real SSL context.
        return nntplib.NNTP_SSL(*args, ssl_context=bypass_context, **kwargs)
1584
1585
class LocalServerTests(unittest.TestCase):
    """Tests run against a small NNTP server on a local socket, served
    from a background thread."""

    def setUp(self):
        listener = socket.socket()
        port = socket_helper.bind_port(listener)
        listener.listen()
        worker = threading.Thread(target=self.run_server, args=(listener,))
        self.background = worker
        worker.start()
        self.addCleanup(worker.join)

        client = NNTP(socket_helper.HOST, port, usenetrc=False)
        self.nntp = self.enterContext(client)

    def run_server(self, sock):
        # Could be generalized to handle more commands in separate methods
        with sock:
            conn, _ = sock.accept()
            with contextlib.ExitStack() as stack:
                stack.enter_context(conn)
                rfile = stack.enter_context(conn.makefile('rb'))
                conn.sendall(b'200 Server ready\r\n')
                while True:
                    request = rfile.readline()
                    if request == b'QUIT\r\n':
                        conn.sendall(b'205 Bye!\r\n')
                        break
                    if request == b'CAPABILITIES\r\n':
                        conn.sendall(
                            b'101 Capability list:\r\n'
                            b'VERSION 2\r\n'
                            b'STARTTLS\r\n'
                            b'.\r\n'
                        )
                        continue
                    if request != b'STARTTLS\r\n':
                        raise ValueError('Unexpected command {!r}'.format(request))
                    # STARTTLS: close the plain-text reader, acknowledge,
                    # then continue on a TLS-wrapped socket with a fresh
                    # reader.
                    rfile.close()
                    conn.sendall(b'382 Begin TLS negotiation now\r\n')
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    context.load_cert_chain(certfile)
                    conn = context.wrap_socket(conn, server_side=True)
                    stack.enter_context(conn)
                    rfile = stack.enter_context(conn.makefile('rb'))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        client = self.nntp
        old_file, old_sock = client.file, client.sock
        client.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(old_file, client.file)
        self.assertNotEqual(old_sock, client.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(client.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        with self.assertRaises(ValueError):
            client.starttls()
1643
1644
# Run this module's tests through unittest when it is executed as a script.
if __name__ == "__main__":
    unittest.main()