1from test import support 2from test.support import bigmemtest, _4G 3 4import array 5import unittest 6import io 7from io import BytesIO, DEFAULT_BUFFER_SIZE 8import os 9import pickle 10import glob 11import tempfile 12import random 13import shutil 14import subprocess 15import threading 16from test.support import import_helper 17from test.support import threading_helper 18from test.support.os_helper import unlink, FakePath 19import _compression 20import sys 21 22 23# Skip tests if the bz2 module doesn't exist. 24bz2 = import_helper.import_module('bz2') 25from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor 26 27has_cmdline_bunzip2 = None 28 29def ext_decompress(data): 30 global has_cmdline_bunzip2 31 if has_cmdline_bunzip2 is None: 32 has_cmdline_bunzip2 = bool(shutil.which('bunzip2')) 33 if has_cmdline_bunzip2: 34 return subprocess.check_output(['bunzip2'], input=data) 35 else: 36 return bz2.decompress(data) 37 38class BaseTest(unittest.TestCase): 39 "Base for other testcases." 40 41 TEXT_LINES = [ 42 b'root:x:0:0:root:/root:/bin/bash\n', 43 b'bin:x:1:1:bin:/bin:\n', 44 b'daemon:x:2:2:daemon:/sbin:\n', 45 b'adm:x:3:4:adm:/var/adm:\n', 46 b'lp:x:4:7:lp:/var/spool/lpd:\n', 47 b'sync:x:5:0:sync:/sbin:/bin/sync\n', 48 b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n', 49 b'halt:x:7:0:halt:/sbin:/sbin/halt\n', 50 b'mail:x:8:12:mail:/var/spool/mail:\n', 51 b'news:x:9:13:news:/var/spool/news:\n', 52 b'uucp:x:10:14:uucp:/var/spool/uucp:\n', 53 b'operator:x:11:0:operator:/root:\n', 54 b'games:x:12:100:games:/usr/games:\n', 55 b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n', 56 b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n', 57 b'nobody:x:65534:65534:Nobody:/home:\n', 58 b'postfix:x:100:101:postfix:/var/spool/postfix:\n', 59 b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n', 60 b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n', 61 b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n', 62 b'www:x:103:104::/var/www:/bin/false\n', 63 ] 64 TEXT = b''.join(TEXT_LINES) 65 DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`' 66 EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00' 67 BAD_DATA = b'this is not a valid bzip2 file' 68 69 # Some tests need more than one block of uncompressed data. Since one block 70 # is at least 100,000 bytes, we gather some data dynamically and compress it. 71 # Note that this assumes that compression works correctly, so we cannot 72 # simply use the bigger test data for all tests. 73 test_size = 0 74 BIG_TEXT = bytearray(128*1024) 75 for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')): 76 with open(fname, 'rb') as fh: 77 test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:]) 78 if test_size > 128*1024: 79 break 80 BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1) 81 82 def setUp(self): 83 fd, self.filename = tempfile.mkstemp() 84 os.close(fd) 85 86 def tearDown(self): 87 unlink(self.filename) 88 89 90class BZ2FileTest(BaseTest): 91 "Test the BZ2File class." 92 93 def createTempFile(self, streams=1, suffix=b""): 94 with open(self.filename, "wb") as f: 95 f.write(self.DATA * streams) 96 f.write(suffix) 97 98 def testBadArgs(self): 99 self.assertRaises(TypeError, BZ2File, 123.456) 100 self.assertRaises(ValueError, BZ2File, os.devnull, "z") 101 self.assertRaises(ValueError, BZ2File, os.devnull, "rx") 102 self.assertRaises(ValueError, BZ2File, os.devnull, "rbt") 103 self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=0) 104 self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=10) 105 106 # compresslevel is keyword-only 107 self.assertRaises(TypeError, BZ2File, os.devnull, "r", 3) 108 109 def testRead(self): 110 self.createTempFile() 111 with BZ2File(self.filename) as bz2f: 112 self.assertRaises(TypeError, bz2f.read, float()) 113 self.assertEqual(bz2f.read(), self.TEXT) 114 115 def testReadBadFile(self): 116 self.createTempFile(streams=0, suffix=self.BAD_DATA) 117 with BZ2File(self.filename) as bz2f: 118 self.assertRaises(OSError, bz2f.read) 119 120 def testReadMultiStream(self): 121 self.createTempFile(streams=5) 122 with BZ2File(self.filename) as bz2f: 123 self.assertRaises(TypeError, bz2f.read, float()) 124 self.assertEqual(bz2f.read(), self.TEXT * 5) 125 126 def testReadMonkeyMultiStream(self): 127 # Test BZ2File.read() on a multi-stream archive where a stream 128 # boundary coincides with the end of the raw read buffer. 129 buffer_size = _compression.BUFFER_SIZE 130 _compression.BUFFER_SIZE = len(self.DATA) 131 try: 132 self.createTempFile(streams=5) 133 with BZ2File(self.filename) as bz2f: 134 self.assertRaises(TypeError, bz2f.read, float()) 135 self.assertEqual(bz2f.read(), self.TEXT * 5) 136 finally: 137 _compression.BUFFER_SIZE = buffer_size 138 139 def testReadTrailingJunk(self): 140 self.createTempFile(suffix=self.BAD_DATA) 141 with BZ2File(self.filename) as bz2f: 142 self.assertEqual(bz2f.read(), self.TEXT) 143 144 def testReadMultiStreamTrailingJunk(self): 145 self.createTempFile(streams=5, suffix=self.BAD_DATA) 146 with BZ2File(self.filename) as bz2f: 147 self.assertEqual(bz2f.read(), self.TEXT * 5) 148 149 def testRead0(self): 150 self.createTempFile() 151 with BZ2File(self.filename) as bz2f: 152 self.assertRaises(TypeError, bz2f.read, float()) 153 self.assertEqual(bz2f.read(0), b"") 154 155 def testReadChunk10(self): 156 self.createTempFile() 157 with BZ2File(self.filename) as bz2f: 158 text = b'' 159 while True: 160 str = bz2f.read(10) 161 if not str: 162 break 163 text += str 164 self.assertEqual(text, self.TEXT) 165 166 def testReadChunk10MultiStream(self): 167 self.createTempFile(streams=5) 168 with BZ2File(self.filename) as bz2f: 169 text = b'' 170 while True: 171 str = bz2f.read(10) 172 if not str: 173 break 174 text += str 175 self.assertEqual(text, self.TEXT * 5) 176 177 def testRead100(self): 178 self.createTempFile() 179 with BZ2File(self.filename) as bz2f: 180 self.assertEqual(bz2f.read(100), self.TEXT[:100]) 181 182 def testPeek(self): 183 self.createTempFile() 184 with BZ2File(self.filename) as bz2f: 185 pdata = bz2f.peek() 186 self.assertNotEqual(len(pdata), 0) 187 self.assertTrue(self.TEXT.startswith(pdata)) 188 self.assertEqual(bz2f.read(), self.TEXT) 189 190 def testReadInto(self): 191 self.createTempFile() 192 with BZ2File(self.filename) as bz2f: 193 n = 128 194 b = bytearray(n) 195 self.assertEqual(bz2f.readinto(b), n) 196 self.assertEqual(b, self.TEXT[:n]) 197 n = len(self.TEXT) - n 198 b = bytearray(len(self.TEXT)) 199 self.assertEqual(bz2f.readinto(b), n) 200 self.assertEqual(b[:n], self.TEXT[-n:]) 201 202 def testReadLine(self): 203 self.createTempFile() 204 with BZ2File(self.filename) as bz2f: 205 self.assertRaises(TypeError, bz2f.readline, None) 206 for line in self.TEXT_LINES: 207 self.assertEqual(bz2f.readline(), line) 208 209 def testReadLineMultiStream(self): 210 self.createTempFile(streams=5) 211 with BZ2File(self.filename) as bz2f: 212 self.assertRaises(TypeError, bz2f.readline, None) 213 for line in self.TEXT_LINES * 5: 214 self.assertEqual(bz2f.readline(), line) 215 216 def testReadLines(self): 217 self.createTempFile() 218 with BZ2File(self.filename) as bz2f: 219 self.assertRaises(TypeError, bz2f.readlines, None) 220 self.assertEqual(bz2f.readlines(), self.TEXT_LINES) 221 222 def testReadLinesMultiStream(self): 223 self.createTempFile(streams=5) 224 with BZ2File(self.filename) as bz2f: 225 self.assertRaises(TypeError, bz2f.readlines, None) 226 self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5) 227 228 def testIterator(self): 229 self.createTempFile() 230 with BZ2File(self.filename) as bz2f: 231 self.assertEqual(list(iter(bz2f)), self.TEXT_LINES) 232 233 def testIteratorMultiStream(self): 234 self.createTempFile(streams=5) 235 with BZ2File(self.filename) as bz2f: 236 self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5) 237 238 def testClosedIteratorDeadlock(self): 239 # Issue #3309: Iteration on a closed BZ2File should release the lock. 240 self.createTempFile() 241 bz2f = BZ2File(self.filename) 242 bz2f.close() 243 self.assertRaises(ValueError, next, bz2f) 244 # This call will deadlock if the above call failed to release the lock. 245 self.assertRaises(ValueError, bz2f.readlines) 246 247 def testWrite(self): 248 with BZ2File(self.filename, "w") as bz2f: 249 self.assertRaises(TypeError, bz2f.write) 250 bz2f.write(self.TEXT) 251 with open(self.filename, 'rb') as f: 252 self.assertEqual(ext_decompress(f.read()), self.TEXT) 253 254 def testWriteChunks10(self): 255 with BZ2File(self.filename, "w") as bz2f: 256 n = 0 257 while True: 258 str = self.TEXT[n*10:(n+1)*10] 259 if not str: 260 break 261 bz2f.write(str) 262 n += 1 263 with open(self.filename, 'rb') as f: 264 self.assertEqual(ext_decompress(f.read()), self.TEXT) 265 266 def testWriteNonDefaultCompressLevel(self): 267 expected = bz2.compress(self.TEXT, compresslevel=5) 268 with BZ2File(self.filename, "w", compresslevel=5) as bz2f: 269 bz2f.write(self.TEXT) 270 with open(self.filename, "rb") as f: 271 self.assertEqual(f.read(), expected) 272 273 def testWriteLines(self): 274 with BZ2File(self.filename, "w") as bz2f: 275 self.assertRaises(TypeError, bz2f.writelines) 276 bz2f.writelines(self.TEXT_LINES) 277 # Issue #1535500: Calling writelines() on a closed BZ2File 278 # should raise an exception. 279 self.assertRaises(ValueError, bz2f.writelines, ["a"]) 280 with open(self.filename, 'rb') as f: 281 self.assertEqual(ext_decompress(f.read()), self.TEXT) 282 283 def testWriteMethodsOnReadOnlyFile(self): 284 with BZ2File(self.filename, "w") as bz2f: 285 bz2f.write(b"abc") 286 287 with BZ2File(self.filename, "r") as bz2f: 288 self.assertRaises(OSError, bz2f.write, b"a") 289 self.assertRaises(OSError, bz2f.writelines, [b"a"]) 290 291 def testAppend(self): 292 with BZ2File(self.filename, "w") as bz2f: 293 self.assertRaises(TypeError, bz2f.write) 294 bz2f.write(self.TEXT) 295 with BZ2File(self.filename, "a") as bz2f: 296 self.assertRaises(TypeError, bz2f.write) 297 bz2f.write(self.TEXT) 298 with open(self.filename, 'rb') as f: 299 self.assertEqual(ext_decompress(f.read()), self.TEXT * 2) 300 301 def testSeekForward(self): 302 self.createTempFile() 303 with BZ2File(self.filename) as bz2f: 304 self.assertRaises(TypeError, bz2f.seek) 305 bz2f.seek(150) 306 self.assertEqual(bz2f.read(), self.TEXT[150:]) 307 308 def testSeekForwardAcrossStreams(self): 309 self.createTempFile(streams=2) 310 with BZ2File(self.filename) as bz2f: 311 self.assertRaises(TypeError, bz2f.seek) 312 bz2f.seek(len(self.TEXT) + 150) 313 self.assertEqual(bz2f.read(), self.TEXT[150:]) 314 315 def testSeekBackwards(self): 316 self.createTempFile() 317 with BZ2File(self.filename) as bz2f: 318 bz2f.read(500) 319 bz2f.seek(-150, 1) 320 self.assertEqual(bz2f.read(), self.TEXT[500-150:]) 321 322 def testSeekBackwardsAcrossStreams(self): 323 self.createTempFile(streams=2) 324 with BZ2File(self.filename) as bz2f: 325 readto = len(self.TEXT) + 100 326 while readto > 0: 327 readto -= len(bz2f.read(readto)) 328 bz2f.seek(-150, 1) 329 self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT) 330 331 def testSeekBackwardsFromEnd(self): 332 self.createTempFile() 333 with BZ2File(self.filename) as bz2f: 334 bz2f.seek(-150, 2) 335 self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:]) 336 337 def testSeekBackwardsFromEndAcrossStreams(self): 338 self.createTempFile(streams=2) 339 with BZ2File(self.filename) as bz2f: 340 bz2f.seek(-1000, 2) 341 self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:]) 342 343 def testSeekPostEnd(self): 344 self.createTempFile() 345 with BZ2File(self.filename) as bz2f: 346 bz2f.seek(150000) 347 self.assertEqual(bz2f.tell(), len(self.TEXT)) 348 self.assertEqual(bz2f.read(), b"") 349 350 def testSeekPostEndMultiStream(self): 351 self.createTempFile(streams=5) 352 with BZ2File(self.filename) as bz2f: 353 bz2f.seek(150000) 354 self.assertEqual(bz2f.tell(), len(self.TEXT) * 5) 355 self.assertEqual(bz2f.read(), b"") 356 357 def testSeekPostEndTwice(self): 358 self.createTempFile() 359 with BZ2File(self.filename) as bz2f: 360 bz2f.seek(150000) 361 bz2f.seek(150000) 362 self.assertEqual(bz2f.tell(), len(self.TEXT)) 363 self.assertEqual(bz2f.read(), b"") 364 365 def testSeekPostEndTwiceMultiStream(self): 366 self.createTempFile(streams=5) 367 with BZ2File(self.filename) as bz2f: 368 bz2f.seek(150000) 369 bz2f.seek(150000) 370 self.assertEqual(bz2f.tell(), len(self.TEXT) * 5) 371 self.assertEqual(bz2f.read(), b"") 372 373 def testSeekPreStart(self): 374 self.createTempFile() 375 with BZ2File(self.filename) as bz2f: 376 bz2f.seek(-150) 377 self.assertEqual(bz2f.tell(), 0) 378 self.assertEqual(bz2f.read(), self.TEXT) 379 380 def testSeekPreStartMultiStream(self): 381 self.createTempFile(streams=2) 382 with BZ2File(self.filename) as bz2f: 383 bz2f.seek(-150) 384 self.assertEqual(bz2f.tell(), 0) 385 self.assertEqual(bz2f.read(), self.TEXT * 2) 386 387 def testFileno(self): 388 self.createTempFile() 389 with open(self.filename, 'rb') as rawf: 390 bz2f = BZ2File(rawf) 391 try: 392 self.assertEqual(bz2f.fileno(), rawf.fileno()) 393 finally: 394 bz2f.close() 395 self.assertRaises(ValueError, bz2f.fileno) 396 397 def testSeekable(self): 398 bz2f = BZ2File(BytesIO(self.DATA)) 399 try: 400 self.assertTrue(bz2f.seekable()) 401 bz2f.read() 402 self.assertTrue(bz2f.seekable()) 403 finally: 404 bz2f.close() 405 self.assertRaises(ValueError, bz2f.seekable) 406 407 bz2f = BZ2File(BytesIO(), "w") 408 try: 409 self.assertFalse(bz2f.seekable()) 410 finally: 411 bz2f.close() 412 self.assertRaises(ValueError, bz2f.seekable) 413 414 src = BytesIO(self.DATA) 415 src.seekable = lambda: False 416 bz2f = BZ2File(src) 417 try: 418 self.assertFalse(bz2f.seekable()) 419 finally: 420 bz2f.close() 421 self.assertRaises(ValueError, bz2f.seekable) 422 423 def testReadable(self): 424 bz2f = BZ2File(BytesIO(self.DATA)) 425 try: 426 self.assertTrue(bz2f.readable()) 427 bz2f.read() 428 self.assertTrue(bz2f.readable()) 429 finally: 430 bz2f.close() 431 self.assertRaises(ValueError, bz2f.readable) 432 433 bz2f = BZ2File(BytesIO(), "w") 434 try: 435 self.assertFalse(bz2f.readable()) 436 finally: 437 bz2f.close() 438 self.assertRaises(ValueError, bz2f.readable) 439 440 def testWritable(self): 441 bz2f = BZ2File(BytesIO(self.DATA)) 442 try: 443 self.assertFalse(bz2f.writable()) 444 bz2f.read() 445 self.assertFalse(bz2f.writable()) 446 finally: 447 bz2f.close() 448 self.assertRaises(ValueError, bz2f.writable) 449 450 bz2f = BZ2File(BytesIO(), "w") 451 try: 452 self.assertTrue(bz2f.writable()) 453 finally: 454 bz2f.close() 455 self.assertRaises(ValueError, bz2f.writable) 456 457 def testOpenDel(self): 458 self.createTempFile() 459 for i in range(10000): 460 o = BZ2File(self.filename) 461 del o 462 463 def testOpenNonexistent(self): 464 self.assertRaises(OSError, BZ2File, "/non/existent") 465 466 def testReadlinesNoNewline(self): 467 # Issue #1191043: readlines() fails on a file containing no newline. 468 data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t' 469 with open(self.filename, "wb") as f: 470 f.write(data) 471 with BZ2File(self.filename) as bz2f: 472 lines = bz2f.readlines() 473 self.assertEqual(lines, [b'Test']) 474 with BZ2File(self.filename) as bz2f: 475 xlines = list(bz2f.readlines()) 476 self.assertEqual(xlines, [b'Test']) 477 478 def testContextProtocol(self): 479 with BZ2File(self.filename, "wb") as f: 480 f.write(b"xxx") 481 f = BZ2File(self.filename, "rb") 482 f.close() 483 try: 484 with f: 485 pass 486 except ValueError: 487 pass 488 else: 489 self.fail("__enter__ on a closed file didn't raise an exception") 490 try: 491 with BZ2File(self.filename, "wb") as f: 492 1/0 493 except ZeroDivisionError: 494 pass 495 else: 496 self.fail("1/0 didn't raise an exception") 497 498 @threading_helper.requires_working_threading() 499 def testThreading(self): 500 # Issue #7205: Using a BZ2File from several threads shouldn't deadlock. 501 data = b"1" * 2**20 502 nthreads = 10 503 with BZ2File(self.filename, 'wb') as f: 504 def comp(): 505 for i in range(5): 506 f.write(data) 507 threads = [threading.Thread(target=comp) for i in range(nthreads)] 508 with threading_helper.start_threads(threads): 509 pass 510 511 def testMixedIterationAndReads(self): 512 self.createTempFile() 513 linelen = len(self.TEXT_LINES[0]) 514 halflen = linelen // 2 515 with BZ2File(self.filename) as bz2f: 516 bz2f.read(halflen) 517 self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:]) 518 self.assertEqual(bz2f.read(), self.TEXT[linelen:]) 519 with BZ2File(self.filename) as bz2f: 520 bz2f.readline() 521 self.assertEqual(next(bz2f), self.TEXT_LINES[1]) 522 self.assertEqual(bz2f.readline(), self.TEXT_LINES[2]) 523 with BZ2File(self.filename) as bz2f: 524 bz2f.readlines() 525 self.assertRaises(StopIteration, next, bz2f) 526 self.assertEqual(bz2f.readlines(), []) 527 528 def testMultiStreamOrdering(self): 529 # Test the ordering of streams when reading a multi-stream archive. 530 data1 = b"foo" * 1000 531 data2 = b"bar" * 1000 532 with BZ2File(self.filename, "w") as bz2f: 533 bz2f.write(data1) 534 with BZ2File(self.filename, "a") as bz2f: 535 bz2f.write(data2) 536 with BZ2File(self.filename) as bz2f: 537 self.assertEqual(bz2f.read(), data1 + data2) 538 539 def testOpenFilename(self): 540 with BZ2File(self.filename, "wb") as f: 541 f.write(b'content') 542 self.assertEqual(f.name, self.filename) 543 self.assertIsInstance(f.fileno(), int) 544 self.assertEqual(f.mode, 'wb') 545 self.assertIs(f.readable(), False) 546 self.assertIs(f.writable(), True) 547 self.assertIs(f.seekable(), False) 548 self.assertIs(f.closed, False) 549 self.assertIs(f.closed, True) 550 with self.assertRaises(ValueError): 551 f.name 552 self.assertRaises(ValueError, f.fileno) 553 self.assertEqual(f.mode, 'wb') 554 self.assertRaises(ValueError, f.readable) 555 self.assertRaises(ValueError, f.writable) 556 self.assertRaises(ValueError, f.seekable) 557 558 with BZ2File(self.filename, "ab") as f: 559 f.write(b'appendix') 560 self.assertEqual(f.name, self.filename) 561 self.assertIsInstance(f.fileno(), int) 562 self.assertEqual(f.mode, 'wb') 563 self.assertIs(f.readable(), False) 564 self.assertIs(f.writable(), True) 565 self.assertIs(f.seekable(), False) 566 self.assertIs(f.closed, False) 567 self.assertIs(f.closed, True) 568 with self.assertRaises(ValueError): 569 f.name 570 self.assertRaises(ValueError, f.fileno) 571 self.assertEqual(f.mode, 'wb') 572 self.assertRaises(ValueError, f.readable) 573 self.assertRaises(ValueError, f.writable) 574 self.assertRaises(ValueError, f.seekable) 575 576 with BZ2File(self.filename, 'rb') as f: 577 self.assertEqual(f.read(), b'contentappendix') 578 self.assertEqual(f.name, self.filename) 579 self.assertIsInstance(f.fileno(), int) 580 self.assertEqual(f.mode, 'rb') 581 self.assertIs(f.readable(), True) 582 self.assertIs(f.writable(), False) 583 self.assertIs(f.seekable(), True) 584 self.assertIs(f.closed, False) 585 self.assertIs(f.closed, True) 586 with self.assertRaises(ValueError): 587 f.name 588 self.assertRaises(ValueError, f.fileno) 589 self.assertEqual(f.mode, 'rb') 590 self.assertRaises(ValueError, f.readable) 591 self.assertRaises(ValueError, f.writable) 592 self.assertRaises(ValueError, f.seekable) 593 594 def testOpenFileWithName(self): 595 with open(self.filename, 'wb') as raw: 596 with BZ2File(raw, 'wb') as f: 597 f.write(b'content') 598 self.assertEqual(f.name, raw.name) 599 self.assertEqual(f.fileno(), raw.fileno()) 600 self.assertEqual(f.mode, 'wb') 601 self.assertIs(f.readable(), False) 602 self.assertIs(f.writable(), True) 603 self.assertIs(f.seekable(), False) 604 self.assertIs(f.closed, False) 605 self.assertIs(f.closed, True) 606 with self.assertRaises(ValueError): 607 f.name 608 self.assertRaises(ValueError, f.fileno) 609 self.assertEqual(f.mode, 'wb') 610 self.assertRaises(ValueError, f.readable) 611 self.assertRaises(ValueError, f.writable) 612 self.assertRaises(ValueError, f.seekable) 613 614 with open(self.filename, 'ab') as raw: 615 with BZ2File(raw, 'ab') as f: 616 f.write(b'appendix') 617 self.assertEqual(f.name, raw.name) 618 self.assertEqual(f.fileno(), raw.fileno()) 619 self.assertEqual(f.mode, 'wb') 620 self.assertIs(f.readable(), False) 621 self.assertIs(f.writable(), True) 622 self.assertIs(f.seekable(), False) 623 self.assertIs(f.closed, False) 624 self.assertIs(f.closed, True) 625 with self.assertRaises(ValueError): 626 f.name 627 self.assertRaises(ValueError, f.fileno) 628 self.assertEqual(f.mode, 'wb') 629 self.assertRaises(ValueError, f.readable) 630 self.assertRaises(ValueError, f.writable) 631 self.assertRaises(ValueError, f.seekable) 632 633 with open(self.filename, 'rb') as raw: 634 with BZ2File(raw, 'rb') as f: 635 self.assertEqual(f.read(), b'contentappendix') 636 self.assertEqual(f.name, raw.name) 637 self.assertEqual(f.fileno(), raw.fileno()) 638 self.assertEqual(f.mode, 'rb') 639 self.assertIs(f.readable(), True) 640 self.assertIs(f.writable(), False) 641 self.assertIs(f.seekable(), True) 642 self.assertIs(f.closed, False) 643 self.assertIs(f.closed, True) 644 with self.assertRaises(ValueError): 645 f.name 646 self.assertRaises(ValueError, f.fileno) 647 self.assertEqual(f.mode, 'rb') 648 self.assertRaises(ValueError, f.readable) 649 self.assertRaises(ValueError, f.writable) 650 self.assertRaises(ValueError, f.seekable) 651 652 def testOpenFileWithoutName(self): 653 bio = BytesIO() 654 with BZ2File(bio, 'wb') as f: 655 f.write(b'content') 656 with self.assertRaises(AttributeError): 657 f.name 658 self.assertRaises(io.UnsupportedOperation, f.fileno) 659 self.assertEqual(f.mode, 'wb') 660 with self.assertRaises(ValueError): 661 f.name 662 self.assertRaises(ValueError, f.fileno) 663 664 with BZ2File(bio, 'ab') as f: 665 f.write(b'appendix') 666 with self.assertRaises(AttributeError): 667 f.name 668 self.assertRaises(io.UnsupportedOperation, f.fileno) 669 self.assertEqual(f.mode, 'wb') 670 with self.assertRaises(ValueError): 671 f.name 672 self.assertRaises(ValueError, f.fileno) 673 674 bio.seek(0) 675 with BZ2File(bio, 'rb') as f: 676 self.assertEqual(f.read(), b'contentappendix') 677 with self.assertRaises(AttributeError): 678 f.name 679 self.assertRaises(io.UnsupportedOperation, f.fileno) 680 self.assertEqual(f.mode, 'rb') 681 with self.assertRaises(ValueError): 682 f.name 683 self.assertRaises(ValueError, f.fileno) 684 685 def testOpenFileWithIntName(self): 686 fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) 687 with open(fd, 'wb') as raw: 688 with BZ2File(raw, 'wb') as f: 689 f.write(b'content') 690 self.assertEqual(f.name, raw.name) 691 self.assertEqual(f.fileno(), raw.fileno()) 692 self.assertEqual(f.mode, 'wb') 693 with self.assertRaises(ValueError): 694 f.name 695 self.assertRaises(ValueError, f.fileno) 696 697 fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_APPEND) 698 with open(fd, 'ab') as raw: 699 with BZ2File(raw, 'ab') as f: 700 f.write(b'appendix') 701 self.assertEqual(f.name, raw.name) 702 self.assertEqual(f.fileno(), raw.fileno()) 703 self.assertEqual(f.mode, 'wb') 704 with self.assertRaises(ValueError): 705 f.name 706 self.assertRaises(ValueError, f.fileno) 707 708 fd = os.open(self.filename, os.O_RDONLY) 709 with open(fd, 'rb') as raw: 710 with BZ2File(raw, 'rb') as f: 711 self.assertEqual(f.read(), b'contentappendix') 712 self.assertEqual(f.name, raw.name) 713 self.assertEqual(f.fileno(), raw.fileno()) 714 self.assertEqual(f.mode, 'rb') 715 with self.assertRaises(ValueError): 716 f.name 717 self.assertRaises(ValueError, f.fileno) 718 719 def testOpenBytesFilename(self): 720 str_filename = self.filename 721 bytes_filename = os.fsencode(str_filename) 722 with BZ2File(bytes_filename, "wb") as f: 723 f.write(self.DATA) 724 self.assertEqual(f.name, bytes_filename) 725 with BZ2File(bytes_filename, "rb") as f: 726 self.assertEqual(f.read(), self.DATA) 727 self.assertEqual(f.name, bytes_filename) 728 # Sanity check that we are actually operating on the right file. 729 with BZ2File(str_filename, "rb") as f: 730 self.assertEqual(f.read(), self.DATA) 731 self.assertEqual(f.name, str_filename) 732 733 def testOpenPathLikeFilename(self): 734 filename = FakePath(self.filename) 735 with BZ2File(filename, "wb") as f: 736 f.write(self.DATA) 737 self.assertEqual(f.name, self.filename) 738 with BZ2File(filename, "rb") as f: 739 self.assertEqual(f.read(), self.DATA) 740 self.assertEqual(f.name, self.filename) 741 742 def testDecompressLimited(self): 743 """Decompressed data buffering should be limited""" 744 bomb = bz2.compress(b'\0' * int(2e6), compresslevel=9) 745 self.assertLess(len(bomb), _compression.BUFFER_SIZE) 746 747 decomp = BZ2File(BytesIO(bomb)) 748 self.assertEqual(decomp.read(1), b'\0') 749 max_decomp = 1 + DEFAULT_BUFFER_SIZE 750 self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, 751 "Excessive amount of data was decompressed") 752 753 754 # Tests for a BZ2File wrapping another file object: 755 756 def testReadBytesIO(self): 757 with BytesIO(self.DATA) as bio: 758 with BZ2File(bio) as bz2f: 759 self.assertRaises(TypeError, bz2f.read, float()) 760 self.assertEqual(bz2f.read(), self.TEXT) 761 with self.assertRaises(AttributeError): 762 bz2.name 763 self.assertEqual(bz2f.mode, 'rb') 764 self.assertFalse(bio.closed) 765 766 def testPeekBytesIO(self): 767 with BytesIO(self.DATA) as bio: 768 with BZ2File(bio) as bz2f: 769 pdata = bz2f.peek() 770 self.assertNotEqual(len(pdata), 0) 771 self.assertTrue(self.TEXT.startswith(pdata)) 772 self.assertEqual(bz2f.read(), self.TEXT) 773 774 def testWriteBytesIO(self): 775 with BytesIO() as bio: 776 with BZ2File(bio, "w") as bz2f: 777 self.assertRaises(TypeError, bz2f.write) 778 bz2f.write(self.TEXT) 779 with self.assertRaises(AttributeError): 780 bz2.name 781 self.assertEqual(bz2f.mode, 'wb') 782 self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT) 783 self.assertFalse(bio.closed) 784 785 def testSeekForwardBytesIO(self): 786 with BytesIO(self.DATA) as bio: 787 with BZ2File(bio) as bz2f: 788 self.assertRaises(TypeError, bz2f.seek) 789 bz2f.seek(150) 790 self.assertEqual(bz2f.read(), self.TEXT[150:]) 791 792 def testSeekBackwardsBytesIO(self): 793 with BytesIO(self.DATA) as bio: 794 with BZ2File(bio) as bz2f: 795 bz2f.read(500) 796 bz2f.seek(-150, 1) 797 self.assertEqual(bz2f.read(), self.TEXT[500-150:]) 798 799 def test_read_truncated(self): 800 # Drop the eos_magic field (6 bytes) and CRC (4 bytes). 801 truncated = self.DATA[:-10] 802 with BZ2File(BytesIO(truncated)) as f: 803 self.assertRaises(EOFError, f.read) 804 with BZ2File(BytesIO(truncated)) as f: 805 self.assertEqual(f.read(len(self.TEXT)), self.TEXT) 806 self.assertRaises(EOFError, f.read, 1) 807 # Incomplete 4-byte file header, and block header of at least 146 bits. 808 for i in range(22): 809 with BZ2File(BytesIO(truncated[:i])) as f: 810 self.assertRaises(EOFError, f.read, 1) 811 812 def test_issue44439(self): 813 q = array.array('Q', [1, 2, 3, 4, 5]) 814 LENGTH = len(q) * q.itemsize 815 816 with BZ2File(BytesIO(), 'w') as f: 817 self.assertEqual(f.write(q), LENGTH) 818 self.assertEqual(f.tell(), LENGTH) 819 820 821class BZ2CompressorTest(BaseTest): 822 def testCompress(self): 823 bz2c = BZ2Compressor() 824 self.assertRaises(TypeError, bz2c.compress) 825 data = bz2c.compress(self.TEXT) 826 data += bz2c.flush() 827 self.assertEqual(ext_decompress(data), self.TEXT) 828 829 def testCompressEmptyString(self): 830 bz2c = BZ2Compressor() 831 data = bz2c.compress(b'') 832 data += bz2c.flush() 833 self.assertEqual(data, self.EMPTY_DATA) 834 835 def testCompressChunks10(self): 836 bz2c = BZ2Compressor() 837 n = 0 838 data = b'' 839 while True: 840 str = self.TEXT[n*10:(n+1)*10] 841 if not str: 842 break 843 data += bz2c.compress(str) 844 n += 1 845 data += bz2c.flush() 846 self.assertEqual(ext_decompress(data), self.TEXT) 847 848 @support.skip_if_pgo_task 849 @bigmemtest(size=_4G + 100, memuse=2) 850 def testCompress4G(self, size): 851 # "Test BZ2Compressor.compress()/flush() with >4GiB input" 852 bz2c = BZ2Compressor() 853 data = b"x" * size 854 try: 855 compressed = bz2c.compress(data) 856 compressed += bz2c.flush() 857 finally: 858 data = None # Release memory 859 data = bz2.decompress(compressed) 860 try: 861 self.assertEqual(len(data), size) 862 self.assertEqual(len(data.strip(b"x")), 0) 863 finally: 864 data = None 865 866 def testPickle(self): 867 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 868 with self.assertRaises(TypeError): 869 pickle.dumps(BZ2Compressor(), proto) 870 871 872class BZ2DecompressorTest(BaseTest): 873 def test_Constructor(self): 874 self.assertRaises(TypeError, BZ2Decompressor, 42) 875 876 def testDecompress(self): 877 bz2d = BZ2Decompressor() 878 self.assertRaises(TypeError, bz2d.decompress) 879 text = bz2d.decompress(self.DATA) 880 self.assertEqual(text, self.TEXT) 881 882 def testDecompressChunks10(self): 883 bz2d = BZ2Decompressor() 884 text = b'' 885 n = 0 886 while True: 887 str = self.DATA[n*10:(n+1)*10] 888 if not str: 889 break 890 text += bz2d.decompress(str) 891 n += 1 892 self.assertEqual(text, self.TEXT) 893 894 def testDecompressUnusedData(self): 895 bz2d = BZ2Decompressor() 896 unused_data = b"this is unused data" 897 text = bz2d.decompress(self.DATA+unused_data) 898 self.assertEqual(text, self.TEXT) 899 self.assertEqual(bz2d.unused_data, unused_data) 900 901 def testEOFError(self): 902 bz2d = BZ2Decompressor() 903 text = bz2d.decompress(self.DATA) 904 self.assertRaises(EOFError, bz2d.decompress, b"anything") 905 self.assertRaises(EOFError, bz2d.decompress, b"") 906 907 @support.skip_if_pgo_task 908 @bigmemtest(size=_4G + 100, memuse=3.3) 909 def testDecompress4G(self, size): 910 # "Test BZ2Decompressor.decompress() with >4GiB input" 911 blocksize = min(10 * 1024 * 1024, size) 912 block = random.randbytes(blocksize) 913 try: 914 data = block * ((size-1) // blocksize + 1) 915 compressed = bz2.compress(data) 916 bz2d = BZ2Decompressor() 917 decompressed = bz2d.decompress(compressed) 918 self.assertTrue(decompressed == data) 919 finally: 920 data = None 921 compressed = None 922 decompressed = None 923 924 def testPickle(self): 925 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 926 with self.assertRaises(TypeError): 927 pickle.dumps(BZ2Decompressor(), proto) 928 929 def testDecompressorChunksMaxsize(self): 930 bzd = BZ2Decompressor() 931 max_length = 100 932 out = [] 933 934 # Feed some input 935 len_ = len(self.BIG_DATA) - 64 936 out.append(bzd.decompress(self.BIG_DATA[:len_], 937 max_length=max_length)) 938 self.assertFalse(bzd.needs_input) 939 self.assertEqual(len(out[-1]), max_length) 940 941 # Retrieve more data without providing more input 942 out.append(bzd.decompress(b'', max_length=max_length)) 943 self.assertFalse(bzd.needs_input) 944 self.assertEqual(len(out[-1]), max_length) 945 946 # Retrieve more data while providing more input 947 out.append(bzd.decompress(self.BIG_DATA[len_:], 948 max_length=max_length)) 949 self.assertLessEqual(len(out[-1]), max_length) 950 951 # Retrieve remaining uncompressed data 952 while not bzd.eof: 953 out.append(bzd.decompress(b'', max_length=max_length)) 954 self.assertLessEqual(len(out[-1]), max_length) 955 956 out = b"".join(out) 957 self.assertEqual(out, self.BIG_TEXT) 958 self.assertEqual(bzd.unused_data, b"") 959 960 def test_decompressor_inputbuf_1(self): 961 # Test reusing input buffer after moving existing 962 # contents to beginning 963 bzd = BZ2Decompressor() 964 out = [] 965 966 # Create input buffer and fill it 967 self.assertEqual(bzd.decompress(self.DATA[:100], 968 max_length=0), b'') 969 970 # Retrieve some results, freeing capacity at beginning 971 # of input buffer 972 out.append(bzd.decompress(b'', 2)) 973 974 # Add more data that fits into input buffer after 975 # moving existing data to beginning 976 out.append(bzd.decompress(self.DATA[100:105], 15)) 977 978 # Decompress rest of data 979 out.append(bzd.decompress(self.DATA[105:])) 980 self.assertEqual(b''.join(out), self.TEXT) 981 982 def test_decompressor_inputbuf_2(self): 983 # Test reusing input buffer by appending data at the 984 # end right away 985 bzd = BZ2Decompressor() 986 out = [] 987 988 # Create input buffer and empty it 989 self.assertEqual(bzd.decompress(self.DATA[:200], 990 max_length=0), b'') 991 out.append(bzd.decompress(b'')) 992 993 # Fill buffer with new data 994 out.append(bzd.decompress(self.DATA[200:280], 2)) 995 996 # Append some more data, not enough to require resize 997 out.append(bzd.decompress(self.DATA[280:300], 2)) 998 999 # Decompress rest of data 1000 out.append(bzd.decompress(self.DATA[300:])) 1001 self.assertEqual(b''.join(out), self.TEXT) 1002 1003 def test_decompressor_inputbuf_3(self): 1004 # Test reusing input buffer after extending it 1005 1006 bzd = BZ2Decompressor() 1007 out = [] 1008 1009 # Create almost full input buffer 1010 out.append(bzd.decompress(self.DATA[:200], 5)) 1011 1012 # Add even more data to it, requiring resize 1013 out.append(bzd.decompress(self.DATA[200:300], 5)) 1014 1015 # Decompress rest of data 1016 out.append(bzd.decompress(self.DATA[300:])) 1017 self.assertEqual(b''.join(out), self.TEXT) 1018 1019 def test_failure(self): 1020 bzd = BZ2Decompressor() 1021 self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30) 1022 # Previously, a second call could crash due to internal inconsistency 1023 self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30) 1024 1025 @support.refcount_test 1026 def test_refleaks_in___init__(self): 1027 gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 1028 bzd = BZ2Decompressor() 1029 refs_before = gettotalrefcount() 1030 for i in range(100): 1031 bzd.__init__() 1032 self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 1033 1034 def test_uninitialized_BZ2Decompressor_crash(self): 1035 self.assertEqual(BZ2Decompressor.__new__(BZ2Decompressor). 1036 decompress(bytes()), b'') 1037 1038 1039class CompressDecompressTest(BaseTest): 1040 def testCompress(self): 1041 data = bz2.compress(self.TEXT) 1042 self.assertEqual(ext_decompress(data), self.TEXT) 1043 1044 def testCompressEmptyString(self): 1045 text = bz2.compress(b'') 1046 self.assertEqual(text, self.EMPTY_DATA) 1047 1048 def testDecompress(self): 1049 text = bz2.decompress(self.DATA) 1050 self.assertEqual(text, self.TEXT) 1051 1052 def testDecompressEmpty(self): 1053 text = bz2.decompress(b"") 1054 self.assertEqual(text, b"") 1055 1056 def testDecompressToEmptyString(self): 1057 text = bz2.decompress(self.EMPTY_DATA) 1058 self.assertEqual(text, b'') 1059 1060 def testDecompressIncomplete(self): 1061 self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10]) 1062 1063 def testDecompressBadData(self): 1064 self.assertRaises(OSError, bz2.decompress, self.BAD_DATA) 1065 1066 def testDecompressMultiStream(self): 1067 text = bz2.decompress(self.DATA * 5) 1068 self.assertEqual(text, self.TEXT * 5) 1069 1070 def testDecompressTrailingJunk(self): 1071 text = bz2.decompress(self.DATA + self.BAD_DATA) 1072 self.assertEqual(text, self.TEXT) 1073 1074 def testDecompressMultiStreamTrailingJunk(self): 1075 text = bz2.decompress(self.DATA * 5 + self.BAD_DATA) 1076 self.assertEqual(text, self.TEXT * 5) 1077 1078 1079class OpenTest(BaseTest): 1080 "Test the open function." 1081 1082 def open(self, *args, **kwargs): 1083 return bz2.open(*args, **kwargs) 1084 1085 def test_binary_modes(self): 1086 for mode in ("wb", "xb"): 1087 if mode == "xb": 1088 unlink(self.filename) 1089 with self.open(self.filename, mode) as f: 1090 f.write(self.TEXT) 1091 with open(self.filename, "rb") as f: 1092 file_data = ext_decompress(f.read()) 1093 self.assertEqual(file_data, self.TEXT) 1094 with self.open(self.filename, "rb") as f: 1095 self.assertEqual(f.read(), self.TEXT) 1096 with self.open(self.filename, "ab") as f: 1097 f.write(self.TEXT) 1098 with open(self.filename, "rb") as f: 1099 file_data = ext_decompress(f.read()) 1100 self.assertEqual(file_data, self.TEXT * 2) 1101 1102 def test_implicit_binary_modes(self): 1103 # Test implicit binary modes (no "b" or "t" in mode string). 1104 for mode in ("w", "x"): 1105 if mode == "x": 1106 unlink(self.filename) 1107 with self.open(self.filename, mode) as f: 1108 f.write(self.TEXT) 1109 with open(self.filename, "rb") as f: 1110 file_data = ext_decompress(f.read()) 1111 self.assertEqual(file_data, self.TEXT) 1112 with self.open(self.filename, "r") as f: 1113 self.assertEqual(f.read(), self.TEXT) 1114 with self.open(self.filename, "a") as f: 1115 f.write(self.TEXT) 1116 with open(self.filename, "rb") as f: 1117 file_data = ext_decompress(f.read()) 1118 self.assertEqual(file_data, self.TEXT * 2) 1119 1120 def test_text_modes(self): 1121 text = self.TEXT.decode("ascii") 1122 text_native_eol = text.replace("\n", os.linesep) 1123 for mode in ("wt", "xt"): 1124 if mode == "xt": 1125 unlink(self.filename) 1126 with self.open(self.filename, mode, encoding="ascii") as f: 1127 f.write(text) 1128 with open(self.filename, "rb") as f: 1129 file_data = ext_decompress(f.read()).decode("ascii") 1130 self.assertEqual(file_data, text_native_eol) 1131 with self.open(self.filename, "rt", encoding="ascii") as f: 1132 self.assertEqual(f.read(), text) 1133 with self.open(self.filename, "at", encoding="ascii") as f: 1134 f.write(text) 1135 with open(self.filename, "rb") as f: 1136 file_data = ext_decompress(f.read()).decode("ascii") 1137 self.assertEqual(file_data, text_native_eol * 2) 1138 1139 def test_x_mode(self): 1140 for mode in ("x", "xb", "xt"): 1141 unlink(self.filename) 1142 encoding = "utf-8" if "t" in mode else None 1143 with self.open(self.filename, mode, encoding=encoding) as f: 1144 pass 1145 with self.assertRaises(FileExistsError): 1146 with self.open(self.filename, mode) as f: 1147 pass 1148 1149 def test_fileobj(self): 1150 with self.open(BytesIO(self.DATA), "r") as f: 1151 self.assertEqual(f.read(), self.TEXT) 1152 with self.open(BytesIO(self.DATA), "rb") as f: 1153 self.assertEqual(f.read(), self.TEXT) 1154 text = self.TEXT.decode("ascii") 1155 with self.open(BytesIO(self.DATA), "rt", encoding="utf-8") as f: 1156 self.assertEqual(f.read(), text) 1157 1158 def test_bad_params(self): 1159 # Test invalid parameter combinations. 1160 self.assertRaises(ValueError, 1161 self.open, self.filename, "wbt") 1162 self.assertRaises(ValueError, 1163 self.open, self.filename, "xbt") 1164 self.assertRaises(ValueError, 1165 self.open, self.filename, "rb", encoding="utf-8") 1166 self.assertRaises(ValueError, 1167 self.open, self.filename, "rb", errors="ignore") 1168 self.assertRaises(ValueError, 1169 self.open, self.filename, "rb", newline="\n") 1170 1171 def test_encoding(self): 1172 # Test non-default encoding. 1173 text = self.TEXT.decode("ascii") 1174 text_native_eol = text.replace("\n", os.linesep) 1175 with self.open(self.filename, "wt", encoding="utf-16-le") as f: 1176 f.write(text) 1177 with open(self.filename, "rb") as f: 1178 file_data = ext_decompress(f.read()).decode("utf-16-le") 1179 self.assertEqual(file_data, text_native_eol) 1180 with self.open(self.filename, "rt", encoding="utf-16-le") as f: 1181 self.assertEqual(f.read(), text) 1182 1183 def test_encoding_error_handler(self): 1184 # Test with non-default encoding error handler. 1185 with self.open(self.filename, "wb") as f: 1186 f.write(b"foo\xffbar") 1187 with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \ 1188 as f: 1189 self.assertEqual(f.read(), "foobar") 1190 1191 def test_newline(self): 1192 # Test with explicit newline (universal newline mode disabled). 1193 text = self.TEXT.decode("ascii") 1194 with self.open(self.filename, "wt", encoding="utf-8", newline="\n") as f: 1195 f.write(text) 1196 with self.open(self.filename, "rt", encoding="utf-8", newline="\r") as f: 1197 self.assertEqual(f.readlines(), [text]) 1198 1199 1200def tearDownModule(): 1201 support.reap_children() 1202 1203 1204if __name__ == '__main__': 1205 unittest.main() 1206