(root)/
Python-3.11.7/
Lib/
test/
test_nntplib.py
       1  import io
       2  import socket
       3  import datetime
       4  import textwrap
       5  import unittest
       6  import functools
       7  import contextlib
       8  import os.path
       9  import re
      10  import threading
      11  
      12  from test import support
      13  from test.support import socket_helper, warnings_helper
      14  nntplib = warnings_helper.import_deprecated("nntplib")
      15  from nntplib import NNTP, GroupInfo
      16  from unittest.mock import patch
      17  try:
      18      import ssl
      19  except ImportError:
      20      ssl = None
      21  
      22  
      23  certfile = os.path.join(os.path.dirname(__file__), 'certdata', 'keycert3.pem')
      24  
      25  if ssl is not None:
      26      SSLError = ssl.SSLError
      27  else:
      28      class ESC[4;38;5;81mSSLError(ESC[4;38;5;149mException):
      29          """Non-existent exception class when we lack SSL support."""
      30          reason = "This will never be raised."
      31  
      32  # TODO:
      33  # - test the `file` arg to more commands
      34  # - test error conditions
      35  # - test auth and `usenetrc`
      36  
      37  
      38  class ESC[4;38;5;81mNetworkedNNTPTestsMixin:
      39  
      40      ssl_context = None
      41  
      42      def test_welcome(self):
      43          welcome = self.server.getwelcome()
      44          self.assertEqual(str, type(welcome))
      45  
      46      def test_help(self):
      47          resp, lines = self.server.help()
      48          self.assertTrue(resp.startswith("100 "), resp)
      49          for line in lines:
      50              self.assertEqual(str, type(line))
      51  
      52      def test_list(self):
      53          resp, groups = self.server.list()
      54          if len(groups) > 0:
      55              self.assertEqual(GroupInfo, type(groups[0]))
      56              self.assertEqual(str, type(groups[0].group))
      57  
      58      def test_list_active(self):
      59          resp, groups = self.server.list(self.GROUP_PAT)
      60          if len(groups) > 0:
      61              self.assertEqual(GroupInfo, type(groups[0]))
      62              self.assertEqual(str, type(groups[0].group))
      63  
      64      def test_unknown_command(self):
      65          with self.assertRaises(nntplib.NNTPPermanentError) as cm:
      66              self.server._shortcmd("XYZZY")
      67          resp = cm.exception.response
      68          self.assertTrue(resp.startswith("500 "), resp)
      69  
      70      def test_newgroups(self):
      71          # gmane gets a constant influx of new groups.  In order not to stress
      72          # the server too much, we choose a recent date in the past.
      73          dt = datetime.date.today() - datetime.timedelta(days=7)
      74          resp, groups = self.server.newgroups(dt)
      75          if len(groups) > 0:
      76              self.assertIsInstance(groups[0], GroupInfo)
      77              self.assertIsInstance(groups[0].group, str)
      78  
      79      def test_description(self):
      80          def _check_desc(desc):
      81              # Sanity checks
      82              self.assertIsInstance(desc, str)
      83              self.assertNotIn(self.GROUP_NAME, desc)
      84          desc = self.server.description(self.GROUP_NAME)
      85          _check_desc(desc)
      86          # Another sanity check
      87          self.assertIn(self.DESC, desc)
      88          # With a pattern
      89          desc = self.server.description(self.GROUP_PAT)
      90          _check_desc(desc)
      91          # Shouldn't exist
      92          desc = self.server.description("zk.brrtt.baz")
      93          self.assertEqual(desc, '')
      94  
      95      def test_descriptions(self):
      96          resp, descs = self.server.descriptions(self.GROUP_PAT)
      97          # 215 for LIST NEWSGROUPS, 282 for XGTITLE
      98          self.assertTrue(
      99              resp.startswith("215 ") or resp.startswith("282 "), resp)
     100          self.assertIsInstance(descs, dict)
     101          desc = descs[self.GROUP_NAME]
     102          self.assertEqual(desc, self.server.description(self.GROUP_NAME))
     103  
     104      def test_group(self):
     105          result = self.server.group(self.GROUP_NAME)
     106          self.assertEqual(5, len(result))
     107          resp, count, first, last, group = result
     108          self.assertEqual(group, self.GROUP_NAME)
     109          self.assertIsInstance(count, int)
     110          self.assertIsInstance(first, int)
     111          self.assertIsInstance(last, int)
     112          self.assertLessEqual(first, last)
     113          self.assertTrue(resp.startswith("211 "), resp)
     114  
     115      def test_date(self):
     116          resp, date = self.server.date()
     117          self.assertIsInstance(date, datetime.datetime)
     118          # Sanity check
     119          self.assertGreaterEqual(date.year, 1995)
     120          self.assertLessEqual(date.year, 2030)
     121  
     122      def _check_art_dict(self, art_dict):
     123          # Some sanity checks for a field dictionary returned by OVER / XOVER
     124          self.assertIsInstance(art_dict, dict)
     125          # NNTP has 7 mandatory fields
     126          self.assertGreaterEqual(art_dict.keys(),
     127              {"subject", "from", "date", "message-id",
     128               "references", ":bytes", ":lines"}
     129              )
     130          for v in art_dict.values():
     131              self.assertIsInstance(v, (str, type(None)))
     132  
     133      def test_xover(self):
     134          resp, count, first, last, name = self.server.group(self.GROUP_NAME)
     135          resp, lines = self.server.xover(last - 5, last)
     136          if len(lines) == 0:
     137              self.skipTest("no articles retrieved")
     138          # The 'last' article is not necessarily part of the output (cancelled?)
     139          art_num, art_dict = lines[0]
     140          self.assertGreaterEqual(art_num, last - 5)
     141          self.assertLessEqual(art_num, last)
     142          self._check_art_dict(art_dict)
     143  
     144      @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
     145                             ' is found for issue #28971')
     146      def test_over(self):
     147          resp, count, first, last, name = self.server.group(self.GROUP_NAME)
     148          start = last - 10
     149          # The "start-" article range form
     150          resp, lines = self.server.over((start, None))
     151          art_num, art_dict = lines[0]
     152          self._check_art_dict(art_dict)
     153          # The "start-end" article range form
     154          resp, lines = self.server.over((start, last))
     155          art_num, art_dict = lines[-1]
     156          # The 'last' article is not necessarily part of the output (cancelled?)
     157          self.assertGreaterEqual(art_num, start)
     158          self.assertLessEqual(art_num, last)
     159          self._check_art_dict(art_dict)
     160          # XXX The "message_id" form is unsupported by gmane
     161          # 503 Overview by message-ID unsupported
     162  
     163      def test_xhdr(self):
     164          resp, count, first, last, name = self.server.group(self.GROUP_NAME)
     165          resp, lines = self.server.xhdr('subject', last)
     166          for line in lines:
     167              self.assertEqual(str, type(line[1]))
     168  
     169      def check_article_resp(self, resp, article, art_num=None):
     170          self.assertIsInstance(article, nntplib.ArticleInfo)
     171          if art_num is not None:
     172              self.assertEqual(article.number, art_num)
     173          for line in article.lines:
     174              self.assertIsInstance(line, bytes)
     175          # XXX this could exceptionally happen...
     176          self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
     177  
     178      @unittest.skipIf(True, "FIXME: see bpo-32128")
     179      def test_article_head_body(self):
     180          resp, count, first, last, name = self.server.group(self.GROUP_NAME)
     181          # Try to find an available article
     182          for art_num in (last, first, last - 1):
     183              try:
     184                  resp, head = self.server.head(art_num)
     185              except nntplib.NNTPTemporaryError as e:
     186                  if not e.response.startswith("423 "):
     187                      raise
     188                  # "423 No such article" => choose another one
     189                  continue
     190              break
     191          else:
     192              self.skipTest("could not find a suitable article number")
     193          self.assertTrue(resp.startswith("221 "), resp)
     194          self.check_article_resp(resp, head, art_num)
     195          resp, body = self.server.body(art_num)
     196          self.assertTrue(resp.startswith("222 "), resp)
     197          self.check_article_resp(resp, body, art_num)
     198          resp, article = self.server.article(art_num)
     199          self.assertTrue(resp.startswith("220 "), resp)
     200          self.check_article_resp(resp, article, art_num)
     201          # Tolerate running the tests from behind a NNTP virus checker
     202          denylist = lambda line: line.startswith(b'X-Antivirus')
     203          filtered_head_lines = [line for line in head.lines
     204                                 if not denylist(line)]
     205          filtered_lines = [line for line in article.lines
     206                            if not denylist(line)]
     207          self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
     208  
     209      def test_capabilities(self):
     210          # The server under test implements NNTP version 2 and has a
     211          # couple of well-known capabilities. Just sanity check that we
     212          # got them.
     213          def _check_caps(caps):
     214              caps_list = caps['LIST']
     215              self.assertIsInstance(caps_list, (list, tuple))
     216              self.assertIn('OVERVIEW.FMT', caps_list)
     217          self.assertGreaterEqual(self.server.nntp_version, 2)
     218          _check_caps(self.server.getcapabilities())
     219          # This re-emits the command
     220          resp, caps = self.server.capabilities()
     221          _check_caps(caps)
     222  
     223      def test_zlogin(self):
     224          # This test must be the penultimate because further commands will be
     225          # refused.
     226          baduser = "notarealuser"
     227          badpw = "notarealpassword"
     228          # Check that bogus credentials cause failure
     229          self.assertRaises(nntplib.NNTPError, self.server.login,
     230                            user=baduser, password=badpw, usenetrc=False)
     231          # FIXME: We should check that correct credentials succeed, but that
     232          # would require valid details for some server somewhere to be in the
     233          # test suite, I think. Gmane is anonymous, at least as used for the
     234          # other tests.
     235  
     236      def test_zzquit(self):
     237          # This test must be called last, hence the name
     238          cls = type(self)
     239          try:
     240              self.server.quit()
     241          finally:
     242              cls.server = None
     243  
     244      @classmethod
     245      def wrap_methods(cls):
     246          # Wrap all methods in a transient_internet() exception catcher
     247          # XXX put a generic version in test.support?
     248          def wrap_meth(meth):
     249              @functools.wraps(meth)
     250              def wrapped(self):
     251                  with socket_helper.transient_internet(self.NNTP_HOST):
     252                      meth(self)
     253              return wrapped
     254          for name in dir(cls):
     255              if not name.startswith('test_'):
     256                  continue
     257              meth = getattr(cls, name)
     258              if not callable(meth):
     259                  continue
     260              # Need to use a closure so that meth remains bound to its current
     261              # value
     262              setattr(cls, name, wrap_meth(meth))
     263  
     264      def test_timeout(self):
     265          with self.assertRaises(ValueError):
     266              self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)
     267  
     268      def test_with_statement(self):
     269          def is_connected():
     270              if not hasattr(server, 'file'):
     271                  return False
     272              try:
     273                  server.help()
     274              except (OSError, EOFError):
     275                  return False
     276              return True
     277  
     278          kwargs = dict(
     279              timeout=support.INTERNET_TIMEOUT,
     280              usenetrc=False
     281          )
     282          if self.ssl_context is not None:
     283              kwargs["ssl_context"] = self.ssl_context
     284  
     285          try:
     286              server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
     287              with server:
     288                  self.assertTrue(is_connected())
     289                  self.assertTrue(server.help())
     290              self.assertFalse(is_connected())
     291  
     292              server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
     293              with server:
     294                  server.quit()
     295              self.assertFalse(is_connected())
     296          except SSLError as ssl_err:
     297              # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
     298              if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
     299                  raise unittest.SkipTest(f"Got {ssl_err} connecting "
     300                                          f"to {self.NNTP_HOST!r}")
     301              raise
     302  
     303  
# Replace every callable test_* attribute of the mixin with a wrapper that
# runs it inside socket_helper.transient_internet(NNTP_HOST).
NetworkedNNTPTestsMixin.wrap_methods()
     305  
     306  
     307  EOF_ERRORS = (EOFError,)
     308  if ssl is not None:
     309      EOF_ERRORS += (ssl.SSLEOFError,)
     310  
     311  
     312  class ESC[4;38;5;81mNetworkedNNTPTests(ESC[4;38;5;149mNetworkedNNTPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     313      # This server supports STARTTLS (gmane doesn't)
     314      NNTP_HOST = 'news.trigofacile.com'
     315      GROUP_NAME = 'fr.comp.lang.python'
     316      GROUP_PAT = 'fr.comp.lang.*'
     317      DESC = 'Python'
     318  
     319      NNTP_CLASS = NNTP
     320  
     321      @classmethod
     322      def setUpClass(cls):
     323          support.requires("network")
     324          kwargs = dict(
     325              timeout=support.INTERNET_TIMEOUT,
     326              usenetrc=False
     327          )
     328          if cls.ssl_context is not None:
     329              kwargs["ssl_context"] = cls.ssl_context
     330          with socket_helper.transient_internet(cls.NNTP_HOST):
     331              try:
     332                  cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
     333              except SSLError as ssl_err:
     334                  # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
     335                  if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
     336                      raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
     337                                              f"to {cls.NNTP_HOST!r}")
     338                  print(cls.NNTP_HOST)
     339                  raise
     340              except EOF_ERRORS:
     341                  raise unittest.SkipTest(f"{cls} got EOF error on connecting "
     342                                          f"to {cls.NNTP_HOST!r}")
     343  
     344      @classmethod
     345      def tearDownClass(cls):
     346          if cls.server is not None:
     347              cls.server.quit()
     348  
     349  @unittest.skipUnless(ssl, 'requires SSL support')
     350  class ESC[4;38;5;81mNetworkedNNTP_SSLTests(ESC[4;38;5;149mNetworkedNNTPTests):
     351  
     352      # Technical limits for this public NNTP server (see http://www.aioe.org):
     353      # "Only two concurrent connections per IP address are allowed and
     354      # 400 connections per day are accepted from each IP address."
     355  
     356      NNTP_HOST = 'nntp.aioe.org'
     357      # bpo-42794: aioe.test is one of the official groups on this server
     358      # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
     359      GROUP_NAME = 'aioe.test'
     360      GROUP_PAT = 'aioe.*'
     361      DESC = 'test'
     362  
     363      NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
     364  
     365      # Disabled as it produces too much data
     366      test_list = None
     367  
     368      # Disabled as the connection will already be encrypted.
     369      test_starttls = None
     370  
     371      if ssl is not None:
     372          ssl_context = ssl._create_unverified_context()
     373          ssl_context.set_ciphers("DEFAULT")
     374          ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2
     375  
     376  #
     377  # Non-networked tests using a local server (or something mocking it).
     378  #
     379  
     380  class ESC[4;38;5;81m_NNTPServerIO(ESC[4;38;5;149mioESC[4;38;5;149m.ESC[4;38;5;149mRawIOBase):
     381      """A raw IO object allowing NNTP commands to be received and processed
     382      by a handler.  The handler can push responses which can then be read
     383      from the IO object."""
     384  
     385      def __init__(self, handler):
     386          io.RawIOBase.__init__(self)
     387          # The channel from the client
     388          self.c2s = io.BytesIO()
     389          # The channel to the client
     390          self.s2c = io.BytesIO()
     391          self.handler = handler
     392          self.handler.start(self.c2s.readline, self.push_data)
     393  
     394      def readable(self):
     395          return True
     396  
     397      def writable(self):
     398          return True
     399  
     400      def push_data(self, data):
     401          """Push (buffer) some data to send to the client."""
     402          pos = self.s2c.tell()
     403          self.s2c.seek(0, 2)
     404          self.s2c.write(data)
     405          self.s2c.seek(pos)
     406  
     407      def write(self, b):
     408          """The client sends us some data"""
     409          pos = self.c2s.tell()
     410          self.c2s.write(b)
     411          self.c2s.seek(pos)
     412          self.handler.process_pending()
     413          return len(b)
     414  
     415      def readinto(self, buf):
     416          """The client wants to read a response"""
     417          self.handler.process_pending()
     418          b = self.s2c.read(len(buf))
     419          n = len(b)
     420          buf[:n] = b
     421          return n
     422  
     423  
     424  def make_mock_file(handler):
     425      sio = _NNTPServerIO(handler)
     426      # Using BufferedRWPair instead of BufferedRandom ensures the file
     427      # isn't seekable.
     428      file = io.BufferedRWPair(sio, sio)
     429      return (sio, file)
     430  
     431  
     432  class ESC[4;38;5;81mNNTPServer(ESC[4;38;5;149mnntplibESC[4;38;5;149m.ESC[4;38;5;149mNNTP):
     433  
     434      def __init__(self, f, host, readermode=None):
     435          self.file = f
     436          self.host = host
     437          self._base_init(readermode)
     438  
     439      def _close(self):
     440          self.file.close()
     441          del self.file
     442  
     443  
     444  class ESC[4;38;5;81mMockedNNTPTestsMixin:
     445      # Override in derived classes
     446      handler_class = None
     447  
     448      def setUp(self):
     449          super().setUp()
     450          self.make_server()
     451  
     452      def tearDown(self):
     453          super().tearDown()
     454          del self.server
     455  
     456      def make_server(self, *args, **kwargs):
     457          self.handler = self.handler_class()
     458          self.sio, file = make_mock_file(self.handler)
     459          self.server = NNTPServer(file, 'test.server', *args, **kwargs)
     460          return self.server
     461  
     462  
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    # Variant of MockedNNTPTestsMixin whose server is created with
    # readermode=True.
    def setUp(self):
        # The base setUp() already builds a default server; make_server()
        # then replaces self.server with one created with readermode=True.
        super().setUp()
        self.make_server(readermode=True)
     467  
     468  
     469  class ESC[4;38;5;81mNNTPv1Handler:
     470      """A handler for RFC 977"""
     471  
     472      welcome = "200 NNTP mock server"
     473  
     474      def start(self, readline, push_data):
     475          self.in_body = False
     476          self.allow_posting = True
     477          self._readline = readline
     478          self._push_data = push_data
     479          self._logged_in = False
     480          self._user_sent = False
     481          # Our welcome
     482          self.handle_welcome()
     483  
     484      def _decode(self, data):
     485          return str(data, "utf-8", "surrogateescape")
     486  
     487      def process_pending(self):
     488          if self.in_body:
     489              while True:
     490                  line = self._readline()
     491                  if not line:
     492                      return
     493                  self.body.append(line)
     494                  if line == b".\r\n":
     495                      break
     496              try:
     497                  meth, tokens = self.body_callback
     498                  meth(*tokens, body=self.body)
     499              finally:
     500                  self.body_callback = None
     501                  self.body = None
     502                  self.in_body = False
     503          while True:
     504              line = self._decode(self._readline())
     505              if not line:
     506                  return
     507              if not line.endswith("\r\n"):
     508                  raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
     509              line = line[:-2]
     510              cmd, *tokens = line.split()
     511              #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
     512              meth = getattr(self, "handle_" + cmd.upper(), None)
     513              if meth is None:
     514                  self.handle_unknown()
     515              else:
     516                  try:
     517                      meth(*tokens)
     518                  except Exception as e:
     519                      raise ValueError("command failed: {!r}".format(line)) from e
     520                  else:
     521                      if self.in_body:
     522                          self.body_callback = meth, tokens
     523                          self.body = []
     524  
     525      def expect_body(self):
     526          """Flag that the client is expected to post a request body"""
     527          self.in_body = True
     528  
     529      def push_data(self, data):
     530          """Push some binary data"""
     531          self._push_data(data)
     532  
     533      def push_lit(self, lit):
     534          """Push a string literal"""
     535          lit = textwrap.dedent(lit)
     536          lit = "\r\n".join(lit.splitlines()) + "\r\n"
     537          lit = lit.encode('utf-8')
     538          self.push_data(lit)
     539  
     540      def handle_unknown(self):
     541          self.push_lit("500 What?")
     542  
     543      def handle_welcome(self):
     544          self.push_lit(self.welcome)
     545  
     546      def handle_QUIT(self):
     547          self.push_lit("205 Bye!")
     548  
     549      def handle_DATE(self):
     550          self.push_lit("111 20100914001155")
     551  
     552      def handle_GROUP(self, group):
     553          if group == "fr.comp.lang.python":
     554              self.push_lit("211 486 761 1265 fr.comp.lang.python")
     555          else:
     556              self.push_lit("411 No such group {}".format(group))
     557  
     558      def handle_HELP(self):
     559          self.push_lit("""\
     560              100 Legal commands
     561                authinfo user Name|pass Password|generic <prog> <args>
     562                date
     563                help
     564              Report problems to <root@example.org>
     565              .""")
     566  
     567      def handle_STAT(self, message_spec=None):
     568          if message_spec is None:
     569              self.push_lit("412 No newsgroup selected")
     570          elif message_spec == "3000234":
     571              self.push_lit("223 3000234 <45223423@example.com>")
     572          elif message_spec == "<45223423@example.com>":
     573              self.push_lit("223 0 <45223423@example.com>")
     574          else:
     575              self.push_lit("430 No Such Article Found")
     576  
     577      def handle_NEXT(self):
     578          self.push_lit("223 3000237 <668929@example.org> retrieved")
     579  
     580      def handle_LAST(self):
     581          self.push_lit("223 3000234 <45223423@example.com> retrieved")
     582  
     583      def handle_LIST(self, action=None, param=None):
     584          if action is None:
     585              self.push_lit("""\
     586                  215 Newsgroups in form "group high low flags".
     587                  comp.lang.python 0000052340 0000002828 y
     588                  comp.lang.python.announce 0000001153 0000000993 m
     589                  free.it.comp.lang.python 0000000002 0000000002 y
     590                  fr.comp.lang.python 0000001254 0000000760 y
     591                  free.it.comp.lang.python.learner 0000000000 0000000001 y
     592                  tw.bbs.comp.lang.python 0000000304 0000000304 y
     593                  .""")
     594          elif action == "ACTIVE":
     595              if param == "*distutils*":
     596                  self.push_lit("""\
     597                      215 Newsgroups in form "group high low flags"
     598                      gmane.comp.python.distutils.devel 0000014104 0000000001 m
     599                      gmane.comp.python.distutils.cvs 0000000000 0000000001 m
     600                      .""")
     601              else:
     602                  self.push_lit("""\
     603                      215 Newsgroups in form "group high low flags"
     604                      .""")
     605          elif action == "OVERVIEW.FMT":
     606              self.push_lit("""\
     607                  215 Order of fields in overview database.
     608                  Subject:
     609                  From:
     610                  Date:
     611                  Message-ID:
     612                  References:
     613                  Bytes:
     614                  Lines:
     615                  Xref:full
     616                  .""")
     617          elif action == "NEWSGROUPS":
     618              assert param is not None
     619              if param == "comp.lang.python":
     620                  self.push_lit("""\
     621                      215 Descriptions in form "group description".
     622                      comp.lang.python\tThe Python computer language.
     623                      .""")
     624              elif param == "comp.lang.python*":
     625                  self.push_lit("""\
     626                      215 Descriptions in form "group description".
     627                      comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
     628                      comp.lang.python\tThe Python computer language.
     629                      .""")
     630              else:
     631                  self.push_lit("""\
     632                      215 Descriptions in form "group description".
     633                      .""")
     634          else:
     635              self.push_lit('501 Unknown LIST keyword')
     636  
     637      def handle_NEWNEWS(self, group, date_str, time_str):
     638          # We hard code different return messages depending on passed
     639          # argument and date syntax.
     640          if (group == "comp.lang.python" and date_str == "20100913"
     641              and time_str == "082004"):
     642              # Date was passed in RFC 3977 format (NNTP "v2")
     643              self.push_lit("""\
     644                  230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
     645                  <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
     646                  <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
     647                  .""")
     648          elif (group == "comp.lang.python" and date_str == "100913"
     649              and time_str == "082004"):
     650              # Date was passed in RFC 977 format (NNTP "v1")
     651              self.push_lit("""\
     652                  230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
     653                  <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
     654                  <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
     655                  .""")
     656          elif (group == 'comp.lang.python' and
     657                date_str in ('20100101', '100101') and
     658                time_str == '090000'):
     659              self.push_lit('too long line' * 3000 +
     660                            '\n.')
     661          else:
     662              self.push_lit("""\
     663                  230 An empty list of newsarticles follows
     664                  .""")
     665          # (Note for experiments: many servers disable NEWNEWS.
     666          #  As of this writing, sicinfo3.epfl.ch doesn't.)
     667  
     668      def handle_XOVER(self, message_spec):
     669          if message_spec == "57-59":
     670              self.push_lit(
     671                  "224 Overview information for 57-58 follows\n"
     672                  "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
     673                      "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
     674                      "\tSat, 19 Jun 2010 18:04:08 -0400"
     675                      "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
     676                      "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
     677                      "\tXref: news.gmane.io gmane.comp.python.authors:57"
     678                      "\n"
     679                  "58\tLooking for a few good bloggers"
     680                      "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
     681                      "\tThu, 22 Jul 2010 09:14:14 -0400"
     682                      "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
     683                      "\t\t6683\t16"
     684                      "\t"
     685                      "\n"
     686                  # A UTF-8 overview line from fr.comp.lang.python
     687                  "59\tRe: Message d'erreur incompréhensible (par moi)"
     688                      "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
     689                      "\tWed, 15 Sep 2010 18:09:15 +0200"
     690                      "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
     691                      "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
     692                      "\tXref: saria.nerim.net fr.comp.lang.python:1265"
     693                      "\n"
     694                  ".\n")
     695          else:
     696              self.push_lit("""\
     697                  224 No articles
     698                  .""")
     699  
     700      def handle_POST(self, *, body=None):
     701          if body is None:
     702              if self.allow_posting:
     703                  self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
     704                  self.expect_body()
     705              else:
     706                  self.push_lit("440 Posting not permitted")
     707          else:
     708              assert self.allow_posting
     709              self.push_lit("240 Article received OK")
     710              self.posted_body = body
     711  
     712      def handle_IHAVE(self, message_id, *, body=None):
     713          if body is None:
     714              if (self.allow_posting and
     715                  message_id == "<i.am.an.article.you.will.want@example.com>"):
     716                  self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
     717                  self.expect_body()
     718              else:
     719                  self.push_lit("435 Article not wanted")
     720          else:
     721              assert self.allow_posting
     722              self.push_lit("235 Article transferred OK")
     723              self.posted_body = body
     724  
     725      sample_head = """\
     726          From: "Demo User" <nobody@example.net>
     727          Subject: I am just a test article
     728          Content-Type: text/plain; charset=UTF-8; format=flowed
     729          Message-ID: <i.am.an.article.you.will.want@example.com>"""
     730  
     731      sample_body = """\
     732          This is just a test article.
     733          ..Here is a dot-starting line.
     734  
     735          -- Signed by Andr\xe9."""
     736  
     737      sample_article = sample_head + "\n\n" + sample_body
     738  
     739      def handle_ARTICLE(self, message_spec=None):
     740          if message_spec is None:
     741              self.push_lit("220 3000237 <45223423@example.com>")
     742          elif message_spec == "<45223423@example.com>":
     743              self.push_lit("220 0 <45223423@example.com>")
     744          elif message_spec == "3000234":
     745              self.push_lit("220 3000234 <45223423@example.com>")
     746          else:
     747              self.push_lit("430 No Such Article Found")
     748              return
     749          self.push_lit(self.sample_article)
     750          self.push_lit(".")
     751  
     752      def handle_HEAD(self, message_spec=None):
     753          if message_spec is None:
     754              self.push_lit("221 3000237 <45223423@example.com>")
     755          elif message_spec == "<45223423@example.com>":
     756              self.push_lit("221 0 <45223423@example.com>")
     757          elif message_spec == "3000234":
     758              self.push_lit("221 3000234 <45223423@example.com>")
     759          else:
     760              self.push_lit("430 No Such Article Found")
     761              return
     762          self.push_lit(self.sample_head)
     763          self.push_lit(".")
     764  
     765      def handle_BODY(self, message_spec=None):
     766          if message_spec is None:
     767              self.push_lit("222 3000237 <45223423@example.com>")
     768          elif message_spec == "<45223423@example.com>":
     769              self.push_lit("222 0 <45223423@example.com>")
     770          elif message_spec == "3000234":
     771              self.push_lit("222 3000234 <45223423@example.com>")
     772          else:
     773              self.push_lit("430 No Such Article Found")
     774              return
     775          self.push_lit(self.sample_body)
     776          self.push_lit(".")
     777  
     778      def handle_AUTHINFO(self, cred_type, data):
     779          if self._logged_in:
     780              self.push_lit('502 Already Logged In')
     781          elif cred_type == 'user':
     782              if self._user_sent:
     783                  self.push_lit('482 User Credential Already Sent')
     784              else:
     785                  self.push_lit('381 Password Required')
     786                  self._user_sent = True
     787          elif cred_type == 'pass':
     788              self.push_lit('281 Login Successful')
     789              self._logged_in = True
     790          else:
     791              raise Exception('Unknown cred type {}'.format(cred_type))
     792  
     793  
     794  class ESC[4;38;5;81mNNTPv2Handler(ESC[4;38;5;149mNNTPv1Handler):
     795      """A handler for RFC 3977 (NNTP "v2")"""
     796  
     797      def handle_CAPABILITIES(self):
     798          fmt = """\
     799              101 Capability list:
     800              VERSION 2 3
     801              IMPLEMENTATION INN 2.5.1{}
     802              HDR
     803              LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
     804              OVER
     805              POST
     806              READER
     807              ."""
     808  
     809          if not self._logged_in:
     810              self.push_lit(fmt.format('\n            AUTHINFO USER'))
     811          else:
     812              self.push_lit(fmt.format(''))
     813  
     814      def handle_MODE(self, _):
     815          raise Exception('MODE READER sent despite READER has been advertised')
     816  
     817      def handle_OVER(self, message_spec=None):
     818          return self.handle_XOVER(message_spec)
     819  
     820  
     821  class ESC[4;38;5;81mCapsAfterLoginNNTPv2Handler(ESC[4;38;5;149mNNTPv2Handler):
     822      """A handler that allows CAPABILITIES only after login"""
     823  
     824      def handle_CAPABILITIES(self):
     825          if not self._logged_in:
     826              self.push_lit('480 You must log in.')
     827          else:
     828              super().handle_CAPABILITIES()
     829  
     830  
     831  class ESC[4;38;5;81mModeSwitchingNNTPv2Handler(ESC[4;38;5;149mNNTPv2Handler):
     832      """A server that starts in transit mode"""
     833  
     834      def __init__(self):
     835          self._switched = False
     836  
     837      def handle_CAPABILITIES(self):
     838          fmt = """\
     839              101 Capability list:
     840              VERSION 2 3
     841              IMPLEMENTATION INN 2.5.1
     842              HDR
     843              LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
     844              OVER
     845              POST
     846              {}READER
     847              ."""
     848          if self._switched:
     849              self.push_lit(fmt.format(''))
     850          else:
     851              self.push_lit(fmt.format('MODE-'))
     852  
     853      def handle_MODE(self, what):
     854          assert not self._switched and what == 'reader'
     855          self._switched = True
     856          self.push_lit('200 Posting allowed')
     857  
     858  
     859  class ESC[4;38;5;81mNNTPv1v2TestsMixin:
     860  
    def setUp(self):
        """Delegate to the next setUp() in the MRO; nothing is added here."""
        super().setUp()
     863  
     864      def test_welcome(self):
     865          self.assertEqual(self.server.welcome, self.handler.welcome)
     866  
     867      def test_authinfo(self):
     868          if self.nntp_version == 2:
     869              self.assertIn('AUTHINFO', self.server._caps)
     870          self.server.login('testuser', 'testpw')
     871          # if AUTHINFO is gone from _caps we also know that getcapabilities()
     872          # has been called after login as it should
     873          self.assertNotIn('AUTHINFO', self.server._caps)
     874  
     875      def test_date(self):
     876          resp, date = self.server.date()
     877          self.assertEqual(resp, "111 20100914001155")
     878          self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
     879  
     880      def test_quit(self):
     881          self.assertFalse(self.sio.closed)
     882          resp = self.server.quit()
     883          self.assertEqual(resp, "205 Bye!")
     884          self.assertTrue(self.sio.closed)
     885  
     886      def test_help(self):
     887          resp, help = self.server.help()
     888          self.assertEqual(resp, "100 Legal commands")
     889          self.assertEqual(help, [
     890              '  authinfo user Name|pass Password|generic <prog> <args>',
     891              '  date',
     892              '  help',
     893              'Report problems to <root@example.org>',
     894          ])
     895  
     896      def test_list(self):
     897          resp, groups = self.server.list()
     898          self.assertEqual(len(groups), 6)
     899          g = groups[1]
     900          self.assertEqual(g,
     901              GroupInfo("comp.lang.python.announce", "0000001153",
     902                        "0000000993", "m"))
     903          resp, groups = self.server.list("*distutils*")
     904          self.assertEqual(len(groups), 2)
     905          g = groups[0]
     906          self.assertEqual(g,
     907              GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
     908                        "0000000001", "m"))
     909  
     910      def test_stat(self):
     911          resp, art_num, message_id = self.server.stat(3000234)
     912          self.assertEqual(resp, "223 3000234 <45223423@example.com>")
     913          self.assertEqual(art_num, 3000234)
     914          self.assertEqual(message_id, "<45223423@example.com>")
     915          resp, art_num, message_id = self.server.stat("<45223423@example.com>")
     916          self.assertEqual(resp, "223 0 <45223423@example.com>")
     917          self.assertEqual(art_num, 0)
     918          self.assertEqual(message_id, "<45223423@example.com>")
     919          with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
     920              self.server.stat("<non.existent.id>")
     921          self.assertEqual(cm.exception.response, "430 No Such Article Found")
     922          with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
     923              self.server.stat()
     924          self.assertEqual(cm.exception.response, "412 No newsgroup selected")
     925  
     926      def test_next(self):
     927          resp, art_num, message_id = self.server.next()
     928          self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
     929          self.assertEqual(art_num, 3000237)
     930          self.assertEqual(message_id, "<668929@example.org>")
     931  
     932      def test_last(self):
     933          resp, art_num, message_id = self.server.last()
     934          self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
     935          self.assertEqual(art_num, 3000234)
     936          self.assertEqual(message_id, "<45223423@example.com>")
     937  
     938      def test_description(self):
     939          desc = self.server.description("comp.lang.python")
     940          self.assertEqual(desc, "The Python computer language.")
     941          desc = self.server.description("comp.lang.pythonx")
     942          self.assertEqual(desc, "")
     943  
     944      def test_descriptions(self):
     945          resp, groups = self.server.descriptions("comp.lang.python")
     946          self.assertEqual(resp, '215 Descriptions in form "group description".')
     947          self.assertEqual(groups, {
     948              "comp.lang.python": "The Python computer language.",
     949              })
     950          resp, groups = self.server.descriptions("comp.lang.python*")
     951          self.assertEqual(groups, {
     952              "comp.lang.python": "The Python computer language.",
     953              "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
     954              })
     955          resp, groups = self.server.descriptions("comp.lang.pythonx")
     956          self.assertEqual(groups, {})
     957  
     958      def test_group(self):
     959          resp, count, first, last, group = self.server.group("fr.comp.lang.python")
     960          self.assertTrue(resp.startswith("211 "), resp)
     961          self.assertEqual(first, 761)
     962          self.assertEqual(last, 1265)
     963          self.assertEqual(count, 486)
     964          self.assertEqual(group, "fr.comp.lang.python")
     965          with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
     966              self.server.group("comp.lang.python.devel")
     967          exc = cm.exception
     968          self.assertTrue(exc.response.startswith("411 No such group"),
     969                          exc.response)
     970  
     971      def test_newnews(self):
     972          # NEWNEWS comp.lang.python [20]100913 082004
     973          dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
     974          resp, ids = self.server.newnews("comp.lang.python", dt)
     975          expected = (
     976              "230 list of newsarticles (NNTP v{0}) "
     977              "created after Mon Sep 13 08:20:04 2010 follows"
     978              ).format(self.nntp_version)
     979          self.assertEqual(resp, expected)
     980          self.assertEqual(ids, [
     981              "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
     982              "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
     983              ])
     984          # NEWNEWS fr.comp.lang.python [20]100913 082004
     985          dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
     986          resp, ids = self.server.newnews("fr.comp.lang.python", dt)
     987          self.assertEqual(resp, "230 An empty list of newsarticles follows")
     988          self.assertEqual(ids, [])
     989  
     990      def _check_article_body(self, lines):
     991          self.assertEqual(len(lines), 4)
     992          self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
     993          self.assertEqual(lines[-2], b"")
     994          self.assertEqual(lines[-3], b".Here is a dot-starting line.")
     995          self.assertEqual(lines[-4], b"This is just a test article.")
     996  
     997      def _check_article_head(self, lines):
     998          self.assertEqual(len(lines), 4)
     999          self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
    1000          self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
    1001  
    def _check_article_data(self, lines):
        """Assert that *lines* form a full article: head, blank line, body."""
        self.assertEqual(len(lines), 9)
        head_part = lines[:4]
        body_part = lines[-4:]
        self._check_article_head(head_part)
        self._check_article_body(body_part)
        # The line between head and body must be empty.
        self.assertEqual(lines[4], b"")
    1007  
    def test_article(self):
        """ARTICLE with no argument, a number, an id, and an unknown id."""
        known_id = "<45223423@example.com>"
        # (arguments to article(), article number expected back):
        # bare ARTICLE, ARTICLE num, ARTICLE id
        variants = (
            ((), 3000237),
            ((3000234,), 3000234),
            ((known_id,), 0),
        )
        for args, number in variants:
            resp, info = self.server.article(*args)
            self.assertEqual(resp, f"220 {number} {known_id}")
            art_num, message_id, lines = info
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, known_id)
            self._check_article_data(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.article("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
    1034  
    def test_article_file(self):
        """ARTICLE with a "file" argument writes there and returns no lines."""
        expected_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        expected_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        sink = io.BytesIO()
        resp, info = self.server.article(file=sink)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        self.assertTrue(data.startswith(expected_start), ascii(data))
        self.assertTrue(data.endswith(expected_end), ascii(data))
    1055  
    def test_head(self):
        """HEAD with no argument, a number, an id, and an unknown id."""
        known_id = "<45223423@example.com>"
        # (arguments to head(), article number expected back):
        # bare HEAD, HEAD num, HEAD id
        variants = (
            ((), 3000237),
            ((3000234,), 3000234),
            ((known_id,), 0),
        )
        for args, number in variants:
            resp, info = self.server.head(*args)
            self.assertEqual(resp, f"221 {number} {known_id}")
            art_num, message_id, lines = info
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, known_id)
            self._check_article_head(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.head("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
    1082  
    def test_head_file(self):
        """HEAD with a "file" argument writes the headers only."""
        expected_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        body_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        sink = io.BytesIO()
        resp, info = self.server.head(file=sink)
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        self.assertTrue(data.startswith(expected_start), ascii(data))
        # The body must not have been written.
        self.assertFalse(data.endswith(body_end), ascii(data))
    1102  
    def test_body(self):
        """BODY with no argument, a number, an id, and an unknown id."""
        known_id = "<45223423@example.com>"
        # (arguments to body(), article number expected back):
        # bare BODY, BODY num, BODY id
        variants = (
            ((), 3000237),
            ((3000234,), 3000234),
            ((known_id,), 0),
        )
        for args, number in variants:
            resp, info = self.server.body(*args)
            self.assertEqual(resp, f"222 {number} {known_id}")
            art_num, message_id, lines = info
            self.assertEqual(art_num, number)
            self.assertEqual(message_id, known_id)
            self._check_article_body(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.body("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
    1129  
    def test_body_file(self):
        """BODY with a "file" argument writes the body only."""
        head_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        expected_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        sink = io.BytesIO()
        resp, info = self.server.body(file=sink)
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        # The headers must not have been written.
        self.assertFalse(data.startswith(head_start), ascii(data))
        self.assertTrue(data.endswith(expected_end), ascii(data))
    1149  
    def check_over_xover_resp(self, resp, overviews):
        """Assert that *resp* and *overviews* match the canned 57-59 reply."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        first, second, third = overviews[0], overviews[1], overviews[2]
        # Article 57: every field is checked.
        art_num, over = first
        self.assertEqual(art_num, 57)
        expected_fields = {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.io gmane.comp.python.authors:57"
            }
        self.assertEqual(over, expected_fields)
        # Second entry: the xref value must be None.
        art_num, over = second
        self.assertEqual(over["xref"], None)
        # Third entry: the non-ASCII subject must be intact.
        art_num, over = third
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")
    1170  
    def test_xover(self):
        """xover(57, 59) returns the canned overview entries."""
        reply = self.server.xover(57, 59)
        resp, overviews = reply
        self.check_over_xover_resp(resp, overviews)
    1174  
    def test_over(self):
        """over((57, 59)) returns the canned overview entries."""
        # In NNTP "v1", this will fallback on XOVER
        article_range = (57, 59)
        resp, overviews = self.server.over(article_range)
        self.check_over_xover_resp(resp, overviews)
    1179  
    # The article handed to post()/ihave(): every line ends in CRLF and the
    # dot-starting line carries a single leading dot.
    sample_post = b"\r\n".join((
        b'From: "Demo User" <nobody@example.net>',
        b'Subject: I am just a test article',
        b'Content-Type: text/plain; charset=UTF-8; format=flowed',
        b'Message-ID: <i.am.an.article.you.will.want@example.com>',
        b'',
        b'This is just a test article.',
        b'.Here is a dot-starting line.',
        b'',
        b'-- Signed by Andr\xc3\xa9.',
    )) + b"\r\n"
    1191  
    1192      def _check_posted_body(self):
    1193          # Check the raw body as received by the server
    1194          lines = self.handler.posted_body
    1195          # One additional line for the "." terminator
    1196          self.assertEqual(len(lines), 10)
    1197          self.assertEqual(lines[-1], b'.\r\n')
    1198          self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
    1199          self.assertEqual(lines[-3], b'\r\n')
    1200          self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
    1201          self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
    1202  
    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Send the sample post twice through *func* and check what arrived.

        The first round uses the prepared CRLF post.  The second uses the
        same post with "normal" line endings, which NNTP.post and
        NNTP.ihave should convert.  *file_factory* wraps the bytes into
        the object handed to *func* after *args*.  Returns the response
        of the second call.
        """
        for newline in (b"\r\n", b"\n"):
            post = self.sample_post.replace(b"\r\n", newline)
            call_args = args + (file_factory(post),)
            # Reset so that the check below sees this round's body only.
            self.handler.posted_body = None
            resp = func(*call_args)
            self._check_posted_body()
        return resp
    1218  
    def check_post_ihave(self, func, success_resp, *args):
        """Check *func* with every supported kind of article payload.

        Each payload kind must make *func* return *success_resp*.
        """
        def terminated_lines(b):
            # An iterable of terminated lines
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            # An iterable of non-terminated lines
            return iter(b.splitlines(keepends=False))

        # A bytes object, a bytearray object, a file object, then the two
        # line iterables.
        factories = (bytes, bytearray, io.BytesIO, terminated_lines, bare_lines)
        for make_payload in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=make_payload)
            self.assertEqual(resp, success_resp)
    1239  
    def test_post(self):
        """POST succeeds while posting is allowed and fails with 440 after."""
        self.check_post_ihave(self.server.post, "240 Article received OK")
        # Now forbid posting on the server side.
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as raised:
            self.server.post(self.sample_post)
        refusal = raised.exception.response
        self.assertEqual(refusal, "440 Posting not permitted")
    1247  
    def test_ihave(self):
        """IHAVE succeeds for the sample id and fails with 435 for another."""
        wanted_id = "<i.am.an.article.you.will.want@example.com>"
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              wanted_id)
        with self.assertRaises(nntplib.NNTPTemporaryError) as raised:
            self.server.ihave("<another.message.id>", self.sample_post)
        refusal = raised.exception.response
        self.assertEqual(refusal, "435 Article not wanted")
    1255  
    def test_too_long_lines(self):
        """NEWNEWS for 2010-01-01 09:00:00 must raise NNTPDataError."""
        since = datetime.datetime(2010, 1, 1, 9, 0, 0)
        with self.assertRaises(nntplib.NNTPDataError):
            self.server.newnews("comp.lang.python", since)
    1260  
    1261  
    1262  class ESC[4;38;5;81mNNTPv1Tests(ESC[4;38;5;149mNNTPv1v2TestsMixin, ESC[4;38;5;149mMockedNNTPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1263      """Tests an NNTP v1 server (no capabilities)."""
    1264  
    1265      nntp_version = 1
    1266      handler_class = NNTPv1Handler
    1267  
    1268      def test_caps(self):
    1269          caps = self.server.getcapabilities()
    1270          self.assertEqual(caps, {})
    1271          self.assertEqual(self.server.nntp_version, 1)
    1272          self.assertEqual(self.server.nntp_implementation, None)
    1273  
    1274  
    1275  class ESC[4;38;5;81mNNTPv2Tests(ESC[4;38;5;149mNNTPv1v2TestsMixin, ESC[4;38;5;149mMockedNNTPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1276      """Tests an NNTP v2 server (with capabilities)."""
    1277  
    1278      nntp_version = 2
    1279      handler_class = NNTPv2Handler
    1280  
    1281      def test_caps(self):
    1282          caps = self.server.getcapabilities()
    1283          self.assertEqual(caps, {
    1284              'VERSION': ['2', '3'],
    1285              'IMPLEMENTATION': ['INN', '2.5.1'],
    1286              'AUTHINFO': ['USER'],
    1287              'HDR': [],
    1288              'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
    1289                       'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
    1290              'OVER': [],
    1291              'POST': [],
    1292              'READER': [],
    1293              })
    1294          self.assertEqual(self.server.nntp_version, 3)
    1295          self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
    1296  
    1297  
    1298  class ESC[4;38;5;81mCapsAfterLoginNNTPv2Tests(ESC[4;38;5;149mMockedNNTPTestsMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1299      """Tests a probably NNTP v2 server with capabilities only after login."""
    1300  
    1301      nntp_version = 2
    1302      handler_class = CapsAfterLoginNNTPv2Handler
    1303  
    1304      def test_caps_only_after_login(self):
    1305          self.assertEqual(self.server._caps, {})
    1306          self.server.login('testuser', 'testpw')
    1307          self.assertIn('VERSION', self.server._caps)
    1308  
    1309  
    1310  class ESC[4;38;5;81mSendReaderNNTPv2Tests(ESC[4;38;5;149mMockedNNTPWithReaderModeMixin,
    1311          ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1312      """Same tests as for v2 but we tell NTTP to send MODE READER to a server
    1313      that isn't in READER mode by default."""
    1314  
    1315      nntp_version = 2
    1316      handler_class = ModeSwitchingNNTPv2Handler
    1317  
    1318      def test_we_are_in_reader_mode_after_connect(self):
    1319          self.assertIn('READER', self.server._caps)
    1320  
    1321  
    1322  class ESC[4;38;5;81mMiscTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1323  
    1324      def test_decode_header(self):
    1325          def gives(a, b):
    1326              self.assertEqual(nntplib.decode_header(a), b)
    1327          gives("" , "")
    1328          gives("a plain header", "a plain header")
    1329          gives(" with extra  spaces ", " with extra  spaces ")
    1330          gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
    1331          gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
    1332                " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
    1333                "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
    1334          gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
    1335                "Re: problème de matrice")
    1336          # A natively utf-8 header (found in the real world!)
    1337          gives("Re: Message d'erreur incompréhensible (par moi)",
    1338                "Re: Message d'erreur incompréhensible (par moi)")
    1339  
    1340      def test_parse_overview_fmt(self):
    1341          # The minimal (default) response
    1342          lines = ["Subject:", "From:", "Date:", "Message-ID:",
    1343                   "References:", ":bytes", ":lines"]
    1344          self.assertEqual(nntplib._parse_overview_fmt(lines),
    1345              ["subject", "from", "date", "message-id", "references",
    1346               ":bytes", ":lines"])
    1347          # The minimal response using alternative names
    1348          lines = ["Subject:", "From:", "Date:", "Message-ID:",
    1349                   "References:", "Bytes:", "Lines:"]
    1350          self.assertEqual(nntplib._parse_overview_fmt(lines),
    1351              ["subject", "from", "date", "message-id", "references",
    1352               ":bytes", ":lines"])
    1353          # Variations in casing
    1354          lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
    1355                   "References:", "BYTES:", "Lines:"]
    1356          self.assertEqual(nntplib._parse_overview_fmt(lines),
    1357              ["subject", "from", "date", "message-id", "references",
    1358               ":bytes", ":lines"])
    1359          # First example from RFC 3977
    1360          lines = ["Subject:", "From:", "Date:", "Message-ID:",
    1361                   "References:", ":bytes", ":lines", "Xref:full",
    1362                   "Distribution:full"]
    1363          self.assertEqual(nntplib._parse_overview_fmt(lines),
    1364              ["subject", "from", "date", "message-id", "references",
    1365               ":bytes", ":lines", "xref", "distribution"])
    1366          # Second example from RFC 3977
    1367          lines = ["Subject:", "From:", "Date:", "Message-ID:",
    1368                   "References:", "Bytes:", "Lines:", "Xref:FULL",
    1369                   "Distribution:FULL"]
    1370          self.assertEqual(nntplib._parse_overview_fmt(lines),
    1371              ["subject", "from", "date", "message-id", "references",
    1372               ":bytes", ":lines", "xref", "distribution"])
    1373          # A classic response from INN
    1374          lines = ["Subject:", "From:", "Date:", "Message-ID:",
    1375                   "References:", "Bytes:", "Lines:", "Xref:full"]
    1376          self.assertEqual(nntplib._parse_overview_fmt(lines),
    1377              ["subject", "from", "date", "message-id", "references",
    1378               ":bytes", ":lines", "xref"])
    1379  
    1380      def test_parse_overview(self):
    1381          fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
    1382          # First example from RFC 3977
    1383          lines = [
    1384              '3000234\tI am just a test article\t"Demo User" '
    1385              '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
    1386              '<45223423@example.com>\t<45454@example.net>\t1234\t'
    1387              '17\tXref: news.example.com misc.test:3000363',
    1388          ]
    1389          overview = nntplib._parse_overview(lines, fmt)
    1390          (art_num, fields), = overview
    1391          self.assertEqual(art_num, 3000234)
    1392          self.assertEqual(fields, {
    1393              'subject': 'I am just a test article',
    1394              'from': '"Demo User" <nobody@example.com>',
    1395              'date': '6 Oct 1998 04:38:40 -0500',
    1396              'message-id': '<45223423@example.com>',
    1397              'references': '<45454@example.net>',
    1398              ':bytes': '1234',
    1399              ':lines': '17',
    1400              'xref': 'news.example.com misc.test:3000363',
    1401          })
    1402          # Second example; here the "Xref" field is totally absent (including
    1403          # the header name) and comes out as None
    1404          lines = [
    1405              '3000234\tI am just a test article\t"Demo User" '
    1406              '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
    1407              '<45223423@example.com>\t<45454@example.net>\t1234\t'
    1408              '17\t\t',
    1409          ]
    1410          overview = nntplib._parse_overview(lines, fmt)
    1411          (art_num, fields), = overview
    1412          self.assertEqual(fields['xref'], None)
    1413          # Third example; the "Xref" is an empty string, while "references"
    1414          # is a single space.
    1415          lines = [
    1416              '3000234\tI am just a test article\t"Demo User" '
    1417              '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
    1418              '<45223423@example.com>\t \t1234\t'
    1419              '17\tXref: \t',
    1420          ]
    1421          overview = nntplib._parse_overview(lines, fmt)
    1422          (art_num, fields), = overview
    1423          self.assertEqual(fields['references'], ' ')
    1424          self.assertEqual(fields['xref'], '')
    1425  
    1426      def test_parse_datetime(self):
    1427          def gives(a, b, *c):
    1428              self.assertEqual(nntplib._parse_datetime(a, b),
    1429                               datetime.datetime(*c))
    1430          # Output of DATE command
    1431          gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
    1432          # Variations
    1433          gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
    1434          gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
    1435          gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
    1436  
    1437      def test_unparse_datetime(self):
    1438          # Test non-legacy mode
    1439          # 1) with a datetime
    1440          def gives(y, M, d, h, m, s, date_str, time_str):
    1441              dt = datetime.datetime(y, M, d, h, m, s)
    1442              self.assertEqual(nntplib._unparse_datetime(dt),
    1443                               (date_str, time_str))
    1444              self.assertEqual(nntplib._unparse_datetime(dt, False),
    1445                               (date_str, time_str))
    1446          gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
    1447          gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
    1448          gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
    1449          # 2) with a date
    1450          def gives(y, M, d, date_str, time_str):
    1451              dt = datetime.date(y, M, d)
    1452              self.assertEqual(nntplib._unparse_datetime(dt),
    1453                               (date_str, time_str))
    1454              self.assertEqual(nntplib._unparse_datetime(dt, False),
    1455                               (date_str, time_str))
    1456          gives(1999, 6, 23, "19990623", "000000")
    1457          gives(2000, 6, 23, "20000623", "000000")
    1458          gives(2010, 6, 5, "20100605", "000000")
    1459  
    1460      def test_unparse_datetime_legacy(self):
    1461          # Test legacy mode (RFC 977)
    1462          # 1) with a datetime
    1463          def gives(y, M, d, h, m, s, date_str, time_str):
    1464              dt = datetime.datetime(y, M, d, h, m, s)
    1465              self.assertEqual(nntplib._unparse_datetime(dt, True),
    1466                               (date_str, time_str))
    1467          gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
    1468          gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
    1469          gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
    1470          # 2) with a date
    1471          def gives(y, M, d, date_str, time_str):
    1472              dt = datetime.date(y, M, d)
    1473              self.assertEqual(nntplib._unparse_datetime(dt, True),
    1474                               (date_str, time_str))
    1475          gives(1999, 6, 23, "990623", "000000")
    1476          gives(2000, 6, 23, "000623", "000000")
    1477          gives(2010, 6, 5, "100605", "000000")
    1478  
    1479      @unittest.skipUnless(ssl, 'requires SSL support')
    1480      def test_ssl_support(self):
    1481          self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
    1482  
    1483  
    1484  class ESC[4;38;5;81mPublicAPITests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1485      """Ensures that the correct values are exposed in the public API."""
    1486  
    1487      def test_module_all_attribute(self):
    1488          self.assertTrue(hasattr(nntplib, '__all__'))
    1489          target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
    1490                        'NNTPTemporaryError', 'NNTPPermanentError',
    1491                        'NNTPProtocolError', 'NNTPDataError', 'decode_header']
    1492          if ssl is not None:
    1493              target_api.append('NNTP_SSL')
    1494          self.assertEqual(set(nntplib.__all__), set(target_api))
    1495  
    1496  class ESC[4;38;5;81mMockSocketTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1497      """Tests involving a mock socket object
    1498  
    1499      Used where the _NNTPServerIO file object is not enough."""
    1500  
    1501      nntp_class = nntplib.NNTP
    1502  
    1503      def check_constructor_error_conditions(
    1504              self, handler_class,
    1505              expected_error_type, expected_error_msg,
    1506              login=None, password=None):
    1507  
    1508          class ESC[4;38;5;81mmock_socket_module:
    1509              def create_connection(address, timeout):
    1510                  return MockSocket()
    1511  
    1512          class ESC[4;38;5;81mMockSocket:
    1513              def close(self):
    1514                  nonlocal socket_closed
    1515                  socket_closed = True
    1516  
    1517              def makefile(socket, mode):
    1518                  handler = handler_class()
    1519                  _, file = make_mock_file(handler)
    1520                  files.append(file)
    1521                  return file
    1522  
    1523          socket_closed = False
    1524          files = []
    1525          with patch('nntplib.socket', mock_socket_module), \
    1526               self.assertRaisesRegex(expected_error_type, expected_error_msg):
    1527              self.nntp_class('dummy', user=login, password=password)
    1528          self.assertTrue(socket_closed)
    1529          for f in files:
    1530              self.assertTrue(f.closed)
    1531  
    1532      def test_bad_welcome(self):
    1533          #Test a bad welcome message
    1534          class ESC[4;38;5;81mHandler(ESC[4;38;5;149mNNTPv1Handler):
    1535              welcome = 'Bad Welcome'
    1536          self.check_constructor_error_conditions(
    1537              Handler, nntplib.NNTPProtocolError, Handler.welcome)
    1538  
    1539      def test_service_temporarily_unavailable(self):
    1540          #Test service temporarily unavailable
    1541          class ESC[4;38;5;81mHandler(ESC[4;38;5;149mNNTPv1Handler):
    1542              welcome = '400 Service temporarily unavailable'
    1543          self.check_constructor_error_conditions(
    1544              Handler, nntplib.NNTPTemporaryError, Handler.welcome)
    1545  
    1546      def test_service_permanently_unavailable(self):
    1547          #Test service permanently unavailable
    1548          class ESC[4;38;5;81mHandler(ESC[4;38;5;149mNNTPv1Handler):
    1549              welcome = '502 Service permanently unavailable'
    1550          self.check_constructor_error_conditions(
    1551              Handler, nntplib.NNTPPermanentError, Handler.welcome)
    1552  
    1553      def test_bad_capabilities(self):
    1554          #Test a bad capabilities response
    1555          class ESC[4;38;5;81mHandler(ESC[4;38;5;149mNNTPv1Handler):
    1556              def handle_CAPABILITIES(self):
    1557                  self.push_lit(capabilities_response)
    1558          capabilities_response = '201 bad capability'
    1559          self.check_constructor_error_conditions(
    1560              Handler, nntplib.NNTPReplyError, capabilities_response)
    1561  
    1562      def test_login_aborted(self):
    1563          #Test a bad authinfo response
    1564          login = 't@e.com'
    1565          password = 'python'
    1566          class ESC[4;38;5;81mHandler(ESC[4;38;5;149mNNTPv1Handler):
    1567              def handle_AUTHINFO(self, *args):
    1568                  self.push_lit(authinfo_response)
    1569          authinfo_response = '503 Mechanism not recognized'
    1570          self.check_constructor_error_conditions(
    1571              Handler, nntplib.NNTPPermanentError, authinfo_response,
    1572              login, password)
    1573  
    1574  class ESC[4;38;5;81mbypass_context:
    1575      """Bypass encryption and actual SSL module"""
    1576      def wrap_socket(sock, **args):
    1577          return sock
    1578  
    1579  @unittest.skipUnless(ssl, 'requires SSL support')
    1580  class ESC[4;38;5;81mMockSslTests(ESC[4;38;5;149mMockSocketTests):
    1581      @staticmethod
    1582      def nntp_class(*pos, **kw):
    1583          return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
    1584  
    1585  
    1586  class ESC[4;38;5;81mLocalServerTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1587      def setUp(self):
    1588          sock = socket.socket()
    1589          port = socket_helper.bind_port(sock)
    1590          sock.listen()
    1591          self.background = threading.Thread(
    1592              target=self.run_server, args=(sock,))
    1593          self.background.start()
    1594          self.addCleanup(self.background.join)
    1595  
    1596          self.nntp = self.enterContext(NNTP(socket_helper.HOST, port, usenetrc=False))
    1597  
    1598      def run_server(self, sock):
    1599          # Could be generalized to handle more commands in separate methods
    1600          with sock:
    1601              [client, _] = sock.accept()
    1602          with contextlib.ExitStack() as cleanup:
    1603              cleanup.enter_context(client)
    1604              reader = cleanup.enter_context(client.makefile('rb'))
    1605              client.sendall(b'200 Server ready\r\n')
    1606              while True:
    1607                  cmd = reader.readline()
    1608                  if cmd == b'CAPABILITIES\r\n':
    1609                      client.sendall(
    1610                          b'101 Capability list:\r\n'
    1611                          b'VERSION 2\r\n'
    1612                          b'STARTTLS\r\n'
    1613                          b'.\r\n'
    1614                      )
    1615                  elif cmd == b'STARTTLS\r\n':
    1616                      reader.close()
    1617                      client.sendall(b'382 Begin TLS negotiation now\r\n')
    1618                      context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    1619                      context.load_cert_chain(certfile)
    1620                      client = context.wrap_socket(
    1621                          client, server_side=True)
    1622                      cleanup.enter_context(client)
    1623                      reader = cleanup.enter_context(client.makefile('rb'))
    1624                  elif cmd == b'QUIT\r\n':
    1625                      client.sendall(b'205 Bye!\r\n')
    1626                      break
    1627                  else:
    1628                      raise ValueError('Unexpected command {!r}'.format(cmd))
    1629  
    1630      @unittest.skipUnless(ssl, 'requires SSL support')
    1631      def test_starttls(self):
    1632          file = self.nntp.file
    1633          sock = self.nntp.sock
    1634          self.nntp.starttls()
    1635          # Check that the socket and internal pseudo-file really were
    1636          # changed.
    1637          self.assertNotEqual(file, self.nntp.file)
    1638          self.assertNotEqual(sock, self.nntp.sock)
    1639          # Check that the new socket really is an SSL one
    1640          self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
    1641          # Check that trying starttls when it's already active fails.
    1642          self.assertRaises(ValueError, self.nntp.starttls)
    1643  
    1644  
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()