• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import socket
3import datetime
4import textwrap
5import unittest
6import functools
7import contextlib
8import nntplib
9import os.path
10import re
11import threading
12
13from test import support
14from test.support import socket_helper
15from nntplib import NNTP, GroupInfo
16from unittest.mock import patch
try:
    import ssl
except ImportError:
    # The interpreter was built without the ssl module.
    ssl = None


# Certificate file expected to sit next to this test module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')


# Always expose an SSLError name so that "except SSLError" clauses are valid
# whether or not the ssl module could be imported.
if ssl is None:
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
31
32# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
35# - test auth and `usenetrc`
36
37
class NetworkedNNTPTestsMixin:
    """Tests that exercise an NNTP client against a real, remote server.

    The concrete test class is expected to provide:

    * ``server``: a connected client object (read as ``self.server``),
    * ``NNTP_HOST`` and ``NNTP_CLASS``: the host to contact and the client
      class used to open additional connections,
    * ``GROUP_NAME``, ``GROUP_PAT`` and ``DESC``: a group name, a pattern
      matching it, and a substring of that group's description.
    """

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn(self.DESC, desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound follows the local clock (with one
        # year of slack for clock skew around New Year) rather than being a
        # hard-coded year that would eventually make this test fail.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        """Sanity-check a field dictionary returned by OVER / XOVER."""
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skip('temporarily skipped until a permanent solution'
                   ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Check the shape of an ArticleInfo returned by HEAD/BODY/ARTICLE.

        When *art_num* is given, the article number must match it.
        """
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skip("FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def is_antivirus_header(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not is_antivirus_header(line)]
        filtered_lines = [line for line in article.lines
                          if not is_antivirus_header(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Record on the class that the connection is gone.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every ``test_*`` method in ``transient_internet()``.

        Each callable attribute of *cls* whose name starts with ``test_`` is
        replaced by a wrapper that runs it inside
        ``socket_helper.transient_internet(self.NNTP_HOST)``.
        """
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST,
                                     timeout=support.INTERNET_TIMEOUT,
                                     usenetrc=False)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            # ("or ''" guards against an error whose reason is None, which
            # would otherwise make re.search() raise TypeError)
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise
297
298
# Replace every test_* method defined on the mixin with a version that runs
# inside socket_helper.transient_internet() for the test's NNTP_HOST.
NetworkedNNTPTestsMixin.wrap_methods()
300
301
# Exception types treated as "EOF while connecting"; the SSL flavour is only
# included when the ssl module is available.
if ssl is None:
    EOF_ERRORS = (EOFError,)
else:
    EOF_ERRORS = (EOFError, ssl.SSLEOFError)
305
306
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'
    DESC = 'Python'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        # Open the connection shared by all tests of the class.  Known
        # server-side connection problems turn into a skip, not a failure.
        support.requires("network")
        with socket_helper.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST,
                                            timeout=support.INTERNET_TIMEOUT,
                                            usenetrc=False)
            except EOF_ERRORS:
                # This clause must come before "except SSLError":
                # ssl.SSLEOFError is a subclass of ssl.SSLError, so with the
                # opposite order an SSL EOF would never reach this skip.
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # ("or ''" guards against an error whose reason is None,
                # which would otherwise make re.search() raise TypeError)
                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason or ''):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                raise

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() sets cls.server to None once it has quit.
        if cls.server is not None:
            cls.server.quit()
338
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    # Runs the networked tests with the NNTP_SSL client class.

    # Looked up with getattr() so that a missing NNTP_SSL attribute yields
    # None instead of an AttributeError at class-definition time.
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'

    # bpo-42794: aioe.test is one of the official groups on this server
    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
    GROUP_NAME = 'aioe.test'
    GROUP_PAT = 'aioe.*'
    DESC = 'test'

    # Disabled as the connection will already be encrypted.
    test_starttls = None

    # Disabled as it produces too much data
    test_list = None
360
361
362#
363# Non-networked tests using a local server (or something mocking it).
364#
365
366class _NNTPServerIO(io.RawIOBase):
367    """A raw IO object allowing NNTP commands to be received and processed
368    by a handler.  The handler can push responses which can then be read
369    from the IO object."""
370
371    def __init__(self, handler):
372        io.RawIOBase.__init__(self)
373        # The channel from the client
374        self.c2s = io.BytesIO()
375        # The channel to the client
376        self.s2c = io.BytesIO()
377        self.handler = handler
378        self.handler.start(self.c2s.readline, self.push_data)
379
380    def readable(self):
381        return True
382
383    def writable(self):
384        return True
385
386    def push_data(self, data):
387        """Push (buffer) some data to send to the client."""
388        pos = self.s2c.tell()
389        self.s2c.seek(0, 2)
390        self.s2c.write(data)
391        self.s2c.seek(pos)
392
393    def write(self, b):
394        """The client sends us some data"""
395        pos = self.c2s.tell()
396        self.c2s.write(b)
397        self.c2s.seek(pos)
398        self.handler.process_pending()
399        return len(b)
400
401    def readinto(self, buf):
402        """The client wants to read a response"""
403        self.handler.process_pending()
404        b = self.s2c.read(len(buf))
405        n = len(b)
406        buf[:n] = b
407        return n
408
409
def make_mock_file(handler):
    """Return ``(raw_io, file)`` for a mock connection driven by *handler*.

    *raw_io* is the underlying _NNTPServerIO; *file* is a buffered
    reader/writer pair built on top of it.
    """
    raw_io = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    return (raw_io, io.BufferedRWPair(raw_io, raw_io))
416
417
class NNTPServer(nntplib.NNTP):
    """NNTP client bound to an already-open file object instead of a socket."""

    def __init__(self, f, host, readermode=None):
        # Set the attributes directly, then run the shared initialisation.
        self.host = host
        self.file = f
        self._base_init(readermode)

    def _close(self):
        # Close the file, then drop the attribute.
        self.file.close()
        delattr(self, 'file')
428
429
class MockedNNTPTestsMixin:
    """Mixin that gives each test a client connected to a mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        delattr(self, 'server')

    def make_server(self, *args, **kwargs):
        """Create a fresh handler and client, store them on self and return
        the client.  Extra arguments are forwarded to NNTPServer."""
        self.handler = self.handler_class()
        raw_io, mock_file = make_mock_file(self.handler)
        self.sio = raw_io
        client = NNTPServer(mock_file, 'test.server', *args, **kwargs)
        self.server = client
        return client
447
448
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    # Same as MockedNNTPTestsMixin, but the client is created with
    # readermode=True.
    def setUp(self):
        # NOTE(review): the parent setUp() already calls make_server(), so a
        # first handler/client pair is created and then replaced here.
        super().setUp()
        self.make_server(readermode=True)
453
454
class NNTPv1Handler:
    """A handler for RFC 977

    Commands are read line by line through the ``readline`` callable given
    to start() and dispatched to the matching ``handle_<COMMAND>`` method;
    responses are emitted through the ``push_data`` callable.
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Initialise per-connection state and send the welcome line."""
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Process everything the client has sent so far.

        If a command asked for a body (see expect_body()), the body lines
        are collected first, up to and including the terminating ".\\r\\n"
        line, and handed to that command's handler as ``body=``.  Then every
        complete command line is dispatched.

        Raises ValueError for a line not terminated by CRLF, or when a
        command handler raises.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; wait for more data.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            tokens = line.split()
            if not tokens:
                # Blank command line: there is no command name to dispatch
                # on, so answer like for any other unknown command.
                self.handle_unknown()
                continue
            cmd, *tokens = tokens
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # The handler called expect_body(): remember what to
                        # call once the body has been received.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The text is dedented, its line endings are normalised to CRLF (with
        a trailing CRLF added) and it is encoded as UTF-8.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # Deliberately oversized single response line.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per posting: first without a body (command line),
        # then with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def _push_message(self, code, message_spec, payload):
        """Answer an ARTICLE/HEAD/BODY request.

        *code* is the success status code, *payload* the text sent after the
        status line.  Unknown message specs get a 430 response instead.
        """
        if message_spec is None:
            number = "3000237"
        elif message_spec == "<45223423@example.com>":
            number = "0"
        elif message_spec == "3000234":
            number = "3000234"
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit("{} {} <45223423@example.com>".format(code, number))
        self.push_lit(payload)
        self.push_lit(".")

    def handle_ARTICLE(self, message_spec=None):
        self._push_message("220", message_spec, self.sample_article)

    def handle_HEAD(self, message_spec=None):
        self._push_message("221", message_spec, self.sample_head)

    def handle_BODY(self, message_spec=None):
        self._push_message("222", message_spec, self.sample_body)

    def handle_AUTHINFO(self, cred_type, data):
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
778
779
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        # AUTHINFO is only advertised while the client is not logged in.
        if self._logged_in:
            auth_line = ''
        else:
            auth_line = '\n            AUTHINFO USER'
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""
        self.push_lit(template.format(auth_line))

    def handle_MODE(self, _):
        # READER is already advertised, so any MODE command is an error.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
805
806
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        # Logged-in clients get the regular capability list.
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        self.push_lit('480 You must log in.')
815
816
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode.

    Until a MODE READER command has been handled, the capability list
    advertises MODE-READER instead of READER.
    """

    def __init__(self):
        super().__init__()
        # Becomes True once handle_MODE() has accepted the switch.
        self._switched = False

    def handle_CAPABILITIES(self):
        """Push the capability list matching the current mode."""
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        # Before the switch the last capability reads MODE-READER.
        prefix = '' if self._switched else 'MODE-'
        self.push_lit(fmt.format(prefix))

    def handle_MODE(self, what):
        """Switch to reader mode and acknowledge with a 200 reply.

        Raises AssertionError if the mode was already switched or if *what*
        is anything other than 'reader'.
        """
        # Raised explicitly instead of using an `assert` statement so that
        # the check is not stripped when the tests run under `python -O`.
        if self._switched or what != 'reader':
            raise AssertionError(
                'unexpected MODE {!r} (already switched: {})'.format(
                    what, self._switched))
        self._switched = True
        self.push_lit('200 Posting allowed')
843
844
class NNTPv1v2TestsMixin:
    """Checks shared by the mocked NNTP "v1" and "v2" server test cases.

    The concrete test class is expected to provide ``self.server``,
    ``self.handler``, ``self.sio`` and a ``nntp_version`` attribute.
    """

    def setUp(self):
        # No extra fixtures here; just continue along the MRO.
        super().setUp()

    def test_welcome(self):
        greeting = self.server.welcome
        self.assertEqual(greeting, self.handler.welcome)

    def test_authinfo(self):
        is_v2 = self.nntp_version == 2
        if is_v2:
            self.assertIn('AUTHINFO', self.server._caps)
        self.server.login('testuser', 'testpw')
        # AUTHINFO vanishing from _caps also proves that getcapabilities()
        # was called again after login, as it should be.
        self.assertNotIn('AUTHINFO', self.server._caps)

    def test_date(self):
        status, when = self.server.date()
        self.assertEqual(status, "111 20100914001155")
        self.assertEqual(when, datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        farewell = self.server.quit()
        self.assertEqual(farewell, "205 Bye!")
        self.assertTrue(self.sio.closed)

    def test_help(self):
        status, help_lines = self.server.help()
        self.assertEqual(status, "100 Legal commands")
        expected_lines = [
            '  authinfo user Name|pass Password|generic <prog> <args>',
            '  date',
            '  help',
            'Report problems to <root@example.org>',
        ]
        self.assertEqual(help_lines, expected_lines)

    def test_list(self):
        _, all_groups = self.server.list()
        self.assertEqual(len(all_groups), 6)
        self.assertEqual(
            all_groups[1],
            GroupInfo("comp.lang.python.announce", "0000001153",
                      "0000000993", "m"))
        _, matching = self.server.list("*distutils*")
        self.assertEqual(len(matching), 2)
        self.assertEqual(
            matching[0],
            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                      "0000000001", "m"))

    def test_stat(self):
        # STAT by article number, then by message id
        found = [
            (3000234, "223 3000234 <45223423@example.com>", 3000234),
            ("<45223423@example.com>", "223 0 <45223423@example.com>", 0),
        ]
        for spec, expected_resp, expected_num in found:
            resp, art_num, message_id = self.server.stat(spec)
            self.assertEqual(resp, expected_resp)
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
        # Unknown message id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat("<non.existent.id>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
        # No argument at all
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat()
        self.assertEqual(cm.exception.response, "412 No newsgroup selected")

    def test_next(self):
        status, number, ident = self.server.next()
        self.assertEqual(status, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(number, 3000237)
        self.assertEqual(ident, "<668929@example.org>")

    def test_last(self):
        status, number, ident = self.server.last()
        self.assertEqual(status, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(number, 3000234)
        self.assertEqual(ident, "<45223423@example.com>")

    def test_description(self):
        known = self.server.description("comp.lang.python")
        self.assertEqual(known, "The Python computer language.")
        unknown = self.server.description("comp.lang.pythonx")
        self.assertEqual(unknown, "")

    def test_descriptions(self):
        # Exact group name
        status, found = self.server.descriptions("comp.lang.python")
        self.assertEqual(status, '215 Descriptions in form "group description".')
        self.assertEqual(found, {
            "comp.lang.python": "The Python computer language.",
        })
        # Wildcard pattern
        status, found = self.server.descriptions("comp.lang.python*")
        self.assertEqual(found, {
            "comp.lang.python": "The Python computer language.",
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
        })
        # No match
        status, found = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(found, {})

    def test_group(self):
        reply = self.server.group("fr.comp.lang.python")
        resp, count, first, last, group = reply
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        response = cm.exception.response
        self.assertTrue(response.startswith("411 No such group"), response)

    def test_newnews(self):
        since = datetime.datetime(2010, 9, 13, 8, 20, 4)
        # NEWNEWS comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("comp.lang.python", since)
        template = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
        )
        self.assertEqual(resp, template.format(self.nntp_version))
        self.assertEqual(ids, [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
        ])
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        resp, ids = self.server.newnews("fr.comp.lang.python", since)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        """Assert that *lines* are the four body lines of the sample article."""
        self.assertEqual(len(lines), 4)
        opening, dotted, blank, signature = lines
        self.assertEqual(signature.decode('utf-8'), "-- Signed by André.")
        self.assertEqual(blank, b"")
        self.assertEqual(dotted, b".Here is a dot-starting line.")
        self.assertEqual(opening, b"This is just a test article.")

    def _check_article_head(self, lines):
        """Assert that *lines* are the four header lines of the sample article."""
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        """Assert that *lines* are head, blank separator and body (9 lines)."""
        self.assertEqual(len(lines), 9)
        head, body = lines[:4], lines[-4:]
        self._check_article_head(head)
        self._check_article_body(body)
        self.assertEqual(lines[4], b"")

    def test_article(self):
        # ARTICLE, then ARTICLE num, then ARTICLE id
        lookups = [
            ((), "220 3000237 <45223423@example.com>", 3000237),
            ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
            (("<45223423@example.com>",), "220 0 <45223423@example.com>", 0),
        ]
        for spec, expected_resp, expected_num in lookups:
            resp, info = self.server.article(*spec)
            self.assertEqual(resp, expected_resp)
            art_num, message_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
            self._check_article_data(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.article("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_article_file(self):
        # With a "file" argument
        sink = io.BytesIO()
        resp, info = self.server.article(file=sink)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        head_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        body_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        self.assertTrue(data.startswith(head_start), ascii(data))
        self.assertTrue(data.endswith(body_end), ascii(data))

    def test_head(self):
        # HEAD, then HEAD num, then HEAD id
        lookups = [
            ((), "221 3000237 <45223423@example.com>", 3000237),
            ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
            (("<45223423@example.com>",), "221 0 <45223423@example.com>", 0),
        ]
        for spec, expected_resp, expected_num in lookups:
            resp, info = self.server.head(*spec)
            self.assertEqual(resp, expected_resp)
            art_num, message_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
            self._check_article_head(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.head("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_head_file(self):
        sink = io.BytesIO()
        resp, info = self.server.head(file=sink)
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        head_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        body_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        # Only the headers may have been written.
        self.assertTrue(data.startswith(head_start), ascii(data))
        self.assertFalse(data.endswith(body_end), ascii(data))

    def test_body(self):
        # BODY, then BODY num, then BODY id
        lookups = [
            ((), "222 3000237 <45223423@example.com>", 3000237),
            ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
            (("<45223423@example.com>",), "222 0 <45223423@example.com>", 0),
        ]
        for spec, expected_resp, expected_num in lookups:
            resp, info = self.server.body(*spec)
            self.assertEqual(resp, expected_resp)
            art_num, message_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
            self._check_article_body(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.body("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_body_file(self):
        sink = io.BytesIO()
        resp, info = self.server.body(file=sink)
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = sink.getvalue()
        head_start = (
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
        )
        body_end = (
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
        )
        # Only the body may have been written.
        self.assertFalse(data.startswith(head_start), ascii(data))
        self.assertTrue(data.endswith(body_end), ascii(data))

    def check_over_xover_resp(self, resp, overviews):
        """Assert that (*resp*, *overviews*) is the expected OVER/XOVER reply."""
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        number, fields = overviews[0]
        self.assertEqual(number, 57)
        expected_fields = {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.io gmane.comp.python.authors:57",
        }
        self.assertEqual(fields, expected_fields)
        _, fields = overviews[1]
        self.assertEqual(fields["xref"], None)
        _, fields = overviews[2]
        self.assertEqual(fields["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        reply = self.server.xover(57, 59)
        resp, overviews = reply
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        reply = self.server.over((57, 59))
        resp, overviews = reply
        self.check_over_xover_resp(resp, overviews)

    # The article submitted by the POST and IHAVE tests.
    sample_post = b''.join((
        b'From: "Demo User" <nobody@example.net>\r\n',
        b'Subject: I am just a test article\r\n',
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n',
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n',
        b'\r\n',
        b'This is just a test article.\r\n',
        b'.Here is a dot-starting line.\r\n',
        b'\r\n',
        b'-- Signed by Andr\xc3\xa9.\r\n',
    ))

    def _check_posted_body(self):
        """Check the raw article lines as received by the server."""
        received = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(received), 10)
        self.assertEqual(received[-1], b'.\r\n')
        self.assertEqual(received[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(received[-3], b'\r\n')
        self.assertEqual(received[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(received[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        """Submit the sample post twice through *func* and check what arrived.

        The post is wrapped by *file_factory* and appended to *args*.  The
        response to the second submission is returned.
        """
        variants = (
            # First the prepared post with CRLF endings
            self.sample_post,
            # Then the same post with "normal" line endings - they should be
            # converted by NNTP.post and NNTP.ihave.
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        resp = None
        for payload in variants:
            source = file_factory(payload)
            self.handler.posted_body = None
            resp = func(*args, source)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        """Run *func* with every supported kind of article source."""
        def terminated_lines(b):
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            return iter(b.splitlines(keepends=False))

        factories = (
            bytes,             # a bytes object
            bytearray,         # a bytearray object
            io.BytesIO,        # a file object
            terminated_lines,  # an iterable of terminated lines
            bare_lines,        # an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args, file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        # Once posting is disallowed the server must refuse the article.
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response, "440 Posting not permitted")

    def test_ihave(self):
        wanted_id = "<i.am.an.article.you.will.want@example.com>"
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              wanted_id)
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response, "435 Article not wanted")

    def test_too_long_lines(self):
        since = datetime.datetime(2010, 1, 1, 9, 0, 0)
        with self.assertRaises(nntplib.NNTPDataError):
            self.server.newnews("comp.lang.python", since)
1246
1247
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Runs the shared tests against an NNTP v1 server (no capabilities)."""

    handler_class = NNTPv1Handler
    nntp_version = 1

    def test_caps(self):
        # A v1 server reports no capabilities and no implementation string.
        server = self.server
        self.assertEqual(server.getcapabilities(), {})
        self.assertEqual(server.nntp_version, 1)
        self.assertEqual(server.nntp_implementation, None)
1259
1260
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Runs the shared tests against an NNTP v2 server (with capabilities)."""

    handler_class = NNTPv2Handler
    nntp_version = 2

    def test_caps(self):
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        server = self.server
        self.assertEqual(server.getcapabilities(), expected_caps)
        # The mock advertises "VERSION 2 3"; the client reports 3.
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
1282
1283
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a (probably) NNTP v2 server that lists capabilities only after login."""

    handler_class = CapsAfterLoginNNTPv2Handler
    nntp_version = 2

    def test_caps_only_after_login(self):
        server = self.server
        # Nothing is known before login ...
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # ... and the capability list is populated afterwards.
        self.assertIn('VERSION', server._caps)
1294
1295
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2, but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    handler_class = ModeSwitchingNNTPv2Handler
    nntp_version = 2

    def test_we_are_in_reader_mode_after_connect(self):
        capabilities = self.server._caps
        self.assertIn('READER', capabilities)
1306
1307
class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level parsing and formatting helpers."""

    def test_decode_header(self):
        cases = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra  spaces ", " with extra  spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        cases = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        first = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        [(art_num, fields)] = nntplib._parse_overview(first, fmt)
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        second = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(second, fmt)
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        third = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(third, fmt)
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        cases = [
            # Output of DATE command
            (("19990623135624", None), (1999, 6, 23, 13, 56, 24)),
            # Variations
            (("19990623", "135624"), (1999, 6, 23, 13, 56, 24)),
            (("990623", "135624"), (1999, 6, 23, 13, 56, 24)),
            (("090623", "135624"), (2009, 6, 23, 13, 56, 24)),
        ]
        for (date_str, time_str), parts in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # 1) with a datetime
        stamps = [
            ((1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
        ]
        for parts, expected in stamps:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment), expected)
            self.assertEqual(nntplib._unparse_datetime(moment, False), expected)
        # 2) with a date
        days = [
            ((1999, 6, 23), ("19990623", "000000")),
            ((2000, 6, 23), ("20000623", "000000")),
            ((2010, 6, 5), ("20100605", "000000")),
        ]
        for parts, expected in days:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day), expected)
            self.assertEqual(nntplib._unparse_datetime(day, False), expected)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        stamps = [
            ((1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("100605", "010203")),
        ]
        for parts, expected in stamps:
            moment = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(moment, True), expected)
        # 2) with a date
        days = [
            ((1999, 6, 23), ("990623", "000000")),
            ((2000, 6, 23), ("000623", "000000")),
            ((2010, 6, 5), ("100605", "000000")),
        ]
        for parts, expected in days:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day, True), expected)

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        has_ssl_class = hasattr(nntplib, 'NNTP_SSL')
        self.assertTrue(has_ssl_class)
1468
1469
class PublicAPITests(unittest.TestCase):
    """Checks that nntplib exposes exactly the expected public names."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is expected only when the ssl module could be imported.
        if ssl is not None:
            expected.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected)
1481
class MockSocketTests(unittest.TestCase):
    """Tests that drive the client through a mock socket object.

    These cover the cases where the _NNTPServerIO file object alone is
    not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        # Construct self.nntp_class with nntplib's socket module replaced
        # by a fake, check that construction raises the expected error,
        # and check that the fake socket and every file made from it
        # ended up closed.
        socket_closed = False
        opened_files = []

        class FakeSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                # A fresh handler per file; the file object is remembered
                # so its closed state can be checked afterwards.
                _, mock_file = make_mock_file(handler_class())
                opened_files.append(mock_file)
                return mock_file

        class fake_socket_module:
            # This class object itself is patched in as nntplib.socket,
            # so create_connection is looked up on the class and takes
            # no instance argument.
            def create_connection(address, timeout):
                return FakeSocket()

        with patch('nntplib.socket', fake_socket_module):
            with self.assertRaisesRegex(expected_error_type,
                                        expected_error_msg):
                self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for opened in opened_files:
            self.assertTrue(opened.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPProtocolError,
            expected_error_msg=Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPTemporaryError,
            expected_error_msg=Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPPermanentError,
            expected_error_msg=Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)

        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPReplyError,
            expected_error_msg=capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)

        self.check_constructor_error_conditions(
            handler_class=Handler,
            expected_error_type=nntplib.NNTPPermanentError,
            expected_error_msg=authinfo_response,
            login='t@e.com',
            password='python')
1559
class bypass_context:
    """Bypass encryption and the actual SSL module.

    Acts as a stand-in for an SSL context: wrap_socket() hands back the
    socket it was given, untouched.
    """

    @staticmethod
    def wrap_socket(sock, **args):
        """Return *sock* unchanged, ignoring any keyword options.

        Declared static so the call behaves the same whether the class
        itself or an instance of it is used as the context; without it,
        a call through an instance would pass the instance in as *sock*.
        """
        return sock
1564
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Runs the inherited mock socket tests through nntplib.NNTP_SSL."""

    @staticmethod
    def nntp_class(*pos, **kw):
        # Static so that self.nntp_class(...) does not receive the test
        # case as its first argument.  bypass_context is passed in as the
        # SSL context.
        ssl_client = nntplib.NNTP_SSL
        return ssl_client(*pos, ssl_context=bypass_context, **kw)
1570
1571
class LocalServerTests(unittest.TestCase):
    """Tests run against a minimal NNTP server on a local socket.

    setUp() starts the server in a background thread and connects an
    NNTP client to it; both are torn down through addCleanup().
    """

    def setUp(self):
        sock = socket.socket()
        port = socket_helper.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # Cleanups run in reverse order of registration, so the client is
        # exited before the server thread is joined.
        self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        """Serve one client on listening socket *sock* until it sends QUIT.

        Only CAPABILITIES, STARTTLS and QUIT are understood; any other
        input raises ValueError in the server thread.
        """
        # Could be generalized to handle more commands in separate methods
        with sock:
            # Only one connection is accepted; the listening socket is
            # closed as soon as this block exits.
            [client, _] = sock.accept()
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # The plain-text reader is closed before the socket is
                    # wrapped; a new reader is made on the TLS socket.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    # Name the protocol explicitly: SSLContext() without a
                    # protocol argument is deprecated since Python 3.10.
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)
1630
1631
# Run the test cases through unittest's command-line runner when this
# file is executed as a script rather than imported.
if __name__ == "__main__":
    unittest.main()
1634