• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import socket
3import datetime
4import textwrap
5import unittest
6import functools
7import contextlib
8import os.path
9import re
10import threading
11
12from test import support
13from nntplib import NNTP, GroupInfo
14import nntplib
15from unittest.mock import patch
16try:
17    import ssl
18except ImportError:
19    ssl = None
20
21
# Timeout handed to every NNTP connection opened by the networked tests.
TIMEOUT = 30
# Certificate/key file looked up next to this test module.
certfile = os.path.join(
    os.path.dirname(__file__),
    'keycert3.pem',
)

if ssl is None:
    class SSLError(Exception):
        """Stand-in for ssl.SSLError on builds without SSL support."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
31
32# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
35# - test auth and `usenetrc`
36
37
class NetworkedNNTPTestsMixin:
    """Tests that exercise a live NNTP server over the network.

    The tests read ``self.server`` (a connected client object) as well as
    the ``NNTP_HOST``, ``NNTP_CLASS``, ``GROUP_NAME`` and ``GROUP_PAT``
    attributes, all of which must be supplied by the concrete test class.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn("Python", desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound is relative to the local clock
        # (instead of a fixed year) so the test does not start failing once
        # that year has passed; the extra year absorbs clock/timezone
        # differences around New Year.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        # Shared checks for the result of HEAD / BODY / ARTICLE.
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def is_antivirus_header(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not is_antivirus_header(line)]
        filtered_lines = [line for line in article.lines
                          if not is_antivirus_header(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Lets the owning class see that the connection is already gone.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with support.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_with_statement(self):
        def is_connected():
            # "Connected" means the object still has its file and the
            # server still answers a HELP command.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        try:
            with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            # Quitting explicitly inside the block must also leave the
            # connection closed once the block exits.
            with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise
287
288
# Replace every test_* method of the mixin with a wrapper that runs it inside
# support.transient_internet() for the test's NNTP_HOST.
NetworkedNNTPTestsMixin.wrap_methods()
290
291
# Exception types that count as an EOF while connecting; the SSL variant is
# only included when the ssl module could be imported.
EOF_ERRORS = (EOFError,) if ssl is None else (EOFError, ssl.SSLEOFError)
295
296
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        # Open the single connection shared by all tests of the class.  The
        # whole class is skipped when the server hangs up during connection
        # or insists on a key that the local SSL library rejects as too small.
        support.requires("network")
        with support.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT,
                                            usenetrc=False)
            except EOF_ERRORS:
                # Must come before the SSLError clause: ssl.SSLEOFError is a
                # subclass of ssl.SSLError and would otherwise never reach
                # this handler.
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # (``reason`` may be None, hence the fallback to '')
                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                raise

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() resets cls.server to None once it has quit.
        if cls.server is not None:
            cls.server.quit()
326
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    # Same networked tests, run through nntplib.NNTP_SSL when it exists.

    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'
    GROUP_PAT = 'comp.lang.*'
    GROUP_NAME = 'comp.lang.python'

    # Switched off: it produces too much data.
    test_list = None

    # Switched off: the connection will already be encrypted.
    test_starttls = None
345
346
347#
348# Non-networked tests using a local server (or something mocking it).
349#
350
351class _NNTPServerIO(io.RawIOBase):
352    """A raw IO object allowing NNTP commands to be received and processed
353    by a handler.  The handler can push responses which can then be read
354    from the IO object."""
355
356    def __init__(self, handler):
357        io.RawIOBase.__init__(self)
358        # The channel from the client
359        self.c2s = io.BytesIO()
360        # The channel to the client
361        self.s2c = io.BytesIO()
362        self.handler = handler
363        self.handler.start(self.c2s.readline, self.push_data)
364
365    def readable(self):
366        return True
367
368    def writable(self):
369        return True
370
371    def push_data(self, data):
372        """Push (buffer) some data to send to the client."""
373        pos = self.s2c.tell()
374        self.s2c.seek(0, 2)
375        self.s2c.write(data)
376        self.s2c.seek(pos)
377
378    def write(self, b):
379        """The client sends us some data"""
380        pos = self.c2s.tell()
381        self.c2s.write(b)
382        self.c2s.seek(pos)
383        self.handler.process_pending()
384        return len(b)
385
386    def readinto(self, buf):
387        """The client wants to read a response"""
388        self.handler.process_pending()
389        b = self.s2c.read(len(buf))
390        n = len(b)
391        buf[:n] = b
392        return n
393
394
def make_mock_file(handler):
    """Return ``(raw, buffered)``: a _NNTPServerIO driven by *handler* and a
    buffered file object layered on top of it."""
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    return (raw, io.BufferedRWPair(raw, raw))
401
402
class MockedNNTPTestsMixin:
    # Handler class instantiated by make_server(); override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        # Build a fresh handler, the mock file pair around it, and a client
        # object reading from / writing to that file.  Extra arguments are
        # forwarded to nntplib._NNTPBase.
        handler = self.handler_class()
        self.handler = handler
        raw, mock_file = make_mock_file(handler)
        self.sio = raw
        self.server = nntplib._NNTPBase(mock_file, 'test.server',
                                        *args, **kwargs)
        return self.server
420
421
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    def setUp(self):
        super().setUp()
        # Build the server again, this time asking for reader mode.
        reader_options = {'readermode': True}
        self.make_server(**reader_options)
426
427
class NNTPv1Handler:
    """A handler for RFC 977

    Commands are read line by line through the ``readline`` callable given
    to start() and dispatched to the matching ``handle_<COMMAND>`` method;
    canned responses are emitted through the ``push_data`` callable.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Reset the session state, remember the I/O callables and send the
        welcome banner."""
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # State of a body (POST / IHAVE) being received, if any
        self.body = None
        self.body_callback = None
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume everything the client has sent so far.

        Alternates between command mode and body mode: as soon as a command
        calls expect_body(), the following lines are collected as that
        command's body rather than being parsed as commands.  Returns when
        no complete input is left.
        """
        while True:
            if self.in_body:
                progressed = self._read_body()
            else:
                progressed = self._read_command()
            if not progressed:
                return

    def _read_body(self):
        """Collect body lines up to the terminating ".\\r\\n" line, then call
        the pending command again with ``body=``.

        Returns False when input ran out before the terminator (the lines
        read so far are kept for the next call), True otherwise.
        """
        while True:
            line = self._readline()
            if not line:
                return False
            self.body.append(line)
            if line == b".\r\n":
                break
        try:
            meth, tokens = self.body_callback
            meth(*tokens, body=self.body)
        finally:
            self.body_callback = None
            self.body = None
            self.in_body = False
        return True

    def _read_command(self):
        """Read and dispatch one command line.

        Returns False when there is no more input.  Raises ValueError for a
        line not terminated by "\\r\\n" or when the command handler fails.
        """
        line = self._decode(self._readline())
        if not line:
            return False
        if not line.endswith("\r\n"):
            raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
        line = line[:-2]
        words = line.split()
        if not words:
            # A blank line carries no command name at all.
            self.handle_unknown()
            return True
        cmd, *tokens = words
        meth = getattr(self, "handle_" + cmd.upper(), None)
        if meth is None:
            self.handle_unknown()
            return True
        try:
            meth(*tokens)
        except Exception as e:
            raise ValueError("command failed: {!r}".format(line)) from e
        if self.in_body:
            # The command asked for a body: remember how to call it back.
            self.body_callback = meth, tokens
            self.body = []
        return True

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalize every line ending to CRLF, encode as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        """Answer LIST, optionally qualified by a keyword and a parameter."""
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
                and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
                and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # A deliberately oversized single response line
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        """Handle POST: first call (no body) asks for the article, second
        call (with *body*) stores it in ``posted_body``."""
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        """Handle IHAVE with the same two-step protocol as handle_POST; only
        one hard-coded message id is wanted."""
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    # Canned article served by ARTICLE / HEAD / BODY
    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        """Track the user / pass exchange; any password is accepted."""
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
751
752
class NNTPv2Handler(NNTPv1Handler):
    """Mock handler speaking RFC 3977 (NNTP "v2")."""

    def handle_CAPABILITIES(self):
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""

        # The AUTHINFO line is only listed while nobody is logged in.
        if self._logged_in:
            extra = ''
        else:
            extra = '\n            AUTHINFO USER'
        self.push_lit(template.format(extra))

    def handle_MODE(self, _):
        # READER is part of the capability list above, so a client has no
        # reason to send MODE READER to this handler.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER shares the canned answers of XOVER.
        result = self.handle_XOVER(message_spec)
        return result
778
779
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """Handler that refuses CAPABILITIES until the client has logged in."""

    def handle_CAPABILITIES(self):
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        self.push_lit('480 You must log in.')
788
789
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """Server that starts in transit mode; advertises MODE-READER until the
    client has sent MODE reader, READER afterwards."""

    def __init__(self):
        self._switched = False

    def handle_CAPABILITIES(self):
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        prefix = '' if self._switched else 'MODE-'
        self.push_lit(template.format(prefix))

    def handle_MODE(self, what):
        # Only a single switch, and only to 'reader', is expected.
        assert not (self._switched or what != 'reader')
        self._switched = True
        self.push_lit('200 Posting allowed')
816
817
818class NNTPv1v2TestsMixin:
819
820    def setUp(self):
821        super().setUp()
822
823    def test_welcome(self):
824        self.assertEqual(self.server.welcome, self.handler.welcome)
825
826    def test_authinfo(self):
827        if self.nntp_version == 2:
828            self.assertIn('AUTHINFO', self.server._caps)
829        self.server.login('testuser', 'testpw')
830        # if AUTHINFO is gone from _caps we also know that getcapabilities()
831        # has been called after login as it should
832        self.assertNotIn('AUTHINFO', self.server._caps)
833
834    def test_date(self):
835        resp, date = self.server.date()
836        self.assertEqual(resp, "111 20100914001155")
837        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
838
839    def test_quit(self):
840        self.assertFalse(self.sio.closed)
841        resp = self.server.quit()
842        self.assertEqual(resp, "205 Bye!")
843        self.assertTrue(self.sio.closed)
844
845    def test_help(self):
846        resp, help = self.server.help()
847        self.assertEqual(resp, "100 Legal commands")
848        self.assertEqual(help, [
849            '  authinfo user Name|pass Password|generic <prog> <args>',
850            '  date',
851            '  help',
852            'Report problems to <root@example.org>',
853        ])
854
855    def test_list(self):
856        resp, groups = self.server.list()
857        self.assertEqual(len(groups), 6)
858        g = groups[1]
859        self.assertEqual(g,
860            GroupInfo("comp.lang.python.announce", "0000001153",
861                      "0000000993", "m"))
862        resp, groups = self.server.list("*distutils*")
863        self.assertEqual(len(groups), 2)
864        g = groups[0]
865        self.assertEqual(g,
866            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
867                      "0000000001", "m"))
868
869    def test_stat(self):
870        resp, art_num, message_id = self.server.stat(3000234)
871        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
872        self.assertEqual(art_num, 3000234)
873        self.assertEqual(message_id, "<45223423@example.com>")
874        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
875        self.assertEqual(resp, "223 0 <45223423@example.com>")
876        self.assertEqual(art_num, 0)
877        self.assertEqual(message_id, "<45223423@example.com>")
878        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
879            self.server.stat("<non.existent.id>")
880        self.assertEqual(cm.exception.response, "430 No Such Article Found")
881        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
882            self.server.stat()
883        self.assertEqual(cm.exception.response, "412 No newsgroup selected")
884
885    def test_next(self):
886        resp, art_num, message_id = self.server.next()
887        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
888        self.assertEqual(art_num, 3000237)
889        self.assertEqual(message_id, "<668929@example.org>")
890
891    def test_last(self):
892        resp, art_num, message_id = self.server.last()
893        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
894        self.assertEqual(art_num, 3000234)
895        self.assertEqual(message_id, "<45223423@example.com>")
896
897    def test_description(self):
898        desc = self.server.description("comp.lang.python")
899        self.assertEqual(desc, "The Python computer language.")
900        desc = self.server.description("comp.lang.pythonx")
901        self.assertEqual(desc, "")
902
903    def test_descriptions(self):
904        resp, groups = self.server.descriptions("comp.lang.python")
905        self.assertEqual(resp, '215 Descriptions in form "group description".')
906        self.assertEqual(groups, {
907            "comp.lang.python": "The Python computer language.",
908            })
909        resp, groups = self.server.descriptions("comp.lang.python*")
910        self.assertEqual(groups, {
911            "comp.lang.python": "The Python computer language.",
912            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
913            })
914        resp, groups = self.server.descriptions("comp.lang.pythonx")
915        self.assertEqual(groups, {})
916
917    def test_group(self):
918        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
919        self.assertTrue(resp.startswith("211 "), resp)
920        self.assertEqual(first, 761)
921        self.assertEqual(last, 1265)
922        self.assertEqual(count, 486)
923        self.assertEqual(group, "fr.comp.lang.python")
924        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
925            self.server.group("comp.lang.python.devel")
926        exc = cm.exception
927        self.assertTrue(exc.response.startswith("411 No such group"),
928                        exc.response)
929
930    def test_newnews(self):
931        # NEWNEWS comp.lang.python [20]100913 082004
932        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
933        resp, ids = self.server.newnews("comp.lang.python", dt)
934        expected = (
935            "230 list of newsarticles (NNTP v{0}) "
936            "created after Mon Sep 13 08:20:04 2010 follows"
937            ).format(self.nntp_version)
938        self.assertEqual(resp, expected)
939        self.assertEqual(ids, [
940            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
941            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
942            ])
943        # NEWNEWS fr.comp.lang.python [20]100913 082004
944        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
945        resp, ids = self.server.newnews("fr.comp.lang.python", dt)
946        self.assertEqual(resp, "230 An empty list of newsarticles follows")
947        self.assertEqual(ids, [])
948
949    def _check_article_body(self, lines):
950        self.assertEqual(len(lines), 4)
951        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
952        self.assertEqual(lines[-2], b"")
953        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
954        self.assertEqual(lines[-4], b"This is just a test article.")
955
956    def _check_article_head(self, lines):
957        self.assertEqual(len(lines), 4)
958        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
959        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
960
961    def _check_article_data(self, lines):
962        self.assertEqual(len(lines), 9)
963        self._check_article_head(lines[:4])
964        self._check_article_body(lines[-4:])
965        self.assertEqual(lines[4], b"")
966
967    def test_article(self):
968        # ARTICLE
969        resp, info = self.server.article()
970        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
971        art_num, message_id, lines = info
972        self.assertEqual(art_num, 3000237)
973        self.assertEqual(message_id, "<45223423@example.com>")
974        self._check_article_data(lines)
975        # ARTICLE num
976        resp, info = self.server.article(3000234)
977        self.assertEqual(resp, "220 3000234 <45223423@example.com>")
978        art_num, message_id, lines = info
979        self.assertEqual(art_num, 3000234)
980        self.assertEqual(message_id, "<45223423@example.com>")
981        self._check_article_data(lines)
982        # ARTICLE id
983        resp, info = self.server.article("<45223423@example.com>")
984        self.assertEqual(resp, "220 0 <45223423@example.com>")
985        art_num, message_id, lines = info
986        self.assertEqual(art_num, 0)
987        self.assertEqual(message_id, "<45223423@example.com>")
988        self._check_article_data(lines)
989        # Non-existent id
990        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
991            self.server.article("<non-existent@example.com>")
992        self.assertEqual(cm.exception.response, "430 No Such Article Found")
993
994    def test_article_file(self):
995        # With a "file" argument
996        f = io.BytesIO()
997        resp, info = self.server.article(file=f)
998        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
999        art_num, message_id, lines = info
1000        self.assertEqual(art_num, 3000237)
1001        self.assertEqual(message_id, "<45223423@example.com>")
1002        self.assertEqual(lines, [])
1003        data = f.getvalue()
1004        self.assertTrue(data.startswith(
1005            b'From: "Demo User" <nobody@example.net>\r\n'
1006            b'Subject: I am just a test article\r\n'
1007            ), ascii(data))
1008        self.assertTrue(data.endswith(
1009            b'This is just a test article.\r\n'
1010            b'.Here is a dot-starting line.\r\n'
1011            b'\r\n'
1012            b'-- Signed by Andr\xc3\xa9.\r\n'
1013            ), ascii(data))
1014
1015    def test_head(self):
1016        # HEAD
1017        resp, info = self.server.head()
1018        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1019        art_num, message_id, lines = info
1020        self.assertEqual(art_num, 3000237)
1021        self.assertEqual(message_id, "<45223423@example.com>")
1022        self._check_article_head(lines)
1023        # HEAD num
1024        resp, info = self.server.head(3000234)
1025        self.assertEqual(resp, "221 3000234 <45223423@example.com>")
1026        art_num, message_id, lines = info
1027        self.assertEqual(art_num, 3000234)
1028        self.assertEqual(message_id, "<45223423@example.com>")
1029        self._check_article_head(lines)
1030        # HEAD id
1031        resp, info = self.server.head("<45223423@example.com>")
1032        self.assertEqual(resp, "221 0 <45223423@example.com>")
1033        art_num, message_id, lines = info
1034        self.assertEqual(art_num, 0)
1035        self.assertEqual(message_id, "<45223423@example.com>")
1036        self._check_article_head(lines)
1037        # Non-existent id
1038        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1039            self.server.head("<non-existent@example.com>")
1040        self.assertEqual(cm.exception.response, "430 No Such Article Found")
1041
1042    def test_head_file(self):
1043        f = io.BytesIO()
1044        resp, info = self.server.head(file=f)
1045        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1046        art_num, message_id, lines = info
1047        self.assertEqual(art_num, 3000237)
1048        self.assertEqual(message_id, "<45223423@example.com>")
1049        self.assertEqual(lines, [])
1050        data = f.getvalue()
1051        self.assertTrue(data.startswith(
1052            b'From: "Demo User" <nobody@example.net>\r\n'
1053            b'Subject: I am just a test article\r\n'
1054            ), ascii(data))
1055        self.assertFalse(data.endswith(
1056            b'This is just a test article.\r\n'
1057            b'.Here is a dot-starting line.\r\n'
1058            b'\r\n'
1059            b'-- Signed by Andr\xc3\xa9.\r\n'
1060            ), ascii(data))
1061
1062    def test_body(self):
1063        # BODY
1064        resp, info = self.server.body()
1065        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1066        art_num, message_id, lines = info
1067        self.assertEqual(art_num, 3000237)
1068        self.assertEqual(message_id, "<45223423@example.com>")
1069        self._check_article_body(lines)
1070        # BODY num
1071        resp, info = self.server.body(3000234)
1072        self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1073        art_num, message_id, lines = info
1074        self.assertEqual(art_num, 3000234)
1075        self.assertEqual(message_id, "<45223423@example.com>")
1076        self._check_article_body(lines)
1077        # BODY id
1078        resp, info = self.server.body("<45223423@example.com>")
1079        self.assertEqual(resp, "222 0 <45223423@example.com>")
1080        art_num, message_id, lines = info
1081        self.assertEqual(art_num, 0)
1082        self.assertEqual(message_id, "<45223423@example.com>")
1083        self._check_article_body(lines)
1084        # Non-existent id
1085        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1086            self.server.body("<non-existent@example.com>")
1087        self.assertEqual(cm.exception.response, "430 No Such Article Found")
1088
1089    def test_body_file(self):
1090        f = io.BytesIO()
1091        resp, info = self.server.body(file=f)
1092        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1093        art_num, message_id, lines = info
1094        self.assertEqual(art_num, 3000237)
1095        self.assertEqual(message_id, "<45223423@example.com>")
1096        self.assertEqual(lines, [])
1097        data = f.getvalue()
1098        self.assertFalse(data.startswith(
1099            b'From: "Demo User" <nobody@example.net>\r\n'
1100            b'Subject: I am just a test article\r\n'
1101            ), ascii(data))
1102        self.assertTrue(data.endswith(
1103            b'This is just a test article.\r\n'
1104            b'.Here is a dot-starting line.\r\n'
1105            b'\r\n'
1106            b'-- Signed by Andr\xc3\xa9.\r\n'
1107            ), ascii(data))
1108
1109    def check_over_xover_resp(self, resp, overviews):
1110        self.assertTrue(resp.startswith("224 "), resp)
1111        self.assertEqual(len(overviews), 3)
1112        art_num, over = overviews[0]
1113        self.assertEqual(art_num, 57)
1114        self.assertEqual(over, {
1115            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1116            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1117            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1118            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1119            "references": "<hvalf7$ort$1@dough.gmane.org>",
1120            ":bytes": "7103",
1121            ":lines": "16",
1122            "xref": "news.gmane.io gmane.comp.python.authors:57"
1123            })
1124        art_num, over = overviews[1]
1125        self.assertEqual(over["xref"], None)
1126        art_num, over = overviews[2]
1127        self.assertEqual(over["subject"],
1128                         "Re: Message d'erreur incompréhensible (par moi)")
1129
1130    def test_xover(self):
1131        resp, overviews = self.server.xover(57, 59)
1132        self.check_over_xover_resp(resp, overviews)
1133
1134    def test_over(self):
1135        # In NNTP "v1", this will fallback on XOVER
1136        resp, overviews = self.server.over((57, 59))
1137        self.check_over_xover_resp(resp, overviews)
1138
    # Reference article used by the posting tests: four header lines, a blank
    # separator line, then a four-line body.  Every line ends with CRLF; the
    # body contains a line starting with a dot and a UTF-8 encoded non-ASCII
    # character.
    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )
1150
1151    def _check_posted_body(self):
1152        # Check the raw body as received by the server
1153        lines = self.handler.posted_body
1154        # One additional line for the "." terminator
1155        self.assertEqual(len(lines), 10)
1156        self.assertEqual(lines[-1], b'.\r\n')
1157        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1158        self.assertEqual(lines[-3], b'\r\n')
1159        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1160        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1161
1162    def _check_post_ihave_sub(self, func, *args, file_factory):
1163        # First the prepared post with CRLF endings
1164        post = self.sample_post
1165        func_args = args + (file_factory(post),)
1166        self.handler.posted_body = None
1167        resp = func(*func_args)
1168        self._check_posted_body()
1169        # Then the same post with "normal" line endings - they should be
1170        # converted by NNTP.post and NNTP.ihave.
1171        post = self.sample_post.replace(b"\r\n", b"\n")
1172        func_args = args + (file_factory(post),)
1173        self.handler.posted_body = None
1174        resp = func(*func_args)
1175        self._check_posted_body()
1176        return resp
1177
1178    def check_post_ihave(self, func, success_resp, *args):
1179        # With a bytes object
1180        resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1181        self.assertEqual(resp, success_resp)
1182        # With a bytearray object
1183        resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1184        self.assertEqual(resp, success_resp)
1185        # With a file object
1186        resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1187        self.assertEqual(resp, success_resp)
1188        # With an iterable of terminated lines
1189        def iterlines(b):
1190            return iter(b.splitlines(keepends=True))
1191        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1192        self.assertEqual(resp, success_resp)
1193        # With an iterable of non-terminated lines
1194        def iterlines(b):
1195            return iter(b.splitlines(keepends=False))
1196        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1197        self.assertEqual(resp, success_resp)
1198
1199    def test_post(self):
1200        self.check_post_ihave(self.server.post, "240 Article received OK")
1201        self.handler.allow_posting = False
1202        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1203            self.server.post(self.sample_post)
1204        self.assertEqual(cm.exception.response,
1205                         "440 Posting not permitted")
1206
1207    def test_ihave(self):
1208        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1209                              "<i.am.an.article.you.will.want@example.com>")
1210        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1211            self.server.ihave("<another.message.id>", self.sample_post)
1212        self.assertEqual(cm.exception.response,
1213                         "435 Article not wanted")
1214
1215    def test_too_long_lines(self):
1216        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1217        self.assertRaises(nntplib.NNTPDataError,
1218                          self.server.newnews, "comp.lang.python", dt)
1219
1220
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # Against a v1 server the capability dict is expected to be empty ...
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        # ... and no implementation string is expected either
        self.assertIsNone(self.server.nntp_implementation)
1232
1233
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # Each capability line is expected as a key mapped to its arguments
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
            }
        self.assertEqual(self.server.getcapabilities(), expected_caps)
        # VERSION lists 2 and 3; the client is expected to report 3
        self.assertEqual(self.server.nntp_version, 3)
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
1255
1256
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        client = self.server
        # Nothing is known about the server before authentication ...
        self.assertEqual(client._caps, {})
        client.login('testuser', 'testpw')
        # ... while the capabilities are available afterwards
        self.assertIn('VERSION', client._caps)
1267
1268
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
        unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        # READER must be among the capabilities known once connected
        known_caps = self.server._caps
        self.assertIn('READER', known_caps)
1279
1280
1281class MiscTests(unittest.TestCase):
1282
1283    def test_decode_header(self):
1284        def gives(a, b):
1285            self.assertEqual(nntplib.decode_header(a), b)
1286        gives("" , "")
1287        gives("a plain header", "a plain header")
1288        gives(" with extra  spaces ", " with extra  spaces ")
1289        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1290        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1291              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1292              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1293        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1294              "Re: problème de matrice")
1295        # A natively utf-8 header (found in the real world!)
1296        gives("Re: Message d'erreur incompréhensible (par moi)",
1297              "Re: Message d'erreur incompréhensible (par moi)")
1298
1299    def test_parse_overview_fmt(self):
1300        # The minimal (default) response
1301        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1302                 "References:", ":bytes", ":lines"]
1303        self.assertEqual(nntplib._parse_overview_fmt(lines),
1304            ["subject", "from", "date", "message-id", "references",
1305             ":bytes", ":lines"])
1306        # The minimal response using alternative names
1307        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1308                 "References:", "Bytes:", "Lines:"]
1309        self.assertEqual(nntplib._parse_overview_fmt(lines),
1310            ["subject", "from", "date", "message-id", "references",
1311             ":bytes", ":lines"])
1312        # Variations in casing
1313        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1314                 "References:", "BYTES:", "Lines:"]
1315        self.assertEqual(nntplib._parse_overview_fmt(lines),
1316            ["subject", "from", "date", "message-id", "references",
1317             ":bytes", ":lines"])
1318        # First example from RFC 3977
1319        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1320                 "References:", ":bytes", ":lines", "Xref:full",
1321                 "Distribution:full"]
1322        self.assertEqual(nntplib._parse_overview_fmt(lines),
1323            ["subject", "from", "date", "message-id", "references",
1324             ":bytes", ":lines", "xref", "distribution"])
1325        # Second example from RFC 3977
1326        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1327                 "References:", "Bytes:", "Lines:", "Xref:FULL",
1328                 "Distribution:FULL"]
1329        self.assertEqual(nntplib._parse_overview_fmt(lines),
1330            ["subject", "from", "date", "message-id", "references",
1331             ":bytes", ":lines", "xref", "distribution"])
1332        # A classic response from INN
1333        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1334                 "References:", "Bytes:", "Lines:", "Xref:full"]
1335        self.assertEqual(nntplib._parse_overview_fmt(lines),
1336            ["subject", "from", "date", "message-id", "references",
1337             ":bytes", ":lines", "xref"])
1338
1339    def test_parse_overview(self):
1340        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1341        # First example from RFC 3977
1342        lines = [
1343            '3000234\tI am just a test article\t"Demo User" '
1344            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1345            '<45223423@example.com>\t<45454@example.net>\t1234\t'
1346            '17\tXref: news.example.com misc.test:3000363',
1347        ]
1348        overview = nntplib._parse_overview(lines, fmt)
1349        (art_num, fields), = overview
1350        self.assertEqual(art_num, 3000234)
1351        self.assertEqual(fields, {
1352            'subject': 'I am just a test article',
1353            'from': '"Demo User" <nobody@example.com>',
1354            'date': '6 Oct 1998 04:38:40 -0500',
1355            'message-id': '<45223423@example.com>',
1356            'references': '<45454@example.net>',
1357            ':bytes': '1234',
1358            ':lines': '17',
1359            'xref': 'news.example.com misc.test:3000363',
1360        })
1361        # Second example; here the "Xref" field is totally absent (including
1362        # the header name) and comes out as None
1363        lines = [
1364            '3000234\tI am just a test article\t"Demo User" '
1365            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1366            '<45223423@example.com>\t<45454@example.net>\t1234\t'
1367            '17\t\t',
1368        ]
1369        overview = nntplib._parse_overview(lines, fmt)
1370        (art_num, fields), = overview
1371        self.assertEqual(fields['xref'], None)
1372        # Third example; the "Xref" is an empty string, while "references"
1373        # is a single space.
1374        lines = [
1375            '3000234\tI am just a test article\t"Demo User" '
1376            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1377            '<45223423@example.com>\t \t1234\t'
1378            '17\tXref: \t',
1379        ]
1380        overview = nntplib._parse_overview(lines, fmt)
1381        (art_num, fields), = overview
1382        self.assertEqual(fields['references'], ' ')
1383        self.assertEqual(fields['xref'], '')
1384
1385    def test_parse_datetime(self):
1386        def gives(a, b, *c):
1387            self.assertEqual(nntplib._parse_datetime(a, b),
1388                             datetime.datetime(*c))
1389        # Output of DATE command
1390        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1391        # Variations
1392        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1393        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1394        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1395
1396    def test_unparse_datetime(self):
1397        # Test non-legacy mode
1398        # 1) with a datetime
1399        def gives(y, M, d, h, m, s, date_str, time_str):
1400            dt = datetime.datetime(y, M, d, h, m, s)
1401            self.assertEqual(nntplib._unparse_datetime(dt),
1402                             (date_str, time_str))
1403            self.assertEqual(nntplib._unparse_datetime(dt, False),
1404                             (date_str, time_str))
1405        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1406        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1407        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1408        # 2) with a date
1409        def gives(y, M, d, date_str, time_str):
1410            dt = datetime.date(y, M, d)
1411            self.assertEqual(nntplib._unparse_datetime(dt),
1412                             (date_str, time_str))
1413            self.assertEqual(nntplib._unparse_datetime(dt, False),
1414                             (date_str, time_str))
1415        gives(1999, 6, 23, "19990623", "000000")
1416        gives(2000, 6, 23, "20000623", "000000")
1417        gives(2010, 6, 5, "20100605", "000000")
1418
1419    def test_unparse_datetime_legacy(self):
1420        # Test legacy mode (RFC 977)
1421        # 1) with a datetime
1422        def gives(y, M, d, h, m, s, date_str, time_str):
1423            dt = datetime.datetime(y, M, d, h, m, s)
1424            self.assertEqual(nntplib._unparse_datetime(dt, True),
1425                             (date_str, time_str))
1426        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1427        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1428        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1429        # 2) with a date
1430        def gives(y, M, d, date_str, time_str):
1431            dt = datetime.date(y, M, d)
1432            self.assertEqual(nntplib._unparse_datetime(dt, True),
1433                             (date_str, time_str))
1434        gives(1999, 6, 23, "990623", "000000")
1435        gives(2000, 6, 23, "000623", "000000")
1436        gives(2010, 6, 5, "100605", "000000")
1437
1438    @unittest.skipUnless(ssl, 'requires SSL support')
1439    def test_ssl_support(self):
1440        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
1441
1442
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected_names = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is only expected when the ssl module could be imported
        if ssl is not None:
            expected_names.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected_names)
1454
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        # Build a client on top of a fake socket whose peer is driven by
        # handler_class, expect the constructor to fail with the given error,
        # then check that the socket and every file made from it got closed.
        socket_closed = False
        opened_files = []

        class FakeSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                _, file = make_mock_file(handler_class())
                opened_files.append(file)
                return file

        class fake_socket_module:
            # Stands in for the socket module: the function is looked up on
            # the class itself, hence no "self" parameter.
            def create_connection(address, timeout):
                return FakeSocket()

        with patch('nntplib.socket', fake_socket_module):
            with self.assertRaisesRegex(expected_error_type,
                                        expected_error_msg):
                self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for file in opened_files:
            self.assertTrue(file.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPProtocolError,
            expected_error_msg=Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPTemporaryError,
            expected_error_msg=Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPPermanentError,
            expected_error_msg=Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)

        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPReplyError,
            expected_error_msg=capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)

        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPPermanentError,
            expected_error_msg=authinfo_response,
            login='t@e.com', password='python')
1532
class bypass_context:
    """Bypass encryption and the actual SSL module.

    Stand-in for an SSL context: its wrap_socket() hands the socket back
    untouched, so no TLS handshake takes place.
    """

    @staticmethod
    def wrap_socket(sock, **args):
        """Return *sock* unchanged, ignoring all keyword options.

        Declared static so the call behaves the same whether it is looked
        up on the class itself or on an instance of it.
        """
        return sock
1537
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Run the mock socket tests through nntplib.NNTP_SSL.

    bypass_context is supplied as the SSL context.
    """

    @staticmethod
    def nntp_class(*args, **kwargs):
        # Static so that a call through self.nntp_class(...) does not pass
        # the test case instance on to the NNTP_SSL constructor.
        return nntplib.NNTP_SSL(*args, ssl_context=bypass_context, **kwargs)
1543
1544
class LocalServerTests(unittest.TestCase):
    """Tests that run an NNTP client against a small in-process server.

    The server (run_server) lives on a background thread and understands
    only the CAPABILITIES, STARTTLS and QUIT commands.
    """

    def setUp(self):
        """Start the background server and connect self.nntp to it."""
        sock = socket.socket()
        port = support.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        # unittest runs cleanups in reverse order of registration, so the
        # client registered below is exited before this join is attempted.
        self.addCleanup(self.background.join)

        # Enter the client as a context manager by hand so that it stays
        # open for the duration of the test; the matching __exit__ is
        # deferred to cleanup time.
        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        """Serve a single client connection accepted from *sock*.

        Sends a "200" greeting, then answers commands line by line until
        QUIT is received.  Raises ValueError for any other input, which
        includes the empty read produced when the client disconnects
        without sending QUIT.
        """
        # Could be generalized to handle more commands in separate methods
        # Only one connection is ever accepted; the listening socket is
        # closed as soon as the client is connected.
        with sock:
            [client, _] = sock.accept()
        # The ExitStack closes every socket and file registered with it,
        # including the TLS-wrapped ones added after STARTTLS.
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    # Advertise STARTTLS in the capability list.
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # Drop the plain-text reader before upgrading; a new
                    # one is created on top of the TLS socket below.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    context = ssl.SSLContext()
                    context.load_cert_chain(certfile)
                    # From here on `client` and `reader` refer to the
                    # TLS-wrapped connection.
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        # Remember the pre-upgrade objects so the replacement can be
        # detected after starttls().
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)
1603
1604
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1607