"""Test program for the fcntl C module.
"""
import platform
import os
import struct
import sys
import unittest
from multiprocessing import Process
from test.support import (verbose, TESTFN, unlink, run_unittest, import_module,
                          cpython_only)

# Skip test if no fcntl module.
fcntl = import_module('fcntl')


def get_lockdata():
    """Return the packed bytes passed to ``F_SETLKW`` by the tests below.

    The value is a ``struct.pack`` of ``fcntl.F_WRLCK`` together with zeroed
    remaining fields.  The field order and widths are selected from
    ``sys.platform``; on the generic branch the two middle fields are packed
    as ``'qq'`` when ``os.O_LARGEFILE`` exists and as ``'ll'`` otherwise.

    When ``verbose`` is set the packed value is also printed.
    """
    # Probe for large-file support: only the existence of the attribute
    # matters, its value is not used.
    try:
        os.O_LARGEFILE
    except AttributeError:
        start_len = "ll"
    else:
        start_len = "qq"

    if (sys.platform.startswith(('netbsd', 'freebsd', 'openbsd'))
            or sys.platform == 'darwin'):
        # Layout here is (two off_t fields, pid, two shorts).  When a C
        # 'long' is not 8 bytes, each off_t slot is a long followed by four
        # pad bytes.
        if struct.calcsize('l') == 8:
            off_t = 'l'
            pid_t = 'i'
        else:
            off_t = 'lxxxx'
            pid_t = 'l'
        lockdata = struct.pack(off_t + off_t + pid_t + 'hh', 0, 0, 0,
                               fcntl.F_WRLCK, 0)
    elif sys.platform.startswith('gnukfreebsd'):
        lockdata = struct.pack('qqihhi', 0, 0, 0, fcntl.F_WRLCK, 0, 0)
    elif sys.platform in ['hp-uxB', 'unixware7']:
        lockdata = struct.pack('hhlllii', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
    else:
        lockdata = struct.pack('hh'+start_len+'hh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
    # struct.pack() with any of the formats above never yields an empty
    # bytes object, so only the verbosity flag needs checking.
    if verbose:
        print('struct.pack: ', repr(lockdata))
    return lockdata

lockdata = get_lockdata()


class BadFile:
    """Minimal object whose ``fileno()`` returns whatever it was built with.

    The tests use it to feed invalid descriptors (negative, out of range or
    non-integer) to ``fcntl.fcntl`` through the ``fileno()`` protocol.
    """

    def __init__(self, fn):
        self.fn = fn

    def fileno(self):
        return self.fn


def try_lockf_on_other_process_fail(fname, cmd):
    """Child-process target: attempt ``fcntl.lockf(f, cmd)`` on *fname*.

    ``BlockingIOError`` is swallowed, so the process still exits with
    status 0 when the lock cannot be taken.  Any other exception propagates.
    The file is closed on every path.
    """
    with open(fname, 'wb+') as f:
        try:
            fcntl.lockf(f, cmd)
        except BlockingIOError:
            pass


def try_lockf_on_other_process(fname, cmd):
    """Child-process target: take the lock *cmd* on *fname*, then release it.

    Any exception from ``fcntl.lockf`` propagates; the file is closed on
    every path, including when locking fails.
    """
    with open(fname, 'wb+') as f:
        fcntl.lockf(f, cmd)
        fcntl.lockf(f, fcntl.LOCK_UN)


class TestFcntl(unittest.TestCase):
    """Tests for ``fcntl.fcntl``, ``fcntl.flock`` and ``fcntl.lockf``.

    Tests that need a file open ``TESTFN`` and store the handle in
    ``self.f``; ``tearDown`` closes it if needed and removes ``TESTFN``.
    """

    # NOTE: the test methods are documented with comments rather than
    # docstrings so that unittest's verbose output (which prints a test's
    # docstring in place of its name) stays unchanged.

    def setUp(self):
        self.f = None

    def tearDown(self):
        # Always remove TESTFN, even if closing the file raises.
        try:
            if self.f and not self.f.closed:
                self.f.close()
        finally:
            unlink(TESTFN)

    def test_fcntl_fileno(self):
        # the example from the library docs
        self.f = open(TESTFN, 'wb')
        rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
        if verbose:
            print('Status from fcntl with O_NONBLOCK: ', rv)
        rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
        if verbose:
            print('String from fcntl with F_SETLKW: ', repr(rv))
        self.f.close()

    def test_fcntl_file_descriptor(self):
        # again, but pass the file rather than numeric descriptor
        self.f = open(TESTFN, 'wb')
        rv = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK)
        if verbose:
            print('Status from fcntl with O_NONBLOCK: ', rv)
        rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
        if verbose:
            print('String from fcntl with F_SETLKW: ', repr(rv))
        self.f.close()

    def test_fcntl_bad_file(self):
        # A descriptor of -1 must raise ValueError and a str descriptor must
        # raise TypeError, whether passed directly or returned by fileno().
        with self.assertRaises(ValueError):
            fcntl.fcntl(-1, fcntl.F_SETFL, os.O_NONBLOCK)
        with self.assertRaises(ValueError):
            fcntl.fcntl(BadFile(-1), fcntl.F_SETFL, os.O_NONBLOCK)
        with self.assertRaises(TypeError):
            fcntl.fcntl('spam', fcntl.F_SETFL, os.O_NONBLOCK)
        with self.assertRaises(TypeError):
            fcntl.fcntl(BadFile('spam'), fcntl.F_SETFL, os.O_NONBLOCK)

    @cpython_only
    def test_fcntl_bad_file_overflow(self):
        from _testcapi import INT_MAX, INT_MIN
        # Issue 15989
        # Descriptors just outside the C 'int' range must raise
        # OverflowError, directly and through fileno().
        with self.assertRaises(OverflowError):
            fcntl.fcntl(INT_MAX + 1, fcntl.F_SETFL, os.O_NONBLOCK)
        with self.assertRaises(OverflowError):
            fcntl.fcntl(BadFile(INT_MAX + 1), fcntl.F_SETFL, os.O_NONBLOCK)
        with self.assertRaises(OverflowError):
            fcntl.fcntl(INT_MIN - 1, fcntl.F_SETFL, os.O_NONBLOCK)
        with self.assertRaises(OverflowError):
            fcntl.fcntl(BadFile(INT_MIN - 1), fcntl.F_SETFL, os.O_NONBLOCK)

    @unittest.skipIf(
        platform.machine().startswith('arm') and platform.system() == 'Linux',
        "ARM Linux returns EINVAL for F_NOTIFY DN_MULTISHOT")
    def test_fcntl_64_bit(self):
        # Issue #1309352: fcntl shouldn't fail when the third arg fits in a
        # C 'long' but not in a C 'int'.
        try:
            cmd = fcntl.F_NOTIFY
            # This flag is larger than 2**31 in 64-bit builds
            flags = fcntl.DN_MULTISHOT
        except AttributeError:
            self.skipTest("F_NOTIFY or DN_MULTISHOT unavailable")
        # The command is issued on the directory that contains TESTFN.
        fd = os.open(os.path.dirname(os.path.abspath(TESTFN)), os.O_RDONLY)
        try:
            fcntl.fcntl(fd, cmd, flags)
        finally:
            os.close(fd)

    def test_flock(self):
        # Solaris needs readable file for shared lock
        self.f = open(TESTFN, 'wb+')
        fileno = self.f.fileno()
        # Take and release locks through both the numeric descriptor and the
        # file object: shared, shared non-blocking, then exclusive.
        fcntl.flock(fileno, fcntl.LOCK_SH)
        fcntl.flock(fileno, fcntl.LOCK_UN)
        fcntl.flock(self.f, fcntl.LOCK_SH | fcntl.LOCK_NB)
        fcntl.flock(self.f, fcntl.LOCK_UN)
        fcntl.flock(fileno, fcntl.LOCK_EX)
        fcntl.flock(fileno, fcntl.LOCK_UN)

        self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
        self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)

    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    def test_lockf_exclusive(self):
        # Hold an exclusive non-blocking lock here, then let a child process
        # request the same lock on the same file.  The child swallows
        # BlockingIOError, so it is expected to exit with status 0.
        self.f = open(TESTFN, 'wb+')
        cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
        fcntl.lockf(self.f, cmd)
        p = Process(target=try_lockf_on_other_process_fail, args=(TESTFN, cmd))
        p.start()
        p.join()
        fcntl.lockf(self.f, fcntl.LOCK_UN)
        self.assertEqual(p.exitcode, 0)

    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    def test_lockf_share(self):
        # Hold a shared non-blocking lock here; a child process must be able
        # to take and release the same kind of lock without raising.
        self.f = open(TESTFN, 'wb+')
        cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
        fcntl.lockf(self.f, cmd)
        p = Process(target=try_lockf_on_other_process, args=(TESTFN, cmd))
        p.start()
        p.join()
        fcntl.lockf(self.f, fcntl.LOCK_UN)
        self.assertEqual(p.exitcode, 0)

    @cpython_only
    def test_flock_overflow(self):
        import _testcapi
        self.assertRaises(OverflowError, fcntl.flock, _testcapi.INT_MAX+1,
                          fcntl.LOCK_SH)

    # Also skip when this build of fcntl does not export F_GETPATH, so the
    # test is reported as skipped rather than failing with AttributeError.
    @unittest.skipIf(
        sys.platform != 'darwin' or not hasattr(fcntl, 'F_GETPATH'),
        "F_GETPATH is only available on macos")
    def test_fcntl_f_getpath(self):
        self.f = open(TESTFN, 'wb')
        expected = os.path.abspath(TESTFN).encode('utf-8')
        # The third argument is a zero-filled buffer as long as the
        # expected path.
        res = fcntl.fcntl(self.f.fileno(), fcntl.F_GETPATH, bytes(len(expected)))
        self.assertEqual(expected, res)


def test_main():
    """Run the ``TestFcntl`` test case through ``test.support.run_unittest``."""
    run_unittest(TestFcntl)


if __name__ == '__main__':
    test_main()