1"""Test largefile support on system where this makes sense. 2""" 3 4from __future__ import print_function 5 6import os 7import stat 8import sys 9import unittest 10from test.test_support import run_unittest, TESTFN, verbose, requires, \ 11 unlink 12import io # C implementation of io 13import _pyio as pyio # Python implementation of io 14 15try: 16 import signal 17 # The default handler for SIGXFSZ is to abort the process. 18 # By ignoring it, system calls exceeding the file size resource 19 # limit will raise IOError instead of crashing the interpreter. 20 oldhandler = signal.signal(signal.SIGXFSZ, signal.SIG_IGN) 21except (ImportError, AttributeError): 22 pass 23 24# create >2GB file (2GB = 2147483648 bytes) 25size = 2500000000 26 27 28class LargeFileTest(unittest.TestCase): 29 """Test that each file function works as expected for a large 30 (i.e. > 2GB, do we have to check > 4GB) files. 31 32 NOTE: the order of execution of the test methods is important! test_seek 33 must run first to create the test file. File cleanup must also be handled 34 outside the test instances because of this. 35 36 """ 37 38 def test_seek(self): 39 if verbose: 40 print('create large file via seek (may be sparse file) ...') 41 with self.open(TESTFN, 'wb') as f: 42 f.write(b'z') 43 f.seek(0) 44 f.seek(size) 45 f.write(b'a') 46 f.flush() 47 if verbose: 48 print('check file size with os.fstat') 49 self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1) 50 51 def test_osstat(self): 52 if verbose: 53 print('check file size with os.stat') 54 self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1) 55 56 def test_seek_read(self): 57 if verbose: 58 print('play around with seek() and read() with the built largefile') 59 with self.open(TESTFN, 'rb') as f: 60 self.assertEqual(f.tell(), 0) 61 self.assertEqual(f.read(1), b'z') 62 self.assertEqual(f.tell(), 1) 63 f.seek(0) 64 self.assertEqual(f.tell(), 0) 65 f.seek(0, 0) 66 self.assertEqual(f.tell(), 0) 67 f.seek(42) 68 self.assertEqual(f.tell(), 42) 69 f.seek(42, 0) 70 self.assertEqual(f.tell(), 42) 71 f.seek(42, 1) 72 self.assertEqual(f.tell(), 84) 73 f.seek(0, 1) 74 self.assertEqual(f.tell(), 84) 75 f.seek(0, 2) # seek from the end 76 self.assertEqual(f.tell(), size + 1 + 0) 77 f.seek(-10, 2) 78 self.assertEqual(f.tell(), size + 1 - 10) 79 f.seek(-size-1, 2) 80 self.assertEqual(f.tell(), 0) 81 f.seek(size) 82 self.assertEqual(f.tell(), size) 83 # the 'a' that was written at the end of file above 84 self.assertEqual(f.read(1), b'a') 85 f.seek(-size-1, 1) 86 self.assertEqual(f.read(1), b'z') 87 self.assertEqual(f.tell(), 1) 88 89 def test_lseek(self): 90 if verbose: 91 print('play around with os.lseek() with the built largefile') 92 with self.open(TESTFN, 'rb') as f: 93 self.assertEqual(os.lseek(f.fileno(), 0, 0), 0) 94 self.assertEqual(os.lseek(f.fileno(), 42, 0), 42) 95 self.assertEqual(os.lseek(f.fileno(), 42, 1), 84) 96 self.assertEqual(os.lseek(f.fileno(), 0, 1), 84) 97 self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0) 98 self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10) 99 self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0) 100 self.assertEqual(os.lseek(f.fileno(), size, 0), size) 101 # the 'a' that was written at the end of file above 102 self.assertEqual(f.read(1), b'a') 103 104 def test_truncate(self): 105 if verbose: 106 print('try truncate') 107 with self.open(TESTFN, 'r+b') as f: 108 # this is already decided before start running the test suite 109 # but we do it anyway for extra protection 110 if not hasattr(f, 'truncate'): 111 raise unittest.SkipTest("open().truncate() not available on this system") 112 f.seek(0, 2) 113 # else we've lost track of the true size 114 self.assertEqual(f.tell(), size+1) 115 # Cut it back via seek + truncate with no argument. 116 newsize = size - 10 117 f.seek(newsize) 118 f.truncate() 119 self.assertEqual(f.tell(), newsize) # else pointer moved 120 f.seek(0, 2) 121 self.assertEqual(f.tell(), newsize) # else wasn't truncated 122 # Ensure that truncate(smaller than true size) shrinks 123 # the file. 124 newsize -= 1 125 f.seek(42) 126 f.truncate(newsize) 127 if self.new_io: 128 self.assertEqual(f.tell(), 42) 129 f.seek(0, 2) 130 self.assertEqual(f.tell(), newsize) 131 # XXX truncate(larger than true size) is ill-defined 132 # across platform; cut it waaaaay back 133 f.seek(0) 134 f.truncate(1) 135 if self.new_io: 136 self.assertEqual(f.tell(), 0) # else pointer moved 137 f.seek(0) 138 self.assertEqual(len(f.read()), 1) # else wasn't truncated 139 140 def test_seekable(self): 141 # Issue #5016; seekable() can return False when the current position 142 # is negative when truncated to an int. 143 if not self.new_io: 144 self.skipTest("builtin file doesn't have seekable()") 145 for pos in (2**31-1, 2**31, 2**31+1): 146 with self.open(TESTFN, 'rb') as f: 147 f.seek(pos) 148 self.assertTrue(f.seekable()) 149 150 151def test_main(): 152 # On Windows and Mac OSX this test comsumes large resources; It 153 # takes a long time to build the >2GB file and takes >2GB of disk 154 # space therefore the resource must be enabled to run this test. 155 # If not, nothing after this line stanza will be executed. 156 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 157 requires('largefile', 158 'test requires %s bytes and a long time to run' % str(size)) 159 else: 160 # Only run if the current filesystem supports large files. 161 # (Skip this test on Windows, since we now always support 162 # large files.) 163 f = open(TESTFN, 'wb', buffering=0) 164 try: 165 # 2**31 == 2147483648 166 f.seek(2147483649) 167 # Seeking is not enough of a test: you must write and 168 # flush, too! 169 f.write(b'x') 170 f.flush() 171 except (IOError, OverflowError): 172 f.close() 173 unlink(TESTFN) 174 raise unittest.SkipTest("filesystem does not have largefile support") 175 else: 176 f.close() 177 suite = unittest.TestSuite() 178 for _open, prefix in [(io.open, 'C'), (pyio.open, 'Py'), 179 (open, 'Builtin')]: 180 class TestCase(LargeFileTest): 181 pass 182 TestCase.open = staticmethod(_open) 183 TestCase.new_io = _open is not open 184 TestCase.__name__ = prefix + LargeFileTest.__name__ 185 suite.addTest(TestCase('test_seek')) 186 suite.addTest(TestCase('test_osstat')) 187 suite.addTest(TestCase('test_seek_read')) 188 suite.addTest(TestCase('test_lseek')) 189 with _open(TESTFN, 'wb') as f: 190 if hasattr(f, 'truncate'): 191 suite.addTest(TestCase('test_truncate')) 192 suite.addTest(TestCase('test_seekable')) 193 unlink(TESTFN) 194 try: 195 run_unittest(suite) 196 finally: 197 unlink(TESTFN) 198 199if __name__ == '__main__': 200 test_main() 201