"""Test largefile support on systems where this makes sense.

Every test case operates on a sparse file of ``size + 1`` bytes: a ``b'z'``
at offset 0, a ``b'a'`` at offset ``size``, and a hole in between.
"""

import functools
import os
import stat
import sys
import unittest
import socket
import shutil
import threading
from test.support import requires, bigmemtest
from test.support import SHORT_TIMEOUT
from test.support import socket_helper
from test.support.os_helper import TESTFN, unlink
import io  # C implementation of io
import _pyio as pyio  # Python implementation of io

# size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes)
size = 2_500_000_000
TESTFN2 = TESTFN + '2'


class LargeFileTest:
    """Mixin providing the large-file fixture shared by all test cases.

    Concrete subclasses must also derive from unittest.TestCase and define
    an ``open`` attribute (a staticmethod wrapping io.open or _pyio.open).
    """

    def setUp(self):
        """Make sure TESTFN exists and is exactly ``size + 1`` bytes long."""
        if os.path.exists(TESTFN):
            mode = 'r+b'
        else:
            mode = 'w+b'

        with self.open(TESTFN, mode) as f:
            current_size = os.fstat(f.fileno())[stat.ST_SIZE]
            if current_size == size + 1:
                # A previous test left the fixture intact; reuse it.
                return

            if current_size == 0:
                f.write(b'z')

            f.seek(0)
            f.seek(size)
            f.write(b'a')
            f.flush()
            self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size + 1)

    @classmethod
    def tearDownClass(cls):
        """Truncate TESTFN by reopening it in 'wb' mode and remove TESTFN2.

        Raises cls.failureException if the reopen did not leave TESTFN empty.
        """
        with cls.open(TESTFN, 'wb'):
            pass
        if os.stat(TESTFN)[stat.ST_SIZE] != 0:
            raise cls.failureException('File was not truncated by opening '
                                       'with mode "wb"')
        unlink(TESTFN2)

    def _check_copy(self):
        """Assert that TESTFN2 has TESTFN's size and the expected contents.

        Only the first and the last five bytes are compared, so the whole
        file is never read into memory.
        """
        expected_size = os.path.getsize(TESTFN)
        self.assertEqual(os.path.getsize(TESTFN2), expected_size)
        with open(TESTFN2, 'rb') as f:
            self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
            f.seek(expected_size - 5)
            self.assertEqual(f.read(), b'\x00\x00\x00\x00a')


class TestFileMethods(LargeFileTest):
    """Test that each file function works as expected for large
    (i.e. > 2 GiB) files.
    """

    # _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes,
    # so memuse=2 is needed
    @bigmemtest(size=size, memuse=2, dry_run=False)
    def test_large_read(self, _size):
        # bpo-24658: Test that a read greater than 2GB does not fail.
        with self.open(TESTFN, "rb") as f:
            self.assertEqual(len(f.read()), size + 1)
            self.assertEqual(f.tell(), size + 1)

    def test_osstat(self):
        self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size + 1)

    def test_seek_read(self):
        with self.open(TESTFN, 'rb') as f:
            self.assertEqual(f.tell(), 0)
            self.assertEqual(f.read(1), b'z')
            self.assertEqual(f.tell(), 1)
            f.seek(0)
            self.assertEqual(f.tell(), 0)
            f.seek(0, 0)
            self.assertEqual(f.tell(), 0)
            f.seek(42)
            self.assertEqual(f.tell(), 42)
            f.seek(42, 0)
            self.assertEqual(f.tell(), 42)
            f.seek(42, 1)
            self.assertEqual(f.tell(), 84)
            f.seek(0, 1)
            self.assertEqual(f.tell(), 84)
            f.seek(0, 2)  # seek from the end
            self.assertEqual(f.tell(), size + 1 + 0)
            f.seek(-10, 2)
            self.assertEqual(f.tell(), size + 1 - 10)
            f.seek(-size - 1, 2)
            self.assertEqual(f.tell(), 0)
            f.seek(size)
            self.assertEqual(f.tell(), size)
            # the 'a' that was written at the end of file above
            self.assertEqual(f.read(1), b'a')
            f.seek(-size - 1, 1)
            self.assertEqual(f.read(1), b'z')
            self.assertEqual(f.tell(), 1)

    def test_lseek(self):
        with self.open(TESTFN, 'rb') as f:
            self.assertEqual(os.lseek(f.fileno(), 0, 0), 0)
            self.assertEqual(os.lseek(f.fileno(), 42, 0), 42)
            self.assertEqual(os.lseek(f.fileno(), 42, 1), 84)
            self.assertEqual(os.lseek(f.fileno(), 0, 1), 84)
            self.assertEqual(os.lseek(f.fileno(), 0, 2), size + 1 + 0)
            self.assertEqual(os.lseek(f.fileno(), -10, 2), size + 1 - 10)
            self.assertEqual(os.lseek(f.fileno(), -size - 1, 2), 0)
            self.assertEqual(os.lseek(f.fileno(), size, 0), size)
            # the 'a' that was written at the end of file above
            self.assertEqual(f.read(1), b'a')

    def test_truncate(self):
        with self.open(TESTFN, 'r+b') as f:
            if not hasattr(f, 'truncate'):
                self.skipTest("open().truncate() not available "
                              "on this system")
            f.seek(0, 2)
            # else we've lost track of the true size
            self.assertEqual(f.tell(), size + 1)
            # Cut it back via seek + truncate with no argument.
            newsize = size - 10
            f.seek(newsize)
            f.truncate()
            self.assertEqual(f.tell(), newsize)  # else pointer moved
            f.seek(0, 2)
            self.assertEqual(f.tell(), newsize)  # else wasn't truncated
            # Ensure that truncate(smaller than true size) shrinks
            # the file.
            newsize -= 1
            f.seek(42)
            f.truncate(newsize)
            self.assertEqual(f.tell(), 42)
            f.seek(0, 2)
            self.assertEqual(f.tell(), newsize)
            # XXX truncate(larger than true size) is ill-defined
            # across platform; cut it waaaaay back
            f.seek(0)
            f.truncate(1)
            self.assertEqual(f.tell(), 0)       # else pointer moved
            f.seek(0)
            self.assertEqual(len(f.read()), 1)  # else wasn't truncated

    def test_seekable(self):
        # Issue #5016; seekable() can return False when the current position
        # is negative when truncated to an int.
        for pos in (2**31 - 1, 2**31, 2**31 + 1):
            with self.open(TESTFN, 'rb') as f:
                f.seek(pos)
                self.assertTrue(f.seekable())


def skip_no_disk_space(path, required):
    """Return a decorator that skips a test when disk space is short.

    The check runs each time the decorated function is called: if the
    filesystem holding *path* has fewer than *required* free bytes, the
    call raises unittest.SkipTest instead of running the function.
    """
    def decorator(fun):
        # Preserve the test's __name__/__doc__ on the wrapper.
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            if shutil.disk_usage(os.path.realpath(path)).free < required:
                hsize = int(required / 1024 / 1024)
                raise unittest.SkipTest(
                    f"required {hsize} MiB of free disk space")
            return fun(*args, **kwargs)
        return wrapper
    return decorator


class TestCopyfile(LargeFileTest, unittest.TestCase):
    """Check shutil.copyfile() on a file larger than 2 GiB."""

    open = staticmethod(io.open)

    # Exact required disk space would be (size * 2), but let's give it a
    # bit more tolerance.
    @skip_no_disk_space(TESTFN, size * 2.5)
    def test_it(self):
        # Internally shutil.copyfile() can use "fast copy" methods like
        # os.sendfile().
        shutil.copyfile(TESTFN, TESTFN2)
        self._check_copy()


@unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported')
class TestSocketSendfile(LargeFileTest, unittest.TestCase):
    """Check socket.sendfile() on a file larger than 2 GiB."""

    open = staticmethod(io.open)
    timeout = SHORT_TIMEOUT

    def setUp(self):
        super().setUp()
        self.thread = None

    def tearDown(self):
        super().tearDown()
        self._join_server_thread()

    def _join_server_thread(self):
        """Wait up to self.timeout for the server thread, then forget it."""
        if self.thread is not None:
            self.thread.join(self.timeout)
            self.thread = None

    def tcp_server(self, sock):
        """Start a thread that accepts one connection on *sock* and writes
        everything it receives to TESTFN2.
        """
        def run(sock):
            with sock:
                conn, _ = sock.accept()
                conn.settimeout(self.timeout)
                with conn, open(TESTFN2, 'wb') as f:
                    event.wait(self.timeout)
                    while True:
                        chunk = conn.recv(65536)
                        if not chunk:
                            return
                        f.write(chunk)

        event = threading.Event()
        sock.settimeout(self.timeout)
        self.thread = threading.Thread(target=run, args=(sock, ))
        self.thread.start()
        event.set()

    # Exact required disk space would be (size * 2), but let's give it a
    # bit more tolerance.
    @skip_no_disk_space(TESTFN, size * 2.5)
    def test_it(self):
        port = socket_helper.find_unused_port()
        with socket.create_server(("", port)) as sock:
            self.tcp_server(sock)
            with socket.create_connection(("127.0.0.1", port)) as client:
                with open(TESTFN, 'rb') as f:
                    client.sendfile(f)
        # The server must have finished writing TESTFN2 before it is checked.
        self._join_server_thread()

        self._check_copy()


def setUpModule():
    """Ignore SIGXFSZ where available and skip the module when large files
    cannot be used.
    """
    try:
        import signal
        # The default handler for SIGXFSZ is to abort the process.
        # By ignoring it, system calls exceeding the file size resource
        # limit will raise OSError instead of crashing the interpreter.
        signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
    except (ImportError, AttributeError):
        pass

    # On Windows and Mac OSX this test consumes large resources; it
    # takes a long time to build the >2 GiB file and takes >2 GiB of disk
    # space, therefore the resource must be enabled to run this test.
    # If it is not, nothing after this stanza will be executed.
    if sys.platform.startswith('win') or sys.platform == 'darwin':
        requires('largefile',
                 f'test requires {size} bytes and a long time to run')
    else:
        # Only run if the current filesystem supports large files.
        # (Skip this test on Windows, since we now always support
        # large files.)
        f = open(TESTFN, 'wb', buffering=0)
        try:
            # 2**31 == 2147483648
            f.seek(2147483649)
            # Seeking is not enough of a test: you must write and flush, too!
            f.write(b'x')
            f.flush()
        except (OSError, OverflowError):
            raise unittest.SkipTest("filesystem does not have "
                                    "largefile support")
        finally:
            f.close()
            unlink(TESTFN)


class CLargeFileTest(TestFileMethods, unittest.TestCase):
    """Run the file-method tests against the C implementation of io."""

    open = staticmethod(io.open)


class PyLargeFileTest(TestFileMethods, unittest.TestCase):
    """Run the file-method tests against the Python implementation of io."""

    open = staticmethod(pyio.open)


def tearDownModule():
    """Remove both scratch files."""
    unlink(TESTFN)
    unlink(TESTFN2)


if __name__ == '__main__':
    unittest.main()