import io
import socket
import datetime
import textwrap
import unittest
import functools
import contextlib
import nntplib
import os.path
import re
import threading

from test import support
from test.support import socket_helper
from nntplib import NNTP, GroupInfo
from unittest.mock import patch
try:
    import ssl
except ImportError:
    ssl = None


certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')

if ssl is not None:
    SSLError = ssl.SSLError
else:
    class SSLError(Exception):
        """Non-existent exception class when we lack SSL support."""
        reason = "This will never be raised."

# TODO:
# - test the `file` arg to more commands
# - test error conditions
# - test auth and `usenetrc`


class NetworkedNNTPTestsMixin:
    """Tests that talk to a live NNTP server.

    The methods read ``self.server`` (a connected client) together with
    the ``NNTP_HOST``, ``NNTP_CLASS``, ``GROUP_NAME``, ``GROUP_PAT`` and
    ``DESC`` attributes, none of which are defined here; the class using
    this mixin has to supply them.
    """

    # Only passed to the client constructor when it is not None.
    ssl_context = None

    def test_welcome(self):
        welcome = self.server.getwelcome()
        self.assertEqual(str, type(welcome))

    def test_help(self):
        resp, lines = self.server.help()
        self.assertTrue(resp.startswith("100 "), resp)
        for line in lines:
            self.assertEqual(str, type(line))

    def test_list(self):
        resp, groups = self.server.list()
        if len(groups) > 0:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_list_active(self):
        resp, groups = self.server.list(self.GROUP_PAT)
        if len(groups) > 0:
            self.assertEqual(GroupInfo, type(groups[0]))
            self.assertEqual(str, type(groups[0].group))

    def test_unknown_command(self):
        with self.assertRaises(nntplib.NNTPPermanentError) as cm:
            self.server._shortcmd("XYZZY")
        resp = cm.exception.response
        self.assertTrue(resp.startswith("500 "), resp)

    def test_newgroups(self):
        # gmane gets a constant influx of new groups.  In order not to stress
        # the server too much, we choose a recent date in the past.
        dt = datetime.date.today() - datetime.timedelta(days=7)
        resp, groups = self.server.newgroups(dt)
        if len(groups) > 0:
            self.assertIsInstance(groups[0], GroupInfo)
            self.assertIsInstance(groups[0].group, str)

    def test_description(self):
        def _check_desc(desc):
            # Sanity checks
            self.assertIsInstance(desc, str)
            self.assertNotIn(self.GROUP_NAME, desc)
        desc = self.server.description(self.GROUP_NAME)
        _check_desc(desc)
        # Another sanity check
        self.assertIn(self.DESC, desc)
        # With a pattern
        desc = self.server.description(self.GROUP_PAT)
        _check_desc(desc)
        # Shouldn't exist
        desc = self.server.description("zk.brrtt.baz")
        self.assertEqual(desc, '')

    def test_descriptions(self):
        resp, descs = self.server.descriptions(self.GROUP_PAT)
        # 215 for LIST NEWSGROUPS, 282 for XGTITLE
        self.assertTrue(
            resp.startswith("215 ") or resp.startswith("282 "), resp)
        self.assertIsInstance(descs, dict)
        desc = descs[self.GROUP_NAME]
        self.assertEqual(desc, self.server.description(self.GROUP_NAME))

    def test_group(self):
        result = self.server.group(self.GROUP_NAME)
        self.assertEqual(5, len(result))
        resp, count, first, last, group = result
        self.assertEqual(group, self.GROUP_NAME)
        self.assertIsInstance(count, int)
        self.assertIsInstance(first, int)
        self.assertIsInstance(last, int)
        self.assertLessEqual(first, last)
        self.assertTrue(resp.startswith("211 "), resp)

    def test_date(self):
        resp, date = self.server.date()
        self.assertIsInstance(date, datetime.datetime)
        # Sanity check
        self.assertGreaterEqual(date.year, 1995)
        self.assertLessEqual(date.year, 2030)

    def _check_art_dict(self, art_dict):
        # Some sanity checks for a field dictionary returned by OVER / XOVER
        self.assertIsInstance(art_dict, dict)
        # NNTP has 7 mandatory fields
        self.assertGreaterEqual(art_dict.keys(),
            {"subject", "from", "date", "message-id",
             "references", ":bytes", ":lines"}
            )
        for v in art_dict.values():
            self.assertIsInstance(v, (str, type(None)))

    def test_xover(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xover(last - 5, last)
        if len(lines) == 0:
            self.skipTest("no articles retrieved")
        # The 'last' article is not necessarily part of the output (cancelled?)
        art_num, art_dict = lines[0]
        self.assertGreaterEqual(art_num, last - 5)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)

    @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
                     ' is found for issue #28971')
    def test_over(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        start = last - 10
        # The "start-" article range form
        resp, lines = self.server.over((start, None))
        art_num, art_dict = lines[0]
        self._check_art_dict(art_dict)
        # The "start-end" article range form
        resp, lines = self.server.over((start, last))
        art_num, art_dict = lines[-1]
        # The 'last' article is not necessarily part of the output (cancelled?)
        self.assertGreaterEqual(art_num, start)
        self.assertLessEqual(art_num, last)
        self._check_art_dict(art_dict)
        # XXX The "message_id" form is unsupported by gmane
        # 503 Overview by message-ID unsupported

    def test_xhdr(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        resp, lines = self.server.xhdr('subject', last)
        for line in lines:
            self.assertEqual(str, type(line[1]))

    def check_article_resp(self, resp, article, art_num=None):
        # Shared checks for the ArticleInfo returned by HEAD / BODY / ARTICLE.
        self.assertIsInstance(article, nntplib.ArticleInfo)
        if art_num is not None:
            self.assertEqual(article.number, art_num)
        for line in article.lines:
            self.assertIsInstance(line, bytes)
        # XXX this could exceptionally happen...
        self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))

    @unittest.skipIf(True, "FIXME: see bpo-32128")
    def test_article_head_body(self):
        resp, count, first, last, name = self.server.group(self.GROUP_NAME)
        # Try to find an available article
        for art_num in (last, first, last - 1):
            try:
                resp, head = self.server.head(art_num)
            except nntplib.NNTPTemporaryError as e:
                if not e.response.startswith("423 "):
                    raise
                # "423 No such article" => choose another one
                continue
            break
        else:
            self.skipTest("could not find a suitable article number")
        self.assertTrue(resp.startswith("221 "), resp)
        self.check_article_resp(resp, head, art_num)
        resp, body = self.server.body(art_num)
        self.assertTrue(resp.startswith("222 "), resp)
        self.check_article_resp(resp, body, art_num)
        resp, article = self.server.article(art_num)
        self.assertTrue(resp.startswith("220 "), resp)
        self.check_article_resp(resp, article, art_num)
        # Tolerate running the tests from behind a NNTP virus checker
        denylist = lambda line: line.startswith(b'X-Antivirus')
        filtered_head_lines = [line for line in head.lines
                               if not denylist(line)]
        filtered_lines = [line for line in article.lines
                          if not denylist(line)]
        self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)

    def test_capabilities(self):
        # The server under test implements NNTP version 2 and has a
        # couple of well-known capabilities. Just sanity check that we
        # got them.
        def _check_caps(caps):
            caps_list = caps['LIST']
            self.assertIsInstance(caps_list, (list, tuple))
            self.assertIn('OVERVIEW.FMT', caps_list)
        self.assertGreaterEqual(self.server.nntp_version, 2)
        _check_caps(self.server.getcapabilities())
        # This re-emits the command
        resp, caps = self.server.capabilities()
        _check_caps(caps)

    def test_zlogin(self):
        # This test must be the penultimate because further commands will be
        # refused.
        baduser = "notarealuser"
        badpw = "notarealpassword"
        # Check that bogus credentials cause failure
        self.assertRaises(nntplib.NNTPError, self.server.login,
                          user=baduser, password=badpw, usenetrc=False)
        # FIXME: We should check that correct credentials succeed, but that
        # would require valid details for some server somewhere to be in the
        # test suite, I think. Gmane is anonymous, at least as used for the
        # other tests.

    def test_zzquit(self):
        # This test must be called last, hence the name
        cls = type(self)
        try:
            self.server.quit()
        finally:
            # Cleared even if quit() raised, so that a tearDownClass which
            # checks for None does not try to quit a second time.
            cls.server = None

    @classmethod
    def wrap_methods(cls):
        # Wrap all methods in a transient_internet() exception catcher
        # XXX put a generic version in test.support?
        def wrap_meth(meth):
            @functools.wraps(meth)
            def wrapped(self):
                with socket_helper.transient_internet(self.NNTP_HOST):
                    meth(self)
            return wrapped
        for name in dir(cls):
            if not name.startswith('test_'):
                continue
            meth = getattr(cls, name)
            if not callable(meth):
                continue
            # Need to use a closure so that meth remains bound to its current
            # value
            setattr(cls, name, wrap_meth(meth))

    def test_timeout(self):
        with self.assertRaises(ValueError):
            self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)

    def test_with_statement(self):
        def is_connected():
            # The `file` attribute is used as the "still open" marker; when
            # it is present, a HELP round-trip confirms the link works.
            if not hasattr(server, 'file'):
                return False
            try:
                server.help()
            except (OSError, EOFError):
                return False
            return True

        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if self.ssl_context is not None:
            kwargs["ssl_context"] = self.ssl_context

        try:
            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                self.assertTrue(is_connected())
                self.assertTrue(server.help())
            self.assertFalse(is_connected())

            # Quitting explicitly inside the block must not break the exit.
            server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
            with server:
                server.quit()
            self.assertFalse(is_connected())
        except SSLError as ssl_err:
            # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
            if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
                raise unittest.SkipTest(f"Got {ssl_err} connecting "
                                        f"to {self.NNTP_HOST!r}")
            raise


# Must run after the class body so that every test_* method defined above,
# including those following wrap_methods(), gets wrapped.
NetworkedNNTPTestsMixin.wrap_methods()


EOF_ERRORS = (EOFError,)
if ssl is not None:
    EOF_ERRORS += (ssl.SSLEOFError,)


class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    """Run the networked tests over one connection shared by the class."""

    # This server supports STARTTLS (gmane doesn't)
    NNTP_HOST = 'news.trigofacile.com'
    GROUP_NAME = 'fr.comp.lang.python'
    GROUP_PAT = 'fr.comp.lang.*'
    DESC = 'Python'

    NNTP_CLASS = NNTP

    @classmethod
    def setUpClass(cls):
        support.requires("network")
        kwargs = dict(
            timeout=support.INTERNET_TIMEOUT,
            usenetrc=False
        )
        if cls.ssl_context is not None:
            kwargs["ssl_context"] = cls.ssl_context
        with socket_helper.transient_internet(cls.NNTP_HOST):
            try:
                cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
            except SSLError as ssl_err:
                # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
                if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
                    raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
                                            f"to {cls.NNTP_HOST!r}")
                print(cls.NNTP_HOST)
                raise
            except EOF_ERRORS:
                raise unittest.SkipTest(f"{cls} got EOF error on connecting "
                                        f"to {cls.NNTP_HOST!r}")

    @classmethod
    def tearDownClass(cls):
        # test_zzquit() sets cls.server to None once it has quit.
        if cls.server is not None:
            cls.server.quit()

@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):

    # Technical limits for this public NNTP server (see http://www.aioe.org):
    # "Only two concurrent connections per IP address are allowed and
    # 400 connections per day are accepted from each IP address."

    NNTP_HOST = 'nntp.aioe.org'
    # bpo-42794: aioe.test is one of the official groups on this server
    # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
    GROUP_NAME = 'aioe.test'
    GROUP_PAT = 'aioe.*'
    DESC = 'test'

    NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)

    # Disabled as it produces too much data
    test_list = None

    # Disabled as the connection will already be encrypted.
    test_starttls = None

    if ssl is not None:
        ssl_context = ssl._create_unverified_context()
        ssl_context.set_ciphers("DEFAULT")
        ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2

#
# Non-networked tests using a local server (or something mocking it).
#

class _NNTPServerIO(io.RawIOBase):
    """A raw IO object allowing NNTP commands to be received and processed
    by a handler.  The handler can push responses which can then be read
    from the IO object.

    The handler must provide ``start(readline, push_data)`` and
    ``process_pending()``; ``start`` is called once from the constructor.
    """

    def __init__(self, handler):
        super().__init__()
        # The channel from the client
        self.c2s = io.BytesIO()
        # The channel to the client
        self.s2c = io.BytesIO()
        self.handler = handler
        self.handler.start(self.c2s.readline, self.push_data)

    def readable(self):
        return True

    def writable(self):
        return True

    def push_data(self, data):
        """Push (buffer) some data to send to the client."""
        # Append at the end, then restore the position so that the client's
        # read cursor is left where it was.
        pos = self.s2c.tell()
        self.s2c.seek(0, 2)
        self.s2c.write(data)
        self.s2c.seek(pos)

    def write(self, b):
        """The client sends us some data"""
        # Write at the current position, then rewind to it so the handler's
        # readline() sees exactly the bytes that were just written.
        pos = self.c2s.tell()
        self.c2s.write(b)
        self.c2s.seek(pos)
        self.handler.process_pending()
        return len(b)

    def readinto(self, buf):
        """The client wants to read a response"""
        # Let the handler consume anything still queued before answering.
        self.handler.process_pending()
        b = self.s2c.read(len(buf))
        n = len(b)
        buf[:n] = b
        return n


def make_mock_file(handler):
    """Return ``(sio, file)``: the raw mock channel for *handler* and a
    buffered read/write file object layered on top of it."""
    sio = _NNTPServerIO(handler)
    # Using BufferedRWPair instead of BufferedRandom ensures the file
    # isn't seekable.
    file = io.BufferedRWPair(sio, sio)
    return (sio, file)

class NNTPServer(nntplib.NNTP):
    """An NNTP client bound to an already-open file object instead of a
    socket."""

    def __init__(self, f, host, readermode=None):
        self.file = f
        self.host = host
        self._base_init(readermode)

    def _close(self):
        self.file.close()
        del self.file


class MockedNNTPTestsMixin:
    """Give each test a fresh handler, mock channel and client.

    Sets ``self.handler``, ``self.sio`` and ``self.server``.
    """

    # Override in derived classes
    handler_class = None

    def setUp(self):
        super().setUp()
        self.make_server()

    def tearDown(self):
        super().tearDown()
        del self.server

    def make_server(self, *args, **kwargs):
        # Extra arguments are forwarded to NNTPServer after the host name.
        self.handler = self.handler_class()
        self.sio, file = make_mock_file(self.handler)
        self.server = NNTPServer(file, 'test.server', *args, **kwargs)
        return self.server


class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    def setUp(self):
        super().setUp()
        # Replaces the server built by the parent setUp() with one that
        # was created with readermode=True.
        self.make_server(readermode=True)


class NNTPv1Handler:
    """A handler for RFC 977

    Each command line received is dispatched to the ``handle_<COMMAND>``
    method of the same (upper-cased) name, which answers with canned data
    through push_lit() / push_data().
    """

    welcome = "200 NNTP mock server"

    def start(self, readline, push_data):
        """Attach the handler to its channel and send the welcome line.

        *readline* returns the next line sent by the client (an empty
        value when nothing is pending); *push_data* takes bytes to send
        back to the client.
        """
        self.in_body = False
        self.allow_posting = True
        self._readline = readline
        self._push_data = push_data
        self._logged_in = False
        self._user_sent = False
        # Our welcome
        self.handle_welcome()

    def _decode(self, data):
        return str(data, "utf-8", "surrogateescape")

    def process_pending(self):
        """Consume every line currently available from the client.

        Raises ValueError for a command line not ending in CRLF, or (with
        the original exception chained) when a command handler fails.
        """
        if self.in_body:
            # A previous command asked for a body: collect raw lines up to
            # and including the lone "." terminator.  If the body is not
            # complete yet, return and resume on the next call.
            while True:
                line = self._readline()
                if not line:
                    return
                self.body.append(line)
                if line == b".\r\n":
                    break
            try:
                # Call the same handler again, this time with the body.
                meth, tokens = self.body_callback
                meth(*tokens, body=self.body)
            finally:
                self.body_callback = None
                self.body = None
                self.in_body = False
        while True:
            line = self._decode(self._readline())
            if not line:
                return
            if not line.endswith("\r\n"):
                raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
            line = line[:-2]
            cmd, *tokens = line.split()
            meth = getattr(self, "handle_" + cmd.upper(), None)
            if meth is None:
                self.handle_unknown()
            else:
                try:
                    meth(*tokens)
                except Exception as e:
                    raise ValueError("command failed: {!r}".format(line)) from e
                else:
                    if self.in_body:
                        # The handler called expect_body(): remember how to
                        # call it back once the body has arrived.
                        self.body_callback = meth, tokens
                        self.body = []

    def expect_body(self):
        """Flag that the client is expected to post a request body"""
        self.in_body = True

    def push_data(self, data):
        """Push some binary data"""
        self._push_data(data)

    def push_lit(self, lit):
        """Push a string literal

        The text is dedented, its lines are joined with CRLF, a final CRLF
        is appended, and the result is sent UTF-8 encoded.
        """
        lit = textwrap.dedent(lit)
        lit = "\r\n".join(lit.splitlines()) + "\r\n"
        lit = lit.encode('utf-8')
        self.push_data(lit)

    def handle_unknown(self):
        self.push_lit("500 What?")

    def handle_welcome(self):
        self.push_lit(self.welcome)

    def handle_QUIT(self):
        self.push_lit("205 Bye!")

    def handle_DATE(self):
        self.push_lit("111 20100914001155")

    def handle_GROUP(self, group):
        if group == "fr.comp.lang.python":
            self.push_lit("211 486 761 1265 fr.comp.lang.python")
        else:
            self.push_lit("411 No such group {}".format(group))

    def handle_HELP(self):
        # NOTE(review): the command entries keep one leading space after
        # dedent, which is what test_help's expected list shows; the
        # original indent width should be confirmed.
        self.push_lit("""\
            100 Legal commands
             authinfo user Name|pass Password|generic <prog> <args>
             date
             help
            Report problems to <root@example.org>
            .""")

    def handle_STAT(self, message_spec=None):
        if message_spec is None:
            self.push_lit("412 No newsgroup selected")
        elif message_spec == "3000234":
            self.push_lit("223 3000234 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("223 0 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")

    def handle_NEXT(self):
        self.push_lit("223 3000237 <668929@example.org> retrieved")

    def handle_LAST(self):
        self.push_lit("223 3000234 <45223423@example.com> retrieved")

    def handle_LIST(self, action=None, param=None):
        if action is None:
            self.push_lit("""\
                215 Newsgroups in form "group high low flags".
                comp.lang.python 0000052340 0000002828 y
                comp.lang.python.announce 0000001153 0000000993 m
                free.it.comp.lang.python 0000000002 0000000002 y
                fr.comp.lang.python 0000001254 0000000760 y
                free.it.comp.lang.python.learner 0000000000 0000000001 y
                tw.bbs.comp.lang.python 0000000304 0000000304 y
                .""")
        elif action == "ACTIVE":
            if param == "*distutils*":
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    gmane.comp.python.distutils.devel 0000014104 0000000001 m
                    gmane.comp.python.distutils.cvs 0000000000 0000000001 m
                    .""")
            else:
                self.push_lit("""\
                    215 Newsgroups in form "group high low flags"
                    .""")
        elif action == "OVERVIEW.FMT":
            self.push_lit("""\
                215 Order of fields in overview database.
                Subject:
                From:
                Date:
                Message-ID:
                References:
                Bytes:
                Lines:
                Xref:full
                .""")
        elif action == "NEWSGROUPS":
            assert param is not None
            if param == "comp.lang.python":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python\tThe Python computer language.
                    .""")
            elif param == "comp.lang.python*":
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
                    comp.lang.python\tThe Python computer language.
                    .""")
            else:
                self.push_lit("""\
                    215 Descriptions in form "group description".
                    .""")
        else:
            self.push_lit('501 Unknown LIST keyword')

    def handle_NEWNEWS(self, group, date_str, time_str):
        # We hard code different return messages depending on passed
        # argument and date syntax.
        if (group == "comp.lang.python" and date_str == "20100913"
            and time_str == "082004"):
            # Date was passed in RFC 3977 format (NNTP "v2")
            self.push_lit("""\
                230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == "comp.lang.python" and date_str == "100913"
            and time_str == "082004"):
            # Date was passed in RFC 977 format (NNTP "v1")
            self.push_lit("""\
                230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
                <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
                <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
                .""")
        elif (group == 'comp.lang.python' and
              date_str in ('20100101', '100101') and
              time_str == '090000'):
            # A single, very long response line followed by the terminator.
            self.push_lit('too long line' * 3000 +
                          '\n.')
        else:
            self.push_lit("""\
                230 An empty list of newsarticles follows
                .""")
        # (Note for experiments: many servers disable NEWNEWS.
        #  As of this writing, sicinfo3.epfl.ch doesn't.)

    def handle_XOVER(self, message_spec):
        if message_spec == "57-59":
            self.push_lit(
                "224 Overview information for 57-58 follows\n"
                "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tSat, 19 Jun 2010 18:04:08 -0400"
                    "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
                    "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
                    "\tXref: news.gmane.io gmane.comp.python.authors:57"
                    "\n"
                "58\tLooking for a few good bloggers"
                    "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
                    "\tThu, 22 Jul 2010 09:14:14 -0400"
                    "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
                    "\t\t6683\t16"
                    "\t"
                    "\n"
                # A UTF-8 overview line from fr.comp.lang.python
                "59\tRe: Message d'erreur incompréhensible (par moi)"
                    "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
                    "\tWed, 15 Sep 2010 18:09:15 +0200"
                    "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
                    "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
                    "\tXref: saria.nerim.net fr.comp.lang.python:1265"
                    "\n"
                ".\n")
        else:
            self.push_lit("""\
                224 No articles
                .""")

    def handle_POST(self, *, body=None):
        # Called twice per post: first without a body (the command line),
        # then again from process_pending() with the collected body lines.
        if body is None:
            if self.allow_posting:
                self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("440 Posting not permitted")
        else:
            assert self.allow_posting
            self.push_lit("240 Article received OK")
            self.posted_body = body

    def handle_IHAVE(self, message_id, *, body=None):
        # Same two-step protocol as handle_POST().
        if body is None:
            if (self.allow_posting and
                message_id == "<i.am.an.article.you.will.want@example.com>"):
                self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
                self.expect_body()
            else:
                self.push_lit("435 Article not wanted")
        else:
            assert self.allow_posting
            self.push_lit("235 Article transferred OK")
            self.posted_body = body

    # The sample texts below are indented uniformly; push_lit() removes the
    # common indentation before sending them.
    sample_head = """\
        From: "Demo User" <nobody@example.net>
        Subject: I am just a test article
        Content-Type: text/plain; charset=UTF-8; format=flowed
        Message-ID: <i.am.an.article.you.will.want@example.com>"""

    sample_body = """\
        This is just a test article.
        ..Here is a dot-starting line.

        -- Signed by Andr\xe9."""

    sample_article = sample_head + "\n\n" + sample_body

    def handle_ARTICLE(self, message_spec=None):
        if message_spec is None:
            self.push_lit("220 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("220 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("220 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_article)
        self.push_lit(".")

    def handle_HEAD(self, message_spec=None):
        if message_spec is None:
            self.push_lit("221 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("221 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("221 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_head)
        self.push_lit(".")

    def handle_BODY(self, message_spec=None):
        if message_spec is None:
            self.push_lit("222 3000237 <45223423@example.com>")
        elif message_spec == "<45223423@example.com>":
            self.push_lit("222 0 <45223423@example.com>")
        elif message_spec == "3000234":
            self.push_lit("222 3000234 <45223423@example.com>")
        else:
            self.push_lit("430 No Such Article Found")
            return
        self.push_lit(self.sample_body)
        self.push_lit(".")

    def handle_AUTHINFO(self, cred_type, data):
        # Any password is accepted; *data* is never inspected.
        if self._logged_in:
            self.push_lit('502 Already Logged In')
        elif cred_type == 'user':
            if self._user_sent:
                self.push_lit('482 User Credential Already Sent')
            else:
                self.push_lit('381 Password Required')
                self._user_sent = True
        elif cred_type == 'pass':
            self.push_lit('281 Login Successful')
            self._logged_in = True
        else:
            raise Exception('Unknown cred type {}'.format(cred_type))


class NNTPv2Handler(NNTPv1Handler):
    """A handler for RFC 3977 (NNTP "v2")"""

    def handle_CAPABILITIES(self):
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1{}
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            READER
            ."""

        if not self._logged_in:
            # The inserted line must carry the same indentation as the
            # template lines, otherwise push_lit()'s dedent would leave
            # leading blanks on every other line of the response.
            self.push_lit(fmt.format('\n            AUTHINFO USER'))
        else:
            self.push_lit(fmt.format(''))

    def handle_MODE(self, _):
        raise Exception('MODE READER sent despite READER has been advertised')

    def handle_OVER(self, message_spec=None):
        return self.handle_XOVER(message_spec)


class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    """A handler that allows CAPABILITIES only after login"""

    def handle_CAPABILITIES(self):
        if not self._logged_in:
            self.push_lit('480 You must log in.')
        else:
            super().handle_CAPABILITIES()


class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    """A server that starts in transit mode"""

    def __init__(self):
        self._switched = False

    def handle_CAPABILITIES(self):
        # Advertises MODE-READER until MODE READER has been received, and
        # READER afterwards.
        fmt = """\
            101 Capability list:
            VERSION 2 3
            IMPLEMENTATION INN 2.5.1
            HDR
            LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
            OVER
            POST
            {}READER
            ."""
        if self._switched:
            self.push_lit(fmt.format(''))
        else:
            self.push_lit(fmt.format('MODE-'))

    def handle_MODE(self, what):
        assert not self._switched and what == 'reader'
        self._switched = True
        self.push_lit('200 Posting allowed')


class NNTPv1v2TestsMixin:

    def setUp(self):
        super().setUp()

864 def test_welcome(self): 865 self.assertEqual(self.server.welcome, self.handler.welcome) 866 867 def test_authinfo(self): 868 if self.nntp_version == 2: 869 self.assertIn('AUTHINFO', self.server._caps) 870 self.server.login('testuser', 'testpw') 871 # if AUTHINFO is gone from _caps we also know that getcapabilities() 872 # has been called after login as it should 873 self.assertNotIn('AUTHINFO', self.server._caps) 874 875 def test_date(self): 876 resp, date = self.server.date() 877 self.assertEqual(resp, "111 20100914001155") 878 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) 879 880 def test_quit(self): 881 self.assertFalse(self.sio.closed) 882 resp = self.server.quit() 883 self.assertEqual(resp, "205 Bye!") 884 self.assertTrue(self.sio.closed) 885 886 def test_help(self): 887 resp, help = self.server.help() 888 self.assertEqual(resp, "100 Legal commands") 889 self.assertEqual(help, [ 890 ' authinfo user Name|pass Password|generic <prog> <args>', 891 ' date', 892 ' help', 893 'Report problems to <root@example.org>', 894 ]) 895 896 def test_list(self): 897 resp, groups = self.server.list() 898 self.assertEqual(len(groups), 6) 899 g = groups[1] 900 self.assertEqual(g, 901 GroupInfo("comp.lang.python.announce", "0000001153", 902 "0000000993", "m")) 903 resp, groups = self.server.list("*distutils*") 904 self.assertEqual(len(groups), 2) 905 g = groups[0] 906 self.assertEqual(g, 907 GroupInfo("gmane.comp.python.distutils.devel", "0000014104", 908 "0000000001", "m")) 909 910 def test_stat(self): 911 resp, art_num, message_id = self.server.stat(3000234) 912 self.assertEqual(resp, "223 3000234 <45223423@example.com>") 913 self.assertEqual(art_num, 3000234) 914 self.assertEqual(message_id, "<45223423@example.com>") 915 resp, art_num, message_id = self.server.stat("<45223423@example.com>") 916 self.assertEqual(resp, "223 0 <45223423@example.com>") 917 self.assertEqual(art_num, 0) 918 self.assertEqual(message_id, "<45223423@example.com>") 919 with 
self.assertRaises(nntplib.NNTPTemporaryError) as cm: 920 self.server.stat("<non.existent.id>") 921 self.assertEqual(cm.exception.response, "430 No Such Article Found") 922 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 923 self.server.stat() 924 self.assertEqual(cm.exception.response, "412 No newsgroup selected") 925 926 def test_next(self): 927 resp, art_num, message_id = self.server.next() 928 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") 929 self.assertEqual(art_num, 3000237) 930 self.assertEqual(message_id, "<668929@example.org>") 931 932 def test_last(self): 933 resp, art_num, message_id = self.server.last() 934 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") 935 self.assertEqual(art_num, 3000234) 936 self.assertEqual(message_id, "<45223423@example.com>") 937 938 def test_description(self): 939 desc = self.server.description("comp.lang.python") 940 self.assertEqual(desc, "The Python computer language.") 941 desc = self.server.description("comp.lang.pythonx") 942 self.assertEqual(desc, "") 943 944 def test_descriptions(self): 945 resp, groups = self.server.descriptions("comp.lang.python") 946 self.assertEqual(resp, '215 Descriptions in form "group description".') 947 self.assertEqual(groups, { 948 "comp.lang.python": "The Python computer language.", 949 }) 950 resp, groups = self.server.descriptions("comp.lang.python*") 951 self.assertEqual(groups, { 952 "comp.lang.python": "The Python computer language.", 953 "comp.lang.python.announce": "Announcements about the Python language. 
(Moderated)", 954 }) 955 resp, groups = self.server.descriptions("comp.lang.pythonx") 956 self.assertEqual(groups, {}) 957 958 def test_group(self): 959 resp, count, first, last, group = self.server.group("fr.comp.lang.python") 960 self.assertTrue(resp.startswith("211 "), resp) 961 self.assertEqual(first, 761) 962 self.assertEqual(last, 1265) 963 self.assertEqual(count, 486) 964 self.assertEqual(group, "fr.comp.lang.python") 965 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 966 self.server.group("comp.lang.python.devel") 967 exc = cm.exception 968 self.assertTrue(exc.response.startswith("411 No such group"), 969 exc.response) 970 971 def test_newnews(self): 972 # NEWNEWS comp.lang.python [20]100913 082004 973 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 974 resp, ids = self.server.newnews("comp.lang.python", dt) 975 expected = ( 976 "230 list of newsarticles (NNTP v{0}) " 977 "created after Mon Sep 13 08:20:04 2010 follows" 978 ).format(self.nntp_version) 979 self.assertEqual(resp, expected) 980 self.assertEqual(ids, [ 981 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>", 982 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>", 983 ]) 984 # NEWNEWS fr.comp.lang.python [20]100913 082004 985 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 986 resp, ids = self.server.newnews("fr.comp.lang.python", dt) 987 self.assertEqual(resp, "230 An empty list of newsarticles follows") 988 self.assertEqual(ids, []) 989 990 def _check_article_body(self, lines): 991 self.assertEqual(len(lines), 4) 992 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") 993 self.assertEqual(lines[-2], b"") 994 self.assertEqual(lines[-3], b".Here is a dot-starting line.") 995 self.assertEqual(lines[-4], b"This is just a test article.") 996 997 def _check_article_head(self, lines): 998 self.assertEqual(len(lines), 4) 999 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>') 1000 self.assertEqual(lines[3], b"Message-ID: 
<i.am.an.article.you.will.want@example.com>") 1001 1002 def _check_article_data(self, lines): 1003 self.assertEqual(len(lines), 9) 1004 self._check_article_head(lines[:4]) 1005 self._check_article_body(lines[-4:]) 1006 self.assertEqual(lines[4], b"") 1007 1008 def test_article(self): 1009 # ARTICLE 1010 resp, info = self.server.article() 1011 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 1012 art_num, message_id, lines = info 1013 self.assertEqual(art_num, 3000237) 1014 self.assertEqual(message_id, "<45223423@example.com>") 1015 self._check_article_data(lines) 1016 # ARTICLE num 1017 resp, info = self.server.article(3000234) 1018 self.assertEqual(resp, "220 3000234 <45223423@example.com>") 1019 art_num, message_id, lines = info 1020 self.assertEqual(art_num, 3000234) 1021 self.assertEqual(message_id, "<45223423@example.com>") 1022 self._check_article_data(lines) 1023 # ARTICLE id 1024 resp, info = self.server.article("<45223423@example.com>") 1025 self.assertEqual(resp, "220 0 <45223423@example.com>") 1026 art_num, message_id, lines = info 1027 self.assertEqual(art_num, 0) 1028 self.assertEqual(message_id, "<45223423@example.com>") 1029 self._check_article_data(lines) 1030 # Non-existent id 1031 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1032 self.server.article("<non-existent@example.com>") 1033 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1034 1035 def test_article_file(self): 1036 # With a "file" argument 1037 f = io.BytesIO() 1038 resp, info = self.server.article(file=f) 1039 self.assertEqual(resp, "220 3000237 <45223423@example.com>") 1040 art_num, message_id, lines = info 1041 self.assertEqual(art_num, 3000237) 1042 self.assertEqual(message_id, "<45223423@example.com>") 1043 self.assertEqual(lines, []) 1044 data = f.getvalue() 1045 self.assertTrue(data.startswith( 1046 b'From: "Demo User" <nobody@example.net>\r\n' 1047 b'Subject: I am just a test article\r\n' 1048 ), ascii(data)) 1049 
self.assertTrue(data.endswith( 1050 b'This is just a test article.\r\n' 1051 b'.Here is a dot-starting line.\r\n' 1052 b'\r\n' 1053 b'-- Signed by Andr\xc3\xa9.\r\n' 1054 ), ascii(data)) 1055 1056 def test_head(self): 1057 # HEAD 1058 resp, info = self.server.head() 1059 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1060 art_num, message_id, lines = info 1061 self.assertEqual(art_num, 3000237) 1062 self.assertEqual(message_id, "<45223423@example.com>") 1063 self._check_article_head(lines) 1064 # HEAD num 1065 resp, info = self.server.head(3000234) 1066 self.assertEqual(resp, "221 3000234 <45223423@example.com>") 1067 art_num, message_id, lines = info 1068 self.assertEqual(art_num, 3000234) 1069 self.assertEqual(message_id, "<45223423@example.com>") 1070 self._check_article_head(lines) 1071 # HEAD id 1072 resp, info = self.server.head("<45223423@example.com>") 1073 self.assertEqual(resp, "221 0 <45223423@example.com>") 1074 art_num, message_id, lines = info 1075 self.assertEqual(art_num, 0) 1076 self.assertEqual(message_id, "<45223423@example.com>") 1077 self._check_article_head(lines) 1078 # Non-existent id 1079 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1080 self.server.head("<non-existent@example.com>") 1081 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1082 1083 def test_head_file(self): 1084 f = io.BytesIO() 1085 resp, info = self.server.head(file=f) 1086 self.assertEqual(resp, "221 3000237 <45223423@example.com>") 1087 art_num, message_id, lines = info 1088 self.assertEqual(art_num, 3000237) 1089 self.assertEqual(message_id, "<45223423@example.com>") 1090 self.assertEqual(lines, []) 1091 data = f.getvalue() 1092 self.assertTrue(data.startswith( 1093 b'From: "Demo User" <nobody@example.net>\r\n' 1094 b'Subject: I am just a test article\r\n' 1095 ), ascii(data)) 1096 self.assertFalse(data.endswith( 1097 b'This is just a test article.\r\n' 1098 b'.Here is a dot-starting line.\r\n' 1099 b'\r\n' 1100 b'-- Signed 
by Andr\xc3\xa9.\r\n' 1101 ), ascii(data)) 1102 1103 def test_body(self): 1104 # BODY 1105 resp, info = self.server.body() 1106 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1107 art_num, message_id, lines = info 1108 self.assertEqual(art_num, 3000237) 1109 self.assertEqual(message_id, "<45223423@example.com>") 1110 self._check_article_body(lines) 1111 # BODY num 1112 resp, info = self.server.body(3000234) 1113 self.assertEqual(resp, "222 3000234 <45223423@example.com>") 1114 art_num, message_id, lines = info 1115 self.assertEqual(art_num, 3000234) 1116 self.assertEqual(message_id, "<45223423@example.com>") 1117 self._check_article_body(lines) 1118 # BODY id 1119 resp, info = self.server.body("<45223423@example.com>") 1120 self.assertEqual(resp, "222 0 <45223423@example.com>") 1121 art_num, message_id, lines = info 1122 self.assertEqual(art_num, 0) 1123 self.assertEqual(message_id, "<45223423@example.com>") 1124 self._check_article_body(lines) 1125 # Non-existent id 1126 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1127 self.server.body("<non-existent@example.com>") 1128 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1129 1130 def test_body_file(self): 1131 f = io.BytesIO() 1132 resp, info = self.server.body(file=f) 1133 self.assertEqual(resp, "222 3000237 <45223423@example.com>") 1134 art_num, message_id, lines = info 1135 self.assertEqual(art_num, 3000237) 1136 self.assertEqual(message_id, "<45223423@example.com>") 1137 self.assertEqual(lines, []) 1138 data = f.getvalue() 1139 self.assertFalse(data.startswith( 1140 b'From: "Demo User" <nobody@example.net>\r\n' 1141 b'Subject: I am just a test article\r\n' 1142 ), ascii(data)) 1143 self.assertTrue(data.endswith( 1144 b'This is just a test article.\r\n' 1145 b'.Here is a dot-starting line.\r\n' 1146 b'\r\n' 1147 b'-- Signed by Andr\xc3\xa9.\r\n' 1148 ), ascii(data)) 1149 1150 def check_over_xover_resp(self, resp, overviews): 1151 self.assertTrue(resp.startswith("224 
"), resp) 1152 self.assertEqual(len(overviews), 3) 1153 art_num, over = overviews[0] 1154 self.assertEqual(art_num, 57) 1155 self.assertEqual(over, { 1156 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>", 1157 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", 1158 "date": "Sat, 19 Jun 2010 18:04:08 -0400", 1159 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>", 1160 "references": "<hvalf7$ort$1@dough.gmane.org>", 1161 ":bytes": "7103", 1162 ":lines": "16", 1163 "xref": "news.gmane.io gmane.comp.python.authors:57" 1164 }) 1165 art_num, over = overviews[1] 1166 self.assertEqual(over["xref"], None) 1167 art_num, over = overviews[2] 1168 self.assertEqual(over["subject"], 1169 "Re: Message d'erreur incompréhensible (par moi)") 1170 1171 def test_xover(self): 1172 resp, overviews = self.server.xover(57, 59) 1173 self.check_over_xover_resp(resp, overviews) 1174 1175 def test_over(self): 1176 # In NNTP "v1", this will fallback on XOVER 1177 resp, overviews = self.server.over((57, 59)) 1178 self.check_over_xover_resp(resp, overviews) 1179 1180 sample_post = ( 1181 b'From: "Demo User" <nobody@example.net>\r\n' 1182 b'Subject: I am just a test article\r\n' 1183 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' 1184 b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n' 1185 b'\r\n' 1186 b'This is just a test article.\r\n' 1187 b'.Here is a dot-starting line.\r\n' 1188 b'\r\n' 1189 b'-- Signed by Andr\xc3\xa9.\r\n' 1190 ) 1191 1192 def _check_posted_body(self): 1193 # Check the raw body as received by the server 1194 lines = self.handler.posted_body 1195 # One additional line for the "." 
terminator 1196 self.assertEqual(len(lines), 10) 1197 self.assertEqual(lines[-1], b'.\r\n') 1198 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') 1199 self.assertEqual(lines[-3], b'\r\n') 1200 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') 1201 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n') 1202 1203 def _check_post_ihave_sub(self, func, *args, file_factory): 1204 # First the prepared post with CRLF endings 1205 post = self.sample_post 1206 func_args = args + (file_factory(post),) 1207 self.handler.posted_body = None 1208 resp = func(*func_args) 1209 self._check_posted_body() 1210 # Then the same post with "normal" line endings - they should be 1211 # converted by NNTP.post and NNTP.ihave. 1212 post = self.sample_post.replace(b"\r\n", b"\n") 1213 func_args = args + (file_factory(post),) 1214 self.handler.posted_body = None 1215 resp = func(*func_args) 1216 self._check_posted_body() 1217 return resp 1218 1219 def check_post_ihave(self, func, success_resp, *args): 1220 # With a bytes object 1221 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) 1222 self.assertEqual(resp, success_resp) 1223 # With a bytearray object 1224 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) 1225 self.assertEqual(resp, success_resp) 1226 # With a file object 1227 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) 1228 self.assertEqual(resp, success_resp) 1229 # With an iterable of terminated lines 1230 def iterlines(b): 1231 return iter(b.splitlines(keepends=True)) 1232 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1233 self.assertEqual(resp, success_resp) 1234 # With an iterable of non-terminated lines 1235 def iterlines(b): 1236 return iter(b.splitlines(keepends=False)) 1237 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1238 self.assertEqual(resp, success_resp) 1239 1240 def test_post(self): 1241 
self.check_post_ihave(self.server.post, "240 Article received OK") 1242 self.handler.allow_posting = False 1243 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1244 self.server.post(self.sample_post) 1245 self.assertEqual(cm.exception.response, 1246 "440 Posting not permitted") 1247 1248 def test_ihave(self): 1249 self.check_post_ihave(self.server.ihave, "235 Article transferred OK", 1250 "<i.am.an.article.you.will.want@example.com>") 1251 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1252 self.server.ihave("<another.message.id>", self.sample_post) 1253 self.assertEqual(cm.exception.response, 1254 "435 Article not wanted") 1255 1256 def test_too_long_lines(self): 1257 dt = datetime.datetime(2010, 1, 1, 9, 0, 0) 1258 self.assertRaises(nntplib.NNTPDataError, 1259 self.server.newnews, "comp.lang.python", dt) 1260 1261 1262class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1263 """Tests an NNTP v1 server (no capabilities).""" 1264 1265 nntp_version = 1 1266 handler_class = NNTPv1Handler 1267 1268 def test_caps(self): 1269 caps = self.server.getcapabilities() 1270 self.assertEqual(caps, {}) 1271 self.assertEqual(self.server.nntp_version, 1) 1272 self.assertEqual(self.server.nntp_implementation, None) 1273 1274 1275class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1276 """Tests an NNTP v2 server (with capabilities).""" 1277 1278 nntp_version = 2 1279 handler_class = NNTPv2Handler 1280 1281 def test_caps(self): 1282 caps = self.server.getcapabilities() 1283 self.assertEqual(caps, { 1284 'VERSION': ['2', '3'], 1285 'IMPLEMENTATION': ['INN', '2.5.1'], 1286 'AUTHINFO': ['USER'], 1287 'HDR': [], 1288 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', 1289 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], 1290 'OVER': [], 1291 'POST': [], 1292 'READER': [], 1293 }) 1294 self.assertEqual(self.server.nntp_version, 3) 1295 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1') 1296 1297 1298class 
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a (probably) NNTP v2 server that only reports capabilities
    once the client has logged in."""

    handler_class = CapsAfterLoginNNTPv2Handler
    nntp_version = 2

    def test_caps_only_after_login(self):
        server = self.server
        # Nothing is known before authentication ...
        self.assertEqual(server._caps, {})
        server.login('testuser', 'testpw')
        # ... and the capability list is filled in afterwards.
        self.assertIn('VERSION', server._caps)


class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    handler_class = ModeSwitchingNNTPv2Handler
    nntp_version = 2

    def test_we_are_in_reader_mode_after_connect(self):
        capabilities = self.server._caps
        self.assertIn('READER', capabilities)
class MiscTests(unittest.TestCase):
    """Checks of nntplib helpers that are called directly, without a server."""

    def test_decode_header(self):
        # (raw header, expected decoded text) pairs, checked in order.
        cases = [
            ("", ""),
            ("a plain header", "a plain header"),
            (" with extra spaces ", " with extra spaces "),
            ("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python"),
            ("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
             " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
             "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées"),
            ("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
             "Re: problème de matrice"),
            # A natively utf-8 header (found in the real world!)
            ("Re: Message d'erreur incompréhensible (par moi)",
             "Re: Message d'erreur incompréhensible (par moi)"),
        ]
        for raw, decoded in cases:
            self.assertEqual(nntplib.decode_header(raw), decoded)

    def test_parse_overview_fmt(self):
        # Field names every expected result starts with.
        mandatory = ["subject", "from", "date", "message-id", "references",
                     ":bytes", ":lines"]
        cases = [
            # The minimal (default) response
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines"],
             mandatory),
            # The minimal response using alternative names
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:"],
             mandatory),
            # Variations in casing
            (["subject:", "FROM:", "DaTe:", "message-ID:",
              "References:", "BYTES:", "Lines:"],
             mandatory),
            # First example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", ":bytes", ":lines", "Xref:full",
              "Distribution:full"],
             mandatory + ["xref", "distribution"]),
            # Second example from RFC 3977
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:FULL",
              "Distribution:FULL"],
             mandatory + ["xref", "distribution"]),
            # A classic response from INN
            (["Subject:", "From:", "Date:", "Message-ID:",
              "References:", "Bytes:", "Lines:", "Xref:full"],
             mandatory + ["xref"]),
        ]
        for lines, expected in cases:
            self.assertEqual(nntplib._parse_overview_fmt(lines), expected)
1379 1380 def test_parse_overview(self): 1381 fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"] 1382 # First example from RFC 3977 1383 lines = [ 1384 '3000234\tI am just a test article\t"Demo User" ' 1385 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1386 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1387 '17\tXref: news.example.com misc.test:3000363', 1388 ] 1389 overview = nntplib._parse_overview(lines, fmt) 1390 (art_num, fields), = overview 1391 self.assertEqual(art_num, 3000234) 1392 self.assertEqual(fields, { 1393 'subject': 'I am just a test article', 1394 'from': '"Demo User" <nobody@example.com>', 1395 'date': '6 Oct 1998 04:38:40 -0500', 1396 'message-id': '<45223423@example.com>', 1397 'references': '<45454@example.net>', 1398 ':bytes': '1234', 1399 ':lines': '17', 1400 'xref': 'news.example.com misc.test:3000363', 1401 }) 1402 # Second example; here the "Xref" field is totally absent (including 1403 # the header name) and comes out as None 1404 lines = [ 1405 '3000234\tI am just a test article\t"Demo User" ' 1406 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1407 '<45223423@example.com>\t<45454@example.net>\t1234\t' 1408 '17\t\t', 1409 ] 1410 overview = nntplib._parse_overview(lines, fmt) 1411 (art_num, fields), = overview 1412 self.assertEqual(fields['xref'], None) 1413 # Third example; the "Xref" is an empty string, while "references" 1414 # is a single space. 
1415 lines = [ 1416 '3000234\tI am just a test article\t"Demo User" ' 1417 '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t' 1418 '<45223423@example.com>\t \t1234\t' 1419 '17\tXref: \t', 1420 ] 1421 overview = nntplib._parse_overview(lines, fmt) 1422 (art_num, fields), = overview 1423 self.assertEqual(fields['references'], ' ') 1424 self.assertEqual(fields['xref'], '') 1425 1426 def test_parse_datetime(self): 1427 def gives(a, b, *c): 1428 self.assertEqual(nntplib._parse_datetime(a, b), 1429 datetime.datetime(*c)) 1430 # Output of DATE command 1431 gives("19990623135624", None, 1999, 6, 23, 13, 56, 24) 1432 # Variations 1433 gives("19990623", "135624", 1999, 6, 23, 13, 56, 24) 1434 gives("990623", "135624", 1999, 6, 23, 13, 56, 24) 1435 gives("090623", "135624", 2009, 6, 23, 13, 56, 24) 1436 1437 def test_unparse_datetime(self): 1438 # Test non-legacy mode 1439 # 1) with a datetime 1440 def gives(y, M, d, h, m, s, date_str, time_str): 1441 dt = datetime.datetime(y, M, d, h, m, s) 1442 self.assertEqual(nntplib._unparse_datetime(dt), 1443 (date_str, time_str)) 1444 self.assertEqual(nntplib._unparse_datetime(dt, False), 1445 (date_str, time_str)) 1446 gives(1999, 6, 23, 13, 56, 24, "19990623", "135624") 1447 gives(2000, 6, 23, 13, 56, 24, "20000623", "135624") 1448 gives(2010, 6, 5, 1, 2, 3, "20100605", "010203") 1449 # 2) with a date 1450 def gives(y, M, d, date_str, time_str): 1451 dt = datetime.date(y, M, d) 1452 self.assertEqual(nntplib._unparse_datetime(dt), 1453 (date_str, time_str)) 1454 self.assertEqual(nntplib._unparse_datetime(dt, False), 1455 (date_str, time_str)) 1456 gives(1999, 6, 23, "19990623", "000000") 1457 gives(2000, 6, 23, "20000623", "000000") 1458 gives(2010, 6, 5, "20100605", "000000") 1459 1460 def test_unparse_datetime_legacy(self): 1461 # Test legacy mode (RFC 977) 1462 # 1) with a datetime 1463 def gives(y, M, d, h, m, s, date_str, time_str): 1464 dt = datetime.datetime(y, M, d, h, m, s) 1465 self.assertEqual(nntplib._unparse_datetime(dt, 
True), 1466 (date_str, time_str)) 1467 gives(1999, 6, 23, 13, 56, 24, "990623", "135624") 1468 gives(2000, 6, 23, 13, 56, 24, "000623", "135624") 1469 gives(2010, 6, 5, 1, 2, 3, "100605", "010203") 1470 # 2) with a date 1471 def gives(y, M, d, date_str, time_str): 1472 dt = datetime.date(y, M, d) 1473 self.assertEqual(nntplib._unparse_datetime(dt, True), 1474 (date_str, time_str)) 1475 gives(1999, 6, 23, "990623", "000000") 1476 gives(2000, 6, 23, "000623", "000000") 1477 gives(2010, 6, 5, "100605", "000000") 1478 1479 @unittest.skipUnless(ssl, 'requires SSL support') 1480 def test_ssl_support(self): 1481 self.assertTrue(hasattr(nntplib, 'NNTP_SSL')) 1482 1483 1484class PublicAPITests(unittest.TestCase): 1485 """Ensures that the correct values are exposed in the public API.""" 1486 1487 def test_module_all_attribute(self): 1488 self.assertTrue(hasattr(nntplib, '__all__')) 1489 target_api = ['NNTP', 'NNTPError', 'NNTPReplyError', 1490 'NNTPTemporaryError', 'NNTPPermanentError', 1491 'NNTPProtocolError', 'NNTPDataError', 'decode_header'] 1492 if ssl is not None: 1493 target_api.append('NNTP_SSL') 1494 self.assertEqual(set(nntplib.__all__), set(target_api)) 1495 1496class MockSocketTests(unittest.TestCase): 1497 """Tests involving a mock socket object 1498 1499 Used where the _NNTPServerIO file object is not enough.""" 1500 1501 nntp_class = nntplib.NNTP 1502 1503 def check_constructor_error_conditions( 1504 self, handler_class, 1505 expected_error_type, expected_error_msg, 1506 login=None, password=None): 1507 1508 class mock_socket_module: 1509 def create_connection(address, timeout): 1510 return MockSocket() 1511 1512 class MockSocket: 1513 def close(self): 1514 nonlocal socket_closed 1515 socket_closed = True 1516 1517 def makefile(socket, mode): 1518 handler = handler_class() 1519 _, file = make_mock_file(handler) 1520 files.append(file) 1521 return file 1522 1523 socket_closed = False 1524 files = [] 1525 with patch('nntplib.socket', mock_socket_module), \ 1526 
class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    # Class under test; MockSslTests overrides this with an SSL variant.
    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        # Construct `self.nntp_class` against a fake socket whose file
        # objects are produced from `handler_class`, and require that:
        #   * the constructor raises `expected_error_type` with a message
        #     matching `expected_error_msg`,
        #   * the fake socket was closed,
        #   * every file object handed out by the fake socket was closed.
        # `login` / `password` are passed on as `user` / `password`.

        # Stand-in for the `socket` module as seen by nntplib (patched in
        # below).  The class object itself is used, not an instance, so
        # `create_connection` takes no `self`.
        class mock_socket_module:
            def create_connection(address, timeout):
                return MockSocket()

        class MockSocket:
            def close(self):
                # Record the close in the enclosing function's flag.
                nonlocal socket_closed
                socket_closed = True

            # Called on an instance: the first parameter, named `socket`,
            # receives the MockSocket itself.
            def makefile(socket, mode):
                # A fresh handler per file; the file is remembered so the
                # caller can check afterwards that it was closed.
                handler = handler_class()
                _, file = make_mock_file(handler)
                files.append(file)
                return file

        socket_closed = False
        files = []
        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for f in files:
            self.assertTrue(f.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                # `capabilities_response` is a closure variable assigned
                # just below; it is only looked up when the handler runs.
                self.push_lit(capabilities_response)
        capabilities_response = '201 bad capability'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        login = 't@e.com'
        password = 'python'
        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                # `authinfo_response` is a closure variable assigned just
                # below; it is only looked up when the handler runs.
                self.push_lit(authinfo_response)
        authinfo_response = '503 Mechanism not recognized'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)

class bypass_context:
    """Bypass encryption and actual SSL module"""
    # Used as a class, not instantiated (MockSslTests passes the class
    # itself as `ssl_context`), so `wrap_socket` takes no `self` and simply
    # returns the socket it was given.
    def wrap_socket(sock, **args):
        return sock
@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    # Re-runs the inherited MockSocketTests checks with NNTP_SSL as the
    # class under test, using `bypass_context` as its SSL context.
    @staticmethod
    def nntp_class(*pos, **kw):
        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)


class LocalServerTests(unittest.TestCase):
    # Runs a small server on a real local socket in a background thread and
    # connects an NNTP client to it.
    def setUp(self):
        sock = socket.socket()
        port = socket_helper.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # __enter__ / __exit__ are called by hand so that the connection
        # stays open for the test body and is closed by a cleanup.
        self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        # Thread target: accept one client on `sock` and answer the
        # commands handled below until QUIT is received.
        # Could be generalized to handle more commands in separate methods
        with sock:
            [client, _] = sock.accept()
        # The ExitStack collects the client socket and its reader so that
        # they are all closed when the loop ends.
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # The plain-text reader is closed before the 382 reply
                    # is sent; the client socket is then wrapped for the
                    # server side using `certfile`, and the wrapped socket
                    # and its new reader are registered for cleanup too.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    # Anything else (including an empty read) is an error.
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        # Remember the pre-TLS objects so they can be compared afterwards.
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)


if __name__ == "__main__":
    unittest.main()