1import io 2import socket 3import datetime 4import textwrap 5import unittest 6import functools 7import contextlib 8import nntplib 9import os.path 10import re 11import threading 12 13from test import support 14from test.support import socket_helper 15from nntplib import NNTP, GroupInfo 16from unittest.mock import patch 17try: 18 import ssl 19except ImportError: 20 ssl = None 21 22 23certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') 24 25if ssl is not None: 26 SSLError = ssl.SSLError 27else: 28 class SSLError(Exception): 29 """Non-existent exception class when we lack SSL support.""" 30 reason = "This will never be raised." 31 32# TODO: 33# - test the `file` arg to more commands 34# - test error conditions 35# - test auth and `usenetrc` 36 37 38class NetworkedNNTPTestsMixin: 39 40 def test_welcome(self): 41 welcome = self.server.getwelcome() 42 self.assertEqual(str, type(welcome)) 43 44 def test_help(self): 45 resp, lines = self.server.help() 46 self.assertTrue(resp.startswith("100 "), resp) 47 for line in lines: 48 self.assertEqual(str, type(line)) 49 50 def test_list(self): 51 resp, groups = self.server.list() 52 if len(groups) > 0: 53 self.assertEqual(GroupInfo, type(groups[0])) 54 self.assertEqual(str, type(groups[0].group)) 55 56 def test_list_active(self): 57 resp, groups = self.server.list(self.GROUP_PAT) 58 if len(groups) > 0: 59 self.assertEqual(GroupInfo, type(groups[0])) 60 self.assertEqual(str, type(groups[0].group)) 61 62 def test_unknown_command(self): 63 with self.assertRaises(nntplib.NNTPPermanentError) as cm: 64 self.server._shortcmd("XYZZY") 65 resp = cm.exception.response 66 self.assertTrue(resp.startswith("500 "), resp) 67 68 def test_newgroups(self): 69 # gmane gets a constant influx of new groups. In order not to stress 70 # the server too much, we choose a recent date in the past. 71 dt = datetime.date.today() - datetime.timedelta(days=7) 72 resp, groups = self.server.newgroups(dt) 73 if len(groups) > 0: 74 self.assertIsInstance(groups[0], GroupInfo) 75 self.assertIsInstance(groups[0].group, str) 76 77 def test_description(self): 78 def _check_desc(desc): 79 # Sanity checks 80 self.assertIsInstance(desc, str) 81 self.assertNotIn(self.GROUP_NAME, desc) 82 desc = self.server.description(self.GROUP_NAME) 83 _check_desc(desc) 84 # Another sanity check 85 self.assertIn(self.DESC, desc) 86 # With a pattern 87 desc = self.server.description(self.GROUP_PAT) 88 _check_desc(desc) 89 # Shouldn't exist 90 desc = self.server.description("zk.brrtt.baz") 91 self.assertEqual(desc, '') 92 93 def test_descriptions(self): 94 resp, descs = self.server.descriptions(self.GROUP_PAT) 95 # 215 for LIST NEWSGROUPS, 282 for XGTITLE 96 self.assertTrue( 97 resp.startswith("215 ") or resp.startswith("282 "), resp) 98 self.assertIsInstance(descs, dict) 99 desc = descs[self.GROUP_NAME] 100 self.assertEqual(desc, self.server.description(self.GROUP_NAME)) 101 102 def test_group(self): 103 result = self.server.group(self.GROUP_NAME) 104 self.assertEqual(5, len(result)) 105 resp, count, first, last, group = result 106 self.assertEqual(group, self.GROUP_NAME) 107 self.assertIsInstance(count, int) 108 self.assertIsInstance(first, int) 109 self.assertIsInstance(last, int) 110 self.assertLessEqual(first, last) 111 self.assertTrue(resp.startswith("211 "), resp) 112 113 def test_date(self): 114 resp, date = self.server.date() 115 self.assertIsInstance(date, datetime.datetime) 116 # Sanity check 117 self.assertGreaterEqual(date.year, 1995) 118 self.assertLessEqual(date.year, 2030) 119 120 def _check_art_dict(self, art_dict): 121 # Some sanity checks for a field dictionary returned by OVER / XOVER 122 self.assertIsInstance(art_dict, dict) 123 # NNTP has 7 mandatory fields 124 self.assertGreaterEqual(art_dict.keys(), 125 {"subject", "from", "date", "message-id", 126 "references", ":bytes", ":lines"} 127 ) 128 for v in art_dict.values(): 129 self.assertIsInstance(v, (str, type(None))) 130 131 def test_xover(self): 132 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 133 resp, lines = self.server.xover(last - 5, last) 134 if len(lines) == 0: 135 self.skipTest("no articles retrieved") 136 # The 'last' article is not necessarily part of the output (cancelled?) 137 art_num, art_dict = lines[0] 138 self.assertGreaterEqual(art_num, last - 5) 139 self.assertLessEqual(art_num, last) 140 self._check_art_dict(art_dict) 141 142 @unittest.skipIf(True, 'temporarily skipped until a permanent solution' 143 ' is found for issue #28971') 144 def test_over(self): 145 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 146 start = last - 10 147 # The "start-" article range form 148 resp, lines = self.server.over((start, None)) 149 art_num, art_dict = lines[0] 150 self._check_art_dict(art_dict) 151 # The "start-end" article range form 152 resp, lines = self.server.over((start, last)) 153 art_num, art_dict = lines[-1] 154 # The 'last' article is not necessarily part of the output (cancelled?) 155 self.assertGreaterEqual(art_num, start) 156 self.assertLessEqual(art_num, last) 157 self._check_art_dict(art_dict) 158 # XXX The "message_id" form is unsupported by gmane 159 # 503 Overview by message-ID unsupported 160 161 def test_xhdr(self): 162 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 163 resp, lines = self.server.xhdr('subject', last) 164 for line in lines: 165 self.assertEqual(str, type(line[1])) 166 167 def check_article_resp(self, resp, article, art_num=None): 168 self.assertIsInstance(article, nntplib.ArticleInfo) 169 if art_num is not None: 170 self.assertEqual(article.number, art_num) 171 for line in article.lines: 172 self.assertIsInstance(line, bytes) 173 # XXX this could exceptionally happen... 174 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) 175 176 @unittest.skipIf(True, "FIXME: see bpo-32128") 177 def test_article_head_body(self): 178 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 179 # Try to find an available article 180 for art_num in (last, first, last - 1): 181 try: 182 resp, head = self.server.head(art_num) 183 except nntplib.NNTPTemporaryError as e: 184 if not e.response.startswith("423 "): 185 raise 186 # "423 No such article" => choose another one 187 continue 188 break 189 else: 190 self.skipTest("could not find a suitable article number") 191 self.assertTrue(resp.startswith("221 "), resp) 192 self.check_article_resp(resp, head, art_num) 193 resp, body = self.server.body(art_num) 194 self.assertTrue(resp.startswith("222 "), resp) 195 self.check_article_resp(resp, body, art_num) 196 resp, article = self.server.article(art_num) 197 self.assertTrue(resp.startswith("220 "), resp) 198 self.check_article_resp(resp, article, art_num) 199 # Tolerate running the tests from behind a NNTP virus checker 200 blacklist = lambda line: line.startswith(b'X-Antivirus') 201 filtered_head_lines = [line for line in head.lines 202 if not blacklist(line)] 203 filtered_lines = [line for line in article.lines 204 if not blacklist(line)] 205 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) 206 207 def test_capabilities(self): 208 # The server under test implements NNTP version 2 and has a 209 # couple of well-known capabilities. Just sanity check that we 210 # got them. 211 def _check_caps(caps): 212 caps_list = caps['LIST'] 213 self.assertIsInstance(caps_list, (list, tuple)) 214 self.assertIn('OVERVIEW.FMT', caps_list) 215 self.assertGreaterEqual(self.server.nntp_version, 2) 216 _check_caps(self.server.getcapabilities()) 217 # This re-emits the command 218 resp, caps = self.server.capabilities() 219 _check_caps(caps) 220 221 def test_zlogin(self): 222 # This test must be the penultimate because further commands will be 223 # refused. 224 baduser = "notarealuser" 225 badpw = "notarealpassword" 226 # Check that bogus credentials cause failure 227 self.assertRaises(nntplib.NNTPError, self.server.login, 228 user=baduser, password=badpw, usenetrc=False) 229 # FIXME: We should check that correct credentials succeed, but that 230 # would require valid details for some server somewhere to be in the 231 # test suite, I think. Gmane is anonymous, at least as used for the 232 # other tests. 233 234 def test_zzquit(self): 235 # This test must be called last, hence the name 236 cls = type(self) 237 try: 238 self.server.quit() 239 finally: 240 cls.server = None 241 242 @classmethod 243 def wrap_methods(cls): 244 # Wrap all methods in a transient_internet() exception catcher 245 # XXX put a generic version in test.support? 246 def wrap_meth(meth): 247 @functools.wraps(meth) 248 def wrapped(self): 249 with socket_helper.transient_internet(self.NNTP_HOST): 250 meth(self) 251 return wrapped 252 for name in dir(cls): 253 if not name.startswith('test_'): 254 continue 255 meth = getattr(cls, name) 256 if not callable(meth): 257 continue 258 # Need to use a closure so that meth remains bound to its current 259 # value 260 setattr(cls, name, wrap_meth(meth)) 261 262 def test_timeout(self): 263 with self.assertRaises(ValueError): 264 self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False) 265 266 def test_with_statement(self): 267 def is_connected(): 268 if not hasattr(server, 'file'): 269 return False 270 try: 271 server.help() 272 except (OSError, EOFError): 273 return False 274 return True 275 276 try: 277 server = self.NNTP_CLASS(self.NNTP_HOST, 278 timeout=support.INTERNET_TIMEOUT, 279 usenetrc=False) 280 with server: 281 self.assertTrue(is_connected()) 282 self.assertTrue(server.help()) 283 self.assertFalse(is_connected()) 284 285 server = self.NNTP_CLASS(self.NNTP_HOST, 286 timeout=support.INTERNET_TIMEOUT, 287 usenetrc=False) 288 with server: 289 server.quit() 290 self.assertFalse(is_connected()) 291 except SSLError as ssl_err: 292 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small" 293 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason): 294 raise unittest.SkipTest(f"Got {ssl_err} connecting " 295 f"to {self.NNTP_HOST!r}") 296 raise 297 298 299NetworkedNNTPTestsMixin.wrap_methods() 300 301 302EOF_ERRORS = (EOFError,) 303if ssl is not None: 304 EOF_ERRORS += (ssl.SSLEOFError,) 305 306 307class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): 308 # This server supports STARTTLS (gmane doesn't) 309 NNTP_HOST = 'news.trigofacile.com' 310 GROUP_NAME = 'fr.comp.lang.python' 311 GROUP_PAT = 'fr.comp.lang.*' 312 DESC = 'Python' 313 314 NNTP_CLASS = NNTP 315 316 @classmethod 317 def setUpClass(cls): 318 support.requires("network") 319 with socket_helper.transient_internet(cls.NNTP_HOST): 320 try: 321 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, 322 timeout=support.INTERNET_TIMEOUT, 323 usenetrc=False) 324 except SSLError as ssl_err: 325 # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small" 326 if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason): 327 raise unittest.SkipTest(f"{cls} got {ssl_err} connecting " 328 f"to {cls.NNTP_HOST!r}") 329 raise 330 except EOF_ERRORS: 331 raise unittest.SkipTest(f"{cls} got EOF error on connecting " 332 f"to {cls.NNTP_HOST!r}") 333 334 @classmethod 335 def tearDownClass(cls): 336 if cls.server is not None: 337 cls.server.quit() 338 339@unittest.skipUnless(ssl, 'requires SSL support') 340class NetworkedNNTP_SSLTests(NetworkedNNTPTests): 341 342 # Technical limits for this public NNTP server (see http://www.aioe.org): 343 # "Only two concurrent connections per IP address are allowed and 344 # 400 connections per day are accepted from each IP address." 345 346 NNTP_HOST = 'nntp.aioe.org' 347 # bpo-42794: aioe.test is one of the official groups on this server 348 # used for testing: https://news.aioe.org/manual/aioe-hierarchy/ 349 GROUP_NAME = 'aioe.test' 350 GROUP_PAT = 'aioe.*' 351 DESC = 'test' 352 353 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) 354 355 # Disabled as it produces too much data 356 test_list = None 357 358 # Disabled as the connection will already be encrypted. 359 test_starttls = None 360 361 362# 363# Non-networked tests using a local server (or something mocking it). 364# 365 366class _NNTPServerIO(io.RawIOBase): 367 """A raw IO object allowing NNTP commands to be received and processed 368 by a handler. The handler can push responses which can then be read 369 from the IO object.""" 370 371 def __init__(self, handler): 372 io.RawIOBase.__init__(self) 373 # The channel from the client 374 self.c2s = io.BytesIO() 375 # The channel to the client 376 self.s2c = io.BytesIO() 377 self.handler = handler 378 self.handler.start(self.c2s.readline, self.push_data) 379 380 def readable(self): 381 return True 382 383 def writable(self): 384 return True 385 386 def push_data(self, data): 387 """Push (buffer) some data to send to the client.""" 388 pos = self.s2c.tell() 389 self.s2c.seek(0, 2) 390 self.s2c.write(data) 391 self.s2c.seek(pos) 392 393 def write(self, b): 394 """The client sends us some data""" 395 pos = self.c2s.tell() 396 self.c2s.write(b) 397 self.c2s.seek(pos) 398 self.handler.process_pending() 399 return len(b) 400 401 def readinto(self, buf): 402 """The client wants to read a response""" 403 self.handler.process_pending() 404 b = self.s2c.read(len(buf)) 405 n = len(b) 406 buf[:n] = b 407 return n 408 409 410def make_mock_file(handler): 411 sio = _NNTPServerIO(handler) 412 # Using BufferedRWPair instead of BufferedRandom ensures the file 413 # isn't seekable. 414 file = io.BufferedRWPair(sio, sio) 415 return (sio, file) 416 417 418class NNTPServer(nntplib.NNTP): 419 420 def __init__(self, f, host, readermode=None): 421 self.file = f 422 self.host = host 423 self._base_init(readermode) 424 425 def _close(self): 426 self.file.close() 427 del self.file 428 429 430class MockedNNTPTestsMixin: 431 # Override in derived classes 432 handler_class = None 433 434 def setUp(self): 435 super().setUp() 436 self.make_server() 437 438 def tearDown(self): 439 super().tearDown() 440 del self.server 441 442 def make_server(self, *args, **kwargs): 443 self.handler = self.handler_class() 444 self.sio, file = make_mock_file(self.handler) 445 self.server = NNTPServer(file, 'test.server', *args, **kwargs) 446 return self.server 447 448 449class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): 450 def setUp(self): 451 super().setUp() 452 self.make_server(readermode=True) 453 454 455class NNTPv1Handler: 456 """A handler for RFC 977""" 457 458 welcome = "200 NNTP mock server" 459 460 def start(self, readline, push_data): 461 self.in_body = False 462 self.allow_posting = True 463 self._readline = readline 464 self._push_data = push_data 465 self._logged_in = False 466 self._user_sent = False 467 # Our welcome 468 self.handle_welcome() 469 470 def _decode(self, data): 471 return str(data, "utf-8", "surrogateescape") 472 473 def process_pending(self): 474 if self.in_body: 475 while True: 476 line = self._readline() 477 if not line: 478 return 479 self.body.append(line) 480 if line == b".\r\n": 481 break 482 try: 483 meth, tokens = self.body_callback 484 meth(*tokens, body=self.body) 485 finally: 486 self.body_callback = None 487 self.body = None 488 self.in_body = False 489 while True: 490 line = self._decode(self._readline()) 491 if not line: 492 return 493 if not line.endswith("\r\n"): 494 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) 495 line = line[:-2] 496 cmd, *tokens = line.split() 497 #meth = getattr(self.handler, "handle_" + cmd.upper(), None) 498 meth = getattr(self, "handle_" + cmd.upper(), None) 499 if meth is None: 500 self.handle_unknown() 501 else: 502 try: 503 meth(*tokens) 504 except Exception as e: 505 raise ValueError("command failed: {!r}".format(line)) from e 506 else: 507 if self.in_body: 508 self.body_callback = meth, tokens 509 self.body = [] 510 511 def expect_body(self): 512 """Flag that the client is expected to post a request body""" 513 self.in_body = True 514 515 def push_data(self, data): 516 """Push some binary data""" 517 self._push_data(data) 518 519 def push_lit(self, lit): 520 """Push a string literal""" 521 lit = textwrap.dedent(lit) 522 lit = "\r\n".join(lit.splitlines()) + "\r\n" 523 lit = lit.encode('utf-8') 524 self.push_data(lit) 525 526 def handle_unknown(self): 527 self.push_lit("500 What?") 528 529 def handle_welcome(self): 530 self.push_lit(self.welcome) 531 532 def handle_QUIT(self): 533 self.push_lit("205 Bye!") 534 535 def handle_DATE(self): 536 self.push_lit("111 20100914001155") 537 538 def handle_GROUP(self, group): 539 if group == "fr.comp.lang.python": 540 self.push_lit("211 486 761 1265 fr.comp.lang.python") 541 else: 542 self.push_lit("411 No such group {}".format(group)) 543 544 def handle_HELP(self): 545 self.push_lit("""\ 546 100 Legal commands 547 authinfo user Name|pass Password|generic <prog> <args> 548 date 549 help 550 Report problems to <root@example.org> 551 .""") 552 553 def handle_STAT(self, message_spec=None): 554 if message_spec is None: 555 self.push_lit("412 No newsgroup selected") 556 elif message_spec == "3000234": 557 self.push_lit("223 3000234 <45223423@example.com>") 558 elif message_spec == "<45223423@example.com>": 559 self.push_lit("223 0 <45223423@example.com>") 560 else: 561 self.push_lit("430 No Such Article Found") 562 563 def handle_NEXT(self): 564 self.push_lit("223 3000237 <668929@example.org> retrieved") 565 566 def handle_LAST(self): 567 self.push_lit("223 3000234 <45223423@example.com> retrieved") 568 569 def handle_LIST(self, action=None, param=None): 570 if action is None: 571 self.push_lit("""\ 572 215 Newsgroups in form "group high low flags". 573 comp.lang.python 0000052340 0000002828 y 574 comp.lang.python.announce 0000001153 0000000993 m 575 free.it.comp.lang.python 0000000002 0000000002 y 576 fr.comp.lang.python 0000001254 0000000760 y 577 free.it.comp.lang.python.learner 0000000000 0000000001 y 578 tw.bbs.comp.lang.python 0000000304 0000000304 y 579 .""") 580 elif action == "ACTIVE": 581 if param == "*distutils*": 582 self.push_lit("""\ 583 215 Newsgroups in form "group high low flags" 584 gmane.comp.python.distutils.devel 0000014104 0000000001 m 585 gmane.comp.python.distutils.cvs 0000000000 0000000001 m 586 .""") 587 else: 588 self.push_lit("""\ 589 215 Newsgroups in form "group high low flags" 590 .""") 591 elif action == "OVERVIEW.FMT": 592 self.push_lit("""\ 593 215 Order of fields in overview database. 594 Subject: 595 From: 596 Date: 597 Message-ID: 598 References: 599 Bytes: 600 Lines: 601 Xref:full 602 .""") 603 elif action == "NEWSGROUPS": 604 assert param is not None 605 if param == "comp.lang.python": 606 self.push_lit("""\ 607 215 Descriptions in form "group description". 608 comp.lang.python\tThe Python computer language. 609 .""") 610 elif param == "comp.lang.python*": 611 self.push_lit("""\ 612 215 Descriptions in form "group description". 613 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) 614 comp.lang.python\tThe Python computer language. 615 .""") 616 else: 617 self.push_lit("""\ 618 215 Descriptions in form "group description". 619 .""") 620 else: 621 self.push_lit('501 Unknown LIST keyword') 622 623 def handle_NEWNEWS(self, group, date_str, time_str): 624 # We hard code different return messages depending on passed 625 # argument and date syntax. 626 if (group == "comp.lang.python" and date_str == "20100913" 627 and time_str == "082004"): 628 # Date was passed in RFC 3977 format (NNTP "v2") 629 self.push_lit("""\ 630 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows 631 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 632 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 633 .""") 634 elif (group == "comp.lang.python" and date_str == "100913" 635 and time_str == "082004"): 636 # Date was passed in RFC 977 format (NNTP "v1") 637 self.push_lit("""\ 638 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows 639 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 640 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 641 .""") 642 elif (group == 'comp.lang.python' and 643 date_str in ('20100101', '100101') and 644 time_str == '090000'): 645 self.push_lit('too long line' * 3000 + 646 '\n.') 647 else: 648 self.push_lit("""\ 649 230 An empty list of newsarticles follows 650 .""") 651 # (Note for experiments: many servers disable NEWNEWS. 652 # As of this writing, sicinfo3.epfl.ch doesn't.) 653 654 def handle_XOVER(self, message_spec): 655 if message_spec == "57-59": 656 self.push_lit( 657 "224 Overview information for 57-58 follows\n" 658 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" 659 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 660 "\tSat, 19 Jun 2010 18:04:08 -0400" 661 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" 662 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16" 663 "\tXref: news.gmane.io gmane.comp.python.authors:57" 664 "\n" 665 "58\tLooking for a few good bloggers" 666 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 667 "\tThu, 22 Jul 2010 09:14:14 -0400" 668 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>" 669 "\t\t6683\t16" 670 "\t" 671 "\n" 672 # A UTF-8 overview line from fr.comp.lang.python 673 "59\tRe: Message d'erreur incompréhensible (par moi)" 674 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>" 675 "\tWed, 15 Sep 2010 18:09:15 +0200" 676 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>" 677 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" 678 "\tXref: saria.nerim.net fr.comp.lang.python:1265" 679 "\n" 680 ".\n") 681 else: 682 self.push_lit("""\ 683 224 No articles 684 .""") 685 686 def handle_POST(self, *, body=None): 687 if body is None: 688 if self.allow_posting: 689 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>") 690 self.expect_body() 691 else: 692 self.push_lit("440 Posting not permitted") 693 else: 694 assert self.allow_posting 695 self.push_lit("240 Article received OK") 696 self.posted_body = body 697 698 def handle_IHAVE(self, message_id, *, body=None): 699 if body is None: 700 if (self.allow_posting and 701 message_id == "<i.am.an.article.you.will.want@example.com>"): 702 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>") 703 self.expect_body() 704 else: 705 self.push_lit("435 Article not wanted") 706 else: 707 assert self.allow_posting 708 self.push_lit("235 Article transferred OK") 709 self.posted_body = body 710 711 sample_head = """\ 712 From: "Demo User" <nobody@example.net> 713 Subject: I am just a test article 714 Content-Type: text/plain; charset=UTF-8; format=flowed 715 Message-ID: <i.am.an.article.you.will.want@example.com>""" 716 717 sample_body = """\ 718 This is just a test article. 719 ..Here is a dot-starting line. 720 721 -- Signed by Andr\xe9.""" 722 723 sample_article = sample_head + "\n\n" + sample_body 724 725 def handle_ARTICLE(self, message_spec=None): 726 if message_spec is None: 727 self.push_lit("220 3000237 <45223423@example.com>") 728 elif message_spec == "<45223423@example.com>": 729 self.push_lit("220 0 <45223423@example.com>") 730 elif message_spec == "3000234": 731 self.push_lit("220 3000234 <45223423@example.com>") 732 else: 733 self.push_lit("430 No Such Article Found") 734 return 735 self.push_lit(self.sample_article) 736 self.push_lit(".") 737 738 def handle_HEAD(self, message_spec=None): 739 if message_spec is None: 740 self.push_lit("221 3000237 <45223423@example.com>") 741 elif message_spec == "<45223423@example.com>": 742 self.push_lit("221 0 <45223423@example.com>") 743 elif message_spec == "3000234": 744 self.push_lit("221 3000234 <45223423@example.com>") 745 else: 746 self.push_lit("430 No Such Article Found") 747 return 748 self.push_lit(self.sample_head) 749 self.push_lit(".") 750 751 def handle_BODY(self, message_spec=None): 752 if message_spec is None: 753 self.push_lit("222 3000237 <45223423@example.com>") 754 elif message_spec == "<45223423@example.com>": 755 self.push_lit("222 0 <45223423@example.com>") 756 elif message_spec == "3000234": 757 self.push_lit("222 3000234 <45223423@example.com>") 758 else: 759 self.push_lit("430 No Such Article Found") 760 return 761 self.push_lit(self.sample_body) 762 self.push_lit(".") 763 764 def handle_AUTHINFO(self, cred_type, data): 765 if self._logged_in: 766 self.push_lit('502 Already Logged In') 767 elif cred_type == 'user': 768 if self._user_sent: 769 self.push_lit('482 User Credential Already Sent') 770 else: 771 self.push_lit('381 Password Required') 772 self._user_sent = True 773 elif cred_type == 'pass': 774 self.push_lit('281 Login Successful') 775 self._logged_in = True 776 else: 777 raise Exception('Unknown cred type {}'.format(cred_type)) 778 779 780class NNTPv2Handler(NNTPv1Handler): 781 """A handler for RFC 3977 (NNTP "v2")""" 782 783 def handle_CAPABILITIES(self): 784 fmt = """\ 785 101 Capability list: 786 VERSION 2 3 787 IMPLEMENTATION INN 2.5.1{} 788 HDR 789 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 790 OVER 791 POST 792 READER 793 .""" 794 795 if not self._logged_in: 796 self.push_lit(fmt.format('\n AUTHINFO USER')) 797 else: 798 self.push_lit(fmt.format('')) 799 800 def handle_MODE(self, _): 801 raise Exception('MODE READER sent despite READER has been advertised') 802 803 def handle_OVER(self, message_spec=None): 804 return self.handle_XOVER(message_spec) 805 806 807class CapsAfterLoginNNTPv2Handler(NNTPv2Handler): 808 """A handler that allows CAPABILITIES only after login""" 809 810 def handle_CAPABILITIES(self): 811 if not self._logged_in: 812 self.push_lit('480 You must log in.') 813 else: 814 super().handle_CAPABILITIES() 815 816 817class ModeSwitchingNNTPv2Handler(NNTPv2Handler): 818 """A server that starts in transit mode""" 819 820 def __init__(self): 821 self._switched = False 822 823 def handle_CAPABILITIES(self): 824 fmt = """\ 825 101 Capability list: 826 VERSION 2 3 827 IMPLEMENTATION INN 2.5.1 828 HDR 829 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 830 OVER 831 POST 832 {}READER 833 .""" 834 if self._switched: 835 self.push_lit(fmt.format('')) 836 else: 837 self.push_lit(fmt.format('MODE-')) 838 839 def handle_MODE(self, what): 840 assert not self._switched and what == 'reader' 841 self._switched = True 842 self.push_lit('200 Posting allowed') 843 844 845class NNTPv1v2TestsMixin: 846 847 def setUp(self): 848 super().setUp() 849 850 def test_welcome(self): 851 self.assertEqual(self.server.welcome, self.handler.welcome) 852 853 def test_authinfo(self): 854 if self.nntp_version == 2: 855 self.assertIn('AUTHINFO', self.server._caps) 856 self.server.login('testuser', 'testpw') 857 # if AUTHINFO is gone from _caps we also know that getcapabilities() 858 # has been called after login as it should 859 self.assertNotIn('AUTHINFO', self.server._caps) 860 861 def test_date(self): 862 resp, date = self.server.date() 863 self.assertEqual(resp, "111 20100914001155") 864 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) 865 866 def test_quit(self): 867 self.assertFalse(self.sio.closed) 868 resp = self.server.quit() 869 self.assertEqual(resp, "205 Bye!") 870 self.assertTrue(self.sio.closed) 871 872 def test_help(self): 873 resp, help = self.server.help() 874 self.assertEqual(resp, "100 Legal commands") 875 self.assertEqual(help, [ 876 ' authinfo user Name|pass Password|generic <prog> <args>', 877 ' date', 878 ' help', 879 'Report problems to <root@example.org>', 880 ]) 881 882 def test_list(self): 883 resp, groups = self.server.list() 884 self.assertEqual(len(groups), 6) 885 g = groups[1] 886 self.assertEqual(g, 887 GroupInfo("comp.lang.python.announce", "0000001153", 888 "0000000993", "m")) 889 resp, groups = self.server.list("*distutils*") 890 self.assertEqual(len(groups), 2) 891 g = groups[0] 892 self.assertEqual(g, 893 GroupInfo("gmane.comp.python.distutils.devel", "0000014104", 894 "0000000001", "m")) 895 896 def test_stat(self): 897 resp, art_num, message_id = self.server.stat(3000234) 898 self.assertEqual(resp, "223 3000234 <45223423@example.com>") 899 self.assertEqual(art_num, 3000234) 900 self.assertEqual(message_id, "<45223423@example.com>") 901 resp, art_num, message_id = self.server.stat("<45223423@example.com>") 902 self.assertEqual(resp, "223 0 <45223423@example.com>") 903 self.assertEqual(art_num, 0) 904 self.assertEqual(message_id, "<45223423@example.com>") 905 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 906 self.server.stat("<non.existent.id>") 907 self.assertEqual(cm.exception.response, "430 No Such Article Found") 908 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 909 self.server.stat() 910 self.assertEqual(cm.exception.response, "412 No newsgroup selected") 911 912 def test_next(self): 913 resp, art_num, message_id = self.server.next() 914 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") 915 self.assertEqual(art_num, 3000237) 916 self.assertEqual(message_id, "<668929@example.org>") 917 918 def test_last(self): 919 resp, art_num, message_id = self.server.last() 920 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") 921 self.assertEqual(art_num, 3000234) 922 self.assertEqual(message_id, "<45223423@example.com>") 923 924 def test_description(self): 925 desc = self.server.description("comp.lang.python") 926 self.assertEqual(desc, "The Python computer language.") 927 desc = self.server.description("comp.lang.pythonx") 928 self.assertEqual(desc, "") 929 930 def test_descriptions(self): 931 resp, groups = self.server.descriptions("comp.lang.python") 932 self.assertEqual(resp, '215 Descriptions in form "group description".') 933 self.assertEqual(groups, { 934 "comp.lang.python": "The Python computer language.", 935 }) 936 resp, groups = self.server.descriptions("comp.lang.python*") 937 self.assertEqual(groups, { 938 "comp.lang.python": "The Python computer language.", 939 "comp.lang.python.announce": "Announcements about the Python language. (Moderated)", 940 }) 941 resp, groups = self.server.descriptions("comp.lang.pythonx") 942 self.assertEqual(groups, {}) 943 944 def test_group(self): 945 resp, count, first, last, group = self.server.group("fr.comp.lang.python") 946 self.assertTrue(resp.startswith("211 "), resp) 947 self.assertEqual(first, 761) 948 self.assertEqual(last, 1265) 949 self.assertEqual(count, 486) 950 self.assertEqual(group, "fr.comp.lang.python") 951 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 952 self.server.group("comp.lang.python.devel") 953 exc = cm.exception 954 self.assertTrue(exc.response.startswith("411 No such group"), 955 exc.response) 956 957 def test_newnews(self): 958 # NEWNEWS comp.lang.python [20]100913 082004 959 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 960 resp, ids = self.server.newnews("comp.lang.python", dt) 961 expected = ( 962 "230 list of newsarticles (NNTP v{0}) " 963 "created after Mon Sep 13 08:20:04 2010 follows" 964 ).format(self.nntp_version) 965 self.assertEqual(resp, expected) 966 self.assertEqual(ids, [ 967 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>", 968 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>", 969 ]) 970 # NEWNEWS fr.comp.lang.python [20]100913 082004 971 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 972 resp, ids = self.server.newnews("fr.comp.lang.python", dt) 973 self.assertEqual(resp, "230 An empty list of newsarticles follows") 974 self.assertEqual(ids, []) 975 976 def _check_article_body(self, lines): 977 self.assertEqual(len(lines), 4) 978 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") 979 self.assertEqual(lines[-2], b"") 980 self.assertEqual(lines[-3], b".Here is a dot-starting line.") 981 self.assertEqual(lines[-4], b"This is just a test article.") 982 983 def _check_article_head(self, lines): 984 self.assertEqual(len(lines), 4) 985 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>') 986 self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>") 987 988 def _check_article_data(self, lines): 989 self.assertEqual(len(lines), 9) 990 self._check_article_head(lines[:4]) 991 self._check_article_body(lines[-4:]) 992 self.assertEqual(lines[4], b"") 993 994 def test_article(self): 995 # ARTICLE 996 resp, info = self.server.article() 997 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 998 art_num, message_id, lines = info 999 self.assertEqual(art_num, 3000237) 1000 self.assertEqual(message_id, "<45223423@example.com>") 1001 self._check_article_data(lines) 1002 # ARTICLE num 1003 resp, info = self.server.article(3000234) 1004 self.assertEqual(resp, "220 3000234 <45223423@example.com>") 1005 art_num, message_id, lines = info 1006 self.assertEqual(art_num, 3000234) 1007 self.assertEqual(message_id, "<45223423@example.com>") 1008 self._check_article_data(lines) 1009 # ARTICLE id 1010 resp, info = self.server.article("<45223423@example.com>") 1011 self.assertEqual(resp, "220 0 <45223423@example.com>") 1012 art_num, message_id, lines = info 1013 self.assertEqual(art_num, 0) 1014 self.assertEqual(message_id, "<45223423@example.com>") 1015 self._check_article_data(lines) 1016 # Non-existent id 1017 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1018 self.server.article("<non-existent@example.com>") 1019 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1020 1021 def test_article_file(self): 1022 # With a "file" argument 1023 f = io.BytesIO() 1024 resp, info = self.server.article(file=f) 1025 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 1026 art_num, message_id, lines = info 1027 self.assertEqual(art_num, 3000237) 1028 self.assertEqual(message_id, "<45223423@example.com>") 1029 self.assertEqual(lines, []) 1030 data = f.getvalue() 1031 self.assertTrue(data.startswith( 1032 b'From: "Demo User" <nobody@example.net>\r\n' 1033 b'Subject: I am just a test article\r\n' 1034 ), ascii(data)) 1035 self.assertTrue(data.endswith( 1036 b'This is just a test article.\r\n' 1037 b'.Here is a dot-starting line.\r\n' 1038 b'\r\n' 1039 b'-- Signed by Andr\xc3\xa9.\r\n' 1040 ), ascii(data)) 1041 1042 def test_head(self): 1043 # HEAD 1044 resp, info = self.server.head() 1045 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1046 art_num, message_id, lines = info 1047 self.assertEqual(art_num, 3000237) 1048 self.assertEqual(message_id, "<45223423@example.com>") 1049 self._check_article_head(lines) 1050 # HEAD num 1051 resp, info = self.server.head(3000234) 1052 self.assertEqual(resp, "221 3000234 <45223423@example.com>") 1053 art_num, message_id, lines = info 1054 self.assertEqual(art_num, 3000234) 1055 self.assertEqual(message_id, "<45223423@example.com>") 1056 self._check_article_head(lines) 1057 # HEAD id 1058 resp, info = self.server.head("<45223423@example.com>") 1059 self.assertEqual(resp, "221 0 <45223423@example.com>") 1060 art_num, message_id, lines = info 1061 self.assertEqual(art_num, 0) 1062 self.assertEqual(message_id, "<45223423@example.com>") 1063 self._check_article_head(lines) 1064 # Non-existent id 1065 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1066 self.server.head("<non-existent@example.com>") 1067 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1068 1069 def test_head_file(self): 1070 f = io.BytesIO() 1071 resp, info = self.server.head(file=f) 1072 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1073 art_num, message_id, lines = info 1074 self.assertEqual(art_num, 3000237) 1075 self.assertEqual(message_id, "<45223423@example.com>") 1076 self.assertEqual(lines, []) 1077 data = f.getvalue() 1078 self.assertTrue(data.startswith( 1079 b'From: "Demo User" <nobody@example.net>\r\n' 1080 b'Subject: I am just a test article\r\n' 1081 ), ascii(data)) 1082 self.assertFalse(data.endswith( 1083 b'This is just a test article.\r\n' 1084 b'.Here is a dot-starting line.\r\n' 1085 b'\r\n' 1086 b'-- Signed by Andr\xc3\xa9.\r\n' 1087 ), ascii(data)) 1088 1089 def test_body(self): 1090 # BODY 1091 resp, info = self.server.body() 1092 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1093 art_num, message_id, lines = info 1094 self.assertEqual(art_num, 3000237) 1095 self.assertEqual(message_id, "<45223423@example.com>") 1096 self._check_article_body(lines) 1097 # BODY num 1098 resp, info = self.server.body(3000234) 1099 self.assertEqual(resp, "222 3000234 <45223423@example.com>") 1100 art_num, message_id, lines = info 1101 self.assertEqual(art_num, 3000234) 1102 self.assertEqual(message_id, "<45223423@example.com>") 1103 self._check_article_body(lines) 1104 # BODY id 1105 resp, info = self.server.body("<45223423@example.com>") 1106 self.assertEqual(resp, "222 0 <45223423@example.com>") 1107 art_num, message_id, lines = info 1108 self.assertEqual(art_num, 0) 1109 self.assertEqual(message_id, "<45223423@example.com>") 1110 self._check_article_body(lines) 1111 # Non-existent id 1112 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1113 self.server.body("<non-existent@example.com>") 1114 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1115 1116 def test_body_file(self): 1117 f = io.BytesIO() 1118 resp, info = self.server.body(file=f) 1119 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1120 art_num, message_id, lines = info 1121 self.assertEqual(art_num, 3000237) 1122 self.assertEqual(message_id, "<45223423@example.com>") 1123 self.assertEqual(lines, []) 1124 data = f.getvalue() 1125 self.assertFalse(data.startswith( 1126 b'From: "Demo User" <nobody@example.net>\r\n' 1127 b'Subject: I am just a test article\r\n' 1128 ), ascii(data)) 1129 self.assertTrue(data.endswith( 1130 b'This is just a test article.\r\n' 1131 b'.Here is a dot-starting line.\r\n' 1132 b'\r\n' 1133 b'-- Signed by Andr\xc3\xa9.\r\n' 1134 ), ascii(data)) 1135 1136 def check_over_xover_resp(self, resp, overviews): 1137 self.assertTrue(resp.startswith("224 "), resp) 1138 self.assertEqual(len(overviews), 3) 1139 art_num, over = overviews[0] 1140 self.assertEqual(art_num, 57) 1141 self.assertEqual(over, { 1142 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>", 1143 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", 1144 "date": "Sat, 19 Jun 2010 18:04:08 -0400", 1145 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>", 1146 "references": "<hvalf7$ort$1@dough.gmane.org>", 1147 ":bytes": "7103", 1148 ":lines": "16", 1149 "xref": "news.gmane.io gmane.comp.python.authors:57" 1150 }) 1151 art_num, over = overviews[1] 1152 self.assertEqual(over["xref"], None) 1153 art_num, over = overviews[2] 1154 self.assertEqual(over["subject"], 1155 "Re: Message d'erreur incompréhensible (par moi)") 1156 1157 def test_xover(self): 1158 resp, overviews = self.server.xover(57, 59) 1159 self.check_over_xover_resp(resp, overviews) 1160 1161 def test_over(self): 1162 # In NNTP "v1", this will fallback on XOVER 1163 resp, overviews = self.server.over((57, 59)) 1164 self.check_over_xover_resp(resp, overviews) 1165 1166 sample_post = ( 1167 b'From: "Demo User" <nobody@example.net>\r\n' 1168 b'Subject: I am just a test article\r\n' 1169 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' 1170 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n' 1171 b'\r\n' 1172 b'This is just a test article.\r\n' 1173 b'.Here is a dot-starting line.\r\n' 1174 b'\r\n' 1175 b'-- Signed by Andr\xc3\xa9.\r\n' 1176 ) 1177 1178 def _check_posted_body(self): 1179 # Check the raw body as received by the server 1180 lines = self.handler.posted_body 1181 # One additional line for the "." terminator 1182 self.assertEqual(len(lines), 10) 1183 self.assertEqual(lines[-1], b'.\r\n') 1184 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') 1185 self.assertEqual(lines[-3], b'\r\n') 1186 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') 1187 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n') 1188 1189 def _check_post_ihave_sub(self, func, *args, file_factory): 1190 # First the prepared post with CRLF endings 1191 post = self.sample_post 1192 func_args = args + (file_factory(post),) 1193 self.handler.posted_body = None 1194 resp = func(*func_args) 1195 self._check_posted_body() 1196 # Then the same post with "normal" line endings - they should be 1197 # converted by NNTP.post and NNTP.ihave. 1198 post = self.sample_post.replace(b"\r\n", b"\n") 1199 func_args = args + (file_factory(post),) 1200 self.handler.posted_body = None 1201 resp = func(*func_args) 1202 self._check_posted_body() 1203 return resp 1204 1205 def check_post_ihave(self, func, success_resp, *args): 1206 # With a bytes object 1207 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) 1208 self.assertEqual(resp, success_resp) 1209 # With a bytearray object 1210 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) 1211 self.assertEqual(resp, success_resp) 1212 # With a file object 1213 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) 1214 self.assertEqual(resp, success_resp) 1215 # With an iterable of terminated lines 1216 def iterlines(b): 1217 return iter(b.splitlines(keepends=True)) 1218 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1219 self.assertEqual(resp, success_resp) 1220 # With an iterable of non-terminated lines 1221 def iterlines(b): 1222 return iter(b.splitlines(keepends=False)) 1223 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1224 self.assertEqual(resp, success_resp) 1225 1226 def test_post(self): 1227 self.check_post_ihave(self.server.post, "240 Article received OK") 1228 self.handler.allow_posting = False 1229 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1230 self.server.post(self.sample_post) 1231 self.assertEqual(cm.exception.response, 1232 "440 Posting not permitted") 1233 1234 def test_ihave(self): 1235 self.check_post_ihave(self.server.ihave, "235 Article transferred OK", 1236 "<i.am.an.article.you.will.want@example.com>") 1237 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1238 self.server.ihave("<another.message.id>", self.sample_post) 1239 self.assertEqual(cm.exception.response, 1240 "435 Article not wanted") 1241 1242 def test_too_long_lines(self): 1243 dt = datetime.datetime(2010, 1, 1, 9, 0, 0) 1244 self.assertRaises(nntplib.NNTPDataError, 1245 self.server.newnews, "comp.lang.python", dt) 1246 1247 1248class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1249 """Tests an NNTP v1 server (no capabilities).""" 1250 1251 nntp_version = 1 1252 handler_class = NNTPv1Handler 1253 1254 def test_caps(self): 1255 caps = self.server.getcapabilities() 1256 self.assertEqual(caps, {}) 1257 self.assertEqual(self.server.nntp_version, 1) 1258 self.assertEqual(self.server.nntp_implementation, None) 1259 1260 1261class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1262 """Tests an NNTP v2 server (with capabilities).""" 1263 1264 nntp_version = 2 1265 handler_class = NNTPv2Handler 1266 1267 def test_caps(self): 1268 caps = self.server.getcapabilities() 1269 self.assertEqual(caps, { 1270 'VERSION': ['2', '3'], 1271 'IMPLEMENTATION': ['INN', '2.5.1'], 1272 'AUTHINFO': ['USER'], 1273 'HDR': [], 1274 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', 1275 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], 1276 'OVER': [], 1277 'POST': [], 1278 'READER': [], 1279 }) 1280 self.assertEqual(self.server.nntp_version, 3) 1281 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1') 1282 1283 1284class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase): 1285 """Tests a probably NNTP v2 server with capabilities only after login.""" 1286 1287 nntp_version = 2 1288 handler_class = CapsAfterLoginNNTPv2Handler 1289 1290 def test_caps_only_after_login(self): 1291 self.assertEqual(self.server._caps, {}) 1292 self.server.login('testuser', 'testpw') 1293 self.assertIn('VERSION', self.server._caps) 1294 1295 1296class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin, 1297 unittest.TestCase): 1298 """Same tests as for v2 but we tell NTTP to send MODE READER to a server 1299 that isn't in READER mode by default.""" 1300 1301 nntp_version = 2 1302 handler_class = ModeSwitchingNNTPv2Handler 1303 1304 def test_we_are_in_reader_mode_after_connect(self): 1305 self.assertIn('READER', self.server._caps) 1306 1307 1308class MiscTests(unittest.TestCase): 1309 1310 def test_decode_header(self): 1311 def gives(a, b): 1312 self.assertEqual(nntplib.decode_header(a), b) 1313 gives("" , "") 1314 gives("a plain header", "a plain header") 1315 gives(" with extra spaces ", " with extra spaces ") 1316 gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python") 1317 gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?=" 1318 " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=", 1319 "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées") 1320 gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=", 1321 "Re: problème de matrice") 1322 # A natively utf-8 header (found in the real world!) 1323 gives("Re: Message d'erreur incompréhensible (par moi)", 1324 "Re: Message d'erreur incompréhensible (par moi)") 1325 1326 def test_parse_overview_fmt(self): 1327 # The minimal (default) response 1328 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1329 "References:", ":bytes", ":lines"] 1330 self.assertEqual(nntplib._parse_overview_fmt(lines), 1331 ["subject", "from", "date", "message-id", "references", 1332 ":bytes", ":lines"]) 1333 # The minimal response using alternative names 1334 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1335 "References:", "Bytes:", "Lines:"] 1336 self.assertEqual(nntplib._parse_overview_fmt(lines), 1337 ["subject", "from", "date", "message-id", "references", 1338 ":bytes", ":lines"]) 1339 # Variations in casing 1340 lines = ["subject:", "FROM:", "DaTe:", "message-ID:", 1341 "References:", "BYTES:", "Lines:"] 1342 self.assertEqual(nntplib._parse_overview_fmt(lines), 1343 ["subject", "from", "date", "message-id", "references", 1344 ":bytes", ":lines"]) 1345 # First example from RFC 3977 1346 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1347 "References:", ":bytes", ":lines", "Xref:full", 1348 "Distribution:full"] 1349 self.assertEqual(nntplib._parse_overview_fmt(lines), 1350 ["subject", "from", "date", "message-id", "references", 1351 ":bytes", ":lines", "xref", "distribution"]) 1352 # Second example from RFC 3977 1353 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1354 "References:", "Bytes:", "Lines:", "Xref:FULL", 1355 "Distribution:FULL"] 1356 self.assertEqual(nntplib._parse_overview_fmt(lines), 1357 ["subject", "from", "date", "message-id", "references", 1358 ":bytes", ":lines", "xref", "distribution"]) 1359 # A classic response from INN 1360 lines = ["Subject:", "From:", "Date:", "Message-ID:", 1361 "References:", "Bytes:", "Lines:", "Xref:full"] 1362 self.assertEqual(nntplib._parse_overview_fmt(lines), 1363 ["subject", "from", "date", "message-id", "references", 1364 ":bytes", ":lines", "xref"]) 1365 1366 def test_parse_overview(self): 1367 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] 1368 # First example from RFC 3977 1369 lines = [ 1370 '3000234\tI am just a test article\t"Demo User" ' 1371 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1372 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1373 '17\tXref: news.example.com misc.test:3000363', 1374 ] 1375 overview = nntplib._parse_overview(lines, fmt) 1376 (art_num, fields), = overview 1377 self.assertEqual(art_num, 3000234) 1378 self.assertEqual(fields, { 1379 'subject': 'I am just a test article', 1380 'from': '"Demo User" <nobody@example.com>', 1381 'date': '6 Oct 1998 04:38:40 -0500', 1382 'message-id': '<45223423@example.com>', 1383 'references': '<45454@example.net>', 1384 ':bytes': '1234', 1385 ':lines': '17', 1386 'xref': 'news.example.com misc.test:3000363', 1387 }) 1388 # Second example; here the "Xref" field is totally absent (including 1389 # the header name) and comes out as None 1390 lines = [ 1391 '3000234\tI am just a test article\t"Demo User" ' 1392 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1393 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1394 '17\t\t', 1395 ] 1396 overview = nntplib._parse_overview(lines, fmt) 1397 (art_num, fields), = overview 1398 self.assertEqual(fields['xref'], None) 1399 # Third example; the "Xref" is an empty string, while "references" 1400 # is a single space. 1401 lines = [ 1402 '3000234\tI am just a test article\t"Demo User" ' 1403 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1404 '<45223423@example.com>\t \t1234\t' 1405 '17\tXref: \t', 1406 ] 1407 overview = nntplib._parse_overview(lines, fmt) 1408 (art_num, fields), = overview 1409 self.assertEqual(fields['references'], ' ') 1410 self.assertEqual(fields['xref'], '') 1411 1412 def test_parse_datetime(self): 1413 def gives(a, b, *c): 1414 self.assertEqual(nntplib._parse_datetime(a, b), 1415 datetime.datetime(*c)) 1416 # Output of DATE command 1417 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) 1418 # Variations 1419 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) 1420 gives("990623", "135624", 1999, 6, 23, 13, 56, 24) 1421 gives("090623", "135624", 2009, 6, 23, 13, 56, 24) 1422 1423 def test_unparse_datetime(self): 1424 # Test non-legacy mode 1425 # 1) with a datetime 1426 def gives(y, M, d, h, m, s, date_str, time_str): 1427 dt = datetime.datetime(y, M, d, h, m, s) 1428 self.assertEqual(nntplib._unparse_datetime(dt), 1429 (date_str, time_str)) 1430 self.assertEqual(nntplib._unparse_datetime(dt, False), 1431 (date_str, time_str)) 1432 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624") 1433 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624") 1434 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203") 1435 # 2) with a date 1436 def gives(y, M, d, date_str, time_str): 1437 dt = datetime.date(y, M, d) 1438 self.assertEqual(nntplib._unparse_datetime(dt), 1439 (date_str, time_str)) 1440 self.assertEqual(nntplib._unparse_datetime(dt, False), 1441 (date_str, time_str)) 1442 gives(1999, 6, 23, "19990623", "000000") 1443 gives(2000, 6, 23, "20000623", "000000") 1444 gives(2010, 6, 5, "20100605", "000000") 1445 1446 def test_unparse_datetime_legacy(self): 1447 # Test legacy mode (RFC 977) 1448 # 1) with a datetime 1449 def gives(y, M, d, h, m, s, date_str, time_str): 1450 dt = datetime.datetime(y, M, d, h, m, s) 1451 self.assertEqual(nntplib._unparse_datetime(dt, True), 1452 (date_str, time_str)) 1453 gives(1999, 6, 23, 13, 56, 24, "990623", "135624") 1454 gives(2000, 6, 23, 13, 56, 24, "000623", "135624") 1455 gives(2010, 6, 5, 1, 2, 3, "100605", "010203") 1456 # 2) with a date 1457 def gives(y, M, d, date_str, time_str): 1458 dt = datetime.date(y, M, d) 1459 self.assertEqual(nntplib._unparse_datetime(dt, True), 1460 (date_str, time_str)) 1461 gives(1999, 6, 23, "990623", "000000") 1462 gives(2000, 6, 23, "000623", "000000") 1463 gives(2010, 6, 5, "100605", "000000") 1464 1465 @unittest.skipUnless(ssl, 'requires SSL support') 1466 def test_ssl_support(self): 1467 self.assertTrue(hasattr(nntplib, 'NNTP_SSL')) 1468 1469 1470class PublicAPITests(unittest.TestCase): 1471 """Ensures that the correct values are exposed in the public API.""" 1472 1473 def test_module_all_attribute(self): 1474 self.assertTrue(hasattr(nntplib, '__all__')) 1475 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError', 1476 'NNTPTemporaryError', 'NNTPPermanentError', 1477 'NNTPProtocolError', 'NNTPDataError', 'decode_header'] 1478 if ssl is not None: 1479 target_api.append('NNTP_SSL') 1480 self.assertEqual(set(nntplib.__all__), set(target_api)) 1481 1482class MockSocketTests(unittest.TestCase): 1483 """Tests involving a mock socket object 1484 1485 Used where the _NNTPServerIO file object is not enough.""" 1486 1487 nntp_class = nntplib.NNTP 1488 1489 def check_constructor_error_conditions( 1490 self, handler_class, 1491 expected_error_type, expected_error_msg, 1492 login=None, password=None): 1493 1494 class mock_socket_module: 1495 def create_connection(address, timeout): 1496 return MockSocket() 1497 1498 class MockSocket: 1499 def close(self): 1500 nonlocal socket_closed 1501 socket_closed = True 1502 1503 def makefile(socket, mode): 1504 handler = handler_class() 1505 _, file = make_mock_file(handler) 1506 files.append(file) 1507 return file 1508 1509 socket_closed = False 1510 files = [] 1511 with patch('nntplib.socket', mock_socket_module), \ 1512 self.assertRaisesRegex(expected_error_type, expected_error_msg): 1513 self.nntp_class('dummy', user=login, password=password) 1514 self.assertTrue(socket_closed) 1515 for f in files: 1516 self.assertTrue(f.closed) 1517 1518 def test_bad_welcome(self): 1519 #Test a bad welcome message 1520 class Handler(NNTPv1Handler): 1521 welcome = 'Bad Welcome' 1522 self.check_constructor_error_conditions( 1523 Handler, nntplib.NNTPProtocolError, Handler.welcome) 1524 1525 def test_service_temporarily_unavailable(self): 1526 #Test service temporarily unavailable 1527 class Handler(NNTPv1Handler): 1528 welcome = '400 Service temporarily unavailable' 1529 self.check_constructor_error_conditions( 1530 Handler, nntplib.NNTPTemporaryError, Handler.welcome) 1531 1532 def test_service_permanently_unavailable(self): 1533 #Test service permanently unavailable 1534 class Handler(NNTPv1Handler): 1535 welcome = '502 Service permanently unavailable' 1536 self.check_constructor_error_conditions( 1537 Handler, nntplib.NNTPPermanentError, Handler.welcome) 1538 1539 def test_bad_capabilities(self): 1540 #Test a bad capabilities response 1541 class Handler(NNTPv1Handler): 1542 def handle_CAPABILITIES(self): 1543 self.push_lit(capabilities_response) 1544 capabilities_response = '201 bad capability' 1545 self.check_constructor_error_conditions( 1546 Handler, nntplib.NNTPReplyError, capabilities_response) 1547 1548 def test_login_aborted(self): 1549 #Test a bad authinfo response 1550 login = 't@e.com' 1551 password = 'python' 1552 class Handler(NNTPv1Handler): 1553 def handle_AUTHINFO(self, *args): 1554 self.push_lit(authinfo_response) 1555 authinfo_response = '503 Mechanism not recognized' 1556 self.check_constructor_error_conditions( 1557 Handler, nntplib.NNTPPermanentError, authinfo_response, 1558 login, password) 1559 1560class bypass_context: 1561 """Bypass encryption and actual SSL module""" 1562 def wrap_socket(sock, **args): 1563 return sock 1564 1565@unittest.skipUnless(ssl, 'requires SSL support') 1566class MockSslTests(MockSocketTests): 1567 @staticmethod 1568 def nntp_class(*pos, **kw): 1569 return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) 1570 1571 1572class LocalServerTests(unittest.TestCase): 1573 def setUp(self): 1574 sock = socket.socket() 1575 port = socket_helper.bind_port(sock) 1576 sock.listen() 1577 self.background = threading.Thread( 1578 target=self.run_server, args=(sock,)) 1579 self.background.start() 1580 self.addCleanup(self.background.join) 1581 1582 self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__() 1583 self.addCleanup(self.nntp.__exit__, None, None, None) 1584 1585 def run_server(self, sock): 1586 # Could be generalized to handle more commands in separate methods 1587 with sock: 1588 [client, _] = sock.accept() 1589 with contextlib.ExitStack() as cleanup: 1590 cleanup.enter_context(client) 1591 reader = cleanup.enter_context(client.makefile('rb')) 1592 client.sendall(b'200 Server ready\r\n') 1593 while True: 1594 cmd = reader.readline() 1595 if cmd == b'CAPABILITIES\r\n': 1596 client.sendall( 1597 b'101 Capability list:\r\n' 1598 b'VERSION 2\r\n' 1599 b'STARTTLS\r\n' 1600 b'.\r\n' 1601 ) 1602 elif cmd == b'STARTTLS\r\n': 1603 reader.close() 1604 client.sendall(b'382 Begin TLS negotiation now\r\n') 1605 context = ssl.SSLContext() 1606 context.load_cert_chain(certfile) 1607 client = context.wrap_socket( 1608 client, server_side=True) 1609 cleanup.enter_context(client) 1610 reader = cleanup.enter_context(client.makefile('rb')) 1611 elif cmd == b'QUIT\r\n': 1612 client.sendall(b'205 Bye!\r\n') 1613 break 1614 else: 1615 raise ValueError('Unexpected command {!r}'.format(cmd)) 1616 1617 @unittest.skipUnless(ssl, 'requires SSL support') 1618 def test_starttls(self): 1619 file = self.nntp.file 1620 sock = self.nntp.sock 1621 self.nntp.starttls() 1622 # Check that the socket and internal pseudo-file really were 1623 # changed. 1624 self.assertNotEqual(file, self.nntp.file) 1625 self.assertNotEqual(sock, self.nntp.sock) 1626 # Check that the new socket really is an SSL one 1627 self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) 1628 # Check that trying starttls when it's already active fails. 1629 self.assertRaises(ValueError, self.nntp.starttls) 1630 1631 1632if __name__ == "__main__": 1633 unittest.main() 1634