• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import socket
3import datetime
4import textwrap
5import unittest
6import functools
7import contextlib
8import nntplib
9import os.path
10import re
11import threading
12
13from test import support
14from test.support import socket_helper
15from nntplib import NNTP, GroupInfo
16from unittest.mock import patch
17try:
18    import ssl
19except ImportError:
20    ssl = None
21
22
# Path of 'keycert3.pem' inside the directory that contains this module.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

# Bind SSLError to the real exception class when the ssl module could be
# imported; otherwise define a stand-in so that ``except SSLError`` clauses
# remain valid.
if ssl is None:
    class SSLError(Exception):
        """Placeholder exception class used when SSL support is missing."""
        reason = "This will never be raised."
else:
    SSLError = ssl.SSLError
31
32# TODO:
33# - test the `file` arg to more commands
34# - test error conditions
35# - test auth and `usenetrc`
36
37
class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    Concrete subclasses are expected to provide:

    * ``server`` -- a connected client object (the tests call the
      ``nntplib.NNTP`` API on it);
    * ``NNTP_HOST`` -- host name used for new connections and for
      ``transient_internet()``;
    * ``NNTP_CLASS`` -- the client class instantiated by the connection tests;
    * ``GROUP_NAME``, ``GROUP_PAT``, ``DESC`` -- a group name, a pattern
      passed to LIST-style commands, and a substring expected in the
      group's description.
    """

    # Optional SSL context; forwarded to NNTP_CLASS only when not None.
    ssl_context = None

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertIsInstance(welcome, str)

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertIsInstance(line, str)

    def test_list(self):
        resp, groups = self.server.list()
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if groups:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn(self.DESC, desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check.  The upper bound is derived from the local clock
        # (with a one-year allowance) rather than hard-coded, so that the
        # test does not start failing once a fixed year has passed.
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, datetime.date.today().year + 1)

    def _check_art_dict(self, art_dict):
        """Sanity-check a field dictionary returned by OVER / XOVER."""
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if not lines:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                           ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertIsInstance(line[1], str)

    def check_article_resp(self, resp, article, art_num=None):
        """Check the ArticleInfo returned by HEAD / BODY / ARTICLE.

        *resp* is accepted for call-site symmetry but not inspected here.
        When *art_num* is given, the article number must match it.
        """
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)

        # Tolerate running the tests from behind a NNTP virus checker
        def denylist(line):
            return line.startswith(b'X-Antivirus')

        filtered_head_lines = [line for line in head.lines
                               if not denylist(line)]
        filtered_lines = [line for line in article.lines
                          if not denylist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Mark the shared connection as gone, whatever quit() did.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        """Wrap every ``test_*`` callable in a transient_internet() catcher.

        Each wrapped test runs inside
        ``socket_helper.transient_internet(self.NNTP_HOST)``.
        """
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            # A separate function is needed so that each wrapper closes over
            # its own ``meth`` rather than the loop variable below.
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            # ``server`` is looked up in the enclosing scope at call time.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if self.ssl_context is not None:
            kwargs["ssl_context"] = self.ssl_context

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            # ``reason`` is not guaranteed to be a string (it may be absent
            # or None); fall back to '' so that the SSLError itself is
            # re-raised below instead of a TypeError from re.search().
            reason = getattr(ssl_err, 'reason', None) or ''
            if re.search(r'(?i)KEY.TOO.SMALL', reason):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise
303
# Replace every test_* callable of the mixin with a wrapper that runs it
# inside socket_helper.transient_internet(self.NNTP_HOST).
NetworkedNNTPTestsMixin.wrap_methods()
305
306
# Exception classes treated as "the connection ended with EOF"; the SSL
# flavour is included only when the ssl module is available.
EOF_ERRORS = (EOFError,) if ssl is None else (EOFError, ssl.SSLEOFError)
310
311
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests through ``NNTP_CLASS`` against ``NNTP_HOST``.

    A single connection is opened in setUpClass() and stored on the class
    as ``server``; tearDownClass() closes it unless it was already reset
    to None.
    """
    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'
    DESC = 'Python'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        """Open the shared connection ``cls.server``.

        Raises unittest.SkipTest when the connection attempt ends with an
        EOF error or with an SSL "key too small" error; any other SSLError
        is re-raised after printing the host name.
        """
        support.requires("network")
        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if cls.ssl_context is not None:
            kwargs["ssl_context"] = cls.ssl_context
        with socket_helper.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
            except EOF_ERRORS:
                # This clause must come before ``except SSLError``:
                # ssl.SSLEOFError is a subclass of ssl.SSLError, so with the
                # opposite order an SSL EOF would never reach this handler.
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                # ``reason`` is not guaranteed to be a string (it may be
                # absent or None); fall back to '' so that the SSLError is
                # re-raised below instead of a TypeError from re.search().
                reason = getattr(ssl_err, 'reason', None) or ''
                if re.search(r'(?i)KEY.TOO.SMALL', reason):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                print(cls.NNTP_HOST)
                raise

    @classmethod
    def tearDownClass(cls):
        """Close the shared connection if it is still set."""
        if cls.server is not None:
            cls.server.quit()
348
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    """Networked tests repeated with ``nntplib.NNTP_SSL`` (when it exists)."""

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."
    NNTP_HOST = 'nntp.aioe.org'
    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # bpo-42794: aioe.test is one of the official groups on this server
    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
    GROUP_NAME, GROUP_PAT, DESC = 'aioe.test', 'aioe.*', 'test'

    # test_list is disabled as it produces too much data; test_starttls is
    # disabled as the connection will already be encrypted.
    test_list = None
    test_starttls = None

    # Unverified context, default cipher list, capped at TLS 1.2.
    if ssl is not None:
        ssl_context = ssl._create_unverified_context()
        ssl_context.set_ciphers("DEFAULT")
        ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2

#
# Non-networked tests using a local server (or something mocking it).
#
375
376#
377# Non-networked tests using a local server (or something mocking it).
378#
379
380class _NNTPServerIO(io.RawIOBase):
381    """A raw IO object allowing NNTP commands to be received and processed
382    by a handler.  The handler can push responses which can then be read
383    from the IO object."""
384
385    def __init__(self, handler):
386        io.RawIOBase.__init__(self)
387        # The channel from the client
388        self.c2s = io.BytesIO()
389        # The channel to the client
390        self.s2c = io.BytesIO()
391        self.handler = handler
392        self.handler.start(self.c2s.readline, self.push_data)
393
394    def readable(self):
395        return True
396
397    def writable(self):
398        return True
399
400    def push_data(self, data):
401        """Push (buffer) some data to send to the client."""
402        pos = self.s2c.tell()
403        self.s2c.seek(0, 2)
404        self.s2c.write(data)
405        self.s2c.seek(pos)
406
407    def write(self, b):
408        """The client sends us some data"""
409        pos = self.c2s.tell()
410        self.c2s.write(b)
411        self.c2s.seek(pos)
412        self.handler.process_pending()
413        return len(b)
414
415    def readinto(self, buf):
416        """The client wants to read a response"""
417        self.handler.process_pending()
418        b = self.s2c.read(len(buf))
419        n = len(b)
420        buf[:n] = b
421        return n
422
423
def make_mock_file(handler):
    """Return ``(raw, file)``: a _NNTPServerIO for *handler* and a buffered
    read/write wrapper around it."""
    raw = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    buffered = io.BufferedRWPair(raw, raw)
    return (raw, buffered)
430
431
class NNTPServer(nntplib.NNTP):
    """NNTP client initialised from an already-open file object.

    The constructor stores *f* and *host* and then runs ``_base_init()``;
    it does not open a connection itself.
    """

    def __init__(self, f, host, readermode=None):
        self.file, self.host = f, host
        self._base_init(readermode)

    def _close(self):
        # Close the file, then drop the attribute.
        self.file.close()
        delattr(self, 'file')
442
443
class MockedNNTPTestsMixin:
    """Fixture mixin that gives each test a client wired to a mock handler."""

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        delattr(self, 'server')

    def make_server(self, *args, **kwargs):
        """Create a handler, its mock file and a client; return the client.

        Extra arguments are forwarded to NNTPServer after the file and the
        host name 'test.server'.
        """
        self.handler = self.handler_class()
        self.sio, mock_file = make_mock_file(self.handler)
        client = NNTPServer(mock_file, 'test.server', *args, **kwargs)
        self.server = client
        return client
461
462
class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    """Mocked fixture whose client is created with ``readermode=True``."""

    def setUp(self):
        # The parent setUp() already builds a default server; it is then
        # replaced by one created with readermode=True.
        super().setUp()
        self.make_server(readermode=True)
467
468
class NNTPv1Handler:
    """A handler for RFC 977

    The handler is driven through start() and process_pending().  Each
    command line received is dispatched to the ``handle_<COMMAND>`` method
    (command name upper-cased), which answers with canned responses via
    push_lit() / push_data().
    """

    # Greeting pushed to the client by handle_welcome().
    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Store the I/O callbacks, reset the state and push the welcome.

        *readline* is called to fetch the next line sent by the client;
        *push_data* is called with bytes to send back.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        """Decode *data* as UTF-8 with the surrogateescape error handler."""
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Process the lines currently available from the client.

        If a body is expected, lines are first collected up to and
        including the ``.\\r\\n`` terminator and passed as ``body=`` to the
        handler that requested it.  Remaining lines are treated as
        commands.  Raises ValueError for a command line that does not end
        with CRLF or whose handler raised.
        """
        if self.in_body:
            while True:
                line = self._readline()
                if not line:
                    # Body not complete yet; resume on the next call.
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                # Leave body mode whether or not the callback succeeded.
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # The handler called expect_body(): remember it so
                        # it can be called again once the body has arrived.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal"""
        # Dedent, normalise line endings to CRLF (adding a final one) and
        # send the result encoded as UTF-8.
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        """Answer a command for which no handle_* method exists."""
        self.push_lit("500 What?")

    def handle_welcome(self):
        """Push the greeting stored in ``welcome``."""
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        # Fixed timestamp: 2010-09-14 00:11:55.
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        """Only 'fr.comp.lang.python' is known; anything else gets 411."""
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        self.push_lit("""\
            100 Legal commands
              authinfo user Name|pass Password|generic <prog> <args>
              date
              help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        """Answer STAT: 412 without argument, 223 for the two known
        article specs, 430 otherwise."""
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        """Answer LIST and its ACTIVE, OVERVIEW.FMT and NEWSGROUPS variants.

        Any other keyword gets a 501 response.
        """
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # Response consisting of one very long line (no status code).
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        """Return three canned overview lines for "57-59", none otherwise."""
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        """Two-step POST.

        Without *body*: invite the article (340) and switch to body mode,
        or refuse (440) when posting is not allowed.  With *body*:
        acknowledge (240) and keep the lines in ``posted_body``.
        """
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        """Two-step IHAVE, accepted only for one specific message-id.

        Without *body*: 335 and body mode for the wanted id when posting
        is allowed, 435 otherwise.  With *body*: acknowledge (235) and
        keep the lines in ``posted_body``.
        """
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    # Canned article parts served by handle_HEAD / handle_BODY /
    # handle_ARTICLE (dedented by push_lit() before being sent).
    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    # Head and body separated by an empty line.
    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        """Send a 220 status followed by the sample article, or 430 for an
        unknown *message_spec*."""
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        """Send a 221 status followed by the sample head, or 430 for an
        unknown *message_spec*."""
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        """Send a 222 status followed by the sample body, or 430 for an
        unknown *message_spec*."""
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        """Track AUTHINFO 'user' / 'pass' state.

        *data* (the credential itself) is not checked: any 'pass' logs the
        client in.  Raises Exception for an unknown *cred_type*.
        """
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))
792
793
class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        """Send the capability list; AUTHINFO USER is listed only while
        the client is not logged in."""
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""

        # The extra line carries the same indentation as the template so
        # that it survives the dedent applied by push_lit().
        auth_line = '' if self._logged_in else '\n            AUTHINFO USER'
        self.push_lit(fmt.format(auth_line))

    def handle_MODE(self, _):
        # READER is already advertised, so the client must not send MODE.
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        # OVER is answered exactly like XOVER.
        return self.handle_XOVER(message_spec)
819
820
class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        # Once logged in, defer to the regular capability list; before
        # that, refuse with 480.
        if self._logged_in:
            super().handle_CAPABILITIES()
            return
        self.push_lit('480 You must log in.')
829
830
class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        # Becomes True once handle_MODE() has been called successfully.
        self._switched = False

    def handle_CAPABILITIES(self):
        # Advertise MODE-READER until the switch happened, READER afterwards.
        template = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        reader_prefix = '' if self._switched else 'MODE-'
        self.push_lit(template.format(reader_prefix))

    def handle_MODE(self, what):
        # Only a single switch is accepted, and only to 'reader'.
        assert not self._switched
        assert what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')
857
858
class NNTPv1v2TestsMixin:
    """Checks shared by the test cases for the mocked "v1" and "v2" servers.

    The methods read self.server, self.handler, self.sio and
    self.nntp_version, which the concrete test case has to provide.
    """

    # Start of the sample article headers and end of the sample article
    # body, as they appear in data written through a `file` argument.
    _HEAD_START = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
    )
    _BODY_END = (
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def setUp(self):
        # Nothing extra to prepare; defer to the next class in the MRO.
        super().setUp()

    def test_welcome(self):
        client_welcome = self.server.welcome
        self.assertEqual(client_welcome, self.handler.welcome)

    def test_authinfo(self):
        # The AUTHINFO capability is only checked up front for version 2.
        if self.nntp_version == 2:
            self.assertIn('AUTHINFO', self.server._caps)
        self.server.login('testuser', 'testpw')
        # AUTHINFO vanishing from _caps also shows that getcapabilities()
        # was called again after the login, as it should be.
        self.assertNotIn('AUTHINFO', self.server._caps)

    def test_date(self):
        resp, server_time = self.server.date()
        self.assertEqual(resp, "111 20100914001155")
        self.assertEqual(server_time,
                         datetime.datetime(2010, 9, 14, 0, 11, 55))

    def test_quit(self):
        self.assertFalse(self.sio.closed)
        farewell = self.server.quit()
        self.assertEqual(farewell, "205 Bye!")
        # Quitting must close the underlying stream.
        self.assertTrue(self.sio.closed)

    def test_help(self):
        resp, help_lines = self.server.help()
        self.assertEqual(resp, "100 Legal commands")
        expected_lines = [
            '  authinfo user Name|pass Password|generic <prog> <args>',
            '  date',
            '  help',
            'Report problems to <root@example.org>',
        ]
        self.assertEqual(help_lines, expected_lines)

    def test_list(self):
        # Unfiltered listing
        resp, groups = self.server.list()
        self.assertEqual(len(groups), 6)
        second = groups[1]
        self.assertEqual(
            second,
            GroupInfo("comp.lang.python.announce", "0000001153",
                      "0000000993", "m"))
        # Listing restricted by a pattern
        resp, groups = self.server.list("*distutils*")
        self.assertEqual(len(groups), 2)
        first = groups[0]
        self.assertEqual(
            first,
            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
                      "0000000001", "m"))

    def test_stat(self):
        # Lookups that succeed: by article number, then by message id.
        found = (
            (3000234, "223 3000234 <45223423@example.com>", 3000234),
            ("<45223423@example.com>", "223 0 <45223423@example.com>", 0),
        )
        for spec, expected_resp, expected_num in found:
            resp, art_num, message_id = self.server.stat(spec)
            self.assertEqual(resp, expected_resp)
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
        # Unknown message id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat("<non.existent.id>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")
        # No argument at all
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.stat()
        self.assertEqual(cm.exception.response, "412 No newsgroup selected")

    def test_next(self):
        resp, number, msg_id = self.server.next()
        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
        self.assertEqual(number, 3000237)
        self.assertEqual(msg_id, "<668929@example.org>")

    def test_last(self):
        resp, number, msg_id = self.server.last()
        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
        self.assertEqual(number, 3000234)
        self.assertEqual(msg_id, "<45223423@example.com>")

    def test_description(self):
        known = self.server.description("comp.lang.python")
        self.assertEqual(known, "The Python computer language.")
        # A group without a description yields an empty string.
        unknown = self.server.description("comp.lang.pythonx")
        self.assertEqual(unknown, "")

    def test_descriptions(self):
        # Exact group name
        resp, groups = self.server.descriptions("comp.lang.python")
        self.assertEqual(resp, '215 Descriptions in form "group description".')
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            })
        # Wildcard pattern
        resp, groups = self.server.descriptions("comp.lang.python*")
        self.assertEqual(groups, {
            "comp.lang.python": "The Python computer language.",
            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
            })
        # No match
        resp, groups = self.server.descriptions("comp.lang.pythonx")
        self.assertEqual(groups, {})

    def test_group(self):
        selected = self.server.group("fr.comp.lang.python")
        resp, count, first, last, group = selected
        self.assertTrue(resp.startswith("211 "), resp)
        self.assertEqual(first, 761)
        self.assertEqual(last, 1265)
        self.assertEqual(count, 486)
        self.assertEqual(group, "fr.comp.lang.python")
        # Unknown group
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.group("comp.lang.python.devel")
        error_resp = cm.exception.response
        self.assertTrue(error_resp.startswith("411 No such group"),
                        error_resp)

    def test_newnews(self):
        # NEWNEWS comp.lang.python [20]100913 082004
        since = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("comp.lang.python", since)
        expected_resp = (
            "230 list of newsarticles (NNTP v{0}) "
            "created after Mon Sep 13 08:20:04 2010 follows"
            ).format(self.nntp_version)
        self.assertEqual(resp, expected_resp)
        expected_ids = [
            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
            ]
        self.assertEqual(ids, expected_ids)
        # NEWNEWS fr.comp.lang.python [20]100913 082004
        since = datetime.datetime(2010, 9, 13, 8, 20, 4)
        resp, ids = self.server.newnews("fr.comp.lang.python", since)
        self.assertEqual(resp, "230 An empty list of newsarticles follows")
        self.assertEqual(ids, [])

    def _check_article_body(self, lines):
        # The four body lines of the sample article, checked last to first.
        self.assertEqual(len(lines), 4)
        signature = lines[-1].decode('utf-8')
        self.assertEqual(signature, "-- Signed by André.")
        self.assertEqual(lines[-2], b"")
        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
        self.assertEqual(lines[-4], b"This is just a test article.")

    def _check_article_head(self, lines):
        # Four header lines; only the first and the last are compared.
        self.assertEqual(len(lines), 4)
        first_header, last_header = lines[0], lines[3]
        self.assertEqual(first_header,
                         b'From: "Demo User" <nobody@example.net>')
        self.assertEqual(last_header,
                         b"Message-ID: <i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        # A whole article: four headers, one empty line, four body lines.
        self.assertEqual(len(lines), 9)
        head, separator, body = lines[:4], lines[4], lines[-4:]
        self._check_article_head(head)
        self._check_article_body(body)
        self.assertEqual(separator, b"")

    def _check_retrieval(self, fetch, cases, check_lines):
        # Shared driver for test_article(), test_head() and test_body().
        # Each case is (call arguments, expected response, expected number).
        for args, expected_resp, expected_num in cases:
            resp, info = fetch(*args)
            self.assertEqual(resp, expected_resp)
            art_num, message_id, lines = info
            self.assertEqual(art_num, expected_num)
            self.assertEqual(message_id, "<45223423@example.com>")
            check_lines(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            fetch("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def _retrieve_to_file(self, fetch, expected_resp):
        # Shared driver for the *_file tests: call `fetch` with a "file"
        # argument, check the reply and hand back what was written.
        sink = io.BytesIO()
        resp, info = fetch(file=sink)
        self.assertEqual(resp, expected_resp)
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        # With a file argument no lines are returned.
        self.assertEqual(lines, [])
        return sink.getvalue()

    def test_article(self):
        cases = (
            # ARTICLE
            ((), "220 3000237 <45223423@example.com>", 3000237),
            # ARTICLE num
            ((3000234,), "220 3000234 <45223423@example.com>", 3000234),
            # ARTICLE id
            (("<45223423@example.com>",), "220 0 <45223423@example.com>", 0),
        )
        self._check_retrieval(self.server.article, cases,
                              self._check_article_data)

    def test_article_file(self):
        # With a "file" argument
        data = self._retrieve_to_file(self.server.article,
                                      "220 3000237 <45223423@example.com>")
        self.assertTrue(data.startswith(self._HEAD_START), ascii(data))
        self.assertTrue(data.endswith(self._BODY_END), ascii(data))

    def test_head(self):
        cases = (
            # HEAD
            ((), "221 3000237 <45223423@example.com>", 3000237),
            # HEAD num
            ((3000234,), "221 3000234 <45223423@example.com>", 3000234),
            # HEAD id
            (("<45223423@example.com>",), "221 0 <45223423@example.com>", 0),
        )
        self._check_retrieval(self.server.head, cases,
                              self._check_article_head)

    def test_head_file(self):
        data = self._retrieve_to_file(self.server.head,
                                      "221 3000237 <45223423@example.com>")
        # Headers are written, the body is not.
        self.assertTrue(data.startswith(self._HEAD_START), ascii(data))
        self.assertFalse(data.endswith(self._BODY_END), ascii(data))

    def test_body(self):
        cases = (
            # BODY
            ((), "222 3000237 <45223423@example.com>", 3000237),
            # BODY num
            ((3000234,), "222 3000234 <45223423@example.com>", 3000234),
            # BODY id
            (("<45223423@example.com>",), "222 0 <45223423@example.com>", 0),
        )
        self._check_retrieval(self.server.body, cases,
                              self._check_article_body)

    def test_body_file(self):
        data = self._retrieve_to_file(self.server.body,
                                      "222 3000237 <45223423@example.com>")
        # The body is written, the headers are not.
        self.assertFalse(data.startswith(self._HEAD_START), ascii(data))
        self.assertTrue(data.endswith(self._BODY_END), ascii(data))

    def check_over_xover_resp(self, resp, overviews):
        # Validate the three overview entries returned by OVER / XOVER.
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        # First entry: compared in full.
        number, fields = overviews[0]
        self.assertEqual(number, 57)
        self.assertEqual(fields, {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.io gmane.comp.python.authors:57"
            })
        # Second entry: "xref" must come out as None.
        number, fields = overviews[1]
        self.assertEqual(fields["xref"], None)
        # Third entry: subject containing a non-ASCII character.
        number, fields = overviews[2]
        self.assertEqual(fields["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        reply = self.server.xover(57, 59)
        self.check_over_xover_resp(*reply)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        reply = self.server.over((57, 59))
        self.check_over_xover_resp(*reply)

    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        # Check the raw body as received by the server
        received = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(received), 10)
        self.assertEqual(received[-1], b'.\r\n')
        self.assertEqual(received[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(received[-3], b'\r\n')
        # The line starting with a dot must arrive with the dot doubled.
        self.assertEqual(received[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(received[0],
                         b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        # Send the sample post twice through `func`: first as prepared
        # (CRLF endings), then with "normal" line endings, which should be
        # converted by NNTP.post and NNTP.ihave.  The response of the
        # second call is returned.
        payloads = (
            self.sample_post,
            self.sample_post.replace(b"\r\n", b"\n"),
        )
        resp = None
        for payload in payloads:
            data = file_factory(payload)
            self.handler.posted_body = None
            resp = func(*args, data)
            self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        # Exercise `func` with every supported kind of data argument.
        def terminated_lines(b):
            return iter(b.splitlines(keepends=True))

        def bare_lines(b):
            return iter(b.splitlines(keepends=False))

        factories = (
            bytes,              # a bytes object
            bytearray,          # a bytearray object
            io.BytesIO,         # a file object
            terminated_lines,   # an iterable of terminated lines
            bare_lines,         # an iterable of non-terminated lines
        )
        for factory in factories:
            resp = self._check_post_ihave_sub(func, *args,
                                              file_factory=factory)
            self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        # Once the handler forbids posting, POST has to fail.
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response, "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        # A different message id is turned down.
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response, "435 Article not wanted")

    def test_too_long_lines(self):
        since = datetime.datetime(2010, 1, 1, 9, 0, 0)
        with self.assertRaises(nntplib.NNTPDataError):
            self.server.newnews("comp.lang.python", since)
1260
1261
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # No capabilities, version 1 and no implementation string expected.
        server = self.server
        self.assertEqual(server.getcapabilities(), {})
        self.assertEqual(server.nntp_version, 1)
        self.assertEqual(server.nntp_implementation, None)
1273
1274
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        # The capability list as parsed by the client before any login.
        server = self.server
        expected_caps = {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
        }
        self.assertEqual(server.getcapabilities(), expected_caps)
        # "VERSION 2 3" is expected to be reported as version 3.
        self.assertEqual(server.nntp_version, 3)
        self.assertEqual(server.nntp_implementation, 'INN 2.5.1')
1296
1297
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a server that only answers CAPABILITIES after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        # Nothing is known before the login ...
        self.assertEqual(self.server._caps, {})
        # ... and the capabilities show up once logged in.
        self.server.login('testuser', 'testpw')
        self.assertIn('VERSION', self.server._caps)
1308
1309
class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        # READER has to be among the capabilities seen by the client.
        caps = self.server._caps
        self.assertIn('READER', caps)
1320
1321
class MiscTests(unittest.TestCase):
    """Tests for module-level helpers of nntplib (no server involved)."""

    def test_decode_header(self):
        cases = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra  spaces ", " with extra  spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        minimal = ["subject", "from", "date", "message-id", "references",
                   ":bytes", ":lines"]
        cases = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             minimal),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             minimal),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             minimal),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             minimal + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             minimal + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             minimal + ["xref"]),
        ]
        for lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected)

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        first_example = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        [(art_num, fields)] = nntplib._parse_overview(first_example, fmt)
        self.assertEqual(art_num, 3000234)
        expected_fields = {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        }
        self.assertEqual(fields, expected_fields)
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        second_example = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(second_example, fmt)
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        third_example = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        [(art_num, fields)] = nntplib._parse_overview(third_example, fmt)
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        cases = [
            # Output of DATE command
            ("19990623135624", None, (1999, 6, 23, 13, 56, 24)),
            # Variations
            ("19990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("990623", "135624", (1999, 6, 23, 13, 56, 24)),
            ("090623", "135624", (2009, 6, 23, 13, 56, 24)),
        ]
        for date_str, time_str, parts in cases:
            self.assertEqual(nntplib._parse_datetime(date_str, time_str),
                             datetime.datetime(*parts))

    def test_unparse_datetime(self):
        # Test non-legacy mode, both implicitly and with an explicit False.
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("19990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("20000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("20100605", "010203")),
        ]
        for parts, expected in datetime_cases:
            dt = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(dt), expected)
            self.assertEqual(nntplib._unparse_datetime(dt, False), expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("19990623", "000000")),
            ((2000, 6, 23), ("20000623", "000000")),
            ((2010, 6, 5), ("20100605", "000000")),
        ]
        for parts, expected in date_cases:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day), expected)
            self.assertEqual(nntplib._unparse_datetime(day, False), expected)

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        datetime_cases = [
            ((1999, 6, 23, 13, 56, 24), ("990623", "135624")),
            ((2000, 6, 23, 13, 56, 24), ("000623", "135624")),
            ((2010, 6, 5, 1, 2, 3), ("100605", "010203")),
        ]
        for parts, expected in datetime_cases:
            dt = datetime.datetime(*parts)
            self.assertEqual(nntplib._unparse_datetime(dt, True), expected)
        # 2) with a date
        date_cases = [
            ((1999, 6, 23), ("990623", "000000")),
            ((2000, 6, 23), ("000623", "000000")),
            ((2010, 6, 5), ("100605", "000000")),
        ]
        for parts, expected in date_cases:
            day = datetime.date(*parts)
            self.assertEqual(nntplib._unparse_datetime(day, True), expected)

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        has_ssl_class = hasattr(nntplib, 'NNTP_SSL')
        self.assertTrue(has_ssl_class)
1482
1483
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        expected_names = {
            'NNTP', 'NNTPError', 'NNTPReplyError',
            'NNTPTemporaryError', 'NNTPPermanentError',
            'NNTPProtocolError', 'NNTPDataError', 'decode_header',
        }
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            expected_names.add('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), expected_names)
1495
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    # Factory for the client under test.  Subclasses may replace it with
    # any callable that accepts the same arguments.
    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that a failing constructor raises and releases resources.

        nntplib's ``socket`` name is patched with a stand-in whose
        connections hand out files backed by a fresh ``handler_class()``
        instance.  Calling ``self.nntp_class`` must then raise
        *expected_error_type* with a message matching the regular
        expression *expected_error_msg*.  Afterwards the mock socket and
        every file object created from it must have been closed.

        *login* and *password* are forwarded to the constructor as
        ``user`` and ``password``.
        """
        # State shared with the mock classes below through closures.
        socket_closed = False
        files = []

        class MockSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(self, mode):
                # Every call gets its own handler; the resulting file is
                # recorded so its closed state can be checked afterwards.
                handler = handler_class()
                _, file = make_mock_file(handler)
                files.append(file)
                return file

        class mock_socket_module:
            # The class object itself is installed as nntplib.socket, so
            # this is only ever looked up on the class, never an instance.
            @staticmethod
            def create_connection(address, timeout):
                return MockSocket()

        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for f in files:
            self.assertTrue(f.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        capabilities_response = '201 bad capability'

        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        login = 't@e.com'
        password = 'python'
        authinfo_response = '503 Mechanism not recognized'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)
1573
class bypass_context:
    """Bypass encryption and actual SSL module"""

    # A staticmethod so the stand-in behaves the same whether the class
    # itself or an instance of it is passed as the SSL context.
    @staticmethod
    def wrap_socket(sock, **args):
        # No wrapping takes place: the socket is returned untouched and
        # all keyword arguments are ignored.
        return sock
1578
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Run the MockSocketTests checks against nntplib.NNTP_SSL."""

    # A staticmethod so that calling self.nntp_class(...) does not pass
    # the test instance as the first positional argument.
    @staticmethod
    def nntp_class(*pos, **kw):
        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
1584
1585
class LocalServerTests(unittest.TestCase):
    """Tests run against a minimal NNTP server in a background thread."""

    def setUp(self):
        # Start listening before the client connects; the server thread
        # then accepts that single connection.
        sock = socket.socket()
        port = socket_helper.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # Enter the client's context manager by hand so that it stays open
        # for the test body; the matching __exit__ runs as a cleanup.
        self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        # Serve one client on the listening socket *sock* until it sends
        # QUIT.  Only CAPABILITIES, STARTTLS and QUIT are understood; any
        # other input raises ValueError in the server thread.
        # Could be generalized to handle more commands in separate methods
        with sock:
            # The listening socket is closed once one client is accepted.
            [client, _] = sock.accept()
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # Close the plain-text reader, acknowledge, then swap
                    # both the socket and the reader for TLS-wrapped ones.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    # The replacements are registered on the same stack so
                    # they are closed on exit as well.
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        # Remember the pre-TLS objects so the replacement can be verified.
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)
1644
1645
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1648