1import io 2import socket 3import datetime 4import textwrap 5import unittest 6import functools 7import contextlib 8import os.path 9from test import support 10from nntplib import NNTP, GroupInfo 11import nntplib 12from unittest.mock import patch 13try: 14 import ssl 15except ImportError: 16 ssl = None 17try: 18 import threading 19except ImportError: 20 threading = None 21 22TIMEOUT = 30 23certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') 24 25# TODO: 26# - test the `file` arg to more commands 27# - test error conditions 28# - test auth and `usenetrc` 29 30 31class NetworkedNNTPTestsMixin: 32 33 def test_welcome(self): 34 welcome = self.server.getwelcome() 35 self.assertEqual(str, type(welcome)) 36 37 def test_help(self): 38 resp, lines = self.server.help() 39 self.assertTrue(resp.startswith("100 "), resp) 40 for line in lines: 41 self.assertEqual(str, type(line)) 42 43 def test_list(self): 44 resp, groups = self.server.list() 45 if len(groups) > 0: 46 self.assertEqual(GroupInfo, type(groups[0])) 47 self.assertEqual(str, type(groups[0].group)) 48 49 def test_list_active(self): 50 resp, groups = self.server.list(self.GROUP_PAT) 51 if len(groups) > 0: 52 self.assertEqual(GroupInfo, type(groups[0])) 53 self.assertEqual(str, type(groups[0].group)) 54 55 def test_unknown_command(self): 56 with self.assertRaises(nntplib.NNTPPermanentError) as cm: 57 self.server._shortcmd("XYZZY") 58 resp = cm.exception.response 59 self.assertTrue(resp.startswith("500 "), resp) 60 61 def test_newgroups(self): 62 # gmane gets a constant influx of new groups. In order not to stress 63 # the server too much, we choose a recent date in the past. 
64 dt = datetime.date.today() - datetime.timedelta(days=7) 65 resp, groups = self.server.newgroups(dt) 66 if len(groups) > 0: 67 self.assertIsInstance(groups[0], GroupInfo) 68 self.assertIsInstance(groups[0].group, str) 69 70 def test_description(self): 71 def _check_desc(desc): 72 # Sanity checks 73 self.assertIsInstance(desc, str) 74 self.assertNotIn(self.GROUP_NAME, desc) 75 desc = self.server.description(self.GROUP_NAME) 76 _check_desc(desc) 77 # Another sanity check 78 self.assertIn("Python", desc) 79 # With a pattern 80 desc = self.server.description(self.GROUP_PAT) 81 _check_desc(desc) 82 # Shouldn't exist 83 desc = self.server.description("zk.brrtt.baz") 84 self.assertEqual(desc, '') 85 86 def test_descriptions(self): 87 resp, descs = self.server.descriptions(self.GROUP_PAT) 88 # 215 for LIST NEWSGROUPS, 282 for XGTITLE 89 self.assertTrue( 90 resp.startswith("215 ") or resp.startswith("282 "), resp) 91 self.assertIsInstance(descs, dict) 92 desc = descs[self.GROUP_NAME] 93 self.assertEqual(desc, self.server.description(self.GROUP_NAME)) 94 95 def test_group(self): 96 result = self.server.group(self.GROUP_NAME) 97 self.assertEqual(5, len(result)) 98 resp, count, first, last, group = result 99 self.assertEqual(group, self.GROUP_NAME) 100 self.assertIsInstance(count, int) 101 self.assertIsInstance(first, int) 102 self.assertIsInstance(last, int) 103 self.assertLessEqual(first, last) 104 self.assertTrue(resp.startswith("211 "), resp) 105 106 def test_date(self): 107 resp, date = self.server.date() 108 self.assertIsInstance(date, datetime.datetime) 109 # Sanity check 110 self.assertGreaterEqual(date.year, 1995) 111 self.assertLessEqual(date.year, 2030) 112 113 def _check_art_dict(self, art_dict): 114 # Some sanity checks for a field dictionary returned by OVER / XOVER 115 self.assertIsInstance(art_dict, dict) 116 # NNTP has 7 mandatory fields 117 self.assertGreaterEqual(art_dict.keys(), 118 {"subject", "from", "date", "message-id", 119 "references", ":bytes", 
":lines"} 120 ) 121 for v in art_dict.values(): 122 self.assertIsInstance(v, (str, type(None))) 123 124 def test_xover(self): 125 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 126 resp, lines = self.server.xover(last - 5, last) 127 if len(lines) == 0: 128 self.skipTest("no articles retrieved") 129 # The 'last' article is not necessarily part of the output (cancelled?) 130 art_num, art_dict = lines[0] 131 self.assertGreaterEqual(art_num, last - 5) 132 self.assertLessEqual(art_num, last) 133 self._check_art_dict(art_dict) 134 135 @unittest.skipIf(True, 'temporarily skipped until a permanent solution' 136 ' is found for issue #28971') 137 def test_over(self): 138 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 139 start = last - 10 140 # The "start-" article range form 141 resp, lines = self.server.over((start, None)) 142 art_num, art_dict = lines[0] 143 self._check_art_dict(art_dict) 144 # The "start-end" article range form 145 resp, lines = self.server.over((start, last)) 146 art_num, art_dict = lines[-1] 147 # The 'last' article is not necessarily part of the output (cancelled?) 148 self.assertGreaterEqual(art_num, start) 149 self.assertLessEqual(art_num, last) 150 self._check_art_dict(art_dict) 151 # XXX The "message_id" form is unsupported by gmane 152 # 503 Overview by message-ID unsupported 153 154 def test_xhdr(self): 155 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 156 resp, lines = self.server.xhdr('subject', last) 157 for line in lines: 158 self.assertEqual(str, type(line[1])) 159 160 def check_article_resp(self, resp, article, art_num=None): 161 self.assertIsInstance(article, nntplib.ArticleInfo) 162 if art_num is not None: 163 self.assertEqual(article.number, art_num) 164 for line in article.lines: 165 self.assertIsInstance(line, bytes) 166 # XXX this could exceptionally happen... 
167 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) 168 169 def test_article_head_body(self): 170 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 171 # Try to find an available article 172 for art_num in (last, first, last - 1): 173 try: 174 resp, head = self.server.head(art_num) 175 except nntplib.NNTPTemporaryError as e: 176 if not e.response.startswith("423 "): 177 raise 178 # "423 No such article" => choose another one 179 continue 180 break 181 else: 182 self.skipTest("could not find a suitable article number") 183 self.assertTrue(resp.startswith("221 "), resp) 184 self.check_article_resp(resp, head, art_num) 185 resp, body = self.server.body(art_num) 186 self.assertTrue(resp.startswith("222 "), resp) 187 self.check_article_resp(resp, body, art_num) 188 resp, article = self.server.article(art_num) 189 self.assertTrue(resp.startswith("220 "), resp) 190 self.check_article_resp(resp, article, art_num) 191 # Tolerate running the tests from behind a NNTP virus checker 192 blacklist = lambda line: line.startswith(b'X-Antivirus') 193 filtered_head_lines = [line for line in head.lines 194 if not blacklist(line)] 195 filtered_lines = [line for line in article.lines 196 if not blacklist(line)] 197 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) 198 199 def test_capabilities(self): 200 # The server under test implements NNTP version 2 and has a 201 # couple of well-known capabilities. Just sanity check that we 202 # got them. 203 def _check_caps(caps): 204 caps_list = caps['LIST'] 205 self.assertIsInstance(caps_list, (list, tuple)) 206 self.assertIn('OVERVIEW.FMT', caps_list) 207 self.assertGreaterEqual(self.server.nntp_version, 2) 208 _check_caps(self.server.getcapabilities()) 209 # This re-emits the command 210 resp, caps = self.server.capabilities() 211 _check_caps(caps) 212 213 def test_zlogin(self): 214 # This test must be the penultimate because further commands will be 215 # refused. 
216 baduser = "notarealuser" 217 badpw = "notarealpassword" 218 # Check that bogus credentials cause failure 219 self.assertRaises(nntplib.NNTPError, self.server.login, 220 user=baduser, password=badpw, usenetrc=False) 221 # FIXME: We should check that correct credentials succeed, but that 222 # would require valid details for some server somewhere to be in the 223 # test suite, I think. Gmane is anonymous, at least as used for the 224 # other tests. 225 226 def test_zzquit(self): 227 # This test must be called last, hence the name 228 cls = type(self) 229 try: 230 self.server.quit() 231 finally: 232 cls.server = None 233 234 @classmethod 235 def wrap_methods(cls): 236 # Wrap all methods in a transient_internet() exception catcher 237 # XXX put a generic version in test.support? 238 def wrap_meth(meth): 239 @functools.wraps(meth) 240 def wrapped(self): 241 with support.transient_internet(self.NNTP_HOST): 242 meth(self) 243 return wrapped 244 for name in dir(cls): 245 if not name.startswith('test_'): 246 continue 247 meth = getattr(cls, name) 248 if not callable(meth): 249 continue 250 # Need to use a closure so that meth remains bound to its current 251 # value 252 setattr(cls, name, wrap_meth(meth)) 253 254 def test_with_statement(self): 255 def is_connected(): 256 if not hasattr(server, 'file'): 257 return False 258 try: 259 server.help() 260 except (OSError, EOFError): 261 return False 262 return True 263 264 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: 265 self.assertTrue(is_connected()) 266 self.assertTrue(server.help()) 267 self.assertFalse(is_connected()) 268 269 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: 270 server.quit() 271 self.assertFalse(is_connected()) 272 273 274NetworkedNNTPTestsMixin.wrap_methods() 275 276 277class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): 278 # This server supports STARTTLS (gmane doesn't) 279 NNTP_HOST = 'news.trigofacile.com' 280 
GROUP_NAME = 'fr.comp.lang.python' 281 GROUP_PAT = 'fr.comp.lang.*' 282 283 NNTP_CLASS = NNTP 284 285 @classmethod 286 def setUpClass(cls): 287 support.requires("network") 288 with support.transient_internet(cls.NNTP_HOST): 289 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) 290 291 @classmethod 292 def tearDownClass(cls): 293 if cls.server is not None: 294 cls.server.quit() 295 296@unittest.skipUnless(ssl, 'requires SSL support') 297class NetworkedNNTP_SSLTests(NetworkedNNTPTests): 298 299 # Technical limits for this public NNTP server (see http://www.aioe.org): 300 # "Only two concurrent connections per IP address are allowed and 301 # 400 connections per day are accepted from each IP address." 302 303 NNTP_HOST = 'nntp.aioe.org' 304 GROUP_NAME = 'comp.lang.python' 305 GROUP_PAT = 'comp.lang.*' 306 307 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) 308 309 # Disabled as it produces too much data 310 test_list = None 311 312 # Disabled as the connection will already be encrypted. 313 test_starttls = None 314 315 316# 317# Non-networked tests using a local server (or something mocking it). 318# 319 320class _NNTPServerIO(io.RawIOBase): 321 """A raw IO object allowing NNTP commands to be received and processed 322 by a handler. 
The handler can push responses which can then be read 323 from the IO object.""" 324 325 def __init__(self, handler): 326 io.RawIOBase.__init__(self) 327 # The channel from the client 328 self.c2s = io.BytesIO() 329 # The channel to the client 330 self.s2c = io.BytesIO() 331 self.handler = handler 332 self.handler.start(self.c2s.readline, self.push_data) 333 334 def readable(self): 335 return True 336 337 def writable(self): 338 return True 339 340 def push_data(self, data): 341 """Push (buffer) some data to send to the client.""" 342 pos = self.s2c.tell() 343 self.s2c.seek(0, 2) 344 self.s2c.write(data) 345 self.s2c.seek(pos) 346 347 def write(self, b): 348 """The client sends us some data""" 349 pos = self.c2s.tell() 350 self.c2s.write(b) 351 self.c2s.seek(pos) 352 self.handler.process_pending() 353 return len(b) 354 355 def readinto(self, buf): 356 """The client wants to read a response""" 357 self.handler.process_pending() 358 b = self.s2c.read(len(buf)) 359 n = len(b) 360 buf[:n] = b 361 return n 362 363 364def make_mock_file(handler): 365 sio = _NNTPServerIO(handler) 366 # Using BufferedRWPair instead of BufferedRandom ensures the file 367 # isn't seekable. 
368 file = io.BufferedRWPair(sio, sio) 369 return (sio, file) 370 371 372class MockedNNTPTestsMixin: 373 # Override in derived classes 374 handler_class = None 375 376 def setUp(self): 377 super().setUp() 378 self.make_server() 379 380 def tearDown(self): 381 super().tearDown() 382 del self.server 383 384 def make_server(self, *args, **kwargs): 385 self.handler = self.handler_class() 386 self.sio, file = make_mock_file(self.handler) 387 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) 388 return self.server 389 390 391class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): 392 def setUp(self): 393 super().setUp() 394 self.make_server(readermode=True) 395 396 397class NNTPv1Handler: 398 """A handler for RFC 977""" 399 400 welcome = "200 NNTP mock server" 401 402 def start(self, readline, push_data): 403 self.in_body = False 404 self.allow_posting = True 405 self._readline = readline 406 self._push_data = push_data 407 self._logged_in = False 408 self._user_sent = False 409 # Our welcome 410 self.handle_welcome() 411 412 def _decode(self, data): 413 return str(data, "utf-8", "surrogateescape") 414 415 def process_pending(self): 416 if self.in_body: 417 while True: 418 line = self._readline() 419 if not line: 420 return 421 self.body.append(line) 422 if line == b".\r\n": 423 break 424 try: 425 meth, tokens = self.body_callback 426 meth(*tokens, body=self.body) 427 finally: 428 self.body_callback = None 429 self.body = None 430 self.in_body = False 431 while True: 432 line = self._decode(self._readline()) 433 if not line: 434 return 435 if not line.endswith("\r\n"): 436 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) 437 line = line[:-2] 438 cmd, *tokens = line.split() 439 #meth = getattr(self.handler, "handle_" + cmd.upper(), None) 440 meth = getattr(self, "handle_" + cmd.upper(), None) 441 if meth is None: 442 self.handle_unknown() 443 else: 444 try: 445 meth(*tokens) 446 except Exception as e: 447 raise ValueError("command 
failed: {!r}".format(line)) from e 448 else: 449 if self.in_body: 450 self.body_callback = meth, tokens 451 self.body = [] 452 453 def expect_body(self): 454 """Flag that the client is expected to post a request body""" 455 self.in_body = True 456 457 def push_data(self, data): 458 """Push some binary data""" 459 self._push_data(data) 460 461 def push_lit(self, lit): 462 """Push a string literal""" 463 lit = textwrap.dedent(lit) 464 lit = "\r\n".join(lit.splitlines()) + "\r\n" 465 lit = lit.encode('utf-8') 466 self.push_data(lit) 467 468 def handle_unknown(self): 469 self.push_lit("500 What?") 470 471 def handle_welcome(self): 472 self.push_lit(self.welcome) 473 474 def handle_QUIT(self): 475 self.push_lit("205 Bye!") 476 477 def handle_DATE(self): 478 self.push_lit("111 20100914001155") 479 480 def handle_GROUP(self, group): 481 if group == "fr.comp.lang.python": 482 self.push_lit("211 486 761 1265 fr.comp.lang.python") 483 else: 484 self.push_lit("411 No such group {}".format(group)) 485 486 def handle_HELP(self): 487 self.push_lit("""\ 488 100 Legal commands 489 authinfo user Name|pass Password|generic <prog> <args> 490 date 491 help 492 Report problems to <root@example.org> 493 .""") 494 495 def handle_STAT(self, message_spec=None): 496 if message_spec is None: 497 self.push_lit("412 No newsgroup selected") 498 elif message_spec == "3000234": 499 self.push_lit("223 3000234 <45223423@example.com>") 500 elif message_spec == "<45223423@example.com>": 501 self.push_lit("223 0 <45223423@example.com>") 502 else: 503 self.push_lit("430 No Such Article Found") 504 505 def handle_NEXT(self): 506 self.push_lit("223 3000237 <668929@example.org> retrieved") 507 508 def handle_LAST(self): 509 self.push_lit("223 3000234 <45223423@example.com> retrieved") 510 511 def handle_LIST(self, action=None, param=None): 512 if action is None: 513 self.push_lit("""\ 514 215 Newsgroups in form "group high low flags". 
515 comp.lang.python 0000052340 0000002828 y 516 comp.lang.python.announce 0000001153 0000000993 m 517 free.it.comp.lang.python 0000000002 0000000002 y 518 fr.comp.lang.python 0000001254 0000000760 y 519 free.it.comp.lang.python.learner 0000000000 0000000001 y 520 tw.bbs.comp.lang.python 0000000304 0000000304 y 521 .""") 522 elif action == "ACTIVE": 523 if param == "*distutils*": 524 self.push_lit("""\ 525 215 Newsgroups in form "group high low flags" 526 gmane.comp.python.distutils.devel 0000014104 0000000001 m 527 gmane.comp.python.distutils.cvs 0000000000 0000000001 m 528 .""") 529 else: 530 self.push_lit("""\ 531 215 Newsgroups in form "group high low flags" 532 .""") 533 elif action == "OVERVIEW.FMT": 534 self.push_lit("""\ 535 215 Order of fields in overview database. 536 Subject: 537 From: 538 Date: 539 Message-ID: 540 References: 541 Bytes: 542 Lines: 543 Xref:full 544 .""") 545 elif action == "NEWSGROUPS": 546 assert param is not None 547 if param == "comp.lang.python": 548 self.push_lit("""\ 549 215 Descriptions in form "group description". 550 comp.lang.python\tThe Python computer language. 551 .""") 552 elif param == "comp.lang.python*": 553 self.push_lit("""\ 554 215 Descriptions in form "group description". 555 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) 556 comp.lang.python\tThe Python computer language. 557 .""") 558 else: 559 self.push_lit("""\ 560 215 Descriptions in form "group description". 561 .""") 562 else: 563 self.push_lit('501 Unknown LIST keyword') 564 565 def handle_NEWNEWS(self, group, date_str, time_str): 566 # We hard code different return messages depending on passed 567 # argument and date syntax. 
568 if (group == "comp.lang.python" and date_str == "20100913" 569 and time_str == "082004"): 570 # Date was passed in RFC 3977 format (NNTP "v2") 571 self.push_lit("""\ 572 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows 573 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 574 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 575 .""") 576 elif (group == "comp.lang.python" and date_str == "100913" 577 and time_str == "082004"): 578 # Date was passed in RFC 977 format (NNTP "v1") 579 self.push_lit("""\ 580 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows 581 <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com> 582 <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com> 583 .""") 584 elif (group == 'comp.lang.python' and 585 date_str in ('20100101', '100101') and 586 time_str == '090000'): 587 self.push_lit('too long line' * 3000 + 588 '\n.') 589 else: 590 self.push_lit("""\ 591 230 An empty list of newsarticles follows 592 .""") 593 # (Note for experiments: many servers disable NEWNEWS. 594 # As of this writing, sicinfo3.epfl.ch doesn't.) 
595 596 def handle_XOVER(self, message_spec): 597 if message_spec == "57-59": 598 self.push_lit( 599 "224 Overview information for 57-58 follows\n" 600 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" 601 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 602 "\tSat, 19 Jun 2010 18:04:08 -0400" 603 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>" 604 "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16" 605 "\tXref: news.gmane.org gmane.comp.python.authors:57" 606 "\n" 607 "58\tLooking for a few good bloggers" 608 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>" 609 "\tThu, 22 Jul 2010 09:14:14 -0400" 610 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>" 611 "\t\t6683\t16" 612 "\t" 613 "\n" 614 # A UTF-8 overview line from fr.comp.lang.python 615 "59\tRe: Message d'erreur incompréhensible (par moi)" 616 "\tEric Brunel <eric.brunel@pragmadev.nospam.com>" 617 "\tWed, 15 Sep 2010 18:09:15 +0200" 618 "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>" 619 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" 620 "\tXref: saria.nerim.net fr.comp.lang.python:1265" 621 "\n" 622 ".\n") 623 else: 624 self.push_lit("""\ 625 224 No articles 626 .""") 627 628 def handle_POST(self, *, body=None): 629 if body is None: 630 if self.allow_posting: 631 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>") 632 self.expect_body() 633 else: 634 self.push_lit("440 Posting not permitted") 635 else: 636 assert self.allow_posting 637 self.push_lit("240 Article received OK") 638 self.posted_body = body 639 640 def handle_IHAVE(self, message_id, *, body=None): 641 if body is None: 642 if (self.allow_posting and 643 message_id == "<i.am.an.article.you.will.want@example.com>"): 644 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>") 645 self.expect_body() 646 else: 647 self.push_lit("435 Article not wanted") 648 else: 649 assert self.allow_posting 650 self.push_lit("235 Article transferred OK") 
651 self.posted_body = body 652 653 sample_head = """\ 654 From: "Demo User" <nobody@example.net> 655 Subject: I am just a test article 656 Content-Type: text/plain; charset=UTF-8; format=flowed 657 Message-ID: <i.am.an.article.you.will.want@example.com>""" 658 659 sample_body = """\ 660 This is just a test article. 661 ..Here is a dot-starting line. 662 663 -- Signed by Andr\xe9.""" 664 665 sample_article = sample_head + "\n\n" + sample_body 666 667 def handle_ARTICLE(self, message_spec=None): 668 if message_spec is None: 669 self.push_lit("220 3000237 <45223423@example.com>") 670 elif message_spec == "<45223423@example.com>": 671 self.push_lit("220 0 <45223423@example.com>") 672 elif message_spec == "3000234": 673 self.push_lit("220 3000234 <45223423@example.com>") 674 else: 675 self.push_lit("430 No Such Article Found") 676 return 677 self.push_lit(self.sample_article) 678 self.push_lit(".") 679 680 def handle_HEAD(self, message_spec=None): 681 if message_spec is None: 682 self.push_lit("221 3000237 <45223423@example.com>") 683 elif message_spec == "<45223423@example.com>": 684 self.push_lit("221 0 <45223423@example.com>") 685 elif message_spec == "3000234": 686 self.push_lit("221 3000234 <45223423@example.com>") 687 else: 688 self.push_lit("430 No Such Article Found") 689 return 690 self.push_lit(self.sample_head) 691 self.push_lit(".") 692 693 def handle_BODY(self, message_spec=None): 694 if message_spec is None: 695 self.push_lit("222 3000237 <45223423@example.com>") 696 elif message_spec == "<45223423@example.com>": 697 self.push_lit("222 0 <45223423@example.com>") 698 elif message_spec == "3000234": 699 self.push_lit("222 3000234 <45223423@example.com>") 700 else: 701 self.push_lit("430 No Such Article Found") 702 return 703 self.push_lit(self.sample_body) 704 self.push_lit(".") 705 706 def handle_AUTHINFO(self, cred_type, data): 707 if self._logged_in: 708 self.push_lit('502 Already Logged In') 709 elif cred_type == 'user': 710 if self._user_sent: 711 
self.push_lit('482 User Credential Already Sent') 712 else: 713 self.push_lit('381 Password Required') 714 self._user_sent = True 715 elif cred_type == 'pass': 716 self.push_lit('281 Login Successful') 717 self._logged_in = True 718 else: 719 raise Exception('Unknown cred type {}'.format(cred_type)) 720 721 722class NNTPv2Handler(NNTPv1Handler): 723 """A handler for RFC 3977 (NNTP "v2")""" 724 725 def handle_CAPABILITIES(self): 726 fmt = """\ 727 101 Capability list: 728 VERSION 2 3 729 IMPLEMENTATION INN 2.5.1{} 730 HDR 731 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 732 OVER 733 POST 734 READER 735 .""" 736 737 if not self._logged_in: 738 self.push_lit(fmt.format('\n AUTHINFO USER')) 739 else: 740 self.push_lit(fmt.format('')) 741 742 def handle_MODE(self, _): 743 raise Exception('MODE READER sent despite READER has been advertised') 744 745 def handle_OVER(self, message_spec=None): 746 return self.handle_XOVER(message_spec) 747 748 749class CapsAfterLoginNNTPv2Handler(NNTPv2Handler): 750 """A handler that allows CAPABILITIES only after login""" 751 752 def handle_CAPABILITIES(self): 753 if not self._logged_in: 754 self.push_lit('480 You must log in.') 755 else: 756 super().handle_CAPABILITIES() 757 758 759class ModeSwitchingNNTPv2Handler(NNTPv2Handler): 760 """A server that starts in transit mode""" 761 762 def __init__(self): 763 self._switched = False 764 765 def handle_CAPABILITIES(self): 766 fmt = """\ 767 101 Capability list: 768 VERSION 2 3 769 IMPLEMENTATION INN 2.5.1 770 HDR 771 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 772 OVER 773 POST 774 {}READER 775 .""" 776 if self._switched: 777 self.push_lit(fmt.format('')) 778 else: 779 self.push_lit(fmt.format('MODE-')) 780 781 def handle_MODE(self, what): 782 assert not self._switched and what == 'reader' 783 self._switched = True 784 self.push_lit('200 Posting allowed') 785 786 787class NNTPv1v2TestsMixin: 788 789 def setUp(self): 790 super().setUp() 791 
792 def test_welcome(self): 793 self.assertEqual(self.server.welcome, self.handler.welcome) 794 795 def test_authinfo(self): 796 if self.nntp_version == 2: 797 self.assertIn('AUTHINFO', self.server._caps) 798 self.server.login('testuser', 'testpw') 799 # if AUTHINFO is gone from _caps we also know that getcapabilities() 800 # has been called after login as it should 801 self.assertNotIn('AUTHINFO', self.server._caps) 802 803 def test_date(self): 804 resp, date = self.server.date() 805 self.assertEqual(resp, "111 20100914001155") 806 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) 807 808 def test_quit(self): 809 self.assertFalse(self.sio.closed) 810 resp = self.server.quit() 811 self.assertEqual(resp, "205 Bye!") 812 self.assertTrue(self.sio.closed) 813 814 def test_help(self): 815 resp, help = self.server.help() 816 self.assertEqual(resp, "100 Legal commands") 817 self.assertEqual(help, [ 818 ' authinfo user Name|pass Password|generic <prog> <args>', 819 ' date', 820 ' help', 821 'Report problems to <root@example.org>', 822 ]) 823 824 def test_list(self): 825 resp, groups = self.server.list() 826 self.assertEqual(len(groups), 6) 827 g = groups[1] 828 self.assertEqual(g, 829 GroupInfo("comp.lang.python.announce", "0000001153", 830 "0000000993", "m")) 831 resp, groups = self.server.list("*distutils*") 832 self.assertEqual(len(groups), 2) 833 g = groups[0] 834 self.assertEqual(g, 835 GroupInfo("gmane.comp.python.distutils.devel", "0000014104", 836 "0000000001", "m")) 837 838 def test_stat(self): 839 resp, art_num, message_id = self.server.stat(3000234) 840 self.assertEqual(resp, "223 3000234 <45223423@example.com>") 841 self.assertEqual(art_num, 3000234) 842 self.assertEqual(message_id, "<45223423@example.com>") 843 resp, art_num, message_id = self.server.stat("<45223423@example.com>") 844 self.assertEqual(resp, "223 0 <45223423@example.com>") 845 self.assertEqual(art_num, 0) 846 self.assertEqual(message_id, "<45223423@example.com>") 847 with 
self.assertRaises(nntplib.NNTPTemporaryError) as cm: 848 self.server.stat("<non.existent.id>") 849 self.assertEqual(cm.exception.response, "430 No Such Article Found") 850 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 851 self.server.stat() 852 self.assertEqual(cm.exception.response, "412 No newsgroup selected") 853 854 def test_next(self): 855 resp, art_num, message_id = self.server.next() 856 self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved") 857 self.assertEqual(art_num, 3000237) 858 self.assertEqual(message_id, "<668929@example.org>") 859 860 def test_last(self): 861 resp, art_num, message_id = self.server.last() 862 self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved") 863 self.assertEqual(art_num, 3000234) 864 self.assertEqual(message_id, "<45223423@example.com>") 865 866 def test_description(self): 867 desc = self.server.description("comp.lang.python") 868 self.assertEqual(desc, "The Python computer language.") 869 desc = self.server.description("comp.lang.pythonx") 870 self.assertEqual(desc, "") 871 872 def test_descriptions(self): 873 resp, groups = self.server.descriptions("comp.lang.python") 874 self.assertEqual(resp, '215 Descriptions in form "group description".') 875 self.assertEqual(groups, { 876 "comp.lang.python": "The Python computer language.", 877 }) 878 resp, groups = self.server.descriptions("comp.lang.python*") 879 self.assertEqual(groups, { 880 "comp.lang.python": "The Python computer language.", 881 "comp.lang.python.announce": "Announcements about the Python language. 
(Moderated)", 882 }) 883 resp, groups = self.server.descriptions("comp.lang.pythonx") 884 self.assertEqual(groups, {}) 885 886 def test_group(self): 887 resp, count, first, last, group = self.server.group("fr.comp.lang.python") 888 self.assertTrue(resp.startswith("211 "), resp) 889 self.assertEqual(first, 761) 890 self.assertEqual(last, 1265) 891 self.assertEqual(count, 486) 892 self.assertEqual(group, "fr.comp.lang.python") 893 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 894 self.server.group("comp.lang.python.devel") 895 exc = cm.exception 896 self.assertTrue(exc.response.startswith("411 No such group"), 897 exc.response) 898 899 def test_newnews(self): 900 # NEWNEWS comp.lang.python [20]100913 082004 901 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 902 resp, ids = self.server.newnews("comp.lang.python", dt) 903 expected = ( 904 "230 list of newsarticles (NNTP v{0}) " 905 "created after Mon Sep 13 08:20:04 2010 follows" 906 ).format(self.nntp_version) 907 self.assertEqual(resp, expected) 908 self.assertEqual(ids, [ 909 "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>", 910 "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>", 911 ]) 912 # NEWNEWS fr.comp.lang.python [20]100913 082004 913 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 914 resp, ids = self.server.newnews("fr.comp.lang.python", dt) 915 self.assertEqual(resp, "230 An empty list of newsarticles follows") 916 self.assertEqual(ids, []) 917 918 def _check_article_body(self, lines): 919 self.assertEqual(len(lines), 4) 920 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") 921 self.assertEqual(lines[-2], b"") 922 self.assertEqual(lines[-3], b".Here is a dot-starting line.") 923 self.assertEqual(lines[-4], b"This is just a test article.") 924 925 def _check_article_head(self, lines): 926 self.assertEqual(len(lines), 4) 927 self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>') 928 self.assertEqual(lines[3], b"Message-ID: 
<i.am.an.article.you.will.want@example.com>")

    def _check_article_data(self, lines):
        # A complete article is expected to be 9 lines: the 4 header lines,
        # one empty separator line, then the 4 body lines.
        self.assertEqual(len(lines), 9)
        self._check_article_head(lines[:4])
        self._check_article_body(lines[-4:])
        self.assertEqual(lines[4], b"")

    def test_article(self):
        # Each variant checks the response line, the parsed
        # (article number, message id) pair and the article lines.
        # ARTICLE
        resp, info = self.server.article()
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
        # ARTICLE num
        resp, info = self.server.article(3000234)
        self.assertEqual(resp, "220 3000234 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
        # ARTICLE id
        resp, info = self.server.article("<45223423@example.com>")
        self.assertEqual(resp, "220 0 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_data(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.article("<non-existent@example.com>")
        # The server's response line must be carried by the exception.
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_article_file(self):
        # With a "file" argument
        f = io.BytesIO()
        resp, info = self.server.article(file=f)
        self.assertEqual(resp, "220 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        # The returned line list is expected to be empty; the article data
        # is read back from the file object instead.
        self.assertEqual(lines, [])
        data = f.getvalue()
        # ascii(data) is the failure message, so a mismatch shows the bytes.
        self.assertTrue(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        self.assertTrue(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def test_head(self):
        # Same structure as test_article, but only the header lines are
        # checked.
        # HEAD
        resp, info = self.server.head()
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
        # HEAD num
        resp, info = self.server.head(3000234)
        self.assertEqual(resp, "221 3000234 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
        # HEAD id
        resp, info = self.server.head("<45223423@example.com>")
        self.assertEqual(resp, "221 0 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_head(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.head("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_head_file(self):
        # HEAD with a "file" argument: the data written to the file must
        # start with the headers and must not end with the article body.
        f = io.BytesIO()
        resp, info = self.server.head(file=f)
        self.assertEqual(resp, "221 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = f.getvalue()
        self.assertTrue(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        self.assertFalse(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def test_body(self):
        # Same structure as test_article, but only the body lines are
        # checked.
        # BODY
        resp, info = self.server.body()
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
        # BODY num
        resp, info = self.server.body(3000234)
        self.assertEqual(resp, "222 3000234 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000234)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
        # BODY id
        resp, info = self.server.body("<45223423@example.com>")
        self.assertEqual(resp, "222 0 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 0)
        self.assertEqual(message_id, "<45223423@example.com>")
        self._check_article_body(lines)
        # Non-existent id
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.body("<non-existent@example.com>")
        self.assertEqual(cm.exception.response, "430 No Such Article Found")

    def test_body_file(self):
        # BODY with a "file" argument: the mirror image of test_head_file.
        # The data must not start with the headers and must end with the
        # article body.
        f = io.BytesIO()
        resp, info = self.server.body(file=f)
        self.assertEqual(resp, "222 3000237 <45223423@example.com>")
        art_num, message_id, lines = info
        self.assertEqual(art_num, 3000237)
        self.assertEqual(message_id, "<45223423@example.com>")
        self.assertEqual(lines, [])
        data = f.getvalue()
        self.assertFalse(data.startswith(
            b'From: "Demo User" <nobody@example.net>\r\n'
            b'Subject: I am just a test article\r\n'
            ), ascii(data))
        self.assertTrue(data.endswith(
            b'This is just a test article.\r\n'
            b'.Here is a dot-starting line.\r\n'
            b'\r\n'
            b'-- Signed by Andr\xc3\xa9.\r\n'
            ), ascii(data))

    def check_over_xover_resp(self, resp, overviews):
        # Shared checks for the (resp, overviews) pair returned by
        # NNTP.over() and NNTP.xover(); see test_over and test_xover.
        self.assertTrue(resp.startswith("224 "), resp)
        self.assertEqual(len(overviews), 3)
        # First entry: every overview field is compared.
        art_num, over = overviews[0]
        self.assertEqual(art_num, 57)
        self.assertEqual(over, {
            "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
            "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
            "date": "Sat, 19 Jun 2010 18:04:08 -0400",
            "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
            "references": "<hvalf7$ort$1@dough.gmane.org>",
            ":bytes": "7103",
            ":lines": "16",
            "xref": "news.gmane.org gmane.comp.python.authors:57"
            })
        # Second entry: the "xref" field is expected to come out as None.
        art_num, over = overviews[1]
        self.assertEqual(over["xref"], None)
        # Third entry: a subject containing a non-ASCII character.
        art_num, over = overviews[2]
        self.assertEqual(over["subject"],
                         "Re: Message d'erreur incompréhensible (par moi)")

    def test_xover(self):
        # xover() takes the range bounds as two separate arguments.
        resp, overviews = self.server.xover(57, 59)
        self.check_over_xover_resp(resp, overviews)

    def test_over(self):
        # In NNTP "v1", this will fallback on XOVER
        # over() takes the range as a single (first, last) tuple.
        resp, overviews = self.server.over((57, 59))
        self.check_over_xover_resp(resp, overviews)

    # Raw article with CRLF line endings, used by the POST / IHAVE tests.
    # It contains a line starting with a dot and a non-ASCII (UTF-8) byte
    # sequence.
    sample_post = (
        b'From: "Demo User" <nobody@example.net>\r\n'
        b'Subject: I am just a test article\r\n'
        b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
        b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
        b'\r\n'
        b'This is just a test article.\r\n'
        b'.Here is a dot-starting line.\r\n'
        b'\r\n'
        b'-- Signed by Andr\xc3\xa9.\r\n'
    )

    def _check_posted_body(self):
        # Check the raw body as received by the server
        lines = self.handler.posted_body
        # One additional line for the "." terminator
        self.assertEqual(len(lines), 10)
        self.assertEqual(lines[-1], b'.\r\n')
        self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
        self.assertEqual(lines[-3], b'\r\n')
        # The leading dot of the dot-starting line must arrive doubled
        # ("..Here" on the wire versus ".Here" in sample_post).
        self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
        self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')

    def _check_post_ihave_sub(self, func, *args, file_factory):
        # Call func(*args, <article>) twice, building the article argument
        # with file_factory (keyword-only), and verify what the handler
        # received each time. The response of the second call is returned.
        # First the prepared post with CRLF endings
        post = self.sample_post
        func_args = args + (file_factory(post),)
        # Reset so that a stale body from a previous call cannot satisfy
        # the check.
        self.handler.posted_body = None
        resp = func(*func_args)
        self._check_posted_body()
        # Then the same post with "normal" line endings - they should be
        # converted by NNTP.post and NNTP.ihave.
        post = self.sample_post.replace(b"\r\n", b"\n")
        func_args = args + (file_factory(post),)
        self.handler.posted_body = None
        resp = func(*func_args)
        self._check_posted_body()
        return resp

    def check_post_ihave(self, func, success_resp, *args):
        # Run _check_post_ihave_sub once per supported kind of article
        # argument and require success_resp each time.
        # With a bytes object
        resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
        self.assertEqual(resp, success_resp)
        # With a bytearray object
        resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
        self.assertEqual(resp, success_resp)
        # With a file object
        resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
        self.assertEqual(resp, success_resp)
        # With an iterable of terminated lines
        def iterlines(b):
            return iter(b.splitlines(keepends=True))
        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
        self.assertEqual(resp, success_resp)
        # With an iterable of non-terminated lines
        # (iterlines is deliberately rebound to the keepends=False variant)
        def iterlines(b):
            return iter(b.splitlines(keepends=False))
        resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
        self.assertEqual(resp, success_resp)

    def test_post(self):
        self.check_post_ihave(self.server.post, "240 Article received OK")
        # With posting disabled on the handler, POST must be refused.
        self.handler.allow_posting = False
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.post(self.sample_post)
        self.assertEqual(cm.exception.response,
                         "440 Posting not permitted")

    def test_ihave(self):
        self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
                              "<i.am.an.article.you.will.want@example.com>")
        # A different message id is expected to be refused with 435.
        with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
            self.server.ihave("<another.message.id>", self.sample_post)
        self.assertEqual(cm.exception.response,
                         "435 Article not wanted")

    def test_too_long_lines(self):
        # NOTE(review): the NEWNEWS reply that triggers NNTPDataError comes
        # from the mock handler, which is not defined in this part of the
        # file - presumably it sends an over-long line; confirm there.
        dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
        self.assertRaises(nntplib.NNTPDataError,
                          self.server.newnews, "comp.lang.python", dt)


class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v1 server (no capabilities)."""

    nntp_version = 1
    handler_class = NNTPv1Handler

    def test_caps(self):
        # Expected: an empty capability dict, protocol version 1 and no
        # implementation string.
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {})
        self.assertEqual(self.server.nntp_version, 1)
        self.assertEqual(self.server.nntp_implementation, None)


class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
    """Tests an NNTP v2 server (with capabilities)."""

    nntp_version = 2
    handler_class = NNTPv2Handler

    def test_caps(self):
        caps = self.server.getcapabilities()
        self.assertEqual(caps, {
            'VERSION': ['2', '3'],
            'IMPLEMENTATION': ['INN', '2.5.1'],
            'AUTHINFO': ['USER'],
            'HDR': [],
            'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
                     'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
            'OVER': [],
            'POST': [],
            'READER': [],
            })
        # VERSION lists both 2 and 3; the client is expected to report 3.
        self.assertEqual(self.server.nntp_version, 3)
        # The IMPLEMENTATION words are expected joined with a single space.
        self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')


class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        # No capabilities are cached before login; after login the cache
        # must contain at least 'VERSION'.
        self.assertEqual(self.server._caps, {})
        self.server.login('testuser', 'testpw')
        self.assertIn('VERSION', self.server._caps)


class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        self.assertIn('READER', self.server._caps)


class MiscTests(unittest.TestCase):
    """Tests for module-level nntplib helpers; no server is involved."""

    def test_decode_header(self):
        # gives(a, b): decode_header(a) must return b.
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("" , "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # In every case the expected field names are lower-cased, the
        # trailing ":" and any ":full" suffix are dropped, and the byte and
        # line counts are named ":bytes" and ":lines".
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        # Overview lines are tab-separated: the article number first, then
        # one field per entry of fmt (the default fields plus "xref").
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        # Exactly one (article number, fields) pair is expected; the
        # unpacking fails otherwise.
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['xref'], None)
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        # gives(a, b, *c): _parse_datetime(a, b) must equal
        # datetime.datetime(*c).
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        # Two-digit years: "99" is expected to give 1999 and "09" 2009.
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # The default call and an explicit second argument of False must
        # give the same (date_str, time_str) pair.
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
        # 2) with a date
        # A plain date is expected to give a time string of "000000".
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, "19990623", "000000")
        gives(2000, 6, 23, "20000623", "000000")
        gives(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # Legacy mode (second argument True) is expected to use a
        # two-digit year.
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
        # 2) with a date
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, "990623", "000000")
        gives(2000, 6, 23, "000623", "000000")
        gives(2010, 6, 5, "100605", "000000")

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        # When the ssl module is importable, nntplib must expose NNTP_SSL.
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))


class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
                      'NNTPTemporaryError', 'NNTPPermanentError',
                      'NNTPProtocolError', 'NNTPDataError', 'decode_header']
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            target_api.append('NNTP_SSL')
        # Compared as sets: the order of __all__ is not significant here.
        self.assertEqual(set(nntplib.__all__), set(target_api))

class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    # Callable used to build the client; a subclass may replace it.
    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        # Build a client against a mocked socket module whose files are
        # driven by handler_class, and check that:
        #  - construction raises expected_error_type with a message
        #    matching expected_error_msg (used as a regular expression),
        #  - the mock socket was closed,
        #  - every file created from the socket was closed.
        # login / password are passed to the constructor as user / password.

        # The class object itself is patched in as nntplib.socket, so
        # create_connection is reached as a plain function (no self).
        class mock_socket_module:
            def create_connection(address, timeout):
                return MockSocket()

        class MockSocket:
            def close(self):
                # Record the close in the enclosing function's flag.
                nonlocal socket_closed
                socket_closed = True

            # "socket" is the MockSocket instance here (it takes the place
            # of self).
            def makefile(socket, mode):
                handler = handler_class()
                _, file = make_mock_file(handler)
                # Remember the file so its closed state can be checked.
                files.append(file)
                return file

        socket_closed = False
        files = []
        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for f in files:
            self.assertTrue(f.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                # capabilities_response is looked up when the handler runs,
                # so assigning it after the class body is fine.
                self.push_lit(capabilities_response)
        capabilities_response = '201 bad capability'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        login = 't@e.com'
        password = 'python'
        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                # Same late-binding closure as in test_bad_capabilities.
                self.push_lit(authinfo_response)
        authinfo_response = '503 Mechanism not recognized'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)

class bypass_context:
    """Bypass encryption and actual SSL module"""
    # The class itself (not an instance) is passed as ssl_context below, so
    # wrap_socket takes no self and returns the socket unchanged.
    def wrap_socket(sock, **args):
        return sock

@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Repeat the MockSocketTests checks using nntplib.NNTP_SSL, with
    bypass_context standing in for a real SSL context."""

    @staticmethod
    def nntp_class(*pos, **kw):
        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)

@unittest.skipUnless(threading, 'requires multithreading')
class LocalServerTests(unittest.TestCase):
    """Tests run against a minimal NNTP server that is served from a
    background thread over a real local socket."""

    def setUp(self):
        sock = socket.socket()
        port = support.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # The client is entered and exited by hand so that it stays open
        # for the duration of the test. Cleanups run in reverse order of
        # registration, so the client is exited before the server thread
        # is joined.
        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        # Could be generalized to handle more commands in separate methods
        # Serves exactly one client and understands only CAPABILITIES,
        # STARTTLS and QUIT; any other input raises ValueError.
        # NOTE(review): the nesting of the two "with" blocks was inferred;
        # confirm that only accept() is meant to sit inside "with sock".
        with sock:
            [client, _] = sock.accept()
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # Close the plain-text reader, acknowledge, then swap
                    # in a server-side TLS socket and a reader on top of it.
                    # Both new objects are registered on the ExitStack.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    context = ssl.SSLContext()
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        # Keep the pre-TLS objects for comparison after the upgrade.
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)


if __name__ == "__main__":
    unittest.main()