• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import socket
3import datetime
4import textwrap
5import unittest
6import functools
7import contextlib
8import os.path
9from test import support
10from nntplib import NNTP, GroupInfo
11import nntplib
12from unittest.mock import patch
13try:
14    import ssl
15except ImportError:
16    ssl = None
17try:
18    import threading
19except ImportError:
20    threading = None
21
22TIMEOUT = 30
23certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
24
25# TODO:
26# - test the `file` arg to more commands
27# - test error conditions
28# - test auth and `usenetrc`
29
30
31class NetworkedNNTPTestsMixin:
32
33    def test_welcome(self):
34        welcome = self.server.getwelcome()
35        self.assertEqual(str, type(welcome))
36
37    def test_help(self):
38        resp, lines = self.server.help()
39        self.assertTrue(resp.startswith("100 "), resp)
40        for line in lines:
41            self.assertEqual(str, type(line))
42
43    def test_list(self):
44        resp, groups = self.server.list()
45        if len(groups) > 0:
46            self.assertEqual(GroupInfo, type(groups[0]))
47            self.assertEqual(str, type(groups[0].group))
48
49    def test_list_active(self):
50        resp, groups = self.server.list(self.GROUP_PAT)
51        if len(groups) > 0:
52            self.assertEqual(GroupInfo, type(groups[0]))
53            self.assertEqual(str, type(groups[0].group))
54
55    def test_unknown_command(self):
56        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
57            self.server._shortcmd("XYZZY")
58        resp = cm.exception.response
59        self.assertTrue(resp.startswith("500 "), resp)
60
61    def test_newgroups(self):
62        # gmane gets a constant influx of new groups.  In order not to stress
63        # the server too much, we choose a recent date in the past.
64        dt = datetime.date.today() - datetime.timedelta(days=7)
65        resp, groups = self.server.newgroups(dt)
66        if len(groups) > 0:
67            self.assertIsInstance(groups[0], GroupInfo)
68            self.assertIsInstance(groups[0].group, str)
69
70    def test_description(self):
71        def _check_desc(desc):
72            # Sanity checks
73            self.assertIsInstance(desc, str)
74            self.assertNotIn(self.GROUP_NAME, desc)
75        desc = self.server.description(self.GROUP_NAME)
76        _check_desc(desc)
77        # Another sanity check
78        self.assertIn("Python", desc)
79        # With a pattern
80        desc = self.server.description(self.GROUP_PAT)
81        _check_desc(desc)
82        # Shouldn't exist
83        desc = self.server.description("zk.brrtt.baz")
84        self.assertEqual(desc, '')
85
86    def test_descriptions(self):
87        resp, descs = self.server.descriptions(self.GROUP_PAT)
88        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
89        self.assertTrue(
90            resp.startswith("215 ") or resp.startswith("282 "), resp)
91        self.assertIsInstance(descs, dict)
92        desc = descs[self.GROUP_NAME]
93        self.assertEqual(desc, self.server.description(self.GROUP_NAME))
94
95    def test_group(self):
96        result = self.server.group(self.GROUP_NAME)
97        self.assertEqual(5, len(result))
98        resp, count, first, last, group = result
99        self.assertEqual(group, self.GROUP_NAME)
100        self.assertIsInstance(count, int)
101        self.assertIsInstance(first, int)
102        self.assertIsInstance(last, int)
103        self.assertLessEqual(first, last)
104        self.assertTrue(resp.startswith("211 "), resp)
105
106    def test_date(self):
107        resp, date = self.server.date()
108        self.assertIsInstance(date, datetime.datetime)
109        # Sanity check
110        self.assertGreaterEqual(date.year, 1995)
111        self.assertLessEqual(date.year, 2030)
112
113    def _check_art_dict(self, art_dict):
114        # Some sanity checks for a field dictionary returned by OVER / XOVER
115        self.assertIsInstance(art_dict, dict)
116        # NNTP has 7 mandatory fields
117        self.assertGreaterEqual(art_dict.keys(),
118            {"subject", "from", "date", "message-id",
119             "references", ":bytes", ":lines"}
120            )
121        for v in art_dict.values():
122            self.assertIsInstance(v, (str, type(None)))
123
124    def test_xover(self):
125        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
126        resp, lines = self.server.xover(last - 5, last)
127        if len(lines) == 0:
128            self.skipTest("no articles retrieved")
129        # The 'last' article is not necessarily part of the output (cancelled?)
130        art_num, art_dict = lines[0]
131        self.assertGreaterEqual(art_num, last - 5)
132        self.assertLessEqual(art_num, last)
133        self._check_art_dict(art_dict)
134
135    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
136                           ' is found for issue #28971')
137    def test_over(self):
138        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
139        start = last - 10
140        # The "start-" article range form
141        resp, lines = self.server.over((start, None))
142        art_num, art_dict = lines[0]
143        self._check_art_dict(art_dict)
144        # The "start-end" article range form
145        resp, lines = self.server.over((start, last))
146        art_num, art_dict = lines[-1]
147        # The 'last' article is not necessarily part of the output (cancelled?)
148        self.assertGreaterEqual(art_num, start)
149        self.assertLessEqual(art_num, last)
150        self._check_art_dict(art_dict)
151        # XXX The "message_id" form is unsupported by gmane
152        # 503 Overview by message-ID unsupported
153
154    def test_xhdr(self):
155        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
156        resp, lines = self.server.xhdr('subject', last)
157        for line in lines:
158            self.assertEqual(str, type(line[1]))
159
160    def check_article_resp(self, resp, article, art_num=None):
161        self.assertIsInstance(article, nntplib.ArticleInfo)
162        if art_num is not None:
163            self.assertEqual(article.number, art_num)
164        for line in article.lines:
165            self.assertIsInstance(line, bytes)
166        # XXX this could exceptionally happen...
167        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
168
169    def test_article_head_body(self):
170        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
171        # Try to find an available article
172        for art_num in (last, first, last - 1):
173            try:
174                resp, head = self.server.head(art_num)
175            except nntplib.NNTPTemporaryError as e:
176                if not e.response.startswith("423 "):
177                    raise
178                # "423 No such article" => choose another one
179                continue
180            break
181        else:
182            self.skipTest("could not find a suitable article number")
183        self.assertTrue(resp.startswith("221 "), resp)
184        self.check_article_resp(resp, head, art_num)
185        resp, body = self.server.body(art_num)
186        self.assertTrue(resp.startswith("222 "), resp)
187        self.check_article_resp(resp, body, art_num)
188        resp, article = self.server.article(art_num)
189        self.assertTrue(resp.startswith("220 "), resp)
190        self.check_article_resp(resp, article, art_num)
191        # Tolerate running the tests from behind a NNTP virus checker
192        blacklist = lambda line: line.startswith(b'X-Antivirus')
193        filtered_head_lines = [line for line in head.lines
194                               if not blacklist(line)]
195        filtered_lines = [line for line in article.lines
196                          if not blacklist(line)]
197        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
198
199    def test_capabilities(self):
200        # The server under test implements NNTP version 2 and has a
201        # couple of well-known capabilities. Just sanity check that we
202        # got them.
203        def _check_caps(caps):
204            caps_list = caps['LIST']
205            self.assertIsInstance(caps_list, (list, tuple))
206            self.assertIn('OVERVIEW.FMT', caps_list)
207        self.assertGreaterEqual(self.server.nntp_version, 2)
208        _check_caps(self.server.getcapabilities())
209        # This re-emits the command
210        resp, caps = self.server.capabilities()
211        _check_caps(caps)
212
213    def test_zlogin(self):
214        # This test must be the penultimate because further commands will be
215        # refused.
216        baduser = "notarealuser"
217        badpw = "notarealpassword"
218        # Check that bogus credentials cause failure
219        self.assertRaises(nntplib.NNTPError, self.server.login,
220                          user=baduser, password=badpw, usenetrc=False)
221        # FIXME: We should check that correct credentials succeed, but that
222        # would require valid details for some server somewhere to be in the
223        # test suite, I think. Gmane is anonymous, at least as used for the
224        # other tests.
225
226    def test_zzquit(self):
227        # This test must be called last, hence the name
228        cls = type(self)
229        try:
230            self.server.quit()
231        finally:
232            cls.server = None
233
234    @classmethod
235    def wrap_methods(cls):
236        # Wrap all methods in a transient_internet() exception catcher
237        # XXX put a generic version in test.support?
238        def wrap_meth(meth):
239            @functools.wraps(meth)
240            def wrapped(self):
241                with support.transient_internet(self.NNTP_HOST):
242                    meth(self)
243            return wrapped
244        for name in dir(cls):
245            if not name.startswith('test_'):
246                continue
247            meth = getattr(cls, name)
248            if not callable(meth):
249                continue
250            # Need to use a closure so that meth remains bound to its current
251            # value
252            setattr(cls, name, wrap_meth(meth))
253
254    def test_with_statement(self):
255        def is_connected():
256            if not hasattr(server, 'file'):
257                return False
258            try:
259                server.help()
260            except (OSError, EOFError):
261                return False
262            return True
263
264        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
265            self.assertTrue(is_connected())
266            self.assertTrue(server.help())
267        self.assertFalse(is_connected())
268
269        with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
270            server.quit()
271        self.assertFalse(is_connected())
272
273
274NetworkedNNTPTestsMixin.wrap_methods()
275
276
277class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
278    # This server supports STARTTLS (gmane doesn't)
279    NNTP_HOST = 'news.trigofacile.com'
280    GROUP_NAME = 'fr.comp.lang.python'
281    GROUP_PAT = 'fr.comp.lang.*'
282
283    NNTP_CLASS = NNTP
284
285    @classmethod
286    def setUpClass(cls):
287        support.requires("network")
288        with support.transient_internet(cls.NNTP_HOST):
289            cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
290
291    @classmethod
292    def tearDownClass(cls):
293        if cls.server is not None:
294            cls.server.quit()
295
296@unittest.skipUnless(ssl, 'requires SSL support')
297class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
298
299    # Technical limits for this public NNTP server (see http://www.aioe.org):
300    # "Only two concurrent connections per IP address are allowed and
301    # 400 connections per day are accepted from each IP address."
302
303    NNTP_HOST = 'nntp.aioe.org'
304    GROUP_NAME = 'comp.lang.python'
305    GROUP_PAT = 'comp.lang.*'
306
307    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
308
309    # Disabled as it produces too much data
310    test_list = None
311
312    # Disabled as the connection will already be encrypted.
313    test_starttls = None
314
315
316#
317# Non-networked tests using a local server (or something mocking it).
318#
319
320class _NNTPServerIO(io.RawIOBase):
321    """A raw IO object allowing NNTP commands to be received and processed
322    by a handler.  The handler can push responses which can then be read
323    from the IO object."""
324
325    def __init__(self, handler):
326        io.RawIOBase.__init__(self)
327        # The channel from the client
328        self.c2s = io.BytesIO()
329        # The channel to the client
330        self.s2c = io.BytesIO()
331        self.handler = handler
332        self.handler.start(self.c2s.readline, self.push_data)
333
334    def readable(self):
335        return True
336
337    def writable(self):
338        return True
339
340    def push_data(self, data):
341        """Push (buffer) some data to send to the client."""
342        pos = self.s2c.tell()
343        self.s2c.seek(0, 2)
344        self.s2c.write(data)
345        self.s2c.seek(pos)
346
347    def write(self, b):
348        """The client sends us some data"""
349        pos = self.c2s.tell()
350        self.c2s.write(b)
351        self.c2s.seek(pos)
352        self.handler.process_pending()
353        return len(b)
354
355    def readinto(self, buf):
356        """The client wants to read a response"""
357        self.handler.process_pending()
358        b = self.s2c.read(len(buf))
359        n = len(b)
360        buf[:n] = b
361        return n
362
363
364def make_mock_file(handler):
365    sio = _NNTPServerIO(handler)
366    # Using BufferedRWPair instead of BufferedRandom ensures the file
367    # isn't seekable.
368    file = io.BufferedRWPair(sio, sio)
369    return (sio, file)
370
371
372class MockedNNTPTestsMixin:
373    # Override in derived classes
374    handler_class = None
375
376    def setUp(self):
377        super().setUp()
378        self.make_server()
379
380    def tearDown(self):
381        super().tearDown()
382        del self.server
383
384    def make_server(self, *args, **kwargs):
385        self.handler = self.handler_class()
386        self.sio, file = make_mock_file(self.handler)
387        self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
388        return self.server
389
390
391class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
392    def setUp(self):
393        super().setUp()
394        self.make_server(readermode=True)
395
396
397class NNTPv1Handler:
398    """A handler for RFC 977"""
399
400    welcome = "200 NNTP mock server"
401
402    def start(self, readline, push_data):
403        self.in_body = False
404        self.allow_posting = True
405        self._readline = readline
406        self._push_data = push_data
407        self._logged_in = False
408        self._user_sent = False
409        # Our welcome
410        self.handle_welcome()
411
412    def _decode(self, data):
413        return str(data, "utf-8", "surrogateescape")
414
415    def process_pending(self):
416        if self.in_body:
417            while True:
418                line = self._readline()
419                if not line:
420                    return
421                self.body.append(line)
422                if line == b".\r\n":
423                    break
424            try:
425                meth, tokens = self.body_callback
426                meth(*tokens, body=self.body)
427            finally:
428                self.body_callback = None
429                self.body = None
430                self.in_body = False
431        while True:
432            line = self._decode(self._readline())
433            if not line:
434                return
435            if not line.endswith("\r\n"):
436                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
437            line = line[:-2]
438            cmd, *tokens = line.split()
439            #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
440            meth = getattr(self, "handle_" + cmd.upper(), None)
441            if meth is None:
442                self.handle_unknown()
443            else:
444                try:
445                    meth(*tokens)
446                except Exception as e:
447                    raise ValueError("command failed: {!r}".format(line)) from e
448                else:
449                    if self.in_body:
450                        self.body_callback = meth, tokens
451                        self.body = []
452
453    def expect_body(self):
454        """Flag that the client is expected to post a request body"""
455        self.in_body = True
456
457    def push_data(self, data):
458        """Push some binary data"""
459        self._push_data(data)
460
461    def push_lit(self, lit):
462        """Push a string literal"""
463        lit = textwrap.dedent(lit)
464        lit = "\r\n".join(lit.splitlines()) + "\r\n"
465        lit = lit.encode('utf-8')
466        self.push_data(lit)
467
468    def handle_unknown(self):
469        self.push_lit("500 What?")
470
471    def handle_welcome(self):
472        self.push_lit(self.welcome)
473
474    def handle_QUIT(self):
475        self.push_lit("205 Bye!")
476
477    def handle_DATE(self):
478        self.push_lit("111 20100914001155")
479
480    def handle_GROUP(self, group):
481        if group == "fr.comp.lang.python":
482            self.push_lit("211 486 761 1265 fr.comp.lang.python")
483        else:
484            self.push_lit("411 No such group {}".format(group))
485
486    def handle_HELP(self):
487        self.push_lit("""\
488            100 Legal commands
489              authinfo user Name|pass Password|generic <prog> <args>
490              date
491              help
492            Report problems to <root@example.org>
493            .""")
494
495    def handle_STAT(self, message_spec=None):
496        if message_spec is None:
497            self.push_lit("412 No newsgroup selected")
498        elif message_spec == "3000234":
499            self.push_lit("223 3000234 <45223423@example.com>")
500        elif message_spec == "<45223423@example.com>":
501            self.push_lit("223 0 <45223423@example.com>")
502        else:
503            self.push_lit("430 No Such Article Found")
504
505    def handle_NEXT(self):
506        self.push_lit("223 3000237 <668929@example.org> retrieved")
507
508    def handle_LAST(self):
509        self.push_lit("223 3000234 <45223423@example.com> retrieved")
510
511    def handle_LIST(self, action=None, param=None):
512        if action is None:
513            self.push_lit("""\
514                215 Newsgroups in form "group high low flags".
515                comp.lang.python 0000052340 0000002828 y
516                comp.lang.python.announce 0000001153 0000000993 m
517                free.it.comp.lang.python 0000000002 0000000002 y
518                fr.comp.lang.python 0000001254 0000000760 y
519                free.it.comp.lang.python.learner 0000000000 0000000001 y
520                tw.bbs.comp.lang.python 0000000304 0000000304 y
521                .""")
522        elif action == "ACTIVE":
523            if param == "*distutils*":
524                self.push_lit("""\
525                    215 Newsgroups in form "group high low flags"
526                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
527                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
528                    .""")
529            else:
530                self.push_lit("""\
531                    215 Newsgroups in form "group high low flags"
532                    .""")
533        elif action == "OVERVIEW.FMT":
534            self.push_lit("""\
535                215 Order of fields in overview database.
536                Subject:
537                From:
538                Date:
539                Message-ID:
540                References:
541                Bytes:
542                Lines:
543                Xref:full
544                .""")
545        elif action == "NEWSGROUPS":
546            assert param is not None
547            if param == "comp.lang.python":
548                self.push_lit("""\
549                    215 Descriptions in form "group description".
550                    comp.lang.python\tThe Python computer language.
551                    .""")
552            elif param == "comp.lang.python*":
553                self.push_lit("""\
554                    215 Descriptions in form "group description".
555                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
556                    comp.lang.python\tThe Python computer language.
557                    .""")
558            else:
559                self.push_lit("""\
560                    215 Descriptions in form "group description".
561                    .""")
562        else:
563            self.push_lit('501 Unknown LIST keyword')
564
565    def handle_NEWNEWS(self, group, date_str, time_str):
566        # We hard code different return messages depending on passed
567        # argument and date syntax.
568        if (group == "comp.lang.python" and date_str == "20100913"
569            and time_str == "082004"):
570            # Date was passed in RFC 3977 format (NNTP "v2")
571            self.push_lit("""\
572                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
573                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
574                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
575                .""")
576        elif (group == "comp.lang.python" and date_str == "100913"
577            and time_str == "082004"):
578            # Date was passed in RFC 977 format (NNTP "v1")
579            self.push_lit("""\
580                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
581                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
582                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
583                .""")
584        elif (group == 'comp.lang.python' and
585              date_str in ('20100101', '100101') and
586              time_str == '090000'):
587            self.push_lit('too long line' * 3000 +
588                          '\n.')
589        else:
590            self.push_lit("""\
591                230 An empty list of newsarticles follows
592                .""")
593        # (Note for experiments: many servers disable NEWNEWS.
594        #  As of this writing, sicinfo3.epfl.ch doesn't.)
595
596    def handle_XOVER(self, message_spec):
597        if message_spec == "57-59":
598            self.push_lit(
599                "224 Overview information for 57-58 follows\n"
600                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
601                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
602                    "\tSat, 19 Jun 2010 18:04:08 -0400"
603                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
604                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
605                    "\tXref: news.gmane.org gmane.comp.python.authors:57"
606                    "\n"
607                "58\tLooking for a few good bloggers"
608                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
609                    "\tThu, 22 Jul 2010 09:14:14 -0400"
610                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
611                    "\t\t6683\t16"
612                    "\t"
613                    "\n"
614                # A UTF-8 overview line from fr.comp.lang.python
615                "59\tRe: Message d'erreur incompréhensible (par moi)"
616                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
617                    "\tWed, 15 Sep 2010 18:09:15 +0200"
618                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
619                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
620                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
621                    "\n"
622                ".\n")
623        else:
624            self.push_lit("""\
625                224 No articles
626                .""")
627
628    def handle_POST(self, *, body=None):
629        if body is None:
630            if self.allow_posting:
631                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
632                self.expect_body()
633            else:
634                self.push_lit("440 Posting not permitted")
635        else:
636            assert self.allow_posting
637            self.push_lit("240 Article received OK")
638            self.posted_body = body
639
640    def handle_IHAVE(self, message_id, *, body=None):
641        if body is None:
642            if (self.allow_posting and
643                message_id == "<i.am.an.article.you.will.want@example.com>"):
644                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
645                self.expect_body()
646            else:
647                self.push_lit("435 Article not wanted")
648        else:
649            assert self.allow_posting
650            self.push_lit("235 Article transferred OK")
651            self.posted_body = body
652
653    sample_head = """\
654        From: "Demo User" <nobody@example.net>
655        Subject: I am just a test article
656        Content-Type: text/plain; charset=UTF-8; format=flowed
657        Message-ID: <i.am.an.article.you.will.want@example.com>"""
658
659    sample_body = """\
660        This is just a test article.
661        ..Here is a dot-starting line.
662
663        -- Signed by Andr\xe9."""
664
665    sample_article = sample_head + "\n\n" + sample_body
666
667    def handle_ARTICLE(self, message_spec=None):
668        if message_spec is None:
669            self.push_lit("220 3000237 <45223423@example.com>")
670        elif message_spec == "<45223423@example.com>":
671            self.push_lit("220 0 <45223423@example.com>")
672        elif message_spec == "3000234":
673            self.push_lit("220 3000234 <45223423@example.com>")
674        else:
675            self.push_lit("430 No Such Article Found")
676            return
677        self.push_lit(self.sample_article)
678        self.push_lit(".")
679
680    def handle_HEAD(self, message_spec=None):
681        if message_spec is None:
682            self.push_lit("221 3000237 <45223423@example.com>")
683        elif message_spec == "<45223423@example.com>":
684            self.push_lit("221 0 <45223423@example.com>")
685        elif message_spec == "3000234":
686            self.push_lit("221 3000234 <45223423@example.com>")
687        else:
688            self.push_lit("430 No Such Article Found")
689            return
690        self.push_lit(self.sample_head)
691        self.push_lit(".")
692
693    def handle_BODY(self, message_spec=None):
694        if message_spec is None:
695            self.push_lit("222 3000237 <45223423@example.com>")
696        elif message_spec == "<45223423@example.com>":
697            self.push_lit("222 0 <45223423@example.com>")
698        elif message_spec == "3000234":
699            self.push_lit("222 3000234 <45223423@example.com>")
700        else:
701            self.push_lit("430 No Such Article Found")
702            return
703        self.push_lit(self.sample_body)
704        self.push_lit(".")
705
706    def handle_AUTHINFO(self, cred_type, data):
707        if self._logged_in:
708            self.push_lit('502 Already Logged In')
709        elif cred_type == 'user':
710            if self._user_sent:
711                self.push_lit('482 User Credential Already Sent')
712            else:
713                self.push_lit('381 Password Required')
714                self._user_sent = True
715        elif cred_type == 'pass':
716            self.push_lit('281 Login Successful')
717            self._logged_in = True
718        else:
719            raise Exception('Unknown cred type {}'.format(cred_type))
720
721
722class NNTPv2Handler(NNTPv1Handler):
723    """A handler for RFC 3977 (NNTP "v2")"""
724
725    def handle_CAPABILITIES(self):
726        fmt = """\
727            101 Capability list:
728            VERSION 2 3
729            IMPLEMENTATION INN 2.5.1{}
730            HDR
731            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
732            OVER
733            POST
734            READER
735            ."""
736
737        if not self._logged_in:
738            self.push_lit(fmt.format('\n            AUTHINFO USER'))
739        else:
740            self.push_lit(fmt.format(''))
741
742    def handle_MODE(self, _):
743        raise Exception('MODE READER sent despite READER has been advertised')
744
745    def handle_OVER(self, message_spec=None):
746        return self.handle_XOVER(message_spec)
747
748
749class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
750    """A handler that allows CAPABILITIES only after login"""
751
752    def handle_CAPABILITIES(self):
753        if not self._logged_in:
754            self.push_lit('480 You must log in.')
755        else:
756            super().handle_CAPABILITIES()
757
758
759class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
760    """A server that starts in transit mode"""
761
762    def __init__(self):
763        self._switched = False
764
765    def handle_CAPABILITIES(self):
766        fmt = """\
767            101 Capability list:
768            VERSION 2 3
769            IMPLEMENTATION INN 2.5.1
770            HDR
771            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
772            OVER
773            POST
774            {}READER
775            ."""
776        if self._switched:
777            self.push_lit(fmt.format(''))
778        else:
779            self.push_lit(fmt.format('MODE-'))
780
781    def handle_MODE(self, what):
782        assert not self._switched and what == 'reader'
783        self._switched = True
784        self.push_lit('200 Posting allowed')
785
786
787class NNTPv1v2TestsMixin:
788
789    def setUp(self):
790        super().setUp()
791
792    def test_welcome(self):
793        self.assertEqual(self.server.welcome, self.handler.welcome)
794
795    def test_authinfo(self):
796        if self.nntp_version == 2:
797            self.assertIn('AUTHINFO', self.server._caps)
798        self.server.login('testuser', 'testpw')
799        # if AUTHINFO is gone from _caps we also know that getcapabilities()
800        # has been called after login as it should
801        self.assertNotIn('AUTHINFO', self.server._caps)
802
803    def test_date(self):
804        resp, date = self.server.date()
805        self.assertEqual(resp, "111 20100914001155")
806        self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
807
808    def test_quit(self):
809        self.assertFalse(self.sio.closed)
810        resp = self.server.quit()
811        self.assertEqual(resp, "205 Bye!")
812        self.assertTrue(self.sio.closed)
813
814    def test_help(self):
815        resp, help = self.server.help()
816        self.assertEqual(resp, "100 Legal commands")
817        self.assertEqual(help, [
818            '  authinfo user Name|pass Password|generic <prog> <args>',
819            '  date',
820            '  help',
821            'Report problems to <root@example.org>',
822        ])
823
824    def test_list(self):
825        resp, groups = self.server.list()
826        self.assertEqual(len(groups), 6)
827        g = groups[1]
828        self.assertEqual(g,
829            GroupInfo("comp.lang.python.announce", "0000001153",
830                      "0000000993", "m"))
831        resp, groups = self.server.list("*distutils*")
832        self.assertEqual(len(groups), 2)
833        g = groups[0]
834        self.assertEqual(g,
835            GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
836                      "0000000001", "m"))
837
838    def test_stat(self):
839        resp, art_num, message_id = self.server.stat(3000234)
840        self.assertEqual(resp, "223 3000234 <45223423@example.com>")
841        self.assertEqual(art_num, 3000234)
842        self.assertEqual(message_id, "<45223423@example.com>")
843        resp, art_num, message_id = self.server.stat("<45223423@example.com>")
844        self.assertEqual(resp, "223 0 <45223423@example.com>")
845        self.assertEqual(art_num, 0)
846        self.assertEqual(message_id, "<45223423@example.com>")
847        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
848            self.server.stat("<non.existent.id>")
849        self.assertEqual(cm.exception.response, "430 No Such Article Found")
850        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
851            self.server.stat()
852        self.assertEqual(cm.exception.response, "412 No newsgroup selected")
853
854    def test_next(self):
855        resp, art_num, message_id = self.server.next()
856        self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
857        self.assertEqual(art_num, 3000237)
858        self.assertEqual(message_id, "<668929@example.org>")
859
860    def test_last(self):
861        resp, art_num, message_id = self.server.last()
862        self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
863        self.assertEqual(art_num, 3000234)
864        self.assertEqual(message_id, "<45223423@example.com>")
865
866    def test_description(self):
867        desc = self.server.description("comp.lang.python")
868        self.assertEqual(desc, "The Python computer language.")
869        desc = self.server.description("comp.lang.pythonx")
870        self.assertEqual(desc, "")
871
872    def test_descriptions(self):
873        resp, groups = self.server.descriptions("comp.lang.python")
874        self.assertEqual(resp, '215 Descriptions in form "group description".')
875        self.assertEqual(groups, {
876            "comp.lang.python": "The Python computer language.",
877            })
878        resp, groups = self.server.descriptions("comp.lang.python*")
879        self.assertEqual(groups, {
880            "comp.lang.python": "The Python computer language.",
881            "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
882            })
883        resp, groups = self.server.descriptions("comp.lang.pythonx")
884        self.assertEqual(groups, {})
885
886    def test_group(self):
887        resp, count, first, last, group = self.server.group("fr.comp.lang.python")
888        self.assertTrue(resp.startswith("211 "), resp)
889        self.assertEqual(first, 761)
890        self.assertEqual(last, 1265)
891        self.assertEqual(count, 486)
892        self.assertEqual(group, "fr.comp.lang.python")
893        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
894            self.server.group("comp.lang.python.devel")
895        exc = cm.exception
896        self.assertTrue(exc.response.startswith("411 No such group"),
897                        exc.response)
898
899    def test_newnews(self):
900        # NEWNEWS comp.lang.python [20]100913 082004
901        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
902        resp, ids = self.server.newnews("comp.lang.python", dt)
903        expected = (
904            "230 list of newsarticles (NNTP v{0}) "
905            "created after Mon Sep 13 08:20:04 2010 follows"
906            ).format(self.nntp_version)
907        self.assertEqual(resp, expected)
908        self.assertEqual(ids, [
909            "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
910            "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
911            ])
912        # NEWNEWS fr.comp.lang.python [20]100913 082004
913        dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
914        resp, ids = self.server.newnews("fr.comp.lang.python", dt)
915        self.assertEqual(resp, "230 An empty list of newsarticles follows")
916        self.assertEqual(ids, [])
917
918    def _check_article_body(self, lines):
919        self.assertEqual(len(lines), 4)
920        self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
921        self.assertEqual(lines[-2], b"")
922        self.assertEqual(lines[-3], b".Here is a dot-starting line.")
923        self.assertEqual(lines[-4], b"This is just a test article.")
924
925    def _check_article_head(self, lines):
926        self.assertEqual(len(lines), 4)
927        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
928        self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
929
930    def _check_article_data(self, lines):
931        self.assertEqual(len(lines), 9)
932        self._check_article_head(lines[:4])
933        self._check_article_body(lines[-4:])
934        self.assertEqual(lines[4], b"")
935
936    def test_article(self):
937        # ARTICLE
938        resp, info = self.server.article()
939        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
940        art_num, message_id, lines = info
941        self.assertEqual(art_num, 3000237)
942        self.assertEqual(message_id, "<45223423@example.com>")
943        self._check_article_data(lines)
944        # ARTICLE num
945        resp, info = self.server.article(3000234)
946        self.assertEqual(resp, "220 3000234 <45223423@example.com>")
947        art_num, message_id, lines = info
948        self.assertEqual(art_num, 3000234)
949        self.assertEqual(message_id, "<45223423@example.com>")
950        self._check_article_data(lines)
951        # ARTICLE id
952        resp, info = self.server.article("<45223423@example.com>")
953        self.assertEqual(resp, "220 0 <45223423@example.com>")
954        art_num, message_id, lines = info
955        self.assertEqual(art_num, 0)
956        self.assertEqual(message_id, "<45223423@example.com>")
957        self._check_article_data(lines)
958        # Non-existent id
959        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
960            self.server.article("<non-existent@example.com>")
961        self.assertEqual(cm.exception.response, "430 No Such Article Found")
962
963    def test_article_file(self):
964        # With a "file" argument
965        f = io.BytesIO()
966        resp, info = self.server.article(file=f)
967        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
968        art_num, message_id, lines = info
969        self.assertEqual(art_num, 3000237)
970        self.assertEqual(message_id, "<45223423@example.com>")
971        self.assertEqual(lines, [])
972        data = f.getvalue()
973        self.assertTrue(data.startswith(
974            b'From: "Demo User" <nobody@example.net>\r\n'
975            b'Subject: I am just a test article\r\n'
976            ), ascii(data))
977        self.assertTrue(data.endswith(
978            b'This is just a test article.\r\n'
979            b'.Here is a dot-starting line.\r\n'
980            b'\r\n'
981            b'-- Signed by Andr\xc3\xa9.\r\n'
982            ), ascii(data))
983
984    def test_head(self):
985        # HEAD
986        resp, info = self.server.head()
987        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
988        art_num, message_id, lines = info
989        self.assertEqual(art_num, 3000237)
990        self.assertEqual(message_id, "<45223423@example.com>")
991        self._check_article_head(lines)
992        # HEAD num
993        resp, info = self.server.head(3000234)
994        self.assertEqual(resp, "221 3000234 <45223423@example.com>")
995        art_num, message_id, lines = info
996        self.assertEqual(art_num, 3000234)
997        self.assertEqual(message_id, "<45223423@example.com>")
998        self._check_article_head(lines)
999        # HEAD id
1000        resp, info = self.server.head("<45223423@example.com>")
1001        self.assertEqual(resp, "221 0 <45223423@example.com>")
1002        art_num, message_id, lines = info
1003        self.assertEqual(art_num, 0)
1004        self.assertEqual(message_id, "<45223423@example.com>")
1005        self._check_article_head(lines)
1006        # Non-existent id
1007        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1008            self.server.head("<non-existent@example.com>")
1009        self.assertEqual(cm.exception.response, "430 No Such Article Found")
1010
1011    def test_head_file(self):
1012        f = io.BytesIO()
1013        resp, info = self.server.head(file=f)
1014        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1015        art_num, message_id, lines = info
1016        self.assertEqual(art_num, 3000237)
1017        self.assertEqual(message_id, "<45223423@example.com>")
1018        self.assertEqual(lines, [])
1019        data = f.getvalue()
1020        self.assertTrue(data.startswith(
1021            b'From: "Demo User" <nobody@example.net>\r\n'
1022            b'Subject: I am just a test article\r\n'
1023            ), ascii(data))
1024        self.assertFalse(data.endswith(
1025            b'This is just a test article.\r\n'
1026            b'.Here is a dot-starting line.\r\n'
1027            b'\r\n'
1028            b'-- Signed by Andr\xc3\xa9.\r\n'
1029            ), ascii(data))
1030
1031    def test_body(self):
1032        # BODY
1033        resp, info = self.server.body()
1034        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1035        art_num, message_id, lines = info
1036        self.assertEqual(art_num, 3000237)
1037        self.assertEqual(message_id, "<45223423@example.com>")
1038        self._check_article_body(lines)
1039        # BODY num
1040        resp, info = self.server.body(3000234)
1041        self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1042        art_num, message_id, lines = info
1043        self.assertEqual(art_num, 3000234)
1044        self.assertEqual(message_id, "<45223423@example.com>")
1045        self._check_article_body(lines)
1046        # BODY id
1047        resp, info = self.server.body("<45223423@example.com>")
1048        self.assertEqual(resp, "222 0 <45223423@example.com>")
1049        art_num, message_id, lines = info
1050        self.assertEqual(art_num, 0)
1051        self.assertEqual(message_id, "<45223423@example.com>")
1052        self._check_article_body(lines)
1053        # Non-existent id
1054        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1055            self.server.body("<non-existent@example.com>")
1056        self.assertEqual(cm.exception.response, "430 No Such Article Found")
1057
1058    def test_body_file(self):
1059        f = io.BytesIO()
1060        resp, info = self.server.body(file=f)
1061        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1062        art_num, message_id, lines = info
1063        self.assertEqual(art_num, 3000237)
1064        self.assertEqual(message_id, "<45223423@example.com>")
1065        self.assertEqual(lines, [])
1066        data = f.getvalue()
1067        self.assertFalse(data.startswith(
1068            b'From: "Demo User" <nobody@example.net>\r\n'
1069            b'Subject: I am just a test article\r\n'
1070            ), ascii(data))
1071        self.assertTrue(data.endswith(
1072            b'This is just a test article.\r\n'
1073            b'.Here is a dot-starting line.\r\n'
1074            b'\r\n'
1075            b'-- Signed by Andr\xc3\xa9.\r\n'
1076            ), ascii(data))
1077
1078    def check_over_xover_resp(self, resp, overviews):
1079        self.assertTrue(resp.startswith("224 "), resp)
1080        self.assertEqual(len(overviews), 3)
1081        art_num, over = overviews[0]
1082        self.assertEqual(art_num, 57)
1083        self.assertEqual(over, {
1084            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1085            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1086            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1087            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1088            "references": "<hvalf7$ort$1@dough.gmane.org>",
1089            ":bytes": "7103",
1090            ":lines": "16",
1091            "xref": "news.gmane.org gmane.comp.python.authors:57"
1092            })
1093        art_num, over = overviews[1]
1094        self.assertEqual(over["xref"], None)
1095        art_num, over = overviews[2]
1096        self.assertEqual(over["subject"],
1097                         "Re: Message d'erreur incompréhensible (par moi)")
1098
1099    def test_xover(self):
1100        resp, overviews = self.server.xover(57, 59)
1101        self.check_over_xover_resp(resp, overviews)
1102
1103    def test_over(self):
1104        # In NNTP "v1", this will fallback on XOVER
1105        resp, overviews = self.server.over((57, 59))
1106        self.check_over_xover_resp(resp, overviews)
1107
1108    sample_post = (
1109        b'From: "Demo User" <nobody@example.net>\r\n'
1110        b'Subject: I am just a test article\r\n'
1111        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1112        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1113        b'\r\n'
1114        b'This is just a test article.\r\n'
1115        b'.Here is a dot-starting line.\r\n'
1116        b'\r\n'
1117        b'-- Signed by Andr\xc3\xa9.\r\n'
1118    )
1119
1120    def _check_posted_body(self):
1121        # Check the raw body as received by the server
1122        lines = self.handler.posted_body
1123        # One additional line for the "." terminator
1124        self.assertEqual(len(lines), 10)
1125        self.assertEqual(lines[-1], b'.\r\n')
1126        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1127        self.assertEqual(lines[-3], b'\r\n')
1128        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1129        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1130
1131    def _check_post_ihave_sub(self, func, *args, file_factory):
1132        # First the prepared post with CRLF endings
1133        post = self.sample_post
1134        func_args = args + (file_factory(post),)
1135        self.handler.posted_body = None
1136        resp = func(*func_args)
1137        self._check_posted_body()
1138        # Then the same post with "normal" line endings - they should be
1139        # converted by NNTP.post and NNTP.ihave.
1140        post = self.sample_post.replace(b"\r\n", b"\n")
1141        func_args = args + (file_factory(post),)
1142        self.handler.posted_body = None
1143        resp = func(*func_args)
1144        self._check_posted_body()
1145        return resp
1146
1147    def check_post_ihave(self, func, success_resp, *args):
1148        # With a bytes object
1149        resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1150        self.assertEqual(resp, success_resp)
1151        # With a bytearray object
1152        resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1153        self.assertEqual(resp, success_resp)
1154        # With a file object
1155        resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1156        self.assertEqual(resp, success_resp)
1157        # With an iterable of terminated lines
1158        def iterlines(b):
1159            return iter(b.splitlines(keepends=True))
1160        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1161        self.assertEqual(resp, success_resp)
1162        # With an iterable of non-terminated lines
1163        def iterlines(b):
1164            return iter(b.splitlines(keepends=False))
1165        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1166        self.assertEqual(resp, success_resp)
1167
1168    def test_post(self):
1169        self.check_post_ihave(self.server.post, "240 Article received OK")
1170        self.handler.allow_posting = False
1171        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1172            self.server.post(self.sample_post)
1173        self.assertEqual(cm.exception.response,
1174                         "440 Posting not permitted")
1175
1176    def test_ihave(self):
1177        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1178                              "<i.am.an.article.you.will.want@example.com>")
1179        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1180            self.server.ihave("<another.message.id>", self.sample_post)
1181        self.assertEqual(cm.exception.response,
1182                         "435 Article not wanted")
1183
1184    def test_too_long_lines(self):
1185        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1186        self.assertRaises(nntplib.NNTPDataError,
1187                          self.server.newnews, "comp.lang.python", dt)
1188
1189
1190class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1191    """Tests an NNTP v1 server (no capabilities)."""
1192
1193    nntp_version = 1
1194    handler_class = NNTPv1Handler
1195
1196    def test_caps(self):
1197        caps = self.server.getcapabilities()
1198        self.assertEqual(caps, {})
1199        self.assertEqual(self.server.nntp_version, 1)
1200        self.assertEqual(self.server.nntp_implementation, None)
1201
1202
1203class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1204    """Tests an NNTP v2 server (with capabilities)."""
1205
1206    nntp_version = 2
1207    handler_class = NNTPv2Handler
1208
1209    def test_caps(self):
1210        caps = self.server.getcapabilities()
1211        self.assertEqual(caps, {
1212            'VERSION': ['2', '3'],
1213            'IMPLEMENTATION': ['INN', '2.5.1'],
1214            'AUTHINFO': ['USER'],
1215            'HDR': [],
1216            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
1217                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
1218            'OVER': [],
1219            'POST': [],
1220            'READER': [],
1221            })
1222        self.assertEqual(self.server.nntp_version, 3)
1223        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
1224
1225
1226class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1227    """Tests a probably NNTP v2 server with capabilities only after login."""
1228
1229    nntp_version = 2
1230    handler_class = CapsAfterLoginNNTPv2Handler
1231
1232    def test_caps_only_after_login(self):
1233        self.assertEqual(self.server._caps, {})
1234        self.server.login('testuser', 'testpw')
1235        self.assertIn('VERSION', self.server._caps)
1236
1237
1238class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1239        unittest.TestCase):
1240    """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1241    that isn't in READER mode by default."""
1242
1243    nntp_version = 2
1244    handler_class = ModeSwitchingNNTPv2Handler
1245
1246    def test_we_are_in_reader_mode_after_connect(self):
1247        self.assertIn('READER', self.server._caps)
1248
1249
1250class MiscTests(unittest.TestCase):
1251
1252    def test_decode_header(self):
1253        def gives(a, b):
1254            self.assertEqual(nntplib.decode_header(a), b)
1255        gives("" , "")
1256        gives("a plain header", "a plain header")
1257        gives(" with extra  spaces ", " with extra  spaces ")
1258        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1259        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1260              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1261              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1262        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1263              "Re: problème de matrice")
1264        # A natively utf-8 header (found in the real world!)
1265        gives("Re: Message d'erreur incompréhensible (par moi)",
1266              "Re: Message d'erreur incompréhensible (par moi)")
1267
1268    def test_parse_overview_fmt(self):
1269        # The minimal (default) response
1270        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1271                 "References:", ":bytes", ":lines"]
1272        self.assertEqual(nntplib._parse_overview_fmt(lines),
1273            ["subject", "from", "date", "message-id", "references",
1274             ":bytes", ":lines"])
1275        # The minimal response using alternative names
1276        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1277                 "References:", "Bytes:", "Lines:"]
1278        self.assertEqual(nntplib._parse_overview_fmt(lines),
1279            ["subject", "from", "date", "message-id", "references",
1280             ":bytes", ":lines"])
1281        # Variations in casing
1282        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1283                 "References:", "BYTES:", "Lines:"]
1284        self.assertEqual(nntplib._parse_overview_fmt(lines),
1285            ["subject", "from", "date", "message-id", "references",
1286             ":bytes", ":lines"])
1287        # First example from RFC 3977
1288        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1289                 "References:", ":bytes", ":lines", "Xref:full",
1290                 "Distribution:full"]
1291        self.assertEqual(nntplib._parse_overview_fmt(lines),
1292            ["subject", "from", "date", "message-id", "references",
1293             ":bytes", ":lines", "xref", "distribution"])
1294        # Second example from RFC 3977
1295        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1296                 "References:", "Bytes:", "Lines:", "Xref:FULL",
1297                 "Distribution:FULL"]
1298        self.assertEqual(nntplib._parse_overview_fmt(lines),
1299            ["subject", "from", "date", "message-id", "references",
1300             ":bytes", ":lines", "xref", "distribution"])
1301        # A classic response from INN
1302        lines = ["Subject:", "From:", "Date:", "Message-ID:",
1303                 "References:", "Bytes:", "Lines:", "Xref:full"]
1304        self.assertEqual(nntplib._parse_overview_fmt(lines),
1305            ["subject", "from", "date", "message-id", "references",
1306             ":bytes", ":lines", "xref"])
1307
1308    def test_parse_overview(self):
1309        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1310        # First example from RFC 3977
1311        lines = [
1312            '3000234\tI am just a test article\t"Demo User" '
1313            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1314            '<45223423@example.com>\t<45454@example.net>\t1234\t'
1315            '17\tXref: news.example.com misc.test:3000363',
1316        ]
1317        overview = nntplib._parse_overview(lines, fmt)
1318        (art_num, fields), = overview
1319        self.assertEqual(art_num, 3000234)
1320        self.assertEqual(fields, {
1321            'subject': 'I am just a test article',
1322            'from': '"Demo User" <nobody@example.com>',
1323            'date': '6 Oct 1998 04:38:40 -0500',
1324            'message-id': '<45223423@example.com>',
1325            'references': '<45454@example.net>',
1326            ':bytes': '1234',
1327            ':lines': '17',
1328            'xref': 'news.example.com misc.test:3000363',
1329        })
1330        # Second example; here the "Xref" field is totally absent (including
1331        # the header name) and comes out as None
1332        lines = [
1333            '3000234\tI am just a test article\t"Demo User" '
1334            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1335            '<45223423@example.com>\t<45454@example.net>\t1234\t'
1336            '17\t\t',
1337        ]
1338        overview = nntplib._parse_overview(lines, fmt)
1339        (art_num, fields), = overview
1340        self.assertEqual(fields['xref'], None)
1341        # Third example; the "Xref" is an empty string, while "references"
1342        # is a single space.
1343        lines = [
1344            '3000234\tI am just a test article\t"Demo User" '
1345            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1346            '<45223423@example.com>\t \t1234\t'
1347            '17\tXref: \t',
1348        ]
1349        overview = nntplib._parse_overview(lines, fmt)
1350        (art_num, fields), = overview
1351        self.assertEqual(fields['references'], ' ')
1352        self.assertEqual(fields['xref'], '')
1353
1354    def test_parse_datetime(self):
1355        def gives(a, b, *c):
1356            self.assertEqual(nntplib._parse_datetime(a, b),
1357                             datetime.datetime(*c))
1358        # Output of DATE command
1359        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1360        # Variations
1361        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1362        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1363        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1364
1365    def test_unparse_datetime(self):
1366        # Test non-legacy mode
1367        # 1) with a datetime
1368        def gives(y, M, d, h, m, s, date_str, time_str):
1369            dt = datetime.datetime(y, M, d, h, m, s)
1370            self.assertEqual(nntplib._unparse_datetime(dt),
1371                             (date_str, time_str))
1372            self.assertEqual(nntplib._unparse_datetime(dt, False),
1373                             (date_str, time_str))
1374        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1375        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1376        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1377        # 2) with a date
1378        def gives(y, M, d, date_str, time_str):
1379            dt = datetime.date(y, M, d)
1380            self.assertEqual(nntplib._unparse_datetime(dt),
1381                             (date_str, time_str))
1382            self.assertEqual(nntplib._unparse_datetime(dt, False),
1383                             (date_str, time_str))
1384        gives(1999, 6, 23, "19990623", "000000")
1385        gives(2000, 6, 23, "20000623", "000000")
1386        gives(2010, 6, 5, "20100605", "000000")
1387
1388    def test_unparse_datetime_legacy(self):
1389        # Test legacy mode (RFC 977)
1390        # 1) with a datetime
1391        def gives(y, M, d, h, m, s, date_str, time_str):
1392            dt = datetime.datetime(y, M, d, h, m, s)
1393            self.assertEqual(nntplib._unparse_datetime(dt, True),
1394                             (date_str, time_str))
1395        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1396        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1397        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1398        # 2) with a date
1399        def gives(y, M, d, date_str, time_str):
1400            dt = datetime.date(y, M, d)
1401            self.assertEqual(nntplib._unparse_datetime(dt, True),
1402                             (date_str, time_str))
1403        gives(1999, 6, 23, "990623", "000000")
1404        gives(2000, 6, 23, "000623", "000000")
1405        gives(2010, 6, 5, "100605", "000000")
1406
1407    @unittest.skipUnless(ssl, 'requires SSL support')
1408    def test_ssl_support(self):
1409        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
1410
1411
1412class PublicAPITests(unittest.TestCase):
1413    """Ensures that the correct values are exposed in the public API."""
1414
1415    def test_module_all_attribute(self):
1416        self.assertTrue(hasattr(nntplib, '__all__'))
1417        target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
1418                      'NNTPTemporaryError', 'NNTPPermanentError',
1419                      'NNTPProtocolError', 'NNTPDataError', 'decode_header']
1420        if ssl is not None:
1421            target_api.append('NNTP_SSL')
1422        self.assertEqual(set(nntplib.__all__), set(target_api))
1423
1424class MockSocketTests(unittest.TestCase):
1425    """Tests involving a mock socket object
1426
1427    Used where the _NNTPServerIO file object is not enough."""
1428
1429    nntp_class = nntplib.NNTP
1430
1431    def check_constructor_error_conditions(
1432            self, handler_class,
1433            expected_error_type, expected_error_msg,
1434            login=None, password=None):
1435
1436        class mock_socket_module:
1437            def create_connection(address, timeout):
1438                return MockSocket()
1439
1440        class MockSocket:
1441            def close(self):
1442                nonlocal socket_closed
1443                socket_closed = True
1444
1445            def makefile(socket, mode):
1446                handler = handler_class()
1447                _, file = make_mock_file(handler)
1448                files.append(file)
1449                return file
1450
1451        socket_closed = False
1452        files = []
1453        with patch('nntplib.socket', mock_socket_module), \
1454             self.assertRaisesRegex(expected_error_type, expected_error_msg):
1455            self.nntp_class('dummy', user=login, password=password)
1456        self.assertTrue(socket_closed)
1457        for f in files:
1458            self.assertTrue(f.closed)
1459
1460    def test_bad_welcome(self):
1461        #Test a bad welcome message
1462        class Handler(NNTPv1Handler):
1463            welcome = 'Bad Welcome'
1464        self.check_constructor_error_conditions(
1465            Handler, nntplib.NNTPProtocolError, Handler.welcome)
1466
1467    def test_service_temporarily_unavailable(self):
1468        #Test service temporarily unavailable
1469        class Handler(NNTPv1Handler):
1470            welcome = '400 Service temporarily unavailable'
1471        self.check_constructor_error_conditions(
1472            Handler, nntplib.NNTPTemporaryError, Handler.welcome)
1473
1474    def test_service_permanently_unavailable(self):
1475        #Test service permanently unavailable
1476        class Handler(NNTPv1Handler):
1477            welcome = '502 Service permanently unavailable'
1478        self.check_constructor_error_conditions(
1479            Handler, nntplib.NNTPPermanentError, Handler.welcome)
1480
1481    def test_bad_capabilities(self):
1482        #Test a bad capabilities response
1483        class Handler(NNTPv1Handler):
1484            def handle_CAPABILITIES(self):
1485                self.push_lit(capabilities_response)
1486        capabilities_response = '201 bad capability'
1487        self.check_constructor_error_conditions(
1488            Handler, nntplib.NNTPReplyError, capabilities_response)
1489
1490    def test_login_aborted(self):
1491        #Test a bad authinfo response
1492        login = 't@e.com'
1493        password = 'python'
1494        class Handler(NNTPv1Handler):
1495            def handle_AUTHINFO(self, *args):
1496                self.push_lit(authinfo_response)
1497        authinfo_response = '503 Mechanism not recognized'
1498        self.check_constructor_error_conditions(
1499            Handler, nntplib.NNTPPermanentError, authinfo_response,
1500            login, password)
1501
1502class bypass_context:
1503    """Bypass encryption and actual SSL module"""
1504    def wrap_socket(sock, **args):
1505        return sock
1506
1507@unittest.skipUnless(ssl, 'requires SSL support')
1508class MockSslTests(MockSocketTests):
1509    @staticmethod
1510    def nntp_class(*pos, **kw):
1511        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
1512
1513@unittest.skipUnless(threading, 'requires multithreading')
1514class LocalServerTests(unittest.TestCase):
1515    def setUp(self):
1516        sock = socket.socket()
1517        port = support.bind_port(sock)
1518        sock.listen()
1519        self.background = threading.Thread(
1520            target=self.run_server, args=(sock,))
1521        self.background.start()
1522        self.addCleanup(self.background.join)
1523
1524        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
1525        self.addCleanup(self.nntp.__exit__, None, None, None)
1526
1527    def run_server(self, sock):
1528        # Could be generalized to handle more commands in separate methods
1529        with sock:
1530            [client, _] = sock.accept()
1531        with contextlib.ExitStack() as cleanup:
1532            cleanup.enter_context(client)
1533            reader = cleanup.enter_context(client.makefile('rb'))
1534            client.sendall(b'200 Server ready\r\n')
1535            while True:
1536                cmd = reader.readline()
1537                if cmd == b'CAPABILITIES\r\n':
1538                    client.sendall(
1539                        b'101 Capability list:\r\n'
1540                        b'VERSION 2\r\n'
1541                        b'STARTTLS\r\n'
1542                        b'.\r\n'
1543                    )
1544                elif cmd == b'STARTTLS\r\n':
1545                    reader.close()
1546                    client.sendall(b'382 Begin TLS negotiation now\r\n')
1547                    context = ssl.SSLContext()
1548                    context.load_cert_chain(certfile)
1549                    client = context.wrap_socket(
1550                        client, server_side=True)
1551                    cleanup.enter_context(client)
1552                    reader = cleanup.enter_context(client.makefile('rb'))
1553                elif cmd == b'QUIT\r\n':
1554                    client.sendall(b'205 Bye!\r\n')
1555                    break
1556                else:
1557                    raise ValueError('Unexpected command {!r}'.format(cmd))
1558
1559    @unittest.skipUnless(ssl, 'requires SSL support')
1560    def test_starttls(self):
1561        file = self.nntp.file
1562        sock = self.nntp.sock
1563        self.nntp.starttls()
1564        # Check that the socket and internal pseudo-file really were
1565        # changed.
1566        self.assertNotEqual(file, self.nntp.file)
1567        self.assertNotEqual(sock, self.nntp.sock)
1568        # Check that the new socket really is an SSL one
1569        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1570        # Check that trying starttls when it's already active fails.
1571        self.assertRaises(ValueError, self.nntp.starttls)
1572
1573
1574if __name__ == "__main__":
1575    unittest.main()
1576