1import io 2import socket 3import datetime 4import textwrap 5import unittest 6import functools 7import contextlib 8import os.path 9import re 10import threading 11 12from test import support 13from nntplib import NNTP, GroupInfo 14import nntplib 15from unittest.mock import patch 16try: 17 import ssl 18except ImportError: 19 ssl = None 20 21 22TIMEOUT = 30 23certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') 24 25if ssl is not None: 26 SSLError = ssl.SSLError 27else: 28 class SSLError(Exception): 29 """Non-existent exception class when we lack SSL support.""" 30 reason = "This will never be raised." 31 32# TODO: 33# - test the `file` arg to more commands 34# - test error conditions 35# - test auth and `usenetrc` 36 37 38class NetworkedNNTPTestsMixin: 39 40 def test_welcome(self): 41 welcome = self.server.getwelcome() 42 self.assertEqual(str, type(welcome)) 43 44 def test_help(self): 45 resp, lines = self.server.help() 46 self.assertTrue(resp.startswith("100 "), resp) 47 for line in lines: 48 self.assertEqual(str, type(line)) 49 50 def test_list(self): 51 resp, groups = self.server.list() 52 if len(groups) > 0: 53 self.assertEqual(GroupInfo, type(groups[0])) 54 self.assertEqual(str, type(groups[0].group)) 55 56 def test_list_active(self): 57 resp, groups = self.server.list(self.GROUP_PAT) 58 if len(groups) > 0: 59 self.assertEqual(GroupInfo, type(groups[0])) 60 self.assertEqual(str, type(groups[0].group)) 61 62 def test_unknown_command(self): 63 with self.assertRaises(nntplib.NNTPPermanentError) as cm: 64 self.server._shortcmd("XYZZY") 65 resp = cm.exception.response 66 self.assertTrue(resp.startswith("500 "), resp) 67 68 def test_newgroups(self): 69 # gmane gets a constant influx of new groups. In order not to stress 70 # the server too much, we choose a recent date in the past. 71 dt = datetime.date.today() - datetime.timedelta(days=7) 72 resp, groups = self.server.newgroups(dt) 73 if len(groups) > 0: 74 self.assertIsInstance(groups[0], GroupInfo) 75 self.assertIsInstance(groups[0].group, str) 76 77 def test_description(self): 78 def _check_desc(desc): 79 # Sanity checks 80 self.assertIsInstance(desc, str) 81 self.assertNotIn(self.GROUP_NAME, desc) 82 desc = self.server.description(self.GROUP_NAME) 83 _check_desc(desc) 84 # Another sanity check 85 self.assertIn("Python", desc) 86 # With a pattern 87 desc = self.server.description(self.GROUP_PAT) 88 _check_desc(desc) 89 # Shouldn't exist 90 desc = self.server.description("zk.brrtt.baz") 91 self.assertEqual(desc, '') 92 93 def test_descriptions(self): 94 resp, descs = self.server.descriptions(self.GROUP_PAT) 95 # 215 for LIST NEWSGROUPS, 282 for XGTITLE 96 self.assertTrue( 97 resp.startswith("215 ") or resp.startswith("282 "), resp) 98 self.assertIsInstance(descs, dict) 99 desc = descs[self.GROUP_NAME] 100 self.assertEqual(desc, self.server.description(self.GROUP_NAME)) 101 102 def test_group(self): 103 result = self.server.group(self.GROUP_NAME) 104 self.assertEqual(5, len(result)) 105 resp, count, first, last, group = result 106 self.assertEqual(group, self.GROUP_NAME) 107 self.assertIsInstance(count, int) 108 self.assertIsInstance(first, int) 109 self.assertIsInstance(last, int) 110 self.assertLessEqual(first, last) 111 self.assertTrue(resp.startswith("211 "), resp) 112 113 def test_date(self): 114 resp, date = self.server.date() 115 self.assertIsInstance(date, datetime.datetime) 116 # Sanity check 117 self.assertGreaterEqual(date.year, 1995) 118 self.assertLessEqual(date.year, 2030) 119 120 def _check_art_dict(self, art_dict): 121 # Some sanity checks for a field dictionary returned by OVER / XOVER 122 self.assertIsInstance(art_dict, dict) 123 # NNTP has 7 mandatory fields 124 self.assertGreaterEqual(art_dict.keys(), 125 {"subject", "from", "date", "message-id", 126 "references", ":bytes", ":lines"} 127 ) 128 for v in art_dict.values(): 129 self.assertIsInstance(v, (str, type(None))) 130 131 def test_xover(self): 132 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 133 resp, lines = self.server.xover(last - 5, last) 134 if len(lines) == 0: 135 self.skipTest("no articles retrieved") 136 # The 'last' article is not necessarily part of the output (cancelled?) 137 art_num, art_dict = lines[0] 138 self.assertGreaterEqual(art_num, last - 5) 139 self.assertLessEqual(art_num, last) 140 self._check_art_dict(art_dict) 141 142 @unittest.skipIf(True, 'temporarily skipped until a permanent solution' 143 ' is found for issue #28971') 144 def test_over(self): 145 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 146 start = last - 10 147 # The "start-" article range form 148 resp, lines = self.server.over((start, None)) 149 art_num, art_dict = lines[0] 150 self._check_art_dict(art_dict) 151 # The "start-end" article range form 152 resp, lines = self.server.over((start, last)) 153 art_num, art_dict = lines[-1] 154 # The 'last' article is not necessarily part of the output (cancelled?) 155 self.assertGreaterEqual(art_num, start) 156 self.assertLessEqual(art_num, last) 157 self._check_art_dict(art_dict) 158 # XXX The "message_id" form is unsupported by gmane 159 # 503 Overview by message-ID unsupported 160 161 def test_xhdr(self): 162 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 163 resp, lines = self.server.xhdr('subject', last) 164 for line in lines: 165 self.assertEqual(str, type(line[1])) 166 167 def check_article_resp(self, resp, article, art_num=None): 168 self.assertIsInstance(article, nntplib.ArticleInfo) 169 if art_num is not None: 170 self.assertEqual(article.number, art_num) 171 for line in article.lines: 172 self.assertIsInstance(line, bytes) 173 # XXX this could exceptionally happen... 174 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) 175 176 @unittest.skipIf(True, "FIXME: see bpo-32128") 177 def test_article_head_body(self): 178 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 179 # Try to find an available article 180 for art_num in (last, first, last - 1): 181 try: 182 resp, head = self.server.head(art_num) 183 except nntplib.NNTPTemporaryError as e: 184 if not e.response.startswith("423 "): 185 raise 186 # "423 No such article" => choose another one 187 continue 188 break 189 else: 190 self.skipTest("could not find a suitable article number") 191 self.assertTrue(resp.startswith("221 "), resp) 192 self.check_article_resp(resp, head, art_num) 193 resp, body = self.server.body(art_num) 194 self.assertTrue(resp.startswith("222 "), resp) 195 self.check_article_resp(resp, body, art_num) 196 resp, article = self.server.article(art_num) 197 self.assertTrue(resp.startswith("220 "), resp) 198 self.check_article_resp(resp, article, art_num) 199 # Tolerate running the tests from behind a NNTP virus checker 200 blacklist = lambda line: line.startswith(b'X-Antivirus') 201 filtered_head_lines = [line for line in head.lines 202 if not blacklist(line)] 203 filtered_lines = [line for line in article.lines 204 if not blacklist(line)] 205 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) 206 207 def test_capabilities(self): 208 # The server under test implements NNTP version 2 and has a 209 # couple of well-known capabilities. Just sanity check that we 210 # got them. 211 def _check_caps(caps): 212 caps_list = caps['LIST'] 213 self.assertIsInstance(caps_list, (list, tuple)) 214 self.assertIn('OVERVIEW.FMT', caps_list) 215 self.assertGreaterEqual(self.server.nntp_version, 2) 216 _check_caps(self.server.getcapabilities()) 217 # This re-emits the command 218 resp, caps = self.server.capabilities() 219 _check_caps(caps) 220 221 def test_zlogin(self): 222 # This test must be the penultimate because further commands will be 223 # refused. 224 baduser = "notarealuser" 225 badpw = "notarealpassword" 226 # Check that bogus credentials cause failure 227 self.assertRaises(nntplib.NNTPError, self.server.login, 228 user=baduser, password=badpw, usenetrc=False) 229 # FIXME: We should check that correct credentials succeed, but that 230 # would require valid details for some server somewhere to be in the 231 # test suite, I think. Gmane is anonymous, at least as used for the 232 # other tests. 233 234 def test_zzquit(self): 235 # This test must be called last, hence the name 236 cls = type(self) 237 try: 238 self.server.quit() 239 finally: 240 cls.server = None 241 242 @classmethod 243 def wrap_methods(cls): 244 # Wrap all methods in a transient_internet() exception catcher 245 # XXX put a generic version in test.support? 246 def wrap_meth(meth): 247 @functools.wraps(meth) 248 def wrapped(self): 249 with support.transient_internet(self.NNTP_HOST): 250 meth(self) 251 return wrapped 252 for name in dir(cls): 253 if not name.startswith('test_'): 254 continue 255 meth = getattr(cls, name) 256 if not callable(meth): 257 continue 258 # Need to use a closure so that meth remains bound to its current 259 # value 260 setattr(cls, name, wrap_meth(meth)) 261 262 def test_with_statement(self): 263 def is_connected(): 264 if not hasattr(server, 'file'): 265 return False 266 try: 267 server.help() 268 except (OSError, EOFError): 269 return False 270 return True 271 272 try: 273 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: 274 self.assertTrue(is_connected()) 275 self.assertTrue(server.help()) 276 self.assertFalse(is_connected()) 277 278 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: 279 server.quit() 280 self.assertFalse(is_connected()) 281 except SSLError as ssl_err: 282 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small" 283 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason): 284 raise unittest.SkipTest(f"Got {ssl_err} connecting " 285 f"to {self.NNTP_HOST!r}") 286 raise 287 288 289NetworkedNNTPTestsMixin.wrap_methods() 290 291 292EOF_ERRORS = (EOFError,) 293if ssl is not None: 294 EOF_ERRORS += (ssl.SSLEOFError,) 295 296 297class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): 298 # This server supports STARTTLS (gmane doesn't) 299 NNTP_HOST = 'news.trigofacile.com' 300 GROUP_NAME = 'fr.comp.lang.python' 301 GROUP_PAT = 'fr.comp.lang.*' 302 303 NNTP_CLASS = NNTP 304 305 @classmethod 306 def setUpClass(cls): 307 support.requires("network") 308 with support.transient_internet(cls.NNTP_HOST): 309 try: 310 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, 311 usenetrc=False) 312 except SSLError as ssl_err: 313 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small" 314 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason): 315 raise unittest.SkipTest(f"{cls} got {ssl_err} connecting " 316 f"to {cls.NNTP_HOST!r}") 317 raise 318 except EOF_ERRORS: 319 raise unittest.SkipTest(f"{cls} got EOF error on connecting " 320 f"to {cls.NNTP_HOST!r}") 321 322 @classmethod 323 def tearDownClass(cls): 324 if cls.server is not None: 325 cls.server.quit() 326 327@unittest.skipUnless(ssl, 'requires SSL support') 328class NetworkedNNTP_SSLTests(NetworkedNNTPTests): 329 330 # Technical limits for this public NNTP server (see http://www.aioe.org): 331 # "Only two concurrent connections per IP address are allowed and 332 # 400 connections per day are accepted from each IP address." 333 334 NNTP_HOST = 'nntp.aioe.org' 335 GROUP_NAME = 'comp.lang.python' 336 GROUP_PAT = 'comp.lang.*' 337 338 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) 339 340 # Disabled as it produces too much data 341 test_list = None 342 343 # Disabled as the connection will already be encrypted. 344 test_starttls = None 345 346 347# 348# Non-networked tests using a local server (or something mocking it). 349# 350 351class _NNTPServerIO(io.RawIOBase): 352 """A raw IO object allowing NNTP commands to be received and processed 353 by a handler. The handler can push responses which can then be read 354 from the IO object.""" 355 356 def __init__(self, handler): 357 io.RawIOBase.__init__(self) 358 # The channel from the client 359 self.c2s = io.BytesIO() 360 # The channel to the client 361 self.s2c = io.BytesIO() 362 self.handler = handler 363 self.handler.start(self.c2s.readline, self.push_data) 364 365 def readable(self): 366 return True 367 368 def writable(self): 369 return True 370 371 def push_data(self, data): 372 """Push (buffer) some data to send to the client.""" 373 pos = self.s2c.tell() 374 self.s2c.seek(0, 2) 375 self.s2c.write(data) 376 self.s2c.seek(pos) 377 378 def write(self, b): 379 """The client sends us some data""" 380 pos = self.c2s.tell() 381 self.c2s.write(b) 382 self.c2s.seek(pos) 383 self.handler.process_pending() 384 return len(b) 385 386 def readinto(self, buf): 387 """The client wants to read a response""" 388 self.handler.process_pending() 389 b = self.s2c.read(len(buf)) 390 n = len(b) 391 buf[:n] = b 392 return n 393 394 395def make_mock_file(handler): 396 sio = _NNTPServerIO(handler) 397 # Using BufferedRWPair instead of BufferedRandom ensures the file 398 # isn't seekable. 399 file = io.BufferedRWPair(sio, sio) 400 return (sio, file) 401 402 403class MockedNNTPTestsMixin: 404 # Override in derived classes 405 handler_class = None 406 407 def setUp(self): 408 super().setUp() 409 self.make_server() 410 411 def tearDown(self): 412 super().tearDown() 413 del self.server 414 415 def make_server(self, *args, **kwargs): 416 self.handler = self.handler_class() 417 self.sio, file = make_mock_file(self.handler) 418 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) 419 return self.server 420 421 422class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): 423 def setUp(self): 424 super().setUp() 425 self.make_server(readermode=True) 426 427 428class NNTPv1Handler: 429 """A handler for RFC 977""" 430 431 welcome = "200 NNTP mock server" 432 433 def start(self, readline, push_data): 434 self.in_body = False 435 self.allow_posting = True 436 self._readline = readline 437 self._push_data = push_data 438 self._logged_in = False 439 self._user_sent = False 440 # Our welcome 441 self.handle_welcome() 442 443 def _decode(self, data): 444 return str(data, "utf-8", "surrogateescape") 445 446 def process_pending(self): 447 if self.in_body: 448 while True: 449 line = self._readline() 450 if not line: 451 return 452 self.body.append(line) 453 if line == b".\r\n": 454 break 455 try: 456 meth, tokens = self.body_callback 457 meth(*tokens, body=self.body) 458 finally: 459 self.body_callback = None 460 self.body = None 461 self.in_body = False 462 while True: 463 line = self._decode(self._readline()) 464 if not line: 465 return 466 if not line.endswith("\r\n"): 467 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) 468 line = line[:-2] 469 cmd, *tokens = line.split() 470 #meth = getattr(self.handler, "handle_" + cmd.upper(), None) 471 meth = getattr(self, "handle_" + cmd.upper(), None) 472 if meth is None: 473 self.handle_unknown() 474 else: 475 try: 476 meth(*tokens) 477 except Exception as e: 478 raise ValueError("command failed: {!r}".format(line)) from e 479 else: 480 if self.in_body: 481 self.body_callback = meth, tokens 482 self.body = [] 483 484 def expect_body(self): 485 """Flag that the client is expected to post a request body""" 486 self.in_body = True 487 488 def push_data(self, data): 489 """Push some binary data""" 490 self._push_data(data) 491 492 def push_lit(self, lit): 493 """Push a string literal""" 494 lit = textwrap.dedent(lit) 495 lit = "\r\n".join(lit.splitlines()) + "\r\n" 496 lit = lit.encode('utf-8') 497 self.push_data(lit) 498 499 def handle_unknown(self): 500 self.push_lit("500 What?") 501 502 def handle_welcome(self): 503 self.push_lit(self.welcome) 504 505 def handle_QUIT(self): 506 self.push_lit("205 Bye!") 507 508 def handle_DATE(self): 509 self.push_lit("111 20100914001155") 510 511 def handle_GROUP(self, group): 512 if group == "fr.comp.lang.python": 513 self.push_lit("211 486 761 1265 fr.comp.lang.python") 514 else: 515 self.push_lit("411 No such group {}".format(group)) 516 517 def handle_HELP(self): 518 self.push_lit("""\ 519 100 Legal commands 520 authinfo user Name|pass Password|generic <prog> <args> 521 date 522 help 523 Report problems to <root@example.org> 524 .""") 525 526 def handle_STAT(self, message_spec=None): 527 if message_spec is None: 528 self.push_lit("412 No newsgroup selected") 529 elif message_spec == "3000234": 530 self.push_lit("223 3000234 <45223423@example.com>") 531 elif message_spec == "<45223423@example.com>": 532 self.push_lit("223 0 <45223423@example.com>") 533 else: 534 self.push_lit("430 No Such Article Found") 535 536 def handle_NEXT(self): 537 self.push_lit("223 3000237 <668929@example.org> retrieved") 538 539 def handle_LAST(self): 540 self.push_lit("223 3000234 <45223423@example.com> retrieved") 541 542 def handle_LIST(self, action=None, param=None): 543 if action is None: 544 self.push_lit("""\ 545 215 Newsgroups in form "group high low flags". 546 comp.lang.python 0000052340 0000002828 y 547 comp.lang.python.announce 0000001153 0000000993 m 548 free.it.comp.lang.python 0000000002 0000000002 y 549 fr.comp.lang.python 0000001254 0000000760 y 550 free.it.comp.lang.python.learner 0000000000 0000000001 y 551 tw.bbs.comp.lang.python 0000000304 0000000304 y 552 .""") 553 elif action == "ACTIVE": 554 if param == "*distutils*": 555 self.push_lit("""\ 556 215 Newsgroups in form "group high low flags" 557 gmane.comp.python.distutils.devel 0000014104 0000000001 m 558 gmane.comp.python.distutils.cvs 0000000000 0000000001 m 559 .""") 560 else: 561 self.push_lit("""\ 562 215 Newsgroups in form "group high low flags" 563 .""") 564 elif action == "OVERVIEW.FMT": 565 self.push_lit("""\ 566 215 Order of fields in overview database. 567 Subject: 568 From: 569 Date: 570 Message-ID: 571 References: 572 Bytes: 573 Lines: 574 Xref:full 575 .""") 576 elif action == "NEWSGROUPS": 577 assert param is not None 578 if param == "comp.lang.python": 579 self.push_lit("""\ 580 215 Descriptions in form "group description". 581 comp.lang.python\tThe Python computer language. 582 .""") 583 elif param == "comp.lang.python*": 584 self.push_lit("""\ 585 215 Descriptions in form "group description". 586 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) 587 comp.lang.python\tThe Python computer language. 588 .""") 589 else: 590 self.push_lit("""\ 591 215 Descriptions in form "group description". 592 .""") 593 else: 594 self.push_lit('501 Unknown LIST keyword') 595 596 def handle_NEWNEWS(self, group, date_str, time_str): 597 # We hard code different return messages depending on passed 598 # argument and date syntax. 599 if (group == "comp.lang.python" and date_str == "20100913" 600 and time_str == "082004"): 601 # Date was passed in RFC 3977 format (NNTP "v2") 602 self.push_lit("""\ 603 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows 604 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 605 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 606 .""") 607 elif (group == "comp.lang.python" and date_str == "100913" 608 and time_str == "082004"): 609 # Date was passed in RFC 977 format (NNTP "v1") 610 self.push_lit("""\ 611 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows 612 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 613 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 614 .""") 615 elif (group == 'comp.lang.python' and 616 date_str in ('20100101', '100101') and 617 time_str == '090000'): 618 self.push_lit('too long line' * 3000 + 619 '\n.') 620 else: 621 self.push_lit("""\ 622 230 An empty list of newsarticles follows 623 .""") 624 # (Note for experiments: many servers disable NEWNEWS. 625 # As of this writing, sicinfo3.epfl.ch doesn't.) 626 627 def handle_XOVER(self, message_spec): 628 if message_spec == "57-59": 629 self.push_lit( 630 "224 Overview information for 57-58 follows\n" 631 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" 632 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 633 "\tSat, 19 Jun 2010 18:04:08 -0400" 634 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" 635 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16" 636 "\tXref: news.gmane.io gmane.comp.python.authors:57" 637 "\n" 638 "58\tLooking for a few good bloggers" 639 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 640 "\tThu, 22 Jul 2010 09:14:14 -0400" 641 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>" 642 "\t\t6683\t16" 643 "\t" 644 "\n" 645 # A UTF-8 overview line from fr.comp.lang.python 646 "59\tRe: Message d'erreur incompréhensible (par moi)" 647 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>" 648 "\tWed, 15 Sep 2010 18:09:15 +0200" 649 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>" 650 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" 651 "\tXref: saria.nerim.net fr.comp.lang.python:1265" 652 "\n" 653 ".\n") 654 else: 655 self.push_lit("""\ 656 224 No articles 657 .""") 658 659 def handle_POST(self, *, body=None): 660 if body is None: 661 if self.allow_posting: 662 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>") 663 self.expect_body() 664 else: 665 self.push_lit("440 Posting not permitted") 666 else: 667 assert self.allow_posting 668 self.push_lit("240 Article received OK") 669 self.posted_body = body 670 671 def handle_IHAVE(self, message_id, *, body=None): 672 if body is None: 673 if (self.allow_posting and 674 message_id == "<i.am.an.article.you.will.want@example.com>"): 675 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>") 676 self.expect_body() 677 else: 678 self.push_lit("435 Article not wanted") 679 else: 680 assert self.allow_posting 681 self.push_lit("235 Article transferred OK") 682 self.posted_body = body 683 684 sample_head = """\ 685 From: "Demo User" <nobody@example.net> 686 Subject: I am just a test article 687 Content-Type: text/plain; charset=UTF-8; format=flowed 688 Message-ID: <i.am.an.article.you.will.want@example.com>""" 689 690 sample_body = """\ 691 This is just a test article. 692 ..Here is a dot-starting line. 693 694 -- Signed by Andr\xe9.""" 695 696 sample_article = sample_head + "\n\n" + sample_body 697 698 def handle_ARTICLE(self, message_spec=None): 699 if message_spec is None: 700 self.push_lit("220 3000237 <45223423@example.com>") 701 elif message_spec == "<45223423@example.com>": 702 self.push_lit("220 0 <45223423@example.com>") 703 elif message_spec == "3000234": 704 self.push_lit("220 3000234 <45223423@example.com>") 705 else: 706 self.push_lit("430 No Such Article Found") 707 return 708 self.push_lit(self.sample_article) 709 self.push_lit(".") 710 711 def handle_HEAD(self, message_spec=None): 712 if message_spec is None: 713 self.push_lit("221 3000237 <45223423@example.com>") 714 elif message_spec == "<45223423@example.com>": 715 self.push_lit("221 0 <45223423@example.com>") 716 elif message_spec == "3000234": 717 self.push_lit("221 3000234 <45223423@example.com>") 718 else: 719 self.push_lit("430 No Such Article Found") 720 return 721 self.push_lit(self.sample_head) 722 self.push_lit(".") 723 724 def handle_BODY(self, message_spec=None): 725 if message_spec is None: 726 self.push_lit("222 3000237 <45223423@example.com>") 727 elif message_spec == "<45223423@example.com>": 728 self.push_lit("222 0 <45223423@example.com>") 729 elif message_spec == "3000234": 730 self.push_lit("222 3000234 <45223423@example.com>") 731 else: 732 self.push_lit("430 No Such Article Found") 733 return 734 self.push_lit(self.sample_body) 735 self.push_lit(".") 736 737 def handle_AUTHINFO(self, cred_type, data): 738 if self._logged_in: 739 self.push_lit('502 Already Logged In') 740 elif cred_type == 'user': 741 if self._user_sent: 742 self.push_lit('482 User Credential Already Sent') 743 else: 744 self.push_lit('381 Password Required') 745 self._user_sent = True 746 elif cred_type == 'pass': 747 self.push_lit('281 Login Successful') 748 self._logged_in = True 749 else: 750 raise Exception('Unknown cred type {}'.format(cred_type)) 751 752 753class NNTPv2Handler(NNTPv1Handler): 754 """A handler for RFC 3977 (NNTP "v2")""" 755 756 def handle_CAPABILITIES(self): 757 fmt = """\ 758 101 Capability list: 759 VERSION 2 3 760 IMPLEMENTATION INN 2.5.1{} 761 HDR 762 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 763 OVER 764 POST 765 READER 766 .""" 767 768 if not self._logged_in: 769 self.push_lit(fmt.format('\n AUTHINFO USER')) 770 else: 771 self.push_lit(fmt.format('')) 772 773 def handle_MODE(self, _): 774 raise Exception('MODE READER sent despite READER has been advertised') 775 776 def handle_OVER(self, message_spec=None): 777 return self.handle_XOVER(message_spec) 778 779 780class CapsAfterLoginNNTPv2Handler(NNTPv2Handler): 781 """A handler that allows CAPABILITIES only after login""" 782 783 def handle_CAPABILITIES(self): 784 if not self._logged_in: 785 self.push_lit('480 You must log in.') 786 else: 787 super().handle_CAPABILITIES() 788 789 790class ModeSwitchingNNTPv2Handler(NNTPv2Handler): 791 """A server that starts in transit mode""" 792 793 def __init__(self): 794 self._switched = False 795 796 def handle_CAPABILITIES(self): 797 fmt = """\ 798 101 Capability list: 799 VERSION 2 3 800 IMPLEMENTATION INN 2.5.1 801 HDR 802 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 803 OVER 804 POST 805 {}READER 806 .""" 807 if self._switched: 808 self.push_lit(fmt.format('')) 809 else: 810 self.push_lit(fmt.format('MODE-')) 811 812 def handle_MODE(self, what): 813 assert not self._switched and what == 'reader' 814 self._switched = True 815 self.push_lit('200 Posting allowed') 816 817 818class NNTPv1v2TestsMixin: 819 820 def setUp(self): 821 super().setUp() 822 823 def test_welcome(self): 824 self.assertEqual(self.server.welcome, self.handler.welcome) 825 826 def test_authinfo(self): 827 if self.nntp_version == 2: 828 self.assertIn('AUTHINFO', self.server._caps) 829 self.server.login('testuser', 'testpw') 830 # if AUTHINFO is gone from _caps we also know that getcapabilities() 831 # has been called after login as it should 832 self.assertNotIn('AUTHINFO', self.server._caps) 833 834 def test_date(self): 835 resp, date = self.server.date() 836 self.assertEqual(resp, "111 20100914001155") 837 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) 838 839 def test_quit(self): 840 self.assertFalse(self.sio.closed) 841 resp = self.server.quit() 842 self.assertEqual(resp, "205 Bye!") 843 self.assertTrue(self.sio.closed) 844 845 def test_help(self): 846 resp, help = self.server.help() 847 self.assertEqual(resp, "100 Legal commands") 848 self.assertEqual(help, [ 849 ' authinfo user Name|pass Password|generic <prog> <args>', 850 ' date', 851 ' help', 852 'Report problems to <root@example.org>', 853 ]) 854 855 def test_list(self): 856 resp, groups = self.server.list() 857 self.assertEqual(len(groups), 6) 858 g = groups[1] 859 self.assertEqual(g, 860 GroupInfo("comp.lang.python.announce", "0000001153", 861 "0000000993", "m")) 862 resp, groups = self.server.list("*distutils*") 863 self.assertEqual(len(groups), 2) 864 g = groups[0] 865 self.assertEqual(g, 866 GroupInfo("gmane.comp.python.distutils.devel", "0000014104", 867 "0000000001", "m")) 868 869 def test_stat(self): 870 resp, art_num, message_id = self.server.stat(3000234) 871 self.assertEqual(resp, "223 3000234 <45223423@example.com>") 872 self.assertEqual(art_num, 3000234) 873 self.assertEqual(message_id, "<45223423@example.com>") 874 resp, art_num, message_id = self.server.stat("<45223423@example.com>") 875 self.assertEqual(resp, "223 0 <45223423@example.com>") 876 self.assertEqual(art_num, 0) 877 self.assertEqual(message_id, "<45223423@example.com>") 878 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 879 self.server.stat("<non.existent.id>") 880 self.assertEqual(cm.exception.response, "430 No Such Article Found") 881 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 882 self.server.stat() 883 self.assertEqual(cm.exception.response, "412 No newsgroup selected") 884 885 def test_next(self): 886 resp, art_num, message_id = self.server.next() 887 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") 888 self.assertEqual(art_num, 3000237) 889 self.assertEqual(message_id, "<668929@example.org>") 890 891 def test_last(self): 892 resp, art_num, message_id = self.server.last() 893 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") 894 self.assertEqual(art_num, 3000234) 895 self.assertEqual(message_id, "<45223423@example.com>") 896 897 def test_description(self): 898 desc = self.server.description("comp.lang.python") 899 self.assertEqual(desc, "The Python computer language.") 900 desc = self.server.description("comp.lang.pythonx") 901 self.assertEqual(desc, "") 902 903 def test_descriptions(self): 904 resp, groups = self.server.descriptions("comp.lang.python") 905 self.assertEqual(resp, '215 Descriptions in form "group description".') 906 self.assertEqual(groups, { 907 "comp.lang.python": "The Python computer language.", 908 }) 909 resp, groups = self.server.descriptions("comp.lang.python*") 910 self.assertEqual(groups, { 911 "comp.lang.python": "The Python computer language.", 912 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", 913 }) 914 resp, groups = self.server.descriptions("comp.lang.pythonx") 915 self.assertEqual(groups, {}) 916 917 def test_group(self): 918 resp, count, first, last, group = self.server.group("fr.comp.lang.python") 919 self.assertTrue(resp.startswith("211 "), resp) 920 self.assertEqual(first, 761) 921 self.assertEqual(last, 1265) 922 self.assertEqual(count, 486) 923 self.assertEqual(group, "fr.comp.lang.python") 924 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 925 self.server.group("comp.lang.python.devel") 926 exc = cm.exception 927 self.assertTrue(exc.response.startswith("411 No such group"), 928 exc.response) 929 930 def test_newnews(self): 931 # NEWNEWS comp.lang.python [20]100913 082004 932 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 933 resp, ids = self.server.newnews("comp.lang.python", dt) 934 expected = ( 935 "230 list of newsarticles (NNTP v{0}) " 936 "created after Mon Sep 13 08:20:04 2010 follows" 937 ).format(self.nntp_version) 938 self.assertEqual(resp, expected) 939 self.assertEqual(ids, [ 940 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>", 941 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>", 942 ]) 943 # NEWNEWS fr.comp.lang.python [20]100913 082004 944 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 945 resp, ids = self.server.newnews("fr.comp.lang.python", dt) 946 self.assertEqual(resp, "230 An empty list of newsarticles follows") 947 self.assertEqual(ids, []) 948 949 def _check_article_body(self, lines): 950 self.assertEqual(len(lines), 4) 951 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") 952 self.assertEqual(lines[-2], b"") 953 self.assertEqual(lines[-3], b".Here is a dot-starting line.") 954 self.assertEqual(lines[-4], b"This is just a test article.") 955 956 def _check_article_head(self, lines): 957 self.assertEqual(len(lines), 4) 958 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>') 959 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>") 960 961 def _check_article_data(self, lines): 962 self.assertEqual(len(lines), 9) 963 self._check_article_head(lines[:4]) 964 self._check_article_body(lines[-4:]) 965 self.assertEqual(lines[4], b"") 966 967 def test_article(self): 968 # ARTICLE 969 resp, info = self.server.article() 970 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 971 art_num, message_id, lines = info 972 self.assertEqual(art_num, 3000237) 973 self.assertEqual(message_id, "<45223423@example.com>") 974 self._check_article_data(lines) 975 # ARTICLE num 976 resp, info = self.server.article(3000234) 977 self.assertEqual(resp, "220 3000234 <45223423@example.com>") 978 art_num, message_id, lines = info 979 self.assertEqual(art_num, 3000234) 980 self.assertEqual(message_id, "<45223423@example.com>") 981 self._check_article_data(lines) 982 # ARTICLE id 983 resp, info = self.server.article("<45223423@example.com>") 984 self.assertEqual(resp, "220 0 <45223423@example.com>") 985 art_num, message_id, lines = info 986 self.assertEqual(art_num, 0) 987 self.assertEqual(message_id, "<45223423@example.com>") 988 self._check_article_data(lines) 989 # Non-existent id 990 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 991 self.server.article("<non-existent@example.com>") 992 self.assertEqual(cm.exception.response, "430 No Such Article Found") 993 994 def test_article_file(self): 995 # With a "file" argument 996 f = io.BytesIO() 997 resp, info = self.server.article(file=f) 998 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 999 art_num, message_id, lines = info 1000 self.assertEqual(art_num, 3000237) 1001 self.assertEqual(message_id, "<45223423@example.com>") 1002 self.assertEqual(lines, []) 1003 data = f.getvalue() 1004 self.assertTrue(data.startswith( 1005 b'From: "Demo User" <nobody@example.net>\r\n' 1006 b'Subject: I am just a test article\r\n' 1007 ), ascii(data)) 1008 self.assertTrue(data.endswith( 1009 b'This is just a test article.\r\n' 1010 b'.Here is a dot-starting line.\r\n' 1011 b'\r\n' 1012 b'-- Signed by Andr\xc3\xa9.\r\n' 1013 ), ascii(data)) 1014 1015 def test_head(self): 1016 # HEAD 1017 resp, info = self.server.head() 1018 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1019 art_num, message_id, lines = info 1020 self.assertEqual(art_num, 3000237) 1021 self.assertEqual(message_id, "<45223423@example.com>") 1022 self._check_article_head(lines) 1023 # HEAD num 1024 resp, info = self.server.head(3000234) 1025 self.assertEqual(resp, "221 3000234 <45223423@example.com>") 1026 art_num, message_id, lines = info 1027 self.assertEqual(art_num, 3000234) 1028 self.assertEqual(message_id, "<45223423@example.com>") 1029 self._check_article_head(lines) 1030 # HEAD id 1031 resp, info = self.server.head("<45223423@example.com>") 1032 self.assertEqual(resp, "221 0 <45223423@example.com>") 1033 art_num, message_id, lines = info 1034 self.assertEqual(art_num, 0) 1035 self.assertEqual(message_id, "<45223423@example.com>") 1036 self._check_article_head(lines) 1037 # Non-existent id 1038 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1039 self.server.head("<non-existent@example.com>") 1040 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1041 1042 def test_head_file(self): 1043 f = io.BytesIO() 1044 resp, info = self.server.head(file=f) 1045 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1046 art_num, message_id, lines = info 1047 self.assertEqual(art_num, 3000237) 1048 self.assertEqual(message_id, "<45223423@example.com>") 1049 self.assertEqual(lines, []) 1050 data = f.getvalue() 1051 self.assertTrue(data.startswith( 1052 b'From: "Demo User" <nobody@example.net>\r\n' 1053 b'Subject: I am just a test article\r\n' 1054 ), ascii(data)) 1055 self.assertFalse(data.endswith( 1056 b'This is just a test article.\r\n' 1057 b'.Here is a dot-starting line.\r\n' 1058 b'\r\n' 1059 b'-- Signed by Andr\xc3\xa9.\r\n' 1060 ), ascii(data)) 1061 1062 def test_body(self): 1063 # BODY 1064 resp, info = self.server.body() 1065 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1066 art_num, message_id, lines = info 1067 self.assertEqual(art_num, 3000237) 1068 self.assertEqual(message_id, "<45223423@example.com>") 1069 self._check_article_body(lines) 1070 # BODY num 1071 resp, info = self.server.body(3000234) 1072 self.assertEqual(resp, "222 3000234 <45223423@example.com>") 1073 art_num, message_id, lines = info 1074 self.assertEqual(art_num, 3000234) 1075 self.assertEqual(message_id, "<45223423@example.com>") 1076 self._check_article_body(lines) 1077 # BODY id 1078 resp, info = self.server.body("<45223423@example.com>") 1079 self.assertEqual(resp, "222 0 <45223423@example.com>") 1080 art_num, message_id, lines = info 1081 self.assertEqual(art_num, 0) 1082 self.assertEqual(message_id, "<45223423@example.com>") 1083 self._check_article_body(lines) 1084 # Non-existent id 1085 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1086 self.server.body("<non-existent@example.com>") 1087 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1088 1089 def test_body_file(self): 1090 f = io.BytesIO() 1091 resp, info = self.server.body(file=f) 1092 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1093 art_num, message_id, lines = info 1094 self.assertEqual(art_num, 3000237) 1095 self.assertEqual(message_id, "<45223423@example.com>") 1096 self.assertEqual(lines, []) 1097 data = f.getvalue() 1098 self.assertFalse(data.startswith( 1099 b'From: "Demo User" <nobody@example.net>\r\n' 1100 b'Subject: I am just a test article\r\n' 1101 ), ascii(data)) 1102 self.assertTrue(data.endswith( 1103 b'This is just a test article.\r\n' 1104 b'.Here is a dot-starting line.\r\n' 1105 b'\r\n' 1106 b'-- Signed by Andr\xc3\xa9.\r\n' 1107 ), ascii(data)) 1108 1109 def check_over_xover_resp(self, resp, overviews): 1110 self.assertTrue(resp.startswith("224 "), resp) 1111 self.assertEqual(len(overviews), 3) 1112 art_num, over = overviews[0] 1113 self.assertEqual(art_num, 57) 1114 self.assertEqual(over, { 1115 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>", 1116 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", 1117 "date": "Sat, 19 Jun 2010 18:04:08 -0400", 1118 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>", 1119 "references": "<hvalf7$ort$1@dough.gmane.org>", 1120 ":bytes": "7103", 1121 ":lines": "16", 1122 "xref": "news.gmane.io gmane.comp.python.authors:57" 1123 }) 1124 art_num, over = overviews[1] 1125 self.assertEqual(over["xref"], None) 1126 art_num, over = overviews[2] 1127 self.assertEqual(over["subject"], 1128 "Re: Message d'erreur incompréhensible (par moi)") 1129 1130 def test_xover(self): 1131 resp, overviews = self.server.xover(57, 59) 1132 self.check_over_xover_resp(resp, overviews) 1133 1134 def test_over(self): 1135 # In NNTP "v1", this will fallback on XOVER 1136 resp, overviews = self.server.over((57, 59)) 1137 self.check_over_xover_resp(resp, overviews) 1138 1139 sample_post = ( 1140 b'From: "Demo User" <nobody@example.net>\r\n' 1141 b'Subject: I am just a test article\r\n' 1142 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' 1143 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n' 1144 b'\r\n' 1145 b'This is just a test article.\r\n' 1146 b'.Here is a dot-starting line.\r\n' 1147 b'\r\n' 1148 b'-- Signed by Andr\xc3\xa9.\r\n' 1149 ) 1150 1151 def _check_posted_body(self): 1152 # Check the raw body as received by the server 1153 lines = self.handler.posted_body 1154 # One additional line for the "." terminator 1155 self.assertEqual(len(lines), 10) 1156 self.assertEqual(lines[-1], b'.\r\n') 1157 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') 1158 self.assertEqual(lines[-3], b'\r\n') 1159 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') 1160 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n') 1161 1162 def _check_post_ihave_sub(self, func, *args, file_factory): 1163 # First the prepared post with CRLF endings 1164 post = self.sample_post 1165 func_args = args + (file_factory(post),) 1166 self.handler.posted_body = None 1167 resp = func(*func_args) 1168 self._check_posted_body() 1169 # Then the same post with "normal" line endings - they should be 1170 # converted by NNTP.post and NNTP.ihave. 1171 post = self.sample_post.replace(b"\r\n", b"\n") 1172 func_args = args + (file_factory(post),) 1173 self.handler.posted_body = None 1174 resp = func(*func_args) 1175 self._check_posted_body() 1176 return resp 1177 1178 def check_post_ihave(self, func, success_resp, *args): 1179 # With a bytes object 1180 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) 1181 self.assertEqual(resp, success_resp) 1182 # With a bytearray object 1183 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) 1184 self.assertEqual(resp, success_resp) 1185 # With a file object 1186 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) 1187 self.assertEqual(resp, success_resp) 1188 # With an iterable of terminated lines 1189 def iterlines(b): 1190 return iter(b.splitlines(keepends=True)) 1191 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1192 self.assertEqual(resp, success_resp) 1193 # With an iterable of non-terminated lines 1194 def iterlines(b): 1195 return iter(b.splitlines(keepends=False)) 1196 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1197 self.assertEqual(resp, success_resp) 1198 1199 def test_post(self): 1200 self.check_post_ihave(self.server.post, "240 Article received OK") 1201 self.handler.allow_posting = False 1202 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1203 self.server.post(self.sample_post) 1204 self.assertEqual(cm.exception.response, 1205 "440 Posting not permitted") 1206 1207 def test_ihave(self): 1208 self.check_post_ihave(self.server.ihave, "235 Article transferred OK", 1209 "<i.am.an.article.you.will.want@example.com>") 1210 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1211 self.server.ihave("<another.message.id>", self.sample_post) 1212 self.assertEqual(cm.exception.response, 1213 "435 Article not wanted") 1214 1215 def test_too_long_lines(self): 1216 dt = datetime.datetime(2010, 1, 1, 9, 0, 0) 1217 self.assertRaises(nntplib.NNTPDataError, 1218 self.server.newnews, "comp.lang.python", dt) 1219 1220 1221class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1222 """Tests an NNTP v1 server (no capabilities).""" 1223 1224 nntp_version = 1 1225 handler_class = NNTPv1Handler 1226 1227 def test_caps(self): 1228 caps = self.server.getcapabilities() 1229 self.assertEqual(caps, {}) 1230 self.assertEqual(self.server.nntp_version, 1) 1231 self.assertEqual(self.server.nntp_implementation, None) 1232 1233 1234class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1235 """Tests an NNTP v2 server (with capabilities).""" 1236 1237 nntp_version = 2 1238 handler_class = NNTPv2Handler 1239 1240 def test_caps(self): 1241 caps = self.server.getcapabilities() 1242 self.assertEqual(caps, { 1243 'VERSION': ['2', '3'], 1244 'IMPLEMENTATION': ['INN', '2.5.1'], 1245 'AUTHINFO': ['USER'], 1246 'HDR': [], 1247 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', 1248 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], 1249 'OVER': [], 1250 'POST': [], 1251 'READER': [], 1252 }) 1253 self.assertEqual(self.server.nntp_version, 3) 1254 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1') 1255 1256 1257class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase): 1258 """Tests a probably NNTP v2 server with capabilities only after login.""" 1259 1260 nntp_version = 2 1261 handler_class = CapsAfterLoginNNTPv2Handler 1262 1263 def test_caps_only_after_login(self): 1264 self.assertEqual(self.server._caps, {}) 1265 self.server.login('testuser', 'testpw') 1266 self.assertIn('VERSION', self.server._caps) 1267 1268 1269class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin, 1270 unittest.TestCase): 1271 """Same tests as for v2 but we tell NTTP to send MODE READER to a server 1272 that isn't in READER mode by default.""" 1273 1274 nntp_version = 2 1275 handler_class = ModeSwitchingNNTPv2Handler 1276 1277 def test_we_are_in_reader_mode_after_connect(self): 1278 self.assertIn('READER', self.server._caps) 1279 1280 1281class MiscTests(unittest.TestCase): 1282 1283 def test_decode_header(self): 1284 def gives(a, b): 1285 self.assertEqual(nntplib.decode_header(a), b) 1286 gives("" , "") 1287 gives("a plain header", "a plain header") 1288 gives(" with extra spaces ", " with extra spaces ") 1289 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python") 1290 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?=" 1291 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=", 1292 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées") 1293 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=", 1294 "Re: problème de matrice") 1295 # A natively utf-8 header (found in the real world!) 1296 gives("Re: Message d'erreur incompréhensible (par moi)", 1297 "Re: Message d'erreur incompréhensible (par moi)") 1298 1299 def test_parse_overview_fmt(self): 1300 # The minimal (default) response 1301 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1302 "References:", ":bytes", ":lines"] 1303 self.assertEqual(nntplib._parse_overview_fmt(lines), 1304 ["subject", "from", "date", "message-id", "references", 1305 ":bytes", ":lines"]) 1306 # The minimal response using alternative names 1307 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1308 "References:", "Bytes:", "Lines:"] 1309 self.assertEqual(nntplib._parse_overview_fmt(lines), 1310 ["subject", "from", "date", "message-id", "references", 1311 ":bytes", ":lines"]) 1312 # Variations in casing 1313 lines = ["subject:", "FROM:", "DaTe:", "message-ID:", 1314 "References:", "BYTES:", "Lines:"] 1315 self.assertEqual(nntplib._parse_overview_fmt(lines), 1316 ["subject", "from", "date", "message-id", "references", 1317 ":bytes", ":lines"]) 1318 # First example from RFC 3977 1319 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1320 "References:", ":bytes", ":lines", "Xref:full", 1321 "Distribution:full"] 1322 self.assertEqual(nntplib._parse_overview_fmt(lines), 1323 ["subject", "from", "date", "message-id", "references", 1324 ":bytes", ":lines", "xref", "distribution"]) 1325 # Second example from RFC 3977 1326 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1327 "References:", "Bytes:", "Lines:", "Xref:FULL", 1328 "Distribution:FULL"] 1329 self.assertEqual(nntplib._parse_overview_fmt(lines), 1330 ["subject", "from", "date", "message-id", "references", 1331 ":bytes", ":lines", "xref", "distribution"]) 1332 # A classic response from INN 1333 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1334 "References:", "Bytes:", "Lines:", "Xref:full"] 1335 self.assertEqual(nntplib._parse_overview_fmt(lines), 1336 ["subject", "from", "date", "message-id", "references", 1337 ":bytes", ":lines", "xref"]) 1338 1339 def test_parse_overview(self): 1340 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] 1341 # First example from RFC 3977 1342 lines = [ 1343 '3000234\tI am just a test article\t"Demo User" ' 1344 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1345 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1346 '17\tXref: news.example.com misc.test:3000363', 1347 ] 1348 overview = nntplib._parse_overview(lines, fmt) 1349 (art_num, fields), = overview 1350 self.assertEqual(art_num, 3000234) 1351 self.assertEqual(fields, { 1352 'subject': 'I am just a test article', 1353 'from': '"Demo User" <nobody@example.com>', 1354 'date': '6 Oct 1998 04:38:40 -0500', 1355 'message-id': '<45223423@example.com>', 1356 'references': '<45454@example.net>', 1357 ':bytes': '1234', 1358 ':lines': '17', 1359 'xref': 'news.example.com misc.test:3000363', 1360 }) 1361 # Second example; here the "Xref" field is totally absent (including 1362 # the header name) and comes out as None 1363 lines = [ 1364 '3000234\tI am just a test article\t"Demo User" ' 1365 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1366 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1367 '17\t\t', 1368 ] 1369 overview = nntplib._parse_overview(lines, fmt) 1370 (art_num, fields), = overview 1371 self.assertEqual(fields['xref'], None) 1372 # Third example; the "Xref" is an empty string, while "references" 1373 # is a single space. 1374 lines = [ 1375 '3000234\tI am just a test article\t"Demo User" ' 1376 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1377 '<45223423@example.com>\t \t1234\t' 1378 '17\tXref: \t', 1379 ] 1380 overview = nntplib._parse_overview(lines, fmt) 1381 (art_num, fields), = overview 1382 self.assertEqual(fields['references'], ' ') 1383 self.assertEqual(fields['xref'], '') 1384 1385 def test_parse_datetime(self): 1386 def gives(a, b, *c): 1387 self.assertEqual(nntplib._parse_datetime(a, b), 1388 datetime.datetime(*c)) 1389 # Output of DATE command 1390 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) 1391 # Variations 1392 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) 1393 gives("990623", "135624", 1999, 6, 23, 13, 56, 24) 1394 gives("090623", "135624", 2009, 6, 23, 13, 56, 24) 1395 1396 def test_unparse_datetime(self): 1397 # Test non-legacy mode 1398 # 1) with a datetime 1399 def gives(y, M, d, h, m, s, date_str, time_str): 1400 dt = datetime.datetime(y, M, d, h, m, s) 1401 self.assertEqual(nntplib._unparse_datetime(dt), 1402 (date_str, time_str)) 1403 self.assertEqual(nntplib._unparse_datetime(dt, False), 1404 (date_str, time_str)) 1405 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624") 1406 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624") 1407 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203") 1408 # 2) with a date 1409 def gives(y, M, d, date_str, time_str): 1410 dt = datetime.date(y, M, d) 1411 self.assertEqual(nntplib._unparse_datetime(dt), 1412 (date_str, time_str)) 1413 self.assertEqual(nntplib._unparse_datetime(dt, False), 1414 (date_str, time_str)) 1415 gives(1999, 6, 23, "19990623", "000000") 1416 gives(2000, 6, 23, "20000623", "000000") 1417 gives(2010, 6, 5, "20100605", "000000") 1418 1419 def test_unparse_datetime_legacy(self): 1420 # Test legacy mode (RFC 977) 1421 # 1) with a datetime 1422 def gives(y, M, d, h, m, s, date_str, time_str): 1423 dt = datetime.datetime(y, M, d, h, m, s) 1424 self.assertEqual(nntplib._unparse_datetime(dt, True), 1425 (date_str, time_str)) 1426 gives(1999, 6, 23, 13, 56, 24, "990623", "135624") 1427 gives(2000, 6, 23, 13, 56, 24, "000623", "135624") 1428 gives(2010, 6, 5, 1, 2, 3, "100605", "010203") 1429 # 2) with a date 1430 def gives(y, M, d, date_str, time_str): 1431 dt = datetime.date(y, M, d) 1432 self.assertEqual(nntplib._unparse_datetime(dt, True), 1433 (date_str, time_str)) 1434 gives(1999, 6, 23, "990623", "000000") 1435 gives(2000, 6, 23, "000623", "000000") 1436 gives(2010, 6, 5, "100605", "000000") 1437 1438 @unittest.skipUnless(ssl, 'requires SSL support') 1439 def test_ssl_support(self): 1440 self.assertTrue(hasattr(nntplib, 'NNTP_SSL')) 1441 1442 1443class PublicAPITests(unittest.TestCase): 1444 """Ensures that the correct values are exposed in the public API.""" 1445 1446 def test_module_all_attribute(self): 1447 self.assertTrue(hasattr(nntplib, '__all__')) 1448 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError', 1449 'NNTPTemporaryError', 'NNTPPermanentError', 1450 'NNTPProtocolError', 'NNTPDataError', 'decode_header'] 1451 if ssl is not None: 1452 target_api.append('NNTP_SSL') 1453 self.assertEqual(set(nntplib.__all__), set(target_api)) 1454 1455class MockSocketTests(unittest.TestCase): 1456 """Tests involving a mock socket object 1457 1458 Used where the _NNTPServerIO file object is not enough.""" 1459 1460 nntp_class = nntplib.NNTP 1461 1462 def check_constructor_error_conditions( 1463 self, handler_class, 1464 expected_error_type, expected_error_msg, 1465 login=None, password=None): 1466 1467 class mock_socket_module: 1468 def create_connection(address, timeout): 1469 return MockSocket() 1470 1471 class MockSocket: 1472 def close(self): 1473 nonlocal socket_closed 1474 socket_closed = True 1475 1476 def makefile(socket, mode): 1477 handler = handler_class() 1478 _, file = make_mock_file(handler) 1479 files.append(file) 1480 return file 1481 1482 socket_closed = False 1483 files = [] 1484 with patch('nntplib.socket', mock_socket_module), \ 1485 self.assertRaisesRegex(expected_error_type, expected_error_msg): 1486 self.nntp_class('dummy', user=login, password=password) 1487 self.assertTrue(socket_closed) 1488 for f in files: 1489 self.assertTrue(f.closed) 1490 1491 def test_bad_welcome(self): 1492 #Test a bad welcome message 1493 class Handler(NNTPv1Handler): 1494 welcome = 'Bad Welcome' 1495 self.check_constructor_error_conditions( 1496 Handler, nntplib.NNTPProtocolError, Handler.welcome) 1497 1498 def test_service_temporarily_unavailable(self): 1499 #Test service temporarily unavailable 1500 class Handler(NNTPv1Handler): 1501 welcome = '400 Service temporarily unavailable' 1502 self.check_constructor_error_conditions( 1503 Handler, nntplib.NNTPTemporaryError, Handler.welcome) 1504 1505 def test_service_permanently_unavailable(self): 1506 #Test service permanently unavailable 1507 class Handler(NNTPv1Handler): 1508 welcome = '502 Service permanently unavailable' 1509 self.check_constructor_error_conditions( 1510 Handler, nntplib.NNTPPermanentError, Handler.welcome) 1511 1512 def test_bad_capabilities(self): 1513 #Test a bad capabilities response 1514 class Handler(NNTPv1Handler): 1515 def handle_CAPABILITIES(self): 1516 self.push_lit(capabilities_response) 1517 capabilities_response = '201 bad capability' 1518 self.check_constructor_error_conditions( 1519 Handler, nntplib.NNTPReplyError, capabilities_response) 1520 1521 def test_login_aborted(self): 1522 #Test a bad authinfo response 1523 login = 't@e.com' 1524 password = 'python' 1525 class Handler(NNTPv1Handler): 1526 def handle_AUTHINFO(self, *args): 1527 self.push_lit(authinfo_response) 1528 authinfo_response = '503 Mechanism not recognized' 1529 self.check_constructor_error_conditions( 1530 Handler, nntplib.NNTPPermanentError, authinfo_response, 1531 login, password) 1532 1533class bypass_context: 1534 """Bypass encryption and actual SSL module""" 1535 def wrap_socket(sock, **args): 1536 return sock 1537 1538@unittest.skipUnless(ssl, 'requires SSL support') 1539class MockSslTests(MockSocketTests): 1540 @staticmethod 1541 def nntp_class(*pos, **kw): 1542 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) 1543 1544 1545class LocalServerTests(unittest.TestCase): 1546 def setUp(self): 1547 sock = socket.socket() 1548 port = support.bind_port(sock) 1549 sock.listen() 1550 self.background = threading.Thread( 1551 target=self.run_server, args=(sock,)) 1552 self.background.start() 1553 self.addCleanup(self.background.join) 1554 1555 self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() 1556 self.addCleanup(self.nntp.__exit__, None, None, None) 1557 1558 def run_server(self, sock): 1559 # Could be generalized to handle more commands in separate methods 1560 with sock: 1561 [client, _] = sock.accept() 1562 with contextlib.ExitStack() as cleanup: 1563 cleanup.enter_context(client) 1564 reader = cleanup.enter_context(client.makefile('rb')) 1565 client.sendall(b'200 Server ready\r\n') 1566 while True: 1567 cmd = reader.readline() 1568 if cmd == b'CAPABILITIES\r\n': 1569 client.sendall( 1570 b'101 Capability list:\r\n' 1571 b'VERSION 2\r\n' 1572 b'STARTTLS\r\n' 1573 b'.\r\n' 1574 ) 1575 elif cmd == b'STARTTLS\r\n': 1576 reader.close() 1577 client.sendall(b'382 Begin TLS negotiation now\r\n') 1578 context = ssl.SSLContext() 1579 context.load_cert_chain(certfile) 1580 client = context.wrap_socket( 1581 client, server_side=True) 1582 cleanup.enter_context(client) 1583 reader = cleanup.enter_context(client.makefile('rb')) 1584 elif cmd == b'QUIT\r\n': 1585 client.sendall(b'205 Bye!\r\n') 1586 break 1587 else: 1588 raise ValueError('Unexpected command {!r}'.format(cmd)) 1589 1590 @unittest.skipUnless(ssl, 'requires SSL support') 1591 def test_starttls(self): 1592 file = self.nntp.file 1593 sock = self.nntp.sock 1594 self.nntp.starttls() 1595 # Check that the socket and internal pseudo-file really were 1596 # changed. 1597 self.assertNotEqual(file, self.nntp.file) 1598 self.assertNotEqual(sock, self.nntp.sock) 1599 # Check that the new socket really is an SSL one 1600 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) 1601 # Check that trying starttls when it's already active fails. 1602 self.assertRaises(ValueError, self.nntp.starttls) 1603 1604 1605if __name__ == "__main__": 1606 unittest.main() 1607